Skip to content

Commit

Permalink
Graceful shutdown (#287)
Browse files Browse the repository at this point in the history
## Summary of changes

Detects a SIGINT signal from the OS and performs a graceful shutdown.

### `trace-to-events` and `digitiser-aggregator`

For trace-to-events and digitiser-aggregator, both the producer thread,
and the main thread listens for SIGINT.

- When the producer thread hears it, it closes the MPSC channel and
flushes all messages (eventlists and frames respectively) before
terminating the thread.
- When the main-thread hears it, it waits for the producer thread to
end, then terminates.

### `nexus writer`

When the signal is received, the main (and only) thread blocks, moves
any nxs files in the `move_cache` queue, then terminates.

## Instruction for review/testing

General code review.

Tested on simulated data.

Closes
#270.
  • Loading branch information
Modularius authored Dec 13, 2024
1 parent 0705b8f commit 5d98c33
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 61 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ serde_json = { version = "1.0.133" }
supermusr-common = { path = "./common" }
supermusr-streaming-types = { path = "./streaming-types" }
taos = { version = "0.10.27", default_features = false, features = ["ws"] }
tokio = { version = "1.41", features = ["macros", "rt-multi-thread", "sync"] }
tokio = { version = "1.41", features = ["macros", "rt-multi-thread", "signal", "sync"] }
thiserror = "1.0.69"
tracing = "0.1.41"
tracing-opentelemetry = "0.23.0"
Expand Down
84 changes: 67 additions & 17 deletions digitiser-aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,19 @@ use supermusr_streaming_types::{
},
flatbuffers::InvalidFlatbuffer,
};
use tokio::sync::mpsc::{error::TrySendError, Receiver, Sender};
use tracing::{debug, error, info_span, instrument, level_filters::LevelFilter, warn};
use tokio::{
select,
signal::unix::{signal, Signal, SignalKind},
sync::mpsc::{error::TrySendError, Receiver, Sender},
task::JoinHandle,
};
use tracing::{debug, error, info, info_span, instrument, level_filters::LevelFilter, warn};

const PRODUCER_TIMEOUT: Timeout = Timeout::After(Duration::from_millis(100));

type AggregatedFrameToBufferSender = Sender<AggregatedFrame<EventData>>;
type TrySendAggregatedFrameError = TrySendError<AggregatedFrame<EventData>>;

