Skip to content

Commit

Permalink
Tweak Raft tick handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Nov 19, 2023
1 parent 07e915e commit b44b0bf
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 37 deletions.
18 changes: 7 additions & 11 deletions src/raft/node/candidate.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
use super::super::{Address, Event, Message};
use super::{
Follower, Leader, Node, NodeID, RoleNode, Term, ELECTION_TIMEOUT_MAX, ELECTION_TIMEOUT_MIN,
};
use super::{rand_election_timeout, Follower, Leader, Node, NodeID, RoleNode, Term, Ticks};
use crate::error::{Error, Result};

use ::log::{debug, error, info, warn};
use rand::Rng as _;

/// A candidate is campaigning to become a leader.
#[derive(Debug)]
pub struct Candidate {
/// Ticks elapsed since election start.
election_ticks: u64,
election_duration: Ticks,
/// Election timeout, in ticks.
election_timeout: u64,
election_timeout: Ticks,
/// Votes received (including ourself).
votes: u64,
}
Expand All @@ -23,9 +20,8 @@ impl Candidate {
pub fn new() -> Self {
Self {
votes: 1, // We always start with a vote for ourselves.
election_ticks: 0,
election_timeout: rand::thread_rng()
.gen_range(ELECTION_TIMEOUT_MIN..=ELECTION_TIMEOUT_MAX),
election_duration: 0,
election_timeout: rand_election_timeout(),
}
}
}
Expand Down Expand Up @@ -122,8 +118,8 @@ impl RoleNode<Candidate> {
/// Processes a logical clock tick.
pub fn tick(mut self) -> Result<Node> {
// If the election times out, start a new one for the next term.
self.role.election_ticks += 1;
if self.role.election_ticks >= self.role.election_timeout {
self.role.election_duration += 1;
if self.role.election_duration >= self.role.election_timeout {
info!("Election timed out, starting new election for term {}", self.term + 1);
let (last_index, last_term) = self.log.get_last_index();
self.term += 1;
Expand Down
22 changes: 10 additions & 12 deletions src/raft/node/follower.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use super::super::{Address, Event, Instruction, Message, RequestID, Response};
use super::{Candidate, Node, NodeID, RoleNode, Term, ELECTION_TIMEOUT_MAX, ELECTION_TIMEOUT_MIN};
use super::{rand_election_timeout, Candidate, Node, NodeID, RoleNode, Term, Ticks};
use crate::error::{Error, Result};

use ::log::{debug, error, info, warn};
use rand::Rng as _;
use std::collections::HashSet;

/// A follower replicates state from a leader.
Expand All @@ -12,9 +11,9 @@ pub struct Follower {
/// The leader, or None if just initialized.
leader: Option<NodeID>,
/// The number of ticks since the last message from the leader.
leader_seen_ticks: u64,
/// The timeout before triggering an election.
leader_seen_timeout: u64,
leader_seen: Ticks,
/// The leader_seen timeout before triggering an election.
election_timeout: Ticks,
/// The node we voted for in the current term, if any.
voted_for: Option<NodeID>,
// Local client requests that have been forwarded to the leader. These are
Expand All @@ -28,9 +27,8 @@ impl Follower {
Self {
leader,
voted_for,
leader_seen_ticks: 0,
leader_seen_timeout: rand::thread_rng()
.gen_range(ELECTION_TIMEOUT_MIN..=ELECTION_TIMEOUT_MAX),
leader_seen: 0,
election_timeout: rand_election_timeout(),
forwarded: HashSet::new(),
}
}
Expand Down Expand Up @@ -113,7 +111,7 @@ impl RoleNode<Follower> {

// Record when we last saw a message from the leader (if any).
if self.is_leader(&msg.from) {
self.role.leader_seen_ticks = 0
self.role.leader_seen = 0
}

match msg.event {
Expand Down Expand Up @@ -230,8 +228,8 @@ impl RoleNode<Follower> {

/// Processes a logical clock tick.
pub fn tick(mut self) -> Result<Node> {
self.role.leader_seen_ticks += 1;
if self.role.leader_seen_ticks >= self.role.leader_seen_timeout {
self.role.leader_seen += 1;
if self.role.leader_seen >= self.role.election_timeout {
Ok(self.become_candidate()?.into())
} else {
Ok(self.into())
Expand Down Expand Up @@ -991,7 +989,7 @@ pub mod tests {
#[test]
fn tick() -> Result<()> {
let (follower, mut node_rx, mut state_rx) = setup()?;
let timeout = follower.role.leader_seen_timeout;
let timeout = follower.role.election_timeout;
let mut node = Node::Follower(follower);

// Make sure heartbeats reset election timeout
Expand Down
12 changes: 6 additions & 6 deletions src/raft/node/leader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::super::{Address, Event, Index, Instruction, Message, Request, Response, Status};
use super::{Follower, Node, NodeID, RoleNode, Term, HEARTBEAT_INTERVAL};
use super::{Follower, Node, NodeID, RoleNode, Term, Ticks, HEARTBEAT_INTERVAL};
use crate::error::{Error, Result};

use ::log::{debug, error, info};
Expand All @@ -9,7 +9,7 @@ use std::collections::HashMap;
#[derive(Debug)]
pub struct Leader {
/// Number of ticks since last heartbeat.
heartbeat_ticks: u64,
since_heartbeat: Ticks,
/// The next index to replicate to a peer.
peer_next_index: HashMap<NodeID, Index>,
/// The last index known to be replicated on a peer.
Expand All @@ -20,7 +20,7 @@ impl Leader {
/// Creates a new leader role.
pub fn new(peers: Vec<NodeID>, last_index: Index) -> Self {
let mut leader = Self {
heartbeat_ticks: 0,
since_heartbeat: 0,
peer_next_index: HashMap::new(),
peer_last_index: HashMap::new(),
};
Expand Down Expand Up @@ -217,11 +217,11 @@ impl RoleNode<Leader> {
/// Processes a logical clock tick.
pub fn tick(mut self) -> Result<Node> {
if !self.peers.is_empty() {
self.role.heartbeat_ticks += 1;
if self.role.heartbeat_ticks >= HEARTBEAT_INTERVAL {
self.role.heartbeat_ticks = 0;
self.role.since_heartbeat += 1;
if self.role.since_heartbeat >= HEARTBEAT_INTERVAL {
let (commit_index, commit_term) = self.log.get_commit_index();
self.send(Address::Broadcast, Event::Heartbeat { commit_index, commit_term })?;
self.role.since_heartbeat = 0;
}
}
Ok(self.into())
Expand Down
17 changes: 12 additions & 5 deletions src/raft/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use follower::Follower;
use leader::Leader;

use ::log::{debug, info};
use rand::Rng as _;
use serde_derive::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio::sync::mpsc;
Expand All @@ -19,14 +20,20 @@ pub type NodeID = u8;
/// A leader term.
pub type Term = u64;

/// A logical clock interval, as a number of ticks.
pub type Ticks = u8;

/// The interval between leader heartbeats, in ticks.
const HEARTBEAT_INTERVAL: u64 = 1;
const HEARTBEAT_INTERVAL: Ticks = 3;

/// The minimum election timeout, in ticks.
const ELECTION_TIMEOUT_MIN: u64 = 8 * HEARTBEAT_INTERVAL;
/// The randomized election timeout range, in ticks. The range is half-open
/// (`10..20`), so the lower bound is inclusive and the upper bound is
/// exclusive. The timeout is randomized per node to avoid ties.
const ELECTION_TIMEOUT_RANGE: std::ops::Range<u8> = 10..20;

/// The maximum election timeout, in ticks.
const ELECTION_TIMEOUT_MAX: u64 = 15 * HEARTBEAT_INTERVAL;
/// Draws a fresh election timeout, in ticks, at random from
/// `ELECTION_TIMEOUT_RANGE`.
fn rand_election_timeout() -> Ticks {
    let mut rng = rand::thread_rng();
    rng.gen_range(ELECTION_TIMEOUT_RANGE)
}

/// Node status
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
Expand Down
7 changes: 4 additions & 3 deletions src/raft/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use tokio_stream::StreamExt as _;
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use uuid::Uuid;

/// The duration of a Raft tick, the unit of time for e.g. heartbeats and elections.
const TICK: Duration = Duration::from_millis(100);
/// The interval between Raft ticks, the unit of time for e.g. heartbeats and
/// elections.
const TICK_INTERVAL: Duration = Duration::from_millis(100);

/// A Raft server.
pub struct Server {
Expand Down Expand Up @@ -71,7 +72,7 @@ impl Server {
let mut tcp_rx = UnboundedReceiverStream::new(tcp_rx);
let mut client_rx = UnboundedReceiverStream::new(client_rx);

let mut ticker = tokio::time::interval(TICK);
let mut ticker = tokio::time::interval(TICK_INTERVAL);
let mut requests = HashMap::<Vec<u8>, oneshot::Sender<Result<Response>>>::new();
loop {
tokio::select! {
Expand Down

0 comments on commit b44b0bf

Please sign in to comment.