Skip to content

Commit

Permalink
*: skeleton of range query
Browse files Browse the repository at this point in the history
  • Loading branch information
nerdroychan committed Aug 5, 2024
1 parent 9fe8fd4 commit 815fe7d
Show file tree
Hide file tree
Showing 17 changed files with 387 additions and 24 deletions.
3 changes: 3 additions & 0 deletions presets/benchmarks/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ kmax = 1000
set_perc = 100
get_perc = 0
del_perc = 0
scan_perc = 0
repeat = 1
dist = "incrementp"

Expand All @@ -20,11 +21,13 @@ timeout = 0.2
set_perc = 50
get_perc = 50
del_perc = 0
scan_perc = 0
dist = "zipfian"

[[benchmark]]
timeout = 0.2
set_perc = 50
get_perc = 50
del_perc = 0
scan_perc = 0
dist = "uniform"
34 changes: 34 additions & 0 deletions presets/benchmarks/example_scan.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[global]
threads = 8
repeat = 5
qd = 100
batch = 10
scan = 10
klen = 8
vlen = 16
kmin = 0
kmax = 1000

[[benchmark]]
set_perc = 100
get_perc = 0
del_perc = 0
scan_perc = 0
repeat = 1
dist = "incrementp"

[[benchmark]]
timeout = 0.2
set_perc = 50
get_perc = 25
del_perc = 0
scan_perc = 25
dist = "zipfian"

