diff --git a/src/KafkaFlow.Abstractions/IEvent.cs b/src/KafkaFlow.Abstractions/IEvent.cs
index f9a055b4f..0a6617bc3 100644
--- a/src/KafkaFlow.Abstractions/IEvent.cs
+++ b/src/KafkaFlow.Abstractions/IEvent.cs
@@ -8,12 +8,19 @@
///
public interface IEvent
{
+ ///
+ /// Subscribes to the async event.
+ ///
+ /// The handler to be called when the async event is fired.
+ /// Event subscription reference
+ IEventSubscription Subscribe(Func handler);
+
///
/// Subscribes to the event.
///
/// The handler to be called when the event is fired.
/// Event subscription reference
- IEventSubscription Subscribe(Func handler);
+ IEventSubscription Subscribe(Action handler);
}
///
@@ -22,11 +29,18 @@ public interface IEvent
/// The argument expected by the event.
public interface IEvent
{
+ ///
+ /// Subscribes to the async event.
+ ///
+ /// The handler to be called when the async event is fired.
+ /// Event subscription reference
+ IEventSubscription Subscribe(Func handler);
+
///
/// Subscribes to the event.
///
/// The handler to be called when the event is fired.
/// Event subscription reference
- IEventSubscription Subscribe(Func handler);
+ IEventSubscription Subscribe(Action handler);
}
}
\ No newline at end of file
diff --git a/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json b/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json
new file mode 100644
index 000000000..a3f79fedf
--- /dev/null
+++ b/src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json
@@ -0,0 +1,12 @@
+{
+ "profiles": {
+ "KafkaFlow.Admin.Dashboard": {
+ "commandName": "Project",
+ "launchBrowser": true,
+ "environmentVariables": {
+ "ASPNETCORE_ENVIRONMENT": "Development"
+ },
+ "applicationUrl": "https://localhost:63908;http://localhost:63909"
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs b/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs
index 4a73f4f4d..69bc10b42 100644
--- a/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs
+++ b/src/KafkaFlow.IntegrationTests/Core/Middlewares/GzipMiddleware.cs
@@ -1,13 +1,18 @@
namespace KafkaFlow.IntegrationTests.Core.Middlewares
{
+ using System.Diagnostics;
using System.Threading.Tasks;
using KafkaFlow.IntegrationTests.Core.Handlers;
+ using KafkaFlow.OpenTelemetry;
internal class GzipMiddleware : IMessageMiddleware
{
public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
- MessageStorage.Add((byte[]) context.Message.Value);
+ var source = new ActivitySource(KafkaFlowInstrumentation.ActivitySourceName);
+ using var activity = source.StartActivity("integration-test", ActivityKind.Internal);
+
+ MessageStorage.Add((byte[])context.Message.Value);
await next(context);
}
}
diff --git a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
index 05a61f306..53c3f2ec3 100644
--- a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
+++ b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
@@ -57,7 +57,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans
await producer.ProduceAsync(null, message);
// Assert
- var (producerSpan, consumerSpan) = await this.WaitForSpansAsync();
+ var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();
Assert.IsNotNull(this.exportedItems);
Assert.IsNull(producerSpan.ParentId);
@@ -65,6 +65,35 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans
Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
}
+ [TestMethod]
+ public async Task AddOpenTelemetry_CreateActivityOnConsumingMessage_TraceIsPropagatedToCreatedActivity()
+ {
+ // Arrange
+ var provider = await this.GetServiceProvider();
+ MessageStorage.Clear();
+
+ using var tracerProvider = Sdk.CreateTracerProviderBuilder()
+ .AddSource("KafkaFlow.OpenTelemetry")
+ .AddInMemoryExporter(this.exportedItems)
+ .Build();
+
+ var producer = provider.GetRequiredService>();
+ var message = this.fixture.Create();
+
+ // Act
+ await producer.ProduceAsync(null, message);
+
+ // Assert
+ var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();
+
+ Assert.IsNotNull(this.exportedItems);
+ Assert.IsNull(producerSpan.ParentId);
+ Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
+ Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
+ Assert.AreEqual(internalSpan.TraceId, consumerSpan.TraceId);
+ Assert.AreEqual(internalSpan.ParentSpanId, consumerSpan.SpanId);
+ }
+
[TestMethod]
public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsPropagatedFromTestActivityToConsumer()
{
@@ -98,7 +127,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp
await producer.ProduceAsync(null, message);
// Assert
- var (producerSpan, consumerSpan) = await this.WaitForSpansAsync();
+ var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();
Assert.IsNotNull(this.exportedItems);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
@@ -182,9 +211,9 @@ await Policy
.ExecuteAsync(() => Task.FromResult(this.isPartitionAssigned));
}
- private async Task<(Activity producerSpan, Activity consumerSpan)> WaitForSpansAsync()
+ private async Task<(Activity producerSpan, Activity consumerSpan, Activity internalSpan)> WaitForSpansAsync()
{
- Activity producerSpan = null, consumerSpan = null;
+ Activity producerSpan = null, consumerSpan = null, internalSpan = null;
await Policy
.HandleResult(isAvailable => !isAvailable)
@@ -193,11 +222,12 @@ await Policy
{
producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer);
consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer);
+ internalSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Internal);
return Task.FromResult(producerSpan != null && consumerSpan != null);
});
- return (producerSpan, consumerSpan);
+ return (producerSpan, consumerSpan, internalSpan);
}
}
}
diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs
index cea4febc2..bae4f525b 100644
--- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs
+++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs
@@ -17,7 +17,7 @@ internal static class OpenTelemetryConsumerEventsHandler
private const string AttributeMessagingKafkaSourcePartition = "messaging.kafka.source.partition";
private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
- public static Task OnConsumeStarted(IMessageContext context)
+ public static void OnConsumeStarted(IMessageContext context)
{
try
{
@@ -50,21 +50,17 @@ public static Task OnConsumeStarted(IMessageContext context)
{
// If there is any failure, do not propagate the context.
}
-
- return Task.CompletedTask;
}
- public static Task OnConsumeCompleted(IMessageContext context)
+ public static void OnConsumeCompleted(IMessageContext context)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
activity?.Dispose();
}
-
- return Task.CompletedTask;
}
- public static Task OnConsumeError(IMessageContext context, Exception ex)
+ public static void OnConsumeError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
@@ -73,8 +69,6 @@ public static Task OnConsumeError(IMessageContext context, Exception ex)
activity?.Dispose();
}
-
- return Task.CompletedTask;
}
private static IEnumerable ExtractTraceContextIntoBasicProperties(IMessageContext context, string key)
diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs
index 403e257d1..a7fa35d6a 100644
--- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs
+++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs
@@ -16,7 +16,7 @@ internal static class OpenTelemetryProducerEventsHandler
private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition";
private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
- public static Task OnProducerStarted(IMessageContext context)
+ public static void OnProducerStarted(IMessageContext context)
{
try
{
@@ -60,21 +60,17 @@ public static Task OnProducerStarted(IMessageContext context)
{
// If there is any failure, do not propagate the context.
}
-
- return Task.CompletedTask;
}
- public static Task OnProducerCompleted(IMessageContext context)
+ public static void OnProducerCompleted(IMessageContext context)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
activity?.Dispose();
}
-
- return Task.CompletedTask;
}
- public static Task OnProducerError(IMessageContext context, Exception ex)
+ public static void OnProducerError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
@@ -83,8 +79,6 @@ public static Task OnProducerError(IMessageContext context, Exception ex)
activity?.Dispose();
}
-
- return Task.CompletedTask;
}
private static void InjectTraceContextIntoBasicProperties(IMessageContext context, string key, string value)
diff --git a/src/KafkaFlow/Event.cs b/src/KafkaFlow/Event.cs
index 44d73ad48..c965e8ef4 100644
--- a/src/KafkaFlow/Event.cs
+++ b/src/KafkaFlow/Event.cs
@@ -2,13 +2,15 @@
{
using System;
using System.Collections.Generic;
+ using System.Linq;
using System.Threading.Tasks;
internal class Event : IEvent
{
private readonly ILogHandler logHandler;
- private readonly List> handlers = new();
+ private readonly List> asyncHandlers = new();
+ private readonly List> syncHandlers = new();
public Event(ILogHandler logHandler)
{
@@ -17,33 +19,64 @@ public Event(ILogHandler logHandler)
public IEventSubscription Subscribe(Func handler)
{
- if (!this.handlers.Contains(handler))
+ return this.Subscribe(this.asyncHandlers, handler);
+ }
+
+ public IEventSubscription Subscribe(Action handler)
+ {
+ return this.Subscribe(this.syncHandlers, handler);
+ }
+
+ internal Task FireAsync(TArg arg)
+ {
+ this.InvokeSyncHandlers(arg);
+ return this.InvokeAsyncHandlers(arg);
+ }
+
+ private IEventSubscription Subscribe(List handlersList, T handler)
+ {
+ if (!handlersList.Contains(handler))
{
- this.handlers.Add(handler);
+ handlersList.Add(handler);
}
- return new EventSubscription(() => this.handlers.Remove(handler));
+ return new EventSubscription(() => handlersList.Remove(handler));
}
- internal async Task FireAsync(TArg arg)
+ private async Task InvokeAsyncHandlers(TArg arg)
{
- foreach (var handler in this.handlers)
+ foreach (var handler in this.asyncHandlers.Where(h => h is not null))
{
try
{
- if (handler is null)
- {
- continue;
- }
-
await handler.Invoke(arg);
}
- catch (Exception e)
+ catch (Exception ex)
+ {
+ this.LogHandlerOnError(ex);
+ }
+ }
+ }
+
+ private void InvokeSyncHandlers(TArg arg)
+ {
+ foreach (var handler in this.syncHandlers.Where(h => h is not null))
+ {
+ try
{
- this.logHandler.Error("Error firing event", e, new { Event = this.GetType().Name });
+ handler.Invoke(arg);
+ }
+ catch (Exception ex)
+ {
+ this.LogHandlerOnError(ex);
}
}
}
+
+ private void LogHandlerOnError(Exception ex)
+ {
+ this.logHandler.Error("Error firing event", ex, new { Event = this.GetType().Name });
+ }
}
internal class Event : IEvent
@@ -57,6 +90,8 @@ public Event(ILogHandler logHandler)
public IEventSubscription Subscribe(Func handler) => this.evt.Subscribe(_ => handler.Invoke());
+ public IEventSubscription Subscribe(Action handler) => this.evt.Subscribe(_ => handler.Invoke());
+
internal Task FireAsync() => this.evt.FireAsync(null);
}
}
\ No newline at end of file