From 87d4e465e113c4670ed6ca5a7948bede8836d312 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sun, 19 Nov 2023 15:57:02 +0100 Subject: [PATCH] Tweak Raft leadership logic. --- src/raft/node/candidate.rs | 5 +- src/raft/node/leader.rs | 160 +++++++++++++++++++------------------ 2 files changed, 87 insertions(+), 78 deletions(-) diff --git a/src/raft/node/candidate.rs b/src/raft/node/candidate.rs index cdd36770d..bb41e7b61 100644 --- a/src/raft/node/candidate.rs +++ b/src/raft/node/candidate.rs @@ -58,7 +58,10 @@ impl RoleNode { let (last_index, _) = self.log.get_last_index(); let mut node = self.become_role(Leader::new(peers, last_index)); node.heartbeat()?; - node.append(None)?; + + // Propose an empty command when assuming leadership, to disambiguate + // previous entries in the log. See section 8 in the Raft paper. + node.propose(None)?; Ok(node) } diff --git a/src/raft/node/leader.rs b/src/raft/node/leader.rs index 8045e0969..45f94c1cf 100644 --- a/src/raft/node/leader.rs +++ b/src/raft/node/leader.rs @@ -46,59 +46,6 @@ impl RoleNode { Ok(self.become_role(Follower::new(None, None))) } - /// Appends an entry to the log and replicates it to peers. - pub fn append(&mut self, command: Option>) -> Result { - let index = self.log.append(self.term, command)?; - for peer in self.peers.clone() { - self.replicate(peer)?; - } - Ok(index) - } - - /// Commits any pending log entries. - fn commit(&mut self) -> Result { - let mut last_indexes = vec![self.log.get_last_index().0]; - last_indexes.extend(self.role.peer_last_index.values()); - last_indexes.sort_unstable(); - last_indexes.reverse(); - let quorum_index = last_indexes[self.quorum() as usize - 1]; - - // We can only safely commit up to an entry from our own term, see figure 8 in Raft paper. - let (commit_index, _) = self.log.get_commit_index(); - if quorum_index > commit_index { - if let Some(entry) = self.log.get(quorum_index)? { - if entry.term == self.term { - self.log.commit(quorum_index)?; - let mut scan = self.log.scan((commit_index + 1)..=quorum_index)?; - while let Some(entry) = scan.next().transpose()? { - self.state_tx.send(Instruction::Apply { entry })?; - } - } - } - } - Ok(quorum_index) - } - - /// Replicates the log to a peer. - fn replicate(&mut self, peer: NodeID) -> Result<()> { - let peer_next = self - .role - .peer_next_index - .get(&peer) - .cloned() - .ok_or_else(|| Error::Internal(format!("Unknown peer {}", peer)))?; - let base_index = if peer_next > 0 { peer_next - 1 } else { 0 }; - let base_term = match self.log.get(base_index)? { - Some(base) => base.term, - None if base_index == 0 => 0, - None => return Err(Error::Internal(format!("Missing base entry {}", base_index))), - }; - let entries = self.log.scan(peer_next..)?.collect::>>()?; - debug!("Replicating {} entries at base {} to {}", entries.len(), base_index, peer); - self.send(Address::Node(peer), Event::AppendEntries { base_index, base_term, entries })?; - Ok(()) - } - /// Processes a message. pub fn step(mut self, msg: Message) -> Result { // Assert invariants. @@ -128,36 +75,40 @@ impl RoleNode { panic!("Saw other leader {} in term {}", msg.from.unwrap(), msg.term); } + // A follower received one of our heartbeats and confirms that we + // are its leader. If it doesn't have the commit index in its local + // log, replicate the log to it. Event::ConfirmLeader { commit_index, has_committed } => { - if let Address::Node(from) = msg.from { - self.state_tx.send(Instruction::Vote { - term: msg.term, - index: commit_index, - address: msg.from, - })?; - if !has_committed { - self.replicate(from)?; - } + let from = msg.from.unwrap(); + self.state_tx.send(Instruction::Vote { + term: msg.term, + index: commit_index, + address: msg.from, + })?; + if !has_committed { + self.send_log(from)?; } } + // A follower appended log entries we sent it. Event::AcceptEntries { last_index } => { - if let Address::Node(from) = msg.from { - self.role.peer_last_index.insert(from, last_index); - self.role.peer_next_index.insert(from, last_index + 1); - } - self.commit()?; + let from = msg.from.unwrap(); + self.role.peer_last_index.insert(from, last_index); + self.role.peer_next_index.insert(from, last_index + 1); + self.maybe_commit()?; } + // A follower rejected log entries we sent it, typically because it + // does not have the base index in its log. Try from the previous + // entry. Event::RejectEntries => { - if let Address::Node(from) = msg.from { - self.role.peer_next_index.entry(from).and_modify(|i| { - if *i > 1 { - *i -= 1 - } - }); - self.replicate(from)?; - } + let from = msg.from.unwrap(); + self.role.peer_next_index.entry(from).and_modify(|i| { + if *i > 1 { + *i -= 1 + } + }); + self.send_log(from)?; } Event::ClientRequest { id, request: Request::Query(command) } => { @@ -179,10 +130,10 @@ impl RoleNode { } Event::ClientRequest { id, request: Request::Mutate(command) } => { - let index = self.append(Some(command))?; + let index = self.propose(Some(command))?; self.state_tx.send(Instruction::Notify { id, address: msg.from, index })?; if self.peers.is_empty() { - self.commit()?; + self.maybe_commit()?; } } @@ -234,6 +185,61 @@ impl RoleNode { // periodic heartbeats regardless of any on-demand heartbeats. Ok(()) } + + /// Proposes a command for consensus by appending it to our log and + /// replicating it to peers. If successful, it will eventually be committed + /// and applied to the state machine. + pub(super) fn propose(&mut self, command: Option>) -> Result { + let index = self.log.append(self.term, command)?; + for peer in self.peers.clone() { + self.send_log(peer)?; + } + Ok(index) + } + + /// Commits any pending log entries, if possible. + fn maybe_commit(&mut self) -> Result { + let mut last_indexes = vec![self.log.get_last_index().0]; + last_indexes.extend(self.role.peer_last_index.values()); + last_indexes.sort_unstable(); + last_indexes.reverse(); + let quorum_index = last_indexes[self.quorum() as usize - 1]; + + // We can only safely commit up to an entry from our own term, see figure 8 in Raft paper. + let (commit_index, _) = self.log.get_commit_index(); + if quorum_index > commit_index { + if let Some(entry) = self.log.get(quorum_index)? { + if entry.term == self.term { + self.log.commit(quorum_index)?; + let mut scan = self.log.scan((commit_index + 1)..=quorum_index)?; + while let Some(entry) = scan.next().transpose()? { + self.state_tx.send(Instruction::Apply { entry })?; + } + } + } + } + Ok(quorum_index) + } + + /// Sends pending log entries to a peer. + fn send_log(&mut self, peer: NodeID) -> Result<()> { + let peer_next = self + .role + .peer_next_index + .get(&peer) + .cloned() + .ok_or_else(|| Error::Internal(format!("Unknown peer {}", peer)))?; + let base_index = if peer_next > 0 { peer_next - 1 } else { 0 }; + let base_term = match self.log.get(base_index)? { + Some(base) => base.term, + None if base_index == 0 => 0, + None => return Err(Error::Internal(format!("Missing base entry {}", base_index))), + }; + let entries = self.log.scan(peer_next..)?.collect::>>()?; + debug!("Replicating {} entries at base {} to {}", entries.len(), base_index, peer); + self.send(Address::Node(peer), Event::AppendEntries { base_index, base_term, entries })?; + Ok(()) + } } #[cfg(test)]