[[benchmark]]
timeout = 0.2
set_perc = 50
get_perc = 25
del_perc = 0
scan_perc = 25
dist = "uniform"
130 changes: 124 additions & 6 deletions src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,16 @@ impl Benchmark {
/// is missing. For the usage of each option, please refer to [`BenchmarkOpt`].
#[derive(Deserialize, Clone, Debug)]
pub struct GlobalOpt {
// benchmark
pub threads: Option<usize>,
pub repeat: Option<usize>,
pub qd: Option<usize>,
pub batch: Option<usize>,
pub report: Option<String>,
pub latency: Option<bool>,
pub cdf: Option<bool>,
// workload
pub scan: Option<usize>,
pub klen: Option<usize>,
pub vlen: Option<usize>,
pub kmin: Option<usize>,
Expand All @@ -376,6 +379,7 @@ impl Default for GlobalOpt {
report: None,
latency: None,
cdf: None,
scan: None,
klen: None,
vlen: None,
kmin: None,
Expand Down Expand Up @@ -403,6 +407,12 @@ impl GlobalOpt {
.cdf
.clone()
.or_else(|| Some(self.cdf.clone().unwrap_or(false)));
// the workload options (fall back to defaults)
opt.workload.scan = opt
.workload
.scan
.clone()
.or_else(|| Some(self.scan.clone().unwrap_or(10)));
// the workload options (must be specified)
opt.workload.klen = opt
.workload
Expand Down Expand Up @@ -765,6 +775,9 @@ fn bench_worker_regular(
Operation::Delete { key } => {
handle.delete(&key[..]);
}
Operation::Scan { key, n } => {
let _ = handle.scan(&key[..], n);
}
}
let op_end = latency_tick();
if let Some(ref mut l) = latency {
Expand Down Expand Up @@ -1099,27 +1112,31 @@ mod tests {
report = "finish"
latency = true
cdf = true
scan = 500
klen = 8
vlen = 16
kmin = 100
kmax = 1000
[[benchmark]]
timeout = 10.0
set_perc = 100
get_perc = 0
del_perc = 0
set_perc = 50
get_perc = 30
del_perc = 10
scan_perc = 10
dist = "incrementp"
"#;

let (_, bg) = init(opt);
assert_eq!(bg.len(), 1);

let wopt = WorkloadOpt {
set_perc: 100,
get_perc: 0,
del_perc: 0,
set_perc: 50,
get_perc: 30,
del_perc: 10,
scan_perc: 10,
dist: "incrementp".to_string(),
scan: Some(500),
klen: Some(8),
vlen: Some(16),
kmin: Some(100),
Expand All @@ -1143,6 +1160,57 @@ mod tests {
assert_eq!(*bg[0], benchmark)
}

#[test]
fn global_options_defaults_are_applied() {
let opt = r#"
[map]
name = "nullmap"
[[benchmark]]
set_perc = 50
get_perc = 30
del_perc = 10
scan_perc = 10
klen = 8
vlen = 16
kmin = 1
kmax = 1000
dist = "shufflep"
"#;

let (_, bg) = init(opt);
assert_eq!(bg.len(), 1);

let wopt = WorkloadOpt {
set_perc: 50,
get_perc: 30,
del_perc: 10,
scan_perc: 10,
dist: "shufflep".to_string(),
scan: Some(10),
klen: Some(8),
vlen: Some(16),
kmin: Some(1),
kmax: Some(1000),
zipf_theta: None,
zipf_hotspot: None,
};

let benchmark = Benchmark {
threads: 1,
repeat: 1,
qd: 1,
batch: 1,
report: ReportMode::All,
latency: false,
cdf: false,
len: Length::Exhaust,
wopt,
};

assert_eq!(*bg[0], benchmark)
}

#[test]
#[should_panic(expected = "should be positive")]
fn invalid_threads() {
Expand All @@ -1162,6 +1230,7 @@ mod tests {
set_perc = 100
get_perc = 0
del_perc = 0
scan_perc = 0
dist = "incrementp"
"#;

Expand All @@ -1187,6 +1256,7 @@ mod tests {
set_perc = 100
get_perc = 0
del_perc = 0
scan_perc = 0
dist = "incrementp"
"#;

Expand All @@ -1211,6 +1281,7 @@ mod tests {
set_perc = 100
get_perc = 0
del_perc = 0
scan_perc = 0
dist = "incrementp"
report = "alll"
"#;
Expand All @@ -1237,6 +1308,7 @@ mod tests {
set_perc = 100
get_perc = 0
del_perc = 0
scan_perc = 0
dist = "incrementp"
"#;

Expand All @@ -1262,6 +1334,7 @@ mod tests {
set_perc = 100
get_perc = 0
del_perc = 0
scan_perc = 0
dist = "incrementp"
"#;

Expand All @@ -1273,13 +1346,25 @@ mod tests {
"/presets/benchmarks/example.toml"
));

const EXAMPLE_SCAN_BENCH: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/presets/benchmarks/example_scan.toml"
));

fn example(map_opt: &str) {
let _ = env_logger::try_init();
let opt = map_opt.to_string() + "\n" + EXAMPLE_BENCH;
let (map, phases) = init(&opt);
map.bench(&phases);
}

fn example_scan(map_opt: &str) {
let _ = env_logger::try_init();
let opt = map_opt.to_string() + "\n" + EXAMPLE_SCAN_BENCH;
let (map, phases) = init(&opt);
map.bench(&phases);
}

#[test]
fn example_null() {
const OPT: &str = include_str!(concat!(
Expand All @@ -1289,6 +1374,15 @@ mod tests {
example(OPT);
}

#[test]
fn example_scan_null() {
const OPT: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/presets/stores/null.toml"
));
example_scan(OPT);
}

#[test]
fn example_null_async() {
const OPT: &str = include_str!(concat!(
Expand All @@ -1298,6 +1392,15 @@ mod tests {
example(OPT);
}

#[test]
fn example_scan_null_async() {
const OPT: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/presets/stores/null_async.toml"
));
example_scan(OPT);
}

#[test]
fn example_mutex_hashmap() {
const OPT: &str = include_str!(concat!(
Expand Down Expand Up @@ -1408,6 +1511,21 @@ mod tests {
);
example(&opt);
}

#[test]
#[cfg(feature = "rocksdb")]
fn example_scan_rocksdb() {
let tmp_dir = tempfile::tempdir().unwrap();
let opt = format!(
r#"
[map]
name = "rocksdb"
path = "{}"
"#,
tmp_dir.path().to_str().unwrap().to_string()
);
example_scan(&opt);
}
}

// }}} tests
8 changes: 7 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ pub trait KVMapHandle {
/// Removing a key if it exists.
fn delete(&mut self, key: &[u8]);

// fn read_modify_write(&mut self, key: &[u8]);
/// Querying a range starting from the first key greater than or equal to the given key.
///
/// Note: For simplicity, it returns only the values.
fn scan(&mut self, key: &[u8], n: usize) -> Vec<Box<[u8]>>;
}

/// A single operation that is applied to the key-value store.
Expand All @@ -104,6 +107,9 @@ pub enum Operation {

/// Removing a key if it exists.
Delete { key: Box<[u8]> },

/// Querying a range starting from the first key greater than or equal to the given key.
Scan { key: Box<[u8]>, n: usize },
}

/// A request sent by a client to a server.
Expand Down
5 changes: 5 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ fn serve_requests_regular(
handle.delete(key);
assert!(write_response(&mut *writer, id, None).is_ok());
}
Operation::Scan { ref key, n } => {
for v in handle.scan(key, *n) {
assert!(write_response(&mut *writer, id, Some(&v[..])).is_ok());
}
}
}
}
}
Expand Down
Loading

0 comments on commit 815fe7d

Please sign in to comment.