From 007d31eaeb11e7de2403c51e95d5df899e7aaced Mon Sep 17 00:00:00 2001 From: Modularius Date: Thu, 28 Nov 2024 10:13:21 +0000 Subject: [PATCH] Need to join completed producer threads --- trace-to-events/src/main.rs | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/trace-to-events/src/main.rs b/trace-to-events/src/main.rs index 67abb66c..15462bb5 100644 --- a/trace-to-events/src/main.rs +++ b/trace-to-events/src/main.rs @@ -138,18 +138,30 @@ async fn main() -> anyhow::Result<()> { let mut kafka_producer_thread_set = JoinSet::new(); loop { - match consumer.recv().await { - Ok(m) => { - process_kafka_message( - &tracer, - &args, - &mut kafka_producer_thread_set, - &producer, - &m, - ); - consumer.commit_message(&m, CommitMode::Async).unwrap(); + tokio::select! { + msg = consumer.recv() => match msg { + Ok(m) => { + process_kafka_message( + &tracer, + &args, + &mut kafka_producer_thread_set, + &producer, + &m, + ); + consumer.commit_message(&m, CommitMode::Async).unwrap(); + } + Err(e) => warn!("Kafka error: {}", e) + }, + r = kafka_producer_thread_set.join_next() => { + if let Some(r) = r { + match r { + Ok(_) => {} + Err(e) => { + error!("{e}"); + } + } + } } - Err(e) => warn!("Kafka error: {}", e), } } }