Skip to content

Commit

Permalink
feat: enable workers count calculation in runtime
Browse files Browse the repository at this point in the history
fix: change dependency injection lifetime management for worker and consumer/producer
fix: create AdminClient without authentication
perf: memory optimizations on MessageContext
  • Loading branch information
filipeesch committed Jul 10, 2023
1 parent 3561fe6 commit 8578112
Show file tree
Hide file tree
Showing 56 changed files with 3,035 additions and 11,283 deletions.
131 changes: 131 additions & 0 deletions samples/KafkaFlow.Sample.Dashboard/ConsumerLagWorkerBalancer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
namespace KafkaFlow.Sample.Dashboard;

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using KafkaFlow.Clusters;
using KafkaFlow.Configuration;
using KafkaFlow.Consumers;
using TopicMetadata = KafkaFlow.TopicMetadata;
using TopicPartitionOffset = KafkaFlow.TopicPartitionOffset;

public class ConsumerLagWorkerBalancer
{
private readonly IClusterManager clusterManager;
private readonly IConsumerAccessor consumerAccessor;
private readonly int totalConsumerWorkers;
private readonly int maxInstanceWorkers;

public ConsumerLagWorkerBalancer(
IClusterManager clusterManager,
IConsumerAccessor consumerAccessor,
int totalConsumerWorkers,
int maxInstanceWorkers)
{
this.clusterManager = clusterManager;
this.consumerAccessor = consumerAccessor;
this.totalConsumerWorkers = totalConsumerWorkers;
this.maxInstanceWorkers = maxInstanceWorkers;
}

public async Task<int> GetWorkersCountAsync(WorkersCountContext context)
{
var topicsMetadata = await this.GetTopicsMetadataAsync(context);

var lastOffsets = this.GetPartitionsLastOffset(context.ConsumerName, topicsMetadata);

var partitionsOffset = await this.clusterManager.GetConsumerGroupOffsetsAsync(
context.ConsumerGroupId,
context.AssignedTopicsPartitions.Select(t => t.Name));

var partitionsLag = CalculatePartitionsLag(lastOffsets, partitionsOffset);
var myLag = CalculateMyPartitionsLag(context, partitionsLag);

decimal totalConsumerLag = partitionsLag.Sum(p => p.Lag);

var ratio = myLag / totalConsumerLag;

var workers = (int)Math.Round(this.totalConsumerWorkers * ratio);

if (workers > this.maxInstanceWorkers)
{
return this.maxInstanceWorkers;
}

return workers < 1 ? 1 : workers;
}

private static long CalculateMyPartitionsLag(
WorkersCountContext context,
IReadOnlyList<(string Topic, int Partition, long Lag)> partitionsLag)
{
return partitionsLag
.Where(
partitionLag => context.AssignedTopicsPartitions
.Any(
topic => topic.Name == partitionLag.Topic &&
topic.Partitions.Any(p => p == partitionLag.Partition)))
.Sum(partitionLag => partitionLag.Lag);
}

private static IReadOnlyList<(string Topic, int Partition, long Lag)> CalculatePartitionsLag(
IEnumerable<(string Topic, int Partition, long Offset)> lastOffsets,
IEnumerable<TopicPartitionOffset> currentPartitionsOffset)
{
return lastOffsets
.Select(
last =>
{
var currentOffset = currentPartitionsOffset
.Where(current => current.Topic == last.Topic && current.Partition == last.Partition)
.Select(current => current.Offset)
.FirstOrDefault(0);

var lastOffset = Math.Max(0, last.Offset);
currentOffset = Math.Max(0, currentOffset);

return (last.Topic, last.Partition, lastOffset - currentOffset);
})
.ToList();
}

private IReadOnlyList<(string TopicName, int Partition, long Offset)> GetPartitionsLastOffset(
string consumerName,
IEnumerable<(string Name, TopicMetadata Metadata)> topicsMetadata)
{
var consumer = this.consumerAccessor[consumerName];

var offsets = new List<(string TopicName, int Partition, long Offset)>();

foreach (var topic in topicsMetadata)
{
foreach (var partition in topic.Metadata.Partitions)
{
offsets.Add(
(
topic.Name,
partition.Id,
consumer.QueryWatermarkOffsets(
new TopicPartition(topic.Name, new Partition(partition.Id)),
TimeSpan.FromSeconds(30))
.High.Value));
}
}

return offsets;
}

private async Task<IReadOnlyList<(string Name, TopicMetadata Metadata)>> GetTopicsMetadataAsync(WorkersCountContext context)
{
var topicsMetadata = new List<(string Name, TopicMetadata Metadata)>(context.AssignedTopicsPartitions.Count);

foreach (var topic in context.AssignedTopicsPartitions)
{
topicsMetadata.Add((topic.Name, await this.clusterManager.GetTopicMetadataAsync(topic.Name)));
}

return topicsMetadata;
}
}
40 changes: 36 additions & 4 deletions samples/KafkaFlow.Sample.Dashboard/Program.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,45 @@
using System;
using KafkaFlow.Producers;
using KafkaFlow.Sample.Dashboard;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

