diff --git a/Makefile b/Makefile index b695f8a9b..586c8b91a 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ init_broker: @echo command | date @echo Initializing Kafka broker docker-compose -f docker-compose.yml up -d - docker exec kafka bash -c "kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-avro;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-gzip;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-json;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-json-gzip;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-protobuf;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-protobuf-gzip;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-protobuf-gzip-2" + docker exec kafka bash -c "kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-avro;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-gzip;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-json;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-json-gzip;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-protobuf;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-protobuf-gzip;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-protobuf-gzip-2;kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic test-pause-resume" shutdown_broker: @echo command | date diff --git a/src/KafkaFlow.IntegrationTests/ConpressionSerializationTest.cs b/src/KafkaFlow.IntegrationTests/ConpressionSerializationTest.cs index a5339379c..1647349be 100644 --- a/src/KafkaFlow.IntegrationTests/ConpressionSerializationTest.cs +++ b/src/KafkaFlow.IntegrationTests/ConpressionSerializationTest.cs @@ -54,7 +54,6 @@ public async Task ProtoBufGzipMessageTest() // Act await Task.WhenAll(messages.Select(m => producer.ProduceAsync(m.Id.ToString(), m))); - // Assert foreach (var message in messages) { diff --git a/src/KafkaFlow.IntegrationTests/ConsumerTest.cs b/src/KafkaFlow.IntegrationTests/ConsumerTest.cs index 3cfc24d93..3a796d775 100644 --- a/src/KafkaFlow.IntegrationTests/ConsumerTest.cs +++ b/src/KafkaFlow.IntegrationTests/ConsumerTest.cs @@ -68,6 +68,23 @@ public async Task MultipleTopicsSingleConsumerTest() await MessageStorage.AssertCountMessageAsync(message, 2); } } + + [TestMethod] + public async Task MultipleHandlersSingleTypeConsumerTest() + { + // Arrange + var producer = this.provider.GetRequiredService>(); + var messages = this.fixture.CreateMany(5).ToList(); + + // Act + messages.ForEach(m => producer.Produce(m.Id.ToString(), m)); + + // Assert + foreach (var message in messages) + { + await MessageStorage.AssertCountMessageAsync(message, 2); + } + } [TestMethod] public async Task MessageOrderingTest() @@ -102,12 +119,11 @@ public async Task MessageOrderingTest() } [TestMethod] - [Ignore] public async Task PauseResumeHeartbeatTest() { // Arrange var producer = this.provider.GetRequiredService>(); - var messages = this.fixture.CreateMany(5).ToList(); + var messages = this.fixture.CreateMany(5).ToList(); // Act await Task.WhenAll(messages.Select(m => producer.ProduceAsync( @@ -123,5 +139,5 @@ await Task.WhenAll(messages.Select(m => producer.ProduceAsync( await MessageStorage.AssertMessageAsync(message); } } - } + } } diff --git a/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs b/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs index b67269e45..80cd1d592 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs @@ -155,7 +155,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection }) .AddMiddlewares( middlewares => middlewares - .AddSingleTypeSerializer() + .AddSingleTypeSerializer() .AddTypedHandlers( handlers => handlers @@ -177,8 +177,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection handlers => handlers .WithHandlerLifetime(InstanceLifetime.Singleton) - .AddHandler() - .AddHandler()) + .AddHandlersFromAssemblyOf()) ) ) .AddConsumer( diff --git a/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageHandler1.cs b/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageHandler1.cs new file mode 100644 index 000000000..a6ec5f2a3 --- /dev/null +++ b/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageHandler1.cs @@ -0,0 +1,15 @@ +namespace KafkaFlow.IntegrationTests.Core.Handlers +{ + using System.Threading.Tasks; + using KafkaFlow.TypedHandler; + using Messages; + + public class MessageHandler1 : IMessageHandler + { + public Task Handle(IMessageContext context, TestMessage1 message) + { + MessageStorage.Add(message); + return Task.CompletedTask; + } + } +} diff --git a/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs b/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs index 3ce25f1f8..ec3bcc0c7 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageStorage.cs @@ -5,7 +5,6 @@ namespace KafkaFlow.IntegrationTests.Core.Handlers using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; - using Avro.Specific; using global::Microsoft.VisualStudio.TestTools.UnitTesting; using Messages; using MessageTypes; @@ -58,7 +57,7 @@ public static async Task AssertMessageAsync(ITestMessage message) { if (DateTime.Now.Subtract(start).Seconds > timeoutSec) { - Assert.Fail("Message not received"); + Assert.Fail("Message (ITestMessage) not received"); return; } @@ -74,7 +73,7 @@ public static async Task AssertMessageAsync(LogMessages2 message) { if (DateTime.Now.Subtract(start).Seconds > timeoutSec) { - Assert.Fail("Message not received"); + Assert.Fail("Message (LogMessages2) not received"); return; } @@ -90,7 +89,7 @@ public static async Task AssertMessageAsync(byte[] message) { if (DateTime.Now.Subtract(start).Seconds > timeoutSec) { - Assert.Fail("Message not received"); + Assert.Fail("Message (byte[]) not received"); return; } diff --git a/src/KafkaFlow.IntegrationTests/Core/Handlers/PauseResumeHandler.cs b/src/KafkaFlow.IntegrationTests/Core/Handlers/PauseResumeHandler.cs index 1636a7311..30c654610 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Handlers/PauseResumeHandler.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Handlers/PauseResumeHandler.cs @@ -4,9 +4,9 @@ namespace KafkaFlow.IntegrationTests.Core.Handlers using KafkaFlow.TypedHandler; using Messages; - public class PauseResumeHandler : IMessageHandler + public class PauseResumeHandler : IMessageHandler { - public async Task Handle(IMessageContext context, TestMessage1 message) + public async Task Handle(IMessageContext context, PauseResumeMessage message) { context.Consumer.Pause(); diff --git a/src/KafkaFlow.IntegrationTests/Core/Messages/PauseResumeMessage.cs b/src/KafkaFlow.IntegrationTests/Core/Messages/PauseResumeMessage.cs new file mode 100644 index 000000000..ff10f4bbf --- /dev/null +++ b/src/KafkaFlow.IntegrationTests/Core/Messages/PauseResumeMessage.cs @@ -0,0 +1,18 @@ +namespace KafkaFlow.IntegrationTests.Core.Messages +{ + using System; + using System.Runtime.Serialization; + + [DataContract] + public class PauseResumeMessage : ITestMessage + { + [DataMember(Order = 1)] + public Guid Id { get; set; } + + [DataMember(Order = 2)] + public string Value { get; set; } + + [DataMember(Order = 3)] + public int Version { get; set; } + } +} diff --git a/src/KafkaFlow.TypedHandler/HandlerTypeMapping.cs b/src/KafkaFlow.TypedHandler/HandlerTypeMapping.cs index 78230d971..3916ad442 100644 --- a/src/KafkaFlow.TypedHandler/HandlerTypeMapping.cs +++ b/src/KafkaFlow.TypedHandler/HandlerTypeMapping.cs @@ -2,19 +2,22 @@ namespace KafkaFlow.TypedHandler { using System; using System.Collections.Generic; + using System.Linq; internal class HandlerTypeMapping { - private readonly Dictionary mapping = new Dictionary(); + private readonly Dictionary> mapping = new Dictionary>(); public void AddMapping(Type messageType, Type handlerType) { - this.mapping.Add(messageType, handlerType); + var handlers = this.GetHandlersTypes(messageType); + + this.mapping[messageType] = handlers.Append(handlerType); } - public Type GetHandlerType(Type messageType) + public IEnumerable GetHandlersTypes(Type messageType) { - return this.mapping.TryGetValue(messageType, out var handlerType) ? handlerType : null; + return this.mapping.TryGetValue(messageType, out var handlerType) ? handlerType : Enumerable.Empty(); } } } diff --git a/src/KafkaFlow.TypedHandler/KafkaFlow.TypedHandler.csproj b/src/KafkaFlow.TypedHandler/KafkaFlow.TypedHandler.csproj index 65d0910c6..025395555 100644 --- a/src/KafkaFlow.TypedHandler/KafkaFlow.TypedHandler.csproj +++ b/src/KafkaFlow.TypedHandler/KafkaFlow.TypedHandler.csproj @@ -11,4 +11,10 @@ + + + <_Parameter1>KafkaFlow.UnitTests + + + diff --git a/src/KafkaFlow.TypedHandler/TypedHandlerMiddleware.cs b/src/KafkaFlow.TypedHandler/TypedHandlerMiddleware.cs index 5418b4496..aa8417ce6 100644 --- a/src/KafkaFlow.TypedHandler/TypedHandlerMiddleware.cs +++ b/src/KafkaFlow.TypedHandler/TypedHandlerMiddleware.cs @@ -1,5 +1,6 @@ namespace KafkaFlow.TypedHandler { + using System.Linq; using System.Threading.Tasks; internal class TypedHandlerMiddleware : IMessageMiddleware @@ -19,21 +20,17 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next) { using (var scope = this.dependencyResolver.CreateScope()) { - var handlerType = this.configuration.HandlerMapping.GetHandlerType(context.Message.GetType()); - - if (handlerType == null) - { - return; - } - - var handler = scope.Resolver.Resolve(handlerType); - - await HandlerExecutor - .GetExecutor(context.Message.GetType()) - .Execute( - handler, - context, - context.Message) + await Task.WhenAll( + this.configuration + .HandlerMapping + .GetHandlersTypes(context.Message.GetType()) + .Select(t => + HandlerExecutor + .GetExecutor(context.Message.GetType()) + .Execute( + scope.Resolver.Resolve(t), + context, + context.Message))) .ConfigureAwait(false); } diff --git a/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj b/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj index e07ffb966..e1b9d43d8 100644 --- a/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj +++ b/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj @@ -21,6 +21,7 @@ + diff --git a/src/KafkaFlow.UnitTests/TypedHandler/HandlerTypeMappingTests.cs b/src/KafkaFlow.UnitTests/TypedHandler/HandlerTypeMappingTests.cs new file mode 100644 index 000000000..218110eb3 --- /dev/null +++ b/src/KafkaFlow.UnitTests/TypedHandler/HandlerTypeMappingTests.cs @@ -0,0 +1,34 @@ +namespace KafkaFlow.UnitTests.TypedHandler +{ + using System; + using FluentAssertions; + using KafkaFlow.TypedHandler; + using Microsoft.VisualStudio.TestTools.UnitTesting; + + [TestClass] + public class HandlerTypeMappingTests + { + private HandlerTypeMapping target; + + [TestInitialize] + public void Setup() + { + this.target = new HandlerTypeMapping(); + } + + [TestMethod] + public void AddSeveralMappings_GetHandlersTypesReturnsListOfHandlers() + { + // Act + this.target.AddMapping(typeof(int), typeof(string)); + this.target.AddMapping(typeof(int), typeof(double)); + this.target.AddMapping(typeof(int), typeof(bool)); + + // Assert + this.target.GetHandlersTypes(typeof(int)).Should().BeEquivalentTo( + typeof(string), + typeof(double), + typeof(bool)); + } + } +} \ No newline at end of file