Skip to content

Commit

Permalink
refactor: move AppendCommand buffer to nu/commands
Browse files Browse the repository at this point in the history
Moved the handler's buffered version of `AppendCommand` to `nu/commands` to make loading custom commands into the engine more modular.
  • Loading branch information
cablehead committed Dec 28, 2024
1 parent 79446d7 commit a643ea9
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 154 deletions.
106 changes: 5 additions & 101 deletions src/handlers/handler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use tracing::instrument;
Expand All @@ -19,6 +20,7 @@ use scru128::Scru128Id;

use crate::error::Error;
use crate::nu;
use crate::nu::commands;
use crate::nu::frame_to_value;
use crate::nu::util::value_to_json;
use crate::store::{FollowOption, Frame, ReadOptions, Store, TTL};
Expand Down Expand Up @@ -78,6 +80,9 @@ impl Handler {
store: Store,
) -> Result<Self, Error> {
let output = Arc::new(Mutex::new(Vec::new()));
engine.add_commands(vec![Box::new(
commands::append_command_buffered::AppendCommand::new(store.clone(), output.clone()),
)])?;

// Handle any modules first if specified
if let Some(modules) = &meta.modules {
Expand Down Expand Up @@ -144,14 +149,6 @@ impl Handler {
}
}

// Set up engine with custom append command
let mut working_set = StateWorkingSet::new(&engine.state);
working_set.add_decl(Box::new(AppendCommand {
output: output.clone(),
store: store.clone(),
}));
engine.state.merge_delta(working_set.render())?;

let closure = engine.parse_closure(&expression)?;
let block = engine.state.get_block(closure.block_id);

Expand Down Expand Up @@ -520,96 +517,3 @@ impl Handler {
.build()
}
}

use std::sync::{Arc, Mutex};

use nu_engine::CallExt;
use nu_protocol::engine::{Call, Command, EngineState};
use nu_protocol::{Category, ShellError, Signature, SyntaxShape, Type};

#[derive(Clone)]
pub struct AppendCommand {
output: Arc<Mutex<Vec<Frame>>>,
store: Store,
}

impl Command for AppendCommand {
fn name(&self) -> &str {
".append"
}

fn signature(&self) -> Signature {
Signature::build(".append")
.input_output_types(vec![(Type::Any, Type::Any)])
.required("topic", SyntaxShape::String, "this clip's topic")
.named(
"meta",
SyntaxShape::Record(vec![]),
"arbitrary metadata",
None,
)
.named(
"ttl",
SyntaxShape::String,
r#"TTL specification: 'forever', 'ephemeral', 'time:<milliseconds>', or 'head:<n>'"#,
None,
)
.category(Category::Experimental)
}

fn description(&self) -> &str {
"Writes its input to the CAS and then appends a frame with a hash of this content to the
given topic on the stream. Automatically includes handler_id and frame_id and
state_id."
}

fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let span = call.head;

let topic: String = call.req(engine_state, stack, 0)?;
let meta: Option<Value> = call.get_flag(engine_state, stack, "meta")?;
let ttl_str: Option<String> = call.get_flag(engine_state, stack, "ttl")?;

// Convert string TTL to TTL enum
let ttl = ttl_str
.map(|s| TTL::from_query(Some(&format!("ttl={}", s))))
.transpose()
.map_err(|e| ShellError::GenericError {
error: "Invalid TTL format".into(),
msg: e.to_string(),
span: Some(span),
help: Some("TTL must be one of: 'forever', 'ephemeral', 'time:<milliseconds>', or 'head:<n>'".into()),
inner: vec![],
})?;

let input_value = input.into_value(span)?;

let rt = tokio::runtime::Runtime::new()
.map_err(|e| ShellError::IOError { msg: e.to_string() })?;

let hash = rt.block_on(async {
crate::nu::util::write_pipeline_to_cas(
PipelineData::Value(input_value.clone(), None),
&self.store,
span,
)
.await
})?;

let frame = Frame::with_topic(topic)
.maybe_meta(meta.map(|v| value_to_json(&v)))
.maybe_hash(hash)
.maybe_ttl(ttl)
.build();

self.output.lock().unwrap().push(frame);

Ok(PipelineData::Empty)
}
}
13 changes: 11 additions & 2 deletions src/handlers/serve.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::collections::HashMap;