await CreateHostBuilder(args)
.Build()
.RunAsync();
var host = CreateHostBuilder(args)
.Build();

var producer = host.Services.GetRequiredService<IProducerAccessor>()["producer"];

_ = host.RunAsync();

static IHostBuilder CreateHostBuilder(string[] args) =>
Host
.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder => { webBuilder.UseStartup<Startup>(); });
.ConfigureWebHostDefaults(
webBuilder =>
{
webBuilder
.UseStartup<Startup>()
.UseKestrel(options => options.ListenAnyIP(int.Parse(args[0])));
});

while (true)
{
var input = Console.ReadLine();

var splitted = input.Split(" ");

var count = int.Parse(splitted[0]);

int? partition = null;

if (splitted.Length > 1)
{
partition = int.Parse(splitted[1]);
}

for (int i = 0; i < count; i++)
{
_ = producer.ProduceAsync(Guid.NewGuid().ToString(), Array.Empty<byte>(), partition: partition);
}
}
77 changes: 51 additions & 26 deletions samples/KafkaFlow.Sample.Dashboard/Startup.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
namespace KafkaFlow.Sample.Dashboard;

using System;
using System.Threading.Tasks;
using KafkaFlow.Admin.Dashboard;
using KafkaFlow.Clusters;
using KafkaFlow.Consumers;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -11,31 +15,44 @@ public class Startup
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddKafkaFlowHostedService(
kafka => kafka
.UseConsoleLog()
.AddCluster(
cluster =>
{
const string topicName = "topic-dashboard";
cluster
.WithBrokers(new[] { "localhost:9092" })
.EnableAdminMessages("kafka-flow.admin", "kafka-flow.admin.group.id")
.EnableTelemetry("kafka-flow.admin", "kafka-flow.telemetry.group.id")
.CreateTopicIfNotExists(topicName, 3, 1)
.AddConsumer(
consumer =>
{
consumer
.Topics(topicName)
.WithGroupId("groupid-dashboard")
.WithName("consumer-dashboard")
.WithBufferSize(100)
.WithWorkersCount(20)
.WithAutoOffsetReset(AutoOffsetReset.Latest);
});
})
);
services
.AddKafkaFlowHostedService(
kafka => kafka
.UseConsoleLog()
.AddCluster(
cluster =>
{
const string topicName = "topic-dashboard";
cluster
.WithBrokers(new[] { "localhost:9092" })
.EnableAdminMessages("kafka-flow.admin", "kafka-flow.admin.group.id")
.EnableTelemetry("kafka-flow.admin", "kafka-flow.telemetry.group.id")
.CreateTopicIfNotExists(topicName, 3, 1)
.AddConsumer(
consumer =>
{
consumer
.Topics(topicName)
.WithGroupId("groupid-dashboard")
.WithName("consumer-dashboard")
.WithBufferSize(1)
.WithWorkersCount(
(context, resolver) =>
new ConsumerLagWorkerBalancer(
resolver.Resolve<IClusterManager>(),
resolver.Resolve<IConsumerAccessor>(),
2,
1)
.GetWorkersCountAsync(context),
TimeSpan.FromSeconds(60))
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
m =>
m.Add<DelayMiddleware>());
})
.AddProducer("producer", producer => producer.DefaultTopic(topicName));
})
);

services
.AddControllers();
Expand All @@ -49,4 +66,12 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApp
.UseEndpoints(endpoints => { endpoints.MapControllers(); })
.UseKafkaFlowDashboard();
}
}
}

