Skip to content

Commit

Permalink
Tweak Raft leader replication logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Nov 19, 2023
1 parent 52fe6a7 commit 48c5655
Showing 1 changed file with 102 additions and 89 deletions.
191 changes: 102 additions & 89 deletions src/raft/node/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,30 @@ use crate::error::{Error, Result};
use ::log::{debug, error, info};
use std::collections::{HashMap, HashSet};

/// Replication progress of a single peer, as tracked by the leader.
#[derive(Debug)]
struct Progress {
/// The next index to replicate to the peer. Starts at the leader's last
/// index + 1 when the leader is created, is set to the accepted index + 1
/// when the peer accepts entries, and is decremented by one (but never
/// below 1) each time the peer rejects entries.
next: Index,
/// The last index known to be replicated to the peer. Starts at 0 when the
/// leader is created and is set to the accepted index when the peer
/// accepts entries.
last: Index,
}

// A leader serves requests and replicates the log to followers.
#[derive(Debug)]
pub struct Leader {
/// Peer replication progress.
progress: HashMap<NodeID, Progress>,
/// Number of ticks since last periodic heartbeat.
since_heartbeat: Ticks,
/// The next index to replicate to a peer.
peer_next_index: HashMap<NodeID, Index>,
/// The last index known to be replicated on a peer.
peer_last_index: HashMap<NodeID, Index>,
}

impl Leader {
/// Creates a new leader role.
pub fn new(peers: HashSet<NodeID>, last_index: Index) -> Self {
let mut leader = Self {
since_heartbeat: 0,
peer_next_index: HashMap::new(),
peer_last_index: HashMap::new(),
};
for peer in peers {
leader.peer_next_index.insert(peer, last_index + 1);
leader.peer_last_index.insert(peer, 0);
}
leader
let next = last_index + 1;
let progress = peers.into_iter().map(|p| (p, Progress { next, last: 0 })).collect();
Self { progress, since_heartbeat: 0 }
}
}

Expand Down Expand Up @@ -90,22 +90,33 @@ impl RoleNode<Leader> {
}
}

// A follower appended log entries we sent it.
// A follower appended log entries we sent it. Record its progress
// and attempt to commit new entries.
Event::AcceptEntries { last_index } => {
assert!(
last_index <= self.log.get_last_index().0,
"Follower accepted entries after last index"
);

let from = msg.from.unwrap();
self.role.peer_last_index.insert(from, last_index);
self.role.peer_next_index.insert(from, last_index + 1);
self.role.progress.entry(from).and_modify(|p| {
p.last = last_index;
p.next = last_index + 1;
});
self.maybe_commit()?;
}

