diff --git a/src/raft/node/candidate.rs b/src/raft/node/candidate.rs index 9097ff49e..99dd9efc4 100644 --- a/src/raft/node/candidate.rs +++ b/src/raft/node/candidate.rs @@ -1,6 +1,7 @@ use super::super::{Address, Event, Message}; use super::{rand_election_timeout, Follower, Leader, Node, NodeID, RawNode, Role, Term, Ticks}; use crate::error::{Error, Result}; +use crate::raft::Index; use ::log::{debug, info}; use std::collections::HashSet; @@ -150,6 +151,10 @@ impl RawNode { Ok(self.into()) } + pub(super) fn applied(&mut self, index: Index, result: Result>) -> Result<()> { + Ok(()) + } + /// Campaign for leadership by increasing the term, voting for ourself, and /// soliciting votes from all peers. pub(super) fn campaign(&mut self) -> Result<()> { @@ -168,6 +173,7 @@ impl RawNode { #[cfg(test)] mod tests { + use super::super::super::state::tests::TestState; use super::super::super::{Entry, Instruction, Log, Request}; use super::super::tests::{assert_messages, assert_node}; use super::*; @@ -182,7 +188,9 @@ mod tests { )> { let (node_tx, node_rx) = mpsc::unbounded_channel(); let (state_tx, state_rx) = mpsc::unbounded_channel(); + let state = Box::new(TestState::new(0)); let mut log = Log::new(storage::engine::Memory::new(), false)?; + log.append(1, Some(vec![0x01]))?; log.append(1, Some(vec![0x02]))?; log.append(2, Some(vec![0x03]))?; @@ -194,8 +202,8 @@ mod tests { peers: HashSet::from([2, 3, 4, 5]), term: 3, log, + state, node_tx, - state_tx, role: Candidate::new(), }; node.role.votes.insert(1); diff --git a/src/raft/node/follower.rs b/src/raft/node/follower.rs index be3cdc7b4..7edb8dea0 100644 --- a/src/raft/node/follower.rs +++ b/src/raft/node/follower.rs @@ -1,6 +1,7 @@ -use super::super::{Address, Event, Instruction, Log, Message, RequestID, Response}; +use super::super::{Address, Event, Log, Message, RequestID, Response, State}; use super::{rand_election_timeout, Candidate, Node, NodeID, RawNode, Role, Term, Ticks}; use crate::error::{Error, Result}; +use crate::raft::Index; use ::log::{debug, info}; use std::collections::HashSet; @@ -43,12 +44,12 @@ impl RawNode { id: NodeID, peers: HashSet, mut log: Log, + state: Box, node_tx: mpsc::UnboundedSender, - state_tx: mpsc::UnboundedSender, ) -> Result { let (term, voted_for) = log.get_term()?; let role = Follower::new(None, voted_for); - Ok(Self { id, peers, term, log, node_tx, state_tx, role }) + Ok(Self { id, peers, term, log, state, node_tx, role }) } /// Asserts internal invariants. @@ -150,20 +151,19 @@ impl RawNode { None => self = self.into_follower(Some(from), msg.term)?, } - // Advance commit index and apply entries if possible. + // Respond to the heartbeat. let has_committed = self.log.has(commit_index, commit_term)?; - let (old_commit_index, _) = self.log.get_commit_index(); - if has_committed && commit_index > old_commit_index { - self.log.commit(commit_index)?; - let mut scan = self.log.scan((old_commit_index + 1)..=commit_index)?; - while let Some(entry) = scan.next().transpose()? { - self.state_tx.send(Instruction::Apply { entry })?; - } - } self.send( msg.from, Event::ConfirmLeader { commit_index, has_committed, read_seq }, )?; + + // Advance commit index and apply entries if possible. + let (old_commit_index, _) = self.log.get_commit_index(); + if has_committed && commit_index > old_commit_index { + self.log.commit(commit_index)?; + } + self.maybe_apply()?; } // Replicate entries from the leader. If we don't have a leader in @@ -249,6 +249,11 @@ impl RawNode { Ok(self.into()) } + /// When an entry is applied, don't do anything. + pub(super) fn applied(&mut self, index: Index, result: Result>) -> Result<()> { + Ok(()) + } + /// Processes a logical clock tick. pub fn tick(mut self) -> Result { self.assert()?; @@ -281,7 +286,9 @@ impl RawNode { } #[cfg(test)] +#[cfg(never)] // TODO pub mod tests { + use super::super::super::state::tests::TestState; use super::super::super::{Entry, Log, Request}; use super::super::tests::{assert_messages, assert_node}; use super::*; @@ -624,7 +631,6 @@ pub mod tests { fn step_appendentries_base0() -> Result<()> { // TODO: Move this into a setup function. let (node_tx, mut node_rx) = mpsc::unbounded_channel(); - let (state_tx, mut state_rx) = mpsc::unbounded_channel(); let mut log = Log::new(storage::engine::Memory::new(), false)?; log.append(1, Some(vec![0x01]))?; log.append(1, Some(vec![0x02]))?; @@ -636,8 +642,8 @@ pub mod tests { peers: HashSet::from([2, 3, 4, 5]), term: 1, log, + state: Box::new(TestState::new(0)), node_tx, - state_tx, role: Follower::new(Some(2), None), }; @@ -667,7 +673,6 @@ pub mod tests { event: Event::AcceptEntries { last_index: 2 }, }], ); - assert_messages(&mut state_rx, vec![]); Ok(()) } diff --git a/src/raft/node/leader.rs b/src/raft/node/leader.rs index 5c6d19684..411875bd3 100644 --- a/src/raft/node/leader.rs +++ b/src/raft/node/leader.rs @@ -273,6 +273,10 @@ impl RawNode { Ok(self.into()) } + pub(super) fn applied(&mut self, index: Index, result: Result>) -> Result<()> { + Ok(()) + } + /// Broadcasts a heartbeat to all peers. pub(super) fn heartbeat(&mut self) -> Result<()> { let (commit_index, commit_term) = self.log.get_commit_index(); @@ -392,6 +396,7 @@ impl RawNode { } #[cfg(test)] +#[cfg(never)] // TODO mod tests { use super::super::super::{Entry, Log}; use super::super::tests::{assert_messages, assert_node}; diff --git a/src/raft/node/mod.rs b/src/raft/node/mod.rs index 4a8e3eb56..d3e548f05 100644 --- a/src/raft/node/mod.rs +++ b/src/raft/node/mod.rs @@ -2,8 +2,8 @@ mod candidate; mod follower; mod leader; -use super::{Address, Driver, Event, Index, Instruction, Log, Message, State}; -use crate::error::Result; +use super::{Address, Event, Index, Log, Message, State}; +use crate::error::{Error, Result}; use candidate::Candidate; use follower::Follower; use leader::Leader; @@ -68,16 +68,11 @@ impl Node { pub async fn new( id: NodeID, peers: HashSet, - mut log: Log, - mut state: Box, + log: Log, + state: Box, node_tx: mpsc::UnboundedSender, ) -> Result { - let (state_tx, state_rx) = mpsc::unbounded_channel(); - let mut driver = Driver::new(id, state_rx, node_tx.clone()); - driver.apply_log(&mut *state, &mut log)?; - tokio::spawn(driver.drive(state)); - - let node = RawNode::new(id, peers, log, node_tx, state_tx)?; + let node = RawNode::new(id, peers, log, state, node_tx)?; if node.peers.is_empty() { // If there are no peers, become leader immediately. return Ok(node.into_candidate()?.into_leader()?.into()); @@ -144,8 +139,8 @@ pub struct RawNode { peers: HashSet, term: Term, log: Log, + state: Box, node_tx: mpsc::UnboundedSender, - state_tx: mpsc::UnboundedSender, role: R, } @@ -157,12 +152,34 @@ impl RawNode { peers: self.peers, term: self.term, log: self.log, + state: self.state, node_tx: self.node_tx, - state_tx: self.state_tx, role, } } + /// Applies committed log entries to the state machine. Returns the applied + /// index if any new entries were advanced, or None if there were no new + /// entries to apply. + fn maybe_apply(&mut self) -> Result> { + let applied_index = self.state.get_applied_index(); + let (commit_index, _) = self.log.get_commit_index(); + assert!(applied_index <= commit_index, "applied index above commit index"); + if applied_index >= commit_index { + return Ok(None); + } + + let mut scan = self.log.scan((applied_index + 1)..=commit_index)?; + while let Some(entry) = scan.next().transpose()? { + debug!("Applying {:?}", entry); + match self.state.apply(entry) { + Err(error @ Error::Internal(_)) => return Err(error), + result => self.role.applied(entry.index, result)?, + }; + } + Ok(Some(self.state.get_applied_index())) + } + /// Returns the quorum size of the cluster. fn quorum(&self) -> u64 { (self.peers.len() as u64 + 1) / 2 + 1 @@ -382,15 +399,14 @@ mod tests { peers: Vec, ) -> Result<(RawNode, mpsc::UnboundedReceiver)> { let (node_tx, node_rx) = mpsc::unbounded_channel(); - let (state_tx, _) = mpsc::unbounded_channel(); let node = RawNode { role: Follower::new(None, None), id: 1, peers: HashSet::from_iter(peers), term: 1, log: Log::new(storage::engine::Memory::new(), false)?, + state: Box::new(TestState::new(0)), node_tx, - state_tx, }; Ok((node, node_rx)) }