Skip to content

Commit

Permalink
feat: add a sync version of Store.read: that only retrieves historic …
Browse files Browse the repository at this point in the history
…frames
  • Loading branch information
cablehead committed Dec 28, 2024
1 parent 81bb22a commit 0dc0b1c
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
25 changes: 25 additions & 0 deletions src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,31 @@ impl Store {
rx
}

pub fn read_sync(
&self,
last_id: Option<&Scru128Id>,
limit: Option<usize>,
) -> impl Iterator<Item = Frame> + '_ {
let range = get_range(last_id);

self.frame_partition
.range(range)
.filter_map(move |record| {
let frame = deserialize_frame(record.ok()?);

// Filter out expired frames
if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
if is_expired(&frame.id, ttl) {
let _ = self.gc_tx.send(GCTask::Remove(frame.id));
return None;
}
}

Some(frame)
})
.take(limit.unwrap_or(usize::MAX))
}

pub fn get(&self, id: &Scru128Id) -> Option<Frame> {
let res = self.frame_partition.get(id.to_bytes()).unwrap();
res.map(|value| serde_json::from_slice(&value).unwrap())
Expand Down
23 changes: 23 additions & 0 deletions src/store/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,29 @@ mod tests_store {
"Channel should be closed after limit"
);
}

#[test]
fn test_read_sync() {
let temp_dir = TempDir::new().unwrap();
let store = Store::new(temp_dir.into_path());

// Append three frames
let frame1 = store.append(Frame::with_topic("test").build());
let frame2 = store.append(Frame::with_topic("test").build());
let frame3 = store.append(Frame::with_topic("test").build());

// Test reading all frames
let frames: Vec<Frame> = store.read_sync(None, None).collect();
assert_eq!(vec![frame1.clone(), frame2.clone(), frame3.clone()], frames);

// Test with last_id (passing Scru128Id directly)
let frames: Vec<Frame> = store.read_sync(Some(&frame1.id), None).collect();
assert_eq!(vec![frame2.clone(), frame3.clone()], frames);

// Test with limit
let frames: Vec<Frame> = store.read_sync(None, Some(2)).collect();
assert_eq!(vec![frame1, frame2], frames);
}
}

mod tests_ttl {
Expand Down

0 comments on commit 0dc0b1c

Please sign in to comment.