Skip to content

Commit

Permalink
Apply noop commands to state machine.
Browse files Browse the repository at this point in the history
This simplifies applied_index tracking.
  • Loading branch information
erikgrinaker committed Nov 18, 2023
1 parent 411a01d commit afb320a
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 61 deletions.
23 changes: 9 additions & 14 deletions src/raft/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,23 +479,18 @@ mod tests {
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn new_state_apply_missing() -> Result<()> {
#[should_panic(expected = "applied index above commit index")]
async fn new_state_apply_missing() {
let (node_tx, _) = mpsc::unbounded_channel();
let mut log = Log::new(Box::new(storage::engine::Memory::new()), false)?;
log.append(1, Some(vec![0x01]))?;
log.append(2, None)?;
log.append(2, Some(vec![0x02]))?;
log.commit(3)?;
log.append(2, Some(vec![0x03]))?;
let mut log = Log::new(Box::new(storage::engine::Memory::new()), false).unwrap();
log.append(1, Some(vec![0x01])).unwrap();
log.append(2, None).unwrap();
log.append(2, Some(vec![0x02])).unwrap();
log.commit(3).unwrap();
log.append(2, Some(vec![0x03])).unwrap();
let state = Box::new(TestState::new(4));

assert_eq!(
Node::new(1, vec![2, 3], log, state.clone(), node_tx).await.err(),
Some(Error::Internal(
"State machine applied index 4 greater than log commit index 3".into()
))
);
Ok(())
Node::new(1, vec![2, 3], log, state.clone(), node_tx).await.unwrap();
}

#[tokio::test]
Expand Down
61 changes: 28 additions & 33 deletions src/raft/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,16 @@ pub trait State: Send {
/// Returns the last applied index from the state machine.
fn get_applied_index(&self) -> Index;

/// Mutates the state machine. If the state machine returns Error::Internal, the Raft node
/// halts. For any other error, the state is applied and the error propagated to the caller.
fn mutate(&mut self, index: Index, command: Vec<u8>) -> Result<Vec<u8>>;
/// Applies a log entry to the state machine. If it returns Error::Internal,
/// the Raft node halts. For any other error, the entry is still considered
/// applied and the error is returned to the caller.
///
/// The entry may contain a noop command, which is committed by Raft during
/// leader changes. A noop must still be applied to the state machine so that
/// the applied index is tracked properly; applying it returns an empty result.
///
/// TODO: consider using runtime assertions instead of Error::Internal.
fn apply(&mut self, entry: Entry) -> Result<Vec<u8>>;

/// Queries the state machine. All errors are propagated to the caller.
fn query(&self, command: Vec<u8>) -> Result<Vec<u8>>;
Expand Down Expand Up @@ -51,7 +58,6 @@ struct Query {
pub struct Driver {
state_rx: UnboundedReceiverStream<Instruction>,
node_tx: mpsc::UnboundedSender<Message>,
applied_index: Index,
/// Notify clients when their mutation is applied. <index, (client, id)>
notify: HashMap<Index, (Address, Vec<u8>)>,
/// Execute client queries when they receive a quorum. <index, <id, query>>
Expand All @@ -67,16 +73,14 @@ impl Driver {
Self {
state_rx: UnboundedReceiverStream::new(state_rx),
node_tx,
applied_index: 0,
notify: HashMap::new(),
queries: BTreeMap::new(),
}
}

/// Drives a state machine.
pub async fn drive(mut self, mut state: Box<dyn State>) -> Result<()> {
self.applied_index = state.get_applied_index();
debug!("Starting state machine driver at applied index {}", self.applied_index);
debug!("Starting state machine driver at applied index {}", state.get_applied_index());
while let Some(instruction) = self.state_rx.next().await {
if let Err(error) = self.execute(instruction, &mut *state).await {
error!("Halting state machine due to error: {}", error);
Expand All @@ -91,40 +95,29 @@ impl Driver {
pub fn apply_log(&mut self, state: &mut dyn State, log: &mut Log) -> Result<Index> {
let applied_index = state.get_applied_index();
let (commit_index, _) = log.get_commit_index();
if applied_index > commit_index {
return Err(Error::Internal(format!(
"State machine applied index {} greater than log commit index {}",
applied_index, commit_index
)));
}
assert!(applied_index <= commit_index, "applied index above commit index");

if applied_index < commit_index {
let mut scan = log.scan((applied_index + 1)..=commit_index)?;
while let Some(entry) = scan.next().transpose()? {
self.apply(state, entry)?;
}
}
Ok(self.applied_index)
Ok(state.get_applied_index())
}

/// Applies an entry to the state machine.
pub fn apply(&mut self, state: &mut dyn State, entry: Entry) -> Result<Index> {
// Apply the command, unless it's a noop.
// Apply the command.
debug!("Applying {:?}", entry);
if let Some(command) = entry.command {
match state.mutate(entry.index, command) {
Err(error @ Error::Internal(_)) => return Err(error),
result => self.notify_applied(entry.index, result)?,
};
}
// We have to track applied_index here, separately from the state machine, because
// no-op log entries are significant for whether a query should be executed.
//
// TODO: track noop commands in the state machine.
self.applied_index = entry.index;
match state.apply(entry) {
Err(error @ Error::Internal(_)) => return Err(error),
result => self.notify_applied(state.get_applied_index(), result)?,
};
// Try to execute any pending queries, since they may have been submitted for a
// commit_index which hadn't been applied yet.
self.query_execute(state)?;
Ok(self.applied_index)
Ok(state.get_applied_index())
}

/// Executes a state machine instruction.
Expand Down Expand Up @@ -202,7 +195,7 @@ impl Driver {

/// Executes any queries that are ready.
fn query_execute(&mut self, state: &mut dyn State) -> Result<()> {
for query in self.query_ready(self.applied_index) {
for query in self.query_ready(state.get_applied_index()) {
debug!("Executing query {:?}", query.command);
let result = state.query(query.command);
if let Err(error @ Error::Internal(_)) = result {
Expand Down Expand Up @@ -291,11 +284,13 @@ pub mod tests {
*self.applied_index.lock().unwrap()
}

// Appends the command to the internal commands list.
fn mutate(&mut self, index: Index, command: Vec<u8>) -> Result<Vec<u8>> {
self.commands.lock()?.push(command.clone());
*self.applied_index.lock()? = index;
Ok(command)
// Records the entry's command, if it carries one, in the internal command
// list, then marks the entry's index as applied. Returns the command bytes,
// or an empty vector when the entry has no command.
fn apply(&mut self, entry: Entry) -> Result<Vec<u8>> {
    if let Some(bytes) = entry.command.as_ref() {
        let mut recorded = self.commands.lock()?;
        recorded.push(bytes.clone());
    }
    let mut applied = self.applied_index.lock()?;
    *applied = entry.index;
    Ok(entry.command.unwrap_or_default())
}

// Appends the command to the internal commands list.
Expand Down
30 changes: 16 additions & 14 deletions src/sql/engine/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::super::schema::{Catalog, Table, Tables};
use super::super::types::{Expression, Row, Value};
use super::{Engine as _, IndexScan, Scan, Transaction as _};
use crate::error::{Error, Result};
use crate::raft;
use crate::raft::{self, Entry};
use crate::storage::{self, bincode, mvcc::TransactionState};

use serde::{de::DeserializeOwned, Deserialize, Serialize};
Expand Down Expand Up @@ -294,8 +294,8 @@ impl<E: storage::engine::Engine> State<E> {
Ok(State { engine, applied_index })
}

/// Applies a state machine mutation
fn apply(&mut self, mutation: Mutation) -> Result<Vec<u8>> {
/// Mutates the state machine.
fn mutate(&mut self, mutation: Mutation) -> Result<Vec<u8>> {
match mutation {
Mutation::Begin { read_only, as_of } => {
let txn = if !read_only {
Expand Down Expand Up @@ -335,17 +335,19 @@ impl<E: storage::engine::Engine> raft::State for State<E> {
self.applied_index
}

fn mutate(&mut self, index: u64, command: Vec<u8>) -> Result<Vec<u8>> {
// We don't check that index == applied_index + 1, since the Raft log commits no-op
// entries during leader election which we need to ignore.
match self.apply(bincode::deserialize(&command)?) {
error @ Err(Error::Internal(_)) => error,
result => {
self.engine.set_metadata(b"applied_index", bincode::serialize(&(index))?)?;
self.applied_index = index;
result
}
}
fn apply(&mut self, entry: Entry) -> Result<Vec<u8>> {
assert_eq!(entry.index, self.applied_index + 1, "entry index not after applied index");

let result = match &entry.command {
Some(command) => match self.mutate(bincode::deserialize(command)?) {
error @ Err(Error::Internal(_)) => return error, // don't record as applied
result => result,
},
None => Ok(Vec::new()),
};
self.applied_index = entry.index;
self.engine.set_metadata(b"applied_index", bincode::serialize(&entry.index)?)?;
result
}

fn query(&self, command: Vec<u8>) -> Result<Vec<u8>> {
Expand Down

0 comments on commit afb320a

Please sign in to comment.