Skip to content

Commit

Permalink
feat: add support for statistics handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
joelfoliveira authored and filipeesch committed Oct 8, 2020
1 parent 7300d3e commit 123a387
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,19 @@ IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>()
/// <param name="middlewares">A handler to register middlewares</param>
/// <returns></returns>
IConsumerConfigurationBuilder AddMiddlewares(Action<IConsumerMiddlewareConfigurationBuilder> middlewares);

/// <summary>
/// Adds a handler for the Kafka consumer statistics
/// </summary>
/// <param name="statisticsHandler">A handler for the statistics</param>
/// <returns></returns>
IConsumerConfigurationBuilder WithStatisticsHandler(Action<string> statisticsHandler);

/// <summary>
/// Sets the interval the statistics are emitted
/// </summary>
/// <param name="statisticsIntervalMs">The interval in miliseconds</param>
/// <returns></returns>
IConsumerConfigurationBuilder WithStatisticsIntervalMs(int statisticsIntervalMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,19 @@ public interface IProducerConfigurationBuilder
/// <param name="acks"></param>
/// <returns></returns>
IProducerConfigurationBuilder WithAcks(Acks acks);

/// <summary>
/// Adds a handler for the Kafka producer statistics
/// </summary>
/// <param name="statisticsHandler">A handler for the statistics</param>
/// <returns></returns>
IProducerConfigurationBuilder WithStatisticsHandler(Action<string> statisticsHandler);

/// <summary>
/// Sets the interval the statistics are emitted
/// </summary>
/// <param name="statisticsIntervalMs">The interval in miliseconds</param>
/// <returns></returns>
IProducerConfigurationBuilder WithStatisticsIntervalMs(int statisticsIntervalMs);
}
}
6 changes: 5 additions & 1 deletion src/KafkaFlow/Configuration/ConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ public ConsumerConfiguration(
Factory<IDistributionStrategy> distributionStrategyFactory,
MiddlewareConfiguration middlewareConfiguration,
bool autoStoreOffsets,
TimeSpan autoCommitInterval)
TimeSpan autoCommitInterval,
IReadOnlyList<Action<string>> statisticsHandlers)
{
this.consumerConfig = consumerConfig ?? throw new ArgumentNullException(nameof(consumerConfig));

Expand All @@ -35,6 +36,7 @@ public ConsumerConfiguration(
this.Topics = topics ?? throw new ArgumentNullException(nameof(topics));
this.ConsumerName = consumerName ?? Guid.NewGuid().ToString();
this.WorkerCount = workerCount;
this.StatisticsHandlers = statisticsHandlers;

this.BufferSize = bufferSize > 0 ?
bufferSize :
Expand Down Expand Up @@ -72,6 +74,8 @@ public int WorkerCount

public TimeSpan AutoCommitInterval { get; }

public IReadOnlyList<Action<string>> StatisticsHandlers { get; }

public ConsumerConfig GetKafkaConfig()
{
return this.consumerConfig;
Expand Down
18 changes: 17 additions & 1 deletion src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace KafkaFlow.Configuration
internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuilder
{
private readonly List<string> topics = new List<string>();
private readonly List<Action<string>> statisticsHandlers = new List<Action<string>>();
private readonly ConsumerMiddlewareConfigurationBuilder middlewareConfigurationBuilder;

private ConsumerConfig consumerConfig;
Expand All @@ -21,6 +22,7 @@ internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuild
private int workersCount;
private int bufferSize;
private bool autoStoreOffsets = true;
private int statisticsInterval;

private Factory<IDistributionStrategy> distributionStrategyFactory = resolver => new BytesSumDistributionStrategy();
private TimeSpan autoCommitInterval = TimeSpan.FromSeconds(5);
Expand Down Expand Up @@ -138,6 +140,18 @@ public IConsumerConfigurationBuilder AddMiddlewares(Action<IConsumerMiddlewareCo
return this;
}

public IConsumerConfigurationBuilder WithStatisticsHandler(Action<string> statisticsHandler)
{
this.statisticsHandlers.Add(statisticsHandler);
return this;
}

public IConsumerConfigurationBuilder WithStatisticsIntervalMs(int statisticsIntervalMs)
{
this.statisticsInterval = statisticsIntervalMs;
return this;
}

public ConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
{
var middlewareConfiguration = this.middlewareConfigurationBuilder.Build();
Expand All @@ -147,6 +161,7 @@ public ConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
this.consumerConfig.GroupId ??= this.groupId;
this.consumerConfig.AutoOffsetReset ??= this.autoOffsetReset;
this.consumerConfig.MaxPollIntervalMs ??= this.maxPollIntervalMs;
this.consumerConfig.StatisticsIntervalMs ??= this.statisticsInterval;

this.consumerConfig.EnableAutoOffsetStore = false;
this.consumerConfig.EnableAutoCommit = false;
Expand All @@ -162,7 +177,8 @@ public ConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
this.distributionStrategyFactory,
middlewareConfiguration,
this.autoStoreOffsets,
this.autoCommitInterval);
this.autoCommitInterval,
this.statisticsHandlers);
}
}
}
7 changes: 6 additions & 1 deletion src/KafkaFlow/Configuration/ProducerConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow.Configuration
{
using System;
using System.Collections.Generic;
using Confluent.Kafka;
using Acks = KafkaFlow.Acks;

Expand All @@ -12,14 +13,16 @@ public ProducerConfiguration(
string defaultTopic,
Acks? acks,
MiddlewareConfiguration middlewareConfiguration,
ProducerConfig baseProducerConfig)
ProducerConfig baseProducerConfig,
IReadOnlyList<Action<string>> statisticsHandlers)
{
this.Cluster = cluster ?? throw new ArgumentNullException(nameof(cluster));
this.Name = name;
this.DefaultTopic = defaultTopic;
this.Acks = acks;
this.MiddlewareConfiguration = middlewareConfiguration;
this.BaseProducerConfig = baseProducerConfig;
this.StatisticsHandlers = statisticsHandlers;
}

