Skip to content

Commit

Permalink
feat: move cleaning up expired frames to a background thread
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Dec 28, 2024
1 parent a83f75b commit 81bb22a
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 52 deletions.
1 change: 1 addition & 0 deletions src/handlers/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ async fn test_return_options() {
assert_eq!(meta["frame_id"], frame2.id.to_string());

// Only newest response should be in store
store.wait_for_gc().await;
let options = ReadOptions::default();
let recver = store.read(options).await;
use tokio_stream::StreamExt;
Expand Down
128 changes: 76 additions & 52 deletions src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ use std::path::PathBuf;
use std::time::Duration;

use tokio::sync::broadcast;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};

use scru128::Scru128Id;

use serde::{Deserialize, Deserializer, Serialize};

use fjall::{Config, Keyspace, PartitionCreateOptions, PartitionHandle, Slice};

use crate::error::Error;

#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Default, bon::Builder)]
#[builder(start_fn = with_topic)]
pub struct Frame {
Expand Down Expand Up @@ -114,21 +113,26 @@ where
Some((&slice[..index], &slice[index + 1..]))
}

#[derive(Debug)]
enum GCTask {
Remove(Scru128Id),
CheckHeadTTL { topic: String, keep: u32 },
Drain(tokio::sync::oneshot::Sender<()>),
}

#[derive(Clone)]
pub struct Store {
pub path: PathBuf,

keyspace: Keyspace,
frame_partition: PartitionHandle,
topic_index: PartitionHandle,

broadcast_tx: broadcast::Sender<Frame>,
gc_tx: UnboundedSender<GCTask>,
}

