Skip to content

Commit

Permalink
normalize return value handling
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Nov 30, 2024
1 parent 562f533 commit a910bf2
Showing 1 changed file with 5 additions and 35 deletions.
40 changes: 5 additions & 35 deletions src/handlers/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,7 @@ use crate::store::{FollowOption, Frame, ReadOptions, Store};
use crate::thread_pool::ThreadPool;
use crate::ttl::TTL;

async fn handle_result_stateful(store: &Store, handler: &mut Handler, frame: &Frame, value: Value) {
match value {
Value::Nothing { .. } => (),
Value::Record { ref val, .. } => {
if let Some(state) = val.get("state") {
handler.state = Some(state.clone());
}
let _ = store
.append(
Frame::with_topic(format!("{}.state", &handler.topic))
.hash(
store
.cas_insert(&value_to_json(&value).to_string())
.await
.unwrap(),
)
.meta(serde_json::json!({
"handler_id": handler.id.to_string(),
"frame_id": frame.id.to_string(),
}))
.ttl(TTL::Ephemeral)
.build(),
)
.await;
}
_ => panic!("unexpected value type"),
}
}

async fn handle_result_stateless(store: &Store, handler: &Handler, frame: &Frame, value: Value) {
async fn handle_result(store: &Store, handler: &Handler, frame: &Frame, value: Value) {
match value {
Value::Nothing { .. } => (),
_ => {
Expand All @@ -57,6 +28,7 @@ async fn handle_result_stateless(store: &Store, handler: &Handler, frame: &Frame
"handler_id": handler.id.to_string(),
"frame_id": frame.id.to_string(),
}))
// TODO: TTL should be configurable
.ttl(TTL::Ephemeral)
.build(),
)
Expand Down Expand Up @@ -130,11 +102,7 @@ async fn spawn(

let value = handler.eval_in_thread(&pool, &frame).await;

if handler.stateful {
handle_result_stateful(&store, &mut handler, &frame, value).await;
} else {
handle_result_stateless(&store, &handler, &frame, value).await;
}
handle_result(&store, &handler, &frame, value).await;

// Save cursor after processing in batch mode, skip ephemeral frames
if matches!(handler.meta.mode, Mode::Batch) && frame.ttl != Some(TTL::Ephemeral) {
Expand All @@ -153,6 +121,8 @@ async fn spawn(
.meta(serde_json::json!({
"handler_id": handler.id.to_string(),
}))
// Todo:
// .ttl(TTL::Head(1))
.ttl(TTL::Ephemeral)
.build(),
)
Expand Down

0 comments on commit a910bf2

Please sign in to comment.