Skip to content

Commit

Permalink
*: refactor response format for range queries
Browse files Browse the repository at this point in the history
  • Loading branch information
nerdroychan committed Aug 6, 2024
1 parent 815fe7d commit d966eb3
Show file tree
Hide file tree
Showing 13 changed files with 80 additions and 58 deletions.
11 changes: 7 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,7 @@ pub trait KVMapHandle {
fn delete(&mut self, key: &[u8]);

/// Querying a range starting from the first key greater than or equal to the given key.
///
/// Note: For simplicity, it returns only the values.
fn scan(&mut self, key: &[u8], n: usize) -> Vec<Box<[u8]>>;
fn scan(&mut self, key: &[u8], n: usize) -> Vec<(Box<[u8]>, Box<[u8]>)>;
}

/// A single operation that is applied to the key-value store.
Expand Down Expand Up @@ -129,7 +127,12 @@ pub struct Response {
pub id: usize,

/// The real payload that contains the potential returned value.
pub data: Option<Box<[u8]>>,
///
/// For a `SET` or `DELETE` request, this should be `None`.
/// For a `GET` request, this should contain the value of the key (single element).
/// For a `SCAN` request, this should contain the sequence of the range query results, ordered
/// like `key0, value0, key1, value1 ...`.
pub data: Option<Vec<Box<[u8]>>>,
}

/// A non-blocking, thread-safe key-value map.
Expand Down
83 changes: 48 additions & 35 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,45 +37,40 @@ fn read_requests(reader: &mut impl Read) -> Result<Vec<Request>, bincode::Error>
#[derive(Serialize, Deserialize)]
struct ResponseHeader {
id: usize,
len: usize,
has_data: bool,
}

