Skip to content

Commit

Permalink
fix: wait for WorkerStopTimeout on bus stop
Browse files Browse the repository at this point in the history
  • Loading branch information
filipeesch committed Oct 17, 2022
1 parent 2f74c2c commit feadfc9
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,22 @@ public interface IConsumerConfigurationBuilder
/// <returns></returns>
IConsumerConfigurationBuilder WithBufferSize(int size);

/// <summary>
/// Sets the time that the worker will wait to process the buffered messages
/// before canceling the <see cref="IConsumerContext.WorkerStopped"/>
/// </summary>
/// <param name="seconds">The seconds to wait</param>
/// <returns></returns>
IConsumerConfigurationBuilder WithWorkerStopTimeout(int seconds);

/// <summary>
/// Sets the time that the worker will wait to process the buffered messages
/// before canceling the <see cref="IConsumerContext.WorkerStopped"/>
/// </summary>
/// <param name="timeout">The time to wait</param>
/// <returns></returns>
IConsumerConfigurationBuilder WithWorkerStopTimeout(TimeSpan timeout);

/// <summary>
/// Sets the strategy to choose a worker when a message arrives
/// </summary>
Expand Down
15 changes: 10 additions & 5 deletions src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ private async Task Dispatch(IMessageContext context, MiddlewareDelegate next)

await next(batchContext).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
if (context.ConsumerContext.WorkerStopped.IsCancellationRequested)
{
return;
}
}
catch (Exception ex)
{
this.logHandler.Error(
Expand All @@ -69,12 +76,10 @@ private async Task Dispatch(IMessageContext context, MiddlewareDelegate next)
context.ConsumerContext.WorkerId,
});
}
finally

