Skip to content

Commit

Permalink
Tweak Raft leadership logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Nov 19, 2023
1 parent afee78a commit 87d4e46
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 78 deletions.
5 changes: 4 additions & 1 deletion src/raft/node/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ impl RoleNode<Candidate> {
let (last_index, _) = self.log.get_last_index();
let mut node = self.become_role(Leader::new(peers, last_index));
node.heartbeat()?;
node.append(None)?;

// Propose an empty command when assuming leadership, to disambiguate
// previous entries in the log. See section 8 in the Raft paper.
node.propose(None)?;
Ok(node)
}

Expand Down
160 changes: 83 additions & 77 deletions src/raft/node/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,59 +46,6 @@ impl RoleNode<Leader> {
Ok(self.become_role(Follower::new(None, None)))
}

/// Appends an entry to the log and replicates it to peers.
pub fn append(&mut self, command: Option<Vec<u8>>) -> Result<Index> {
let index = self.log.append(self.term, command)?;
for peer in self.peers.clone() {
self.replicate(peer)?;
}
Ok(index)
}

/// Commits any pending log entries.
fn commit(&mut self) -> Result<Index> {
let mut last_indexes = vec![self.log.get_last_index().0];
last_indexes.extend(self.role.peer_last_index.values());
last_indexes.sort_unstable();
last_indexes.reverse();
let quorum_index = last_indexes[self.quorum() as usize - 1];

// We can only safely commit up to an entry from our own term, see figure 8 in Raft paper.
let (commit_index, _) = self.log.get_commit_index();
if quorum_index > commit_index {
if let Some(entry) = self.log.get(quorum_index)? {
if entry.term == self.term {
self.log.commit(quorum_index)?;
let mut scan = self.log.scan((commit_index + 1)..=quorum_index)?;
while let Some(entry) = scan.next().transpose()? {
self.state_tx.send(Instruction::Apply { entry })?;
}
}
}
}
Ok(quorum_index)
}

/// Replicates the log to a peer.
fn replicate(&mut self, peer: NodeID) -> Result<()> {
let peer_next = self
.role
.peer_next_index
.get(&peer)
.cloned()
.ok_or_else(|| Error::Internal(format!("Unknown peer {}", peer)))?;
let base_index = if peer_next > 0 { peer_next - 1 } else { 0 };
let base_term = match self.log.get(base_index)? {
Some(base) => base.term,
None if base_index == 0 => 0,
None => return Err(Error::Internal(format!("Missing base entry {}", base_index))),
};
let entries = self.log.scan(peer_next..)?.collect::<Result<Vec<_>>>()?;
debug!("Replicating {} entries at base {} to {}", entries.len(), base_index, peer);
self.send(Address::Node(peer), Event::AppendEntries { base_index, base_term, entries })?;
Ok(())
}

/// Processes a message.
pub fn step(mut self, msg: Message) -> Result<Node> {
// Assert invariants.
Expand Down Expand Up @@ -128,36 +75,40 @@ impl RoleNode<Leader> {
panic!("Saw other leader {} in term {}", msg.from.unwrap(), msg.term);
}

// A follower received one of our heartbeats and confirms that we
// are its leader. If it doesn't have the commit index in its local
// log, replicate the log to it.
Event::ConfirmLeader { commit_index, has_committed } => {
if let Address::Node(from) = msg.from {
self.state_tx.send(Instruction::Vote {
term: msg.term,
index: commit_index,
address: msg.from,
})?;
if !has_committed {
self.replicate(from)?;
}
let from = msg.from.unwrap();
self.state_tx.send(Instruction::Vote {
term: msg.term,
index: commit_index,
address: msg.from,
})?;
if !has_committed {
self.send_log(from)?;
}
}

// A follower appended log entries we sent it.
Event::AcceptEntries { last_index } => {
if let Address::Node(from) = msg.from {
self.role.peer_last_index.insert(from, last_index);
self.role.peer_next_index.insert(from, last_index + 1);
}
self.commit()?;
let from = msg.from.unwrap();
self.role.peer_last_index.insert(from, last_index);
self.role.peer_next_index.insert(from, last_index + 1);
self.maybe_commit()?;
}

// A follower rejected log entries we sent it, typically because it
// does not have the base index in its log. Try from the previous
// entry.
Event::RejectEntries => {
if let Address::Node(from) = msg.from {
self.role.peer_next_index.entry(from).and_modify(|i| {
if *i > 1 {
*i -= 1
}
});
self.replicate(from)?;
}
let from = msg.from.unwrap();
self.role.peer_next_index.entry(from).and_modify(|i| {
if *i > 1 {
*i -= 1
}
});
self.send_log(from)?;
}

Event::ClientRequest { id, request: Request::Query(command) } => {
Expand All @@ -179,10 +130,10 @@ impl RoleNode<Leader> {
}

Event::ClientRequest { id, request: Request::Mutate(command) } => {
let index = self.append(Some(command))?;
let index = self.propose(Some(command))?;
self.state_tx.send(Instruction::Notify { id, address: msg.from, index })?;
if self.peers.is_empty() {
self.commit()?;
self.maybe_commit()?;
}
}

Expand Down Expand Up @@ -234,6 +185,61 @@ impl RoleNode<Leader> {
// periodic heartbeats regardless of any on-demand heartbeats.
Ok(())
}

/// Proposes a command for consensus by appending it to our log and
/// replicating it to peers. If successful, it will eventually be committed
/// and applied to the state machine.
pub(super) fn propose(&mut self, command: Option<Vec<u8>>) -> Result<Index> {
let index = self.log.append(self.term, command)?;
for peer in self.peers.clone() {
self.send_log(peer)?;
}
Ok(index)
}

/// Commits any pending log entries, if possible.
fn maybe_commit(&mut self) -> Result<Index> {
let mut last_indexes = vec![self.log.get_last_index().0];
last_indexes.extend(self.role.peer_last_index.values());
last_indexes.sort_unstable();
last_indexes.reverse();
let quorum_index = last_indexes[self.quorum() as usize - 1];

// We can only safely commit up to an entry from our own term, see figure 8 in Raft paper.
let (commit_index, _) = self.log.get_commit_index();
if quorum_index > commit_index {
if let Some(entry) = self.log.get(quorum_index)? {
if entry.term == self.term {
self.log.commit(quorum_index)?;
let mut scan = self.log.scan((commit_index + 1)..=quorum_index)?;
while let Some(entry) = scan.next().transpose()? {
self.state_tx.send(Instruction::Apply { entry })?;
}
}
}
}
Ok(quorum_index)
}

/// Sends pending log entries to a peer.
fn send_log(&mut self, peer: NodeID) -> Result<()> {
let peer_next = self
.role
.peer_next_index
.get(&peer)
.cloned()
.ok_or_else(|| Error::Internal(format!("Unknown peer {}", peer)))?;
let base_index = if peer_next > 0 { peer_next - 1 } else { 0 };
let base_term = match self.log.get(base_index)? {
Some(base) => base.term,
None if base_index == 0 => 0,
None => return Err(Error::Internal(format!("Missing base entry {}", base_index))),
};
let entries = self.log.scan(peer_next..)?.collect::<Result<Vec<_>>>()?;
debug!("Replicating {} entries at base {} to {}", entries.len(), base_index, peer);
self.send(Address::Node(peer), Event::AppendEntries { base_index, base_term, entries })?;
Ok(())
}
}

#[cfg(test)]
Expand Down

0 comments on commit 87d4e46

Please sign in to comment.