diff --git a/src/http.rs b/src/http.rs index 36bd549..1c40db2 100644 --- a/src/http.rs +++ b/src/http.rs @@ -18,7 +18,7 @@ use hyper::service::service_fn; use hyper::{Method, Request, Response, StatusCode}; use hyper_util::rt::TokioIo; -use crate::store::Store; +use crate::store::{ReadOptions, Store}; type BoxError = Box; type HTTPResult = Result>, BoxError>; @@ -49,7 +49,7 @@ async fn get(store: Store, req: Request) -> HTTPResult { eprintln!("path: {:?}", req.uri().path()); match match_route(req.uri().path()) { Routes::Root => { - let rx = store.subscribe().await; + let rx = store.subscribe(ReadOptions { follow: false }).await; let stream = ReceiverStream::new(rx); let stream = stream.map(|frame| { eprintln!("streaming"); diff --git a/src/store.rs b/src/store.rs index 252a525..8d456a0 100644 --- a/src/store.rs +++ b/src/store.rs @@ -23,10 +23,15 @@ pub struct Store { commands_tx: mpsc::Sender, } +#[derive(Debug)] +pub struct ReadOptions { + pub follow: bool, +} + #[derive(Debug)] enum Command { - Subscribe(mpsc::Sender), - Put(Frame), + Read(mpsc::Sender, ReadOptions), + Append(Frame), } impl Store { @@ -53,7 +58,7 @@ impl Store { 'outer: while let Some(command) = rx.blocking_recv() { eprintln!("command: {:?}", &command); match command { - Command::Subscribe(tx) => { + Command::Read(tx, options) => { for record in store.partition.iter() { eprintln!("record: {:?}", &record); let record = record.unwrap(); @@ -63,10 +68,11 @@ impl Store { continue 'outer; } } - - subscribers.push(tx); + if options.follow { + subscribers.push(tx); + } } - Command::Put(frame) => { + Command::Append(frame) => { subscribers.retain(|tx| tx.blocking_send(frame.clone()).is_ok()); } } @@ -77,9 +83,12 @@ impl Store { store } - pub async fn subscribe(&self) -> mpsc::Receiver { + pub async fn subscribe(&self, options: ReadOptions) -> mpsc::Receiver { let (tx, rx) = mpsc::channel::(100); - self.commands_tx.send(Command::Subscribe(tx)).await.unwrap(); // our thread went away? + self.commands_tx + .send(Command::Read(tx, options)) + .await + .unwrap(); // our thread went away? rx } @@ -103,7 +112,7 @@ impl Store { self.partition.insert(frame.id.to_bytes(), encoded).unwrap(); self.commands_tx - .send(Command::Put(frame.clone())) + .send(Command::Append(frame.clone())) .await .unwrap(); // our thread went away?