State Machine Replication with Raft

In this post, we review Raft, a leader-based consensus algorithm. Raft is a simpler and more understandable alternative to Multi-Paxos, and since its relatively recent invention, it has been widely adopted in the industry. Unlike Multi-Paxos, which extends an algorithm originally designed for consensus on a single value, Raft directly addresses the problem of consensus on a sequence of values. This makes Raft much more practical and understandable than Multi-Paxos.


Introduction 

In an earlier post, we reviewed the consensus problem and the Paxos algorithm as a solution for it. We saw that Paxos runs in two phases. In the first phase, the algorithm grants temporary leadership to one of the proposers. The selected proposer then has a short amount of time to get acceptance for its value from a majority of the acceptors in the second phase. An instance of the Paxos algorithm decides a single value. Thus, if we want to decide on a sequence of values instead of just a single value with the basic Paxos algorithm, we have to repeat these phases again and again.

Multi-Paxos is an important optimization of the basic Paxos algorithm for deciding on a sequence of values that eliminates the need for running the two phases of Paxos for each value. Basically, the idea of Multi-Paxos is to make the leadership long-lived, i.e., when the algorithm picks a leader, it does not change the leader until the current leader dies. In practice, most of the systems that use Paxos actually implement Multi-Paxos.

Raft is an alternative to Multi-Paxos that is significantly simpler to understand while providing the same performance and availability as Multi-Paxos. Since its relatively recent invention, Raft has been widely adopted in the industry due to its simplicity and understandability.

The Raft Algorithm

This section helps you get familiar with Raft. Once you have an overall understanding of the algorithm, I strongly suggest referring to the condensed summary provided in the original paper, shown in Figure 1 here, as a cheat sheet for the details, whether you want to implement Raft or just want to remember how exactly the algorithm works.


Figure 1. A condensed summary of Raft [1]

Basics

In this section, we review basic concepts of Raft that are necessary to understand the algorithm. 

Log and State Machine

In Raft, we have the notion of the state machine. The state machine is a fancy name for the program that we want to replicate. For example, suppose we want to create a replicated key-value store. Here, the state machine is our key-value store, which has two basic operations: PUT(key, value) and GET(key). If all replicas of our key-value store start with the same state (e.g., all being empty at the beginning) and perform exactly the same operations in the same order, they will end up in the exact same state. This is known as state machine replication, i.e., making replicas of a state machine and feeding them the same sequence of commands.
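To make this idea concrete, here is a minimal sketch in Go (the Command and KVStore types are illustrative names, not part of any Raft library): two replicas fed the same command sequence in the same order end up with identical state.

```go
package main

import "fmt"

// Command is one operation recorded in the replicated log (illustrative type).
type Command struct {
	Op, Key, Value string // Op is "PUT" or "GET"
}

// KVStore is the state machine we want to replicate.
type KVStore struct{ data map[string]string }

func NewKVStore() *KVStore { return &KVStore{data: map[string]string{}} }

// Apply executes one command against the store and returns its result.
func (s *KVStore) Apply(c Command) string {
	if c.Op == "PUT" {
		s.data[c.Key] = c.Value
		return "OK"
	}
	return s.data[c.Key] // GET
}

func main() {
	log := []Command{{"PUT", "x", "1"}, {"PUT", "y", "2"}, {"PUT", "x", "3"}}
	r1, r2 := NewKVStore(), NewKVStore()
	for _, c := range log { // feed both replicas the same sequence, in the same order
		r1.Apply(c)
		r2.Apply(c)
	}
	fmt.Println(r1.data) // map[x:3 y:2]
	fmt.Println(r2.data) // map[x:3 y:2] -- identical state
}
```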

Raft maintains the sequence of operations in a log. However, it is critical to understand that not every entry that is appended to the log will be applied to the state machine. Sometimes, a node appends an entry to its log, but later that entry is removed to keep the logs of different nodes consistent with each other. Only the log entries that are considered committed are allowed to be applied to the state machine. We will explain under which conditions a node considers an entry committed, but for now, note that appended and committed are two different states for a log entry.

Raft guarantees the following important safety property which is exactly what we need for state machine replication: 

State Machine Safety: if a server has applied a log entry at a given index to its state machine, no other server will ever apply a different log entry for the same index [1].


Figure 2. State machine replication with Raft

The Notion of Term in Raft

The term in Raft is a scalar value that basically serves as a logical clock shared between the nodes of the Raft cluster. Each node has its own knowledge of the current term. Ideally, we want all nodes to assume the same term at any given time, but due to the distributed nature of the system, at any given moment some nodes may assume a higher term while others assume a lower one. When nodes communicate with each other, they include the term that they believe is current in their messages. The term is used as follows in the algorithm:

  • When a node receives a message with a term smaller than its own term, the node responds negatively to the message. 
    • It basically says, sorry, you are from the past! so you are not valid to me. 
  • When a node receives a message with a term higher than its own term, it first, updates its term and then processes the message. 
Raft guarantees that we have at most one leader per term. Note that we said at most; thus, some terms end without any leader. The purpose of including the term in the communication between nodes is to fence off messages from past terms of leadership. All nodes start with term 0, and we will explain when they increase their term. Note that any time a node changes its term, it first writes it to persistent storage before doing anything else.
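As a rough sketch of this rule (the Node type, its fields, and persist are illustrative, not from any particular implementation), every incoming message is filtered through something like the following:

```go
// Node holds the per-node Raft state relevant to the term rule (illustrative).
type Node struct {
	currentTerm int
	votedFor    int // -1 means no vote granted in the current term
}

// persist would write currentTerm (and votedFor) to stable storage.
func (n *Node) persist() { /* omitted */ }

// checkTerm applies the term rule to a message carrying msgTerm.
// It returns false if the message is from a past term and must be rejected.
func (n *Node) checkTerm(msgTerm int) bool {
	if msgTerm < n.currentTerm {
		return false // "sorry, you are from the past"
	}
	if msgTerm > n.currentTerm {
		n.currentTerm = msgTerm // adopt the higher term...
		n.votedFor = -1
		n.persist() // ...and write it to persistent storage before doing anything else
	}
	return true
}
```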

Roles

There are three roles in Raft. Don't try to map these roles onto the three roles in Paxos. Each node can change its state to take any of the following roles during the execution of the algorithm:

  • Follower: it is a passive node that never initiates any communication. It just sits there and responds to requests from other nodes. 
  • Candidate: it is an active node that used to be a follower, but since it suspects that the leader is dead, it steps up to become the new leader. It initiates communication with other nodes to get their acceptance to become the new leader.
  • Leader: it is an active node that has established its leadership and is the only node that communicates with the client. It initiates communication with other nodes to append new entries to the log upon requests from the client. 
Detecting a higher term ⇒ switching back to the follower state: We will explain below how the state of a node changes between these roles, but before going to the details of the algorithm, it is useful to remember that at any moment in time when a leader or candidate sees a message with a higher term than its own, it immediately updates its term to this value and changes its state to follower. 
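A minimal sketch of the roles and this step-down rule, continuing the illustrative Node type from above (and assuming it also carries a role field):

```go
// Role is the state a node is currently in.
type Role int

const (
	Follower Role = iota
	Candidate
	Leader
)

// stepDown is what a leader or candidate does upon seeing a higher term:
// adopt the term, persist it, and fall back to being a passive follower.
func (n *Node) stepDown(higherTerm int) {
	n.currentTerm = higherTerm
	n.votedFor = -1
	n.persist()
	n.role = Follower
}
```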

RPCs 

Raft uses RPC for node-to-node communication. There are only two RPCs in the entire protocol:
  1. RequestVote: sent by the candidates to other nodes to get their votes. 
  2. AppendEntries: sent by the leader to the other nodes to replicate new entries and also as a heartbeat mechanism. 
Both RPCs are idempotent, i.e., re-delivering an RPC does not cause any harm. The caller of the RPC retries indefinitely until it receives a response from the callee.  

There is a third RPC, InstallSnapshot, that is not needed for the core logic; we will talk about it later.

Leader Election 

When a follower does not receive any RPC (either RequestVote or AppendEntries) for longer than a specific amount of time called the election timeout, it says, "Hmm... no one is the leader or wants to become the leader; maybe I should step up and become the new leader!" In this situation, it proceeds as follows:
  • It increments its term and transitions to the candidate state. 
  • It votes for itself and issues RequestVote to all other nodes in parallel. In its RPC, the candidate includes the following information:
    • term 
    • candidate's id
    • last log term
    • last log index
  • Then, it waits. 
Each node, upon receiving a RequestVote RPC, responds positively only if ALL of the following three conditions are satisfied: 
  1. The RPC term is not smaller than the node's own term. 
  2. The node has not already granted its vote for the given term to another candidate. 
  3. The candidate's log is at least as up-to-date as the receiving node's log, i.e., either
    • the candidate's last log index and last log term are the same as those of the node, or
    • the candidate's last log term is larger than that of the node, or
    • the candidate's last log term is equal to that of the node, but the candidate's last log index is larger than that of the node.
The waiting of the candidate terminates by one of the following events:
  • The candidate wins: If a majority of the nodes (including the candidate itself) grant their votes to the candidate, it wins the election. As soon as the candidate receives a majority of the votes, it transitions to the leader state and immediately starts to send AppendEntries messages as heartbeats to establish its authority as the leader and prevent others from stepping up to become a leader. 
  • The candidate discovers a new leader or a higher term: As said above, a candidate transitions back to the follower state as soon as it sees an RPC, or a response to its RPC, with a higher term. In addition to that, if it sees an AppendEntries with a term equal to its own term, it also steps down and transitions to the follower state. 
  • The candidate times out: The candidate does not wait forever to receive votes from a majority of the nodes. If it waits for longer than election timeout, it repeats the process explained above, i.e., it increments its term and issues new RequestVote RPCs to the other nodes. 
Note that timeouts may occur due to communication failure or due to split votes, i.e., when no candidate gets the majority of the votes. In either case, the candidate retries to get elected. 

But can't the split vote occur again and again forever? 
To reduce the probability of a split vote, the nodes pick a random value for their election timeout. This way, the probability of several nodes stepping up for leadership at the same time and splitting the vote decreases. In practice, split votes are very rare. 
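Here is a sketch of the candidate's side of this process in Go. The RequestVoteArgs fields mirror the four pieces of information listed above; everything else (the node's id, peers, electionTimer, the lastLogIndex/lastLogTerm/sendRequestVote helpers, and the imported "time" and "math/rand" packages) is an illustrative assumption, not a prescribed API.

```go
// RequestVoteArgs carries the four pieces of information listed above.
type RequestVoteArgs struct {
	Term         int
	CandidateID  int
	LastLogIndex int
	LastLogTerm  int
}

// randomElectionTimeout picks a fresh random timeout, e.g., in [150ms, 300ms);
// the randomness is what makes repeated split votes unlikely.
func randomElectionTimeout() time.Duration {
	return 150*time.Millisecond + time.Duration(rand.Intn(150))*time.Millisecond
}

// startElection sketches what a follower does when its election timer fires.
func (n *Node) startElection() {
	n.role = Candidate
	n.currentTerm++   // move to a new term
	n.votedFor = n.id // vote for itself
	n.persist()

	args := RequestVoteArgs{
		Term:         n.currentTerm,
		CandidateID:  n.id,
		LastLogIndex: n.lastLogIndex(),
		LastLogTerm:  n.lastLogTerm(),
	}
	for _, peer := range n.peers {
		go n.sendRequestVote(peer, args) // in parallel; retried until answered
	}
	n.electionTimer.Reset(randomElectionTimeout()) // try again if no winner in time
}
```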

Important Leader Property in Raft: Condition 3 above guarantees that the log of the leader is always the most up-to-date log in the cluster. Thus, in Raft, we never need to transfer any entry from the followers to the leader, and the flow of data is always from the leader to the followers. 
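The voter's side of the exchange, including the condition-3 check, can be sketched as follows (again with illustrative names; the function returns whether the vote is granted):

```go
// HandleRequestVote checks the three conditions above and grants or denies a vote.
func (n *Node) HandleRequestVote(args RequestVoteArgs) bool {
	// Condition 1: reject requests from past terms.
	if args.Term < n.currentTerm {
		return false
	}
	if args.Term > n.currentTerm {
		n.stepDown(args.Term)
	}
	// Condition 2: at most one vote per term.
	if n.votedFor != -1 && n.votedFor != args.CandidateID {
		return false
	}
	// Condition 3: the candidate's log must be at least as up-to-date as ours.
	upToDate := args.LastLogTerm > n.lastLogTerm() ||
		(args.LastLogTerm == n.lastLogTerm() && args.LastLogIndex >= n.lastLogIndex())
	if !upToDate {
		return false
	}
	n.votedFor = args.CandidateID
	n.persist() // the granted vote must survive a crash
	return true
}
```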

Log Replication

Once a node receives a majority of votes, it considers itself the new leader, and the log replication phase begins. In this phase, the leader first determines up to which point the log of each node is consistent with its own log. To do that, it basically first assumes everyone is consistent with itself. It sends AppendEntries to the nodes. If they are not consistent, they let the leader know. When a node responds negatively to an AppendEntries, the leader says "OK, let me try an older index, maybe you are consistent up to that point". They continue this back-and-forth until the leader finds out up to which index they are consistent. Then, the leader sends the missing entries to the nodes to make their logs fully consistent with its own. The leader considers the latest entry of its log that was appended in the current term and is appended on a majority of nodes as the last entry that can be committed to the state machine. Below are the details of this process.

A node immediately does the following things after becoming the leader:
  • It appends a no-op entry to its log and sends AppendEntries to all other nodes. 
  • It keeps sending AppendEntries, even when it does not have any new entry, to maintain its authority as the leader. 
The purpose of appending this no-op right after winning the election is to let the leader find out the most recent commit index. Note that although the leader is guaranteed to have all committed log entries, it is not guaranteed to have applied those entries to its state machine. Without this no-op, the leader might never commit the entries that are already in its log if no new client request arrives. 

The client must only talk to the leader. Initially, the client randomly picks one of the nodes and sends its request to it. If the node is not the leader, it rejects the request but lets the client know who the leader is. The client then contacts that node. If that node is dead or is not the leader anymore, this process continues until the client sends its request to the leader. 

Upon receiving a request from the client, the leader appends an entry to its local log and starts communicating with the other nodes to commit this entry. Once the entry is committed, the leader applies it to its state machine and immediately returns to the client. Thus, in Raft, when the client receives its response from the leader, the client's command is guaranteed to be applied to the state machine of only the leader. Other nodes are not guaranteed to have applied the command once the client receives its response from the Raft cluster. 

Each node keeps track of the index of the last entry applied to its state machine (lastApplied) and the index of the highest entry known to be committed (commitIndex). Whenever commitIndex > lastApplied, the node first increments lastApplied and then applies the log entry at the incremented lastApplied index to its state machine. 
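In code, this apply loop is just a few lines (a sketch; I assume the log is indexed from 1 with a dummy entry at index 0, the LogEntry type reuses the illustrative Command type from the first example, and stateMachine is the replicated program):

```go
// LogEntry is one slot in the replicated log.
type LogEntry struct {
	Term    int
	Command Command
}

// applyCommitted applies every committed-but-not-yet-applied entry, in order.
func (n *Node) applyCommitted() {
	for n.commitIndex > n.lastApplied {
		n.lastApplied++
		n.stateMachine.Apply(n.log[n.lastApplied].Command)
	}
}
```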

Now, this is the critical part: how do nodes increase their commitIndex?

Increasing commitIndex on the Leader

The leader keeps track of the highest index that it believes each node has appended to its log (matchIndex). Thus, it knows the next index that each node has to append to its log (nextIndex). When a node becomes the leader, it initializes matchIndex to 0 and nextIndex to its own last log index + 1 for all nodes. Whenever the leader sees that its own last log index is larger than or equal to the nextIndex of some node, the leader sends that node an AppendEntries including the entries in [nextIndex, lastLogIndex]. In addition to these entries, the leader also includes its current commitIndex, as well as prevLogIndex and prevLogTerm, which are the index and the term of the entry right before these new entries in the leader's log, respectively. 
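A sketch of this leader-side bookkeeping; the field names (nextIndex, matchIndex, the AppendEntriesArgs fields) follow the paper's terminology, while the helpers (lastLogIndex, sendAppendEntries) are assumptions for illustration.

```go
// AppendEntriesArgs is what the leader sends to a follower.
type AppendEntriesArgs struct {
	Term         int
	LeaderID     int
	PrevLogIndex int // index of the entry right before Entries in the leader's log
	PrevLogTerm  int // term of that entry
	Entries      []LogEntry
	LeaderCommit int // leader's commitIndex
}

// becomeLeader initializes the per-follower bookkeeping.
func (n *Node) becomeLeader() {
	n.role = Leader
	for _, peer := range n.peers {
		n.nextIndex[peer] = n.lastLogIndex() + 1 // optimistic: assume peer is consistent
		n.matchIndex[peer] = 0                   // nothing confirmed to match yet
	}
}

// replicateTo sends the entries in [nextIndex, lastLogIndex] to one follower.
func (n *Node) replicateTo(peer int) {
	next := n.nextIndex[peer]
	args := AppendEntriesArgs{
		Term:         n.currentTerm,
		LeaderID:     n.id,
		PrevLogIndex: next - 1,
		PrevLogTerm:  n.log[next-1].Term,
		Entries:      n.log[next:],
		LeaderCommit: n.commitIndex,
	}
	go n.sendAppendEntries(peer, args)
}
```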

Upon receiving an AppendEntries RPC from the leader, the receiving node responds positively if both of the following conditions are satisfied:
  1. Like always, the term of the RPC must be equal to or greater than the node's current term. 
  2. The node's log contains an entry at index prevLogIndex whose term matches prevLogTerm.
When the node finds an entry that matches <prevLogIndex, prevLogTerm>, it says "OK, I am consistent with the leader up to this entry". If there are any entries after that entry in the node's log, it removes all of them and appends the new entries received from the leader. If the node does not find any entry that matches <prevLogIndex, prevLogTerm>, it responds negatively to the leader, which basically says "My log is inconsistent with yours and I am missing some entries. Please send new entries starting from an older index". Upon receiving this response, the leader decrements nextIndex for this node and repeats the process. This process repeats until, for some AppendEntries RPC, the node can find a match for <prevLogIndex, prevLogTerm> in its log. After receiving a positive response to its AppendEntries RPC from a node, the leader updates the nextIndex and matchIndex for that node. 
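The follower's side of this exchange can be sketched like this (a simplification of the above: a careful implementation only truncates when an existing entry actually conflicts with a new one, so that stale, re-delivered RPCs cannot drop entries):

```go
// HandleAppendEntries performs the consistency check and appends new entries.
// It returns false when the leader should retry with an older prevLogIndex.
func (n *Node) HandleAppendEntries(args AppendEntriesArgs) bool {
	if args.Term < n.currentTerm {
		return false // message from a past term
	}
	// Do we have an entry at prevLogIndex whose term is prevLogTerm?
	if args.PrevLogIndex > n.lastLogIndex() ||
		n.log[args.PrevLogIndex].Term != args.PrevLogTerm {
		return false // "please send entries starting from an older index"
	}
	// Consistent up to prevLogIndex: drop everything after it and append
	// the leader's entries.
	n.log = append(n.log[:args.PrevLogIndex+1], args.Entries...)
	n.persist()
	return true
}
```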

Now, this is how the leader updates its commitIndex: The leader sets its commitIndex to N whenever:
  1. for a majority of the nodes, matchIndex >= N, meaning the entry at index N is appended to the log of a majority of the nodes. 
  2. The term of entry N is the current term of the leader. 
Note that the second requirement basically says that to commit an entry, it is not enough for the entry to be appended to a majority of nodes; in addition, the term of the entry must also be the same as the current term of the leader. Thus, the leader never directly commits entries that were appended to its log in previous terms. 
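In code, the leader's commit rule looks roughly like this (a sketch; peers is assumed to exclude the leader itself):

```go
// maybeAdvanceCommitIndex finds the largest N satisfying both conditions above.
func (n *Node) maybeAdvanceCommitIndex() {
	for N := n.lastLogIndex(); N > n.commitIndex; N-- {
		if n.log[N].Term != n.currentTerm {
			break // condition 2: never count replicas for entries from older terms
		}
		count := 1 // the leader itself has the entry
		for _, peer := range n.peers {
			if n.matchIndex[peer] >= N {
				count++
			}
		}
		if count > (len(n.peers)+1)/2 { // condition 1: appended on a majority
			n.commitIndex = N
			return
		}
	}
}
```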

Then how are such entries committed?
They will be committed indirectly when the leader commits a new entry appended in the current term. 

But what if the new leader does not append any new entry in its term?
Then the leader will never commit those entries! That's why it is necessary for the leader to append a no-op to its log right after winning the election. 

But why is the second requirement necessary at all? 
It is necessary to avoid log inconsistency in weird situations that may occur due to the failure and recovery of leaders. If you are interested, refer to Section 5.4 of the original paper [1]. 

Increasing commitIndex on the Followers

For followers, updating the commitIndex is simple. After accepting an AppendEntries, the node updates its commitIndex to the minimum of the leader's commitIndex (included in the RPC) and the index of the last entry among the new entries received from the leader. 
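As a sketch (indexOfLastNewEntry is prevLogIndex plus the number of entries carried by the RPC):

```go
// updateFollowerCommit runs after the follower accepts an AppendEntries RPC.
func (n *Node) updateFollowerCommit(leaderCommit, indexOfLastNewEntry int) {
	if leaderCommit > n.commitIndex {
		n.commitIndex = leaderCommit
		if indexOfLastNewEntry < n.commitIndex {
			n.commitIndex = indexOfLastNewEntry // don't commit past what we actually have
		}
	}
}
```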

Online Cluster Membership Change

Cluster membership change is needed when we want to add/remove nodes to/from our Raft cluster. We ideally want an online membership change, i.e., a change that does not require shutting down the system. Raft does online membership change through its log. Specifically, we append the new configuration to the log like other entries. There is an important difference between configuration entries and normal entries: a configuration entry takes effect on a node as soon as the node learns about it, i.e., as soon as it is added to its log. Thus, unlike normal entries, a node does not wait for a configuration entry to be committed before applying it. 

The problem with configuration change is that we cannot atomically change the configuration on all nodes. Thus, if we ask nodes to switch directly from the old configuration to the new configuration, we might end up in a situation where we have two leaders in our cluster: one is the leader of the nodes with the old configuration and one is the leader of the nodes with the new configuration. To avoid having two leaders during a configuration change, Raft first moves all nodes to an intermediate configuration that includes both configurations. 

When the current leader receives a request to change the configuration from C1 to C2, it first generates the intermediate configuration C1.5, appends it to its log, and replicates it to the other nodes. C1.5 is a special configuration that includes the union of the servers in C1 and C2. To consider this entry committed, the leader waits for two majorities, one from the servers in C1 and one from the servers in C2. Once the leader receives both majorities, it commits the entry and asks the other nodes to commit it as well. 

Once the leader successfully commits C1.5, it is safe to go ahead to C2. In this situation, the leader creates a configuration entry for C2, appends it to its log, and replicates it to the other nodes. When C2 is committed, C1 is irrelevant, and it is safe to remove the servers in C1 that are not in C2. 

There are a couple of issues with this transition that we have to deal with. 

Issue: New servers have empty logs
When the new servers join the cluster, they don't have anything in their logs. Thus, when the leader tries to commit a new entry, they keep responding negatively to the AppendEntries RPCs, requesting older entries. This makes the system unavailable for writing new entries while these new servers are catching up. To avoid this major problem, there is a phase before actually kicking off the re-configuration process where the new servers join the cluster as non-voting members that just catch up with the leader. Once they are ready, we start the re-configuration.

But how do new servers join the cluster anyway if we have a very long log? It might take a huge amount of time for a completely new server to catch up with the existing log. 
This is where the log compaction and snapshotting come to the picture. We will talk about it in the next section, but the idea is, we don't let logs grow more than a certain size. Thus, periodically, we take a snapshot of the state machine and remove all entries of the log that are included in the snapshot. 

Issue: What happens when the current leader is removed?
As explained above, the configuration entries are special, as the nodes apply them as soon as they see them, even before committing them. Now, suppose the current leader is not part of C2. Once it sees C2, it learns that it is not part of the cluster anymore. However, it is still the leader! So it still needs to replicate this entry to the other nodes and commit it. Thus, it remains the leader but does not include itself when counting the majority. Once a majority of the nodes in C2 have acknowledged the entry, it commits it and steps down as leader. The nodes in C2 will pick a new leader later, once the existing leader becomes silent. 

Issue: Removed servers may disrupt the new cluster
Those servers in C1 that are not in C2 must be terminated. However, until then, they can disrupt the new cluster by sending RPCs. Specifically, the removed servers won't hear from the new leader, because the new leader only works with the servers in C2. Since they don't hear from the leader, they time out and start a leader election. They increase their term and ask the other nodes to vote by sending RequestVote RPCs. The current leader of C2 may receive one of these RPCs and step down as the leader. Once this leader becomes silent, the nodes in C2 will time out and pick a new leader for C2 again, but the removed servers may repeat this process, causing unavailability for the cluster. 

To avoid disruption by removed servers, we have to change the Raft algorithm as follows: the nodes must ignore RequestVote RPCs when they believe the current leader is still alive. Thus, while they are receiving AppendEntries regularly within the election timeout, they ignore any RequestVote message. This change is not included in Figure 1. 
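In code, the change is a small guard in front of the normal vote handler (a sketch; lastHeardFromLeader, minElectionTimeout, and the assumed "time" import are illustrative names, not part of the standard algorithm):

```go
// HandleRequestVoteGuarded ignores vote requests while we believe a leader is alive,
// i.e., while we have recently received AppendEntries from it.
func (n *Node) HandleRequestVoteGuarded(args RequestVoteArgs) bool {
	if time.Since(n.lastHeardFromLeader) < minElectionTimeout {
		return false // a removed or partitioned server cannot disrupt the cluster
	}
	return n.HandleRequestVote(args)
}
```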

Log Compaction and Snapshots

As we briefly mentioned above, we obviously cannot let logs grow indefinitely. Long logs take up space and make replay expensive, i.e., reading the log and applying it to the state machine, for example for a new server with a clean state. To solve this issue, we can use snapshots. 

The idea is that we don't let logs grow more than a certain size. Thus, periodically, we take a snapshot of the state machine and remove all entries of the log that are included in the snapshot. Each server does this process independently. Thus, once in a while the server takes a snapshot of its state and removes all applied entries from its log. It also removes the previous snapshots. The snapshot includes the following information:
  • The current state of the state machine, e.g., if our state machine is a key-value store, its state is basically the value of all keys. 
  • Metadata:
    • index of the last included log entry
    • term of the last included log entry 
    • latest configuration 
The metadata is used for checking the log consistency as before.
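A snapshot can therefore be represented by something like the following (an illustrative struct, not a prescribed format):

```go
// Snapshot is what a server persists when it compacts its log.
type Snapshot struct {
	LastIncludedIndex int    // index of the last log entry covered by the snapshot
	LastIncludedTerm  int    // term of that entry
	Configuration     []int  // latest cluster membership, e.g., the server IDs
	State             []byte // serialized state machine, e.g., all key-value pairs
}
```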

As explained in the previous sections, one of the responsibilities of the leader is to replicate log entries to other nodes. Now, what happens when the leader has already deleted a log entry that a node needs? In this situation, the leader has to send the snapshot to the node. The leader does that via the InstallSnapshot RPC. 

Once a node receives a snapshot via this RPC, it checks the index and term of the last included log entry of the snapshot:
  • If the node does not find any entry in its log with that index and term, it knows that the snapshot contains a state newer than what the node already has. Thus, the node completely throws away its log and installs the snapshot. 
  • If the node finds an entry in its log that matches the index and term of the last included log entry of the snapshot, it installs the snapshot, throws away the log entries up through that entry, and keeps the rest of the log entries.

Discussion 

Linearizability of Raft

Linearizability requires the system to respect the real-time order of the events. In simple language, if operation 2 starts after operation 1 returns to the client, operation 2 must take effect after operation 1 and it must see the effect of operation 1. For example, if operation 1 writes a value for a key, and operation 2 reads that key, it must see the value written by operation 1 (or a newer value). 

In Raft, we can get linearizability only if we access the leader for both read and write operations. Note that the leader returns to the client before the other nodes commit an entry. Thus, it is quite possible for a client not to find what it just wrote if it reads from one of the followers. 

To provide linearizability even for the case where we exclusively access the leader, Raft needs to take the following measures:
  • The leader is guaranteed to have the most up-to-date log, but it is NOT guaranteed that the new leader has applied all of its log entries. Thus, if the client goes to the leader to read the state, it is possible that the client does not find the state machine in the most up-to-date state. To avoid this situation, the leader appends and commits a no-op entry to its log as soon as it becomes the new leader. Before applying this entry, the leader does not respond to any client. 
  • It is possible that the leader has changed, but the previous leader is not aware of that! Now, when the client asks this expired leader, it may return a stale state to the client. To avoid this, the leader does not respond to the client, even for read-only requests, without first getting positive responses from a majority of the servers to heartbeat messages sent after receiving the request. 
As you can see, to provide linearizability in Raft, the leader needs acknowledgment from a majority of the replicas even for read-only operations. Paying the cost of one RTT per read might not be acceptable for many applications, especially if we deploy our Raft cluster across multiple data centers with large RTTs. 

Leader Lease/Expiry

One solution to avoid the RTT cost for read-only operations is to rely on leader leases. The idea is that whenever a node becomes the leader, it has a certain amount of time during which it remains the leader no matter what happens in the system. When a node becomes the leader, it does not consider itself the leader until the lease of the previous leader is over. Thus, the nodes need to use a timer to keep track of the remaining lease time of the current leader. An alive leader constantly extends its lease through its AppendEntries RPCs. When the current leader is still alive but network-partitioned, the rest of the nodes will elect a new leader. Here is the important part: we must guarantee that the lease timer of the previous leader starts before the lease timer of the new leader. Thus, the previous leader steps down before the new leader considers itself the new leader. That can be easily achieved by starting the lease timer of the leader before sending the AppendEntries RPCs to the followers. Using leader leases, the leader does not need to get a majority of acknowledgments before each read operation and still guarantees linearizability. However, note that to use this method, the amount of clock drift must be small, i.e., the nodes must measure the passage of time at close to the same rate. Specifically, to guarantee that the timer of the old leader is over before the timer of the new leader, the clock drift must be less than the RPC delay. You can check this blog post to see how YugaByteDB uses this approach. 
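Here is a sketch of the leader-side part of this idea only: the lease clock starts before the heartbeats go out, the lease is extended when a majority acknowledges that round, and local reads are allowed only while the lease is valid. The names (leaseDuration, leaseExpiry, broadcastHeartbeatsAndWait, majority) and the assumed "time" import are illustrative, not YugaByteDB's actual API.

```go
// extendLease sends one round of heartbeats and, if a majority acknowledges it,
// extends the lease measured from *before* the RPCs were sent.
func (n *Node) extendLease() {
	start := time.Now() // the lease clock starts before the AppendEntries go out
	acks := n.broadcastHeartbeatsAndWait()
	if acks >= n.majority() {
		n.leaseExpiry = start.Add(leaseDuration)
	}
}

// canServeReadLocally reports whether the leader may answer a read
// without another round of acknowledgments.
func (n *Node) canServeReadLocally() bool {
	return time.Now().Before(n.leaseExpiry)
}
```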

With leader leases, we guarantee that the new leader does not consider itself the leader before the previous leader steps down. An alternative approach is to make sure the old leader steps down before the new leader is elected. To use this approach, we can use a leader expiry: if the leader does not receive acknowledgments from a majority of the nodes for longer than the given expiry, it steps down and becomes a follower. Now, to make sure that the old leader steps down before the new leader is elected, the expiry must be smaller than the election timeout used by the nodes to start a new leader election. You can check this to see how NuRaft [2] uses this approach. 

As you can see, to have linearizability with Raft, 
  1. We have to access the leader, and 
  2. The node that we access as the leader must make sure that it is still the leader. For this, it can use one of the following methods: 
    • The node waits to receive acknowledgments from a majority of the followers to an AppendEntries RPC sent after receiving the read request, before performing and returning the read operation. 
    • Assuming we have bounded clock drift, we can use leader leases/expiry as explained above. 
So is there no way to access followers and still have linearizability? 
With some additional measures, we can read from followers and still guarantee linearizability, but that still requires accessing the leader. For example, we can do this: the client asks the leader for the latest log index and then goes to one of the followers to read. The client sends this log index with its read request to the follower. The follower then blocks this read operation until it has applied the log entry at the given index to its state machine. This way, it is guaranteed that the client will find the state machine in a state at least as up-to-date as when it decided to read, thereby guaranteeing linearizability. Note that before returning the log index, the leader has to wait for acknowledgments from a majority of nodes to make sure it is still the leader and is not missing any entry. Thus, as you can see, although we are reading the value from one of the followers, we still need to access the leader and wait for one RTT between the leader and a majority of the nodes, or use leader leases. In addition to that, we also need to pay the cost of an RTT to the follower. 
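A sketch of the follower's side of this scheme (readIndex is the log index the client obtained from the leader; waitForApply and the key-value Get helper are assumptions for illustration):

```go
// ServeFollowerRead blocks until this follower has applied the log at least up to
// readIndex, then serves the read from its local state machine.
func (n *Node) ServeFollowerRead(readIndex int, key string) string {
	for n.lastApplied < readIndex {
		n.waitForApply() // e.g., wait on a condition variable signaled by the apply loop
	}
	return n.stateMachine.Get(key)
}
```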

De-duplicating Client Commands

Raft guarantees that when a client receives an acknowledgment from the leader, its command has been applied to the state machine of the leader and will eventually be applied on the state machines of the other nodes. However, when a client times out while waiting for its request, there is no such guarantee; the command may or may not be applied to the state machines. When a client gets a timeout, the natural action is to repeat the request, but if the first request was indeed successful, this will cause duplication. This problem occurs in any client-server system, and the solution is to assign a unique ID (a.k.a. a sequence number) to each client request; when receiving a request with the same ID, we don't execute the command a second time and simply return the result of the previous request with the same ID. Note that this de-duplication mechanism must be implemented at the state machine level. Thus, Raft itself is not involved in it. 
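A sketch of such de-duplication inside the state machine, reusing the illustrative Command type from the first code example. The clientID and seq values are assumed to arrive with each request; real systems also expire old sessions, which is omitted here.

```go
// session remembers the last request the state machine executed for one client.
type session struct {
	lastSeq    int
	lastResult string
}

// DedupKVStore is a key-value state machine that ignores duplicated requests.
type DedupKVStore struct {
	data     map[string]string
	sessions map[string]session // keyed by client ID
}

// Apply executes a command unless it has already been executed for this client,
// in which case it returns the cached result without re-executing anything.
func (s *DedupKVStore) Apply(clientID string, seq int, c Command) string {
	if sess, ok := s.sessions[clientID]; ok && seq <= sess.lastSeq {
		return sess.lastResult // duplicate: don't execute the command a second time
	}
	var result string
	if c.Op == "PUT" {
		s.data[c.Key] = c.Value
		result = "OK"
	} else {
		result = s.data[c.Key] // GET
	}
	s.sessions[clientID] = session{lastSeq: seq, lastResult: result}
	return result
}
```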

Summary 

In this post, we reviewed the Raft algorithm. Unlike Paxos, which is designed for agreement on a single value, Raft is designed for agreement on a sequence of values, i.e., a log. Raft is significantly simpler to understand and implement than Multi-Paxos, an alternative algorithm based on Paxos. Implementing a consensus protocol is tricky, and it is much better to use existing products that are already tested in production. If you are looking for a production-quality Raft implementation, you can use NuRaft [2], which was introduced by our group at eBay. 

References 

[1] Diego Ongaro and John Ousterhout. "In Search of an Understandable Consensus Algorithm." In 2014 USENIX Annual Technical Conference (USENIX ATC 14), pp. 305-319. 2014.
[2] NuRaft. eBay's GitHub: https://github.com/eBay/NuRaft
