Skip to content

Commit

Permalink
abandoning the cursor approach
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Nov 30, 2024
1 parent a910bf2 commit add27f4
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 68 deletions.
88 changes: 28 additions & 60 deletions src/handlers/handler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::str::FromStr;
use std::time::Duration;

use nu_engine::eval_block_with_early_return;
Expand All @@ -13,31 +12,30 @@ use scru128::Scru128Id;
use crate::error::Error;
use crate::nu;
use crate::nu::frame_to_value;
use crate::store::{FollowOption, Frame, ReadOptions, Store};
use crate::store::{FollowOption, ReadOptions, Store};
use crate::thread_pool::ThreadPool;
use crate::ttl::TTL;

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, Default)]
pub struct Meta {
pub initial_state: Option<serde_json::Value>,
pub pulse: Option<u64>,
pub mode: Mode,
pub start: StartFrom,
}

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
#[default]
Batch,
Online(StartMode),
}

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum StartMode {
#[default]
pub enum StartFrom {
/// Only process new frames
#[default]
Tail,
Head(String),
/// Process from the beginning of the stream
Root,
/// Try specific topic, then tail
At {
topic: String,
#[serde(default)]
or_tail: bool,
},
}

#[derive(Clone)]
Expand Down Expand Up @@ -140,62 +138,32 @@ impl Handler {
.into_value(Span::unknown())?)
}

pub async fn get_cursor(&self, store: &Store) -> Option<Scru128Id> {
store
.head(&format!("{}.cursor", self.topic))
.and_then(|frame| {
frame.meta.and_then(|meta| {
meta.get("frame_id").and_then(|v| {
v.as_str()
.and_then(|id| scru128::Scru128Id::from_str(id).ok())
})
})
})
}

pub async fn save_cursor(&self, store: &Store, frame_id: Scru128Id) {
let _ = store
.append(
Frame::with_topic(format!("{}.cursor", self.topic))
.meta(serde_json::json!({
"handler_id": self.id.to_string(),
"frame_id": frame_id.to_string(),
}))
.ttl(TTL::Head(1))
.build(),
)
.await;
}

pub async fn configure_read_options(&self, store: &Store) -> ReadOptions {
let last_id: Option<Scru128Id> = match self.meta.mode {
Mode::Batch => store
.head(&format!("{}.cursor", self.topic))
.and_then(|frame| {
frame.meta.and_then(|meta| {
meta.get("frame_id").and_then(|v| {
v.as_str()
.and_then(|id| scru128::Scru128Id::from_str(id).ok())
})
})
}),
Mode::Online(ref start_mode) => match start_mode {
StartMode::Tail => None,
StartMode::Head(ref head) => store.head(head).map(|frame| frame.id),
},
// Determine last_id based on StartFrom
let (last_id, is_tail) = match &self.meta.start {
StartFrom::Root => (None, false),
StartFrom::Tail => (None, true),
StartFrom::At { topic, or_tail } => {
let id = store.head(topic).map(|frame| frame.id);
// If we found the topic, use it. Otherwise fall back based on or_tail
match (id, or_tail) {
(Some(id), _) => (Some(id), false), // Found topic, use it
(None, true) => (None, true), // Not found, fallback to tail
(None, false) => (None, false), // Not found, fallback to root
}
}
};

eprintln!("LAST_ID: {:?}", last_id.map(|id| id.to_string()));
eprintln!("Tail: {}", is_tail);

// Configure follow option based on pulse setting
let follow_option = self
.meta
.pulse
.map(|pulse| FollowOption::WithHeartbeat(Duration::from_millis(pulse)))
.unwrap_or(FollowOption::On);

let is_tail = matches!(self.meta.mode, Mode::Online(_)) && last_id.is_none();
eprintln!("Tail: {}", is_tail);

ReadOptions::builder()
.follow(follow_option)
.tail(is_tail)
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod handler;
mod serve;

pub use handler::{Handler, Meta, Mode};
pub use handler::{Handler, Meta};
pub use serve::serve;
8 changes: 1 addition & 7 deletions src/handlers/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tokio_util::compat::FuturesAsyncReadCompatExt;
use nu_protocol::Value;

use crate::error::Error;
use crate::handlers::{Handler, Meta, Mode};
use crate::handlers::{Handler, Meta};
use crate::nu;
use crate::nu::util::value_to_json;
use crate::store::{FollowOption, Frame, ReadOptions, Store};
Expand Down Expand Up @@ -103,12 +103,6 @@ async fn spawn(
let value = handler.eval_in_thread(&pool, &frame).await;

handle_result(&store, &handler, &frame, value).await;

// Save cursor after processing in batch mode, skip ephemeral frames
if matches!(handler.meta.mode, Mode::Batch) && frame.ttl != Some(TTL::Ephemeral) {
eprintln!("HANDLER: {} SAVING CURSOR: frame: {:?}", handler.id, frame);
handler.save_cursor(&store, frame.id).await;
}
}

eprintln!("HANDLER: {} EXITING", handler.id);
Expand Down

0 comments on commit add27f4

Please sign in to comment.