public ClusterConfiguration Cluster { get; }
Expand All @@ -34,6 +37,8 @@ public ProducerConfiguration(

public MiddlewareConfiguration MiddlewareConfiguration { get; }

public IReadOnlyList<Action<string>> StatisticsHandlers { get; }

public ProducerConfig GetKafkaConfig()
{
this.BaseProducerConfig.BootstrapServers = string.Join(",", this.Cluster.Brokers);
Expand Down
19 changes: 18 additions & 1 deletion src/KafkaFlow/Configuration/ProducerConfigurationBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
namespace KafkaFlow.Configuration
{
using System;
using System.Collections.Generic;
using Confluent.Kafka;
using Acks = KafkaFlow.Acks;

internal class ProducerConfigurationBuilder : IProducerConfigurationBuilder
{
private readonly string name;
private readonly ProducerMiddlewareConfigurationBuilder middlewareConfigurationBuilder;
private readonly List<Action<string>> statisticsHandlers = new List<Action<string>>();

private string topic;
private ProducerConfig producerConfig;
private Acks? acks;
private int statisticsInterval;

public ProducerConfigurationBuilder(IDependencyConfigurator dependencyConfigurator, string name)
{
Expand Down Expand Up @@ -46,9 +49,22 @@ public IProducerConfigurationBuilder WithAcks(Acks acks)
return this;
}

public IProducerConfigurationBuilder WithStatisticsHandler(Action<string> statisticsHandler)
{
this.statisticsHandlers.Add(statisticsHandler);
return this;
}

public IProducerConfigurationBuilder WithStatisticsIntervalMs(int statisticsIntervalMs)
{
this.statisticsInterval = statisticsIntervalMs;
return this;
}

public ProducerConfiguration Build(ClusterConfiguration clusterConfiguration)
{
this.producerConfig ??= new ProducerConfig();
this.producerConfig.StatisticsIntervalMs = this.statisticsInterval;

this.producerConfig.ReadSecurityInformation(clusterConfiguration);

Expand All @@ -58,7 +74,8 @@ public ProducerConfiguration Build(ClusterConfiguration clusterConfiguration)
this.topic,
this.acks,
this.middlewareConfigurationBuilder.Build(),
this.producerConfig);
this.producerConfig,
this.statisticsHandlers);

return configuration;
}
Expand Down
7 changes: 7 additions & 0 deletions src/KafkaFlow/Consumers/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ public KafkaConsumer(
{
this.logHandler.Warning("Kafka Consumer Error", new { Error = error });
}
})
.SetStatisticsHandler((consumer, statistics) =>
{
foreach (var handler in configuration.StatisticsHandlers)
{
handler.Invoke(statistics);
}
});
}

Expand Down
7 changes: 7 additions & 0 deletions src/KafkaFlow/Producers/MessageProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ private IProducer<byte[], byte[]> EnsureProducer()
.Warning("Kafka Producer Error", new { Error = error });
}
})
.SetStatisticsHandler((producer, statistics) =>
{
foreach (var handler in this.configuration.StatisticsHandlers)
{
handler.Invoke(statistics);
}
})
.Build();
}
}
Expand Down

0 comments on commit 123a387

Please sign in to comment.