diff --git a/src/raft/node/candidate.rs b/src/raft/node/candidate.rs index 2b8f4bb46..364237e89 100644 --- a/src/raft/node/candidate.rs +++ b/src/raft/node/candidate.rs @@ -1,19 +1,16 @@ use super::super::{Address, Event, Message}; -use super::{ - Follower, Leader, Node, NodeID, RoleNode, Term, ELECTION_TIMEOUT_MAX, ELECTION_TIMEOUT_MIN, -}; +use super::{rand_election_timeout, Follower, Leader, Node, NodeID, RoleNode, Term, Ticks}; use crate::error::{Error, Result}; use ::log::{debug, error, info, warn}; -use rand::Rng as _; /// A candidate is campaigning to become a leader. #[derive(Debug)] pub struct Candidate { /// Ticks elapsed since election start. - election_ticks: u64, + election_duration: Ticks, /// Election timeout, in ticks. - election_timeout: u64, + election_timeout: Ticks, /// Votes received (including ourself). votes: u64, } @@ -23,9 +20,8 @@ impl Candidate { pub fn new() -> Self { Self { votes: 1, // We always start with a vote for ourselves. - election_ticks: 0, - election_timeout: rand::thread_rng() - .gen_range(ELECTION_TIMEOUT_MIN..=ELECTION_TIMEOUT_MAX), + election_duration: 0, + election_timeout: rand_election_timeout(), } } } @@ -122,8 +118,8 @@ impl RoleNode { /// Processes a logical clock tick. pub fn tick(mut self) -> Result { // If the election times out, start a new one for the next term. - self.role.election_ticks += 1; - if self.role.election_ticks >= self.role.election_timeout { + self.role.election_duration += 1; + if self.role.election_duration >= self.role.election_timeout { info!("Election timed out, starting new election for term {}", self.term + 1); let (last_index, last_term) = self.log.get_last_index(); self.term += 1; diff --git a/src/raft/node/follower.rs b/src/raft/node/follower.rs index 6bf757d3e..242dd6c43 100644 --- a/src/raft/node/follower.rs +++ b/src/raft/node/follower.rs @@ -1,9 +1,8 @@ use super::super::{Address, Event, Instruction, Message, RequestID, Response}; -use super::{Candidate, Node, NodeID, RoleNode, Term, ELECTION_TIMEOUT_MAX, ELECTION_TIMEOUT_MIN}; +use super::{rand_election_timeout, Candidate, Node, NodeID, RoleNode, Term, Ticks}; use crate::error::{Error, Result}; use ::log::{debug, error, info, warn}; -use rand::Rng as _; use std::collections::HashSet; // A follower replicates state from a leader. @@ -12,9 +11,9 @@ pub struct Follower { /// The leader, or None if just initialized. leader: Option, /// The number of ticks since the last message from the leader. - leader_seen_ticks: u64, - /// The timeout before triggering an election. - leader_seen_timeout: u64, + leader_seen: Ticks, + /// The leader_seen timeout before triggering an election. + election_timeout: Ticks, /// The node we voted for in the current term, if any. voted_for: Option, // Local client requests that have been forwarded to the leader. These are @@ -28,9 +27,8 @@ impl Follower { Self { leader, voted_for, - leader_seen_ticks: 0, - leader_seen_timeout: rand::thread_rng() - .gen_range(ELECTION_TIMEOUT_MIN..=ELECTION_TIMEOUT_MAX), + leader_seen: 0, + election_timeout: rand_election_timeout(), forwarded: HashSet::new(), } } @@ -113,7 +111,7 @@ impl RoleNode { // Record when we last saw a message from the leader (if any). if self.is_leader(&msg.from) { - self.role.leader_seen_ticks = 0 + self.role.leader_seen = 0 } match msg.event { @@ -230,8 +228,8 @@ impl RoleNode { /// Processes a logical clock tick. pub fn tick(mut self) -> Result { - self.role.leader_seen_ticks += 1; - if self.role.leader_seen_ticks >= self.role.leader_seen_timeout { + self.role.leader_seen += 1; + if self.role.leader_seen >= self.role.election_timeout { Ok(self.become_candidate()?.into()) } else { Ok(self.into()) @@ -991,7 +989,7 @@ pub mod tests { #[test] fn tick() -> Result<()> { let (follower, mut node_rx, mut state_rx) = setup()?; - let timeout = follower.role.leader_seen_timeout; + let timeout = follower.role.election_timeout; let mut node = Node::Follower(follower); // Make sure heartbeats reset election timeout diff --git a/src/raft/node/leader.rs b/src/raft/node/leader.rs index fe1ca136b..d6ad0f4c1 100644 --- a/src/raft/node/leader.rs +++ b/src/raft/node/leader.rs @@ -1,5 +1,5 @@ use super::super::{Address, Event, Index, Instruction, Message, Request, Response, Status}; -use super::{Follower, Node, NodeID, RoleNode, Term, HEARTBEAT_INTERVAL}; +use super::{Follower, Node, NodeID, RoleNode, Term, Ticks, HEARTBEAT_INTERVAL}; use crate::error::{Error, Result}; use ::log::{debug, error, info}; @@ -9,7 +9,7 @@ use std::collections::HashMap; #[derive(Debug)] pub struct Leader { /// Number of ticks since last heartbeat. - heartbeat_ticks: u64, + since_heartbeat: Ticks, /// The next index to replicate to a peer. peer_next_index: HashMap, /// The last index known to be replicated on a peer. @@ -20,7 +20,7 @@ impl Leader { /// Creates a new leader role. pub fn new(peers: Vec, last_index: Index) -> Self { let mut leader = Self { - heartbeat_ticks: 0, + since_heartbeat: 0, peer_next_index: HashMap::new(), peer_last_index: HashMap::new(), }; @@ -217,11 +217,11 @@ impl RoleNode { /// Processes a logical clock tick. pub fn tick(mut self) -> Result { if !self.peers.is_empty() { - self.role.heartbeat_ticks += 1; - if self.role.heartbeat_ticks >= HEARTBEAT_INTERVAL { - self.role.heartbeat_ticks = 0; + self.role.since_heartbeat += 1; + if self.role.since_heartbeat >= HEARTBEAT_INTERVAL { let (commit_index, commit_term) = self.log.get_commit_index(); self.send(Address::Broadcast, Event::Heartbeat { commit_index, commit_term })?; + self.role.since_heartbeat = 0; } } Ok(self.into()) diff --git a/src/raft/node/mod.rs b/src/raft/node/mod.rs index 0d35c4802..62ff119c3 100644 --- a/src/raft/node/mod.rs +++ b/src/raft/node/mod.rs @@ -9,6 +9,7 @@ use follower::Follower; use leader::Leader; use ::log::{debug, info}; +use rand::Rng as _; use serde_derive::{Deserialize, Serialize}; use std::collections::HashMap; use tokio::sync::mpsc; @@ -19,14 +20,20 @@ pub type NodeID = u8; /// A leader term. pub type Term = u64; +/// A logical clock interval as number of ticks. +pub type Ticks = u8; + /// The interval between leader heartbeats, in ticks. -const HEARTBEAT_INTERVAL: u64 = 1; +const HEARTBEAT_INTERVAL: Ticks = 3; -/// The minimum election timeout, in ticks. -const ELECTION_TIMEOUT_MIN: u64 = 8 * HEARTBEAT_INTERVAL; +/// The randomized election timeout range (min-max), in ticks. This is +/// randomized per node to avoid ties. +const ELECTION_TIMEOUT_RANGE: std::ops::Range = 10..20; -/// The maximum election timeout, in ticks. -const ELECTION_TIMEOUT_MAX: u64 = 15 * HEARTBEAT_INTERVAL; +/// Generates a randomized election timeout. +fn rand_election_timeout() -> Ticks { + rand::thread_rng().gen_range(ELECTION_TIMEOUT_RANGE) +} /// Node status #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] diff --git a/src/raft/server.rs b/src/raft/server.rs index 17e1961ac..4a7238848 100644 --- a/src/raft/server.rs +++ b/src/raft/server.rs @@ -12,8 +12,9 @@ use tokio_stream::StreamExt as _; use tokio_util::codec::{Framed, LengthDelimitedCodec}; use uuid::Uuid; -/// The duration of a Raft tick, the unit of time for e.g. heartbeats and elections. -const TICK: Duration = Duration::from_millis(100); +/// The interval between Raft ticks, the unit of time for e.g. heartbeats and +/// elections. +const TICK_INTERVAL: Duration = Duration::from_millis(100); /// A Raft server. pub struct Server { @@ -71,7 +72,7 @@ impl Server { let mut tcp_rx = UnboundedReceiverStream::new(tcp_rx); let mut client_rx = UnboundedReceiverStream::new(client_rx); - let mut ticker = tokio::time::interval(TICK); + let mut ticker = tokio::time::interval(TICK_INTERVAL); let mut requests = HashMap::, oneshot::Sender>>::new(); loop { tokio::select! {