Skip to content

Commit

Permalink
Move Raft state de/serialization into Raft client.
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Nov 16, 2023
1 parent 099a64f commit 6d903de
Showing 1 changed file with 65 additions and 97 deletions.
162 changes: 65 additions & 97 deletions src/sql/engine/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::error::{Error, Result};
use crate::raft;
use crate::storage::{self, bincode, mvcc::TransactionState};

use serde::{Deserialize, Serialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::collections::HashSet;
use tokio::sync::{mpsc, oneshot};

Expand Down Expand Up @@ -85,18 +85,20 @@ impl Client {
futures::executor::block_on(response_rx)?
}

/// Mutates the Raft state machine.
fn mutate(&self, command: Vec<u8>) -> Result<Vec<u8>> {
match self.execute(raft::Request::Mutate(command))? {
raft::Response::State(response) => Ok(response),
/// Mutates the Raft state machine, deserializing the response into the
/// return type.
fn mutate<V: DeserializeOwned>(&self, mutation: Mutation) -> Result<V> {
match self.execute(raft::Request::Mutate(bincode::serialize(&mutation)?))? {
raft::Response::State(response) => Ok(bincode::deserialize(&response)?),
resp => Err(Error::Internal(format!("Unexpected Raft mutation response {:?}", resp))),
}
}

/// Queries the Raft state machine.
fn query(&self, command: Vec<u8>) -> Result<Vec<u8>> {
match self.execute(raft::Request::Query(command))? {
raft::Response::State(response) => Ok(response),
/// Queries the Raft state machine, deserializing the response into the
/// return type.
fn query<V: DeserializeOwned>(&self, query: Query) -> Result<V> {
match self.execute(raft::Request::Query(bincode::serialize(&query)?))? {
raft::Response::State(response) => Ok(bincode::deserialize(&response)?),
resp => Err(Error::Internal(format!("Unexpected Raft query response {:?}", resp))),
}
}
Expand Down Expand Up @@ -131,20 +133,7 @@ impl Raft {

/// Returns Raft SQL engine status.
pub fn status(&self) -> Result<Status> {
Ok(Status {
raft: self.client.status()?,
mvcc: Raft::deserialize(&self.client.query(Raft::serialize(&Query::Status)?)?)?,
})
}

/// Serializes a command for the Raft SQL state machine.
fn serialize<V: Serialize>(value: &V) -> Result<Vec<u8>> {
bincode::serialize(value)
}

/// Deserializes a command for the Raft SQL state machine.
fn deserialize<'a, V: Deserialize<'a>>(bytes: &'a [u8]) -> Result<V> {
bincode::deserialize(bytes)
Ok(Status { raft: self.client.status()?, mvcc: self.client.query(Query::Status)? })
}
}

Expand Down Expand Up @@ -174,21 +163,9 @@ pub struct Transaction {
impl Transaction {
/// Starts a transaction in the given mode.
fn begin(client: Client, read_only: bool, as_of: Option<u64>) -> Result<Self> {
let state = Raft::deserialize(
&client.mutate(Raft::serialize(&Mutation::Begin { read_only, as_of })?)?,
)?;
let state = client.mutate(Mutation::Begin { read_only, as_of })?;
Ok(Self { client, state })
}

/// Executes a mutation.
fn mutate(&self, mutation: Mutation) -> Result<Vec<u8>> {
self.client.mutate(Raft::serialize(&mutation)?)
}

/// Executes a query.
fn query(&self, query: Query) -> Result<Vec<u8>> {
self.client.query(Raft::serialize(&query)?)
}
}

impl super::Transaction for Transaction {
Expand All @@ -201,108 +178,99 @@ impl super::Transaction for Transaction {
}

fn commit(self) -> Result<()> {
Raft::deserialize(&self.mutate(Mutation::Commit(self.state.clone()))?)
self.client.mutate(Mutation::Commit(self.state.clone()))
}

fn rollback(self) -> Result<()> {
Raft::deserialize(&self.mutate(Mutation::Rollback(self.state.clone()))?)
self.client.mutate(Mutation::Rollback(self.state.clone()))
}

fn create(&mut self, table: &str, row: Row) -> Result<()> {
Raft::deserialize(&self.mutate(Mutation::Create {
self.client.mutate(Mutation::Create {
txn: self.state.clone(),
table: table.to_string(),
row,
})?)
})
}

fn delete(&mut self, table: &str, id: &Value) -> Result<()> {
Raft::deserialize(&self.mutate(Mutation::Delete {
self.client.mutate(Mutation::Delete {
txn: self.state.clone(),
table: table.to_string(),
id: id.clone(),
})?)
})
}

fn read(&self, table: &str, id: &Value) -> Result<Option<Row>> {
Raft::deserialize(&self.query(Query::Read {
self.client.query(Query::Read {
txn: self.state.clone(),
table: table.to_string(),
id: id.clone(),
})?)
})
}

fn read_index(&self, table: &str, column: &str, value: &Value) -> Result<HashSet<Value>> {
Raft::deserialize(&self.query(Query::ReadIndex {
self.client.query(Query::ReadIndex {
txn: self.state.clone(),
table: table.to_string(),
column: column.to_string(),
value: value.clone(),
})?)
})
}

fn scan(&self, table: &str, filter: Option<Expression>) -> Result<Scan> {
Ok(Box::new(
Raft::deserialize::<Vec<_>>(&self.query(Query::Scan {
txn: self.state.clone(),
table: table.to_string(),
filter,
})?)?
.into_iter()
.map(Ok),
self.client
.query::<Vec<_>>(Query::Scan {
txn: self.state.clone(),
table: table.to_string(),
filter,
})?
.into_iter()
.map(Ok),
))
}

fn scan_index(&self, table: &str, column: &str) -> Result<IndexScan> {
Ok(Box::new(
Raft::deserialize::<Vec<_>>(&self.query(Query::ScanIndex {
txn: self.state.clone(),
table: table.to_string(),
column: column.to_string(),
})?)?
.into_iter()
.map(Ok),
self.client
.query::<Vec<_>>(Query::ScanIndex {
txn: self.state.clone(),
table: table.to_string(),
column: column.to_string(),
})?
.into_iter()
.map(Ok),
))
}

fn update(&mut self, table: &str, id: &Value, row: Row) -> Result<()> {
Raft::deserialize(&self.mutate(Mutation::Update {
self.client.mutate(Mutation::Update {
txn: self.state.clone(),
table: table.to_string(),
id: id.clone(),
row,
})?)
})
}
}

impl Catalog for Transaction {
fn create_table(&mut self, table: Table) -> Result<()> {
Raft::deserialize(
&self.mutate(Mutation::CreateTable { txn: self.state.clone(), schema: table })?,
)
self.client.mutate(Mutation::CreateTable { txn: self.state.clone(), schema: table })
}

fn delete_table(&mut self, table: &str) -> Result<()> {
Raft::deserialize(
&self.mutate(Mutation::DeleteTable {
txn: self.state.clone(),
table: table.to_string(),
})?,
)
self.client
.mutate(Mutation::DeleteTable { txn: self.state.clone(), table: table.to_string() })
}

fn read_table(&self, table: &str) -> Result<Option<Table>> {
Raft::deserialize(
&self.query(Query::ReadTable { txn: self.state.clone(), table: table.to_string() })?,
)
self.client.query(Query::ReadTable { txn: self.state.clone(), table: table.to_string() })
}

fn scan_tables(&self) -> Result<Tables> {
Ok(Box::new(
Raft::deserialize::<Vec<_>>(
&self.query(Query::ScanTables { txn: self.state.clone() })?,
)?
.into_iter(),
self.client.query::<Vec<_>>(Query::ScanTables { txn: self.state.clone() })?.into_iter(),
))
}
}
Expand All @@ -321,7 +289,7 @@ impl<E: storage::engine::Engine> State<E> {
let engine = super::KV::new(engine);
let applied_index = engine
.get_metadata(b"applied_index")?
.map(|b| Raft::deserialize(&b))
.map(|b| bincode::deserialize(&b))
.unwrap_or(Ok(0))?;
Ok(State { engine, applied_index })
}
Expand All @@ -337,26 +305,26 @@ impl<E: storage::engine::Engine> State<E> {
} else {
self.engine.begin_read_only()?
};
Raft::serialize(&txn.state())
bincode::serialize(&txn.state())
}
Mutation::Commit(txn) => Raft::serialize(&self.engine.resume(txn)?.commit()?),
Mutation::Rollback(txn) => Raft::serialize(&self.engine.resume(txn)?.rollback()?),
Mutation::Commit(txn) => bincode::serialize(&self.engine.resume(txn)?.commit()?),
Mutation::Rollback(txn) => bincode::serialize(&self.engine.resume(txn)?.rollback()?),

Mutation::Create { txn, table, row } => {
Raft::serialize(&self.engine.resume(txn)?.create(&table, row)?)
bincode::serialize(&self.engine.resume(txn)?.create(&table, row)?)
}
Mutation::Delete { txn, table, id } => {
Raft::serialize(&self.engine.resume(txn)?.delete(&table, &id)?)
bincode::serialize(&self.engine.resume(txn)?.delete(&table, &id)?)
}
Mutation::Update { txn, table, id, row } => {
Raft::serialize(&self.engine.resume(txn)?.update(&table, &id, row)?)
bincode::serialize(&self.engine.resume(txn)?.update(&table, &id, row)?)
}

Mutation::CreateTable { txn, schema } => {
Raft::serialize(&self.engine.resume(txn)?.create_table(schema)?)
bincode::serialize(&self.engine.resume(txn)?.create_table(schema)?)
}
Mutation::DeleteTable { txn, table } => {
Raft::serialize(&self.engine.resume(txn)?.delete_table(&table)?)
bincode::serialize(&self.engine.resume(txn)?.delete_table(&table)?)
}
}
}
Expand All @@ -370,42 +338,42 @@ impl<E: storage::engine::Engine> raft::State for State<E> {
fn mutate(&mut self, index: u64, command: Vec<u8>) -> Result<Vec<u8>> {
// We don't check that index == applied_index + 1, since the Raft log commits no-op
// entries during leader election which we need to ignore.
match self.apply(Raft::deserialize(&command)?) {
match self.apply(bincode::deserialize(&command)?) {
error @ Err(Error::Internal(_)) => error,
result => {
self.engine.set_metadata(b"applied_index", Raft::serialize(&(index))?)?;
self.engine.set_metadata(b"applied_index", bincode::serialize(&(index))?)?;
self.applied_index = index;
result
}
}
}

fn query(&self, command: Vec<u8>) -> Result<Vec<u8>> {
match Raft::deserialize(&command)? {
match bincode::deserialize(&command)? {
Query::Read { txn, table, id } => {
Raft::serialize(&self.engine.resume(txn)?.read(&table, &id)?)
bincode::serialize(&self.engine.resume(txn)?.read(&table, &id)?)
}
Query::ReadIndex { txn, table, column, value } => {
Raft::serialize(&self.engine.resume(txn)?.read_index(&table, &column, &value)?)
bincode::serialize(&self.engine.resume(txn)?.read_index(&table, &column, &value)?)
}
// FIXME These need to stream rows somehow
Query::Scan { txn, table, filter } => Raft::serialize(
Query::Scan { txn, table, filter } => bincode::serialize(
&self.engine.resume(txn)?.scan(&table, filter)?.collect::<Result<Vec<_>>>()?,
),
Query::ScanIndex { txn, table, column } => Raft::serialize(
Query::ScanIndex { txn, table, column } => bincode::serialize(
&self
.engine
.resume(txn)?
.scan_index(&table, &column)?
.collect::<Result<Vec<_>>>()?,
),
Query::Status => Raft::serialize(&self.engine.kv.status()?),
Query::Status => bincode::serialize(&self.engine.kv.status()?),

Query::ReadTable { txn, table } => {
Raft::serialize(&self.engine.resume(txn)?.read_table(&table)?)
bincode::serialize(&self.engine.resume(txn)?.read_table(&table)?)
}
Query::ScanTables { txn } => {
Raft::serialize(&self.engine.resume(txn)?.scan_tables()?.collect::<Vec<_>>())
bincode::serialize(&self.engine.resume(txn)?.scan_tables()?.collect::<Vec<_>>())
}
}
}
Expand Down

0 comments on commit 6d903de

Please sign in to comment.