diff --git a/src/api.rs b/src/api.rs index da0389e..6d8065b 100644 --- a/src/api.rs +++ b/src/api.rs @@ -254,15 +254,13 @@ async fn handle_stream_append( Err(e) => return response_400(e.to_string()), }; - let frame = store - .append( - Frame::with_topic(topic) - .maybe_hash(hash) - .maybe_meta(meta) - .ttl(ttl) - .build(), - ) - .await; + let frame = store.append( + Frame::with_topic(topic) + .maybe_hash(hash) + .maybe_meta(meta) + .ttl(ttl) + .build(), + ); Ok(Response::builder() .status(StatusCode::OK) @@ -351,13 +349,11 @@ pub async fn serve( pool: ThreadPool, expose: Option, ) -> Result<(), Box> { - let _ = store - .append( - Frame::with_topic("xs.start") - .maybe_meta(expose.as_ref().map(|e| serde_json::json!({"expose": e}))) - .build(), - ) - .await; + let _ = store.append( + Frame::with_topic("xs.start") + .maybe_meta(expose.as_ref().map(|e| serde_json::json!({"expose": e}))) + .build(), + ); let path = store.path.join("sock").to_string_lossy().to_string(); let listener = Listener::bind(&path).await?; diff --git a/src/handlers/handler.rs b/src/handlers/handler.rs index 667ff6b..b9fe915 100644 --- a/src/handlers/handler.rs +++ b/src/handlers/handler.rs @@ -345,7 +345,7 @@ impl Handler { } } - let output_frame = store.append(output_frame).await; + let output_frame = store.append(output_frame); // Update state if the appended frame is a state frame if self.stateful && output_frame.topic == format!("{}.state", self.topic) { @@ -375,17 +375,15 @@ impl Handler { }); } - let _ = store - .append( - Frame::with_topic(format!("{}.registered", &self.topic)) - .meta(serde_json::json!({ - "handler_id": self.id.to_string(), - "tail": options.tail, - "last_id": options.last_id.map(|id| id.to_string()), - })) - .build(), - ) - .await; + let _ = store.append( + Frame::with_topic(format!("{}.registered", &self.topic)) + .meta(serde_json::json!({ + "handler_id": self.id.to_string(), + "tail": options.tail, + "last_id": options.last_id.map(|id| id.to_string()), + })) + .build(), + ); Ok(()) } @@ -405,16 +403,14 @@ impl Handler { if frame.topic == format!("{}.register", &self.topic) || frame.topic == format!("{}.unregister", &self.topic) { - let _ = store - .append( - Frame::with_topic(format!("{}.unregistered", &self.topic)) - .meta(serde_json::json!({ - "handler_id": self.id.to_string(), - "frame_id": frame.id.to_string(), - })) - .build(), - ) - .await; + let _ = store.append( + Frame::with_topic(format!("{}.unregistered", &self.topic)) + .meta(serde_json::json!({ + "handler_id": self.id.to_string(), + "frame_id": frame.id.to_string(), + })) + .build(), + ); break; } @@ -431,17 +427,15 @@ impl Handler { } if let Err(err) = self.process_frame(&frame, store, pool).await { - let _ = store - .append( - Frame::with_topic(format!("{}.unregistered", self.topic)) - .meta(serde_json::json!({ - "handler_id": self.id.to_string(), - "frame_id": frame.id.to_string(), - "error": err.to_string(), - })) - .build(), - ) - .await; + let _ = store.append( + Frame::with_topic(format!("{}.unregistered", self.topic)) + .meta(serde_json::json!({ + "handler_id": self.id.to_string(), + "frame_id": frame.id.to_string(), + "error": err.to_string(), + })) + .build(), + ); break; } } diff --git a/src/handlers/serve.rs b/src/handlers/serve.rs index cc2c1df..16e2216 100644 --- a/src/handlers/serve.rs +++ b/src/handlers/serve.rs @@ -17,16 +17,14 @@ async fn start_handler( Ok(()) } Err(err) => { - let _ = store - .append( - Frame::with_topic(format!("{}.unregistered", topic)) - .meta(serde_json::json!({ - "handler_id": frame.id.to_string(), - "error": err.to_string(), - })) - .build(), - ) - .await; + let _ = store.append( + Frame::with_topic(format!("{}.unregistered", topic)) + .meta(serde_json::json!({ + "handler_id": frame.id.to_string(), + "error": err.to_string(), + })) + .build(), + ); Ok(()) } } diff --git a/src/handlers/tests.rs b/src/handlers/tests.rs index 344702f..93d1fa5 100644 --- a/src/handlers/tests.rs +++ b/src/handlers/tests.rs @@ -110,20 +110,18 @@ async fn test_register_invalid_closure() { ); // Attempt to register a closure with no arguments - let frame_handler = store - .append( - Frame::with_topic("invalid.register") - .hash( - store - .cas_insert( - r#"{|| 42 }"#, // Invalid closure, expects at least one argument - ) - .await - .unwrap(), - ) - .build(), - ) - .await; + let frame_handler = store.append( + Frame::with_topic("invalid.register") + .hash( + store + .cas_insert( + r#"{|| 42 }"#, // Invalid closure, expects at least one argument + ) + .await + .unwrap(), + ) + .build(), + ); // Ensure the register frame is processed assert_eq!( @@ -155,22 +153,20 @@ async fn test_no_self_loop() { ); // Register handler that would process its own output if not prevented - store - .append( - Frame::with_topic("echo.register") - .hash( - store - .cas_insert( - r#"{|frame| - $frame - }"#, - ) - .await - .unwrap(), - ) - .build(), - ) - .await; + store.append( + Frame::with_topic("echo.register") + .hash( + store + .cas_insert( + r#"{|frame| + $frame + }"#, + ) + .await + .unwrap(), + ) + .build(), + ); assert_eq!(recver.recv().await.unwrap().topic, "echo.register"); assert_eq!(recver.recv().await.unwrap().topic, "echo.registered"); @@ -178,7 +174,7 @@ async fn test_no_self_loop() { // note we don't see an echo of the echo.registered frame // Trigger the handler - store.append(Frame::with_topic("a-frame").build()).await; + store.append(Frame::with_topic("a-frame").build()); // we should see the trigger, and then a single echo assert_eq!(recver.recv().await.unwrap().topic, "a-frame"); assert_eq!(recver.recv().await.unwrap().topic, "echo.out"); @@ -191,8 +187,8 @@ async fn test_essentials() { let (store, _temp_dir) = setup_test_environment().await; // Create initial frames - let pew1 = store.append(Frame::with_topic("pew").build()).await; - let pew2 = store.append(Frame::with_topic("pew").build()).await; + let pew1 = store.append(Frame::with_topic("pew").build()); + let pew2 = store.append(Frame::with_topic("pew").build()); let options = ReadOptions::builder().follow(FollowOption::On).build(); let mut recver = store.read(options).await; @@ -202,15 +198,13 @@ async fn test_essentials() { assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold"); // Create a pointer frame that contains indicates we've processed pew1 - let _pointer_frame = store - .append( - Frame::with_topic("action.out") - .meta(serde_json::json!({ - "frame_id": pew1.id.to_string() - })) - .build(), - ) - .await; + let _pointer_frame = store.append( + Frame::with_topic("action.out") + .meta(serde_json::json!({ + "frame_id": pew1.id.to_string() + })) + .build(), + ); // Register handler with start pointing to the content of action.out let handler_proto = Frame::with_topic("action.register") @@ -231,7 +225,7 @@ async fn test_essentials() { .build(); // Start handler - let frame_handler = store.append(handler_proto.clone()).await; + let frame_handler = store.append(handler_proto.clone()); assert_eq!(recver.recv().await.unwrap().topic, "action.out"); // The pointer frame assert_eq!(recver.recv().await.unwrap().topic, "action.register"); @@ -254,16 +248,14 @@ async fn test_essentials() { assert_no_more_frames(&mut recver).await; // Unregister handler and restart - should resume from cursor - store - .append(Frame::with_topic("action.unregister").build()) - .await; + store.append(Frame::with_topic("action.unregister").build()); assert_eq!(recver.recv().await.unwrap().topic, "action.unregister"); assert_eq!(recver.recv().await.unwrap().topic, "action.unregistered"); assert_no_more_frames(&mut recver).await; // Restart handler - let frame_handler_2 = store.append(handler_proto.clone()).await; + let frame_handler_2 = store.append(handler_proto.clone()); assert_eq!(recver.recv().await.unwrap().topic, "action.register"); // Assert registered frame has the correct meta @@ -275,7 +267,7 @@ async fn test_essentials() { // The last_id should now be pew2 assert_eq!(meta["last_id"], pew2.id.to_string()); - let pew3 = store.append(Frame::with_topic("pew").build()).await; + let pew3 = store.append(Frame::with_topic("pew").build()); assert_eq!(recver.recv().await.unwrap().topic, "pew"); // Should resume processing from pew3 on @@ -297,35 +289,33 @@ async fn test_unregister_on_error() { assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold"); // This frame will trigger the error when the handler comes online - let frame_trigger = store.append(Frame::with_topic("trigger").build()).await; + let frame_trigger = store.append(Frame::with_topic("trigger").build()); validate_frame!(recver.recv().await.unwrap(), {topic: "trigger"}); // add an additional frame, which shouldn't be processed, as the handler should immediately // unregister - let _ = store.append(Frame::with_topic("trigger").build()).await; + let _ = store.append(Frame::with_topic("trigger").build()); validate_frame!(recver.recv().await.unwrap(), {topic: "trigger"}); // Start handler - let frame_handler = store - .append( - Frame::with_topic("error.register") - .hash( - store - .cas_insert( - r#"{|frame| - let x = {"foo": null} - $x.foo.bar # Will error at runtime - null access - }"#, - ) - .await - .unwrap(), - ) - .meta(serde_json::json!({ - "start": {"cursor": "root"} - })) - .build(), - ) - .await; + let frame_handler = store.append( + Frame::with_topic("error.register") + .hash( + store + .cas_insert( + r#"{|frame| + let x = {"foo": null} + $x.foo.bar # Will error at runtime - null access + }"#, + ) + .await + .unwrap(), + ) + .meta(serde_json::json!({ + "start": {"cursor": "root"} + })) + .build(), + ); assert_eq!(recver.recv().await.unwrap().topic, "error.register"); assert_eq!(recver.recv().await.unwrap().topic, "error.registered"); @@ -365,7 +355,7 @@ async fn test_state() { })) .build(); - let frame_handler = store.append(handler_proto.clone()).await; + let frame_handler = store.append(handler_proto.clone()); let options = ReadOptions::builder().follow(FollowOption::On).build(); let mut recver = store.read(options).await; @@ -374,8 +364,8 @@ async fn test_state() { assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold"); assert_eq!(recver.recv().await.unwrap().topic, "counter.registered"); - let _ = store.append(Frame::with_topic("topic1").build()).await; - let frame_count1 = store.append(Frame::with_topic("count.me").build()).await; + let _ = store.append(Frame::with_topic("topic1").build()); + let frame_count1 = store.append(Frame::with_topic("count.me").build()); assert_eq!(recver.recv().await.unwrap().topic, "topic1".to_string()); assert_eq!(recver.recv().await.unwrap().topic, "count.me".to_string()); @@ -394,7 +384,7 @@ async fn test_state() { let value = serde_json::from_slice::(&content).unwrap(); assert_eq!(value, serde_json::json!({"count": 1})); - let frame_count2 = store.append(Frame::with_topic("count.me").build()).await; + let frame_count2 = store.append(Frame::with_topic("count.me").build()); assert_eq!(recver.recv().await.unwrap().topic, "count.me".to_string()); let frame_state_2 = recver.recv().await.unwrap(); @@ -413,20 +403,18 @@ async fn test_state() { assert_eq!(value, serde_json::json!({"count": 2})); // Unregister the handler - store - .append(Frame::with_topic("counter.unregister").build()) - .await; + store.append(Frame::with_topic("counter.unregister").build()); assert_eq!(recver.recv().await.unwrap().topic, "counter.unregister"); assert_eq!(recver.recv().await.unwrap().topic, "counter.unregistered"); // Re-register the handler - let frame_handler2 = store.append(handler_proto.clone()).await; + let frame_handler2 = store.append(handler_proto.clone()); assert_eq!(recver.recv().await.unwrap().topic, "counter.register"); assert_eq!(recver.recv().await.unwrap().topic, "counter.registered"); // Send another count.me frame - let frame_count3 = store.append(Frame::with_topic("count.me").build()).await; + let frame_count3 = store.append(Frame::with_topic("count.me").build()); assert_eq!(recver.recv().await.unwrap().topic, "count.me".to_string()); let frame_state_3 = recver.recv().await.unwrap(); @@ -474,12 +462,12 @@ async fn test_return_options() { })) .build(); - let frame_handler = store.append(handler_proto).await; + let frame_handler = store.append(handler_proto); assert_eq!(recver.recv().await.unwrap().topic, "echo.register"); assert_eq!(recver.recv().await.unwrap().topic, "echo.registered"); // Send first ping - let frame1 = store.append(Frame::with_topic("ping").build()).await; + let frame1 = store.append(Frame::with_topic("ping").build()); assert_eq!(recver.recv().await.unwrap().topic, "ping"); // Check response has custom suffix and right meta @@ -491,7 +479,7 @@ async fn test_return_options() { assert_eq!(meta["frame_id"], frame1.id.to_string()); // Send second ping - should only see newest response due to Head(1) - let frame2 = store.append(Frame::with_topic("ping").build()).await; + let frame2 = store.append(Frame::with_topic("ping").build()); assert_eq!(recver.recv().await.unwrap().topic, "ping"); let response2 = recver.recv().await.unwrap(); @@ -540,11 +528,11 @@ async fn test_custom_append() { .build(); // Start handler - let frame_handler = store.append(handler_proto.clone()).await; + let frame_handler = store.append(handler_proto.clone()); assert_eq!(recver.recv().await.unwrap().topic, "action.register"); assert_eq!(recver.recv().await.unwrap().topic, "action.registered"); - let trigger_frame = store.append(Frame::with_topic("trigger").build()).await; + let trigger_frame = store.append(Frame::with_topic("trigger").build()); assert_eq!(recver.recv().await.unwrap().topic, "trigger"); validate_handler_output_frames!( @@ -568,52 +556,48 @@ async fn test_handler_replacement() { assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold"); // Register first handler - let _ = store - .append( - Frame::with_topic("h.register") - .hash( - store - .cas_insert( - r#"{|frame| - if $frame.topic != "trigger" { return } - "handler1" - }"#, - ) - .await - .unwrap(), - ) - .build(), - ) - .await; + let _ = store.append( + Frame::with_topic("h.register") + .hash( + store + .cas_insert( + r#"{|frame| + if $frame.topic != "trigger" { return } + "handler1" + }"#, + ) + .await + .unwrap(), + ) + .build(), + ); assert_eq!(recver.recv().await.unwrap().topic, "h.register"); assert_eq!(recver.recv().await.unwrap().topic, "h.registered"); // Register second handler for same topic - let handler2 = store - .append( - Frame::with_topic("h.register") - .hash( - store - .cas_insert( - r#"{|frame| - if $frame.topic != "trigger" { return } - "handler2" - }"#, - ) - .await - .unwrap(), - ) - .build(), - ) - .await; + let handler2 = store.append( + Frame::with_topic("h.register") + .hash( + store + .cas_insert( + r#"{|frame| + if $frame.topic != "trigger" { return } + "handler2" + }"#, + ) + .await + .unwrap(), + ) + .build(), + ); assert_eq!(recver.recv().await.unwrap().topic, "h.register"); assert_eq!(recver.recv().await.unwrap().topic, "h.unregistered"); assert_eq!(recver.recv().await.unwrap().topic, "h.registered"); // Send trigger - should be handled by handler2 - let trigger = store.append(Frame::with_topic("trigger").build()).await; + let trigger = store.append(Frame::with_topic("trigger").build()); assert_eq!(recver.recv().await.unwrap().topic, "trigger"); // Verify handler2 processed it @@ -638,55 +622,51 @@ async fn test_handler_with_module() -> Result<(), Error> { assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold"); // First create our module that exports a function - let module_frame = store - .append( - Frame::with_topic("mymod.nu") - .hash( - store - .cas_insert( - r#" - # Add two numbers and format result - export def add_nums [x, y] { - $"sum is ($x + $y)" - } - "#, - ) - .await?, - ) - .build(), - ) - .await; + let module_frame = store.append( + Frame::with_topic("mymod.nu") + .hash( + store + .cas_insert( + r#" + # Add two numbers and format result + export def add_nums [x, y] { + $"sum is ($x + $y)" + } + "#, + ) + .await?, + ) + .build(), + ); assert_eq!(recver.recv().await.unwrap().topic, "mymod.nu"); // Create handler that uses the module - let frame_handler = store - .append( - Frame::with_topic("test.register") - .hash( - store - .cas_insert( - r#"{|frame| - if $frame.topic != "trigger" { return } - mymod add_nums 40 2 - }"#, - ) - .await?, - ) - .meta(serde_json::json!({ - "modules": { - "mymod": module_frame.id.to_string() - } - })) - .build(), - ) - .await; + let frame_handler = store.append( + Frame::with_topic("test.register") + .hash( + store + .cas_insert( + r#"{|frame| + if $frame.topic != "trigger" { return } + mymod add_nums 40 2 + }"#, + ) + .await?, + ) + .meta(serde_json::json!({ + "modules": { + "mymod": module_frame.id.to_string() + } + })) + .build(), + ); // Wait for handler registration assert_eq!(recver.recv().await.unwrap().topic, "test.register"); assert_eq!(recver.recv().await.unwrap().topic, "test.registered"); // Send trigger frame - let trigger = store.append(Frame::with_topic("trigger").build()).await; + let trigger = store.append(Frame::with_topic("trigger").build()); assert_eq!(recver.recv().await.unwrap().topic, "trigger"); // Get handler output @@ -710,44 +690,40 @@ async fn test_handler_with_env() -> Result<(), Error> { assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold"); // Create a frame with some content for the env var - let env_frame = store - .append( - Frame::with_topic("env-content") - .hash(store.cas_insert("hello world").await?) - .build(), - ) - .await; + let env_frame = store.append( + Frame::with_topic("env-content") + .hash(store.cas_insert("hello world").await?) + .build(), + ); assert_eq!(recver.recv().await.unwrap().topic, "env-content"); // Create handler that uses the env var - let frame_handler = store - .append( - Frame::with_topic("test.register") - .hash( - store - .cas_insert( - r#"{|frame| - if $frame.topic != "trigger" { return } - $env.TEST_VAR - }"#, - ) - .await?, - ) - .meta(serde_json::json!({ - "with_env": { - "TEST_VAR": env_frame.id.to_string() - } - })) - .build(), - ) - .await; + let frame_handler = store.append( + Frame::with_topic("test.register") + .hash( + store + .cas_insert( + r#"{|frame| + if $frame.topic != "trigger" { return } + $env.TEST_VAR + }"#, + ) + .await?, + ) + .meta(serde_json::json!({ + "with_env": { + "TEST_VAR": env_frame.id.to_string() + } + })) + .build(), + ); // Wait for handler registration assert_eq!(recver.recv().await.unwrap().topic, "test.register"); assert_eq!(recver.recv().await.unwrap().topic, "test.registered"); // Send trigger frame - let trigger = store.append(Frame::with_topic("trigger").build()).await; + let trigger = store.append(Frame::with_topic("trigger").build()); assert_eq!(recver.recv().await.unwrap().topic, "trigger"); // Get handler output diff --git a/src/http.rs b/src/http.rs index f95198a..3c1d6d1 100644 --- a/src/http.rs +++ b/src/http.rs @@ -108,14 +108,12 @@ async fn handle( Some(writer.commit().await?) }; - let frame = store - .append( - Frame::with_topic("http.request") - .maybe_hash(hash) - .maybe_meta(serde_json::to_value(&req_meta).ok()) - .build(), - ) - .await; + let frame = store.append( + Frame::with_topic("http.request") + .maybe_hash(hash) + .maybe_meta(serde_json::to_value(&req_meta).ok()) + .build(), + ); // Track request after creating its frame active_streams.lock().await.track(&connection_id, frame.id); @@ -228,16 +226,14 @@ pub async fn serve( if let Some(requests) = streams.untrack_connection(&connection_id) { for request_id in requests { let store = store.clone(); - let _ = store - .append( - Frame::with_topic("http.disconnect") - .meta(serde_json::json!({ - "request_id": request_id.to_string(), - })) - .ttl(TTL::Ephemeral) - .build(), - ) - .await; + let _ = store.append( + Frame::with_topic("http.disconnect") + .meta(serde_json::json!({ + "request_id": request_id.to_string(), + })) + .ttl(TTL::Ephemeral) + .build(), + ); } } }); diff --git a/src/nu/commands/append_command.rs b/src/nu/commands/append_command.rs index 4bba736..96533bc 100644 --- a/src/nu/commands/append_command.rs +++ b/src/nu/commands/append_command.rs @@ -75,15 +75,13 @@ impl Command for AppendCommand { let frame = rt.block_on(async { let hash = util::write_pipeline_to_cas(input, &store, span).await?; - let frame = store - .append( - Frame::with_topic(topic) - .maybe_hash(hash) - .maybe_meta(meta) - .maybe_ttl(ttl) - .build(), - ) - .await; + let frame = store.append( + Frame::with_topic(topic) + .maybe_hash(hash) + .maybe_meta(meta) + .maybe_ttl(ttl) + .build(), + ); Ok::<_, ShellError>(frame) })?; diff --git a/src/nu/test_commands.rs b/src/nu/test_commands.rs index 5eeb43d..82e76a3 100644 --- a/src/nu/test_commands.rs +++ b/src/nu/test_commands.rs @@ -91,21 +91,17 @@ mod tests { async fn test_head_command() -> Result<(), Error> { let (store, engine) = setup_test_env().await; - let _frame1 = store - .append( - Frame::with_topic("topic") - .hash(store.cas_insert("content1").await?) - .build(), - ) - .await; - - let frame2 = store - .append( - Frame::with_topic("topic") - .hash(store.cas_insert("content2").await?) - .build(), - ) - .await; + let _frame1 = store.append( + Frame::with_topic("topic") + .hash(store.cas_insert("content1").await?) + .build(), + ); + + let frame2 = store.append( + Frame::with_topic("topic") + .hash(store.cas_insert("content2").await?) + .build(), + ); let head_frame = nu_eval(&engine, PipelineData::empty(), ".head topic"); @@ -120,21 +116,17 @@ mod tests { async fn test_cat_command() -> Result<(), Error> { let (store, engine) = setup_test_env().await; - let _frame1 = store - .append( - Frame::with_topic("topic") - .hash(store.cas_insert("content1").await?) - .build(), - ) - .await; - - let _frame2 = store - .append( - Frame::with_topic("topic") - .hash(store.cas_insert("content2").await?) - .build(), - ) - .await; + let _frame1 = store.append( + Frame::with_topic("topic") + .hash(store.cas_insert("content1").await?) + .build(), + ); + + let _frame2 = store.append( + Frame::with_topic("topic") + .hash(store.cas_insert("content2").await?) + .build(), + ); // Test basic .cat let value = nu_eval(&engine, PipelineData::empty(), ".cat"); @@ -153,13 +145,11 @@ mod tests { async fn test_remove_command() -> Result<(), Error> { let (store, engine) = setup_test_env().await; - let frame = store - .append( - Frame::with_topic("topic") - .hash(store.cas_insert("test").await?) - .build(), - ) - .await; + let frame = store.append( + Frame::with_topic("topic") + .hash(store.cas_insert("test").await?) + .build(), + ); nu_eval( &engine, diff --git a/src/store/mod.rs b/src/store/mod.rs index 7e2ed60..110b8ca 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -368,7 +368,7 @@ impl Store { self.keyspace.persist(fjall::PersistMode::SyncAll) } - pub async fn append(&self, frame: Frame) -> Frame { + pub fn append(&self, frame: Frame) -> Frame { let mut frame = frame; frame.id = scru128::new(); diff --git a/src/store/tests.rs b/src/store/tests.rs index 406e71a..c0fa4ef 100644 --- a/src/store/tests.rs +++ b/src/store/tests.rs @@ -33,14 +33,14 @@ mod tests_read_options { topic: "hello".to_owned(), ..Default::default() }; - let frame1 = store.append(frame1).await; + let frame1 = store.append(frame1); let frame2 = Frame { id: scru128::new(), topic: "hallo".to_owned(), ..Default::default() }; - let frame2 = store.append(frame2).await; + let frame2 = store.append(frame2); assert_eq!(Some(frame1), store.head("hello")); assert_eq!(Some(frame2), store.head("hallo")); @@ -114,9 +114,7 @@ mod tests_store { let temp_dir = TempDir::new().unwrap(); let store = Store::new(temp_dir.into_path()); let meta = serde_json::json!({"key": "value"}); - let frame = store - .append(Frame::with_topic("stream").meta(meta).build()) - .await; + let frame = store.append(Frame::with_topic("stream").meta(meta).build()); let got = store.get(&frame.id); assert_eq!(Some(frame.clone()), got); } @@ -127,8 +125,8 @@ mod tests_store { let store = Store::new(temp_dir.into_path()); // Append two initial clips - let f1 = store.append(Frame::with_topic("stream").build()).await; - let f2 = store.append(Frame::with_topic("stream").build()).await; + let f1 = store.append(Frame::with_topic("stream").build()); + let f2 = store.append(Frame::with_topic("stream").build()); // cat the full stream and follow new items with a heartbeat every 5ms let follow_options = ReadOptions::builder() @@ -146,8 +144,8 @@ mod tests_store { ); // Append two more clips - let f3 = store.append(Frame::with_topic("stream").build()).await; - let f4 = store.append(Frame::with_topic("stream").build()).await; + let f3 = store.append(Frame::with_topic("stream").build()); + let f4 = store.append(Frame::with_topic("stream").build()); assert_eq!(f3, recver.recv().await.unwrap()); assert_eq!(f4, recver.recv().await.unwrap()); @@ -166,8 +164,8 @@ mod tests_store { let temp_dir = TempDir::new().unwrap(); let store = Store::new(temp_dir.into_path()); - let f1 = store.append(Frame::with_topic("/stream").build()).await; - let f2 = store.append(Frame::with_topic("/stream").build()).await; + let f1 = store.append(Frame::with_topic("/stream").build()); + let f2 = store.append(Frame::with_topic("/stream").build()); assert_eq!(store.head("/stream"), Some(f2.clone())); @@ -196,9 +194,9 @@ mod tests_store { let store = Store::new(temp_dir.path().to_path_buf()); // Add 3 items - let frame1 = store.append(Frame::with_topic("test").build()).await; - let frame2 = store.append(Frame::with_topic("test").build()).await; - let _ = store.append(Frame::with_topic("test").build()).await; + let frame1 = store.append(Frame::with_topic("test").build()); + let frame2 = store.append(Frame::with_topic("test").build()); + let _ = store.append(Frame::with_topic("test").build()); // Read with limit 2 let options = ReadOptions::builder().limit(2).build(); @@ -218,7 +216,7 @@ mod tests_store { let store = Store::new(temp_dir.path().to_path_buf()); // Add 1 item - let frame1 = store.append(Frame::with_topic("test").build()).await; + let frame1 = store.append(Frame::with_topic("test").build()); // Start read with limit 2 and follow let options = ReadOptions::builder() @@ -236,8 +234,8 @@ mod tests_store { .is_err()); // Add 2 more items - let frame2 = store.append(Frame::with_topic("test").build()).await; - let _frame3 = store.append(Frame::with_topic("test").build()).await; + let frame2 = store.append(Frame::with_topic("test").build()); + let _frame3 = store.append(Frame::with_topic("test").build()); // Assert we get one more item assert_eq!(Some(frame2), rx.recv().await); @@ -252,11 +250,11 @@ mod tests_store { let store = Store::new(temp_dir.path().to_path_buf()); // Create 5 records upfront - let frame1 = store.append(Frame::with_topic("test").build()).await; - let frame2 = store.append(Frame::with_topic("test").build()).await; - let frame3 = store.append(Frame::with_topic("test").build()).await; - let _frame4 = store.append(Frame::with_topic("test").build()).await; - let _frame5 = store.append(Frame::with_topic("test").build()).await; + let frame1 = store.append(Frame::with_topic("test").build()); + let frame2 = store.append(Frame::with_topic("test").build()); + let frame3 = store.append(Frame::with_topic("test").build()); + let _frame4 = store.append(Frame::with_topic("test").build()); + let _frame5 = store.append(Frame::with_topic("test").build()); // Start read with limit 3 and follow enabled let options = ReadOptions::builder() @@ -395,16 +393,14 @@ mod tests_ttl_expire { let store = Store::new(temp_dir.into_path()); // Add permanent frame - let permanent_frame = store.append(Frame::with_topic("test").build()).await; + let permanent_frame = store.append(Frame::with_topic("test").build()); // Add frame with a TTL - let expiring_frame = store - .append( - Frame::with_topic("test") - .ttl(TTL::Time(Duration::from_millis(20))) - .build(), - ) - .await; + let expiring_frame = store.append( + Frame::with_topic("test") + .ttl(TTL::Time(Duration::from_millis(20))) + .build(), + ); // Immediate read should show both frames let recver = store.read(ReadOptions::default()).await; @@ -437,51 +433,41 @@ mod tests_ttl_expire { let store = Store::new(temp_dir.into_path()); // Add 4 frames to the same topic with Head(2) TTL - let _frame1 = store - .append( - Frame::with_topic("test") - .ttl(TTL::Head(2)) - .meta(serde_json::json!({"order": 1})) - .build(), - ) - .await; + let _frame1 = store.append( + Frame::with_topic("test") + .ttl(TTL::Head(2)) + .meta(serde_json::json!({"order": 1})) + .build(), + ); - let _frame2 = store - .append( - Frame::with_topic("test") - .ttl(TTL::Head(2)) - .meta(serde_json::json!({"order": 2})) - .build(), - ) - .await; + let _frame2 = store.append( + Frame::with_topic("test") + .ttl(TTL::Head(2)) + .meta(serde_json::json!({"order": 2})) + .build(), + ); - let frame3 = store - .append( - Frame::with_topic("test") - .ttl(TTL::Head(2)) - .meta(serde_json::json!({"order": 3})) - .build(), - ) - .await; + let frame3 = store.append( + Frame::with_topic("test") + .ttl(TTL::Head(2)) + .meta(serde_json::json!({"order": 3})) + .build(), + ); - let frame4 = store - .append( - Frame::with_topic("test") - .ttl(TTL::Head(2)) - .meta(serde_json::json!({"order": 4})) - .build(), - ) - .await; + let frame4 = store.append( + Frame::with_topic("test") + .ttl(TTL::Head(2)) + .meta(serde_json::json!({"order": 4})) + .build(), + ); // Add a frame to a different topic to ensure isolation - let other_frame = store - .append( - Frame::with_topic("other") - .ttl(TTL::Head(2)) - .meta(serde_json::json!({"order": 1})) - .build(), - ) - .await; + let other_frame = store.append( + Frame::with_topic("other") + .ttl(TTL::Head(2)) + .meta(serde_json::json!({"order": 1})) + .build(), + ); // Read all frames and assert exact expected set let recver = store.read(ReadOptions::default()).await; diff --git a/src/tasks.rs b/src/tasks.rs index 3b9bdcf..1a0fad1 100644 --- a/src/tasks.rs +++ b/src/tasks.rs @@ -64,13 +64,11 @@ async fn try_start_task( "reason": e.to_string() }); - let _ = store - .append( - Frame::with_topic(format!("{}.spawn.error", topic)) - .meta(meta) - .build(), - ) - .await; + let _ = store.append( + Frame::with_topic(format!("{}.spawn.error", topic)) + .meta(meta) + .build(), + ); } } @@ -181,13 +179,11 @@ pub async fn serve( "reason": e.to_string() }); - let _ = store - .append( - Frame::with_topic(format!("{}.spawn.error", topic)) - .meta(meta) - .build(), - ) - .await; + let _ = store.append( + Frame::with_topic(format!("{}.spawn.error", topic)) + .meta(meta) + .build(), + ); } } } @@ -212,14 +208,12 @@ async fn append( "source_id": source_id.to_string(), }); - let frame = store - .append( - Frame::with_topic(format!("{}.{}", topic, suffix)) - .maybe_hash(hash) - .meta(meta) - .build(), - ) - .await; + let frame = store.append( + Frame::with_topic(format!("{}.{}", topic, suffix)) + .maybe_hash(hash) + .meta(meta) + .build(), + ); Ok(frame) } @@ -344,18 +338,16 @@ mod tests { }); } - let frame_generator = store - .append( - Frame::with_topic("toml.spawn") - .maybe_hash( - store - .cas_insert(r#"^tail -n+0 -F Cargo.toml | lines"#) - .await - .ok(), - ) - .build(), - ) - .await; + let frame_generator = store.append( + Frame::with_topic("toml.spawn") + .maybe_hash( + store + .cas_insert(r#"^tail -n+0 -F Cargo.toml | lines"#) + .await + .ok(), + ) + .build(), + ); eprintln!("frame_generator: {:?}", frame_generator); @@ -399,14 +391,12 @@ mod tests { }); } - let frame_generator = store - .append( - Frame::with_topic("greeter.spawn".to_string()) - .maybe_hash(store.cas_insert(r#"each { |x| $"hi: ($x)" }"#).await.ok()) - .meta(serde_json::json!({"duplex": true})) - .build(), - ) - .await; + let frame_generator = store.append( + Frame::with_topic("greeter.spawn".to_string()) + .maybe_hash(store.cas_insert(r#"each { |x| $"hi: ($x)" }"#).await.ok()) + .meta(serde_json::json!({"duplex": true})) + .build(), + ); let options = ReadOptions::builder() .follow(FollowOption::On) @@ -417,13 +407,11 @@ mod tests { let frame = recver.recv().await.unwrap(); assert_eq!(frame.topic, "greeter.start".to_string()); - let _ = store - .append( - Frame::with_topic("greeter.send") - .maybe_hash(store.cas_insert(r#"henry"#).await.ok()) - .build(), - ) - .await; + let _ = store.append( + Frame::with_topic("greeter.send") + .maybe_hash(store.cas_insert(r#"henry"#).await.ok()) + .build(), + ); assert_eq!( recver.recv().await.unwrap().topic, "greeter.send".to_string() @@ -444,32 +432,28 @@ mod tests { let store = Store::new(temp_dir.into_path()); let engine = nu::Engine::new(store.clone()).unwrap(); - let _ = store - .append( - Frame::with_topic("toml.spawn") - .maybe_hash( - store - .cas_insert(r#"^tail -n+0 -F Cargo.toml | lines"#) - .await - .ok(), - ) - .build(), - ) - .await; + let _ = store.append( + Frame::with_topic("toml.spawn") + .maybe_hash( + store + .cas_insert(r#"^tail -n+0 -F Cargo.toml | lines"#) + .await + .ok(), + ) + .build(), + ); // replaces the previous generator - let frame_generator = store - .append( - Frame::with_topic("toml.spawn") - .maybe_hash( - store - .cas_insert(r#"^tail -n +2 -F Cargo.toml | lines"#) - .await - .ok(), - ) - .build(), - ) - .await; + let frame_generator = store.append( + Frame::with_topic("toml.spawn") + .maybe_hash( + store + .cas_insert(r#"^tail -n +2 -F Cargo.toml | lines"#) + .await + .ok(), + ) + .build(), + ); let options = ReadOptions::builder() .follow(FollowOption::On)