Skip to content

Commit

Permalink
feat: enable dynamic worker calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
filipeesch committed Jul 4, 2023
1 parent 3561fe6 commit c63fd40
Show file tree
Hide file tree
Showing 11 changed files with 53 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ public interface IConsumerConfigurationBuilder
/// <returns></returns>
IConsumerConfigurationBuilder WithWorkersCount(int workersCount);

/// <summary>
/// Sets the number of threads that will be used to consume the messages dynamically based on a provided function
/// </summary>
/// <param name="calculator">A function that takes the current context of the consumer and returns the desired number of workers</param>
/// <returns></returns>
IConsumerConfigurationBuilder WithWorkersCount(Func<WorkersCountContext, int> calculator);

/// <summary>
/// Sets how many messages will be buffered for each worker
/// </summary>
Expand Down
12 changes: 12 additions & 0 deletions src/KafkaFlow.Abstractions/Configuration/WorkersCountContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace KafkaFlow.Configuration
{
public class WorkersCountContext
{
public WorkersCountContext(int PartitionsCount)
{
this.PartitionsCount = PartitionsCount;
}

public int PartitionsCount { get; }
}
}
5 changes: 5 additions & 0 deletions src/KafkaFlow.Admin.Dashboard/TelemetryResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ public class TopicPartitionAssignment
/// Gets or sets the consumer lag
/// </summary>
public long Lag { get; set; }