public class DelayMiddleware : IMessageMiddleware
{
public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
await Task.Delay(1000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace KafkaFlow.Configuration
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

/// <summary>
/// Used to build the consumer configuration
Expand Down Expand Up @@ -91,6 +92,23 @@ public interface IConsumerConfigurationBuilder
/// <returns></returns>
IConsumerConfigurationBuilder WithWorkersCount(int workersCount);

/// <summary>
/// Configures a custom function to dynamically calculate the number of workers.
/// </summary>
/// <param name="calculator">A function that takes a WorkersCountContext object and returns a Task yielding the new workers count</param>
/// <param name="evaluationInterval">The interval that the calculator will be called</param>
/// <returns>The IConsumerConfigurationBuilder instance for method chaining</returns>
IConsumerConfigurationBuilder WithWorkersCount(
Func<WorkersCountContext, IDependencyResolver, Task<int>> calculator,
TimeSpan evaluationInterval);

/// <summary>
/// Configures a custom function to dynamically calculate the number of workers.
/// </summary>
/// <param name="calculator">A function that takes a WorkersCountContext object and returns a Task yielding the new workers count</param>
/// <returns>The IConsumerConfigurationBuilder instance for method chaining</returns>
IConsumerConfigurationBuilder WithWorkersCount(Func<WorkersCountContext, IDependencyResolver, Task<int>> calculator);

/// <summary>
/// Sets how many messages will be buffered for each worker
/// </summary>
Expand Down
29 changes: 29 additions & 0 deletions src/KafkaFlow.Abstractions/Configuration/WorkersCountContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace KafkaFlow.Configuration
{
using System.Collections.Generic;

/// <summary>
/// A metadata class with some context information help to calculate the number of workers
/// </summary>
public class WorkersCountContext
{
public WorkersCountContext(
string consumerName,
string consumerGroupId,
IReadOnlyCollection<TopicPartitions> assignedTopicsPartitions)
{
this.ConsumerName = consumerName;
this.ConsumerGroupId = consumerGroupId;
this.AssignedTopicsPartitions = assignedTopicsPartitions;
}

public string ConsumerName { get; }

public string ConsumerGroupId { get; }

/// <summary>
/// Gets the assigned partitions to the consumer
/// </summary>
public IReadOnlyCollection<TopicPartitions> AssignedTopicsPartitions { get; }
}
}
15 changes: 15 additions & 0 deletions src/KafkaFlow.Abstractions/IConsumerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,21 @@ public interface IConsumerContext
/// </summary>
bool ShouldStoreOffset { get; set; }

/// <summary>
/// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
/// This instance is tied to the consumer scope, meaning it is capable of resolving dependencies
/// that are scoped to the lifecycle of a single consumer.
/// </summary>
IDependencyResolver ConsumerDependencyResolver { get; }

/// <summary>
/// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
/// This instance is tied to the worker scope, meaning it is capable of resolving dependencies
/// that are scoped to the lifecycle of a single worker.
/// </summary>
IDependencyResolver WorkerDependencyResolver { get; }


/// <summary>
/// Store the message offset when manual store option is used
/// </summary>
Expand Down
3 changes: 3 additions & 0 deletions src/KafkaFlow.Abstractions/IDateTimeProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ public interface IDateTimeProvider
/// <inheritdoc cref="DateTime.Now"/>
DateTime Now { get; }

/// <inheritdoc cref="DateTime.UtcNow"/>
DateTime UtcNow { get; }

/// <inheritdoc cref="DateTime.MinValue"/>
DateTime MinValue { get; }
}
Expand Down
8 changes: 8 additions & 0 deletions src/KafkaFlow.Abstractions/IMessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ public interface IMessageContext
/// </summary>
IProducerContext ProducerContext { get; }

/// <summary>
/// Gets an instance of IDependencyResolver which provides methods to resolve dependencies.
/// This instance is tied to the message scope, meaning it is capable of resolving dependencies
/// that are scoped to the lifecycle of a single processed message.
/// </summary>
IDependencyResolver DependencyResolver { get; }


/// <summary>
/// Creates a new <see cref="IMessageContext"/> with the new message
/// </summary>
Expand Down
Loading

0 comments on commit 8578112

Please sign in to comment.