diff --git a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj index 843c6fea2..28d5d3a31 100644 --- a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj +++ b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj @@ -1,4 +1,4 @@ - + Exe diff --git a/src/KafkaFlow.Abstractions/Configuration/IGlobalEvents.cs b/src/KafkaFlow.Abstractions/Configuration/IGlobalEvents.cs new file mode 100644 index 000000000..971216cf1 --- /dev/null +++ b/src/KafkaFlow.Abstractions/Configuration/IGlobalEvents.cs @@ -0,0 +1,38 @@ +namespace KafkaFlow.Configuration +{ + /// + /// Provides access to events fired by the internals of the library + /// + public interface IGlobalEvents + { + /// + /// Gets the message consume completed event + /// + IEvent MessageConsumeCompleted { get; } + + /// + /// Gets the message consume error event + /// + IEvent MessageConsumeError { get; } + + /// + /// Gets the message consume started event + /// + IEvent MessageConsumeStarted { get; } + + /// + /// Gets the message produce completed event + /// + IEvent MessageProduceCompleted { get; } + + /// + /// Gets the message produce error event + /// + IEvent MessageProduceError { get; } + + /// + /// Gets the message produce started event + /// + IEvent MessageProduceStarted { get; } + } +} diff --git a/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs index a29a6cfdb..1b26034b0 100644 --- a/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs +++ b/src/KafkaFlow.Abstractions/Configuration/IKafkaConfigurationBuilder.cs @@ -21,5 +21,12 @@ public interface IKafkaConfigurationBuilder /// IKafkaConfigurationBuilder UseLogHandler() where TLogHandler : ILogHandler; + + /// + /// Subscribe the global events defined in + /// + /// A handle to subscribe the events + /// + IKafkaConfigurationBuilder SubscribeGlobalEvents(Action observers); } } diff --git a/src/KafkaFlow.Abstractions/IEvent.cs b/src/KafkaFlow.Abstractions/IEvent.cs new file mode 100644 index 000000000..f9a055b4f --- /dev/null +++ b/src/KafkaFlow.Abstractions/IEvent.cs @@ -0,0 +1,32 @@ +namespace KafkaFlow +{ + using System; + using System.Threading.Tasks; + + /// + /// Represents an Event to be subscribed. + /// + public interface IEvent + { + /// + /// Subscribes to the event. + /// + /// The handler to be called when the event is fired. + /// Event subscription reference + IEventSubscription Subscribe(Func handler); + } + + /// + /// Represents an Event to be subscribed. + /// + /// The argument expected by the event. + public interface IEvent + { + /// + /// Subscribes to the event. + /// + /// The handler to be called when the event is fired. + /// Event subscription reference + IEventSubscription Subscribe(Func handler); + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Abstractions/IEventSubscription.cs b/src/KafkaFlow.Abstractions/IEventSubscription.cs new file mode 100644 index 000000000..e6967e775 --- /dev/null +++ b/src/KafkaFlow.Abstractions/IEventSubscription.cs @@ -0,0 +1,13 @@ +namespace KafkaFlow +{ + /// + /// Represents an Event subscription. + /// + public interface IEventSubscription + { + /// + /// Cancels the subscription to the event. 
+ /// + void Cancel(); + } +} diff --git a/src/KafkaFlow.Abstractions/IMessageContext.cs b/src/KafkaFlow.Abstractions/IMessageContext.cs index be95315a3..da497840a 100644 --- a/src/KafkaFlow.Abstractions/IMessageContext.cs +++ b/src/KafkaFlow.Abstractions/IMessageContext.cs @@ -1,6 +1,7 @@ namespace KafkaFlow { using System; + using System.Collections.Generic; /// /// A context that contains the message and metadata @@ -27,6 +28,11 @@ public interface IMessageContext /// IProducerContext ProducerContext { get; } + /// + /// Gets the items + /// + IDictionary Items { get; } + /// /// Creates a new with the new message /// diff --git a/src/KafkaFlow.Abstractions/MessageErrorEventContext.cs b/src/KafkaFlow.Abstractions/MessageErrorEventContext.cs new file mode 100644 index 000000000..fb5f38f2a --- /dev/null +++ b/src/KafkaFlow.Abstractions/MessageErrorEventContext.cs @@ -0,0 +1,26 @@ +namespace KafkaFlow +{ + using System; + + /// + /// Represents the errors in message context used in the events + /// + public class MessageErrorEventContext : MessageEventContext + { + /// + /// Initializes a new instance of the class. + /// + /// The message context + /// The event exception + public MessageErrorEventContext(IMessageContext messageContext, Exception exception) + : base(messageContext) + { + this.Exception = exception; + } + + /// + /// Gets the exception + /// + public Exception Exception { get; } + } +} diff --git a/src/KafkaFlow.Abstractions/MessageEventContext.cs b/src/KafkaFlow.Abstractions/MessageEventContext.cs new file mode 100644 index 000000000..121bd687e --- /dev/null +++ b/src/KafkaFlow.Abstractions/MessageEventContext.cs @@ -0,0 +1,22 @@ +namespace KafkaFlow +{ + /// + /// Represents a message context used in the events + /// + public class MessageEventContext + { + /// + /// Initializes a new instance of the class. 
+ /// + /// The message context + public MessageEventContext(IMessageContext messageContext) + { + this.MessageContext = messageContext; + } + + /// + /// Gets the message context + /// + public IMessageContext MessageContext { get; } + } +} diff --git a/src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs b/src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs index 0087fd6ca..70264f101 100644 --- a/src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs +++ b/src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs @@ -11,6 +11,7 @@ public BatchConsumeMessageContext( { this.ConsumerContext = consumer; this.Message = new Message(null, batchMessage); + this.Items = new Dictionary(); } public Message Message { get; } @@ -21,6 +22,8 @@ public BatchConsumeMessageContext( public IProducerContext ProducerContext => null; + public IDictionary Items { get; } + public IMessageContext SetMessage(object key, object value) => throw new NotSupportedException($"{nameof(BatchConsumeMessageContext)} does not allow change the message"); diff --git a/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs b/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs index 293d8c60b..f743478d6 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs @@ -97,8 +97,9 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection }; services.AddKafka( - kafka => kafka - .UseLogHandler() + kafka => + { + kafka.UseLogHandler() .AddCluster( cluster => cluster .WithBrokers(kafkaBrokers.Split(';')) @@ -332,7 +333,8 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .DefaultTopic(GzipTopicName) .AddMiddlewares( middlewares => middlewares - .AddCompressor())))); + .AddCompressor()))); + }); services.AddSingleton(); services.AddSingleton(); diff --git a/src/KafkaFlow.IntegrationTests/Core/Exceptions/ErrorExecutingMiddlewareException.cs b/src/KafkaFlow.IntegrationTests/Core/Exceptions/ErrorExecutingMiddlewareException.cs new file mode 100644 index 000000000..a582da1e5 --- /dev/null +++ b/src/KafkaFlow.IntegrationTests/Core/Exceptions/ErrorExecutingMiddlewareException.cs @@ -0,0 +1,12 @@ +namespace KafkaFlow.IntegrationTests.Core.Exceptions +{ + using System; + + public class ErrorExecutingMiddlewareException : Exception + { + public ErrorExecutingMiddlewareException(string middlewareName) + : base($"Exception thrown executing {middlewareName}") + { + } + } +} diff --git a/src/KafkaFlow.IntegrationTests/Core/Exceptions/PartitionAssignmentException.cs b/src/KafkaFlow.IntegrationTests/Core/Exceptions/PartitionAssignmentException.cs new file mode 100644 index 000000000..6f11c60da --- /dev/null +++ b/src/KafkaFlow.IntegrationTests/Core/Exceptions/PartitionAssignmentException.cs @@ -0,0 +1,14 @@ +namespace KafkaFlow.IntegrationTests.Core.Exceptions +{ + using System; + + public class PartitionAssignmentException : Exception + { + private const string ExceptionMessage = "Partition assignment hasn't occurred yet."; + + public PartitionAssignmentException() + : base(ExceptionMessage) + { + } + } +} diff --git a/src/KafkaFlow.IntegrationTests/Core/Producers/JsonProducer2.cs b/src/KafkaFlow.IntegrationTests/Core/Producers/JsonProducer2.cs new file mode 100644 index 000000000..034ceeb79 --- /dev/null +++ b/src/KafkaFlow.IntegrationTests/Core/Producers/JsonProducer2.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow.IntegrationTests.Core.Producers +{ + internal class JsonProducer2 + { + } +} diff --git 
a/src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs b/src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs new file mode 100644 index 000000000..d9055fd00 --- /dev/null +++ b/src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs @@ -0,0 +1,320 @@ +namespace KafkaFlow.IntegrationTests +{ + using System; + using System.IO; + using System.Linq; + using System.Threading.Tasks; + using AutoFixture; + using Confluent.Kafka; + using KafkaFlow.Configuration; + using KafkaFlow.IntegrationTests.Core; + using KafkaFlow.IntegrationTests.Core.Exceptions; + using KafkaFlow.IntegrationTests.Core.Handlers; + using KafkaFlow.IntegrationTests.Core.Messages; + using KafkaFlow.IntegrationTests.Core.Middlewares; + using KafkaFlow.IntegrationTests.Core.Producers; + using KafkaFlow.Serializer; + using Microsoft.Extensions.Configuration; + using Microsoft.Extensions.DependencyInjection; + using Microsoft.Extensions.Hosting; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Polly; + + [TestClass] + public class GlobalEventsTest + { + private readonly Fixture fixture = new(); + private string topic; + private bool isPartitionAssigned; + + [TestInitialize] + public void Setup() + { + this.topic = $"GlobalEventsTestTopic_{Guid.NewGuid()}"; + + MessageStorage.Clear(); + } + + [TestMethod] + public async Task SubscribeGlobalEvents_AllEvents_TriggeredCorrectly() + { + // Arrange + bool isMessageProducedStarted = false, isMessageConsumeStarted = false, isMessageConsumeCompleted = false; + + void ConfigureGlobalEvents(IGlobalEvents observers) + { + observers.MessageProduceStarted.Subscribe(eventContext => + { + isMessageProducedStarted = true; + return Task.CompletedTask; + }); + + observers.MessageConsumeStarted.Subscribe(eventContext => + { + isMessageConsumeStarted = true; + return Task.CompletedTask; + }); + + observers.MessageConsumeCompleted.Subscribe(eventContext => + { + isMessageConsumeCompleted = true; + return Task.CompletedTask; + }); + } + + var provider = await this.GetServiceProviderAsync( + ConfigureGlobalEvents, + this.ConfigureConsumer, + this.ConfigureProducer); + MessageStorage.Clear(); + + var producer = provider.GetRequiredService>(); + var message = this.fixture.Create(); + + // Act + await producer.ProduceAsync(null, message); + + await MessageStorage.AssertMessageAsync(message); + + // Assert + Assert.IsTrue(isMessageProducedStarted); + Assert.IsTrue(isMessageConsumeStarted); + Assert.IsTrue(isMessageConsumeCompleted); + } + + [TestMethod] + public async Task SubscribeGlobalEvents_MessageContext_IsAssignedCorrectly() + { + // Arrange + IMessageContext messageContext = null; + + void ConfigureGlobalEvents(IGlobalEvents observers) + { + observers.MessageProduceStarted.Subscribe(eventContext => + { + messageContext = eventContext.MessageContext; + return Task.CompletedTask; + }); + } + + var provider = await this.GetServiceProviderAsync( + ConfigureGlobalEvents, + this.ConfigureConsumer, + this.ConfigureProducer); + + MessageStorage.Clear(); + + var producer = provider.GetRequiredService>(); + var message = this.fixture.Create(); + + // Act + producer.Produce(message.Id.ToString(), message); + + // Assert + Assert.IsNotNull(messageContext); + Assert.AreEqual(messageContext.Message.Key, message.Id.ToString()); + } + + [TestMethod] + public async Task SubscribeGlobalEvents_ConsumerErrorEvent_TriggeredCorrectly() + { + // Arrange + bool isMessageProducedStarted = false, isMessageConsumeStarted = false, isMessageConsumerError = false; + + void ConfigureGlobalEvents(IGlobalEvents observers) + { + 
observers.MessageProduceStarted.Subscribe(eventContext => + { + isMessageProducedStarted = true; + return Task.CompletedTask; + }); + + observers.MessageConsumeStarted.Subscribe(eventContext => + { + isMessageConsumeStarted = true; + return Task.CompletedTask; + }); + + observers.MessageConsumeError.Subscribe(eventContext => + { + isMessageConsumerError = true; + return Task.CompletedTask; + }); + } + + var provider = await this.GetServiceProviderAsync( + ConfigureGlobalEvents, + this.ConfigureConsumer, + this.ConfigureProducer); + + MessageStorage.Clear(); + + var producer = provider.GetRequiredService>(); + var message = this.fixture.Create(); + + // Act + await producer.ProduceAsync(null, message); + + await MessageStorage.AssertMessageAsync(message); + + // Assert + Assert.IsTrue(isMessageProducedStarted); + Assert.IsTrue(isMessageConsumeStarted); + Assert.IsTrue(isMessageConsumerError); + } + + [TestMethod] + public async Task SubscribeGlobalEvents_ProducerErrorEvent_TriggeredCorrectly() + { + // Arrange + bool isMessageProducedStarted = false, isProduceErrorFired = false; + + void ConfigureGlobalEvents(IGlobalEvents observers) + { + observers.MessageProduceStarted.Subscribe(eventContext => + { + isMessageProducedStarted = true; + return Task.CompletedTask; + }); + + observers.MessageProduceError.Subscribe(eventContext => + { + isProduceErrorFired = true; + return Task.CompletedTask; + }); + } + + var provider = await this.GetServiceProviderAsync( + ConfigureGlobalEvents, + this.ConfigureConsumer, + this.ConfigureProducer); + + MessageStorage.Clear(); + var producer = provider.GetRequiredService>(); + var message = this.fixture.Create(); + var errorOccured = false; + + // Act + try + { + await producer.ProduceAsync(null, message); + } + catch (ProduceException ex) + { + // Assert + errorOccured = true; + Assert.IsTrue(ex.GetType() == typeof(ProduceException)); + Assert.IsTrue(isMessageProducedStarted); + Assert.IsTrue(isProduceErrorFired); + } + + Assert.IsTrue(errorOccured); + } + + private void ConfigureConsumer(IConsumerConfigurationBuilder consumerConfigurationBuilder) + where T : class, IMessageMiddleware + { + consumerConfigurationBuilder + .Topic(this.topic) + .WithGroupId(this.topic) + .WithBufferSize(100) + .WithWorkersCount(10) + .WithAutoOffsetReset(KafkaFlow.AutoOffsetReset.Earliest) + .AddMiddlewares( + middlewares => middlewares + .AddSerializer() + .Add()) + .WithPartitionsAssignedHandler((_, _) => + { + this.isPartitionAssigned = true; + }); + } + + private void ConfigureProducer(IProducerConfigurationBuilder producerConfigurationBuilder) + where T : class, ISerializer + { + producerConfigurationBuilder + .DefaultTopic(this.topic) + .AddMiddlewares(middlewares => middlewares.AddSerializer()); + } + + private async Task GetServiceProviderAsync( + Action configureGlobalEvents, + Action consumerConfiguration, + Action producerConfiguration) + { + this.isPartitionAssigned = false; + + var builder = Host + .CreateDefaultBuilder() + .ConfigureAppConfiguration( + (_, config) => + { + config + .SetBasePath(Directory.GetCurrentDirectory()) + .AddJsonFile( + "conf/appsettings.json", + false, + true) + .AddEnvironmentVariables(); + }) + .ConfigureServices((context, services) => + services.AddKafka( + kafka => kafka + .UseLogHandler() + .AddCluster( + cluster => cluster + .WithBrokers(context.Configuration.GetValue("Kafka:Brokers").Split(';')) + .CreateTopicIfNotExists(this.topic, 1, 1) + .AddProducer(producerConfiguration) + .AddConsumer(consumerConfiguration)) + 
.SubscribeGlobalEvents(configureGlobalEvents))) + .UseDefaultServiceProvider( + (_, options) => + { + options.ValidateScopes = true; + options.ValidateOnBuild = true; + }); + + var host = builder.Build(); + var bus = host.Services.CreateKafkaBus(); + bus.StartAsync().GetAwaiter().GetResult(); + + await this.WaitForPartitionAssignmentAsync(); + + return host.Services; + } + + private async Task WaitForPartitionAssignmentAsync() + { + await Policy + .HandleResult(isAvailable => !isAvailable) + .WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2)))) + .ExecuteAsync(() => Task.FromResult(this.isPartitionAssigned)); + } + + private class TriggerErrorMessageMiddleware : IMessageMiddleware + { + public async Task Invoke(IMessageContext context, MiddlewareDelegate _) + { + MessageStorage.Add((byte[])context.Message.Value); + throw new ErrorExecutingMiddlewareException(nameof(TriggerErrorMessageMiddleware)); + } + } + + private class TriggerErrorSerializerMiddleware : ISerializer + { + public Task SerializeAsync(object _, Stream output, ISerializerContext context) + { + var error = new Error(ErrorCode.BrokerNotAvailable); + throw new ProduceException(error, null); + } + + public Task DeserializeAsync(Stream _, Type type, ISerializerContext context) + { + var error = new Error(ErrorCode.BrokerNotAvailable); + throw new ProduceException(error,null); + } + } + } +} diff --git a/src/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj b/src/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj index 6435d8d04..94944dc09 100644 --- a/src/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj +++ b/src/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj @@ -25,6 +25,8 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive + + @@ -37,6 +39,7 @@ + diff --git a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs new file mode 100644 index 000000000..855083632 --- /dev/null +++ b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs @@ -0,0 +1,202 @@ +namespace KafkaFlow.IntegrationTests +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.IO; + using System.Linq; + using System.Threading.Tasks; + using AutoFixture; + using global::OpenTelemetry; + using global::OpenTelemetry.Trace; + using KafkaFlow.Compressor; + using KafkaFlow.Compressor.Gzip; + using KafkaFlow.Configuration; + using KafkaFlow.IntegrationTests.Core; + using KafkaFlow.IntegrationTests.Core.Handlers; + using KafkaFlow.IntegrationTests.Core.Middlewares; + using KafkaFlow.IntegrationTests.Core.Producers; + using Microsoft.Extensions.Configuration; + using Microsoft.Extensions.DependencyInjection; + using Microsoft.Extensions.Hosting; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Polly; + + [TestClass] + public class OpenTelemetryTests + { + private readonly Fixture fixture = new(); + + private List exportedItems; + + private bool isPartitionAssigned; + + [TestInitialize] + public void Setup() + { + this.exportedItems = new List(); + } + + [TestMethod] + public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpansAreCreatedCorrectly() + { + // Arrange + var provider = await this.GetServiceProvider(); + MessageStorage.Clear(); + + using var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource("KafkaFlow.OpenTelemetry") + .AddInMemoryExporter(this.exportedItems) + .Build(); + + var producer = 
provider.GetRequiredService>(); + var message = this.fixture.Create(); + + // Act + await producer.ProduceAsync(null, message); + + // Assert + var (producerSpan, consumerSpan) = await this.WaitForSpansAsync(); + + Assert.IsNotNull(this.exportedItems); + Assert.IsNull(producerSpan.ParentId); + Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId); + Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId); + } + + [TestMethod] + public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsPropagatedFromTestActivityToConsumer() + { + // Arrange + var provider = await this.GetServiceProvider(); + MessageStorage.Clear(); + + var kafkaFlowTestString = "KafkaFlowTest"; + var baggageName1 = "TestBaggage1"; + var baggageValue1 = "TestBaggageValue1"; + var baggageName2 = "TestBaggage2"; + var baggageValue2 = "TestBaggageValue2"; + + using var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource("KafkaFlow.OpenTelemetry") + .AddSource(kafkaFlowTestString) + .AddInMemoryExporter(this.exportedItems) + .Build(); + + var producer = provider.GetRequiredService>(); + var message = this.fixture.Create(); + + // Act + ActivitySource activitySource = new(kafkaFlowTestString); + + var activity = activitySource.StartActivity("TestActivity", ActivityKind.Client); + + activity.AddBaggage(baggageName1, baggageValue1); + activity.AddBaggage(baggageName2, baggageValue2); + + await producer.ProduceAsync(null, message); + + // Assert + var (producerSpan, consumerSpan) = await this.WaitForSpansAsync(); + + Assert.IsNotNull(this.exportedItems); + Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId); + Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId); + Assert.AreEqual(producerSpan.GetBaggageItem(baggageName1), baggageValue1); + Assert.AreEqual(consumerSpan.GetBaggageItem(baggageName1), baggageValue1); + Assert.AreEqual(producerSpan.GetBaggageItem(baggageName2), baggageValue2); + Assert.AreEqual(consumerSpan.GetBaggageItem(baggageName2), baggageValue2); + } + + private async Task GetServiceProvider() + { + var topicName = $"OpenTelemetryTestTopic_{Guid.NewGuid()}"; + + this.isPartitionAssigned = false; + + var builder = Host + .CreateDefaultBuilder() + .ConfigureAppConfiguration( + (_, config) => + { + config + .SetBasePath(Directory.GetCurrentDirectory()) + .AddJsonFile( + "conf/appsettings.json", + false, + true) + .AddEnvironmentVariables(); + }) + .ConfigureServices((context, services) => + services.AddKafka( + kafka => kafka + .UseLogHandler() + .AddCluster( + cluster => cluster + .WithBrokers(context.Configuration.GetValue("Kafka:Brokers").Split(';')) + .CreateTopicIfNotExists(topicName, 1, 1) + .AddProducer( + producer => producer + .DefaultTopic(topicName) + .AddMiddlewares( + middlewares => middlewares + .AddCompressor())) + .AddConsumer( + consumer => consumer + .Topic(topicName) + .WithGroupId(topicName) + .WithBufferSize(100) + .WithWorkersCount(10) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddCompressor() + .Add()) + .WithPartitionsAssignedHandler((_, _) => + { + this.isPartitionAssigned = true; + }))) + .AddOpenTelemetryInstrumentation())) + .UseDefaultServiceProvider( + (_, options) => + { + options.ValidateScopes = true; + options.ValidateOnBuild = true; + }); + + var host = builder.Build(); + var bus = host.Services.CreateKafkaBus(); + bus.StartAsync().GetAwaiter().GetResult(); + + await this.WaitForPartitionAssignmentAsync(); + + return host.Services; + } + + private async Task 
WaitForPartitionAssignmentAsync() + { + await Policy + .HandleResult(isAvailable => !isAvailable) + .WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2)))) + .ExecuteAsync(() => Task.FromResult(this.isPartitionAssigned)); + } + + private async Task<(Activity producerSpan, Activity consumerSpan)> WaitForSpansAsync() + { + Activity producerSpan = null, consumerSpan = null; + + await Policy + .HandleResult(isAvailable => !isAvailable) + .WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2)))) + .ExecuteAsync(() => + { + producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer); + consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer); + + return Task.FromResult(producerSpan != null && consumerSpan != null); + }); + + return (producerSpan, consumerSpan); + } + } +} diff --git a/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs b/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs new file mode 100644 index 000000000..9a965dee5 --- /dev/null +++ b/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs @@ -0,0 +1,41 @@ +extern alias SemanticConventions; + +namespace KafkaFlow.OpenTelemetry +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Reflection; + using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions; + + internal static class ActivitySourceAccessor + { + internal const string ActivityString = "otel_activity"; + internal const string ExceptionEventKey = "exception"; + internal const string MessagingSystemId = "kafka"; + internal const string AttributeMessagingOperation = "messaging.operation"; + internal const string AttributeMessagingKafkaMessageKey = "messaging.kafka.message.key"; + internal const string AttributeMessagingKafkaMessageOffset = "messaging.kafka.message.offset"; + internal static readonly AssemblyName AssemblyName = typeof(ActivitySourceAccessor).Assembly.GetName(); + internal static readonly string ActivitySourceName = AssemblyName.Name; + internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString(); + internal static readonly ActivitySource ActivitySource = new(ActivitySourceName, Version); + + public static void SetGenericTags(Activity activity) + { + activity?.SetTag(Conventions.AttributeMessagingSystem, MessagingSystemId); + } + + public static ActivityEvent CreateExceptionEvent(Exception exception) + { + var activityTagCollection = new ActivityTagsCollection( + new[] + { + new KeyValuePair(Conventions.AttributeExceptionMessage, exception.Message), + new KeyValuePair(Conventions.AttributeExceptionStacktrace, exception.StackTrace), + }); + + return new ActivityEvent(ExceptionEventKey, DateTimeOffset.UtcNow, activityTagCollection); + } + } +} diff --git a/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs b/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs new file mode 100644 index 000000000..9b169c949 --- /dev/null +++ b/src/KafkaFlow.OpenTelemetry/ExtensionMethods.cs @@ -0,0 +1,35 @@ +namespace KafkaFlow.Configuration +{ + using KafkaFlow.OpenTelemetry; + + /// + /// Adds OpenTelemetry instrumentation + /// + public static class ExtensionMethods + { + /// + /// Adds OpenTelemetry instrumentation + /// + /// The Kafka configuration builder + /// + public static IKafkaConfigurationBuilder AddOpenTelemetryInstrumentation(this IKafkaConfigurationBuilder builder) + { + builder.SubscribeGlobalEvents(hub => + { + 
hub.MessageConsumeStarted.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeStarted(eventContext.MessageContext)); + + hub.MessageConsumeError.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeError(eventContext.MessageContext, eventContext.Exception)); + + hub.MessageConsumeCompleted.Subscribe(eventContext => OpenTelemetryConsumerEventsHandler.OnConsumeCompleted(eventContext.MessageContext)); + + hub.MessageProduceStarted.Subscribe(eventContext => OpenTelemetryProducerEventsHandler.OnProducerStarted(eventContext.MessageContext)); + + hub.MessageProduceError.Subscribe(eventContext => OpenTelemetryProducerEventsHandler.OnProducerError(eventContext.MessageContext, eventContext.Exception)); + + hub.MessageProduceCompleted.Subscribe(eventContext => OpenTelemetryProducerEventsHandler.OnProducerCompleted(eventContext.MessageContext)); + }); + + return builder; + } + } +} diff --git a/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj b/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj new file mode 100644 index 000000000..506126fe9 --- /dev/null +++ b/src/KafkaFlow.OpenTelemetry/KafkaFlow.OpenTelemetry.csproj @@ -0,0 +1,18 @@ + + + + netstandard2.0 + + + + + + SemanticConventions + + + + + + + + diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs new file mode 100644 index 000000000..10fa794e9 --- /dev/null +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs @@ -0,0 +1,97 @@ +namespace KafkaFlow.OpenTelemetry +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Text; + using System.Threading.Tasks; + using global::OpenTelemetry; + using global::OpenTelemetry.Context.Propagation; + + internal static class OpenTelemetryConsumerEventsHandler + { + private const string ProcessString = "process"; + private const string AttributeMessagingSourceName = "messaging.source.name"; + private const string AttributeMessagingKafkaConsumerGroup = "messaging.kafka.consumer.group"; + private const string AttributeMessagingKafkaSourcePartition = "messaging.kafka.source.partition"; + private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; + + public static Task OnConsumeStarted(IMessageContext context) + { + try + { + var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context?.ConsumerContext.Topic} {ProcessString}" : ProcessString; + + // Extract the PropagationContext of the upstream parent from the message headers. + var parentContext = Propagator.Extract(new PropagationContext(default, Baggage.Current), context, ExtractTraceContextIntoBasicProperties); + Baggage.Current = parentContext.Baggage; + + // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. + // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. 
+ // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md + var activity = ActivitySourceAccessor.ActivitySource.StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext); + + foreach (var item in Baggage.Current) + { + activity?.AddBaggage(item.Key, item.Value); + } + + context?.Items.Add(ActivitySourceAccessor.ActivityString, activity); + + ActivitySourceAccessor.SetGenericTags(activity); + + if (activity != null && activity.IsAllDataRequested) + { + SetConsumerTags(context, activity); + } + } + catch + { + // If there is any failure, do not propagate the context. + } + + return Task.CompletedTask; + } + + public static Task OnConsumeCompleted(IMessageContext context) + { + if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) + { + activity?.Dispose(); + } + + return Task.CompletedTask; + } + + public static Task OnConsumeError(IMessageContext context, Exception ex) + { + if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) + { + var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex); + + activity?.AddEvent(exceptionEvent); + + activity?.Dispose(); + } + + return Task.CompletedTask; + } + + private static IEnumerable ExtractTraceContextIntoBasicProperties(IMessageContext context, string key) + { + return new[] { context.Headers.GetString(key, Encoding.UTF8) }; + } + + private static void SetConsumerTags(IMessageContext context, Activity activity) + { + var messageKey = Encoding.UTF8.GetString(context.Message.Key as byte[]); + + activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, ProcessString); + activity.SetTag(AttributeMessagingSourceName, context.ConsumerContext.Topic); + activity.SetTag(AttributeMessagingKafkaConsumerGroup, context.ConsumerContext.GroupId); + activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageKey, messageKey); + activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context.ConsumerContext.Offset); + activity.SetTag(AttributeMessagingKafkaSourcePartition, context.ConsumerContext.Partition); + } + } +} diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs new file mode 100644 index 000000000..3630157c1 --- /dev/null +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs @@ -0,0 +1,107 @@ +namespace KafkaFlow.OpenTelemetry +{ + using System; + using System.Diagnostics; + using System.Linq; + using System.Text; + using System.Threading.Tasks; + using global::OpenTelemetry; + using global::OpenTelemetry.Context.Propagation; + + internal static class OpenTelemetryProducerEventsHandler + { + private const string PublishString = "publish"; + private const string AttributeMessagingDestinationName = "messaging.destination.name"; + private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition"; + private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; + + public static Task OnProducerStarted(IMessageContext context) + { + try + { + var activityName = !string.IsNullOrEmpty(context?.ProducerContext.Topic) ? $"{context?.ProducerContext.Topic} {PublishString}" : PublishString; + + // Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification. 
+ // The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity. + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md + var activity = ActivitySourceAccessor.ActivitySource.StartActivity(activityName, ActivityKind.Producer); + + // Depending on Sampling (and whether a listener is registered or not), the + // activity above may not be created. + // If it is created, then propagate its context. + // If it is not created, the propagate the Current context, if any. + ActivityContext contextToInject = default; + + if (activity != null) + { + context?.Items.Add(ActivitySourceAccessor.ActivityString, activity); + + contextToInject = activity.Context; + } + else if (Activity.Current != null) + { + contextToInject = Activity.Current.Context; + } + + Baggage.Current = Baggage.Create(activity?.Baggage.ToDictionary(item => item.Key, item => item.Value)); + + // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. + Propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), context, InjectTraceContextIntoBasicProperties); + + ActivitySourceAccessor.SetGenericTags(activity); + + if (activity != null && activity.IsAllDataRequested) + { + SetProducerTags(context, activity); + } + } + catch + { + // If there is any failure, do not propagate the context. + } + + return Task.CompletedTask; + } + + public static Task OnProducerCompleted(IMessageContext context) + { + if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) + { + activity?.Dispose(); + } + + return Task.CompletedTask; + } + + public static Task OnProducerError(IMessageContext context, Exception ex) + { + if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) + { + var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex); + + activity?.AddEvent(exceptionEvent); + + activity?.Dispose(); + } + + return Task.CompletedTask; + } + + private static void InjectTraceContextIntoBasicProperties(IMessageContext context, string key, string value) + { + if (!context.Headers.Any(x => x.Key == key)) + { + context.Headers.SetString(key, value, Encoding.ASCII); + } + } + + private static void SetProducerTags(IMessageContext context, Activity activity) + { + activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, PublishString); + activity.SetTag(AttributeMessagingDestinationName, context?.ProducerContext.Topic); + activity.SetTag(AttributeMessagingKafkaDestinationPartition, context?.ProducerContext.Partition); + activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageKey, context?.Message.Key); + activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context?.ProducerContext.Offset); + } + } +} diff --git a/src/KafkaFlow.sln b/src/KafkaFlow.sln index a3012d169..0b4f56ea7 100644 --- a/src/KafkaFlow.sln +++ b/src/KafkaFlow.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio Version 16 -VisualStudioVersion = 16.0.29709.97 +# Visual Studio Version 17 +VisualStudioVersion = 17.7.34031.279 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow", "KafkaFlow\KafkaFlow.csproj", "{E1055352-9F5B-4980-80A3-50C335B79A16}" EndProject @@ -73,11 +73,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = 
"KafkaFlow.Serializer.Schema EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Admin.Dashboard", "KafkaFlow.Admin.Dashboard\KafkaFlow.Admin.Dashboard.csproj", "{4072F646-9393-4BF3-A479-0550AC1BB6C4}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.Dashboard", "..\samples\KafkaFlow.Sample.Dashboard\KafkaFlow.Sample.Dashboard.csproj", "{F32DC7DA-36EA-4199-91F5-81960FD9C650}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Sample.Dashboard", "..\samples\KafkaFlow.Sample.Dashboard\KafkaFlow.Sample.Dashboard.csproj", "{F32DC7DA-36EA-4199-91F5-81960FD9C650}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.SchemaRegistry", "..\samples\KafkaFlow.Sample.SchemaRegistry\KafkaFlow.Sample.SchemaRegistry.csproj", "{2BD49C06-7A88-4B98-91B0-659282D2A45E}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Sample.SchemaRegistry", "..\samples\KafkaFlow.Sample.SchemaRegistry\KafkaFlow.Sample.SchemaRegistry.csproj", "{2BD49C06-7A88-4B98-91B0-659282D2A45E}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.FlowControl", "..\samples\KafkaFlow.Sample.FlowControl\KafkaFlow.Sample.FlowControl.csproj", "{7B61C99E-3AEB-4497-8A38-F780CB309130}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Sample.FlowControl", "..\samples\KafkaFlow.Sample.FlowControl\KafkaFlow.Sample.FlowControl.csproj", "{7B61C99E-3AEB-4497-8A38-F780CB309130}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Deploy", "Deploy", "{4A6A390C-A63A-4371-86BB-28481AD6D4C0}" ProjectSection(SolutionItems) = preProject @@ -85,11 +85,15 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Deploy", "Deploy", "{4A6A39 ..\.github\workflows\publish.yml = ..\.github\workflows\publish.yml EndProjectSection EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.LogHandler.Microsoft", "KafkaFlow.LogHandler.Microsoft\KafkaFlow.LogHandler.Microsoft.csproj", "{8EAF0D96-F760-4FEF-9237-92779F66482D}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.LogHandler.Microsoft", "KafkaFlow.LogHandler.Microsoft\KafkaFlow.LogHandler.Microsoft.csproj", "{8EAF0D96-F760-4FEF-9237-92779F66482D}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.PauseConsumerOnError", "..\samples\KafkaFlow.Sample.PauseConsumerOnError\KafkaFlow.Sample.PauseConsumerOnError.csproj", "{B4A9E7CE-7A37-411E-967E-D9B5FD1A3992}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Sample.PauseConsumerOnError", "..\samples\KafkaFlow.Sample.PauseConsumerOnError\KafkaFlow.Sample.PauseConsumerOnError.csproj", "{B4A9E7CE-7A37-411E-967E-D9B5FD1A3992}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.ConsumerThrottling", "..\samples\KafkaFlow.Sample.ConsumerThrottling\KafkaFlow.Sample.ConsumerThrottling.csproj", "{4A16F519-FAF8-432C-AD0A-CC44F7BD392D}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Sample.ConsumerThrottling", "..\samples\KafkaFlow.Sample.ConsumerThrottling\KafkaFlow.Sample.ConsumerThrottling.csproj", "{4A16F519-FAF8-432C-AD0A-CC44F7BD392D}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Telemetry", "Telemetry", "{96F5D441-B8DE-4ABC-BEF2-F758D1B2BA39}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.OpenTelemetry", "KafkaFlow.OpenTelemetry\KafkaFlow.OpenTelemetry.csproj", "{1557B135-4925-4FA2-80DA-8AD13155F3BD}" EndProject Global 
GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -225,6 +229,10 @@ Global {4A16F519-FAF8-432C-AD0A-CC44F7BD392D}.Debug|Any CPU.Build.0 = Debug|Any CPU {4A16F519-FAF8-432C-AD0A-CC44F7BD392D}.Release|Any CPU.ActiveCfg = Release|Any CPU {4A16F519-FAF8-432C-AD0A-CC44F7BD392D}.Release|Any CPU.Build.0 = Release|Any CPU + {1557B135-4925-4FA2-80DA-8AD13155F3BD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1557B135-4925-4FA2-80DA-8AD13155F3BD}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1557B135-4925-4FA2-80DA-8AD13155F3BD}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1557B135-4925-4FA2-80DA-8AD13155F3BD}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -264,6 +272,7 @@ Global {8EAF0D96-F760-4FEF-9237-92779F66482D} = {EF626895-FDAE-4B28-9110-BA85671CBBF2} {B4A9E7CE-7A37-411E-967E-D9B5FD1A3992} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B} {4A16F519-FAF8-432C-AD0A-CC44F7BD392D} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B} + {1557B135-4925-4FA2-80DA-8AD13155F3BD} = {96F5D441-B8DE-4ABC-BEF2-F758D1B2BA39} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {6AE955B5-16B0-41CF-9F12-66D15B3DD1AB} diff --git a/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs b/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs index 56e72d129..b1758da1e 100644 --- a/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs @@ -11,6 +11,7 @@ internal class KafkaConfigurationBuilder : IKafkaConfigurationBuilder { private readonly IDependencyConfigurator dependencyConfigurator; private readonly List clusters = new(); + private readonly IList> globalEventsConfigurators = new List>(); private Type logHandlerType = typeof(NullLogHandler); public KafkaConfigurationBuilder(IDependencyConfigurator dependencyConfigurator) @@ -44,7 +45,20 @@ public KafkaConfiguration Build() .AddSingleton() .AddSingleton(new ConsumerAccessor()) .AddSingleton(new ConsumerManagerFactory()) - .AddSingleton(); + .AddSingleton() + .AddSingleton(r => + { + var logHandler = r.Resolve(); + + var globalEvents = new GlobalEvents(logHandler); + + foreach (var del in this.globalEventsConfigurators) + { + del.Invoke(globalEvents); + } + + return globalEvents; + }); return configuration; } @@ -66,5 +80,12 @@ public IKafkaConfigurationBuilder UseLogHandler() this.logHandlerType = typeof(TLogHandler); return this; } + + public IKafkaConfigurationBuilder SubscribeGlobalEvents(Action observers) + { + this.globalEventsConfigurators.Add(observers); + + return this; + } } } diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index bb789af1e..cce7308ea 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -13,6 +13,7 @@ internal class ConsumerWorker : IConsumerWorker private readonly IOffsetManager offsetManager; private readonly IMiddlewareExecutor middlewareExecutor; private readonly ILogHandler logHandler; + private readonly GlobalEvents globalEvents; private readonly Channel> messagesBuffer; @@ -35,6 +36,7 @@ public ConsumerWorker( this.middlewareExecutor = middlewareExecutor; this.logHandler = logHandler; this.messagesBuffer = Channel.CreateBounded>(consumer.Configuration.BufferSize); + this.globalEvents = this.dependencyResolver.Resolve(); } public int Id { get; } @@ -125,9 +127,13 @@ private async Task ProcessMessageAsync(ConsumeResult message, Ca 
message.TopicPartitionOffset, () => scope.Dispose()); + await this.globalEvents.FireMessageConsumeStartedAsync(new MessageEventContext(context)); + await this.middlewareExecutor .Execute(scope.Resolver, context, _ => Task.CompletedTask) .ConfigureAwait(false); + + await this.globalEvents.FireMessageConsumeCompletedAsync(new MessageEventContext(context)); } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { @@ -135,6 +141,8 @@ await this.middlewareExecutor } catch (Exception ex) { + await this.globalEvents.FireMessageConsumeErrorAsync(new MessageErrorEventContext(context, ex)); + this.logHandler.Error( "Error processing message", ex, diff --git a/src/KafkaFlow/Event.cs b/src/KafkaFlow/Event.cs new file mode 100644 index 000000000..44d73ad48 --- /dev/null +++ b/src/KafkaFlow/Event.cs @@ -0,0 +1,62 @@ +namespace KafkaFlow +{ + using System; + using System.Collections.Generic; + using System.Threading.Tasks; + + internal class Event : IEvent + { + private readonly ILogHandler logHandler; + + private readonly List> handlers = new(); + + public Event(ILogHandler logHandler) + { + this.logHandler = logHandler; + } + + public IEventSubscription Subscribe(Func handler) + { + if (!this.handlers.Contains(handler)) + { + this.handlers.Add(handler); + } + + return new EventSubscription(() => this.handlers.Remove(handler)); + } + + internal async Task FireAsync(TArg arg) + { + foreach (var handler in this.handlers) + { + try + { + if (handler is null) + { + continue; + } + + await handler.Invoke(arg); + } + catch (Exception e) + { + this.logHandler.Error("Error firing event", e, new { Event = this.GetType().Name }); + } + } + } + } + + internal class Event : IEvent + { + private readonly Event evt; + + public Event(ILogHandler logHandler) + { + this.evt = new Event(logHandler); + } + + public IEventSubscription Subscribe(Func handler) => this.evt.Subscribe(_ => handler.Invoke()); + + internal Task FireAsync() => this.evt.FireAsync(null); + } +} \ No newline at end of file diff --git a/src/KafkaFlow/EventSubscription.cs b/src/KafkaFlow/EventSubscription.cs new file mode 100644 index 000000000..36d725083 --- /dev/null +++ b/src/KafkaFlow/EventSubscription.cs @@ -0,0 +1,19 @@ +namespace KafkaFlow +{ + using System; + + internal class EventSubscription : IEventSubscription + { + private readonly Action cancelDelegate; + + public EventSubscription(Action cancelDelegate) + { + this.cancelDelegate = cancelDelegate; + } + + public void Cancel() + { + this.cancelDelegate.Invoke(); + } + } +} diff --git a/src/KafkaFlow/GlobalEvents.cs b/src/KafkaFlow/GlobalEvents.cs new file mode 100644 index 000000000..c4e4212a3 --- /dev/null +++ b/src/KafkaFlow/GlobalEvents.cs @@ -0,0 +1,56 @@ +namespace KafkaFlow +{ + using System.Threading.Tasks; + using KafkaFlow.Configuration; + + internal class GlobalEvents : IGlobalEvents + { + private readonly Event messageConsumeCompleted; + private readonly Event messageConsumeError; + private readonly Event messageConsumeStarted; + private readonly Event messageProduceCompleted; + private readonly Event messageProduceError; + private readonly Event messageProduceStarted; + + public GlobalEvents(ILogHandler log) + { + this.messageConsumeCompleted = new(log); + this.messageConsumeError = new(log); + this.messageConsumeStarted = new(log); + this.messageProduceCompleted = new(log); + this.messageProduceError = new(log); + this.messageProduceStarted = new(log); + } + + public IEvent MessageConsumeCompleted => this.messageConsumeCompleted; + + public 
IEvent MessageConsumeError => this.messageConsumeError; + + public IEvent MessageConsumeStarted => this.messageConsumeStarted; + + public IEvent MessageProduceCompleted => this.messageProduceCompleted; + + public IEvent MessageProduceError => this.messageProduceError; + + public IEvent MessageProduceStarted => this.messageProduceStarted; + + public Task FireMessageConsumeStartedAsync(MessageEventContext context) + => this.messageConsumeStarted.FireAsync(context); + + public Task FireMessageConsumeErrorAsync(MessageErrorEventContext context) + => this.messageConsumeError.FireAsync(context); + + public Task FireMessageConsumeCompletedAsync(MessageEventContext context) + => this.messageConsumeCompleted.FireAsync(context); + + public Task FireMessageProduceStartedAsync(MessageEventContext context) + => this.messageProduceStarted.FireAsync(context); + + public Task FireMessageProduceErrorAsync(MessageErrorEventContext context) + => this.messageProduceError.FireAsync(context); + + public Task FireMessageProduceCompletedAsync(MessageEventContext context) + => this.messageProduceCompleted.FireAsync(context); + + } +} diff --git a/src/KafkaFlow/MessageContext.cs b/src/KafkaFlow/MessageContext.cs index c37d72230..6a524efe1 100644 --- a/src/KafkaFlow/MessageContext.cs +++ b/src/KafkaFlow/MessageContext.cs @@ -1,5 +1,7 @@ namespace KafkaFlow { + using System.Collections.Generic; + internal class MessageContext : IMessageContext { public MessageContext( @@ -12,6 +14,7 @@ public MessageContext( this.Headers = headers ?? new MessageHeaders(); this.ConsumerContext = consumer; this.ProducerContext = producer; + this.Items = new Dictionary(); } public Message Message { get; } @@ -22,6 +25,8 @@ public MessageContext( public IMessageHeaders Headers { get; } + public IDictionary Items { get; } + public IMessageContext SetMessage(object key, object value) => new MessageContext( new Message(key, value), this.Headers, diff --git a/src/KafkaFlow/Producers/MessageProducer.cs b/src/KafkaFlow/Producers/MessageProducer.cs index b5d0c8059..85fd3096e 100644 --- a/src/KafkaFlow/Producers/MessageProducer.cs +++ b/src/KafkaFlow/Producers/MessageProducer.cs @@ -11,6 +11,7 @@ internal class MessageProducer : IMessageProducer, IDisposable private readonly IDependencyResolver dependencyResolver; private readonly IProducerConfiguration configuration; private readonly MiddlewareExecutor middlewareExecutor; + private readonly GlobalEvents globalEvents; private readonly object producerCreationSync = new(); @@ -23,6 +24,7 @@ public MessageProducer( this.dependencyResolver = dependencyResolver; this.configuration = configuration; this.middlewareExecutor = new MiddlewareExecutor(configuration.MiddlewaresConfigurations); + this.globalEvents = this.dependencyResolver.Resolve(); } public string ProducerName => this.configuration.Name; @@ -38,14 +40,16 @@ public async Task> ProduceAsync( using var scope = this.dependencyResolver.CreateScope(); - await this.middlewareExecutor + var messageContext = this.CreateMessageContext(topic, messageKey, messageValue, headers); + + await this.globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext)); + + try + { + await this.middlewareExecutor .Execute( scope.Resolver, - new MessageContext( - new Message(messageKey, messageValue), - headers, - null, - new ProducerContext(topic)), + messageContext, async context => { report = await this @@ -54,6 +58,14 @@ await this.middlewareExecutor }) .ConfigureAwait(false); + await this.globalEvents.FireMessageProduceCompletedAsync(new 
MessageEventContext(messageContext)); + } + catch(Exception e) + { + await this.globalEvents.FireMessageProduceErrorAsync(new MessageErrorEventContext(messageContext, e)); + throw; + } + return report; } @@ -87,14 +99,14 @@ public void Produce( { var scope = this.dependencyResolver.CreateScope(); + var messageContext = this.CreateMessageContext(topic, messageKey, messageValue, headers); + + this.globalEvents.FireMessageProduceStartedAsync(new MessageEventContext(messageContext)); + this.middlewareExecutor .Execute( scope.Resolver, - new MessageContext( - new Message(messageKey, messageValue), - headers, - null, - new ProducerContext(topic)), + messageContext, context => { var completionSource = new TaskCompletionSource(); @@ -134,6 +146,8 @@ public void Produce( scope.Dispose(); }); + + this.globalEvents.FireMessageProduceCompletedAsync(new MessageEventContext(messageContext)); } public void Produce( @@ -277,6 +291,8 @@ private async Task> InternalProduceAsync(IMessage } catch (ProduceException e) { + await this.globalEvents.FireMessageProduceErrorAsync(new MessageErrorEventContext(context, e)); + if (e.Error.IsFatal) { this.InvalidateProducer(e.Error, result); @@ -325,5 +341,18 @@ void Handler(DeliveryReport report) deliveryHandler(report); } } + + private MessageContext CreateMessageContext( + string topic, + object messageKey, + object messageValue, + IMessageHeaders headers = null) + { + return new( + new Message(messageKey, messageValue), + headers, + null, + new ProducerContext(topic)); + } } } diff --git a/website/docs/guides/configuration.md b/website/docs/guides/configuration.md index b9a4fb6a3..c40118981 100644 --- a/website/docs/guides/configuration.md +++ b/website/docs/guides/configuration.md @@ -8,7 +8,7 @@ In this section, we will introduce how configuration is done in KafkaFlow. KafkaFlow is a highly configured framework. You can customize it through a Fluent Builder. -Using the builder, you can configure [Logging](../guides/logging.md), Clusters, Producers, Consumers and others. +Using the builder, you can configure [Logging](../guides/logging.md), [Global Events](../guides/global-events.md), Clusters, Producers, Consumers and others. There are a few options to configure KafkaFlow: - [Using a Hosted Service](#hosted-service) diff --git a/website/docs/guides/global-events.md b/website/docs/guides/global-events.md new file mode 100644 index 000000000..87803b12d --- /dev/null +++ b/website/docs/guides/global-events.md @@ -0,0 +1,115 @@ +--- +sidebar_position: 9 +--- + +# Global Events + +In this section, we will delve into the concept of Global Events in KafkaFlow, which provides a mechanism to subscribe to various events that are triggered during the message production and consumption processes. + +KafkaFlow offers a range of Global Events that can be subscribed to. These events can be used to monitor and react to different stages of message handling. Below is a list of available events: + - [Message Produce Started Event](#message-produce-started-event) + - [Message Produce Completed Event](#message-produce-completed-event) + - [Message Produce Error Event](#message-produce-error-event) + - [Message Consume Started Event](#message-consume-started-event) + - [Message Consume Completed Event](#message-consume-completed-event) + - [Message Consume Error Event](#message-consume-error-event) + +## Message Produce Started Event {#message-produce-started-event} + +The Message Produce Started Event is triggered when the message production process begins. 
It provides an opportunity to perform tasks or gather information before middleware execution.
+
+```csharp
+services.AddKafka(
+    kafka => kafka
+        .SubscribeGlobalEvents(observers =>
+        {
+            observers.MessageProduceStarted.Subscribe(eventContext =>
+            {
+                // Add your logic here
+            });
+        }));
+```
+
+## Message Produce Completed Event {#message-produce-completed-event}
+
+The Message Produce Completed Event is triggered when a message is successfully produced or when errors occur during the production process. Subscribing to this event enables you to track the successful completion of message production.
+
+```csharp
+services.AddKafka(
+    kafka => kafka
+        .SubscribeGlobalEvents(observers =>
+        {
+            observers.MessageProduceCompleted.Subscribe(eventContext =>
+            {
+                // Add your logic here
+            });
+        }));
+```
+
+## Message Produce Error Event {#message-produce-error-event}
+
+In case an error occurs during message production, the Message Produce Error Event is triggered. By subscribing to this event, you can react to any exceptions thrown while producing a message.
+
+```csharp
+services.AddKafka(
+    kafka => kafka
+        .SubscribeGlobalEvents(observers =>
+        {
+            observers.MessageProduceError.Subscribe(eventContext =>
+            {
+                // Add your logic here
+            });
+        }));
+```
+
+## Message Consume Started Event {#message-consume-started-event}
+
+The Message Consume Started Event is raised at the beginning of the message consumption process. It offers an opportunity to execute specific tasks or set up resources before message processing begins.
+
+```csharp
+services.AddKafka(
+    kafka => kafka
+        .SubscribeGlobalEvents(observers =>
+        {
+            observers.MessageConsumeStarted.Subscribe(eventContext =>
+            {
+                // Add your logic here
+            });
+        }));
+```
+
+## Message Consume Completed Event {#message-consume-completed-event}
+
+The Message Consume Completed Event signals the successful completion of message consumption. By subscribing to this event, you can track when messages have been successfully processed.
+
+:::info
+Please note that this event is not compatible with Batch Consume in the current version (v2). However, this limitation is expected to be addressed in future releases (v3+).
+:::
+
+```csharp
+services.AddKafka(
+    kafka => kafka
+        .SubscribeGlobalEvents(observers =>
+        {
+            observers.MessageConsumeCompleted.Subscribe(eventContext =>
+            {
+                // Add your logic here
+            });
+        }));
+```
+
+## Message Consume Error Event {#message-consume-error-event}
+
+If an error occurs during message consumption, the Message Consume Error Event is triggered. Subscribing to this event allows you to manage and respond to consumption errors.
+
+```csharp
+services.AddKafka(
+    kafka => kafka
+        .SubscribeGlobalEvents(observers =>
+        {
+            observers.MessageConsumeError.Subscribe(eventContext =>
+            {
+                // Add your logic here
+            });
+        }));
+```
\ No newline at end of file
diff --git a/website/docs/guides/open-telemetry.md b/website/docs/guides/open-telemetry.md
new file mode 100644
index 000000000..bf2f6af15
--- /dev/null
+++ b/website/docs/guides/open-telemetry.md
@@ -0,0 +1,23 @@
+---
+sidebar_position: 10
+---
+
+# OpenTelemetry instrumentation
+
+KafkaFlow includes support for [Traces](https://opentelemetry.io/docs/concepts/signals/traces/) and [Baggage](https://opentelemetry.io/docs/concepts/signals/baggage/) signals using [OpenTelemetry instrumentation](https://opentelemetry.io/docs/instrumentation/net/).
+
+## Including OpenTelemetry instrumentation in your code
+
+Add the package [KafkaFlow.OpenTelemetry](https://www.nuget.org/packages/KafkaFlow.OpenTelemetry/) to the project and call the extension method `AddOpenTelemetryInstrumentation` in your startup configuration:
+
+```csharp
+services.AddKafka(
+    kafka => kafka
+        .AddCluster(...)
+        .AddOpenTelemetryInstrumentation()
+);
+```
+
+## Using KafkaFlow instrumentation with .NET Automatic Instrumentation
+
+When using [.NET automatic instrumentation](https://github.com/open-telemetry/opentelemetry-dotnet-instrumentation), KafkaFlow activities can be captured by adding the ActivitySource name `KafkaFlow.OpenTelemetry` to the `OTEL_DOTNET_AUTO_TRACES_ADDITIONAL_SOURCES` environment variable.
diff --git a/website/docs/introduction.md b/website/docs/introduction.md
index 134cb7249..e7b17b25e 100644
--- a/website/docs/introduction.md
+++ b/website/docs/introduction.md
@@ -27,6 +27,8 @@ To do that, KafkaFlow gives you access to features like:
 - [Serializer middleware](guides/middlewares/serializer-middleware.md) with **ApacheAvro**, **ProtoBuf** and **Json** algorithms.
 - [Schema Registry](guides/middlewares/serializer-middleware.md#adding-schema-registry-support) support.
 - [Compression](guides/compression.md) using native Confluent Kafka client compression or compressor middleware.
+- [Global Events Subscription](guides/global-events.md) for message production and consumption.
+- [OpenTelemetry Instrumentation](guides/open-telemetry.md) for traces and baggage signals.
 - Graceful shutdown (wait to finish processing to shutdown).
 - Store offset when processing ends, avoiding message loss.
 - Supports .NET Core and .NET Framework.
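
Every `Subscribe` call on the global events API returns an `IEventSubscription` handle that can later detach the handler. A minimal usage sketch, assuming only the surface introduced in this change (`SubscribeGlobalEvents`, `IEvent.Subscribe`, `IEventSubscription.Cancel`); the broker address and the logged message are illustrative placeholders.

```csharp
IEventSubscription produceStartedSubscription = null;

services.AddKafka(
    kafka => kafka
        .AddCluster(cluster => cluster.WithBrokers(new[] { "localhost:9092" }))
        .SubscribeGlobalEvents(events =>
        {
            // Subscribe returns a handle that keeps the handler attached until Cancel is called.
            produceStartedSubscription = events.MessageProduceStarted.Subscribe(eventContext =>
            {
                Console.WriteLine($"Producing to {eventContext.MessageContext.ProducerContext.Topic}");
                return Task.CompletedTask;
            });
        }));

// Later, for example during shutdown, the handler can be detached:
produceStartedSubscription?.Cancel();
```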
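
The new `IMessageContext.Items` dictionary is how the instrumentation hands the started `Activity` from the consume-started handler to the completed and error handlers. A hedged sketch of how a custom middleware could read that entry; the middleware name and tag are illustrative, and the `"otel_activity"` literal mirrors the internal `ActivitySourceAccessor.ActivityString` constant defined in this change.

```csharp
internal class ActivityTaggingMiddleware : IMessageMiddleware
{
    public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
    {
        // The consume-started handler stores the current Activity under this key.
        if (context.Items.TryGetValue("otel_activity", out var value) && value is Activity activity)
        {
            activity.SetTag("myapp.middleware", nameof(ActivityTaggingMiddleware));
        }

        await next(context);
    }
}
```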
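
The spans created by `AddOpenTelemetryInstrumentation` are only recorded once a tracer provider listens to the `KafkaFlow.OpenTelemetry` ActivitySource. A sketch mirroring the in-memory exporter setup used by the integration tests in this change; a real service would typically swap the in-memory exporter for an OTLP or console exporter.

```csharp
var exportedItems = new List<Activity>();

// Collect the spans emitted by the KafkaFlow instrumentation ActivitySource.
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddSource("KafkaFlow.OpenTelemetry")
    .AddInMemoryExporter(exportedItems)
    .Build();
```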