Skip to content

Commit

Permalink
Merge branch 'feat/include-open-telemetry' of https://github.com/Farf…
Browse files Browse the repository at this point in the history
…etch/kafkaflow into feat/include-open-telemetry
  • Loading branch information
simaoribeiro committed Oct 17, 2023
2 parents dd8759d + 9ef12a4 commit 5a90dce
Showing 1 changed file with 17 additions and 18 deletions.
35 changes: 17 additions & 18 deletions src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -289,30 +289,29 @@ await Policy
.WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2))))
.ExecuteAsync(() => Task.FromResult(this.isPartitionAssigned));
}
}

internal class TriggerErrorMessageMiddleware : IMessageMiddleware
{
public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
private class TriggerErrorMessageMiddleware : IMessageMiddleware
{
MessageStorage.Add((byte[])context.Message.Value);
throw new ErrorExecutingMiddlewareException(nameof(TriggerErrorMessageMiddleware));
await next(context);
public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
MessageStorage.Add((byte[])context.Message.Value);
throw new ErrorExecutingMiddlewareException(nameof(TriggerErrorMessageMiddleware));
}
}
}

public class TriggerErrorSerializerMiddleware : ISerializer
{
public Task SerializeAsync(object message, Stream output, ISerializerContext context)
private class TriggerErrorSerializerMiddleware : ISerializer
{
var error = new Error(ErrorCode.BrokerNotAvailable);
throw new ProduceException<byte[], byte[]>(error, null);
}
public Task SerializeAsync(object message, Stream output, ISerializerContext context)
{
var error = new Error(ErrorCode.BrokerNotAvailable);
throw new ProduceException<byte[], byte[]>(error, null);
}

public Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context)
{
var error = new Error(ErrorCode.BrokerNotAvailable);
throw new ProduceException<byte[], byte[]>(error, null);
public Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context)
{
var error = new Error(ErrorCode.BrokerNotAvailable);
throw new ProduceException<byte[], byte[]>(error, null);
}
}
}
}

0 comments on commit 5a90dce

Please sign in to comment.