From 81bb22a08d391fe8084099b0342f96b44345fda2 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Fri, 27 Dec 2024 21:16:56 -0500 Subject: [PATCH] feat: move cleaning up expired frames to a background thread --- src/handlers/tests.rs | 1 + src/store/mod.rs | 128 +++++++++++++++++++++++++----------------- src/store/tests.rs | 2 + 3 files changed, 79 insertions(+), 52 deletions(-) diff --git a/src/handlers/tests.rs b/src/handlers/tests.rs index 93d1fa5..107ee28 100644 --- a/src/handlers/tests.rs +++ b/src/handlers/tests.rs @@ -488,6 +488,7 @@ async fn test_return_options() { assert_eq!(meta["frame_id"], frame2.id.to_string()); // Only newest response should be in store + store.wait_for_gc().await; let options = ReadOptions::default(); let recver = store.read(options).await; use tokio_stream::StreamExt; diff --git a/src/store/mod.rs b/src/store/mod.rs index 110b8ca..bfe4fe3 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -9,6 +9,7 @@ use std::path::PathBuf; use std::time::Duration; use tokio::sync::broadcast; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use scru128::Scru128Id; @@ -16,8 +17,6 @@ use serde::{Deserialize, Deserializer, Serialize}; use fjall::{Config, Keyspace, PartitionCreateOptions, PartitionHandle, Slice}; -use crate::error::Error; - #[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Default, bon::Builder)] #[builder(start_fn = with_topic)] pub struct Frame { @@ -114,21 +113,26 @@ where Some((&slice[..index], &slice[index + 1..])) } +#[derive(Debug)] +enum GCTask { + Remove(Scru128Id), + CheckHeadTTL { topic: String, keep: u32 }, + Drain(tokio::sync::oneshot::Sender<()>), +} + #[derive(Clone)] pub struct Store { pub path: PathBuf, - keyspace: Keyspace, frame_partition: PartitionHandle, topic_index: PartitionHandle, - broadcast_tx: broadcast::Sender, + gc_tx: UnboundedSender, } impl Store { pub fn new(path: PathBuf) -> Store { let config = Config::new(path.join("fjall")); - let keyspace = config .flush_workers(1) .compaction_workers(1) @@ -144,16 +148,27 @@ impl Store { .unwrap(); let (broadcast_tx, _) = broadcast::channel(1024); + let (gc_tx, gc_rx) = mpsc::unbounded_channel(); - Store { - path, + let store = Store { + path: path.clone(), + keyspace: keyspace.clone(), + frame_partition: frame_partition.clone(), + topic_index: topic_index.clone(), + broadcast_tx, + gc_tx, + }; - keyspace, - frame_partition, - topic_index, + // Spawn gc worker thread + spawn_gc_worker(gc_rx, store.clone()); - broadcast_tx, - } + store + } + + pub async fn wait_for_gc(&self) { + let (tx, rx) = tokio::sync::oneshot::channel(); + let _ = self.gc_tx.send(GCTask::Drain(tx)); + let _ = rx.await; } #[tracing::instrument(skip(self))] @@ -177,20 +192,16 @@ impl Store { // Only create done channel if we're doing historical processing let done_rx = if !options.tail { let (done_tx, done_rx) = tokio::sync::oneshot::channel(); - - // Clone these for the background thread let tx_clone = tx.clone(); let partition = self.frame_partition.clone(); let options_clone = options.clone(); let should_follow_clone = should_follow; - - let store_clone = self.clone(); + let gc_tx = self.gc_tx.clone(); // Spawn OS thread to handle historical events std::thread::spawn(move || { let mut last_id = None; let mut count = 0; - let mut expired_frames = Vec::new(); let range = get_range(options_clone.last_id.as_ref()); for record in partition.range(range) { @@ -198,7 +209,7 @@ impl Store { if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() { if is_expired(&frame.id, ttl) { - expired_frames.push(frame.id); + let _ = gc_tx.send(GCTask::Remove(frame.id)); continue; } } @@ -211,7 +222,7 @@ impl Store { } } if tx_clone.blocking_send(frame).is_err() { - return; // Receiver dropped, exit thread + return; } count += 1; } @@ -227,10 +238,6 @@ impl Store { } } - for id in expired_frames { - let _ = store_clone.remove(&id); - } - // Signal completion with the last seen ID and count let _ = done_tx.send((last_id, count)); }); @@ -240,7 +247,7 @@ impl Store { None }; - // For tail mode or if we're following, spawn task to handle broadcast + // Handle broadcast subscription and heartbeat if let Some(broadcast_rx) = broadcast_rx { { let tx = tx.clone(); @@ -368,41 +375,19 @@ impl Store { self.keyspace.persist(fjall::PersistMode::SyncAll) } - pub fn append(&self, frame: Frame) -> Frame { - let mut frame = frame; + pub fn append(&self, mut frame: Frame) -> Frame { frame.id = scru128::new(); // only store the frame if it's not ephemeral if frame.ttl != Some(TTL::Ephemeral) { self.insert_frame(&frame).unwrap(); - // If this is a Head TTL, cleanup old frames AFTER insert + // If this is a Head TTL, schedule a gc task if let Some(TTL::Head(n)) = frame.ttl { - let prefix = topic_index_key_for_frame(&frame); - let prefix = &prefix[..prefix.len() - frame.id.as_bytes().len()]; - - let frames_to_remove: Vec<_> = self - .topic_index - .prefix(prefix) - .rev() // Scan from newest to oldest - .skip(n as usize) - .map(|r| -> Result<_, Error> { - let (key, _) = r?; - let (_topic_bytes, frame_id_bytes) = - split_once(&key, |&c| c == 0xFF).ok_or("Missing delimiter")?; - let bytes: [u8; 16] = frame_id_bytes - .try_into() - .map_err(|_| "Invalid frame ID length")?; - Ok(Scru128Id::from_bytes(bytes)) - }) - .collect::>() - .unwrap(); - - for frame_id in frames_to_remove { - tracing::trace!("Removing old frame: {:?}", frame_id); - let _ = self.remove(&frame_id); - tracing::trace!("Removed old frame: {:?}", frame_id); - } + let _ = self.gc_tx.send(GCTask::CheckHeadTTL { + topic: frame.topic.clone(), + keep: n, + }); } } @@ -411,6 +396,45 @@ impl Store { } } +fn spawn_gc_worker(mut gc_rx: UnboundedReceiver, store: Store) { + std::thread::spawn(move || { + while let Some(task) = gc_rx.blocking_recv() { + match task { + GCTask::Remove(id) => { + let _ = store.remove(&id); + } + + GCTask::CheckHeadTTL { topic, keep } => { + let mut prefix = Vec::with_capacity(topic.len() + 1); + prefix.extend(topic.as_bytes()); + prefix.push(0xFF); + + let frames_to_remove: Vec<_> = store + .topic_index + .prefix(&prefix) + .rev() // Scan from newest to oldest + .skip(keep as usize) + .filter_map(|r| { + let (key, _) = r.ok()?; + let (_, frame_id_bytes) = split_once(&key, |&c| c == 0xFF)?; + let bytes: [u8; 16] = frame_id_bytes.try_into().ok()?; + Some(Scru128Id::from_bytes(bytes)) + }) + .collect(); + + for frame_id in frames_to_remove { + let _ = store.remove(&frame_id); + } + } + + GCTask::Drain(tx) => { + let _ = tx.send(()); + } + } + } + }); +} + fn get_range(last_id: Option<&Scru128Id>) -> (Bound>, Bound>) { match last_id { Some(last_id) => ( diff --git a/src/store/tests.rs b/src/store/tests.rs index c0fa4ef..84cdb90 100644 --- a/src/store/tests.rs +++ b/src/store/tests.rs @@ -424,6 +424,7 @@ mod tests_ttl_expire { ); // Assert the underlying partition has been updated + store.wait_for_gc().await; assert_eq!(store.get(&expiring_frame.id), None); } @@ -470,6 +471,7 @@ mod tests_ttl_expire { ); // Read all frames and assert exact expected set + store.wait_for_gc().await; let recver = store.read(ReadOptions::default()).await; let frames = tokio_stream::wrappers::ReceiverStream::new(recver) .collect::>()