diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageCreator.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageCreator.cs index 253361c63c..acf81c3d3f 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageCreator.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageCreator.cs @@ -50,8 +50,11 @@ public Message CreateMessage(ConsumeResult consumeResult) if (correlationId.Success) messageHeader.CorrelationId = correlationId.Result; + //If we don't have a partition key in the header, assume a non-Brighter sender and use message key if (partitionKey.Success) messageHeader.PartitionKey = partitionKey.Result; + else + messageHeader.PartitionKey = consumeResult.Message.Key; if (contentType.Success) messageHeader.ContentType =contentType.Result; @@ -202,7 +205,7 @@ private HeaderResult ReadPartitionKey(Headers headers) if (string.IsNullOrEmpty(s)) { s_logger.LogDebug("No partition key found in message"); - return new HeaderResult(string.Empty, true); + return new HeaderResult(string.Empty, false); } return new HeaderResult(s, true); diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageProducer.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageProducer.cs index 7e894cc2da..9e3e27b755 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageProducer.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageProducer.cs @@ -306,8 +306,5 @@ private void PublishResults(PersistenceStatus status, Headers headers) Task.Run((() =>OnMessagePublished?.Invoke(false, Guid.Empty))); } - - - } } diff --git a/src/Paramore.Brighter/Policies/Handlers/ExceptionPolicyHandlerAsync.cs b/src/Paramore.Brighter/Policies/Handlers/ExceptionPolicyHandlerAsync.cs index db78581ef6..be38ea0526 100644 --- a/src/Paramore.Brighter/Policies/Handlers/ExceptionPolicyHandlerAsync.cs +++ b/src/Paramore.Brighter/Policies/Handlers/ExceptionPolicyHandlerAsync.cs @@ -55,8 +55,8 @@ public class ExceptionPolicyHandlerAsync : RequestHandlerAsyncCould not find the policy for this attribute, did you register it with the command processor's container;initializerList public override void InitializeFromAttributeParams(params object[] initializerList) { - //we expect the first and only parameter to be a string - var policyName = (string)initializerList[0]; + if (_initialized) return; + var policies = (List)initializerList[0]; policies.Each(p => _policies.Add(Context.Policies.Get(p))); _initialized = true; diff --git a/tests/Paramore.Brighter.Core.Tests/ExceptionPolicy/TestDoubles/MyMultiplePoliciesFailsWithDivideByZeroHandlerAsync.cs b/tests/Paramore.Brighter.Core.Tests/ExceptionPolicy/TestDoubles/MyMultiplePoliciesFailsWithDivideByZeroHandlerAsync.cs index 26dbdb11e7..acfbf48866 100644 --- a/tests/Paramore.Brighter.Core.Tests/ExceptionPolicy/TestDoubles/MyMultiplePoliciesFailsWithDivideByZeroHandlerAsync.cs +++ b/tests/Paramore.Brighter.Core.Tests/ExceptionPolicy/TestDoubles/MyMultiplePoliciesFailsWithDivideByZeroHandlerAsync.cs @@ -1,4 +1,5 @@ -using System.Threading; +using System; +using System.Threading; using System.Threading.Tasks; using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; using Paramore.Brighter.Policies.Attributes; @@ -18,7 +19,7 @@ public MyMultiplePoliciesFailsWithDivideByZeroHandlerAsync() public override Task HandleAsync(MyCommand command, CancellationToken cancellationToken = default) { ReceivedCommand = true; - return base.HandleAsync(command, cancellationToken); + throw new DivideByZeroException(); } public static bool ShouldReceive(MyCommand myCommand) diff --git a/tests/Paramore.Brighter.Core.Tests/ExceptionPolicy/When_Sending_A_Command_That_Retries_Then_Repeatedly_Fails_Breaks_The_Circuit_Async.cs b/tests/Paramore.Brighter.Core.Tests/ExceptionPolicy/When_Sending_A_Command_That_Retries_Then_Repeatedly_Fails_Breaks_The_Circuit_Async.cs index 455c2813c7..22d3ea34f8 100644 --- a/tests/Paramore.Brighter.Core.Tests/ExceptionPolicy/When_Sending_A_Command_That_Retries_Then_Repeatedly_Fails_Breaks_The_Circuit_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/ExceptionPolicy/When_Sending_A_Command_That_Retries_Then_Repeatedly_Fails_Breaks_The_Circuit_Async.cs @@ -1,4 +1,5 @@ -using System; +using System; +using System.Threading.Tasks; using FluentAssertions; using FluentAssertions.Extensions; using Microsoft.Extensions.DependencyInjection; @@ -31,7 +32,7 @@ public CommandProcessorWithBothRetryAndCircuitBreakerAsync() var container = new ServiceCollection(); container.AddSingleton(); - container.AddSingleton>(); + container.AddSingleton>(); container.AddSingleton(new BrighterOptions() { HandlerLifetime = ServiceLifetime.Transient @@ -71,20 +72,20 @@ public CommandProcessorWithBothRetryAndCircuitBreakerAsync() } [Fact] - public void When_Sending_A_Command_That_Retries_Then_Repeatedly_Fails_Breaks_The_Circuit() + public async Task When_Sending_A_Command_That_Retries_Then_Repeatedly_Fails_Breaks_The_Circuit() { //First two should be caught, and increment the count - _firstException = Catch.Exception(async () => await _commandProcessor.SendAsync(new MyCommand())); + _firstException = await Catch.ExceptionAsync(async () => await _commandProcessor.SendAsync(new MyCommand())); //should have retried three times _retryCount.Should().Be(3); _retryCount = 0; - _secondException = Catch.Exception(async() => await _commandProcessor.SendAsync(new MyCommand())); + _secondException = await Catch.ExceptionAsync(async() => await _commandProcessor.SendAsync(new MyCommand())); //should have retried three times _retryCount.Should().Be(3); _retryCount = 0; //this one should tell us that the circuit is broken - _thirdException = Catch.Exception(async() => await _commandProcessor.SendAsync(new MyCommand())); + _thirdException = await Catch.ExceptionAsync(async() => await _commandProcessor.SendAsync(new MyCommand())); //should have retried three times _retryCount.Should().Be(0); diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_posting_a_message.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_posting_a_message.cs index a948672d24..36055e26d5 100644 --- a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_posting_a_message.cs +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_posting_a_message.cs @@ -1,4 +1,5 @@ #region Licence + /* The MIT License (MIT) Copyright © 2014 Wayne Hunsley @@ -35,17 +36,16 @@ THE SOFTWARE. */ namespace Paramore.Brighter.Kafka.Tests.MessagingGateway { [Trait("Category", "Kafka")] - [Collection("Kafka")] //Kafka doesn't like multiple consumers of a partition + [Collection("Kafka")] //Kafka doesn't like multiple consumers of a partition public class KafkaMessageProducerSendTests : IDisposable { private readonly ITestOutputHelper _output; - private readonly string _queueName = Guid.NewGuid().ToString(); + private readonly string _queueName = Guid.NewGuid().ToString(); private readonly string _topic = Guid.NewGuid().ToString(); private readonly IAmAProducerRegistry _producerRegistry; private readonly IAmAMessageConsumer _consumer; private readonly string _partitionKey = Guid.NewGuid().ToString(); - public KafkaMessageProducerSendTests(ITestOutputHelper output) { const string groupId = "Kafka Message Producer Send Test"; @@ -53,29 +53,30 @@ public KafkaMessageProducerSendTests(ITestOutputHelper output) _producerRegistry = new KafkaProducerRegistryFactory( new KafkaMessagingGatewayConfiguration { - Name = "Kafka Producer Send Test", - BootStrapServers = new[] {"localhost:9092"} + Name = "Kafka Producer Send Test", BootStrapServers = new[] { "localhost:9092" } }, - new KafkaPublication[] {new KafkaPublication + new KafkaPublication[] { - Topic = new RoutingKey(_topic), - NumPartitions = 1, - ReplicationFactor = 1, - //These timeouts support running on a container using the same host as the tests, - //your production values ought to be lower - MessageTimeoutMs = 2000, - RequestTimeoutMs = 2000, - MakeChannels = OnMissingChannel.Create - }}).Create(); - + new KafkaPublication + { + Topic = new RoutingKey(_topic), + NumPartitions = 1, + ReplicationFactor = 1, + //These timeouts support running on a container using the same host as the tests, + //your production values ought to be lower + MessageTimeoutMs = 2000, + RequestTimeoutMs = 2000, + MakeChannels = OnMissingChannel.Create + } + }).Create(); + _consumer = new KafkaMessageConsumerFactory( new KafkaMessagingGatewayConfiguration { - Name = "Kafka Consumer Test", - BootStrapServers = new[] { "localhost:9092" } + Name = "Kafka Consumer Test", BootStrapServers = new[] { "localhost:9092" } }) .Create(new KafkaSubscription( - channelName: new ChannelName(_queueName), + channelName: new ChannelName(_queueName), routingKey: new RoutingKey(_topic), groupId: groupId, numOfPartitions: 1, @@ -83,30 +84,26 @@ public KafkaMessageProducerSendTests(ITestOutputHelper output) makeChannels: OnMissingChannel.Create ) ); - } [Fact] public void When_posting_a_message() { - var command = new MyCommand{Value = "Test Content"}; - + var command = new MyCommand { Value = "Test Content" }; + //vanilla i.e. no Kafka specific bytes at the beginning var body = JsonSerializer.Serialize(command, JsonSerialisationOptions.Options); - + var message = new Message( - new MessageHeader(Guid.NewGuid(), _topic, MessageType.MT_COMMAND) - { - PartitionKey = _partitionKey - }, + new MessageHeader(Guid.NewGuid(), _topic, MessageType.MT_COMMAND) { PartitionKey = _partitionKey }, new MessageBody(body)); - + ((IAmAMessageProducerSync)_producerRegistry.LookupBy(_topic)).Send(message); var receivedMessage = GetMessage(); - - var receivedCommand = JsonSerializer.Deserialize(message.Body.Value, JsonSerialisationOptions.Options); - + + var receivedCommand = JsonSerializer.Deserialize(receivedMessage.Body.Value, JsonSerialisationOptions.Options); + receivedMessage.Header.MessageType.Should().Be(MessageType.MT_COMMAND); receivedMessage.Header.PartitionKey.Should().Be(_partitionKey); receivedMessage.Body.Bytes.Should().Equal(message.Body.Bytes); @@ -132,19 +129,17 @@ private Message GetMessage() _consumer.Acknowledge(messages[0]); break; } - } catch (ChannelFailureException cfx) { //Lots of reasons to be here as Kafka propagates a topic, or the test cluster is still initializing _output.WriteLine($" Failed to read from topic:{_topic} because {cfx.Message} attempt: {maxTries}"); } - } while (maxTries <= 3); - + if (messages[0].Header.MessageType == MessageType.MT_NONE) throw new Exception($"Failed to read from topic:{_topic} after {maxTries} attempts"); - + return messages[0]; } diff --git a/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_recieving_a_message_without_partition_key_header.cs b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_recieving_a_message_without_partition_key_header.cs new file mode 100644 index 0000000000..8a71bfe22f --- /dev/null +++ b/tests/Paramore.Brighter.Kafka.Tests/MessagingGateway/When_recieving_a_message_without_partition_key_header.cs @@ -0,0 +1,133 @@ +using System; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; +using Confluent.Kafka; +using FluentAssertions; +using Paramore.Brighter.Kafka.Tests.TestDoubles; +using Paramore.Brighter.MessagingGateway.Kafka; +using Xunit; +using Xunit.Abstractions; +using Acks = Confluent.Kafka.Acks; + +namespace Paramore.Brighter.Kafka.Tests.MessagingGateway; + +public class KafkaMessageProducerMissingHeaderTests : IDisposable +{ + private readonly ITestOutputHelper _output; + private readonly string _queueName = Guid.NewGuid().ToString(); + private readonly string _topic = Guid.NewGuid().ToString(); + private readonly IAmAMessageConsumer _consumer; + private readonly IProducer _producer; + + public KafkaMessageProducerMissingHeaderTests(ITestOutputHelper output) + { + const string groupId = "Kafka Message Producer Missing Header Test"; + _output = output; + + + var clientConfig = new ClientConfig + { + Acks = (Confluent.Kafka.Acks)((int)Acks.All), + BootstrapServers = string.Join(",", new[] { "localhost:9092" }), + ClientId = "Kafka Producer Send with Missing Header Tests", + }; + + var producerConfig = new ProducerConfig(clientConfig) + { + BatchNumMessages = 10, + EnableIdempotence = true, + MaxInFlight = 1, + LingerMs = 5, + MessageTimeoutMs = 5000, + MessageSendMaxRetries = 3, + Partitioner = Confluent.Kafka.Partitioner.ConsistentRandom, + QueueBufferingMaxMessages = 10, + QueueBufferingMaxKbytes = 1048576, + RequestTimeoutMs = 500, + RetryBackoffMs = 100, + }; + + _producer = new ProducerBuilder(producerConfig) + .SetErrorHandler((_, error) => + { + output.WriteLine($"Kafka producer failed with Code: {error.Code}, Reason: { error.Reason}, Fatal: {error.IsFatal}", error.Code, error.Reason, error.IsFatal); + }) + .Build(); + + _consumer = new KafkaMessageConsumerFactory( + new KafkaMessagingGatewayConfiguration + { + Name = "Kafka Consumer Test", BootStrapServers = new[] { "localhost:9092" } + }) + .Create(new KafkaSubscription( + channelName: new ChannelName(_queueName), + routingKey: new RoutingKey(_topic), + groupId: groupId, + numOfPartitions: 1, + replicationFactor: 1, + makeChannels: OnMissingChannel.Create + ) + ); + } + + [Fact] + public void When_recieving_a_message_without_partition_key_header() + { + var command = new MyCommand { Value = "Test Content" }; + + //vanilla i.e. no Kafka specific bytes at the beginning + var body = JsonSerializer.Serialize(command, JsonSerialisationOptions.Options); + var value = Encoding.UTF8.GetBytes(body); + var kafkaMessage = new Confluent.Kafka.Message + { + Key = command.Id.ToString(), + Value = value + }; + + _producer.Produce(_topic, kafkaMessage, report => _output.WriteLine(report.ToString()) ); + + var receivedMessage = GetMessage(); + + //Where we lack a partition key header, assume non-Brighter header and set to message key + receivedMessage.Header.PartitionKey.Should().Be(command.Id.ToString()); + receivedMessage.Body.Bytes.Should().Equal(value); + } + + private Message GetMessage() + { + Message[] messages = new Message[0]; + int maxTries = 0; + do + { + try + { + maxTries++; + Task.Delay(500).Wait(); //Let topic propagate in the broker + messages = _consumer.Receive(1000); + + if (messages[0].Header.MessageType != MessageType.MT_NONE) + { + _consumer.Acknowledge(messages[0]); + break; + } + } + catch (ChannelFailureException cfx) + { + //Lots of reasons to be here as Kafka propagates a topic, or the test cluster is still initializing + _output.WriteLine($" Failed to read from topic:{_topic} because {cfx.Message} attempt: {maxTries}"); + } + } while (maxTries <= 3); + + if (messages[0].Header.MessageType == MessageType.MT_NONE) + throw new Exception($"Failed to read from topic:{_topic} after {maxTries} attempts"); + + return messages[0]; + } + + public void Dispose() + { + _producer?.Dispose(); + _consumer?.Dispose(); + } +}