// A follower rejected log entries we sent it, typically because it
// does not have the base index in its log. Try from the previous
// entry.
// does not have the base index in its log. Try to replicate from
// the previous entry.
//
// This linear probing, as described in the Raft paper, can be very
// slow with long divergent logs, but we keep it simple.
Event::RejectEntries => {
let from = msg.from.unwrap();
self.role.peer_next_index.entry(from).and_modify(|i| {
if *i > 1 {
*i -= 1
self.role.progress.entry(from).and_modify(|p| {
if p.next > 1 {
p.next -= 1
}
});
self.send_log(from)?;
Expand Down Expand Up @@ -139,17 +150,22 @@ impl RoleNode<Leader> {

Event::ClientRequest { id, request: Request::Status } => {
let engine_status = self.log.status()?;
let mut status = Box::new(Status {
let status = Box::new(Status {
server: self.id,
leader: self.id,
term: self.term,
node_last_index: self.role.peer_last_index.clone(),
node_last_index: self
.role
.progress
.iter()
.map(|(id, p)| (*id, p.last))
.chain(std::iter::once((self.id, self.log.get_last_index().0)))
.collect(),
commit_index: self.log.get_commit_index().0,
apply_index: 0,
storage: engine_status.name.clone(),
storage_size: engine_status.size,
});
status.node_last_index.insert(self.id, self.log.get_last_index().0);
self.state_tx.send(Instruction::Status { id, address: msg.from, status })?
}

Expand Down Expand Up @@ -197,45 +213,68 @@ impl RoleNode<Leader> {
Ok(index)
}

/// Commits any pending log entries, if possible.
/// Commits any new log entries that have been replicated to a quorum,
/// and schedules them for state machine application.
fn maybe_commit(&mut self) -> Result<Index> {
let mut last_indexes = vec![self.log.get_last_index().0];
last_indexes.extend(self.role.peer_last_index.values());
// Determine the new commit index, i.e. the last index replicated to a
// quorum of peers.
let mut last_indexes = self
.role
.progress
.values()
.map(|p| p.last)
.chain(std::iter::once(self.log.get_last_index().0))
.collect::<Vec<_>>();
last_indexes.sort_unstable();
last_indexes.reverse();
let quorum_index = last_indexes[self.quorum() as usize - 1];

// We can only safely commit up to an entry from our own term, see figure 8 in Raft paper.
let (commit_index, _) = self.log.get_commit_index();
if quorum_index > commit_index {
if let Some(entry) = self.log.get(quorum_index)? {
if entry.term == self.term {
self.log.commit(quorum_index)?;
let mut scan = self.log.scan((commit_index + 1)..=quorum_index)?;
while let Some(entry) = scan.next().transpose()? {
self.state_tx.send(Instruction::Apply { entry })?;
}
}
let commit_index = last_indexes[self.quorum() as usize - 1];

// A 0 commit index means we haven't committed anything yet.
if commit_index == 0 {
return Ok(commit_index);
}

// Make sure the commit index does not regress.
let (prev_commit_index, _) = self.log.get_commit_index();
assert!(
commit_index >= prev_commit_index,
"Commit index regression {} -> {}",
prev_commit_index,
commit_index
);

// We can only safely commit up to an entry from our own term, see
// figure 8 in Raft paper.
match self.log.get(commit_index)? {
Some(entry) if entry.term == self.term => {}
Some(_) => return Ok(prev_commit_index),
None => return Err(Error::Internal(format!("Commit index {} missing", commit_index))),
};

// Commit and apply the new entries.
if commit_index > prev_commit_index {
self.log.commit(commit_index)?;
// TODO: Move application elsewhere, but needs access to applied index.
let mut scan = self.log.scan((prev_commit_index + 1)..=commit_index)?;
while let Some(entry) = scan.next().transpose()? {
self.state_tx.send(Instruction::Apply { entry })?;
}
}
Ok(quorum_index)
Ok(commit_index)
}

/// Sends pending log entries to a peer.
fn send_log(&mut self, peer: NodeID) -> Result<()> {
let peer_next = self
.role
.peer_next_index
.get(&peer)
.cloned()
.ok_or_else(|| Error::Internal(format!("Unknown peer {}", peer)))?;
let base_index = if peer_next > 0 { peer_next - 1 } else { 0 };
let base_term = match self.log.get(base_index)? {
Some(base) => base.term,
None if base_index == 0 => 0,
None => return Err(Error::Internal(format!("Missing base entry {}", base_index))),
let (base_index, base_term) = match self.role.progress.get(&peer) {
Some(Progress { next, .. }) if *next > 1 => match self.log.get(next - 1)? {
Some(entry) => (entry.index, entry.term),
None => return Err(Error::Internal(format!("Missing base entry {}", next - 1))),
},
Some(_) => (0, 0),
None => return Err(Error::Internal(format!("Unknown peer {}", peer))),
};
let entries = self.log.scan(peer_next..)?.collect::<Result<Vec<_>>>()?;

let entries = self.log.scan((base_index + 1)..)?.collect::<Result<Vec<_>>>()?;
debug!("Replicating {} entries at base {} to {}", entries.len(), base_index, peer);
self.send(Address::Node(peer), Event::AppendEntries { base_index, base_term, entries })?;
Ok(())
Expand Down Expand Up @@ -486,44 +525,18 @@ mod tests {
}

#[test]
// AcceptEntries quorum for missing future entry
fn step_acceptentries_future_index() -> Result<()> {
let (leader, mut node_rx, mut state_rx) = setup()?;
let peers = leader.peers.clone();
let mut node: Node = leader.into();

for (i, peer) in peers.into_iter().enumerate() {
node = node.step(Message {
from: Address::Node(peer),
#[should_panic(expected = "Follower accepted entries after last index")]
// AcceptEntries panics if last_index is beyond leader's log.
fn step_acceptentries_future_index() {
let (leader, _, _) = setup().unwrap();
leader
.step(Message {
from: Address::Node(2),
to: Address::Node(1),
term: 3,
event: Event::AcceptEntries { last_index: 7 },
})?;
// The local leader will cast a vote to commit 5. Thus, when we have votes 2x7, 1x5, 2x0,
// we will commit index 5. However, we will correctly ignore the following votes for 7.
let c = if i == 0 { 2 } else { 5 };
assert_node(&mut node).is_leader().term(3).committed(c).last(5);
assert_messages(&mut node_rx, vec![]);
if i == 1 {
assert_messages(
&mut state_rx,
vec![
Instruction::Apply {
entry: Entry { index: 3, term: 2, command: Some(vec![0x03]) },
},
Instruction::Apply {
entry: Entry { index: 4, term: 3, command: Some(vec![0x04]) },
},
Instruction::Apply {
entry: Entry { index: 5, term: 3, command: Some(vec![0x05]) },
},
],
);
} else {
assert_messages(&mut state_rx, vec![]);
}
}
Ok(())
})
.unwrap();
}

#[test]
Expand Down

0 comments on commit 48c5655

Please sign in to comment.