From c0e5bffb7944a590b16a21f89a0886deb4df54ff Mon Sep 17 00:00:00 2001 From: Joel Oliveira Date: Mon, 23 Nov 2020 14:54:16 +0000 Subject: [PATCH] feat: access message timestamp from message consumer context --- .../IMessageContextConsumer.cs | 6 ++++ .../MessageContextConsumerTests.cs | 36 +++++++++++++++++++ .../Consumers/MessageContextConsumer.cs | 7 ++-- 3 files changed, 47 insertions(+), 2 deletions(-) create mode 100644 src/KafkaFlow.UnitTests/MessageContextConsumerTests.cs diff --git a/src/KafkaFlow.Abstractions/IMessageContextConsumer.cs b/src/KafkaFlow.Abstractions/IMessageContextConsumer.cs index 23187567c..4eaa0d5d8 100644 --- a/src/KafkaFlow.Abstractions/IMessageContextConsumer.cs +++ b/src/KafkaFlow.Abstractions/IMessageContextConsumer.cs @@ -1,5 +1,6 @@ namespace KafkaFlow { + using System; using System.Threading; /// @@ -17,6 +18,11 @@ public interface IMessageContextConsumer /// CancellationToken WorkerStopped { get; } + /// + /// Message timestamp. By default is the UTC timestamp when the message was produced + /// + DateTime MessageTimestamp { get; } + /// /// Gets or Sets if the framework should store the current offset in the end when auto store offset is used /// diff --git a/src/KafkaFlow.UnitTests/MessageContextConsumerTests.cs b/src/KafkaFlow.UnitTests/MessageContextConsumerTests.cs new file mode 100644 index 000000000..b550cc0a4 --- /dev/null +++ b/src/KafkaFlow.UnitTests/MessageContextConsumerTests.cs @@ -0,0 +1,36 @@ +namespace KafkaFlow.UnitTests +{ + using System; + using System.Threading; + using Confluent.Kafka; + using FluentAssertions; + using KafkaFlow.Consumers; + using Microsoft.VisualStudio.TestTools.UnitTesting; + + [TestClass] + public class MessageContextConsumerTests + { + [TestMethod] + public void MessageTimestamp_ConsumeResultHasMessageTimestamp_ReturnsMessageTimestampFromResult() + { + // Arrange + var expectedMessageTimestamp = new DateTime(2020, 1, 1, 0, 0, 0); + + var consumerResult = new ConsumeResult + { + Message = new Message + { + Timestamp = new Timestamp(expectedMessageTimestamp) + } + }; + + var target = new MessageContextConsumer(null, "consumer", null, consumerResult, CancellationToken.None); + + // Act + var messageTimestamp = target.MessageTimestamp; + + // Assert + messageTimestamp.Should().Be(expectedMessageTimestamp); + } + } +} diff --git a/src/KafkaFlow/Consumers/MessageContextConsumer.cs b/src/KafkaFlow/Consumers/MessageContextConsumer.cs index ee93a4638..913ee9fae 100644 --- a/src/KafkaFlow/Consumers/MessageContextConsumer.cs +++ b/src/KafkaFlow/Consumers/MessageContextConsumer.cs @@ -1,5 +1,6 @@ namespace KafkaFlow.Consumers { + using System; using System.Threading; using Confluent.Kafka; @@ -13,7 +14,7 @@ public MessageContextConsumer( IConsumer consumer, string name, IOffsetManager offsetManager, - ConsumeResult kafkaResult, + ConsumeResult kafkaResult, CancellationToken workerStopped) { this.Name = name; @@ -24,11 +25,13 @@ public MessageContextConsumer( } public string Name { get; } - + public CancellationToken WorkerStopped { get; } public bool ShouldStoreOffset { get; set; } = true; + public DateTime MessageTimestamp => this.kafkaResult.Message.Timestamp.UtcDateTime; + public void StoreOffset() { this.offsetManager.StoreOffset(this.kafkaResult.TopicPartitionOffset);