/// <summary>
/// Gets or sets the workers count
/// </summary>
public int Workers { get; set; }
}
}
}
5 changes: 1 addition & 4 deletions src/KafkaFlow.Admin.Dashboard/TelemetryResponseAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ internal static TelemetryResponse Adapt(this IEnumerable<ConsumerTelemetryMetric
return new TelemetryResponse.Consumer
{
Name = consumerName,
WorkersCount = consumerMetricsList
.OrderByDescending(x => x.SentAt)
.First()
.WorkersCount,
Assignments = consumerMetricsList
.OrderBy(x => x.Topic)
.Select(
Expand All @@ -40,6 +36,7 @@ internal static TelemetryResponse Adapt(this IEnumerable<ConsumerTelemetryMetric
InstanceName = m.InstanceName,
TopicName = m.Topic,
Status = m.Status.ToString(),
Workers = m.WorkersCount,
LastUpdate = m.SentAt,
PausedPartitions = m.PausedPartitions,
RunningPartitions = m.RunningPartitions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void Build_RequiredCalls_ReturnDefaultValues()
// Assert
configuration.Topics.Should().BeEquivalentTo(topic1);
configuration.BufferSize.Should().Be(bufferSize);
configuration.WorkersCount.Should().Be(workers);
configuration.WorkersCountCalculator.Should().Be(workers);
configuration.GroupId.Should().Be(groupId);
configuration.GetKafkaConfig().AutoOffsetReset.Should().BeNull();
configuration.GetKafkaConfig().EnableAutoOffsetStore.Should().Be(false);
Expand Down Expand Up @@ -116,7 +116,7 @@ public void Build_AllCalls_ReturnPassedValues()
configuration.Topics.Should().BeEquivalentTo(topic1, topic2);
configuration.ConsumerName.Should().Be(name);
configuration.BufferSize.Should().Be(bufferSize);
configuration.WorkersCount.Should().Be(workers);
configuration.WorkersCountCalculator.Should().Be(workers);
configuration.GroupId.Should().Be(groupId);
configuration.GetKafkaConfig().AutoOffsetReset.Should().Be(offsetReset);
configuration.AutoStoreOffsets.Should().Be(false);
Expand Down
17 changes: 3 additions & 14 deletions src/KafkaFlow/Configuration/ConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ namespace KafkaFlow.Configuration
internal class ConsumerConfiguration : IConsumerConfiguration
{
private readonly ConsumerConfig consumerConfig;
private int workersCount;

public ConsumerConfiguration(
ConsumerConfig consumerConfig,
Expand All @@ -16,7 +15,7 @@ public ConsumerConfiguration(
string consumerName,
ClusterConfiguration clusterConfiguration,
bool managementDisabled,
int workersCount,
Func<WorkersCountContext, int> workersCountCalculator,
int bufferSize,
TimeSpan workerStopTimeout,
Factory<IDistributionStrategy> distributionStrategyFactory,
Expand Down Expand Up @@ -52,7 +51,7 @@ public ConsumerConfiguration(
this.ConsumerName = consumerName ?? Guid.NewGuid().ToString();
this.ClusterConfiguration = clusterConfiguration;
this.ManagementDisabled = managementDisabled;
this.WorkersCount = workersCount;
this.WorkersCountCalculator = workersCountCalculator;
this.WorkerStopTimeout = workerStopTimeout;
this.StatisticsHandlers = statisticsHandlers;
this.PartitionsAssignedHandlers = partitionsAssignedHandlers;
Expand Down Expand Up @@ -82,17 +81,7 @@ public ConsumerConfiguration(

public bool ManagementDisabled { get; }

public int WorkersCount
{
get => this.workersCount;
set =>
this.workersCount = value > 0 ?
value :
throw new ArgumentOutOfRangeException(
nameof(this.WorkersCount),
this.WorkersCount,
$"The {nameof(this.WorkersCount)} value must be greater than 0");
}
public Func<WorkersCountContext, int> WorkersCountCalculator { get; set; }

public string GroupId => this.consumerConfig.GroupId;

Expand Down
12 changes: 9 additions & 3 deletions src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuild
private string groupId = string.Empty;
private AutoOffsetReset? autoOffsetReset;
private int? maxPollIntervalMs;
private int workersCount;
private Func<WorkersCountContext, int> workersCountCalculator;
private int bufferSize;
private TimeSpan workerStopTimeout = TimeSpan.FromSeconds(30);
private bool autoStoreOffsets = true;
Expand Down Expand Up @@ -121,7 +121,13 @@ public IConsumerConfigurationBuilder WithMaxPollIntervalMs(int maxPollIntervalMs

public IConsumerConfigurationBuilder WithWorkersCount(int workersCount)
{
this.workersCount = workersCount;
this.workersCountCalculator = _ => workersCount;
return this;
}

public IConsumerConfigurationBuilder WithWorkersCount(Func<WorkersCountContext, int> calculator)
{
this.workersCountCalculator = calculator;
return this;
}

Expand Down Expand Up @@ -252,7 +258,7 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
this.name,
clusterConfiguration,
this.disableManagement,
this.workersCount,
this.workersCountCalculator,
this.bufferSize,
this.workerStopTimeout,
this.distributionStrategyFactory,
Expand Down
4 changes: 2 additions & 2 deletions src/KafkaFlow/Configuration/IConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public interface IConsumerConfiguration
bool ManagementDisabled { get; }

/// <summary>
/// Gets or sets the number of workers
/// Gets or sets the number of workers calculator
/// </summary>
int WorkersCount { get; set; }
Func<WorkersCountContext, int> WorkersCountCalculator { get; set; }

/// <summary>
/// Gets the consumer group
Expand Down
12 changes: 7 additions & 5 deletions src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ public ConsumerWorkerPool(
this.distributionStrategyFactory = consumerConfiguration.DistributionStrategyFactory;
}

public async Task StartAsync(IEnumerable<TopicPartition> partitions)
public int CurrentWorkersCount { get; private set; }

public async Task StartAsync(IReadOnlyCollection<TopicPartition> partitions)
{
IOffsetCommitter offsetCommitter =
this.consumer.Configuration.NoStoreOffsets ?
Expand All @@ -51,13 +53,13 @@ public async Task StartAsync(IEnumerable<TopicPartition> partitions)
this.pendingOffsetsHandlers,
this.logHandler);

this.offsetManager = new OffsetManager(
offsetCommitter,
partitions);
this.offsetManager = new OffsetManager(offsetCommitter, partitions);

this.CurrentWorkersCount = this.consumer.Configuration.WorkersCountCalculator(new WorkersCountContext(partitions.Count));

await Task.WhenAll(
Enumerable
.Range(0, this.consumer.Configuration.WorkersCount)
.Range(0, this.CurrentWorkersCount)
.Select(
workerId =>
{
Expand Down
4 changes: 3 additions & 1 deletion src/KafkaFlow/Consumers/IConsumerWorkerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ namespace KafkaFlow.Consumers

internal interface IConsumerWorkerPool
{
Task StartAsync(IEnumerable<TopicPartition> partitions);
int CurrentWorkersCount { get; }

Task StartAsync(IReadOnlyCollection<TopicPartition> partitions);

Task StopAsync();

Expand Down
4 changes: 2 additions & 2 deletions src/KafkaFlow/Consumers/MessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public MessageConsumer(

public string ClientInstanceName => this.consumerManager.Consumer.ClientInstanceName;

public int WorkersCount => this.consumerManager.Consumer.Configuration.WorkersCount;
public int WorkersCount => this.consumerManager.WorkerPool.CurrentWorkersCount;

public IReadOnlyList<TopicPartition> PausedPartitions =>
this.consumerManager.Consumer.FlowManager?.PausedPartitions ??
Expand Down Expand Up @@ -121,7 +121,7 @@ public async Task OverrideOffsetsAndRestartAsync(IReadOnlyCollection<TopicPartit

public async Task ChangeWorkersCountAndRestartAsync(int workersCount)
{
this.consumerManager.Consumer.Configuration.WorkersCount = workersCount;
this.consumerManager.Consumer.Configuration.WorkersCountCalculator = _ => workersCount;

await this.InternalRestart().ConfigureAwait(false);

Expand Down

0 comments on commit c63fd40

Please sign in to comment.