server: send requests in batches only
nerdroychan committed Jul 25, 2024
1 parent 124fd08 commit d8f414b
Showing 3 changed files with 69 additions and 73 deletions.
3 changes: 1 addition & 2 deletions src/lib.rs
@@ -115,7 +115,7 @@ pub struct Request {
}

/// A response sent by a server to a client.
-#[derive(Eq, PartialEq, Clone, Debug)]
+#[derive(Serialize, Deserialize, Eq, PartialEq, Clone, Debug)]
pub struct Response {
    /// The `id` of the corresponding request.
    pub id: usize,
@@ -188,7 +188,6 @@ impl AsyncResponder for RefCell<Vec<Response>> {

pub mod bench;
mod cmdline;
-mod serialization;
pub mod server;
pub mod stores;
pub mod thread;
61 changes: 0 additions & 61 deletions src/serialization.rs

This file was deleted.

78 changes: 68 additions & 10 deletions src/server.rs
@@ -1,6 +1,5 @@
//! A key-value server/client implementation.
-use crate::serialization::{read_request, read_response, write_request, write_response};
use crate::stores::{BenchKVMap, BenchKVMapOpt};
use crate::thread::{JoinHandle, Thread};
use crate::*;
@@ -19,6 +18,69 @@ use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::time::Duration;

+/// Requests are sent in batches. Instead of sending the requests in a batch one by one, we send
+/// them as a single vector, because the server implementation uses event-based notification: it
+/// does not know how many requests there are during an event, so it may not read all of them,
+/// especially when the batch is large.
+fn write_requests(writer: &mut impl Write, requests: &Vec<Request>) -> Result<(), bincode::Error> {
+    bincode::serialize_into(writer, requests)
+}

+fn read_requests(reader: &mut impl Read) -> Result<Vec<Request>, bincode::Error> {
+    bincode::deserialize_from::<_, Vec<Request>>(reader)
+}

+/// Responses have a custom header, and the (de)serialization is done manually because the payload
+/// (data) in a response may come from a reference. It is preferable to write the bytes behind the
+/// reference directly to the writer instead of creating a new [`Response`] and copying the
+/// payload data.
+#[derive(Serialize, Deserialize)]
+struct ResponseHeader {
+    id: usize,
+    len: usize,
+}

+fn write_response(
+    writer: &mut impl Write,
+    id: usize,
+    data: Option<&[u8]>,
+) -> Result<(), bincode::Error> {
+    let len = match data {
+        Some(data) => data.len(),
+        None => 0,
+    };
+    let header = ResponseHeader { id, len };
+    if let Err(e) = bincode::serialize_into(&mut *writer, &header) {
+        return Err(e);
+    }
+    // has payload
+    if len != 0 {
+        if let Err(e) = writer.write_all(data.unwrap()) {
+            return Err(bincode::Error::from(e));
+        }
+    }
+    Ok(())
+}

+fn read_response(reader: &mut impl Read) -> Result<Response, bincode::Error> {
+    let header = bincode::deserialize_from::<_, ResponseHeader>(&mut *reader)?;
+    let id = header.id;
+    let len = header.len;
+    if len != 0 {
+        let mut data = vec![0u8; len].into_boxed_slice();
+        if let Err(e) = reader.read_exact(&mut data[..]) {
+            Err(bincode::Error::from(e))
+        } else {
+            Ok(Response {
+                id,
+                data: Some(data),
+            })
+        }
+    } else {
+        Ok(Response { id, data: None })
+    }
+}

const POLLING_TIMEOUT: Option<Duration> = Some(Duration::new(0, 0));

enum WorkerMsg {
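
As a quick illustration of the manual response framing added above, the header-plus-payload format round-trips through an in-memory buffer. This is a minimal sketch, not part of the commit; it assumes the `write_response`/`read_response` functions and the `Response` type shown in this diff are in scope, and that `bincode` 1.x is the serializer:

    use std::io::Cursor;

    fn response_framing_sketch() {
        let mut buf: Vec<u8> = Vec::new();

        // A response with a payload: a ResponseHeader { id, len } followed by the raw bytes.
        write_response(&mut buf, 42, Some(&b"value"[..])).unwrap();
        // A response without a payload: only the header is written (len == 0).
        write_response(&mut buf, 43, None).unwrap();

        // Reading restores both responses, allocating the payload only when len != 0.
        let mut reader = Cursor::new(buf);
        let first = read_response(&mut reader).unwrap();
        assert_eq!(first.id, 42);
        assert_eq!(first.data.as_deref(), Some(&b"value"[..]));

        let second = read_response(&mut reader).unwrap();
        assert_eq!(second.id, 43);
        assert!(second.data.is_none());
    }
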
@@ -185,10 +247,10 @@ fn recv_requests(reader: &mut RequestReader) -> Vec<Request> {
    assert!(reader.fill_buf().is_ok());
    let mut requests = Vec::new();

+    // here we must drain the buffer until it is empty, because one event may consist of multiple
+    // batches, usually when the client keeps sending.
    while !reader.0.buffer().is_empty() {
        let reader = &mut *reader;
-        let request = read_request(reader).unwrap();
-        requests.push(request);
+        requests.append(&mut read_requests(reader).unwrap());
    }

    requests
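
The drain loop above matters because a single readiness event can carry more than one serialized batch when the client keeps sending. A minimal stand-alone sketch of that behavior, using a hypothetical `Req` type in place of the crate's `Request` (whose fields are not shown in this diff) and assuming `serde` with the `derive` feature plus `bincode` 1.x:

    use serde::{Deserialize, Serialize};
    use std::io::{BufRead, BufReader, Cursor};

    // Hypothetical stand-in for the crate's Request type.
    #[derive(Serialize, Deserialize)]
    struct Req {
        id: usize,
    }

    fn drain_batches_sketch() {
        // Two batches written back-to-back, as a client that keeps sending would produce.
        let mut wire: Vec<u8> = Vec::new();
        bincode::serialize_into(&mut wire, &vec![Req { id: 0 }, Req { id: 1 }]).unwrap();
        bincode::serialize_into(&mut wire, &vec![Req { id: 2 }]).unwrap();

        // One "event" delivers all of those bytes at once; keep deserializing
        // whole batches until the buffered data is fully consumed.
        let mut reader = BufReader::new(Cursor::new(wire));
        let mut requests = Vec::new();
        assert!(reader.fill_buf().is_ok());
        while !reader.buffer().is_empty() {
            let mut batch: Vec<Req> = bincode::deserialize_from(&mut reader).unwrap();
            requests.append(&mut batch);
        }
        assert_eq!(requests.len(), 3);
    }
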
@@ -206,7 +268,7 @@ fn server_worker_regular_main(
assert!(connection.writer().flush().is_ok());
}
assert!(poll.poll(events, POLLING_TIMEOUT).is_ok());
-for event in events as &Events {
+for event in events.iter() {
let token = event.token();
assert_ne!(token, Token(0));
if event.is_read_closed() || event.is_write_closed() {
@@ -543,9 +605,7 @@ impl KVClient {
}

    pub(crate) fn send_requests(&mut self, requests: &Vec<Request>) {
-        for r in requests {
-            assert!(write_request(&mut self.request_writer, r).is_ok())
-        }
+        assert!(write_requests(&mut self.request_writer, requests).is_ok());
        assert!(self.request_writer.flush().is_ok());
}

@@ -804,7 +864,6 @@ mod tests {
for j in 0..NR_BATCHES {
client.send_requests(&batch[j]);
pending += BATCH_SIZE;
// println!("send: client {} batch {} pending {}", i, j, pending);
loop {
let response = client.recv_responses();
for r in response {
@@ -821,7 +880,6 @@ mod tests {
break;
}
}
// println!("recv: client {} batch {} pending {}", i, j, pending);
}
// finish remaining
loop {
