Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
simaoribeiro committed Nov 20, 2023
1 parent 21db5e3 commit 5154e1f
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 21 deletions.
12 changes: 12 additions & 0 deletions src/KafkaFlow.Admin.Dashboard/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"profiles": {
"KafkaFlow.Admin.Dashboard": {
"commandName": "Project",
"launchBrowser": true,
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"applicationUrl": "https://localhost:63908;http://localhost:63909"
}
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
namespace KafkaFlow.IntegrationTests.Core.Middlewares
{
using System.Diagnostics;
using System.Threading.Tasks;
using KafkaFlow.IntegrationTests.Core.Handlers;

internal class GzipMiddleware : IMessageMiddleware
{
public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
MessageStorage.Add((byte[]) context.Message.Value);
var source = new ActivitySource("KafkaFlow.OpenTelemetry");
using var activity = source.StartActivity("integration-test", ActivityKind.Internal);

MessageStorage.Add((byte[])context.Message.Value);
await next(context);
}
}
Expand Down
40 changes: 35 additions & 5 deletions src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,43 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans
await producer.ProduceAsync(null, message);

// Assert
var (producerSpan, consumerSpan) = await this.WaitForSpansAsync();
var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

Assert.IsNotNull(this.exportedItems);
Assert.IsNull(producerSpan.ParentId);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
}

[TestMethod]
public async Task AddOpenTelemetry_CreateActivityOnConsumingMessage_TraceIsPropagatedToCreatedActivity()
{
// Arrange
var provider = await this.GetServiceProvider();
MessageStorage.Clear();

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource("KafkaFlow.OpenTelemetry")
.AddInMemoryExporter(this.exportedItems)
.Build();

var producer = provider.GetRequiredService<IMessageProducer<GzipProducer>>();
var message = this.fixture.Create<byte[]>();

// Act
await producer.ProduceAsync(null, message);

// Assert
var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

Assert.IsNotNull(this.exportedItems);
Assert.IsNull(producerSpan.ParentId);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
Assert.AreEqual(internalSpan.TraceId, consumerSpan.TraceId);
Assert.AreEqual(internalSpan.ParentSpanId, consumerSpan.SpanId);
}

[TestMethod]
public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsPropagatedFromTestActivityToConsumer()
{
Expand Down Expand Up @@ -97,7 +126,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp
await producer.ProduceAsync(null, message);

// Assert
var (producerSpan, consumerSpan) = await this.WaitForSpansAsync();
var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

Assert.IsNotNull(this.exportedItems);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
Expand Down Expand Up @@ -181,9 +210,9 @@ await Policy
.ExecuteAsync(() => Task.FromResult(this.isPartitionAssigned));
}

private async Task<(Activity producerSpan, Activity consumerSpan)> WaitForSpansAsync()
private async Task<(Activity producerSpan, Activity consumerSpan, Activity internalSpan)> WaitForSpansAsync()
{
Activity producerSpan = null, consumerSpan = null;
Activity producerSpan = null, consumerSpan = null, internalSpan = null;

await Policy
.HandleResult<bool>(isAvailable => !isAvailable)
Expand All @@ -192,11 +221,12 @@ await Policy
{
producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer);
consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer);
internalSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Internal);

return Task.FromResult(producerSpan != null && consumerSpan != null);
});

return (producerSpan, consumerSpan);
return (producerSpan, consumerSpan, internalSpan);
}
}
}
33 changes: 19 additions & 14 deletions src/KafkaFlow/Event.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ internal class Event<TArg> : IEvent<TArg>
{
private readonly ILogHandler logHandler;

private readonly List<Func<TArg, Task>> handlers = new();
private readonly List<Func<TArg, Task>> asyncHandlers = new();
private readonly List<Action<TArg>> syncHandlers = new();

public Event(ILogHandler logHandler)
{
Expand All @@ -17,31 +18,35 @@ public Event(ILogHandler logHandler)

public IEventSubscription Subscribe(Func<TArg, Task> handler)
{
if (!this.handlers.Contains(handler))
if (!this.asyncHandlers.Contains(handler))
{
this.handlers.Add(handler);
this.asyncHandlers.Add(handler);
}

return new EventSubscription(() => this.handlers.Remove(handler));
return new EventSubscription(() => this.asyncHandlers.Remove(handler));
}

internal async Task FireAsync(TArg arg)
internal Task FireAsync(TArg arg)
{
foreach (var handler in this.handlers)
return Task.WhenAll(this.FireAsyncCall(arg));
}

private IEnumerable<Task> FireAsyncCall(TArg arg)
{
foreach (var handler in this.asyncHandlers)
{
try
{
if (handler is null)
{
continue;
}

await handler.Invoke(arg);
}
catch (Exception e)
{
this.logHandler.Error("Error firing event", e, new { Event = this.GetType().Name });
}
yield return handler.Invoke(arg).ContinueWith(t =>
{
if (t.IsFaulted)
{
this.logHandler.Error("Error firing event", t.Exception, new { Event = this.GetType().Name });
}
});
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/KafkaFlow/GlobalEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ public GlobalEvents(ILogHandler log)
public IEvent<MessageEventContext> MessageProduceStarted => this.messageProduceStarted;

public Task FireMessageConsumeStartedAsync(MessageEventContext context)
=> this.messageConsumeStarted.FireAsync(context);
{
return this.messageConsumeStarted.FireAsync(context);
}

public Task FireMessageConsumeErrorAsync(MessageErrorEventContext context)
=> this.messageConsumeError.FireAsync(context);
Expand Down

0 comments on commit 5154e1f

Please sign in to comment.