Skip to content

Commit

Permalink
Kafka Missing Headers and Partition Key (#2628)
Browse files Browse the repository at this point in the history
* Support multiple policy async

* Fix issue that we should read partition key from message, if no header.

* Fix async test issues
  • Loading branch information
iancooper authored Apr 18, 2023
1 parent 910e990 commit fea6516
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@ public Message CreateMessage(ConsumeResult<string, byte[]> consumeResult)
if (correlationId.Success)
messageHeader.CorrelationId = correlationId.Result;

//If we don't have a partition key in the header, assume a non-Brighter sender and use message key
if (partitionKey.Success)
messageHeader.PartitionKey = partitionKey.Result;
else
messageHeader.PartitionKey = consumeResult.Message.Key;

if (contentType.Success)
messageHeader.ContentType =contentType.Result;
Expand Down Expand Up @@ -202,7 +205,7 @@ private HeaderResult<string> ReadPartitionKey(Headers headers)
if (string.IsNullOrEmpty(s))
{
s_logger.LogDebug("No partition key found in message");
return new HeaderResult<string>(string.Empty, true);
return new HeaderResult<string>(string.Empty, false);
}

return new HeaderResult<string>(s, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,5 @@ private void PublishResults(PersistenceStatus status, Headers headers)

Task.Run((() =>OnMessagePublished?.Invoke(false, Guid.Empty)));
}



}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public class ExceptionPolicyHandlerAsync<TRequest> : RequestHandlerAsync<TReques
/// <exception cref="System.ArgumentException">Could not find the policy for this attribute, did you register it with the command processor's container;initializerList</exception>
public override void InitializeFromAttributeParams(params object[] initializerList)
{
//we expect the first and only parameter to be a string
var policyName = (string)initializerList[0];
if (_initialized) return;

var policies = (List<string>)initializerList[0];
policies.Each(p => _policies.Add(Context.Policies.Get<AsyncPolicy>(p)));
_initialized = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Threading;
using System;
using System.Threading;
using System.Threading.Tasks;
using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles;
using Paramore.Brighter.Policies.Attributes;
Expand All @@ -18,7 +19,7 @@ public MyMultiplePoliciesFailsWithDivideByZeroHandlerAsync()
public override Task<MyCommand> HandleAsync(MyCommand command, CancellationToken cancellationToken = default)
{
ReceivedCommand = true;
return base.HandleAsync(command, cancellationToken);
throw new DivideByZeroException();
}

public static bool ShouldReceive(MyCommand myCommand)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System;
using System.Threading.Tasks;
using FluentAssertions;
using FluentAssertions.Extensions;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -31,7 +32,7 @@ public CommandProcessorWithBothRetryAndCircuitBreakerAsync()

var container = new ServiceCollection();
container.AddSingleton<MyMultiplePoliciesFailsWithDivideByZeroHandlerAsync>();
container.AddSingleton<ExceptionPolicyHandler<MyCommand>>();
container.AddSingleton<ExceptionPolicyHandlerAsync<MyCommand>>();
container.AddSingleton<IBrighterOptions>(new BrighterOptions()
{
HandlerLifetime = ServiceLifetime.Transient
Expand Down Expand Up @@ -71,20 +72,20 @@ public CommandProcessorWithBothRetryAndCircuitBreakerAsync()
}

[Fact]
public void When_Sending_A_Command_That_Retries_Then_Repeatedly_Fails_Breaks_The_Circuit()
public async Task When_Sending_A_Command_That_Retries_Then_Repeatedly_Fails_Breaks_The_Circuit()
{
//First two should be caught, and increment the count
_firstException = Catch.Exception(async () => await _commandProcessor.SendAsync(new MyCommand()));
_firstException = await Catch.ExceptionAsync(async () => await _commandProcessor.SendAsync(new MyCommand()));
//should have retried three times
_retryCount.Should().Be(3);
_retryCount = 0;
_secondException = Catch.Exception(async() => await _commandProcessor.SendAsync(new MyCommand()));
_secondException = await Catch.ExceptionAsync(async() => await _commandProcessor.SendAsync(new MyCommand()));
//should have retried three times
_retryCount.Should().Be(3);
_retryCount = 0;

//this one should tell us that the circuit is broken
_thirdException = Catch.Exception(async() => await _commandProcessor.SendAsync(new MyCommand()));
_thirdException = await Catch.ExceptionAsync(async() => await _commandProcessor.SendAsync(new MyCommand()));
//should have retried three times
_retryCount.Should().Be(0);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#region Licence

/* The MIT License (MIT)
Copyright © 2014 Wayne Hunsley <whunsley@gmail.com>
Expand Down Expand Up @@ -35,78 +36,74 @@ THE SOFTWARE. */
namespace Paramore.Brighter.Kafka.Tests.MessagingGateway
{
[Trait("Category", "Kafka")]
[Collection("Kafka")] //Kafka doesn't like multiple consumers of a partition
[Collection("Kafka")] //Kafka doesn't like multiple consumers of a partition
public class KafkaMessageProducerSendTests : IDisposable
{
private readonly ITestOutputHelper _output;
private readonly string _queueName = Guid.NewGuid().ToString();
private readonly string _queueName = Guid.NewGuid().ToString();
private readonly string _topic = Guid.NewGuid().ToString();
private readonly IAmAProducerRegistry _producerRegistry;
private readonly IAmAMessageConsumer _consumer;
private readonly string _partitionKey = Guid.NewGuid().ToString();


public KafkaMessageProducerSendTests(ITestOutputHelper output)
{
const string groupId = "Kafka Message Producer Send Test";
_output = output;
_producerRegistry = new KafkaProducerRegistryFactory(
new KafkaMessagingGatewayConfiguration
{
Name = "Kafka Producer Send Test",
BootStrapServers = new[] {"localhost:9092"}
Name = "Kafka Producer Send Test", BootStrapServers = new[] { "localhost:9092" }
},
new KafkaPublication[] {new KafkaPublication
new KafkaPublication[]
{
Topic = new RoutingKey(_topic),
NumPartitions = 1,
ReplicationFactor = 1,
//These timeouts support running on a container using the same host as the tests,
//your production values ought to be lower
MessageTimeoutMs = 2000,
RequestTimeoutMs = 2000,
MakeChannels = OnMissingChannel.Create
}}).Create();

new KafkaPublication
{
Topic = new RoutingKey(_topic),
NumPartitions = 1,
ReplicationFactor = 1,
//These timeouts support running on a container using the same host as the tests,
//your production values ought to be lower
MessageTimeoutMs = 2000,
RequestTimeoutMs = 2000,
MakeChannels = OnMissingChannel.Create
}
}).Create();

_consumer = new KafkaMessageConsumerFactory(
new KafkaMessagingGatewayConfiguration
{
Name = "Kafka Consumer Test",
BootStrapServers = new[] { "localhost:9092" }
Name = "Kafka Consumer Test", BootStrapServers = new[] { "localhost:9092" }
})
.Create(new KafkaSubscription<MyCommand>(
channelName: new ChannelName(_queueName),
channelName: new ChannelName(_queueName),
routingKey: new RoutingKey(_topic),
groupId: groupId,
numOfPartitions: 1,
replicationFactor: 1,
makeChannels: OnMissingChannel.Create
)
);

}

[Fact]
public void When_posting_a_message()
{
var command = new MyCommand{Value = "Test Content"};
var command = new MyCommand { Value = "Test Content" };

//vanilla i.e. no Kafka specific bytes at the beginning
var body = JsonSerializer.Serialize(command, JsonSerialisationOptions.Options);

var message = new Message(
new MessageHeader(Guid.NewGuid(), _topic, MessageType.MT_COMMAND)
{
PartitionKey = _partitionKey
},
new MessageHeader(Guid.NewGuid(), _topic, MessageType.MT_COMMAND) { PartitionKey = _partitionKey },
new MessageBody(body));

((IAmAMessageProducerSync)_producerRegistry.LookupBy(_topic)).Send(message);

var receivedMessage = GetMessage();
var receivedCommand = JsonSerializer.Deserialize<MyCommand>(message.Body.Value, JsonSerialisationOptions.Options);

var receivedCommand = JsonSerializer.Deserialize<MyCommand>(receivedMessage.Body.Value, JsonSerialisationOptions.Options);

receivedMessage.Header.MessageType.Should().Be(MessageType.MT_COMMAND);
receivedMessage.Header.PartitionKey.Should().Be(_partitionKey);
receivedMessage.Body.Bytes.Should().Equal(message.Body.Bytes);
Expand All @@ -132,19 +129,17 @@ private Message GetMessage()
_consumer.Acknowledge(messages[0]);
break;
}

}
catch (ChannelFailureException cfx)
{
//Lots of reasons to be here as Kafka propagates a topic, or the test cluster is still initializing
_output.WriteLine($" Failed to read from topic:{_topic} because {cfx.Message} attempt: {maxTries}");
}

} while (maxTries <= 3);

if (messages[0].Header.MessageType == MessageType.MT_NONE)
throw new Exception($"Failed to read from topic:{_topic} after {maxTries} attempts");

return messages[0];
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
using System;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;
using FluentAssertions;
using Paramore.Brighter.Kafka.Tests.TestDoubles;
using Paramore.Brighter.MessagingGateway.Kafka;
using Xunit;
using Xunit.Abstractions;
using Acks = Confluent.Kafka.Acks;

namespace Paramore.Brighter.Kafka.Tests.MessagingGateway;

public class KafkaMessageProducerMissingHeaderTests : IDisposable
{
private readonly ITestOutputHelper _output;
private readonly string _queueName = Guid.NewGuid().ToString();
private readonly string _topic = Guid.NewGuid().ToString();
private readonly IAmAMessageConsumer _consumer;
private readonly IProducer<string,byte[]> _producer;

public KafkaMessageProducerMissingHeaderTests(ITestOutputHelper output)
{
const string groupId = "Kafka Message Producer Missing Header Test";
_output = output;


var clientConfig = new ClientConfig
{
Acks = (Confluent.Kafka.Acks)((int)Acks.All),
BootstrapServers = string.Join(",", new[] { "localhost:9092" }),
ClientId = "Kafka Producer Send with Missing Header Tests",
};

var producerConfig = new ProducerConfig(clientConfig)
{
BatchNumMessages = 10,
EnableIdempotence = true,
MaxInFlight = 1,
LingerMs = 5,
MessageTimeoutMs = 5000,
MessageSendMaxRetries = 3,
Partitioner = Confluent.Kafka.Partitioner.ConsistentRandom,
QueueBufferingMaxMessages = 10,
QueueBufferingMaxKbytes = 1048576,
RequestTimeoutMs = 500,
RetryBackoffMs = 100,
};

_producer = new ProducerBuilder<string, byte[]>(producerConfig)
.SetErrorHandler((_, error) =>
{
output.WriteLine($"Kafka producer failed with Code: {error.Code}, Reason: { error.Reason}, Fatal: {error.IsFatal}", error.Code, error.Reason, error.IsFatal);
})
.Build();

_consumer = new KafkaMessageConsumerFactory(
new KafkaMessagingGatewayConfiguration
{
Name = "Kafka Consumer Test", BootStrapServers = new[] { "localhost:9092" }
})
.Create(new KafkaSubscription<MyCommand>(
channelName: new ChannelName(_queueName),
routingKey: new RoutingKey(_topic),
groupId: groupId,
numOfPartitions: 1,
replicationFactor: 1,
makeChannels: OnMissingChannel.Create
)
);
}

[Fact]
public void When_recieving_a_message_without_partition_key_header()
{
var command = new MyCommand { Value = "Test Content" };

//vanilla i.e. no Kafka specific bytes at the beginning
var body = JsonSerializer.Serialize(command, JsonSerialisationOptions.Options);
var value = Encoding.UTF8.GetBytes(body);
var kafkaMessage = new Confluent.Kafka.Message<string, byte[]>
{
Key = command.Id.ToString(),
Value = value
};

_producer.Produce(_topic, kafkaMessage, report => _output.WriteLine(report.ToString()) );

var receivedMessage = GetMessage();

//Where we lack a partition key header, assume non-Brighter header and set to message key
receivedMessage.Header.PartitionKey.Should().Be(command.Id.ToString());
receivedMessage.Body.Bytes.Should().Equal(value);
}

private Message GetMessage()
{
Message[] messages = new Message[0];
int maxTries = 0;
do
{
try
{
maxTries++;
Task.Delay(500).Wait(); //Let topic propagate in the broker
messages = _consumer.Receive(1000);

if (messages[0].Header.MessageType != MessageType.MT_NONE)
{
_consumer.Acknowledge(messages[0]);
break;
}
}
catch (ChannelFailureException cfx)
{
//Lots of reasons to be here as Kafka propagates a topic, or the test cluster is still initializing
_output.WriteLine($" Failed to read from topic:{_topic} because {cfx.Message} attempt: {maxTries}");
}
} while (maxTries <= 3);

if (messages[0].Header.MessageType == MessageType.MT_NONE)
throw new Exception($"Failed to read from topic:{_topic} after {maxTries} attempts");

return messages[0];
}

public void Dispose()
{
_producer?.Dispose();
_consumer?.Dispose();
}
}

0 comments on commit fea6516

Please sign in to comment.