Single producer thread event formation (#285)
## Summary of changes

The current event-formation component spawns a new thread to produce each
outgoing message, so the per-thread overhead is paid once per message. In this
PR, the component instead creates a single persistent thread for producing
messages to Kafka, and transfers processed traces from the main thread to that
thread via a channel.
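
A minimal, self-contained sketch of the pattern used in the diff below (not the PR's exact code): a bounded `tokio::sync::mpsc` channel is created, a single long-lived task owns the receiving end, and the main loop hands work off with `try_send`. In the sketch a plain `String` stands in for the `DeliveryFuture` the real component queues, the metrics/tracing are reduced to prints, and the tokio `full` feature set is assumed.

```rust
// Sketch only: String stands in for the rdkafka DeliveryFuture queued by the
// real component; assumes tokio with the "full" feature set.
use tokio::sync::mpsc::{self, error::TrySendError, Receiver, Sender};

/// Spawn one long-lived task that owns the receiving end of a bounded channel.
fn create_producer_task(buffer_size: usize) -> Sender<String> {
    let (tx, rx) = mpsc::channel::<String>(buffer_size);
    tokio::spawn(produce_to_kafka(rx));
    tx
}

/// The single producer task: handles queued items in order until every
/// sender has been dropped and the channel closes.
async fn produce_to_kafka(mut rx: Receiver<String>) {
    while let Some(item) = rx.recv().await {
        // The real component awaits the Kafka delivery future here and
        // updates the success/failure counters.
        println!("delivered: {item}");
    }
    println!("channel closed, producer task exiting");
}

#[tokio::main]
async fn main() {
    let tx = create_producer_task(1024);

    // The consumer loop hands work off without blocking; a full or closed
    // channel surfaces immediately as a TrySendError.
    if let Err(e) = tx.try_send("event list for frame 1".to_owned()) {
        match e {
            TrySendError::Full(_) => eprintln!("send buffer full"),
            TrySendError::Closed(_) => eprintln!("producer task gone"),
        }
    }

    // Drop the sender and give the task a moment to drain (illustration only).
    drop(tx);
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
```

Because the channel is bounded and the hand-off uses `try_send`, a full buffer surfaces immediately as `TrySendError::Full` rather than blocking the consumer loop; this is why the new `send_eventlist_buffer_size` option exists and why the component exits when that limit is exceeded.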

## Instructions for review/testing

Commentary on the new architecture is welcome. Is this a good change?

General code review.

The change has been tested on both simulated and Kafka data.
Modularius authored Dec 9, 2024
1 parent 10b362d commit 0f6fd91
Showing 1 changed file with 67 additions and 29 deletions.
96 changes: 67 additions & 29 deletions trace-to-events/src/main.rs
@@ -9,7 +9,7 @@ use parameters::{DetectorSettings, Mode, Polarity};
use rdkafka::{
consumer::{CommitMode, Consumer},
message::{BorrowedHeaders, BorrowedMessage},
producer::{FutureProducer, FutureRecord},
producer::{DeliveryFuture, FutureProducer, FutureRecord},
Message,
};
use std::{net::SocketAddr, path::PathBuf};
@@ -32,8 +32,8 @@ use supermusr_streaming_types::{
flatbuffers::{FlatBufferBuilder, InvalidFlatbuffer},
FrameMetadata,
};
use tokio::task::JoinSet;
use tracing::{debug, error, instrument, metadata::LevelFilter, trace, warn};
use tokio::sync::mpsc::{error::TrySendError, Receiver, Sender};
use tracing::{debug, error, info, instrument, metadata::LevelFilter, trace, warn};

#[derive(Debug, Parser)]
#[clap(author, version, about)]
@@ -61,6 +61,11 @@ struct Cli {
#[clap(long, default_value = "0")]
baseline: Intensity,

/// Size of the send eventlist buffer.
/// If this limit is exceeded, the component will exit.
#[clap(long, default_value = "1024")]
send_eventlist_buffer_size: usize,

/// Endpoint on which OpenMetrics flavour metrics are available
#[clap(long, env, default_value = "127.0.0.1:9090")]
observability_address: SocketAddr,
@@ -135,7 +140,7 @@ async fn main() -> anyhow::Result<()> {
"Number of failures encountered"
);

let mut kafka_producer_thread_set = JoinSet::new();
let sender = create_producer_task(args.send_eventlist_buffer_size);

loop {
tokio::select! {
@@ -144,18 +149,53 @@
process_kafka_message(
&tracer,
&args,
&mut kafka_producer_thread_set,
&sender,
&producer,
&m,
);
)?;
consumer.commit_message(&m, CommitMode::Async).unwrap();
}
Err(e) => warn!("Kafka error: {}", e)
},
join_next = kafka_producer_thread_set.join_next() => {
if let Some(Err(e)) = join_next {
error!("Error Joining Kafka Producer Task: {e}");
}
}
}
}

type DigitiserEventListToBufferSender = Sender<DeliveryFuture>;
type TrySendDigitiserEventListError = TrySendError<DeliveryFuture>;

// The following functions control the kafka producer thread
fn create_producer_task(
send_digitiser_eventlist_buffer_size: usize,
) -> DigitiserEventListToBufferSender {
let (channel_send, channel_recv) =
tokio::sync::mpsc::channel::<DeliveryFuture>(send_digitiser_eventlist_buffer_size);

tokio::spawn(produce_to_kafka(channel_recv));
channel_send
}

async fn produce_to_kafka(mut channel_recv: Receiver<DeliveryFuture>) {
loop {
// Blocks until a frame is received
match channel_recv.recv().await {
Some(future) => match future.await {
Ok(_) => {
trace!("Published event message");
counter!(MESSAGES_PROCESSED).increment(1);
}
Err(e) => {
error!("{:?}", e);
counter!(
FAILURES,
&[failures::get_label(FailureKind::KafkaPublishFailed)]
)
.increment(1);
}
},
None => {
info!("Send-Frame channel closed");
return;
}
}
}
@@ -172,10 +212,10 @@ fn spanned_root_as_digitizer_analog_trace_message(
fn process_kafka_message(
tracer: &TracerEngine,
args: &Cli,
kafka_producer_thread_set: &mut JoinSet<()>,
sender: &DigitiserEventListToBufferSender,
producer: &FutureProducer,
m: &BorrowedMessage,
) {
) -> Result<(), TrySendDigitiserEventListError> {
debug!(
"key: '{:?}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
m.key(),
@@ -197,10 +237,10 @@ fn process_kafka_message(
tracer,
m.headers(),
args,
kafka_producer_thread_set,
sender,
producer,
thing,
),
)?,
Err(e) => {
warn!("Failed to parse message: {}", e);
counter!(
@@ -219,6 +259,7 @@ fn process_kafka_message(
.increment(1);
}
}
Ok(())
}

#[instrument(
@@ -238,10 +279,10 @@ fn process_digitiser_trace_message(
tracer: &TracerEngine,
headers: Option<&BorrowedHeaders>,
args: &Cli,
kafka_producer_thread_set: &mut JoinSet<()>,
sender: &DigitiserEventListToBufferSender,
producer: &FutureProducer,
thing: DigitizerAnalogTraceMessage,
) {
) -> Result<(), TrySendDigitiserEventListError> {
thing
.metadata()
.try_into()
@@ -270,20 +311,17 @@

let future = producer.send_result(future_record).expect("Producer sends");

kafka_producer_thread_set.spawn(async move {
match future.await {
Ok(_) => {
trace!("Published event message");
counter!(MESSAGES_PROCESSED).increment(1);
if let Err(e) = sender.try_send(future) {
match &e {
TrySendError::Closed(_) => {
error!("Send-Frame Channel Closed");
}
Err(e) => {
error!("{:?}", e);
counter!(
FAILURES,
&[failures::get_label(FailureKind::KafkaPublishFailed)]
)
.increment(1);
TrySendError::Full(_) => {
error!("Send-Frame Buffer Full");
}
}
});
Err(e)
} else {
Ok(())
}
}
