Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jan 7, 2024
1 parent 5bc600d commit cd2c800
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 30 deletions.
10 changes: 9 additions & 1 deletion src/raft/node/candidate.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::super::{Address, Event, Message};
use super::{rand_election_timeout, Follower, Leader, Node, NodeID, RawNode, Role, Term, Ticks};
use crate::error::{Error, Result};
use crate::raft::Index;

use ::log::{debug, info};
use std::collections::HashSet;
Expand Down Expand Up @@ -150,6 +151,10 @@ impl RawNode<Candidate> {
Ok(self.into())
}

pub(super) fn applied(&mut self, index: Index, result: Result<Vec<u8>>) -> Result<()> {
Ok(())
}

/// Campaign for leadership by increasing the term, voting for ourself, and
/// soliciting votes from all peers.
pub(super) fn campaign(&mut self) -> Result<()> {
Expand All @@ -168,6 +173,7 @@ impl RawNode<Candidate> {

#[cfg(test)]
mod tests {
use super::super::super::state::tests::TestState;
use super::super::super::{Entry, Instruction, Log, Request};
use super::super::tests::{assert_messages, assert_node};
use super::*;
Expand All @@ -182,7 +188,9 @@ mod tests {
)> {
let (node_tx, node_rx) = mpsc::unbounded_channel();
let (state_tx, state_rx) = mpsc::unbounded_channel();
let state = Box::new(TestState::new(0));
let mut log = Log::new(storage::engine::Memory::new(), false)?;

log.append(1, Some(vec![0x01]))?;
log.append(1, Some(vec![0x02]))?;
log.append(2, Some(vec![0x03]))?;
Expand All @@ -194,8 +202,8 @@ mod tests {
peers: HashSet::from([2, 3, 4, 5]),
term: 3,
log,
state,
node_tx,
state_tx,
role: Candidate::new(),
};
node.role.votes.insert(1);
Expand Down
35 changes: 20 additions & 15 deletions src/raft/node/follower.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::super::{Address, Event, Instruction, Log, Message, RequestID, Response};
use super::super::{Address, Event, Log, Message, RequestID, Response, State};
use super::{rand_election_timeout, Candidate, Node, NodeID, RawNode, Role, Term, Ticks};
use crate::error::{Error, Result};
use crate::raft::Index;

use ::log::{debug, info};
use std::collections::HashSet;
Expand Down Expand Up @@ -43,12 +44,12 @@ impl RawNode<Follower> {
id: NodeID,
peers: HashSet<NodeID>,
mut log: Log,
state: Box<dyn State>,
node_tx: mpsc::UnboundedSender<Message>,
state_tx: mpsc::UnboundedSender<Instruction>,
) -> Result<Self> {
let (term, voted_for) = log.get_term()?;
let role = Follower::new(None, voted_for);
Ok(Self { id, peers, term, log, node_tx, state_tx, role })
Ok(Self { id, peers, term, log, state, node_tx, role })
}

/// Asserts internal invariants.
Expand Down Expand Up @@ -150,20 +151,19 @@ impl RawNode<Follower> {
None => self = self.into_follower(Some(from), msg.term)?,
}

// Advance commit index and apply entries if possible.
// Respond to the heartbeat.
let has_committed = self.log.has(commit_index, commit_term)?;
let (old_commit_index, _) = self.log.get_commit_index();
if has_committed && commit_index > old_commit_index {
self.log.commit(commit_index)?;
let mut scan = self.log.scan((old_commit_index + 1)..=commit_index)?;
while let Some(entry) = scan.next().transpose()? {
self.state_tx.send(Instruction::Apply { entry })?;
}
}
self.send(
msg.from,
Event::ConfirmLeader { commit_index, has_committed, read_seq },
)?;

// Advance commit index and apply entries if possible.
let (old_commit_index, _) = self.log.get_commit_index();
if has_committed && commit_index > old_commit_index {
self.log.commit(commit_index)?;
}
self.maybe_apply()?;
}

// Replicate entries from the leader. If we don't have a leader in
Expand Down Expand Up @@ -249,6 +249,11 @@ impl RawNode<Follower> {
Ok(self.into())
}

/// When an entry is applied, don't do anything.
pub(super) fn applied(&mut self, index: Index, result: Result<Vec<u8>>) -> Result<()> {
Ok(())
}

/// Processes a logical clock tick.
pub fn tick(mut self) -> Result<Node> {
self.assert()?;
Expand Down Expand Up @@ -281,7 +286,9 @@ impl RawNode<Follower> {
}

#[cfg(test)]
#[cfg(never)] // TODO
pub mod tests {
use super::super::super::state::tests::TestState;
use super::super::super::{Entry, Log, Request};
use super::super::tests::{assert_messages, assert_node};
use super::*;
Expand Down Expand Up @@ -624,7 +631,6 @@ pub mod tests {
fn step_appendentries_base0() -> Result<()> {
// TODO: Move this into a setup function.
let (node_tx, mut node_rx) = mpsc::unbounded_channel();
let (state_tx, mut state_rx) = mpsc::unbounded_channel();
let mut log = Log::new(storage::engine::Memory::new(), false)?;
log.append(1, Some(vec![0x01]))?;
log.append(1, Some(vec![0x02]))?;
Expand All @@ -636,8 +642,8 @@ pub mod tests {
peers: HashSet::from([2, 3, 4, 5]),
term: 1,
log,
state: Box::new(TestState::new(0)),
node_tx,
state_tx,
role: Follower::new(Some(2), None),
};

Expand Down Expand Up @@ -667,7 +673,6 @@ pub mod tests {
event: Event::AcceptEntries { last_index: 2 },
}],
);
assert_messages(&mut state_rx, vec![]);
Ok(())
}

Expand Down
5 changes: 5 additions & 0 deletions src/raft/node/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ impl RawNode<Leader> {
Ok(self.into())
}

pub(super) fn applied(&mut self, index: Index, result: Result<Vec<u8>>) -> Result<()> {
Ok(())
}

/// Broadcasts a heartbeat to all peers.
pub(super) fn heartbeat(&mut self) -> Result<()> {
let (commit_index, commit_term) = self.log.get_commit_index();
Expand Down Expand Up @@ -392,6 +396,7 @@ impl RawNode<Leader> {
}

#[cfg(test)]
#[cfg(never)] // TODO
mod tests {
use super::super::super::{Entry, Log};
use super::super::tests::{assert_messages, assert_node};
Expand Down
44 changes: 30 additions & 14 deletions src/raft/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ mod candidate;
mod follower;
mod leader;

use super::{Address, Driver, Event, Index, Instruction, Log, Message, State};
use crate::error::Result;
use super::{Address, Event, Index, Log, Message, State};
use crate::error::{Error, Result};
use candidate::Candidate;
use follower::Follower;
use leader::Leader;
Expand Down Expand Up @@ -68,16 +68,11 @@ impl Node {
pub async fn new(
id: NodeID,
peers: HashSet<NodeID>,
mut log: Log,
mut state: Box<dyn State>,
log: Log,
state: Box<dyn State>,
node_tx: mpsc::UnboundedSender<Message>,
) -> Result<Self> {
let (state_tx, state_rx) = mpsc::unbounded_channel();
let mut driver = Driver::new(id, state_rx, node_tx.clone());
driver.apply_log(&mut *state, &mut log)?;
tokio::spawn(driver.drive(state));

let node = RawNode::new(id, peers, log, node_tx, state_tx)?;
let node = RawNode::new(id, peers, log, state, node_tx)?;
if node.peers.is_empty() {
// If there are no peers, become leader immediately.
return Ok(node.into_candidate()?.into_leader()?.into());
Expand Down Expand Up @@ -144,8 +139,8 @@ pub struct RawNode<R: Role = Follower> {
peers: HashSet<NodeID>,
term: Term,
log: Log,
state: Box<dyn State>,
node_tx: mpsc::UnboundedSender<Message>,
state_tx: mpsc::UnboundedSender<Instruction>,
role: R,
}

Expand All @@ -157,12 +152,34 @@ impl<R: Role> RawNode<R> {
peers: self.peers,
term: self.term,
log: self.log,
state: self.state,
node_tx: self.node_tx,
state_tx: self.state_tx,
role,
}
}

/// Applies committed log entries to the state machine. Returns the applied
/// index if any new entries were advanced, or None if there were no new
/// entries to apply.
fn maybe_apply(&mut self) -> Result<Option<Index>> {
let applied_index = self.state.get_applied_index();
let (commit_index, _) = self.log.get_commit_index();
assert!(applied_index <= commit_index, "applied index above commit index");
if applied_index >= commit_index {
return Ok(None);
}

let mut scan = self.log.scan((applied_index + 1)..=commit_index)?;
while let Some(entry) = scan.next().transpose()? {
debug!("Applying {:?}", entry);
match self.state.apply(entry) {
Err(error @ Error::Internal(_)) => return Err(error),
result => self.role.applied(entry.index, result)?,
};
}
Ok(Some(self.state.get_applied_index()))
}

/// Returns the quorum size of the cluster.
fn quorum(&self) -> u64 {
(self.peers.len() as u64 + 1) / 2 + 1
Expand Down Expand Up @@ -382,15 +399,14 @@ mod tests {
peers: Vec<NodeID>,
) -> Result<(RawNode<Follower>, mpsc::UnboundedReceiver<Message>)> {
let (node_tx, node_rx) = mpsc::unbounded_channel();
let (state_tx, _) = mpsc::unbounded_channel();
let node = RawNode {
role: Follower::new(None, None),
id: 1,
peers: HashSet::from_iter(peers),
term: 1,
log: Log::new(storage::engine::Memory::new(), false)?,
state: Box::new(TestState::new(0)),
node_tx,
state_tx,
};
Ok((node, node_rx))
}
Expand Down

0 comments on commit cd2c800

Please sign in to comment.