From d8f414b79a32a5966691ab26a1b6f8339e7293a9 Mon Sep 17 00:00:00 2001
From: Chen Chen
Date: Wed, 24 Jul 2024 22:49:31 -0500
Subject: [PATCH] server: send requests in batches only

---
 src/lib.rs           |  3 +-
 src/serialization.rs | 61 ----------------------------------
 src/server.rs        | 78 ++++++++++++++++++++++++++++++++++++++------
 3 files changed, 69 insertions(+), 73 deletions(-)
 delete mode 100644 src/serialization.rs

diff --git a/src/lib.rs b/src/lib.rs
index ccdcff4..3e48897 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -115,7 +115,7 @@ pub struct Request {
 }
 
 /// A response sent by a server to a client.
-#[derive(Eq, PartialEq, Clone, Debug)]
+#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, Debug)]
 pub struct Response {
     /// The `id` of the corresponding request.
     pub id: usize,
@@ -188,7 +188,6 @@ impl AsyncResponder for RefCell<Vec<Response>> {
 
 pub mod bench;
 mod cmdline;
-mod serialization;
 pub mod server;
 pub mod stores;
 pub mod thread;
diff --git a/src/serialization.rs b/src/serialization.rs
deleted file mode 100644
index 0cae002..0000000
--- a/src/serialization.rs
+++ /dev/null
@@ -1,61 +0,0 @@
-use crate::{Request, Response};
-use serde::{Deserialize, Serialize};
-use std::io::{Read, Write};
-
-pub(crate) fn read_request(reader: &mut impl Read) -> Result<Request, bincode::Error> {
-    bincode::deserialize_from::<_, Request>(reader)
-}
-
-pub(crate) fn write_request(
-    writer: &mut impl Write,
-    request: &Request,
-) -> Result<(), bincode::Error> {
-    bincode::serialize_into(writer, request)
-}
-
-#[derive(Serialize, Deserialize)]
-struct ResponseHeader {
-    id: usize,
-    len: usize,
-}
-
-pub(crate) fn read_response(reader: &mut impl Read) -> Result<Response, bincode::Error> {
-    let header = bincode::deserialize_from::<_, ResponseHeader>(&mut *reader)?;
-    let id = header.id;
-    let len = header.len;
-    if len != 0 {
-        let mut data = vec![0u8; len].into_boxed_slice();
-        if let Err(e) = reader.read_exact(&mut data[..]) {
-            Err(bincode::Error::from(e))
-        } else {
-            Ok(Response {
-                id,
-                data: Some(data),
-            })
-        }
-    } else {
-        Ok(Response { id, data: None })
-    }
-}
-
-pub(crate) fn write_response(
-    writer: &mut impl Write,
-    id: usize,
-    data: Option<&[u8]>,
-) -> Result<(), bincode::Error> {
-    let len = match data {
-        Some(data) => data.len(),
-        None => 0,
-    };
-    let header = ResponseHeader { id, len };
-    if let Err(e) = bincode::serialize_into(&mut *writer, &header) {
-        return Err(e);
-    }
-    // has payload
-    if len != 0 {
-        if let Err(e) = writer.write_all(data.unwrap()) {
-            return Err(bincode::Error::from(e));
-        }
-    }
-    Ok(())
-}
diff --git a/src/server.rs b/src/server.rs
index 313eec0..61069e5 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,6 +1,5 @@
 //! A key-value server/client implementation.
 
-use crate::serialization::{read_request, read_response, write_request, write_response};
 use crate::stores::{BenchKVMap, BenchKVMapOpt};
 use crate::thread::{JoinHandle, Thread};
 use crate::*;
@@ -19,6 +18,69 @@ use std::sync::mpsc::{channel, Receiver, Sender};
 use std::sync::Arc;
 use std::time::Duration;
 
+/// Requests are sent in batches. We do not send the requests in a batch one by one, but
+/// instead as a single vector, because the server implementation uses event-based notification.
+/// It does not know how many requests there are during an event, so it may not read all of
+/// them, especially when the batch is large.
+fn write_requests(writer: &mut impl Write, requests: &Vec<Request>) -> Result<(), bincode::Error> {
+    bincode::serialize_into(writer, requests)
+}
+
+fn read_requests(reader: &mut impl Read) -> Result<Vec<Request>, bincode::Error> {
+    bincode::deserialize_from::<_, Vec<Request>>(reader)
+}
+
+/// Responses have a customized header, and the (de)serialization process is manual because the
+/// payload (data) in a response may come from a reference. It is preferable to write the bytes
+/// directly from the reference to the writer instead of creating a new [`Response`] and
+/// performing a copy of the payload data.
+#[derive(Serialize, Deserialize)]
+struct ResponseHeader {
+    id: usize,
+    len: usize,
+}
+
+fn write_response(
+    writer: &mut impl Write,
+    id: usize,
+    data: Option<&[u8]>,
+) -> Result<(), bincode::Error> {
+    let len = match data {
+        Some(data) => data.len(),
+        None => 0,
+    };
+    let header = ResponseHeader { id, len };
+    if let Err(e) = bincode::serialize_into(&mut *writer, &header) {
+        return Err(e);
+    }
+    // has payload
+    if len != 0 {
+        if let Err(e) = writer.write_all(data.unwrap()) {
+            return Err(bincode::Error::from(e));
+        }
+    }
+    Ok(())
+}
+
+fn read_response(reader: &mut impl Read) -> Result<Response, bincode::Error> {
+    let header = bincode::deserialize_from::<_, ResponseHeader>(&mut *reader)?;
+    let id = header.id;
+    let len = header.len;
+    if len != 0 {
+        let mut data = vec![0u8; len].into_boxed_slice();
+        if let Err(e) = reader.read_exact(&mut data[..]) {
+            Err(bincode::Error::from(e))
+        } else {
+            Ok(Response {
+                id,
+                data: Some(data),
+            })
+        }
+    } else {
+        Ok(Response { id, data: None })
+    }
+}
+
 const POLLING_TIMEOUT: Option<Duration> = Some(Duration::new(0, 0));
 
 enum WorkerMsg {
@@ -185,10 +247,10 @@ fn recv_requests(reader: &mut RequestReader) -> Vec<Request> {
     assert!(reader.fill_buf().is_ok());
 
     let mut requests = Vec::new();
+    // Here we must drain the buffer until it is empty, because one event may consist of multiple
+    // batches, usually when the client keeps sending.
     while !reader.0.buffer().is_empty() {
-        let reader = &mut *reader;
-        let request = read_request(reader).unwrap();
-        requests.push(request);
+        requests.append(&mut read_requests(reader).unwrap());
     }
 
     requests
@@ -206,7 +268,7 @@ fn server_worker_regular_main(
             assert!(connection.writer().flush().is_ok());
         }
         assert!(poll.poll(events, POLLING_TIMEOUT).is_ok());
-        for event in events as &Events {
+        for event in events.iter() {
            let token = event.token();
            assert_ne!(token, Token(0));
            if event.is_read_closed() || event.is_write_closed() {
@@ -543,9 +605,7 @@ impl KVClient {
     }
 
     pub(crate) fn send_requests(&mut self, requests: &Vec<Request>) {
-        for r in requests {
-            assert!(write_request(&mut self.request_writer, r).is_ok())
-        }
+        assert!(write_requests(&mut self.request_writer, requests).is_ok());
         assert!(self.request_writer.flush().is_ok());
     }
 
@@ -804,7 +864,6 @@ mod tests {
         for j in 0..NR_BATCHES {
             client.send_requests(&batch[j]);
             pending += BATCH_SIZE;
-            // println!("send: client {} batch {} pending {}", i, j, pending);
             loop {
                 let response = client.recv_responses();
                 for r in response {
@@ -821,7 +880,6 @@ mod tests {
                        break;
                    }
                }
-                // println!("recv: client {} batch {} pending {}", i, j, pending);
            }
            // finish remaining
            loop {