Skip to content

Commit

Permalink
Tweak Raft state transitions
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jan 7, 2024
1 parent 64e3ef8 commit bcbe4f7
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 35 deletions.
28 changes: 16 additions & 12 deletions src/raft/node/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,35 +46,39 @@ impl RawNode<Candidate> {
Ok(())
}

/// Transforms the node into a follower. We either lost the election
/// and follow the winner, or we discovered a new term in which case
/// we step into it as a leaderless follower.
fn become_follower(mut self, term: Term, leader: Option<NodeID>) -> Result<RawNode<Follower>> {
/// Transitions the candidate to a follower. We either lost the election and
/// follow the winner, or we discovered a new term in which case we step
/// into it as a leaderless follower.
pub(super) fn into_follower(
mut self,
term: Term,
leader: Option<NodeID>,
) -> Result<RawNode<Follower>> {
assert!(term >= self.term, "Term regression {} -> {}", self.term, term);

if let Some(leader) = leader {
// We lost the election, follow the winner.
assert_eq!(term, self.term, "Can't follow leader in different term");
info!("Lost election, following leader {} in term {}", leader, term);
let voted_for = Some(self.id); // by definition
Ok(self.become_role(Follower::new(Some(leader), voted_for)))
Ok(self.into_role(Follower::new(Some(leader), voted_for)))
} else {
// We found a new term, but we don't necessarily know who the leader
// is yet. We'll find out when we step a message from it.
assert_ne!(term, self.term, "Can't become leaderless follower in current term");
info!("Discovered new term {}", term);
self.term = term;
self.log.set_term(term, None)?;
Ok(self.become_role(Follower::new(None, None)))
Ok(self.into_role(Follower::new(None, None)))
}
}

/// Transition to leader role.
pub(super) fn become_leader(self) -> Result<RawNode<Leader>> {
/// Transitions the candidate to a leader. We won the election.
pub(super) fn into_leader(self) -> Result<RawNode<Leader>> {
info!("Won election for term {}, becoming leader", self.term);
let peers = self.peers.clone();
let (last_index, _) = self.log.get_last_index();
let mut node = self.become_role(Leader::new(peers, last_index));
let mut node = self.into_role(Leader::new(peers, last_index));
node.heartbeat()?;

// Propose an empty command when assuming leadership, to disambiguate
Expand All @@ -98,7 +102,7 @@ impl RawNode<Candidate> {
// follower in it and step the message. If the message is a Heartbeat or
// AppendEntries from the leader, stepping it will follow the leader.
if msg.term > self.term {
return self.become_follower(msg.term, None)?.step(msg);
return self.into_follower(msg.term, None)?.step(msg);
}

match msg.event {
Expand All @@ -110,14 +114,14 @@ impl RawNode<Candidate> {
Event::GrantVote => {
self.role.votes.insert(msg.from.unwrap());
if self.role.votes.len() as u64 >= self.quorum() {
return Ok(self.become_leader()?.into());
return Ok(self.into_leader()?.into());
}
}

// If we receive a heartbeat or entries in this term, we lost the
// election and have a new leader. Follow it and step the message.
Event::Heartbeat { .. } | Event::AppendEntries { .. } => {
return self.become_follower(msg.term, Some(msg.from.unwrap()))?.step(msg);
return self.into_follower(msg.term, Some(msg.from.unwrap()))?.step(msg);
}

// Abort any inbound client requests while candidate.
Expand Down
27 changes: 16 additions & 11 deletions src/raft/node/follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,25 @@ impl RawNode<Follower> {
Ok(())
}

/// Transforms the node into a candidate, by campaigning for leadership in a
/// new term.
pub(super) fn become_candidate(mut self) -> Result<RawNode<Candidate>> {
/// Transitions the follower into a candidate, by campaigning for
/// leadership in a new term.
pub(super) fn into_candidate(mut self) -> Result<RawNode<Candidate>> {
// Abort any forwarded requests. These must be retried with new leader.
self.abort_forwarded()?;

let mut node = self.become_role(Candidate::new());
let mut node = self.into_role(Candidate::new());
node.campaign()?;
Ok(node)
}

/// Transforms the node into a follower, either a leaderless follower in a
/// new term or following a leader in the current term.
fn become_follower(mut self, leader: Option<NodeID>, term: Term) -> Result<RawNode<Follower>> {
/// Transitions the candidate into a follower, either a leaderless follower
/// in a new term (e.g. if someone holds a new election) or following a
/// leader in the current term once someone wins the election.
pub(super) fn into_follower(
mut self,
leader: Option<NodeID>,
term: Term,
) -> Result<RawNode<Follower>> {
assert!(term >= self.term, "Term regression {} -> {}", self.term, term);

// Abort any forwarded requests. These must be retried with new leader.
Expand Down Expand Up @@ -125,7 +130,7 @@ impl RawNode<Follower> {
// follower in it and step the message. If the message is a Heartbeat or
// AppendEntries from the leader, stepping it will follow the leader.
if msg.term > self.term {
return self.become_follower(None, msg.term)?.step(msg);
return self.into_follower(None, msg.term)?.step(msg);
}

// Record when we last saw a message from the leader (if any).
Expand All @@ -142,7 +147,7 @@ impl RawNode<Follower> {
let from = msg.from.unwrap();
match self.role.leader {
Some(leader) => assert_eq!(from, leader, "Multiple leaders in term"),
None => self = self.become_follower(Some(from), msg.term)?,
None => self = self.into_follower(Some(from), msg.term)?,
}

// Advance commit index and apply entries if possible.
Expand All @@ -165,7 +170,7 @@ impl RawNode<Follower> {
let from = msg.from.unwrap();
match self.role.leader {
Some(leader) => assert_eq!(from, leader, "Multiple leaders in term"),
None => self = self.become_follower(Some(from), msg.term)?,
None => self = self.into_follower(Some(from), msg.term)?,
}

// Append the entries, if possible.
Expand Down Expand Up @@ -250,7 +255,7 @@ impl RawNode<Follower> {

self.role.leader_seen += 1;
if self.role.leader_seen >= self.role.election_timeout {
return Ok(self.become_candidate()?.into());
return Ok(self.into_candidate()?.into());
}
Ok(self.into())
}
Expand Down
11 changes: 6 additions & 5 deletions src/raft/node/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,18 @@ impl RawNode<Leader> {
Ok(())
}

/// Transforms the leader into a follower. This can only happen if we find a
/// new term, so we become a leaderless follower.
fn become_follower(mut self, term: Term) -> Result<RawNode<Follower>> {
/// Transitions the leader into a follower. This can only happen if we
/// discover a new term, so we become a leaderless follower. Subsequently
/// stepping the received message may discover the leader, if there is one.
pub(super) fn into_follower(mut self, term: Term) -> Result<RawNode<Follower>> {
assert!(term >= self.term, "Term regression {} -> {}", self.term, term);
assert!(term > self.term, "Can only become follower in later term");

info!("Discovered new term {}", term);
self.term = term;
self.log.set_term(term, None)?;
self.state_tx.send(Instruction::Abort)?;
Ok(self.become_role(Follower::new(None, None)))
Ok(self.into_role(Follower::new(None, None)))
}

/// Processes a message.
Expand All @@ -73,7 +74,7 @@ impl RawNode<Leader> {
// follower in it and step the message. If the message is a Heartbeat or
// AppendEntries from the leader, stepping it will follow the leader.
if msg.term > self.term {
return self.become_follower(msg.term)?.step(msg);
return self.into_follower(msg.term)?.step(msg);
}

match msg.event {
Expand Down
24 changes: 17 additions & 7 deletions src/raft/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,14 @@ pub struct Status {
pub storage_size: u64,
}

/// The local Raft node state machine.
/// A Raft node, with a dynamic role. The node is driven synchronously by
/// processing inbound messages via step() or by advancing time via tick().
/// These methods consume the current node, and return a new one with a possibly
/// different type. Outbound messages are sent via the given node_tx channel.
///
/// This enum wraps the NodeState<Role> types, which implement the actual
/// node logic. It exists for ergonomic use across role transitions, i.e
/// node = node.step()?.
pub enum Node {
Candidate(RawNode<Candidate>),
Follower(RawNode<Follower>),
Expand All @@ -73,7 +80,7 @@ impl Node {
let node = RawNode::new(id, peers, log, node_tx, state_tx)?;
if node.peers.is_empty() {
// If there are no peers, become leader immediately.
return Ok(node.become_candidate()?.become_leader()?.into());
return Ok(node.into_candidate()?.into_leader()?.into());
}
Ok(node.into())
}
Expand Down Expand Up @@ -128,7 +135,10 @@ impl From<RawNode<Leader>> for Node {
/// A Raft role: leader, follower, or candidate.
pub trait Role: Clone + std::fmt::Debug + PartialEq {}

// A Raft node with role R
/// A Raft node with the concrete role R.
///
/// This implements the typestate pattern, where individual node states (roles)
/// are encoded as RawNode<Role>. See: http://cliffle.com/blog/rust-typestate/
pub struct RawNode<R: Role = Follower> {
id: NodeID,
peers: HashSet<NodeID>,
Expand All @@ -140,8 +150,8 @@ pub struct RawNode<R: Role = Follower> {
}

impl<R: Role> RawNode<R> {
/// Transforms the node into another role.
fn become_role<T: Role>(self, role: T) -> RawNode<T> {
/// Helper for role transitions.
fn into_role<T: Role>(self, role: T) -> RawNode<T> {
RawNode {
id: self.id,
peers: self.peers,
Expand Down Expand Up @@ -481,10 +491,10 @@ mod tests {
}

#[test]
fn become_role() -> Result<()> {
fn into_role() -> Result<()> {
let (node, _) = setup_rolenode()?;
let role = Candidate::new();
let new = node.become_role(role.clone());
let new = node.into_role(role.clone());
assert_eq!(new.id, 1);
assert_eq!(new.term, 1);
assert_eq!(new.peers, HashSet::from([2, 3]));
Expand Down

0 comments on commit bcbe4f7

Please sign in to comment.