Skip to content

Commit

Permalink
feat: drop async from store.append -- it's not needed
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Dec 27, 2024
1 parent f01672a commit a83f75b
Show file tree
Hide file tree
Showing 10 changed files with 373 additions and 455 deletions.
28 changes: 12 additions & 16 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,15 +254,13 @@ async fn handle_stream_append(
Err(e) => return response_400(e.to_string()),
};

let frame = store
.append(
Frame::with_topic(topic)
.maybe_hash(hash)
.maybe_meta(meta)
.ttl(ttl)
.build(),
)
.await;
let frame = store.append(
Frame::with_topic(topic)
.maybe_hash(hash)
.maybe_meta(meta)
.ttl(ttl)
.build(),
);

Ok(Response::builder()
.status(StatusCode::OK)
Expand Down Expand Up @@ -351,13 +349,11 @@ pub async fn serve(
pool: ThreadPool,
expose: Option<String>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let _ = store
.append(
Frame::with_topic("xs.start")
.maybe_meta(expose.as_ref().map(|e| serde_json::json!({"expose": e})))
.build(),
)
.await;
let _ = store.append(
Frame::with_topic("xs.start")
.maybe_meta(expose.as_ref().map(|e| serde_json::json!({"expose": e})))
.build(),
);

let path = store.path.join("sock").to_string_lossy().to_string();
let listener = Listener::bind(&path).await?;
Expand Down
60 changes: 27 additions & 33 deletions src/handlers/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ impl Handler {
}
}

let output_frame = store.append(output_frame).await;
let output_frame = store.append(output_frame);

// Update state if the appended frame is a state frame
if self.stateful && output_frame.topic == format!("{}.state", self.topic) {
Expand Down Expand Up @@ -375,17 +375,15 @@ impl Handler {
});
}

let _ = store
.append(
Frame::with_topic(format!("{}.registered", &self.topic))
.meta(serde_json::json!({
"handler_id": self.id.to_string(),
"tail": options.tail,
"last_id": options.last_id.map(|id| id.to_string()),
}))
.build(),
)
.await;
let _ = store.append(
Frame::with_topic(format!("{}.registered", &self.topic))
.meta(serde_json::json!({
"handler_id": self.id.to_string(),
"tail": options.tail,
"last_id": options.last_id.map(|id| id.to_string()),
}))
.build(),
);

Ok(())
}
Expand All @@ -405,16 +403,14 @@ impl Handler {
if frame.topic == format!("{}.register", &self.topic)
|| frame.topic == format!("{}.unregister", &self.topic)
{
let _ = store
.append(
Frame::with_topic(format!("{}.unregistered", &self.topic))
.meta(serde_json::json!({
"handler_id": self.id.to_string(),
"frame_id": frame.id.to_string(),
}))
.build(),
)
.await;
let _ = store.append(
Frame::with_topic(format!("{}.unregistered", &self.topic))
.meta(serde_json::json!({
"handler_id": self.id.to_string(),
"frame_id": frame.id.to_string(),
}))
.build(),
);
break;
}

Expand All @@ -431,17 +427,15 @@ impl Handler {
}

if let Err(err) = self.process_frame(&frame, store, pool).await {
let _ = store
.append(
Frame::with_topic(format!("{}.unregistered", self.topic))
.meta(serde_json::json!({
"handler_id": self.id.to_string(),
"frame_id": frame.id.to_string(),
"error": err.to_string(),
}))
.build(),
)
.await;
let _ = store.append(
Frame::with_topic(format!("{}.unregistered", self.topic))
.meta(serde_json::json!({
"handler_id": self.id.to_string(),
"frame_id": frame.id.to_string(),
"error": err.to_string(),
}))
.build(),
);
break;
}
}
Expand Down
18 changes: 8 additions & 10 deletions src/handlers/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@ async fn start_handler(
Ok(())
}
Err(err) => {
let _ = store
.append(
Frame::with_topic(format!("{}.unregistered", topic))
.meta(serde_json::json!({
"handler_id": frame.id.to_string(),
"error": err.to_string(),
}))
.build(),
)
.await;
let _ = store.append(
Frame::with_topic(format!("{}.unregistered", topic))
.meta(serde_json::json!({
"handler_id": frame.id.to_string(),
"error": err.to_string(),
}))
.build(),
);
Ok(())
}
}
Expand Down
Loading

0 comments on commit a83f75b

Please sign in to comment.