From f456b7838e594090b8b8b4e6005905ab19a6f1e9 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Fri, 29 Nov 2024 14:24:02 -0500 Subject: [PATCH 1/6] start on support for batch handlers --- src/handlers/handler.rs | 55 ++++++++++-- src/handlers/mod.rs | 2 +- src/handlers/serve.rs | 192 ++++++++++++++++++++++++++++++++++++++-- src/store.rs | 10 ++- 4 files changed, 241 insertions(+), 18 deletions(-) diff --git a/src/handlers/handler.rs b/src/handlers/handler.rs index 5fcdeef..54b192c 100644 --- a/src/handlers/handler.rs +++ b/src/handlers/handler.rs @@ -1,3 +1,5 @@ +use std::str::FromStr; + use nu_engine::eval_block_with_early_return; use nu_protocol::debugger::WithoutDebug; use nu_protocol::engine::Stack; @@ -10,19 +12,31 @@ use scru128::Scru128Id; use crate::error::Error; use crate::nu; use crate::nu::frame_to_value; +use crate::store::{Frame, Store}; use crate::thread_pool::ThreadPool; - -#[derive(Clone, Debug, serde::Deserialize)] -#[serde(untagged)] -pub enum StartDefinition { - Head { head: String }, -} +use crate::ttl::TTL; #[derive(Clone, Debug, serde::Deserialize, Default)] pub struct Meta { pub initial_state: Option, pub pulse: Option, - pub start: Option, + pub mode: Mode, +} + +#[derive(Clone, Debug, serde::Deserialize, Default)] +#[serde(rename_all = "snake_case")] +pub enum Mode { + #[default] + Batch, + Online { + start: Option, + }, +} + +#[derive(Clone, Debug, serde::Deserialize)] +#[serde(untagged)] +pub enum StartDefinition { + Head { head: String }, } #[derive(Clone)] @@ -124,4 +138,31 @@ impl Handler { })? .into_value(Span::unknown())?) } + + pub async fn get_cursor(&self, store: &Store) -> Option { + store + .head(&format!("{}.cursor", self.topic)) + .and_then(|frame| { + frame.meta.and_then(|meta| { + meta.get("frame_id").and_then(|v| { + v.as_str() + .and_then(|id| scru128::Scru128Id::from_str(id).ok()) + }) + }) + }) + } + + pub async fn save_cursor(&self, store: &Store, frame_id: Scru128Id) { + let _ = store + .append( + Frame::with_topic(format!("{}.cursor", self.topic)) + .meta(serde_json::json!({ + "handler_id": self.id.to_string(), + "frame_id": frame_id.to_string(), + })) + .ttl(TTL::Head(1)) + .build(), + ) + .await; + } } diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index eace0a1..b939229 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -1,5 +1,5 @@ mod handler; mod serve; -pub use handler::{Handler, Meta, StartDefinition}; +pub use handler::{Handler, Meta, Mode, StartDefinition}; pub use serve::serve; diff --git a/src/handlers/serve.rs b/src/handlers/serve.rs index 1f2753f..77aca86 100644 --- a/src/handlers/serve.rs +++ b/src/handlers/serve.rs @@ -8,7 +8,7 @@ use tokio_util::compat::FuturesAsyncReadCompatExt; use nu_protocol::Value; use crate::error::Error; -use crate::handlers::{Handler, Meta}; +use crate::handlers::{Handler, Meta, Mode, StartDefinition}; use crate::nu; use crate::nu::util::value_to_json; use crate::store::{FollowOption, Frame, ReadOptions, Store}; @@ -68,6 +68,7 @@ async fn handle_result_stateless(store: &Store, handler: &Handler, frame: &Frame } } } + async fn spawn( store: Store, handler: Handler, @@ -75,26 +76,33 @@ async fn spawn( ) -> Result, Error> { let (tx_command, _rx_command) = tokio::sync::mpsc::channel(1); - let last_id: Option = if let Some(start) = handler.meta.start.as_ref() { - match start { - crate::handlers::StartDefinition::Head { head } => { - store.head(head).map(|frame| frame.id) + let last_id: Option = match handler.meta.mode { + Mode::Batch => handler.get_cursor(&store).await, + Mode::Online { ref start } => { + if let Some(start) = start { + match start { + StartDefinition::Head { head } => store.head(head).map(|frame| frame.id), + } + } else { + None } } - } else { - None }; + eprintln!("LAST_ID: {:?}", last_id.map(|id| id.to_string())); + let follow_option = handler .meta .pulse .map(|pulse| FollowOption::WithHeartbeat(Duration::from_millis(pulse))) .unwrap_or(FollowOption::On); + let options = ReadOptions::builder() .follow(follow_option) - .tail(last_id.is_none()) + .tail(matches!(handler.meta.mode, Mode::Online { start: None })) .maybe_last_id(last_id) .build(); + let mut recver = store.read(options).await; { @@ -103,6 +111,21 @@ async fn spawn( tokio::spawn(async move { while let Some(frame) = recver.recv().await { + eprintln!("HANDLER: {} SEE: frame: {:?}", handler.id, frame); + + // Skip cursor frames to prevent feedback loop of handlers updating cursors in response to cursor frames + if frame.topic.ends_with(".cursor") { + continue; + } + + // Skip registration activity that occurred before this handler was registered + if (frame.topic == format!("{}.register", handler.topic) + || frame.topic == format!("{}.unregister", handler.topic)) + && frame.id <= handler.id + { + continue; + } + // Skip frames that were generated by this handler if let Some(meta) = &frame.meta { if let Some(handler_id) = meta.get("handler_id") { @@ -114,6 +137,8 @@ async fn spawn( } } + eprintln!("HANDLER: {} PROCESSING: frame: {:?}", handler.id, frame); + if (frame.topic == format!("{}.register", &handler.topic) && frame.id != handler.id) || frame.topic == format!("{}.unregister", &handler.topic) { @@ -132,12 +157,21 @@ async fn spawn( } let value = handler.eval_in_thread(&pool, &frame).await; + if handler.stateful { handle_result_stateful(&store, &mut handler, &frame, value).await; } else { handle_result_stateless(&store, &handler, &frame, value).await; } + + // Save cursor after processing in batch mode, skip ephemeral frames + if matches!(handler.meta.mode, Mode::Batch) && frame.ttl != Some(TTL::Ephemeral) { + eprintln!("HANDLER: {} SAVING CURSOR: frame: {:?}", handler.id, frame); + handler.save_cursor(&store, frame.id).await; + } } + + eprintln!("HANDLER: {} EXITING", handler.id); }); } @@ -670,4 +704,146 @@ mod tests { } } } + + #[tokio::test] + async fn test_mode_batch() { + let temp_dir = TempDir::new().unwrap(); + let store = Store::new(temp_dir.into_path()).await; + let pool = ThreadPool::new(4); + let engine = nu::Engine::new(store.clone()).unwrap(); + + { + let store = store.clone(); + let _ = tokio::spawn(async move { + serve(store, engine, pool).await.unwrap(); + }); + } + + // Create some initial data + let frame1 = store.append(Frame::with_topic("pew").build()).await; + let frame2 = store.append(Frame::with_topic("pew").build()).await; + + let options = ReadOptions::builder().follow(FollowOption::On).build(); + let mut recver = store.read(options).await; + + assert_eq!(recver.recv().await.unwrap().topic, "pew"); + assert_eq!(recver.recv().await.unwrap().topic, "pew"); + assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold"); + + // Start handler in batch mode + let frame_handler = store + .append( + Frame::with_topic("action.register") + .hash( + store + .cas_insert( + r#"{|frame| + if $frame.topic != "pew" { return } + "processed" + }"#, + ) + .await + .unwrap(), + ) + .meta(serde_json::json!({ + "mode": "batch" + })) + .build(), + ) + .await; + + assert_eq!(recver.recv().await.unwrap().topic, "action.register"); + assert_eq!(recver.recv().await.unwrap().topic, "action.registered"); + + // Should process historical frames + let frame = recver.recv().await.unwrap(); + assert_eq!(frame.topic, "action"); + let meta = frame.meta.unwrap(); + assert_eq!(meta["handler_id"], frame_handler.id.to_string()); + assert_eq!(meta["frame_id"], frame1.id.to_string()); + + let cursor = recver.recv().await.unwrap(); + assert_eq!(cursor.topic, "action.cursor"); + let meta = cursor.meta.unwrap(); + assert_eq!(meta["handler_id"], frame_handler.id.to_string()); + assert_eq!(meta["frame_id"], frame1.id.to_string()); + + let frame = recver.recv().await.unwrap(); + assert_eq!(frame.topic, "action"); + let meta = frame.meta.unwrap(); + assert_eq!(meta["frame_id"], frame2.id.to_string()); + + let cursor = recver.recv().await.unwrap(); + assert_eq!(cursor.topic, "action.cursor"); + let meta = cursor.meta.unwrap(); + assert_eq!(meta["frame_id"], frame2.id.to_string()); + + assert_no_more_frames(&mut recver).await; + + // Unregister handler and restart - should resume from cursor + store + .append(Frame::with_topic("action.unregister").build()) + .await; + assert_eq!(recver.recv().await.unwrap().topic, "action.unregister"); + assert_eq!(recver.recv().await.unwrap().topic, "action.unregistered"); + + assert_no_more_frames(&mut recver).await; + + // Restart handler + let frame_handler_2 = store + .append( + Frame::with_topic("action.register") + .hash( + store + .cas_insert( + r#"{|frame| + if $frame.topic != "pew" { return } + "processed" + }"#, + ) + .await + .unwrap(), + ) + .meta(serde_json::json!({ + "mode": "batch" + })) + .build(), + ) + .await; + + assert_eq!(recver.recv().await.unwrap().topic, "action.register"); + assert_eq!(recver.recv().await.unwrap().topic, "action.registered"); + + assert_no_more_frames(&mut recver).await; + + let frame3 = store.append(Frame::with_topic("pew").build()).await; + assert_eq!(recver.recv().await.unwrap().topic, "pew"); + + // Should only process frame3 since we resume from cursor + let frame = recver.recv().await.unwrap(); + assert_eq!(frame.topic, "action"); + let meta = frame.meta.clone().unwrap(); + assert_eq!(meta["handler_id"], frame_handler_2.id.to_string()); + assert_eq!(meta["frame_id"], frame3.id.to_string()); + + let cursor = recver.recv().await.unwrap(); + assert_eq!(cursor.topic, "action.cursor"); + let meta = cursor.meta.unwrap(); + assert_eq!(meta["frame_id"], frame3.id.to_string()); + + assert_no_more_frames(&mut recver).await; + } + + async fn assert_no_more_frames(recver: &mut tokio::sync::mpsc::Receiver) { + let timeout = tokio::time::sleep(std::time::Duration::from_millis(50)); + tokio::pin!(timeout); + tokio::select! { + Some(frame) = recver.recv() => { + panic!("Unexpected frame processed: {:?}", frame); + } + _ = &mut timeout => { + // Success - no additional frames were processed + } + } + } } diff --git a/src/store.rs b/src/store.rs index ff91d2d..b87fd29 100644 --- a/src/store.rs +++ b/src/store.rs @@ -236,7 +236,10 @@ impl Store { && options_clone.compaction_strategy.is_none() && options_clone.limit.is_none() { - let threshold = Frame::with_topic("xs.threshold").id(scru128::new()).build(); + let threshold = Frame::with_topic("xs.threshold") + .id(scru128::new()) + .ttl(TTL::Ephemeral) + .build(); if tx_clone.blocking_send(threshold).is_err() { return; } @@ -300,7 +303,10 @@ impl Store { tokio::spawn(async move { loop { tokio::time::sleep(duration).await; - let frame = Frame::with_topic("xs.pulse").id(scru128::new()).build(); + let frame = Frame::with_topic("xs.pulse") + .id(scru128::new()) + .ttl(TTL::Ephemeral) + .build(); if heartbeat_tx.send(frame).await.is_err() { break; } From 07eb82f48a1bfd7750b4bfec7916c83401430bbd Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Fri, 29 Nov 2024 14:26:04 -0500 Subject: [PATCH 2/6] use convenience to assert no more frames --- src/handlers/serve.rs | 49 ++++--------------------------------------- 1 file changed, 4 insertions(+), 45 deletions(-) diff --git a/src/handlers/serve.rs b/src/handlers/serve.rs index 77aca86..efdd5df 100644 --- a/src/handlers/serve.rs +++ b/src/handlers/serve.rs @@ -531,17 +531,7 @@ mod tests { assert_eq!(meta["handler_id"], frame_handler_2.id.to_string()); assert_eq!(meta["frame_id"], frame_pew.id.to_string()); - // Ensure we've processed all frames - let timeout = tokio::time::sleep(std::time::Duration::from_millis(50)); - tokio::pin!(timeout); - tokio::select! { - Some(frame) = recver.recv() => { - panic!("Unregistered handler still processing: {:?}", frame); - } - _ = &mut timeout => { - // Success - no frames processed after unregister - } - } + assert_no_more_frames(&mut recver).await; // Test explicit unregistration store @@ -560,17 +550,7 @@ mod tests { let _ = store.append(Frame::with_topic("pew").build()).await; assert_eq!(recver.recv().await.unwrap().topic, "pew".to_string()); - // No response should come since handler is unregistered - let timeout = tokio::time::sleep(std::time::Duration::from_millis(50)); - tokio::pin!(timeout); - tokio::select! { - Some(frame) = recver.recv() => { - panic!("Unregistered handler still processing: {:?}", frame); - } - _ = &mut timeout => { - // Success - no frames processed after unregister - } - } + assert_no_more_frames(&mut recver).await; } #[tokio::test] @@ -625,18 +605,7 @@ mod tests { assert_eq!(recver.recv().await.unwrap().topic, "a-frame"); assert_eq!(recver.recv().await.unwrap().topic, "echo"); - // Wait a bit to ensure no more frames are processed - let timeout = tokio::time::sleep(std::time::Duration::from_millis(50)); - tokio::pin!(timeout); - - tokio::select! { - Some(frame) = recver.recv() => { - panic!("Handler processed its own output: {:?}", frame); - } - _ = &mut timeout => { - // Success - no additional frames were processed - } - } + assert_no_more_frames(&mut recver).await; } #[tokio::test] @@ -692,17 +661,7 @@ mod tests { let error_message = meta["error"].as_str().unwrap(); assert!(error_message.contains("Closure must accept 1 or 2 arguments")); - // Ensure no additional frames are processed - let timeout = tokio::time::sleep(std::time::Duration::from_millis(50)); - tokio::pin!(timeout); - tokio::select! { - Some(frame) = recver.recv() => { - panic!("Unexpected frame processed: {:?}", frame); - } - _ = &mut timeout => { - // Success - no additional frames were processed - } - } + assert_no_more_frames(&mut recver).await; } #[tokio::test] From 4b767832c43ecdec0b944314cabfe5c94009ee72 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Fri, 29 Nov 2024 14:35:46 -0500 Subject: [PATCH 3/6] Online prefers to start from tail --- src/handlers/serve.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handlers/serve.rs b/src/handlers/serve.rs index efdd5df..966db7c 100644 --- a/src/handlers/serve.rs +++ b/src/handlers/serve.rs @@ -99,7 +99,7 @@ async fn spawn( let options = ReadOptions::builder() .follow(follow_option) - .tail(matches!(handler.meta.mode, Mode::Online { start: None })) + .tail(matches!(handler.meta.mode, Mode::Online { .. }) && last_id.is_none()) .maybe_last_id(last_id) .build(); From 1ad29d1b87e4fe8cc70d57ef55c693373607d609 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Fri, 29 Nov 2024 16:21:01 -0500 Subject: [PATCH 4/6] rework Handler meta enums --- src/handlers/handler.rs | 57 +++++++++++++++++++++++++++++++++-------- src/handlers/mod.rs | 2 +- src/handlers/serve.rs | 41 ++++++----------------------- 3 files changed, 56 insertions(+), 44 deletions(-) diff --git a/src/handlers/handler.rs b/src/handlers/handler.rs index 54b192c..f1ace08 100644 --- a/src/handlers/handler.rs +++ b/src/handlers/handler.rs @@ -1,4 +1,5 @@ use std::str::FromStr; +use std::time::Duration; use nu_engine::eval_block_with_early_return; use nu_protocol::debugger::WithoutDebug; @@ -12,31 +13,31 @@ use scru128::Scru128Id; use crate::error::Error; use crate::nu; use crate::nu::frame_to_value; -use crate::store::{Frame, Store}; +use crate::store::{FollowOption, Frame, ReadOptions, Store}; use crate::thread_pool::ThreadPool; use crate::ttl::TTL; -#[derive(Clone, Debug, serde::Deserialize, Default)] +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, Default)] pub struct Meta { pub initial_state: Option, pub pulse: Option, pub mode: Mode, } -#[derive(Clone, Debug, serde::Deserialize, Default)] +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, Default)] #[serde(rename_all = "snake_case")] pub enum Mode { #[default] Batch, - Online { - start: Option, - }, + Online(StartMode), } -#[derive(Clone, Debug, serde::Deserialize)] -#[serde(untagged)] -pub enum StartDefinition { - Head { head: String }, +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, Default)] +#[serde(rename_all = "snake_case")] +pub enum StartMode { + #[default] + Tail, + Head(String), } #[derive(Clone)] @@ -165,4 +166,40 @@ impl Handler { ) .await; } + + pub async fn configure_read_options(&self, store: &Store) -> ReadOptions { + let last_id: Option = match self.meta.mode { + Mode::Batch => store + .head(&format!("{}.cursor", self.topic)) + .and_then(|frame| { + frame.meta.and_then(|meta| { + meta.get("frame_id").and_then(|v| { + v.as_str() + .and_then(|id| scru128::Scru128Id::from_str(id).ok()) + }) + }) + }), + Mode::Online(ref start_mode) => match start_mode { + StartMode::Tail => None, + StartMode::Head(ref head) => store.head(head).map(|frame| frame.id), + }, + }; + + eprintln!("LAST_ID: {:?}", last_id.map(|id| id.to_string())); + + let follow_option = self + .meta + .pulse + .map(|pulse| FollowOption::WithHeartbeat(Duration::from_millis(pulse))) + .unwrap_or(FollowOption::On); + + let is_tail = matches!(self.meta.mode, Mode::Online(_)) && last_id.is_none(); + eprintln!("Tail: {}", is_tail); + + ReadOptions::builder() + .follow(follow_option) + .tail(is_tail) + .maybe_last_id(last_id) + .build() + } } diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index b939229..20958ff 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -1,5 +1,5 @@ mod handler; mod serve; -pub use handler::{Handler, Meta, Mode, StartDefinition}; +pub use handler::{Handler, Meta, Mode}; pub use serve::serve; diff --git a/src/handlers/serve.rs b/src/handlers/serve.rs index 966db7c..30aae64 100644 --- a/src/handlers/serve.rs +++ b/src/handlers/serve.rs @@ -1,14 +1,10 @@ -use std::time::Duration; - -use scru128::Scru128Id; - use tokio::io::AsyncReadExt; use tokio_util::compat::FuturesAsyncReadCompatExt; use nu_protocol::Value; use crate::error::Error; -use crate::handlers::{Handler, Meta, Mode, StartDefinition}; +use crate::handlers::{Handler, Meta, Mode}; use crate::nu; use crate::nu::util::value_to_json; use crate::store::{FollowOption, Frame, ReadOptions, Store}; @@ -74,35 +70,11 @@ async fn spawn( handler: Handler, pool: ThreadPool, ) -> Result, Error> { - let (tx_command, _rx_command) = tokio::sync::mpsc::channel(1); - - let last_id: Option = match handler.meta.mode { - Mode::Batch => handler.get_cursor(&store).await, - Mode::Online { ref start } => { - if let Some(start) = start { - match start { - StartDefinition::Head { head } => store.head(head).map(|frame| frame.id), - } - } else { - None - } - } - }; + eprintln!("HANDLER: {:?} SPAWNING", handler.meta); - eprintln!("LAST_ID: {:?}", last_id.map(|id| id.to_string())); - - let follow_option = handler - .meta - .pulse - .map(|pulse| FollowOption::WithHeartbeat(Duration::from_millis(pulse))) - .unwrap_or(FollowOption::On); - - let options = ReadOptions::builder() - .follow(follow_option) - .tail(matches!(handler.meta.mode, Mode::Online { .. }) && last_id.is_none()) - .maybe_last_id(last_id) - .build(); + let (tx_command, _rx_command) = tokio::sync::mpsc::channel(1); + let options = handler.configure_read_options(&store).await; let mut recver = store.read(options).await; { @@ -555,7 +527,7 @@ mod tests { #[tokio::test] // This test is to ensure that a handler does not process its own output - async fn test_handler_stateless_no_self_loop() { + async fn test_no_self_loop() { let temp_dir = TempDir::new().unwrap(); let store = Store::new(temp_dir.into_path()).await; let pool = ThreadPool::new(4); @@ -590,6 +562,9 @@ mod tests { .await .unwrap(), ) + .meta(serde_json::json!({ + "mode": {"online": "tail"} + })) .build(), ) .await; From 562f53367e1f2776575f1d503c53f12aa26a6fdb Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Fri, 29 Nov 2024 16:26:46 -0500 Subject: [PATCH 5/6] wip --- src/handlers/serve.rs | 75 ++++--------------------------------------- 1 file changed, 7 insertions(+), 68 deletions(-) diff --git a/src/handlers/serve.rs b/src/handlers/serve.rs index 30aae64..b5fe0fb 100644 --- a/src/handlers/serve.rs +++ b/src/handlers/serve.rs @@ -232,73 +232,6 @@ mod tests { use std::collections::HashMap; use tempfile::TempDir; - #[tokio::test] - async fn test_serve_stateless() { - let temp_dir = TempDir::new().unwrap(); - let store = Store::new(temp_dir.into_path()).await; - let pool = ThreadPool::new(4); - let engine = nu::Engine::new(store.clone()).unwrap(); - - { - let store = store.clone(); - let _ = tokio::spawn(async move { - serve(store, engine, pool).await.unwrap(); - }); - } - - let frame_handler = store - .append( - Frame::with_topic("action.register") - .hash( - store - .cas_insert( - r#"{|frame| - if $frame.topic != "topic2" { return } - "ran action" - }"#, - ) - .await - .unwrap(), - ) - .build(), - ) - .await; - - let options = ReadOptions::builder().follow(FollowOption::On).build(); - let mut recver = store.read(options).await; - - assert_eq!( - recver.recv().await.unwrap().topic, - "action.register".to_string() - ); - assert_eq!( - recver.recv().await.unwrap().topic, - "xs.threshold".to_string() - ); - assert_eq!( - recver.recv().await.unwrap().topic, - "action.registered".to_string() - ); - - let _ = store.append(Frame::with_topic("topic1").build()).await; - let frame_topic2 = store.append(Frame::with_topic("topic2").build()).await; - assert_eq!(recver.recv().await.unwrap().topic, "topic1".to_string()); - assert_eq!(recver.recv().await.unwrap().topic, "topic2".to_string()); - - let frame = recver.recv().await.unwrap(); - assert_eq!(frame.topic, "action".to_string()); - - let meta = frame.meta.unwrap(); - assert_eq!(meta["handler_id"], frame_handler.id.to_string()); - assert_eq!(meta["frame_id"], frame_topic2.id.to_string()); - - let content = store.cas_read(&frame.hash.unwrap()).await.unwrap(); - assert_eq!(content, r#""ran action""#.as_bytes()); - - let _ = store.append(Frame::with_topic("topic3").build()).await; - assert_eq!(recver.recv().await.unwrap().topic, "topic3".to_string()); - } - #[tokio::test] async fn test_serve_stateful() { let temp_dir = TempDir::new().unwrap(); @@ -380,7 +313,7 @@ mod tests { } #[tokio::test] - async fn test_handler_update() { + async fn test_online_tail() { let temp_dir = TempDir::new().unwrap(); let store = Store::new(temp_dir.into_path()).await; let pool = ThreadPool::new(4); @@ -415,6 +348,9 @@ mod tests { .await .unwrap(), ) + .meta(serde_json::json!({ + "mode": {"online": "tail"} + })) .build(), ) .await; @@ -454,6 +390,9 @@ mod tests { .await .unwrap(), ) + .meta(serde_json::json!({ + "mode": {"online": "tail"} + })) .build(), ) .await; From a910bf267ec59661915684146e78c0a988a8af53 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Sat, 30 Nov 2024 09:38:09 -0500 Subject: [PATCH 6/6] normalize return value handling --- src/handlers/serve.rs | 40 +++++----------------------------------- 1 file changed, 5 insertions(+), 35 deletions(-) diff --git a/src/handlers/serve.rs b/src/handlers/serve.rs index b5fe0fb..ca88db2 100644 --- a/src/handlers/serve.rs +++ b/src/handlers/serve.rs @@ -11,36 +11,7 @@ use crate::store::{FollowOption, Frame, ReadOptions, Store}; use crate::thread_pool::ThreadPool; use crate::ttl::TTL; -async fn handle_result_stateful(store: &Store, handler: &mut Handler, frame: &Frame, value: Value) { - match value { - Value::Nothing { .. } => (), - Value::Record { ref val, .. } => { - if let Some(state) = val.get("state") { - handler.state = Some(state.clone()); - } - let _ = store - .append( - Frame::with_topic(format!("{}.state", &handler.topic)) - .hash( - store - .cas_insert(&value_to_json(&value).to_string()) - .await - .unwrap(), - ) - .meta(serde_json::json!({ - "handler_id": handler.id.to_string(), - "frame_id": frame.id.to_string(), - })) - .ttl(TTL::Ephemeral) - .build(), - ) - .await; - } - _ => panic!("unexpected value type"), - } -} - -async fn handle_result_stateless(store: &Store, handler: &Handler, frame: &Frame, value: Value) { +async fn handle_result(store: &Store, handler: &Handler, frame: &Frame, value: Value) { match value { Value::Nothing { .. } => (), _ => { @@ -57,6 +28,7 @@ async fn handle_result_stateless(store: &Store, handler: &Handler, frame: &Frame "handler_id": handler.id.to_string(), "frame_id": frame.id.to_string(), })) + // TODO: TTL should be configurable .ttl(TTL::Ephemeral) .build(), ) @@ -130,11 +102,7 @@ async fn spawn( let value = handler.eval_in_thread(&pool, &frame).await; - if handler.stateful { - handle_result_stateful(&store, &mut handler, &frame, value).await; - } else { - handle_result_stateless(&store, &handler, &frame, value).await; - } + handle_result(&store, &handler, &frame, value).await; // Save cursor after processing in batch mode, skip ephemeral frames if matches!(handler.meta.mode, Mode::Batch) && frame.ttl != Some(TTL::Ephemeral) { @@ -153,6 +121,8 @@ async fn spawn( .meta(serde_json::json!({ "handler_id": handler.id.to_string(), })) + // Todo: + // .ttl(TTL::Head(1)) .ttl(TTL::Ephemeral) .build(), )