diff --git a/Cargo.lock b/Cargo.lock index 64aa0408..0a79c5a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3286,6 +3286,7 @@ dependencies = [ "libc", "mio 1.0.2", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.52.0", diff --git a/Cargo.toml b/Cargo.toml index 4b05d9d8..f6fef030 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,7 @@ serde_json = { version = "1.0.133" } supermusr-common = { path = "./common" } supermusr-streaming-types = { path = "./streaming-types" } taos = { version = "0.10.27", default_features = false, features = ["ws"] } -tokio = { version = "1.41", features = ["macros", "rt-multi-thread", "sync"] } +tokio = { version = "1.41", features = ["macros", "rt-multi-thread", "signal", "sync"] } thiserror = "1.0.69" tracing = "0.1.41" tracing-opentelemetry = "0.23.0" diff --git a/digitiser-aggregator/src/main.rs b/digitiser-aggregator/src/main.rs index 94fb998e..7271d78f 100644 --- a/digitiser-aggregator/src/main.rs +++ b/digitiser-aggregator/src/main.rs @@ -32,11 +32,19 @@ use supermusr_streaming_types::{ }, flatbuffers::InvalidFlatbuffer, }; -use tokio::sync::mpsc::{error::TrySendError, Receiver, Sender}; -use tracing::{debug, error, info_span, instrument, level_filters::LevelFilter, warn}; +use tokio::{ + select, + signal::unix::{signal, Signal, SignalKind}, + sync::mpsc::{error::TrySendError, Receiver, Sender}, + task::JoinHandle, +}; +use tracing::{debug, error, info, info_span, instrument, level_filters::LevelFilter, warn}; const PRODUCER_TIMEOUT: Timeout = Timeout::After(Duration::from_millis(100)); +type AggregatedFrameToBufferSender = Sender>; +type TrySendAggregatedFrameError = TrySendError>; + #[derive(Debug, Parser)] #[clap(author, version, about)] struct Cli { @@ -92,9 +100,6 @@ struct Cli { otel_namespace: String, } -type AggregatedFrameToBufferSender = Sender>; -type TrySendAggregatedFrameError = TrySendError>; - #[tokio::main] async fn main() -> anyhow::Result<()> { let args = Cli::parse(); @@ -157,12 +162,15 @@ async fn main() -> anyhow::Result<()> { let mut cache_poll_interval = tokio::time::interval(Duration::from_millis(args.cache_poll_ms)); // Creates Send-Frame thread and returns channel sender - let channel_send = create_producer_task( + let (channel_send, producer_task_handle) = create_producer_task( tracer.use_otel(), args.send_frame_buffer_size, &producer, &args.output_topic, - ); + )?; + + // Is used to await any sigint signals + let mut sigint = signal(SignalKind::interrupt())?; loop { tokio::select! { @@ -179,6 +187,12 @@ async fn main() -> anyhow::Result<()> { _ = cache_poll_interval.tick() => { cache_poll(&channel_send, &mut cache).await?; } + _ = sigint.recv() => { + // Wait for the channel to close and + // all pending production tasks to finish + producer_task_handle.await?; + return Ok(()); + } } } } @@ -313,17 +327,19 @@ fn create_producer_task( send_frame_buffer_size: usize, producer: &FutureProducer, output_topic: &str, -) -> AggregatedFrameToBufferSender { +) -> std::io::Result<(AggregatedFrameToBufferSender, JoinHandle<()>)> { let (channel_send, channel_recv) = tokio::sync::mpsc::channel::>(send_frame_buffer_size); - tokio::spawn(produce_to_kafka( + let sigint = signal(SignalKind::interrupt())?; + let handle = tokio::spawn(produce_to_kafka( use_otel, channel_recv, producer.to_owned(), output_topic.to_owned(), + sigint, )); - channel_send + Ok((channel_send, handle)) } async fn produce_to_kafka( @@ -331,21 +347,55 @@ async fn produce_to_kafka( mut channel_recv: Receiver>, producer: FutureProducer, output_topic: String, + mut sigint: Signal, ) { loop { - // Blocks until a frame is received - match channel_recv.recv().await { - Some(frame) => { - produce_frame_to_kafka(use_otel, frame, &producer, &output_topic).await; + select! { + message = channel_recv.recv() => { + // Blocks until a frame is received + match message { + Some(frame) => { + produce_frame_to_kafka(use_otel, frame, &producer, &output_topic).await; + } + None => { + info!("Send-Frame channel closed"); + return; + } + } } - None => { - error!("Send-Frame Receiver Error"); - return; + _ = sigint.recv() => { + close_and_flush_producer_channel(use_otel,&mut channel_recv,&producer,&output_topic).await; } } } } +#[tracing::instrument(skip_all, target = "otel", name = "Closing", level = "info", fields(capacity = channel_recv.capacity(), max_capacity = channel_recv.max_capacity()))] +async fn close_and_flush_producer_channel( + use_otel: bool, + channel_recv: &mut Receiver>, + producer: &FutureProducer, + output_topic: &str, +) -> Option<()> { + channel_recv.close(); + + loop { + let frame = channel_recv.recv().await?; + flush_frame(use_otel, frame, producer, output_topic).await?; + } +} + +#[tracing::instrument(skip_all, target = "otel", name = "Flush Frame")] +async fn flush_frame( + use_otel: bool, + frame: AggregatedFrame, + producer: &FutureProducer, + output_topic: &str, +) -> Option<()> { + produce_frame_to_kafka(use_otel, frame, producer, output_topic).await; + Some(()) +} + async fn produce_frame_to_kafka( use_otel: bool, frame: AggregatedFrame, diff --git a/nexus-writer/src/main.rs b/nexus-writer/src/main.rs index 41cde6e7..391b58e0 100644 --- a/nexus-writer/src/main.rs +++ b/nexus-writer/src/main.rs @@ -37,7 +37,10 @@ use supermusr_streaming_types::{ flatbuffers::InvalidFlatbuffer, FrameMetadata, }; -use tokio::time; +use tokio::{ + signal::unix::{signal, SignalKind}, + time, +}; use tracing::{debug, error, info_span, instrument, level_filters::LevelFilter, warn, warn_span}; #[derive(Debug, Parser)] @@ -195,10 +198,16 @@ async fn main() -> anyhow::Result<()> { "Number of failures encountered" ); + let run_ttl = + Duration::try_milliseconds(args.cache_run_ttl_ms).expect("Conversion is possible"); + + // Is used to await any sigint signals + let mut sigint = signal(SignalKind::interrupt())?; + loop { tokio::select! { _ = nexus_write_interval.tick() => { - nexus_engine.flush(&Duration::try_milliseconds(args.cache_run_ttl_ms).expect("Conversion is possible")); + nexus_engine.flush(&run_ttl); nexus_engine.flush_move_cache().await; } event = consumer.recv() => { @@ -214,6 +223,11 @@ async fn main() -> anyhow::Result<()> { } } } + _ = sigint.recv() => { + // Move any runs in the `move cache` before shutting down. + nexus_engine.close().await; + return Ok(()); + } } } } diff --git a/nexus-writer/src/nexus/engine.rs b/nexus-writer/src/nexus/engine.rs index abc04f90..e0999932 100644 --- a/nexus-writer/src/nexus/engine.rs +++ b/nexus-writer/src/nexus/engine.rs @@ -256,6 +256,11 @@ impl NexusEngine { } self.run_move_cache.clear(); } + + #[tracing::instrument(skip_all, level = "info", name = "Closing", fields(num_runs_to_archive = self.run_move_cache.len()))] + pub(crate) async fn close(mut self) { + self.flush_move_cache().await; + } } #[cfg(test)] diff --git a/trace-to-events/src/main.rs b/trace-to-events/src/main.rs index 429ab253..d43b73fe 100644 --- a/trace-to-events/src/main.rs +++ b/trace-to-events/src/main.rs @@ -32,9 +32,17 @@ use supermusr_streaming_types::{ flatbuffers::{FlatBufferBuilder, InvalidFlatbuffer}, FrameMetadata, }; -use tokio::sync::mpsc::{error::TrySendError, Receiver, Sender}; +use tokio::{ + select, + signal::unix::{signal, Signal, SignalKind}, + sync::mpsc::{error::TrySendError, Receiver, Sender}, + task::JoinHandle, +}; use tracing::{debug, error, info, instrument, metadata::LevelFilter, trace, warn}; +type DigitiserEventListToBufferSender = Sender; +type TrySendDigitiserEventListError = TrySendError; + #[derive(Debug, Parser)] #[clap(author, version, about)] struct Cli { @@ -140,7 +148,10 @@ async fn main() -> anyhow::Result<()> { "Number of failures encountered" ); - let sender = create_producer_task(args.send_eventlist_buffer_size); + let (sender, producer_task_handle) = create_producer_task(args.send_eventlist_buffer_size)?; + + // Is used to await any sigint signals + let mut sigint = signal(SignalKind::interrupt())?; loop { tokio::select! { @@ -156,46 +167,12 @@ async fn main() -> anyhow::Result<()> { consumer.commit_message(&m, CommitMode::Async).unwrap(); } Err(e) => warn!("Kafka error: {}", e) - } - } - } -} - -type DigitiserEventListToBufferSender = Sender; -type TrySendDigitiserEventListError = TrySendError; - -// The following functions control the kafka producer thread -fn create_producer_task( - send_digitiser_eventlist_buffer_size: usize, -) -> DigitiserEventListToBufferSender { - let (channel_send, channel_recv) = - tokio::sync::mpsc::channel::(send_digitiser_eventlist_buffer_size); - - tokio::spawn(produce_to_kafka(channel_recv)); - channel_send -} - -async fn produce_to_kafka(mut channel_recv: Receiver) { - loop { - // Blocks until a frame is received - match channel_recv.recv().await { - Some(future) => match future.await { - Ok(_) => { - trace!("Published event message"); - counter!(MESSAGES_PROCESSED).increment(1); - } - Err(e) => { - error!("{:?}", e); - counter!( - FAILURES, - &[failures::get_label(FailureKind::KafkaPublishFailed)] - ) - .increment(1); - } }, - None => { - info!("Send-Frame channel closed"); - return; + _ = sigint.recv() => { + // Wait for the channel to close and + // all pending production tasks to finish + producer_task_handle.await?; + return Ok(()); } } } @@ -325,3 +302,72 @@ fn process_digitiser_trace_message( Ok(()) } } + +// The following functions control the kafka producer thread +fn create_producer_task( + send_digitiser_eventlist_buffer_size: usize, +) -> std::io::Result<(DigitiserEventListToBufferSender, JoinHandle<()>)> { + let (channel_send, channel_recv) = + tokio::sync::mpsc::channel::(send_digitiser_eventlist_buffer_size); + + let sigint = signal(SignalKind::interrupt())?; + let handle = tokio::spawn(produce_to_kafka(channel_recv, sigint)); + Ok((channel_send, handle)) +} + +async fn produce_to_kafka(mut channel_recv: Receiver, mut sigint: Signal) { + loop { + // Blocks until a frame is received + select! { + message = channel_recv.recv() => { + match message { + Some(future) => { + produce_eventlist_to_kafka(future).await + }, + None => { + info!("Send-Eventlist channel closed"); + return; + } + } + }, + _ = sigint.recv() => { + close_and_flush_producer_channel(&mut channel_recv).await; + } + } + } +} + +async fn produce_eventlist_to_kafka(future: DeliveryFuture) { + match future.await { + Ok(_) => { + trace!("Published event message"); + counter!(MESSAGES_PROCESSED).increment(1); + } + Err(e) => { + error!("{:?}", e); + counter!( + FAILURES, + &[failures::get_label(FailureKind::KafkaPublishFailed)] + ) + .increment(1); + } + } +} + +#[tracing::instrument(skip_all, target = "otel", name = "Closing", level = "info", fields(capactity = channel_recv.capacity(), max_capactity = channel_recv.max_capacity()))] +async fn close_and_flush_producer_channel( + channel_recv: &mut Receiver, +) -> Option<()> { + channel_recv.close(); + + loop { + let future = channel_recv.recv().await?; + flush_eventlist(future).await?; + } +} + +#[tracing::instrument(skip_all, target = "otel", name = "Flush Eventlist")] +async fn flush_eventlist(future: DeliveryFuture) -> Option<()> { + produce_eventlist_to_kafka(future).await; + Some(()) +}