impl Store {
pub fn new(path: PathBuf) -> Store {
let config = Config::new(path.join("fjall"));

let keyspace = config
.flush_workers(1)
.compaction_workers(1)
Expand All @@ -144,16 +148,27 @@ impl Store {
.unwrap();

let (broadcast_tx, _) = broadcast::channel(1024);
let (gc_tx, gc_rx) = mpsc::unbounded_channel();

Store {
path,
let store = Store {
path: path.clone(),
keyspace: keyspace.clone(),
frame_partition: frame_partition.clone(),
topic_index: topic_index.clone(),
broadcast_tx,
gc_tx,
};

keyspace,
frame_partition,
topic_index,
// Spawn gc worker thread
spawn_gc_worker(gc_rx, store.clone());

broadcast_tx,
}
store
}

pub async fn wait_for_gc(&self) {
let (tx, rx) = tokio::sync::oneshot::channel();
let _ = self.gc_tx.send(GCTask::Drain(tx));
let _ = rx.await;
}

#[tracing::instrument(skip(self))]
Expand All @@ -177,28 +192,24 @@ impl Store {
// Only create done channel if we're doing historical processing
let done_rx = if !options.tail {
let (done_tx, done_rx) = tokio::sync::oneshot::channel();

// Clone these for the background thread
let tx_clone = tx.clone();
let partition = self.frame_partition.clone();
let options_clone = options.clone();
let should_follow_clone = should_follow;

let store_clone = self.clone();
let gc_tx = self.gc_tx.clone();

// Spawn OS thread to handle historical events
std::thread::spawn(move || {
let mut last_id = None;
let mut count = 0;
let mut expired_frames = Vec::new();

let range = get_range(options_clone.last_id.as_ref());
for record in partition.range(range) {
let frame = deserialize_frame(record.unwrap());

if let Some(TTL::Time(ttl)) = frame.ttl.as_ref() {
if is_expired(&frame.id, ttl) {
expired_frames.push(frame.id);
let _ = gc_tx.send(GCTask::Remove(frame.id));
continue;
}
}
Expand All @@ -211,7 +222,7 @@ impl Store {
}
}
if tx_clone.blocking_send(frame).is_err() {
return; // Receiver dropped, exit thread
return;
}
count += 1;
}
Expand All @@ -227,10 +238,6 @@ impl Store {
}
}

for id in expired_frames {
let _ = store_clone.remove(&id);
}

// Signal completion with the last seen ID and count
let _ = done_tx.send((last_id, count));
});
Expand All @@ -240,7 +247,7 @@ impl Store {
None
};

// For tail mode or if we're following, spawn task to handle broadcast
// Handle broadcast subscription and heartbeat
if let Some(broadcast_rx) = broadcast_rx {
{
let tx = tx.clone();
Expand Down Expand Up @@ -368,41 +375,19 @@ impl Store {
self.keyspace.persist(fjall::PersistMode::SyncAll)
}

pub fn append(&self, frame: Frame) -> Frame {
let mut frame = frame;
pub fn append(&self, mut frame: Frame) -> Frame {
frame.id = scru128::new();

// only store the frame if it's not ephemeral
if frame.ttl != Some(TTL::Ephemeral) {
self.insert_frame(&frame).unwrap();

// If this is a Head TTL, cleanup old frames AFTER insert
// If this is a Head TTL, schedule a gc task
if let Some(TTL::Head(n)) = frame.ttl {
let prefix = topic_index_key_for_frame(&frame);
let prefix = &prefix[..prefix.len() - frame.id.as_bytes().len()];

let frames_to_remove: Vec<_> = self
.topic_index
.prefix(prefix)
.rev() // Scan from newest to oldest
.skip(n as usize)
.map(|r| -> Result<_, Error> {
let (key, _) = r?;
let (_topic_bytes, frame_id_bytes) =
split_once(&key, |&c| c == 0xFF).ok_or("Missing delimiter")?;
let bytes: [u8; 16] = frame_id_bytes
.try_into()
.map_err(|_| "Invalid frame ID length")?;
Ok(Scru128Id::from_bytes(bytes))
})
.collect::<Result<_, _>>()
.unwrap();

for frame_id in frames_to_remove {
tracing::trace!("Removing old frame: {:?}", frame_id);
let _ = self.remove(&frame_id);
tracing::trace!("Removed old frame: {:?}", frame_id);
}
let _ = self.gc_tx.send(GCTask::CheckHeadTTL {
topic: frame.topic.clone(),
keep: n,
});
}
}

Expand All @@ -411,6 +396,45 @@ impl Store {
}
}

fn spawn_gc_worker(mut gc_rx: UnboundedReceiver<GCTask>, store: Store) {
std::thread::spawn(move || {
while let Some(task) = gc_rx.blocking_recv() {
match task {
GCTask::Remove(id) => {
let _ = store.remove(&id);
}

GCTask::CheckHeadTTL { topic, keep } => {
let mut prefix = Vec::with_capacity(topic.len() + 1);
prefix.extend(topic.as_bytes());
prefix.push(0xFF);

let frames_to_remove: Vec<_> = store
.topic_index
.prefix(&prefix)
.rev() // Scan from newest to oldest
.skip(keep as usize)
.filter_map(|r| {
let (key, _) = r.ok()?;
let (_, frame_id_bytes) = split_once(&key, |&c| c == 0xFF)?;
let bytes: [u8; 16] = frame_id_bytes.try_into().ok()?;
Some(Scru128Id::from_bytes(bytes))
})
.collect();

for frame_id in frames_to_remove {
let _ = store.remove(&frame_id);
}
}

GCTask::Drain(tx) => {
let _ = tx.send(());
}
}
}
});
}

fn get_range(last_id: Option<&Scru128Id>) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
match last_id {
Some(last_id) => (
Expand Down
2 changes: 2 additions & 0 deletions src/store/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ mod tests_ttl_expire {
);

// Assert the underlying partition has been updated
store.wait_for_gc().await;
assert_eq!(store.get(&expiring_frame.id), None);
}

Expand Down Expand Up @@ -470,6 +471,7 @@ mod tests_ttl_expire {
);

// Read all frames and assert exact expected set
store.wait_for_gc().await;
let recver = store.read(ReadOptions::default()).await;
let frames = tokio_stream::wrappers::ReceiverStream::new(recver)
.collect::<Vec<Frame>>()
Expand Down

0 comments on commit 81bb22a

Please sign in to comment.