fn write_response(
writer: &mut impl Write,
id: usize,
data: Option<&[u8]>,
data: Option<&[&[u8]]>,
) -> Result<(), bincode::Error> {
let len = match data {
Some(data) => data.len(),
None => 0,
let has_data = match data {
Some(_) => true,
None => false,
};
let header = ResponseHeader { id, len };
let header = ResponseHeader { id, has_data };
if let Err(e) = bincode::serialize_into(&mut *writer, &header) {
return Err(e);
}
// has payload
if len != 0 {
if let Err(e) = writer.write_all(data.unwrap()) {
return Err(bincode::Error::from(e));
if has_data {
if let Err(e) = bincode::serialize_into(&mut *writer, data.unwrap()) {
return Err(e);
}
}
Ok(())
}

fn read_response(reader: &mut impl Read) -> Result<Response, bincode::Error> {
let header = bincode::deserialize_from::<_, ResponseHeader>(&mut *reader)?;
let id = header.id;
let len = header.len;
if len != 0 {
let mut data = vec![0u8; len].into_boxed_slice();
if let Err(e) = reader.read_exact(&mut data[..]) {
Err(bincode::Error::from(e))
} else {
Ok(Response {
id,
data: Some(data),
})
}
let ResponseHeader { id, has_data } =
bincode::deserialize_from::<_, ResponseHeader>(&mut *reader)?;
if has_data {
let data = bincode::deserialize_from::<_, Vec<Box<[u8]>>>(&mut *reader)?;
Ok(Response {
id,
data: Some(data),
})
} else {
Ok(Response { id, data: None })
}
Expand All @@ -102,7 +97,8 @@ fn serve_requests_regular(
}
Operation::Get { ref key } => match handle.get(key) {
Some(v) => {
assert!(write_response(&mut *writer, id, Some(&v[..]),).is_ok());
let data = &vec![&v[..]];
assert!(write_response(&mut *writer, id, Some(data)).is_ok());
}
None => {
assert!(write_response(&mut *writer, id, None).is_ok());
Expand All @@ -113,9 +109,15 @@ fn serve_requests_regular(
assert!(write_response(&mut *writer, id, None).is_ok());
}
Operation::Scan { ref key, n } => {
for v in handle.scan(key, *n) {
assert!(write_response(&mut *writer, id, Some(&v[..])).is_ok());
}
let kv = handle.scan(key, *n);
let data = kv
.iter()
.fold(Vec::with_capacity(kv.len() * 2), |mut vec, kv| {
vec.push(&kv.0[..]);
vec.push(&kv.1[..]);
vec
});
assert!(write_response(&mut *writer, id, Some(&data[..])).is_ok());
}
}
}
Expand Down Expand Up @@ -230,12 +232,20 @@ type StreamMap = HashMap<Token, Connection>;

impl AsyncResponder for ResponseWriter {
fn callback(&self, response: Response) {
assert!(write_response(
&mut *self.0.borrow_mut(),
response.id,
response.data.as_deref()
)
.is_ok());
let Response { id, data } = response;
match data {
Some(data) => {
assert!(write_response(
&mut *self.0.borrow_mut(),
id,
Some(&data.iter().map(|d| &d[..]).collect::<Vec<&[u8]>>()[..])
)
.is_ok());
}
None => {
assert!(write_response(&mut *self.0.borrow_mut(), id, None).is_ok());
}
}
}
}

Expand Down Expand Up @@ -680,7 +690,10 @@ impl KVClient {
let response = responses.pop().unwrap();
assert_eq!(response.id, 0);
match response.data {
Some(v) => Some(v),
Some(mut v) => {
assert_eq!(v.len(), 1);
v.pop()
}
None => None,
}
}
Expand Down Expand Up @@ -906,7 +919,7 @@ mod tests {
// set
assert_eq!(r.data, None);
} else {
assert_eq!(r.data, Some([170u8; 16].into()));
assert_eq!(r.data, Some(vec![[170u8; 16].into()]));
}
pending -= 1;
}
Expand All @@ -927,7 +940,7 @@ mod tests {
// set
assert_eq!(r.data, None);
} else {
assert_eq!(r.data, Some([170u8; 16].into()));
assert_eq!(r.data, Some(vec![[170u8; 16].into()]));
}
pending -= 1;
}
Expand Down
18 changes: 12 additions & 6 deletions src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,31 +155,35 @@ mod tests {
assert_eq!(v.len(), 10000);
for i in 10000..20000usize {
let bytes = i.clone().to_be_bytes();
assert_eq!(*v[i - 10000], bytes);
assert_eq!(*v[i - 10000].0, bytes);
assert_eq!(*v[i - 10000].1, bytes);
}

// query 10000 next 20000, should have 10000
let v = handle.scan(&10000_usize.to_be_bytes(), 20000);
assert_eq!(v.len(), 10000);
for i in 10000..20000usize {
let bytes = i.clone().to_be_bytes();
assert_eq!(*v[i - 10000], bytes);
assert_eq!(*v[i - 10000].0, bytes);
assert_eq!(*v[i - 10000].1, bytes);
}

// query 10000 next 5, should have 5
let v = handle.scan(&10000_usize.to_be_bytes(), 5);
assert_eq!(v.len(), 5);
for i in 10000..10005usize {
let bytes = i.clone().to_be_bytes();
assert_eq!(*v[i - 10000], bytes);
assert_eq!(*v[i - 10000].0, bytes);
assert_eq!(*v[i - 10000].1, bytes);
}

// query 13333 next 444, should have 444
let v = handle.scan(&13333_usize.to_be_bytes(), 444);
assert_eq!(v.len(), 444);
for i in 13333..13777usize {
let bytes = i.clone().to_be_bytes();
assert_eq!(*v[i - 13333], bytes);
assert_eq!(*v[i - 13333].0, bytes);
assert_eq!(*v[i - 13333].1, bytes);
}

// query 13333 next 0, should have 0
Expand All @@ -195,15 +199,17 @@ mod tests {
assert_eq!(v.len(), 5000);
for i in 10000..15000usize {
let bytes = i.clone().to_be_bytes();
assert_eq!(*v[i - 10000], bytes);
assert_eq!(*v[i - 10000].0, bytes);
assert_eq!(*v[i - 10000].1, bytes);
}

// query 8000 next 5000, should have 5000
let v = handle.scan(&8000_usize.to_be_bytes(), 5000);
assert_eq!(v.len(), 5000);
for i in 10000..15000usize {
let bytes = i.clone().to_be_bytes();
assert_eq!(*v[i - 10000], bytes);
assert_eq!(*v[i - 10000].0, bytes);
assert_eq!(*v[i - 10000].1, bytes);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/stores/btreemap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl KVMapHandle for MutexBTreeMap {
self.0.lock().remove(key);
}

fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<Box<[u8]>> {
fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<(Box<[u8]>, Box<[u8]>)> {
// technically iteration is supported but querying a specific range is not a stable feature
unimplemented!("Range query is not supported");
}
Expand Down Expand Up @@ -108,7 +108,7 @@ impl KVMapHandle for RwLockBTreeMap {
self.0.write().remove(key);
}

fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<Box<[u8]>> {
fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<(Box<[u8]>, Box<[u8]>)> {
// technically iteration is supported but querying a specific range is not a stable feature
unimplemented!("Range query is not supported");
}
Expand Down
2 changes: 1 addition & 1 deletion src/stores/chashmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl KVMapHandle for CHashMap {
self.0.remove(key);
}

fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<Box<[u8]>> {
fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<(Box<[u8]>, Box<[u8]>)> {
unimplemented!("Range query is not supported");
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/stores/contrie.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl KVMapHandle for Contrie {
self.0.remove(key);
}

fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<Box<[u8]>> {
fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<(Box<[u8]>, Box<[u8]>)> {
unimplemented!("Range query is not supported");
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/stores/dashmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl KVMapHandle for DashMap {
self.0.remove(key);
}

fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<Box<[u8]>> {
fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<(Box<[u8]>, Box<[u8]>)> {
unimplemented!("Range query is not supported");
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/stores/flurry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl KVMapHandle for Flurry {
self.0.pin().remove(key);
}

fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<Box<[u8]>> {
fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<(Box<[u8]>, Box<[u8]>)> {
unimplemented!("Range query is not supported");
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/stores/hashmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl KVMapHandle for MutexHashMap {
self.shards[sid].lock().remove(key);
}

fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<Box<[u8]>> {
fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<(Box<[u8]>, Box<[u8]>)> {
unimplemented!("Range query is not supported");
}
}
Expand Down Expand Up @@ -166,7 +166,7 @@ impl KVMapHandle for RwLockHashMap {
self.shards[sid].write().remove(key);
}

fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<Box<[u8]>> {
fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<(Box<[u8]>, Box<[u8]>)> {
unimplemented!("Range query is not supported");
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/stores/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl KVMapHandle for NullMap {

fn delete(&mut self, _key: &[u8]) {}

fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<Box<[u8]>> {
fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<(Box<[u8]>, Box<[u8]>)> {
Vec::new()
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/stores/papaya.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl KVMapHandle for Papaya {
self.0.pin().remove(key);
}

fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<Box<[u8]>> {
fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<(Box<[u8]>, Box<[u8]>)> {
unimplemented!("Range query is not supported");
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/stores/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl KVMapHandle for RocksDB {
assert!(self.db.delete(key).is_ok());
}

fn scan(&mut self, key: &[u8], n: usize) -> Vec<Box<[u8]>> {
fn scan(&mut self, key: &[u8], n: usize) -> Vec<(Box<[u8]>, Box<[u8]>)> {
let mut kv = Vec::with_capacity(n);
let iter = self
.db
Expand All @@ -70,7 +70,7 @@ impl KVMapHandle for RocksDB {
if i == n {
break;
}
kv.push(item.unwrap().1);
kv.push(item.unwrap());
i += 1;
}
kv
Expand Down
2 changes: 1 addition & 1 deletion src/stores/scc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl KVMapHandle for SccHashMap {
self.0.remove(key);
}

fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<Box<[u8]>> {
fn scan(&mut self, _key: &[u8], _n: usize) -> Vec<(Box<[u8]>, Box<[u8]>)> {
unimplemented!("Range query is not supported");
}
}
Expand Down

0 comments on commit d966eb3

Please sign in to comment.