From 9cc5cbc66939a8f68cb95b51f748a6c75efa1e03 Mon Sep 17 00:00:00 2001 From: "Luke S. Phillips" Date: Sun, 8 Oct 2023 17:16:08 +0100 Subject: [PATCH] Ensured flush token is released if offset commit fails. (#2860) --- .../KafkaMessageConsumer.cs | 48 +++++++++++-------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs index ef19d91dad..8332d40dc6 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs @@ -411,27 +411,33 @@ public int StoredOffsets() /// private void CommitOffsets() { - - var listOffsets = new List(); - for (int i = 0; i < _maxBatchSize; i++) - { - bool hasOffsets = _offsetStorage.TryTake(out var offset); - if (hasOffsets) - listOffsets.Add(offset); - else - break; - - } - - if (s_logger.IsEnabled(LogLevel.Information)) - { - var offsets = listOffsets.Select(tpo => $"Topic: {tpo.Topic} Partition: {tpo.Partition.Value} Offset: {tpo.Offset.Value}"); - var offsetAsString = string.Join(Environment.NewLine, offsets); - s_logger.LogInformation("Commiting offsets: {0} {Offset}", Environment.NewLine, offsetAsString); - } - - _consumer.Commit(listOffsets); - _flushToken.Release(1); + try + { + var listOffsets = new List(); + for (int i = 0; i < _maxBatchSize; i++) + { + bool hasOffsets = _offsetStorage.TryTake(out var offset); + if (hasOffsets) + listOffsets.Add(offset); + else + break; + + } + + if (s_logger.IsEnabled(LogLevel.Information)) + { + var offsets = listOffsets.Select(tpo => + $"Topic: {tpo.Topic} Partition: {tpo.Partition.Value} Offset: {tpo.Offset.Value}"); + var offsetAsString = string.Join(Environment.NewLine, offsets); + s_logger.LogInformation("Commiting offsets: {0} {Offset}", Environment.NewLine, offsetAsString); + } + + _consumer.Commit(listOffsets); + } + finally + { + _flushToken.Release(1); + } } private void CommitAllOffsets(DateTime flushTime)