foreach (var messageContext in localBatch)
{
foreach (var messageContext in localBatch)
{
messageContext.ConsumerContext.StoreOffset();
}
messageContext.ConsumerContext.StoreOffset();
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/KafkaFlow/Configuration/ConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public ConsumerConfiguration(
bool managementDisabled,
int workersCount,
int bufferSize,
TimeSpan workerStopTimeout,
Factory<IDistributionStrategy> distributionStrategyFactory,
IReadOnlyList<MiddlewareConfiguration> middlewaresConfigurations,
bool autoStoreOffsets,
Expand Down Expand Up @@ -48,6 +49,7 @@ public ConsumerConfiguration(
this.ClusterConfiguration = clusterConfiguration;
this.ManagementDisabled = managementDisabled;
this.WorkersCount = workersCount;
this.WorkerStopTimeout = workerStopTimeout;
this.StatisticsHandlers = statisticsHandlers;
this.PartitionsAssignedHandlers = partitionsAssignedHandlers;
this.PartitionsRevokedHandlers = partitionsRevokedHandlers;
Expand Down Expand Up @@ -90,6 +92,8 @@ public int WorkersCount

public int BufferSize { get; }

public TimeSpan WorkerStopTimeout { get; }

public bool AutoStoreOffsets { get; }

public ConsumerInitialState InitialState { get; }
Expand Down
14 changes: 14 additions & 0 deletions src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuild
private int? maxPollIntervalMs;
private int workersCount;
private int bufferSize;
private TimeSpan workerStopTimeout = TimeSpan.FromSeconds(30);
private bool autoStoreOffsets = true;
private ConsumerInitialState initialState = ConsumerInitialState.Running;
private int statisticsInterval;
Expand Down Expand Up @@ -122,6 +123,18 @@ public IConsumerConfigurationBuilder WithBufferSize(int size)
return this;
}

public IConsumerConfigurationBuilder WithWorkerStopTimeout(int seconds)
{
this.workerStopTimeout = TimeSpan.FromSeconds(seconds);
return this;
}

public IConsumerConfigurationBuilder WithWorkerStopTimeout(TimeSpan timeout)
{
this.workerStopTimeout = timeout;
return this;
}

public IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>(Factory<T> factory)
where T : class, IDistributionStrategy
{
Expand Down Expand Up @@ -226,6 +239,7 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
this.disableManagement,
this.workersCount,
this.bufferSize,
this.workerStopTimeout,
this.distributionStrategyFactory,
middlewareConfiguration,
this.autoStoreOffsets,
Expand Down
11 changes: 10 additions & 1 deletion src/KafkaFlow/Configuration/IConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public interface IConsumerConfiguration
/// </summary>
int BufferSize { get; }

/// <summary>
/// Gets the time that the worker will wait to process the buffered messages
/// before canceling the <see cref="IConsumerContext.WorkerStopped"/>
/// </summary>
TimeSpan WorkerStopTimeout { get; }

/// <summary>
/// Gets a value indicating whether if the application should store store at the end
/// </summary>
Expand Down Expand Up @@ -82,7 +88,10 @@ public interface IConsumerConfiguration
/// <summary>
/// Gets the handlers that will be called when there are pending offsets
/// </summary>
IReadOnlyList<(Action<IDependencyResolver, IEnumerable<TopicPartitionOffset>> handler, TimeSpan interval)> PendingOffsetsHandlers { get; }
IReadOnlyList<(Action<IDependencyResolver, IEnumerable<TopicPartitionOffset>> handler, TimeSpan interval)> PendingOffsetsHandlers
{
get;
}

/// <summary>
/// Gets the custom factory used to create a new <see cref="KafkaFlow.Consumers.IConsumer"/>
Expand Down
122 changes: 69 additions & 53 deletions src/KafkaFlow/Consumers/ConsumerWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,64 +53,24 @@ public Task StartAsync()
this.backgroundTask = Task.Run(
async () =>
{
while (!this.stopCancellationTokenSource.IsCancellationRequested)
{
try
{
var message = await this.messagesBuffer.Reader
.ReadAsync(this.stopCancellationTokenSource.Token)
.ConfigureAwait(false);

var context = new MessageContext(
new Message(message.Message.Key, message.Message.Value),
new MessageHeaders(message.Message.Headers),
new ConsumerContext(
this.consumer,
this.offsetManager,
message,
this.stopCancellationTokenSource.Token,
this.Id),
null);

try
{
var scope = this.dependencyResolver.CreateScope();
var cancellationTokenSource = new CancellationTokenSource();

this.offsetManager.OnOffsetProcessed(
message.TopicPartitionOffset,
() => scope.Dispose());
this.stopCancellationTokenSource.Token.Register(
() => cancellationTokenSource.CancelAfter(this.consumer.Configuration.WorkerStopTimeout));

await this.middlewareExecutor
.Execute(scope.Resolver, context, _ => Task.CompletedTask)
.ConfigureAwait(false);
}
catch (Exception ex)
{
this.logHandler.Error(
"Error executing consumer",
ex,
new
{
context.Message,
context.ConsumerContext.Topic,
MessageKey = context.Message.Key,
context.ConsumerContext.ConsumerName,
});
}
finally
try
{
while (await this.messagesBuffer.Reader.WaitToReadAsync(CancellationToken.None).ConfigureAwait(false))
{
while (this.messagesBuffer.Reader.TryRead(out var message))
{
if (this.consumer.Configuration.AutoStoreOffsets && context.ConsumerContext.ShouldStoreOffset)
{
this.offsetManager.MarkAsProcessed(message.TopicPartitionOffset);
}

this.onMessageFinishedHandler?.Invoke();
await this.ProcessMessageAsync(message, cancellationTokenSource.Token).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
{
// Ignores the exception
}
}
catch (OperationCanceledException)
{
// Ignores the exception
}
});

Expand All @@ -119,6 +79,8 @@ await this.middlewareExecutor

public async Task StopAsync()
{
this.messagesBuffer.Writer.TryComplete();

if (this.stopCancellationTokenSource.Token.CanBeCanceled)
{
this.stopCancellationTokenSource.Cancel();
Expand All @@ -132,5 +94,59 @@ public void OnTaskCompleted(Action handler)
{
this.onMessageFinishedHandler = handler;
}

private async Task ProcessMessageAsync(ConsumeResult<byte[], byte[]> message, CancellationToken cancellationToken)
{
var context = new MessageContext(
new Message(message.Message.Key, message.Message.Value),
new MessageHeaders(message.Message.Headers),
new ConsumerContext(
this.consumer,
this.offsetManager,
message,
cancellationToken,
this.Id),
null);

try
{
var scope = this.dependencyResolver.CreateScope();

this.offsetManager.OnOffsetProcessed(
message.TopicPartitionOffset,
() => scope.Dispose());

await this.middlewareExecutor
.Execute(scope.Resolver, context, _ => Task.CompletedTask)
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
if (cancellationToken.IsCancellationRequested)
{
return;
}
}
catch (Exception ex)
{
this.logHandler.Error(
"Error processing message",
ex,
new
{
context.Message,
context.ConsumerContext.Topic,
MessageKey = context.Message.Key,
context.ConsumerContext.ConsumerName,
});
}

if (this.consumer.Configuration.AutoStoreOffsets && context.ConsumerContext.ShouldStoreOffset)
{
this.offsetManager.MarkAsProcessed(message.TopicPartitionOffset);
}

this.onMessageFinishedHandler?.Invoke();
}
}
}
2 changes: 1 addition & 1 deletion src/KafkaFlow/Consumers/OffsetCommitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private void CommitHandler()
if (!this.consumer.Configuration.ManagementDisabled)
{
this.logHandler.Verbose(
"Committed offsets",
"Offsets committed",
new
{
Offsets = offsets.GroupBy(
Expand Down
5 changes: 5 additions & 0 deletions src/KafkaFlow/Consumers/WorkerPoolFeeder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public void Start()
.ConsumeAsync(token)
.ConfigureAwait(false);

if (message is null)
{
continue;
}

await this.workerPool
.EnqueueAsync(message, token)
.ConfigureAwait(false);
Expand Down

0 comments on commit feadfc9

Please sign in to comment.