Graceful shutdown #287

Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -49,7 +49,7 @@ serde_json = { version = "1.0.133" }
supermusr-common = { path = "./common" }
supermusr-streaming-types = { path = "./streaming-types" }
taos = { version = "0.10.27", default_features = false, features = ["ws"] }
tokio = { version = "1.41", features = ["macros", "rt-multi-thread", "sync"] }
tokio = { version = "1.41", features = ["macros", "rt-multi-thread", "signal", "sync"] }
thiserror = "1.0.69"
tracing = "0.1.41"
tracing-opentelemetry = "0.23.0"
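
The new `signal` feature is what makes `tokio::signal::unix` available to the crates changed below. A minimal, illustrative sketch of the SIGINT handling it enables (standalone example, not code from this PR):

use tokio::signal::unix::{signal, SignalKind};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Register a SIGINT (Ctrl-C) listener; this only fails if the handler cannot be installed.
    let mut sigint = signal(SignalKind::interrupt())?;

    // recv() resolves once a SIGINT arrives, at which point an orderly shutdown can begin.
    sigint.recv().await;
    println!("SIGINT received, shutting down");
    Ok(())
}
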
84 changes: 67 additions & 17 deletions digitiser-aggregator/src/main.rs
@@ -32,11 +32,19 @@ use supermusr_streaming_types::{
},
flatbuffers::InvalidFlatbuffer,
};
use tokio::sync::mpsc::{error::TrySendError, Receiver, Sender};
use tracing::{debug, error, info_span, instrument, level_filters::LevelFilter, warn};
use tokio::{
select,
signal::unix::{signal, Signal, SignalKind},
sync::mpsc::{error::TrySendError, Receiver, Sender},
task::JoinHandle,
};
use tracing::{debug, error, info, info_span, instrument, level_filters::LevelFilter, warn};

const PRODUCER_TIMEOUT: Timeout = Timeout::After(Duration::from_millis(100));

type AggregatedFrameToBufferSender = Sender<AggregatedFrame<EventData>>;
type TrySendAggregatedFrameError = TrySendError<AggregatedFrame<EventData>>;

#[derive(Debug, Parser)]
#[clap(author, version, about)]
struct Cli {
@@ -92,9 +100,6 @@ struct Cli {
otel_namespace: String,
}

type AggregatedFrameToBufferSender = Sender<AggregatedFrame<EventData>>;
type TrySendAggregatedFrameError = TrySendError<AggregatedFrame<EventData>>;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Cli::parse();
@@ -157,12 +162,15 @@ async fn main() -> anyhow::Result<()> {
let mut cache_poll_interval = tokio::time::interval(Duration::from_millis(args.cache_poll_ms));

// Creates Send-Frame thread and returns channel sender
let channel_send = create_producer_task(
let (channel_send, producer_task_handle) = create_producer_task(
tracer.use_otel(),
args.send_frame_buffer_size,
&producer,
&args.output_topic,
);
)?;

// Used to await any SIGINT signal
let mut sigint = signal(SignalKind::interrupt())?;

loop {
tokio::select! {
@@ -179,6 +187,12 @@ async fn main() -> anyhow::Result<()> {
_ = cache_poll_interval.tick() => {
cache_poll(&channel_send, &mut cache).await?;
}
_ = sigint.recv() => {
// Wait for the channel to close and
// all pending production tasks to finish
producer_task_handle.await?;
return Ok(());
}
}
}
}
@@ -313,39 +327,75 @@ fn create_producer_task(
send_frame_buffer_size: usize,
producer: &FutureProducer,
output_topic: &str,
) -> AggregatedFrameToBufferSender {
) -> std::io::Result<(AggregatedFrameToBufferSender, JoinHandle<()>)> {
let (channel_send, channel_recv) =
tokio::sync::mpsc::channel::<AggregatedFrame<EventData>>(send_frame_buffer_size);

tokio::spawn(produce_to_kafka(
let sigint = signal(SignalKind::interrupt())?;
let handle = tokio::spawn(produce_to_kafka(
use_otel,
channel_recv,
producer.to_owned(),
output_topic.to_owned(),
sigint,
));
channel_send
Ok((channel_send, handle))
}

async fn produce_to_kafka(
use_otel: bool,
mut channel_recv: Receiver<AggregatedFrame<EventData>>,
producer: FutureProducer,
output_topic: String,
mut sigint: Signal,
) {
loop {
// Blocks until a frame is received
match channel_recv.recv().await {
Some(frame) => {
produce_frame_to_kafka(use_otel, frame, &producer, &output_topic).await;
select! {
message = channel_recv.recv() => {
// Blocks until a frame is received
match message {
Some(frame) => {
produce_frame_to_kafka(use_otel, frame, &producer, &output_topic).await;
}
None => {
info!("Send-Frame channel closed");
return;
}
}
}
None => {
error!("Send-Frame Receiver Error");
return;
_ = sigint.recv() => {
close_and_flush_producer_channel(use_otel, &mut channel_recv, &producer, &output_topic).await;
}
}
}
}

#[tracing::instrument(skip_all, target = "otel", name = "Closing", level = "info", fields(capacity = channel_recv.capacity(), max_capacity = channel_recv.max_capacity()))]
async fn close_and_flush_producer_channel(
use_otel: bool,
channel_recv: &mut Receiver<AggregatedFrame<EventData>>,
producer: &FutureProducer,
output_topic: &str,
) -> Option<()> {
channel_recv.close();

loop {
let frame = channel_recv.recv().await?;
flush_frame(use_otel, frame, producer, output_topic).await?;
}
}

#[tracing::instrument(skip_all, target = "otel", name = "Flush Frame")]
async fn flush_frame(
use_otel: bool,
frame: AggregatedFrame<EventData>,
producer: &FutureProducer,
output_topic: &str,
) -> Option<()> {
produce_frame_to_kafka(use_otel, frame, producer, output_topic).await;
Some(())
}

async fn produce_frame_to_kafka(
use_otel: bool,
frame: AggregatedFrame<EventData>,
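
The shutdown arm above relies on `tokio::sync::mpsc::Receiver::close`: closing the receiver prevents further sends while still allowing messages already buffered in the channel to be drained. A simplified sketch of that close-and-drain pattern (generic payload type, not the crate's `AggregatedFrame`):

use tokio::sync::mpsc::Receiver;

// Simplified sketch: close the channel so no new items can be queued,
// then keep receiving until the buffer is empty (recv returns None).
async fn close_and_drain(mut rx: Receiver<String>) {
    rx.close();
    while let Some(item) = rx.recv().await {
        // In the real code each buffered frame is produced to Kafka here.
        println!("flushing {item}");
    }
}
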
18 changes: 16 additions & 2 deletions nexus-writer/src/main.rs
@@ -37,7 +37,10 @@ use supermusr_streaming_types::{
flatbuffers::InvalidFlatbuffer,
FrameMetadata,
};
use tokio::time;
use tokio::{
signal::unix::{signal, SignalKind},
time,
};
use tracing::{debug, error, info_span, instrument, level_filters::LevelFilter, warn, warn_span};

#[derive(Debug, Parser)]
@@ -195,10 +198,16 @@ async fn main() -> anyhow::Result<()> {
"Number of failures encountered"
);

let run_ttl =
Duration::try_milliseconds(args.cache_run_ttl_ms).expect("Conversion is possible");

// Used to await any SIGINT signal
let mut sigint = signal(SignalKind::interrupt())?;

loop {
tokio::select! {
_ = nexus_write_interval.tick() => {
nexus_engine.flush(&Duration::try_milliseconds(args.cache_run_ttl_ms).expect("Conversion is possible"));
nexus_engine.flush(&run_ttl);
nexus_engine.flush_move_cache().await;
}
event = consumer.recv() => {
Expand All @@ -214,6 +223,11 @@ async fn main() -> anyhow::Result<()> {
}
}
}
_ = sigint.recv() => {
// Move any runs in the `move cache` before shutting down.
nexus_engine.close().await;
return Ok(());
}
}
}
}
5 changes: 5 additions & 0 deletions nexus-writer/src/nexus/engine.rs
@@ -256,6 +256,11 @@ impl NexusEngine {
}
self.run_move_cache.clear();
}

#[tracing::instrument(skip_all, level = "info", name = "Closing", fields(num_runs_to_archive = self.run_move_cache.len()))]
pub(crate) async fn close(mut self) {
self.flush_move_cache().await;
}
}

#[cfg(test)]
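
The `fields(...)` argument on `#[tracing::instrument]` is evaluated when the span is created, so the number of runs still awaiting archival is recorded on the "Closing" span. A standalone sketch of the same idea (hypothetical `Cache` type, not the crate's `NexusEngine`):

use tracing::instrument;

struct Cache {
    items: Vec<u32>,
}

impl Cache {
    // The field expression runs at span creation, capturing the item count at close time.
    #[instrument(skip_all, level = "info", name = "Closing", fields(num_items = self.items.len()))]
    async fn close(mut self) {
        self.items.clear();
    }
}
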
128 changes: 87 additions & 41 deletions trace-to-events/src/main.rs
@@ -32,9 +32,17 @@ use supermusr_streaming_types::{
flatbuffers::{FlatBufferBuilder, InvalidFlatbuffer},
FrameMetadata,
};
use tokio::sync::mpsc::{error::TrySendError, Receiver, Sender};
use tokio::{
select,
signal::unix::{signal, Signal, SignalKind},
sync::mpsc::{error::TrySendError, Receiver, Sender},
task::JoinHandle,
};
use tracing::{debug, error, info, instrument, metadata::LevelFilter, trace, warn};

type DigitiserEventListToBufferSender = Sender<DeliveryFuture>;
type TrySendDigitiserEventListError = TrySendError<DeliveryFuture>;

#[derive(Debug, Parser)]
#[clap(author, version, about)]
struct Cli {
@@ -140,7 +148,10 @@ async fn main() -> anyhow::Result<()> {
"Number of failures encountered"
);

let sender = create_producer_task(args.send_eventlist_buffer_size);
let (sender, producer_task_handle) = create_producer_task(args.send_eventlist_buffer_size)?;

// Used to await any SIGINT signal
let mut sigint = signal(SignalKind::interrupt())?;

loop {
tokio::select! {
@@ -156,46 +167,12 @@ async fn main() -> anyhow::Result<()> {
consumer.commit_message(&m, CommitMode::Async).unwrap();
}
Err(e) => warn!("Kafka error: {}", e)
}
}
}
}

type DigitiserEventListToBufferSender = Sender<DeliveryFuture>;
type TrySendDigitiserEventListError = TrySendError<DeliveryFuture>;

// The following functions control the kafka producer thread
fn create_producer_task(
send_digitiser_eventlist_buffer_size: usize,
) -> DigitiserEventListToBufferSender {
let (channel_send, channel_recv) =
tokio::sync::mpsc::channel::<DeliveryFuture>(send_digitiser_eventlist_buffer_size);

tokio::spawn(produce_to_kafka(channel_recv));
channel_send
}

async fn produce_to_kafka(mut channel_recv: Receiver<DeliveryFuture>) {
loop {
// Blocks until a frame is received
match channel_recv.recv().await {
Some(future) => match future.await {
Ok(_) => {
trace!("Published event message");
counter!(MESSAGES_PROCESSED).increment(1);
}
Err(e) => {
error!("{:?}", e);
counter!(
FAILURES,
&[failures::get_label(FailureKind::KafkaPublishFailed)]
)
.increment(1);
}
},
None => {
info!("Send-Frame channel closed");
return;
_ = sigint.recv() => {
// Wait for the channel to close and
// all pending production tasks to finish
producer_task_handle.await?;
return Ok(());
}
}
}
@@ -325,3 +302,72 @@ fn process_digitiser_trace_message(
Ok(())
}
}

// The following functions control the Kafka producer task
fn create_producer_task(
send_digitiser_eventlist_buffer_size: usize,
) -> std::io::Result<(DigitiserEventListToBufferSender, JoinHandle<()>)> {
let (channel_send, channel_recv) =
tokio::sync::mpsc::channel::<DeliveryFuture>(send_digitiser_eventlist_buffer_size);

let sigint = signal(SignalKind::interrupt())?;
let handle = tokio::spawn(produce_to_kafka(channel_recv, sigint));
Ok((channel_send, handle))
}

async fn produce_to_kafka(mut channel_recv: Receiver<DeliveryFuture>, mut sigint: Signal) {
loop {
// Blocks until a frame is received
select! {
message = channel_recv.recv() => {
match message {
Some(future) => {
produce_eventlist_to_kafka(future).await
},
None => {
info!("Send-Eventlist channel closed");
return;
}
}
},
_ = sigint.recv() => {
close_and_flush_producer_channel(&mut channel_recv).await;
}
}
}
}

async fn produce_eventlist_to_kafka(future: DeliveryFuture) {
match future.await {
Ok(_) => {
trace!("Published event message");
counter!(MESSAGES_PROCESSED).increment(1);
}
Err(e) => {
error!("{:?}", e);
counter!(
FAILURES,
&[failures::get_label(FailureKind::KafkaPublishFailed)]
)
.increment(1);
}
}
}

#[tracing::instrument(skip_all, target = "otel", name = "Closing", level = "info", fields(capactity = channel_recv.capacity(), max_capactity = channel_recv.max_capacity()))]
async fn close_and_flush_producer_channel(
channel_recv: &mut Receiver<DeliveryFuture>,
) -> Option<()> {
channel_recv.close();

loop {
let future = channel_recv.recv().await?;
flush_eventlist(future).await?;
}
}

#[tracing::instrument(skip_all, target = "otel", name = "Flush Eventlist")]
async fn flush_eventlist(future: DeliveryFuture) -> Option<()> {
produce_eventlist_to_kafka(future).await;
Some(())
}
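
Both binaries follow the same pattern in `main`: on SIGINT they await the producer task's `JoinHandle`, so the process exits only after the channel has been drained. An isolated sketch of that final step (illustrative names only):

use tokio::task::JoinHandle;

// Awaiting the handle blocks until the producer task returns; a panic in the
// task surfaces here as a JoinError and is propagated via `?`.
async fn wait_for_producer(handle: JoinHandle<()>) -> anyhow::Result<()> {
    handle.await?;
    Ok(())
}
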