From 0dc0b1c8432f4e1d4117a67d11ed6703d4c21dd2 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Sat, 28 Dec 2024 10:02:47 -0500 Subject: [PATCH] feat: add a sync version of Store.read: that only retrieves historic frames --- src/store/mod.rs | 25 +++++++++++++++++++++++++ src/store/tests.rs | 23 +++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/src/store/mod.rs b/src/store/mod.rs index bfe4fe3..5b9069f 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -307,6 +307,31 @@ impl Store { rx } + pub fn read_sync( + &self, + last_id: Option<&Scru128Id>, + limit: Option, + ) -> impl Iterator + '_ { + let range = get_range(last_id); + + self.frame_partition + .range(range) + .filter_map(move |record| { + let frame = deserialize_frame(record.ok()?); + + // Filter out expired frames + if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() { + if is_expired(&frame.id, ttl) { + let _ = self.gc_tx.send(GCTask::Remove(frame.id)); + return None; + } + } + + Some(frame) + }) + .take(limit.unwrap_or(usize::MAX)) + } + pub fn get(&self, id: &Scru128Id) -> Option { let res = self.frame_partition.get(id.to_bytes()).unwrap(); res.map(|value| serde_json::from_slice(&value).unwrap()) diff --git a/src/store/tests.rs b/src/store/tests.rs index 84cdb90..4324c81 100644 --- a/src/store/tests.rs +++ b/src/store/tests.rs @@ -276,6 +276,29 @@ mod tests_store { "Channel should be closed after limit" ); } + + #[test] + fn test_read_sync() { + let temp_dir = TempDir::new().unwrap(); + let store = Store::new(temp_dir.into_path()); + + // Append three frames + let frame1 = store.append(Frame::with_topic("test").build()); + let frame2 = store.append(Frame::with_topic("test").build()); + let frame3 = store.append(Frame::with_topic("test").build()); + + // Test reading all frames + let frames: Vec = store.read_sync(None, None).collect(); + assert_eq!(vec![frame1.clone(), frame2.clone(), frame3.clone()], frames); + + // Test with last_id (passing Scru128Id directly) + let frames: Vec = store.read_sync(Some(&frame1.id), None).collect(); + assert_eq!(vec![frame2.clone(), frame3.clone()], frames); + + // Test with limit + let frames: Vec = store.read_sync(None, Some(2)).collect(); + assert_eq!(vec![frame1, frame2], frames); + } } mod tests_ttl {