From 6d903de6c6b932cf858625efe981dd3d0be3f886 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Thu, 16 Nov 2023 22:08:11 +0100 Subject: [PATCH] Move Raft state de/serialization into Raft client. --- src/sql/engine/raft.rs | 162 +++++++++++++++++------------------------ 1 file changed, 65 insertions(+), 97 deletions(-) diff --git a/src/sql/engine/raft.rs b/src/sql/engine/raft.rs index f109aa9c0..62970ac77 100644 --- a/src/sql/engine/raft.rs +++ b/src/sql/engine/raft.rs @@ -5,7 +5,7 @@ use crate::error::{Error, Result}; use crate::raft; use crate::storage::{self, bincode, mvcc::TransactionState}; -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::collections::HashSet; use tokio::sync::{mpsc, oneshot}; @@ -85,18 +85,20 @@ impl Client { futures::executor::block_on(response_rx)? } - /// Mutates the Raft state machine. - fn mutate(&self, command: Vec) -> Result> { - match self.execute(raft::Request::Mutate(command))? { - raft::Response::State(response) => Ok(response), + /// Mutates the Raft state machine, deserializing the response into the + /// return type. + fn mutate(&self, mutation: Mutation) -> Result { + match self.execute(raft::Request::Mutate(bincode::serialize(&mutation)?))? { + raft::Response::State(response) => Ok(bincode::deserialize(&response)?), resp => Err(Error::Internal(format!("Unexpected Raft mutation response {:?}", resp))), } } - /// Queries the Raft state machine. - fn query(&self, command: Vec) -> Result> { - match self.execute(raft::Request::Query(command))? { - raft::Response::State(response) => Ok(response), + /// Queries the Raft state machine, deserializing the response into the + /// return type. + fn query(&self, query: Query) -> Result { + match self.execute(raft::Request::Query(bincode::serialize(&query)?))? { + raft::Response::State(response) => Ok(bincode::deserialize(&response)?), resp => Err(Error::Internal(format!("Unexpected Raft query response {:?}", resp))), } } @@ -131,20 +133,7 @@ impl Raft { /// Returns Raft SQL engine status. pub fn status(&self) -> Result { - Ok(Status { - raft: self.client.status()?, - mvcc: Raft::deserialize(&self.client.query(Raft::serialize(&Query::Status)?)?)?, - }) - } - - /// Serializes a command for the Raft SQL state machine. - fn serialize(value: &V) -> Result> { - bincode::serialize(value) - } - - /// Deserializes a command for the Raft SQL state machine. - fn deserialize<'a, V: Deserialize<'a>>(bytes: &'a [u8]) -> Result { - bincode::deserialize(bytes) + Ok(Status { raft: self.client.status()?, mvcc: self.client.query(Query::Status)? }) } } @@ -174,21 +163,9 @@ pub struct Transaction { impl Transaction { /// Starts a transaction in the given mode. fn begin(client: Client, read_only: bool, as_of: Option) -> Result { - let state = Raft::deserialize( - &client.mutate(Raft::serialize(&Mutation::Begin { read_only, as_of })?)?, - )?; + let state = client.mutate(Mutation::Begin { read_only, as_of })?; Ok(Self { client, state }) } - - /// Executes a mutation. - fn mutate(&self, mutation: Mutation) -> Result> { - self.client.mutate(Raft::serialize(&mutation)?) - } - - /// Executes a query. - fn query(&self, query: Query) -> Result> { - self.client.query(Raft::serialize(&query)?) - } } impl super::Transaction for Transaction { @@ -201,108 +178,99 @@ impl super::Transaction for Transaction { } fn commit(self) -> Result<()> { - Raft::deserialize(&self.mutate(Mutation::Commit(self.state.clone()))?) + self.client.mutate(Mutation::Commit(self.state.clone())) } fn rollback(self) -> Result<()> { - Raft::deserialize(&self.mutate(Mutation::Rollback(self.state.clone()))?) + self.client.mutate(Mutation::Rollback(self.state.clone())) } fn create(&mut self, table: &str, row: Row) -> Result<()> { - Raft::deserialize(&self.mutate(Mutation::Create { + self.client.mutate(Mutation::Create { txn: self.state.clone(), table: table.to_string(), row, - })?) + }) } fn delete(&mut self, table: &str, id: &Value) -> Result<()> { - Raft::deserialize(&self.mutate(Mutation::Delete { + self.client.mutate(Mutation::Delete { txn: self.state.clone(), table: table.to_string(), id: id.clone(), - })?) + }) } fn read(&self, table: &str, id: &Value) -> Result> { - Raft::deserialize(&self.query(Query::Read { + self.client.query(Query::Read { txn: self.state.clone(), table: table.to_string(), id: id.clone(), - })?) + }) } fn read_index(&self, table: &str, column: &str, value: &Value) -> Result> { - Raft::deserialize(&self.query(Query::ReadIndex { + self.client.query(Query::ReadIndex { txn: self.state.clone(), table: table.to_string(), column: column.to_string(), value: value.clone(), - })?) + }) } fn scan(&self, table: &str, filter: Option) -> Result { Ok(Box::new( - Raft::deserialize::>(&self.query(Query::Scan { - txn: self.state.clone(), - table: table.to_string(), - filter, - })?)? - .into_iter() - .map(Ok), + self.client + .query::>(Query::Scan { + txn: self.state.clone(), + table: table.to_string(), + filter, + })? + .into_iter() + .map(Ok), )) } fn scan_index(&self, table: &str, column: &str) -> Result { Ok(Box::new( - Raft::deserialize::>(&self.query(Query::ScanIndex { - txn: self.state.clone(), - table: table.to_string(), - column: column.to_string(), - })?)? - .into_iter() - .map(Ok), + self.client + .query::>(Query::ScanIndex { + txn: self.state.clone(), + table: table.to_string(), + column: column.to_string(), + })? + .into_iter() + .map(Ok), )) } fn update(&mut self, table: &str, id: &Value, row: Row) -> Result<()> { - Raft::deserialize(&self.mutate(Mutation::Update { + self.client.mutate(Mutation::Update { txn: self.state.clone(), table: table.to_string(), id: id.clone(), row, - })?) + }) } } impl Catalog for Transaction { fn create_table(&mut self, table: Table) -> Result<()> { - Raft::deserialize( - &self.mutate(Mutation::CreateTable { txn: self.state.clone(), schema: table })?, - ) + self.client.mutate(Mutation::CreateTable { txn: self.state.clone(), schema: table }) } fn delete_table(&mut self, table: &str) -> Result<()> { - Raft::deserialize( - &self.mutate(Mutation::DeleteTable { - txn: self.state.clone(), - table: table.to_string(), - })?, - ) + self.client + .mutate(Mutation::DeleteTable { txn: self.state.clone(), table: table.to_string() }) } fn read_table(&self, table: &str) -> Result> { - Raft::deserialize( - &self.query(Query::ReadTable { txn: self.state.clone(), table: table.to_string() })?, - ) + self.client.query(Query::ReadTable { txn: self.state.clone(), table: table.to_string() }) } fn scan_tables(&self) -> Result { Ok(Box::new( - Raft::deserialize::>( - &self.query(Query::ScanTables { txn: self.state.clone() })?, - )? - .into_iter(), + self.client.query::>(Query::ScanTables { txn: self.state.clone() })?.into_iter(), )) } } @@ -321,7 +289,7 @@ impl State { let engine = super::KV::new(engine); let applied_index = engine .get_metadata(b"applied_index")? - .map(|b| Raft::deserialize(&b)) + .map(|b| bincode::deserialize(&b)) .unwrap_or(Ok(0))?; Ok(State { engine, applied_index }) } @@ -337,26 +305,26 @@ impl State { } else { self.engine.begin_read_only()? }; - Raft::serialize(&txn.state()) + bincode::serialize(&txn.state()) } - Mutation::Commit(txn) => Raft::serialize(&self.engine.resume(txn)?.commit()?), - Mutation::Rollback(txn) => Raft::serialize(&self.engine.resume(txn)?.rollback()?), + Mutation::Commit(txn) => bincode::serialize(&self.engine.resume(txn)?.commit()?), + Mutation::Rollback(txn) => bincode::serialize(&self.engine.resume(txn)?.rollback()?), Mutation::Create { txn, table, row } => { - Raft::serialize(&self.engine.resume(txn)?.create(&table, row)?) + bincode::serialize(&self.engine.resume(txn)?.create(&table, row)?) } Mutation::Delete { txn, table, id } => { - Raft::serialize(&self.engine.resume(txn)?.delete(&table, &id)?) + bincode::serialize(&self.engine.resume(txn)?.delete(&table, &id)?) } Mutation::Update { txn, table, id, row } => { - Raft::serialize(&self.engine.resume(txn)?.update(&table, &id, row)?) + bincode::serialize(&self.engine.resume(txn)?.update(&table, &id, row)?) } Mutation::CreateTable { txn, schema } => { - Raft::serialize(&self.engine.resume(txn)?.create_table(schema)?) + bincode::serialize(&self.engine.resume(txn)?.create_table(schema)?) } Mutation::DeleteTable { txn, table } => { - Raft::serialize(&self.engine.resume(txn)?.delete_table(&table)?) + bincode::serialize(&self.engine.resume(txn)?.delete_table(&table)?) } } } @@ -370,10 +338,10 @@ impl raft::State for State { fn mutate(&mut self, index: u64, command: Vec) -> Result> { // We don't check that index == applied_index + 1, since the Raft log commits no-op // entries during leader election which we need to ignore. - match self.apply(Raft::deserialize(&command)?) { + match self.apply(bincode::deserialize(&command)?) { error @ Err(Error::Internal(_)) => error, result => { - self.engine.set_metadata(b"applied_index", Raft::serialize(&(index))?)?; + self.engine.set_metadata(b"applied_index", bincode::serialize(&(index))?)?; self.applied_index = index; result } @@ -381,31 +349,31 @@ impl raft::State for State { } fn query(&self, command: Vec) -> Result> { - match Raft::deserialize(&command)? { + match bincode::deserialize(&command)? { Query::Read { txn, table, id } => { - Raft::serialize(&self.engine.resume(txn)?.read(&table, &id)?) + bincode::serialize(&self.engine.resume(txn)?.read(&table, &id)?) } Query::ReadIndex { txn, table, column, value } => { - Raft::serialize(&self.engine.resume(txn)?.read_index(&table, &column, &value)?) + bincode::serialize(&self.engine.resume(txn)?.read_index(&table, &column, &value)?) } // FIXME These need to stream rows somehow - Query::Scan { txn, table, filter } => Raft::serialize( + Query::Scan { txn, table, filter } => bincode::serialize( &self.engine.resume(txn)?.scan(&table, filter)?.collect::>>()?, ), - Query::ScanIndex { txn, table, column } => Raft::serialize( + Query::ScanIndex { txn, table, column } => bincode::serialize( &self .engine .resume(txn)? .scan_index(&table, &column)? .collect::>>()?, ), - Query::Status => Raft::serialize(&self.engine.kv.status()?), + Query::Status => bincode::serialize(&self.engine.kv.status()?), Query::ReadTable { txn, table } => { - Raft::serialize(&self.engine.resume(txn)?.read_table(&table)?) + bincode::serialize(&self.engine.resume(txn)?.read_table(&table)?) } Query::ScanTables { txn } => { - Raft::serialize(&self.engine.resume(txn)?.scan_tables()?.collect::>()) + bincode::serialize(&self.engine.resume(txn)?.scan_tables()?.collect::>()) } } }