Skip to content

Commit

Permalink
Need to join completed producer threads
Browse files Browse the repository at this point in the history
  • Loading branch information
Modularius committed Nov 28, 2024
1 parent b86a241 commit 007d31e
Showing 1 changed file with 23 additions and 11 deletions.
34 changes: 23 additions & 11 deletions trace-to-events/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,30 @@ async fn main() -> anyhow::Result<()> {
let mut kafka_producer_thread_set = JoinSet::new();

loop {
match consumer.recv().await {
Ok(m) => {
process_kafka_message(
&tracer,
&args,
&mut kafka_producer_thread_set,
&producer,
&m,
);
consumer.commit_message(&m, CommitMode::Async).unwrap();
tokio::select! {
msg = consumer.recv() => match msg {
Ok(m) => {
process_kafka_message(
&tracer,
&args,
&mut kafka_producer_thread_set,
&producer,
&m,
);
consumer.commit_message(&m, CommitMode::Async).unwrap();
}
Err(e) => warn!("Kafka error: {}", e)
},
r = kafka_producer_thread_set.join_next() => {
if let Some(r) = r {
match r {
Ok(_) => {}
Err(e) => {
error!("{e}");
}
}
}
}
Err(e) => warn!("Kafka error: {}", e),
}
}
}
Expand Down

0 comments on commit 007d31e

Please sign in to comment.