From add27f4f7eec66ac7523fcaaa82a9cf23a0c2e39 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Sat, 30 Nov 2024 10:27:51 -0500 Subject: [PATCH] abandoning the cursor approach --- src/handlers/handler.rs | 88 +++++++++++++---------------------------- src/handlers/mod.rs | 2 +- src/handlers/serve.rs | 8 +--- 3 files changed, 30 insertions(+), 68 deletions(-) diff --git a/src/handlers/handler.rs b/src/handlers/handler.rs index f1ace08..a42ce1b 100644 --- a/src/handlers/handler.rs +++ b/src/handlers/handler.rs @@ -1,4 +1,3 @@ -use std::str::FromStr; use std::time::Duration; use nu_engine::eval_block_with_early_return; @@ -13,31 +12,30 @@ use scru128::Scru128Id; use crate::error::Error; use crate::nu; use crate::nu::frame_to_value; -use crate::store::{FollowOption, Frame, ReadOptions, Store}; +use crate::store::{FollowOption, ReadOptions, Store}; use crate::thread_pool::ThreadPool; -use crate::ttl::TTL; #[derive(Clone, Debug, serde::Serialize, serde::Deserialize, Default)] pub struct Meta { pub initial_state: Option, pub pulse: Option, - pub mode: Mode, + pub start: StartFrom, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize, Default)] #[serde(rename_all = "snake_case")] -pub enum Mode { - #[default] - Batch, - Online(StartMode), -} - -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, Default)] -#[serde(rename_all = "snake_case")] -pub enum StartMode { - #[default] +pub enum StartFrom { + /// Only process new frames + #[default] Tail, - Head(String), + /// Process from the beginning of the stream + Root, + /// Try specific topic, then tail + At { + topic: String, + #[serde(default)] + or_tail: bool, + }, } #[derive(Clone)] @@ -140,62 +138,32 @@ impl Handler { .into_value(Span::unknown())?) } - pub async fn get_cursor(&self, store: &Store) -> Option { - store - .head(&format!("{}.cursor", self.topic)) - .and_then(|frame| { - frame.meta.and_then(|meta| { - meta.get("frame_id").and_then(|v| { - v.as_str() - .and_then(|id| scru128::Scru128Id::from_str(id).ok()) - }) - }) - }) - } - - pub async fn save_cursor(&self, store: &Store, frame_id: Scru128Id) { - let _ = store - .append( - Frame::with_topic(format!("{}.cursor", self.topic)) - .meta(serde_json::json!({ - "handler_id": self.id.to_string(), - "frame_id": frame_id.to_string(), - })) - .ttl(TTL::Head(1)) - .build(), - ) - .await; - } - pub async fn configure_read_options(&self, store: &Store) -> ReadOptions { - let last_id: Option = match self.meta.mode { - Mode::Batch => store - .head(&format!("{}.cursor", self.topic)) - .and_then(|frame| { - frame.meta.and_then(|meta| { - meta.get("frame_id").and_then(|v| { - v.as_str() - .and_then(|id| scru128::Scru128Id::from_str(id).ok()) - }) - }) - }), - Mode::Online(ref start_mode) => match start_mode { - StartMode::Tail => None, - StartMode::Head(ref head) => store.head(head).map(|frame| frame.id), - }, + // Determine last_id based on StartFrom + let (last_id, is_tail) = match &self.meta.start { + StartFrom::Root => (None, false), + StartFrom::Tail => (None, true), + StartFrom::At { topic, or_tail } => { + let id = store.head(topic).map(|frame| frame.id); + // If we found the topic, use it. Otherwise fall back based on or_tail + match (id, or_tail) { + (Some(id), _) => (Some(id), false), // Found topic, use it + (None, true) => (None, true), // Not found, fallback to tail + (None, false) => (None, false), // Not found, fallback to root + } + } }; eprintln!("LAST_ID: {:?}", last_id.map(|id| id.to_string())); + eprintln!("Tail: {}", is_tail); + // Configure follow option based on pulse setting let follow_option = self .meta .pulse .map(|pulse| FollowOption::WithHeartbeat(Duration::from_millis(pulse))) .unwrap_or(FollowOption::On); - let is_tail = matches!(self.meta.mode, Mode::Online(_)) && last_id.is_none(); - eprintln!("Tail: {}", is_tail); - ReadOptions::builder() .follow(follow_option) .tail(is_tail) diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index 20958ff..4df35f3 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -1,5 +1,5 @@ mod handler; mod serve; -pub use handler::{Handler, Meta, Mode}; +pub use handler::{Handler, Meta}; pub use serve::serve; diff --git a/src/handlers/serve.rs b/src/handlers/serve.rs index ca88db2..acc02f8 100644 --- a/src/handlers/serve.rs +++ b/src/handlers/serve.rs @@ -4,7 +4,7 @@ use tokio_util::compat::FuturesAsyncReadCompatExt; use nu_protocol::Value; use crate::error::Error; -use crate::handlers::{Handler, Meta, Mode}; +use crate::handlers::{Handler, Meta}; use crate::nu; use crate::nu::util::value_to_json; use crate::store::{FollowOption, Frame, ReadOptions, Store}; @@ -103,12 +103,6 @@ async fn spawn( let value = handler.eval_in_thread(&pool, &frame).await; handle_result(&store, &handler, &frame, value).await; - - // Save cursor after processing in batch mode, skip ephemeral frames - if matches!(handler.meta.mode, Mode::Batch) && frame.ttl != Some(TTL::Ephemeral) { - eprintln!("HANDLER: {} SAVING CURSOR: frame: {:?}", handler.id, frame); - handler.save_cursor(&store, frame.id).await; - } } eprintln!("HANDLER: {} EXITING", handler.id);