use crate::handlers::Handler;
use crate::nu;
use crate::nu::commands;
use crate::store::{FollowOption, Frame, ReadOptions, Store};
use crate::thread_pool::ThreadPool;
use std::collections::HashMap;

async fn start_handler(
frame: &Frame,
Expand Down Expand Up @@ -38,9 +40,16 @@ struct TopicState {

pub async fn serve(
store: Store,
engine: nu::Engine,
mut engine: nu::Engine,
pool: ThreadPool,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
engine.add_commands(vec![
Box::new(commands::cas_command::CasCommand::new(store.clone())),
Box::new(commands::cat_command::CatCommand::new(store.clone())),
Box::new(commands::head_command::HeadCommand::new(store.clone())),
Box::new(commands::remove_command::RemoveCommand::new(store.clone())),
])?;

let options = ReadOptions::builder().follow(FollowOption::On).build();

let mut recver = store.read(options).await;
Expand Down
5 changes: 3 additions & 2 deletions src/handlers/tests.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use tempfile::TempDir;

use crate::error::Error;
use crate::handlers::serve;
use crate::nu;
use crate::store::TTL;
use crate::store::{FollowOption, Frame, ReadOptions, Store};
use crate::thread_pool::ThreadPool;
use tempfile::TempDir;

macro_rules! validate_handler_output_frame {
($frame_expr:expr, $expected_topic:expr, $handler:expr, $trigger:expr, $state_frame:expr) => {{
Expand Down Expand Up @@ -756,8 +757,8 @@ async fn assert_no_more_frames(recver: &mut tokio::sync::mpsc::Receiver<Frame>)
async fn setup_test_environment() -> (Store, TempDir) {
let temp_dir = TempDir::new().unwrap();
let store = Store::new(temp_dir.path().to_path_buf());
let engine = nu::Engine::new().unwrap();
let pool = ThreadPool::new(4);
let engine = nu::Engine::new(store.clone()).unwrap();

{
let store = store.clone();
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ async fn serve(args: CommandServe) -> Result<(), Box<dyn std::error::Error + Sen

let store = Store::new(args.path);
let pool = ThreadPool::new(10);
let engine = nu::Engine::new(store.clone())?;
let engine = nu::Engine::new()?;

{
let store = store.clone();
Expand Down
98 changes: 98 additions & 0 deletions src/nu/commands/append_command_buffered.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use std::sync::{Arc, Mutex};

use nu_engine::CallExt;
use nu_protocol::engine::{Call, Command, EngineState, Stack};
use nu_protocol::{Category, PipelineData, ShellError, Signature, SyntaxShape, Type, Value};

use crate::nu::util::value_to_json;
use crate::store::{Frame, Store, TTL};

#[derive(Clone)]
pub struct AppendCommand {
output: Arc<Mutex<Vec<Frame>>>,
store: Store,
}

impl AppendCommand {
pub fn new(store: Store, output: Arc<Mutex<Vec<Frame>>>) -> Self {
Self { output, store }
}
}

impl Command for AppendCommand {
fn name(&self) -> &str {
".append"
}

fn signature(&self) -> Signature {
Signature::build(".append")
.input_output_types(vec![(Type::Any, Type::Any)])
.required("topic", SyntaxShape::String, "this clip's topic")
.named(
"meta",
SyntaxShape::Record(vec![]),
"arbitrary metadata",
None,
)
.named(
"ttl",
SyntaxShape::String,
r#"TTL specification: 'forever', 'ephemeral', 'time:<milliseconds>', or 'head:<n>'"#,
None,
)
.category(Category::Experimental)
}

fn description(&self) -> &str {
"Writes its input to the CAS and buffers a frame for later batch processing. The frame will include the content hash, any provided metadata and TTL settings. Meant for use with handlers that need to batch multiple appends."
}

fn run(
&self,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
let span = call.head;

let topic: String = call.req(engine_state, stack, 0)?;
let meta: Option<Value> = call.get_flag(engine_state, stack, "meta")?;
let ttl_str: Option<String> = call.get_flag(engine_state, stack, "ttl")?;

let ttl = ttl_str
.map(|s| TTL::from_query(Some(&format!("ttl={}", s))))
.transpose()
.map_err(|e| ShellError::GenericError {
error: "Invalid TTL format".into(),
msg: e.to_string(),
span: Some(span),
help: Some("TTL must be one of: 'forever', 'ephemeral', 'time:<milliseconds>', or 'head:<n>'".into()),
inner: vec![],
})?;

let input_value = input.into_value(span)?;

let rt = tokio::runtime::Runtime::new()
.map_err(|e| ShellError::IOError { msg: e.to_string() })?;

let hash = rt.block_on(async {
crate::nu::util::write_pipeline_to_cas(
PipelineData::Value(input_value.clone(), None),
&self.store,
span,
)
.await
})?;

let frame = Frame::with_topic(topic)
.maybe_meta(meta.map(|v| value_to_json(&v)))
.maybe_hash(hash)
.maybe_ttl(ttl)
.build();

self.output.lock().unwrap().push(frame);

Ok(PipelineData::Empty)
}
}
37 changes: 6 additions & 31 deletions src/nu/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,6 @@
mod append_command;
mod cas_command;
mod cat_command;
mod head_command;
mod remove_command;

use crate::store::Store;
use nu_protocol::engine::EngineState;

pub fn add_custom_commands(store: Store, mut engine_state: EngineState) -> EngineState {
let delta = {
let mut working_set = nu_protocol::engine::StateWorkingSet::new(&engine_state);
working_set.add_decl(Box::new(cas_command::CasCommand::new(store.clone())));
working_set.add_decl(Box::new(append_command::AppendCommand::new(store.clone())));
working_set.add_decl(Box::new(head_command::HeadCommand::new(store.clone())));
working_set.add_decl(Box::new(remove_command::RemoveCommand::new(store.clone())));
working_set.add_decl(Box::new(cat_command::CatCommand::new(store.clone())));

// Add the .rm alias
let alias_expression = "alias .rm = .remove";
let _ = nu_parser::parse(&mut working_set, None, alias_expression.as_bytes(), false);

working_set.render()
};

if let Err(err) = engine_state.merge_delta(delta) {
eprintln!("Error adding custom commands: {:?}", err);
}

engine_state
}
pub mod append_command;
pub mod append_command_buffered;
pub mod cas_command;
pub mod cat_command;
pub mod head_command;
pub mod remove_command;
16 changes: 11 additions & 5 deletions src/nu/engine.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
use super::commands::add_custom_commands;
use nu_cli::{add_cli_context, gather_parent_env_vars};
use nu_cmd_lang::create_default_context;
use nu_command::add_shell_command_context;
use nu_engine::eval_block_with_early_return;
use nu_parser::parse;
use nu_protocol::debugger::WithoutDebug;
use nu_protocol::engine::{Closure, EngineState, Stack, StateWorkingSet};
use nu_protocol::engine::{Closure, Command, EngineState, Stack, StateWorkingSet};
use nu_protocol::{PipelineData, ShellError, Span};

use crate::error::Error;
use crate::store::Store;

#[derive(Clone)]
pub struct Engine {
pub state: EngineState,
}

impl Engine {
pub fn new(store: Store) -> Result<Self, Error> {
pub fn new() -> Result<Self, Error> {
let mut engine_state = create_default_context();
engine_state = add_shell_command_context(engine_state);
engine_state = add_cli_context(engine_state);
engine_state = add_custom_commands(store.clone(), engine_state);

let init_cwd = std::env::current_dir()?;
gather_parent_env_vars(&mut engine_state, init_cwd.as_ref());
Expand All @@ -31,6 +28,15 @@ impl Engine {
})
}

pub fn add_commands(&mut self, commands: Vec<Box<dyn Command>>) -> Result<(), Error> {
let mut working_set = StateWorkingSet::new(&self.state);
for command in commands {
working_set.add_decl(command);
}
self.state.merge_delta(working_set.render())?;
Ok(())
}

pub fn eval(
&self,
input: PipelineData,
Expand Down
2 changes: 1 addition & 1 deletion src/nu/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod commands;
mod engine;

pub mod commands;
pub mod util;
pub use engine::Engine;
pub use util::{frame_to_pipeline, frame_to_value, value_to_json};
Expand Down
Loading

0 comments on commit a643ea9

Please sign in to comment.