Skip to content

Commit

Permalink
fix: fix NullReferenceEXception when stopping
Browse files Browse the repository at this point in the history
  • Loading branch information
filipeesch committed Jun 28, 2022
1 parent 6695dd2 commit eb8124b
Showing 1 changed file with 3 additions and 2 deletions.
5 changes: 3 additions & 2 deletions src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,17 @@ public async Task StopAsync()

public async Task EnqueueAsync(ConsumeResult<byte[], byte[]> message, CancellationToken stopCancellationToken)
{
var localOffsetManager = this.offsetManager;
var worker = (IConsumerWorker) await this.distributionStrategy
.GetWorkerAsync(message.Message.Key, stopCancellationToken)
.ConfigureAwait(false);

if (worker == null)
if (worker is null || localOffsetManager is null)
{
return;
}

this.offsetManager.Enqueue(message.TopicPartitionOffset);
localOffsetManager.Enqueue(message.TopicPartitionOffset);

await worker
.EnqueueAsync(message, stopCancellationToken)
Expand Down

0 comments on commit eb8124b

Please sign in to comment.