#[derive(Debug, Parser)]
#[clap(author, version, about)]
struct Cli {
Expand Down Expand Up @@ -92,9 +100,6 @@ struct Cli {
otel_namespace: String,
}

type AggregatedFrameToBufferSender = Sender<AggregatedFrame<EventData>>;
type TrySendAggregatedFrameError = TrySendError<AggregatedFrame<EventData>>;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Cli::parse();
Expand Down Expand Up @@ -157,12 +162,15 @@ async fn main() -> anyhow::Result<()> {
let mut cache_poll_interval = tokio::time::interval(Duration::from_millis(args.cache_poll_ms));

// Creates Send-Frame thread and returns channel sender
let channel_send = create_producer_task(
let (channel_send, producer_task_handle) = create_producer_task(
tracer.use_otel(),
args.send_frame_buffer_size,
&producer,
&args.output_topic,
);
)?;

// Is used to await any sigint signals
let mut sigint = signal(SignalKind::interrupt())?;

loop {
tokio::select! {
Expand All @@ -179,6 +187,12 @@ async fn main() -> anyhow::Result<()> {
_ = cache_poll_interval.tick() => {
cache_poll(&channel_send, &mut cache).await?;
}
_ = sigint.recv() => {
// Wait for the channel to close and
// all pending production tasks to finish
producer_task_handle.await?;
return Ok(());
}
}
}
}
Expand Down Expand Up @@ -313,39 +327,75 @@ fn create_producer_task(
send_frame_buffer_size: usize,
producer: &FutureProducer,
output_topic: &str,
) -> AggregatedFrameToBufferSender {
) -> std::io::Result<(AggregatedFrameToBufferSender, JoinHandle<()>)> {
let (channel_send, channel_recv) =
tokio::sync::mpsc::channel::<AggregatedFrame<EventData>>(send_frame_buffer_size);

tokio::spawn(produce_to_kafka(
let sigint = signal(SignalKind::interrupt())?;
let handle = tokio::spawn(produce_to_kafka(
use_otel,
channel_recv,
producer.to_owned(),
output_topic.to_owned(),
sigint,
));
channel_send
Ok((channel_send, handle))
}

async fn produce_to_kafka(
use_otel: bool,
mut channel_recv: Receiver<AggregatedFrame<EventData>>,
producer: FutureProducer,
output_topic: String,
mut sigint: Signal,
) {
loop {
// Blocks until a frame is received
match channel_recv.recv().await {
Some(frame) => {
produce_frame_to_kafka(use_otel, frame, &producer, &output_topic).await;
select! {
message = channel_recv.recv() => {
// Blocks until a frame is received
match message {
Some(frame) => {
produce_frame_to_kafka(use_otel, frame, &producer, &output_topic).await;
}
None => {
info!("Send-Frame channel closed");
return;
}
}
}
None => {
error!("Send-Frame Receiver Error");
return;
_ = sigint.recv() => {
close_and_flush_producer_channel(use_otel,&mut channel_recv,&producer,&output_topic).await;
}
}
}
}

#[tracing::instrument(skip_all, target = "otel", name = "Closing", level = "info", fields(capacity = channel_recv.capacity(), max_capacity = channel_recv.max_capacity()))]
async fn close_and_flush_producer_channel(
use_otel: bool,
channel_recv: &mut Receiver<AggregatedFrame<EventData>>,
producer: &FutureProducer,
output_topic: &str,
) -> Option<()> {
channel_recv.close();

loop {
let frame = channel_recv.recv().await?;
flush_frame(use_otel, frame, producer, output_topic).await?;
}
}

#[tracing::instrument(skip_all, target = "otel", name = "Flush Frame")]
async fn flush_frame(
use_otel: bool,
frame: AggregatedFrame<EventData>,
producer: &FutureProducer,
output_topic: &str,
) -> Option<()> {
produce_frame_to_kafka(use_otel, frame, producer, output_topic).await;
Some(())
}

async fn produce_frame_to_kafka(
use_otel: bool,
frame: AggregatedFrame<EventData>,
Expand Down
18 changes: 16 additions & 2 deletions nexus-writer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ use supermusr_streaming_types::{
flatbuffers::InvalidFlatbuffer,
FrameMetadata,
};
use tokio::time;
use tokio::{
signal::unix::{signal, SignalKind},
time,
};
use tracing::{debug, error, info_span, instrument, level_filters::LevelFilter, warn, warn_span};

#[derive(Debug, Parser)]
Expand Down Expand Up @@ -195,10 +198,16 @@ async fn main() -> anyhow::Result<()> {
"Number of failures encountered"
);

let run_ttl =
Duration::try_milliseconds(args.cache_run_ttl_ms).expect("Conversion is possible");

// Is used to await any sigint signals
let mut sigint = signal(SignalKind::interrupt())?;

loop {
tokio::select! {
_ = nexus_write_interval.tick() => {
nexus_engine.flush(&Duration::try_milliseconds(args.cache_run_ttl_ms).expect("Conversion is possible"));
nexus_engine.flush(&run_ttl);
nexus_engine.flush_move_cache().await;
}
event = consumer.recv() => {
Expand All @@ -214,6 +223,11 @@ async fn main() -> anyhow::Result<()> {
}
}
}
_ = sigint.recv() => {
// Move any runs in the `move cache` before shutting down.
nexus_engine.close().await;
return Ok(());
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions nexus-writer/src/nexus/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,11 @@ impl NexusEngine {
}
self.run_move_cache.clear();
}

#[tracing::instrument(skip_all, level = "info", name = "Closing", fields(num_runs_to_archive = self.run_move_cache.len()))]
pub(crate) async fn close(mut self) {
self.flush_move_cache().await;
}
}

#[cfg(test)]
Expand Down
128 changes: 87 additions & 41 deletions trace-to-events/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,17 @@ use supermusr_streaming_types::{
flatbuffers::{FlatBufferBuilder, InvalidFlatbuffer},
FrameMetadata,
};
use tokio::sync::mpsc::{error::TrySendError, Receiver, Sender};
use tokio::{
select,
signal::unix::{signal, Signal, SignalKind},
sync::mpsc::{error::TrySendError, Receiver, Sender},
task::JoinHandle,
};
use tracing::{debug, error, info, instrument, metadata::LevelFilter, trace, warn};

type DigitiserEventListToBufferSender = Sender<DeliveryFuture>;
type TrySendDigitiserEventListError = TrySendError<DeliveryFuture>;

#[derive(Debug, Parser)]
#[clap(author, version, about)]
struct Cli {
Expand Down Expand Up @@ -140,7 +148,10 @@ async fn main() -> anyhow::Result<()> {
"Number of failures encountered"
);

let sender = create_producer_task(args.send_eventlist_buffer_size);
let (sender, producer_task_handle) = create_producer_task(args.send_eventlist_buffer_size)?;

// Is used to await any sigint signals
let mut sigint = signal(SignalKind::interrupt())?;

loop {
tokio::select! {
Expand All @@ -156,46 +167,12 @@ async fn main() -> anyhow::Result<()> {
consumer.commit_message(&m, CommitMode::Async).unwrap();
}
Err(e) => warn!("Kafka error: {}", e)
}
}
}
}

type DigitiserEventListToBufferSender = Sender<DeliveryFuture>;
type TrySendDigitiserEventListError = TrySendError<DeliveryFuture>;

// The following functions control the kafka producer thread
fn create_producer_task(
send_digitiser_eventlist_buffer_size: usize,
) -> DigitiserEventListToBufferSender {
let (channel_send, channel_recv) =
tokio::sync::mpsc::channel::<DeliveryFuture>(send_digitiser_eventlist_buffer_size);

tokio::spawn(produce_to_kafka(channel_recv));
channel_send
}

async fn produce_to_kafka(mut channel_recv: Receiver<DeliveryFuture>) {
loop {
// Blocks until a frame is received
match channel_recv.recv().await {
Some(future) => match future.await {
Ok(_) => {
trace!("Published event message");
counter!(MESSAGES_PROCESSED).increment(1);
}
Err(e) => {
error!("{:?}", e);
counter!(
FAILURES,
&[failures::get_label(FailureKind::KafkaPublishFailed)]
)
.increment(1);
}
},
None => {
info!("Send-Frame channel closed");
return;
_ = sigint.recv() => {
// Wait for the channel to close and
// all pending production tasks to finish
producer_task_handle.await?;
return Ok(());
}
}
}
Expand Down Expand Up @@ -325,3 +302,72 @@ fn process_digitiser_trace_message(
Ok(())
}
}

// The following functions control the kafka producer thread
fn create_producer_task(
send_digitiser_eventlist_buffer_size: usize,
) -> std::io::Result<(DigitiserEventListToBufferSender, JoinHandle<()>)> {
let (channel_send, channel_recv) =
tokio::sync::mpsc::channel::<DeliveryFuture>(send_digitiser_eventlist_buffer_size);

let sigint = signal(SignalKind::interrupt())?;
let handle = tokio::spawn(produce_to_kafka(channel_recv, sigint));
Ok((channel_send, handle))
}

async fn produce_to_kafka(mut channel_recv: Receiver<DeliveryFuture>, mut sigint: Signal) {
loop {
// Blocks until a frame is received
select! {
message = channel_recv.recv() => {
match message {
Some(future) => {
produce_eventlist_to_kafka(future).await
},
None => {
info!("Send-Eventlist channel closed");
return;
}
}
},
_ = sigint.recv() => {
close_and_flush_producer_channel(&mut channel_recv).await;
}
}
}
}

async fn produce_eventlist_to_kafka(future: DeliveryFuture) {
match future.await {
Ok(_) => {
trace!("Published event message");
counter!(MESSAGES_PROCESSED).increment(1);
}
Err(e) => {
error!("{:?}", e);
counter!(
FAILURES,
&[failures::get_label(FailureKind::KafkaPublishFailed)]
)
.increment(1);
}
}
}

#[tracing::instrument(skip_all, target = "otel", name = "Closing", level = "info", fields(capactity = channel_recv.capacity(), max_capactity = channel_recv.max_capacity()))]
async fn close_and_flush_producer_channel(
channel_recv: &mut Receiver<DeliveryFuture>,
) -> Option<()> {
channel_recv.close();

loop {
let future = channel_recv.recv().await?;
flush_eventlist(future).await?;
}
}

#[tracing::instrument(skip_all, target = "otel", name = "Flush Eventlist")]
async fn flush_eventlist(future: DeliveryFuture) -> Option<()> {
produce_eventlist_to_kafka(future).await;
Some(())
}

0 comments on commit 5d98c33

Please sign in to comment.