Skip to content

Commit

Permalink
Tweak Raft election logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Nov 19, 2023
1 parent d76cb23 commit afee78a
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 60 deletions.
8 changes: 6 additions & 2 deletions src/raft/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,19 @@ pub enum Event {
/// and would like the leader to replicate it.
has_committed: bool,
},
/// Candidates solicit votes from all peers.

/// Candidates solicit votes from all peers when campaigning for leadership.
SolicitVote {
// The index of the candidate's last stored log entry
last_index: Index,
// The term of the candidate's last stored log entry
last_term: Term,
},
/// Followers may grant votes to candidates.

/// Followers may grant a single vote to a candidate per term, on a
/// first-come basis. Candidates implicitly vote for themselves.
GrantVote,

/// Leaders replicate log entries to followers by appending it to their log.
AppendEntries {
/// The index of the log entry immediately preceding the submitted commands.
Expand Down
61 changes: 38 additions & 23 deletions src/raft/node/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,24 @@ use super::{rand_election_timeout, Follower, Leader, Node, NodeID, RoleNode, Ter
use crate::error::{Error, Result};

use ::log::{debug, error, info, warn};
use std::collections::HashSet;

/// A candidate is campaigning to become a leader.
#[derive(Debug)]
pub struct Candidate {
/// Votes received (including ourself).
votes: HashSet<NodeID>,
/// Ticks elapsed since election start.
election_duration: Ticks,
/// Election timeout, in ticks.
election_timeout: Ticks,
/// Votes received (including ourself).
votes: u64,
}

impl Candidate {
/// Creates a new candidate role.
pub fn new() -> Self {
Self {
votes: 1, // We always start with a vote for ourselves.
votes: HashSet::new(),
election_duration: 0,
election_timeout: rand_election_timeout(),
}
Expand Down Expand Up @@ -63,6 +64,10 @@ impl RoleNode<Candidate> {

/// Processes a message.
pub fn step(mut self, msg: Message) -> Result<Node> {
// Assert invariants.
debug_assert_eq!(self.term, self.log.get_term()?.0, "Term does not match log");
debug_assert_eq!(Some(self.id), self.log.get_term()?.1, "Log vote does not match self");

// Drop invalid messages and messages from past terms.
if let Err(err) = self.validate(&msg) {
error!("Invalid message: {} ({:?})", err, msg);
Expand All @@ -81,29 +86,29 @@ impl RoleNode<Candidate> {
}

match msg.event {
// If we receive a heartbeat or replicated entries in this term, we
// lost the election and have a new leader. Follow it and process
// the message.
Event::Heartbeat { .. } | Event::AppendEntries { .. } => {
return self.become_follower(msg.term, Some(msg.from.unwrap()))?.step(msg);
}
// Ignore other candidates when we're also campaigning.
Event::SolicitVote { .. } => {}

// We received a vote. Record it, and if we have quorum, assume
// leadership.
Event::GrantVote => {
debug!("Received term {} vote from {:?}", self.term, msg.from);
self.role.votes += 1;
if self.role.votes >= self.quorum() {
self.role.votes.insert(msg.from.unwrap());
if self.role.votes.len() as u64 >= self.quorum() {
return Ok(self.become_leader()?.into());
}
}

// If we receive a heartbeat or entries in this term, we lost the
// election and have a new leader. Follow it and step the message.
Event::Heartbeat { .. } | Event::AppendEntries { .. } => {
return self.become_follower(msg.term, Some(msg.from.unwrap()))?.step(msg);
}

// Abort any inbound client requests while candidate.
Event::ClientRequest { id, .. } => {
self.send(msg.from, Event::ClientResponse { id, response: Err(Error::Abort) })?;
}

// Ignore other candidates when we're also campaigning
Event::SolicitVote { .. } => {}

// We're not a leader in this term, nor are we forwarding requests,
// so we shouldn't see these.
Event::ConfirmLeader { .. }
Expand All @@ -116,18 +121,27 @@ impl RoleNode<Candidate> {

/// Processes a logical clock tick.
pub fn tick(mut self) -> Result<Node> {
// If the election times out, start a new one for the next term.
self.role.election_duration += 1;
if self.role.election_duration >= self.role.election_timeout {
info!("Election timed out, starting new election for term {}", self.term + 1);
let (last_index, last_term) = self.log.get_last_index();
self.term += 1;
self.log.set_term(self.term, Some(self.id))?;
self.role = Candidate::new();
self.send(Address::Broadcast, Event::SolicitVote { last_index, last_term })?;
self.campaign()?;
}
Ok(self.into())
}

/// Campaign for leadership by increasing the term, voting for ourself, and
/// soliciting votes from all peers.
pub(super) fn campaign(&mut self) -> Result<()> {
let term = self.term + 1;
info!("Starting new election for term {}", term);
self.role = Candidate::new();
self.role.votes.insert(self.id); // vote for ourself
self.term = term;
self.log.set_term(term, Some(self.id))?;

let (last_index, last_term) = self.log.get_last_index();
self.send(Address::Broadcast, Event::SolicitVote { last_index, last_term })?;
Ok(())
}
}

#[cfg(test)]
Expand All @@ -153,7 +167,7 @@ mod tests {
log.commit(2)?;
log.set_term(3, Some(1))?;

let node = RoleNode {
let mut node = RoleNode {
id: 1,
peers: vec![2, 3, 4, 5],
term: 3,
Expand All @@ -162,6 +176,7 @@ mod tests {
state_tx,
role: Candidate::new(),
};
node.role.votes.insert(1);
Ok((node, node_rx, state_rx))
}

Expand Down
63 changes: 30 additions & 33 deletions src/raft/node/follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,14 @@ impl Follower {
}

impl RoleNode<Follower> {
/// Transforms the node into a candidate.
/// Transforms the node into a candidate, by campaigning for leadership in a
/// new term.
fn become_candidate(mut self) -> Result<RoleNode<Candidate>> {
// Abort any forwarded requests. These must be retried with new leader.
self.abort_forwarded()?;

info!("Starting election for term {}", self.term + 1);
let (last_index, last_term) = self.log.get_last_index();
let mut node = self.become_role(Candidate::new());
node.term += 1;
node.log.set_term(node.term, Some(node.id))?;
node.send(Address::Broadcast, Event::SolicitVote { last_index, last_term })?;
node.campaign()?;
Ok(node)
}

Expand Down Expand Up @@ -75,19 +72,11 @@ impl RoleNode<Follower> {
Ok(self)
}

/// Checks if an address is the current leader.
fn is_leader(&self, from: &Address) -> bool {
if let Some(leader) = &self.role.leader {
if let Address::Node(from) = from {
return leader == from;
}
}
false
}

/// Processes a message.
pub fn step(mut self, msg: Message) -> Result<Node> {
// Assert invariants.
debug_assert_eq!(self.term, self.log.get_term()?.0, "Term does not match log");
debug_assert_eq!(self.role.voted_for, self.log.get_term()?.1, "Vote does not match log");
if self.role.leader.is_none() {
assert!(self.role.forwarded.is_empty(), "Leaderless follower has forwarded requests");
}
Expand Down Expand Up @@ -159,20 +148,21 @@ impl RoleNode<Follower> {
}
}

// A candidate in this term is requesting our vote.
Event::SolicitVote { last_index, last_term } => {
let from = msg.from.unwrap();

// If we already voted for someone else in this term, ignore it.
if let Some(voted_for) = self.role.voted_for {
if msg.from != Address::Node(voted_for) {
if from != voted_for {
return Ok(self.into());
}
}
let (log_last_index, log_last_term) = self.log.get_last_index();
if last_term < log_last_term {
return Ok(self.into());
}
if last_term == log_last_term && last_index < log_last_index {
return Ok(self.into());
}
if let Address::Node(from) = msg.from {

// Only vote if the candidate's log is at least as up-to-date as
// our log.
let (log_index, log_term) = self.log.get_last_index();
if last_term > log_term || last_term == log_term && last_index >= log_index {
info!("Voting for {} in term {} election", from, self.term);
self.send(Address::Node(from), Event::GrantVote)?;
self.log.set_term(self.term, Some(from))?;
Expand Down Expand Up @@ -215,13 +205,11 @@ impl RoleNode<Follower> {
}
}

// Ignore votes which are usually strays from the previous election that we lost.
Event::GrantVote => {}

// We're not a leader in this term, so we shoudn't see these.
// We're not a leader nor candidate in this term, so we shoudn't see these.
Event::ConfirmLeader { .. }
| Event::AcceptEntries { .. }
| Event::RejectEntries { .. } => warn!("Received unexpected message {:?}", msg),
| Event::RejectEntries { .. }
| Event::GrantVote { .. } => warn!("Received unexpected message {:?}", msg),
};
Ok(self.into())
}
Expand All @@ -230,10 +218,9 @@ impl RoleNode<Follower> {
pub fn tick(mut self) -> Result<Node> {
self.role.leader_seen += 1;
if self.role.leader_seen >= self.role.election_timeout {
Ok(self.become_candidate()?.into())
} else {
Ok(self.into())
return Ok(self.become_candidate()?.into());
}
Ok(self.into())
}

/// Aborts all forwarded requests.
Expand All @@ -244,6 +231,16 @@ impl RoleNode<Follower> {
}
Ok(())
}

/// Checks if an address is the current leader.
fn is_leader(&self, from: &Address) -> bool {
if let Some(leader) = &self.role.leader {
if let Address::Node(from) = from {
return leader == from;
}
}
false
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions src/raft/node/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ impl RoleNode<Leader> {

/// Processes a message.
pub fn step(mut self, msg: Message) -> Result<Node> {
// Assert invariants.
debug_assert_eq!(self.term, self.log.get_term()?.0, "Term does not match log");
debug_assert_eq!(Some(self.id), self.log.get_term()?.1, "Log vote does not match self");

// Drop invalid messages and messages from past terms.
if let Err(err) = self.validate(&msg) {
error!("Invalid message: {} ({:?})", err, msg);
Expand Down
5 changes: 4 additions & 1 deletion src/raft/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl Node {
tokio::spawn(driver.drive(state));

let (term, voted_for) = log.get_term()?;
let node = RoleNode {
let mut node = RoleNode {
id,
peers,
term,
Expand All @@ -81,6 +81,9 @@ impl Node {
};
if node.peers.is_empty() {
info!("No peers specified, starting as leader");
if voted_for != Some(id) {
node.log.set_term(term, Some(id))?;
}
let (last_index, _) = node.log.get_last_index();
Ok(node.become_role(Leader::new(vec![], last_index)).into())
} else {
Expand Down
2 changes: 1 addition & 1 deletion tests/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ async fn status() -> Result<()> {
commit_index: 26,
apply_index: 26,
storage: "bitcask".into(),
storage_size: 1309,
storage_size: 1313,
},
mvcc: mvcc::Status {
versions: 1,
Expand Down

0 comments on commit afee78a

Please sign in to comment.