From 7e314e1f0e29f45dd63f0053b62a206a74be54f2 Mon Sep 17 00:00:00 2001 From: Titouan CREACH Date: Thu, 6 Jul 2023 15:59:40 +0200 Subject: [PATCH 1/7] chore: too much vim --- Unitee.EventDriven.RedisStream.Tests/: | 401 ------------------------- 1 file changed, 401 deletions(-) delete mode 100644 Unitee.EventDriven.RedisStream.Tests/: diff --git a/Unitee.EventDriven.RedisStream.Tests/: b/Unitee.EventDriven.RedisStream.Tests/: deleted file mode 100644 index aabebb7..0000000 --- a/Unitee.EventDriven.RedisStream.Tests/: +++ /dev/null @@ -1,401 +0,0 @@ -using Unitee.EventDriven.DependencyInjection; -using FluentAssertions; -using Microsoft.Extensions.DependencyInjection; -using Moq; -using StackExchange.Redis; -using Unitee.EventDriven.Abstraction; -using Unitee.EventDriven.Models; -using System.Text.Json; - -namespace Unitee.EventDriven.RedisStream.Tests; - - -public class BaseTests : IClassFixture -{ - private readonly IConnectionMultiplexer _redis; - - public BaseTests(RedisFixtures redis) - { - _redis = redis.Redis; - } - - IServiceCollection GetServices(string name) - { - var _services = new ServiceCollection(); - - _services.AddLogging(); - _services.AddSingleton(_redis); - _services.AddScoped(); - _services.AddScoped(); - _services.AddScoped(provider => new RedisStreamMessagesProcessor(name, "Default", "DEAD_LETTER", provider)); - _services.AddSingleton(x => new RedisStreamBackgroundReceiver(x)); - return _services; - } - - [Fact] - public async Task PendingMessages_ShouldBeConsumedAtStart() - { - var db = _redis.GetDatabase(); - try - { - db.StreamCreateConsumerGroup("TEST_EVENT_1", "Test1", StreamPosition.NewMessages); - } - catch (RedisException) - { - } - - var services = GetServices("Test1"); - var consumerInstance = new Mock>(); - services.AddTransient(x => consumerInstance.Object); - - var provider = services.BuildServiceProvider(); - var publisher = provider.GetRequiredService(); - - await publisher.PublishAsync(new TestEvent1("World")); - - var backgroundService = provider.GetService(); - await backgroundService.StartAsync(CancellationToken.None); - await Task.Delay(100); - await backgroundService.StopAsync(CancellationToken.None); - db.KeyDelete("TEST_EVENT_1"); - - consumerInstance.Verify(x => x.ConsumeAsync(new TestEvent1("World")), Times.Once); - } - - [Fact] - public async Task NewMessages_ShouldBeConsumed() - { - var db = _redis.GetDatabase(); - var services = GetServices("Test1"); - var consumerInstance = new Mock>(); - services.AddTransient(x => consumerInstance.Object); - - var provider = services.BuildServiceProvider(); - var publisher = provider.GetRequiredService(); - - var backgroundService = provider.GetService(); - await backgroundService.StartAsync(CancellationToken.None); - await Task.Delay(100); - await publisher.PublishAsync(new TestEvent2("World")); - await Task.Delay(100); - await backgroundService.StopAsync(CancellationToken.None); - db.KeyDelete("TEST_EVENT_2"); - - consumerInstance.Verify(x => x.ConsumeAsync(new TestEvent2("World")), Times.Once); - } - - [Fact] - public async Task MultipleConsumers_ShoudBeCaled() - { - var db = _redis.GetDatabase(); - var services = GetServices("Test2"); - - var consumerInstance1 = new Mock>(); - var consumerInstance2 = new Mock>(); - var consumerInstance3 = new Mock>(); - - services.AddTransient(x => consumerInstance1.Object); - services.AddTransient(x => consumerInstance2.Object); - services.AddTransient(x => consumerInstance3.Object); - - var provider = services.BuildServiceProvider(); - var publisher = provider.GetRequiredService(); - - var backgroundService = provider.GetService(); - await backgroundService.StartAsync(CancellationToken.None); - await Task.Delay(100); - await publisher.PublishAsync(new TestEvent3("World")); - await Task.Delay(100); - await backgroundService.StopAsync(CancellationToken.None); - db.KeyDelete("TEST_EVENT_3"); - - consumerInstance1.Verify(x => x.ConsumeAsync(new TestEvent3("World")), Times.Once); - consumerInstance2.Verify(x => x.ConsumeAsync(new TestEvent3("World")), Times.Once); - consumerInstance3.Verify(x => x.ConsumeAsync(new TestEvent3("World")), Times.Once); - } - - [Fact] - public async Task ResponseRequest_ShouldReply() - { - var db = _redis.GetDatabase(); - var services = GetServices(Guid.NewGuid().ToString()); - - services.AddTransient(); - var provider = services.BuildServiceProvider(); - var publisher = provider.GetRequiredService(); - - var backgroundService = provider.GetService(); - await backgroundService.StartAsync(CancellationToken.None); - await Task.Delay(100); - var resp = await publisher.RequestResponseAsync(new TestEvent4("World"), new() - { - SessionId = Guid.NewGuid().ToString() - }); - await Task.Delay(100); - await backgroundService.StopAsync(CancellationToken.None); - db.KeyDelete("TEST_EVENT_4"); - - Assert.Equal("Received", resp); - } - - [Fact] - public async Task Scheduled_ShouldConsumeAMessageSentInThePast() - { - var db = _redis.GetDatabase(); - var services = GetServices(Guid.NewGuid().ToString()); - - var consumerInstance1 = new Mock>(); - services.AddTransient(x => consumerInstance1.Object); - - var provider = services.BuildServiceProvider(); - var publisher = provider.GetRequiredService(); - - var backgroundService = provider.GetService(); - await backgroundService.StartAsync(CancellationToken.None); - await Task.Delay(100); - await publisher.PublishAsync(new TestEvent5("World"), new MessageOptions() - { - ScheduledEnqueueTime = DateTime.UtcNow.AddSeconds(-5) - }); - await Task.Delay(3500); - await backgroundService.StopAsync(CancellationToken.None); - db.KeyDelete("TEST_EVENT_5"); - consumerInstance1.Verify(x => x.ConsumeAsync(new TestEvent5("World")), Times.Once); - } - - [Fact] - public async Task Concurrency_ScheduledMessageShouldNotBeSentMultipleTime() - { - var db = _redis.GetDatabase(); - var services = GetServices(Guid.NewGuid().ToString()); - - var consumerInstance1 = new Mock>(); - services.AddTransient(x => consumerInstance1.Object); - - services.AddSingleton(x => new RedisStreamBackgroundReceiver(x)); - - var provider = services.BuildServiceProvider(); - var publisher = provider.GetRequiredService(); - - var backgroundService = provider.GetService(); - await backgroundService.StartAsync(CancellationToken.None); - await Task.Delay(100); - await publisher.PublishAsync(new TestEvent6("World"), new MessageOptions() - { - ScheduledEnqueueTime = DateTime.UtcNow.AddSeconds(-5) - }); - await Task.Delay(3500); - await backgroundService.StopAsync(CancellationToken.None); - db.KeyDelete("TEST_EVENT_6"); - consumerInstance1.Verify(x => x.ConsumeAsync(new TestEvent6("World")), Times.Once); - } - - [Fact] - public async Task FireAndForget_OldMessagesShouldNotBlockAtStart() - { - var db = _redis.GetDatabase(); - - try - { - db.StreamCreateConsumerGroup("TEST_EVENT_7", "DefaultConsumer", StreamPosition.NewMessages); - } - catch (RedisException) - { - } - - var services = GetServices("DefaultConsumer"); - - var slowConsumerInstance = new Mock>(); - - slowConsumerInstance.Setup(x => x.ConsumeAsync(new TestEvent7("World"))) - .Callback(async () => - { - await Task.Delay(1000); - }) - .Returns(Task.CompletedTask); - - services.AddTransient(x => slowConsumerInstance.Object); - - var provider = services.BuildServiceProvider(); - var publisher = provider.GetRequiredService(); - var backgroundService = provider.GetService(); - - for (int i = 0; i < 50; i++) - { - await publisher.PublishAsync(new TestEvent7("World")); - } - - await backgroundService.StartAsync(CancellationToken.None); - await Task.Delay(3000); - - await backgroundService.StopAsync(CancellationToken.None); - db.KeyDelete("TEST_EVENT_7"); - - slowConsumerInstance.Verify(x => x.ConsumeAsync(new TestEvent7("World")), Times.Exactly(50)); - } - - [Fact] - public async Task DeadLetter_MessageNotConsumedShoudBePushToDeadLetter() - { - var db = _redis.GetDatabase(); - - var services = GetServices(Guid.NewGuid().ToString()); - - var throwingConsumer = new Mock>(); - - throwingConsumer.Setup(x => x.ConsumeAsync(new TestEvent8("World"))) - .Throws(new Exception("Hello World")); - - var deadLetterConsumer = new Mock>(); - - services.AddTransient(x => throwingConsumer.Object); - services.AddTransient(x => deadLetterConsumer.Object); - - var provider = services.BuildServiceProvider(); - var publisher = provider.GetRequiredService(); - var backgroundService = provider.GetService(); - await backgroundService.StartAsync(CancellationToken.None); - - await Task.Delay(100); - await publisher.PublishAsync(new TestEvent8("World")); - await Task.Delay(100); - - await backgroundService.StopAsync(CancellationToken.None); - - db.KeyDelete("TEST_EVENT_8"); - db.KeyDelete("DEAD_LETTER"); - - deadLetterConsumer.Verify(x => x.ConsumeAsync(It.IsAny()), Times.Once); - } - - [Fact] - public async Task Concurrency_TwoConsumerShouoldBeExecutedConcurrently() - { - var db = _redis.GetDatabase(); - var services = GetServices(Guid.NewGuid().ToString()); - - var slowConsumerInstance1 = new Mock>(); - var slowConsumerInstance2 = new Mock>(); - - - var consumer1 = slowConsumerInstance1.Setup(x => x.ConsumeAsync(new TestEvent9("World"))) - .Callback(async () => - { - await Task.Delay(1000); - }) - .Returns(Task.CompletedTask); - - var consumer2 = slowConsumerInstance1.Setup(x => x.ConsumeAsync(new TestEvent9("World"))) - .Callback(async () => - { - await Task.Delay(1000); - }) - .Returns(Task.CompletedTask); - - - services.AddTransient(x => slowConsumerInstance1.Object); - services.AddTransient(x => slowConsumerInstance2.Object); - - var provider = services.BuildServiceProvider(); - var publisher = provider.GetRequiredService(); - var backgroundService = provider.GetService(); - await backgroundService.StartAsync(CancellationToken.None); - - await Task.Delay(100); - await publisher.PublishAsync(new TestEvent9("World")); - await Task.Delay(1000 + 100); - - await backgroundService.StopAsync(CancellationToken.None); - - slowConsumerInstance1.Verify(x => x.ConsumeAsync(It.IsAny()), Times.Once); - slowConsumerInstance2.Verify(x => x.ConsumeAsync(It.IsAny()), Times.Once); - - db.KeyDelete("TEST_EVENT_9"); - } - - - [Fact] - public async Task RequestReply_ShouldOnlySendOneEventInTheStream() - { - var db = _redis.GetDatabase(); - db.KeyDelete("TEST_EVENT_10"); - - var services = GetServices(Guid.NewGuid().ToString()); - - var provider = services.BuildServiceProvider(); - var publisher = provider.GetRequiredService(); - - try - { - await publisher.RequestResponseAsync(new TestEvent10("World"), new()); - } catch (TimeoutException) - { - } - - db.StreamLength("TEST_EVENT_10").Should().Be(1); - db.KeyDelete("TEST_EVENT_10"); - } - - - [Fact] - public async Task Json_CustomOptionsShouldByApplied() - { - var db = _redis.GetDatabase(); - db.KeyDelete("TEST_EVENT_11"); - - var services = GetServices(Guid.NewGuid().ToString()); - services.AddRedisStreamOptions(options => - { - options.JsonSerializerOptions.PropertyNamingPolicy = JsonNamingPolicy.CamelCase; - }); - - var provider = services.BuildServiceProvider(); - var publisher = provider.GetRequiredService(); - - await publisher.PublishAsync(new TestEvent11("World")); - - var stream = db.StreamRead("TEST_EVENT_11", "0-0").First(); - stream.Values.First().Value.ToString().Should().Be("{\"aTestString\":\"World\"}"); - } - - [Fact] - public async Task Json_WithCustomOptionsShouldWorkAsNormal() - { - var db = _redis.GetDatabase(); - db.KeyDelete("TEST_EVENT_12"); - - var services = GetServices(Guid.NewGuid().ToString()); - - var provider = services.BuildServiceProvider(); - var publisher = provider.GetRequiredService(); - - await publisher.PublishAsync(new TestEvent12("World")); - - var stream = db.StreamRead("TEST_EVENT_12", "0-0").First(); - stream.Values.First().Value.ToString().Should().Be("{\"ATestString\":\"World\"}"); - } - - [Fact] - public async Task Json_ShouldBeConsumedEvenWithCustomJsonOptions() - { - var db = _redis.GetDatabase(); - var services = GetServices(Guid.NewGuid().ToString()); - var consumerInstance = new Mock>(); - services.AddTransient(x => consumerInstance.Object); - - var provider = services.BuildServiceProvider(); - var publisher = provider.GetRequiredService(); - - var backgroundService = provider.GetService(); - await backgroundService.StartAsync(CancellationToken.None); - await Task.Delay(100); - await publisher.PublishAsync(new TestEvent13("World")); - await Task.Delay(100); - await backgroundService.StopAsync(CancellationToken.None); - db.KeyDelete("TEST_EVENT_13"); - - consumerInstance.Verify(x => x.ConsumeAsync(new TestEvent13("World")), Times.Once); - } - - -} From 7cca3e61d6ef3c00bdc5b3d7f8ae5dc42134a5b4 Mon Sep 17 00:00:00 2001 From: Titouan CREACH Date: Thu, 6 Jul 2023 17:30:23 +0200 Subject: [PATCH 2/7] feat: clean code + ttl + use serializer options only to serialize body (not scheduled message payload) --- .../Model/MessageOptions.cs | 4 +- ...entDriven.RedisStream.Tests.code-workspace | 4 +- .../EventsFixtures.cs | 4 + .../IntegrationTests.cs | 62 ++++++++++ .../RedisStreamMessagesProcessor.cs | 34 +++++- .../RedisStreamPublisher.cs | 109 ++++++++++++------ .../RedisStreamScheduledMessageType.cs | 2 +- .../Unitee.EventDriven.RedisStream.csproj | 1 + 8 files changed, 177 insertions(+), 43 deletions(-) diff --git a/Unitee.EventDriven.Abstraction/Model/MessageOptions.cs b/Unitee.EventDriven.Abstraction/Model/MessageOptions.cs index 79fe934..d026530 100644 --- a/Unitee.EventDriven.Abstraction/Model/MessageOptions.cs +++ b/Unitee.EventDriven.Abstraction/Model/MessageOptions.cs @@ -6,4 +6,6 @@ public record MessageOptions() public DateTimeOffset? ScheduledEnqueueTime { get; init; } public string? MessageId { get; init; } public string? SessionId { get; set; } -} \ No newline at end of file + public DateTimeOffset? TimeToLive { get; init; } + public string? Locale { get; init; } +} diff --git a/Unitee.EventDriven.RedisStream.Tests.code-workspace b/Unitee.EventDriven.RedisStream.Tests.code-workspace index 2963e58..3d186d0 100644 --- a/Unitee.EventDriven.RedisStream.Tests.code-workspace +++ b/Unitee.EventDriven.RedisStream.Tests.code-workspace @@ -7,5 +7,7 @@ "path": "Unitee.EventDriven.RedisStream" } ], - "settings": {} + "settings": { + "dotnet.defaultSolution": "/Users/tcreach/Documents/Projects/Unitee.EventDriven/Unitee.EventDriven.RedisStream.Tests/Unitee.EventDriven.RedisStream.Tests.sln" + } } \ No newline at end of file diff --git a/Unitee.EventDriven.RedisStream.Tests/EventsFixtures.cs b/Unitee.EventDriven.RedisStream.Tests/EventsFixtures.cs index 2c733e7..9f735c8 100644 --- a/Unitee.EventDriven.RedisStream.Tests/EventsFixtures.cs +++ b/Unitee.EventDriven.RedisStream.Tests/EventsFixtures.cs @@ -28,6 +28,10 @@ public record TestEvent11(string ATestString); public record TestEvent12(string ATestString); [Subject("TEST_EVENT_13")] public record TestEvent13(string ATestString); +[Subject("TEST_EVENT_14")] +public record TestEvent14(string ATestString); +[Subject("TEST_EVENT_15")] +public record TestEvent15(string ATestString); [Subject("DEAD_LETTER")] diff --git a/Unitee.EventDriven.RedisStream.Tests/IntegrationTests.cs b/Unitee.EventDriven.RedisStream.Tests/IntegrationTests.cs index aabebb7..8548db3 100644 --- a/Unitee.EventDriven.RedisStream.Tests/IntegrationTests.cs +++ b/Unitee.EventDriven.RedisStream.Tests/IntegrationTests.cs @@ -398,4 +398,66 @@ public async Task Json_ShouldBeConsumedEvenWithCustomJsonOptions() } + [Fact] + public async Task TTL_MessageWithATtlShouldBeProcessed() + { + var db = _redis.GetDatabase(); + + try + { + db.StreamCreateConsumerGroup("TEST_EVENT_14", "DefaultConsumer", StreamPosition.NewMessages); + } + catch (RedisException) + { + } + + var services = GetServices("DefaultConsumer"); + var consumerInstance = new Mock>(); + services.AddTransient(x => consumerInstance.Object); + + var provider = services.BuildServiceProvider(); + var publisher = provider.GetRequiredService(); + var backgroundService = provider.GetService(); + + await publisher.PublishAsync(new TestEvent14("World"), new MessageOptions() { TimeToLive = DateTimeOffset.UtcNow.AddSeconds(5) }); + await backgroundService.StartAsync(CancellationToken.None); + await Task.Delay(1000); + await backgroundService.StopAsync(CancellationToken.None); + db.KeyDelete("TEST_EVENT_14"); + + consumerInstance.Verify(x => x.ConsumeAsync(new TestEvent14("World")), Times.Once); + } + + [Fact] + public async Task TTL_MessageWithAExpiredTTLShouldNotBeProcessed() + { + var db = _redis.GetDatabase(); + + try + { + db.StreamCreateConsumerGroup("TEST_EVENT_15", "DefaultConsumer", StreamPosition.NewMessages); + } + catch (RedisException) + { + } + var services = GetServices("DefaultConsumer"); + var consumerInstance = new Mock>(); + services.AddTransient(x => consumerInstance.Object); + + var provider = services.BuildServiceProvider(); + var publisher = provider.GetRequiredService(); + var backgroundService = provider.GetService(); + + await publisher.PublishAsync(new TestEvent15("World"), new MessageOptions() { TimeToLive = DateTimeOffset.UtcNow.AddSeconds(1) }); + await Task.Delay(2000); + await backgroundService.StartAsync(CancellationToken.None); + await Task.Delay(1000); + await backgroundService.StopAsync(CancellationToken.None); + db.KeyDelete("TEST_EVENT_15"); + + consumerInstance.Verify(x => x.ConsumeAsync(new TestEvent15("World")), Times.Never()); + } + + + } diff --git a/Unitee.EventDriven.RedisStream/RedisStreamMessagesProcessor.cs b/Unitee.EventDriven.RedisStream/RedisStreamMessagesProcessor.cs index 54c5a55..3c31991 100644 --- a/Unitee.EventDriven.RedisStream/RedisStreamMessagesProcessor.cs +++ b/Unitee.EventDriven.RedisStream/RedisStreamMessagesProcessor.cs @@ -1,4 +1,5 @@ using System.Text.Json; +using CSharpFunctionalExtensions; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using StackExchange.Redis; @@ -11,7 +12,7 @@ namespace Unitee.EventDriven.RedisStream; -public record ParsedStreamEntry(string? Id, object? Body, string? ReplyTo); +public record ParsedStreamEntry(string? Id, object? Body, string? ReplyTo, Maybe TTL, Maybe Locale); public class RedisStreamMessagesProcessor { @@ -109,6 +110,14 @@ private void ProcessStreamEntries(IEnumerable entries) { var processed = ParseStreamEntry(entry); + // TTL + var hasExpired = processed.TTL.Map(ttl => ttl < DateTimeOffset.UtcNow).GetValueOrDefault(false); + if (hasExpired) + { + _logger.LogWarning("Message {Id} has expired", processed.Id); + return; + } + if (processed.Id is not null && processed.Body is not null) { var consumers = _services.GetServices(); @@ -195,17 +204,30 @@ private async Task Read(string subject) } } + private Maybe AsMaybe(RedisValue v) + { + if (v.HasValue) + { + return Maybe.From((T)Convert.ChangeType(v, typeof(T))); + } + + return Maybe.None; + } + private ParsedStreamEntry ParseStreamEntry(StreamEntry entry) { var body = entry["Body"]; var replyTo = entry["ReplyTo"]; + var ttl = AsMaybe(entry["TTL"]).Map(x => DateTimeOffset.FromUnixTimeMilliseconds(x)); + var locale = AsMaybe(entry["Locale"]); + if (body.IsNull) { - return new ParsedStreamEntry(entry.Id, null, replyTo); + return new ParsedStreamEntry(entry.Id, null, replyTo, ttl, locale); } - return new ParsedStreamEntry(entry.Id, JsonSerializer.Deserialize(body!, _config.JsonSerializerOptions), replyTo); + return new ParsedStreamEntry(entry.Id, JsonSerializer.Deserialize(body!, _config.JsonSerializerOptions), replyTo, ttl, locale); } private async Task TryConsume(IRedisStreamConsumer consumer, TMessage message) @@ -247,7 +269,6 @@ public async Task ExecuteCrons() var cron = GetKeys("Cron:Schedule:*"); - foreach (var key in cron) { var values = db.HashGetAll(key); @@ -319,7 +340,7 @@ public async Task ReadAndPublishScheduledMessagesAsync() var distance = TimeSpan.FromMilliseconds(now - topMessage.Score); _logger.LogInformation("Processing delayed message with delayed time {Distance}", distance); - var message = JsonSerializer.Deserialize>(topMessage!.Element!, _config.JsonSerializerOptions); + var message = JsonSerializer.Deserialize>(topMessage!.Element!); if (message?.Subject is not null) { @@ -339,7 +360,8 @@ where attributes.Cast().FirstOrDefault()?.Subject == message.S throw new CannotHandleScheduledMessageException("A scheduled message was found but a class with the subject was not found in the assembly. So the message has been ignored."); } - JsonElement? concreteBodyType = message.Body as JsonElement?; + JsonElement? concreteBodyType = JsonSerializer.Deserialize(message.Body, _config.JsonSerializerOptions); + if (concreteBodyType is null) { throw new CannotHandleScheduledMessageException("Body cannot be null. As a workaround, you can use an empty object instead."); diff --git a/Unitee.EventDriven.RedisStream/RedisStreamPublisher.cs b/Unitee.EventDriven.RedisStream/RedisStreamPublisher.cs index 2608c20..ab5bd43 100644 --- a/Unitee.EventDriven.RedisStream/RedisStreamPublisher.cs +++ b/Unitee.EventDriven.RedisStream/RedisStreamPublisher.cs @@ -33,6 +33,68 @@ public async Task CancelAsync(RedisValue member, string? topic = null) await db.SortedSetRemoveAsync("SCHEDULED_MESSAGES", member); } + + private async Task InternalPublishAsync(TMessage message, string subject, MessageOptions options) + { + var payload = new List(); + + payload.Add(new("Body", JsonSerializer.Serialize(message, _config.JsonSerializerOptions))); + + if (options.SessionId is not null) + { + payload.Add(new("ReplyTo", $"{subject}_{options.SessionId}")); + } + + if (options.TimeToLive is not null) + { + payload.Add(new("TTL", options.TimeToLive.Value.ToUnixTimeMilliseconds().ToString())); + } + + if (options.Locale is not null) + { + payload.Add(new("Locale", options.Locale)); + } + + var db = _redis.GetDatabase(); + var redisChannel = new RedisChannel(subject, RedisChannel.PatternMode.Literal); + + var res = await db.StreamAddAsync(subject, payload.ToArray(), maxLength: 100); + + await db.PublishAsync(redisChannel, "1"); + + return res; + } + + /// + /// + /// Scheduling consists of put the message in a sorted set with the scheduled time as score + /// The value part consists of: + /// - a Guid to identify the message and then, cancel it + /// - the message body (serialized with the configured serializer) + /// - the subject + /// + /// The value part is serialized as Json with the default serializer + /// + /// + /// + /// + /// To remove a element from a set in redis, we need its value, here, the json payload. + /// We return it as an opaque RedisValue + /// + /// + private async Task InternalSchedule(TMessage body, string subject, MessageOptions options) + { + var serializedBody = JsonSerializer.Serialize(body, _config.JsonSerializerOptions); + var value = new RedisStreamScheduledMessageType(Guid.NewGuid(), serializedBody, subject); + var json = JsonSerializer.Serialize(value); + + var scheduledTime = options.ScheduledEnqueueTime!.Value!.ToUnixTimeMilliseconds(); + var db = _redis.GetDatabase(); + await db.SortedSetAddAsync("SCHEDULED_MESSAGES", json, scheduledTime); + + return json; + } + public async Task PublishAsync(TMessage message) { var subject = MessageHelper.GetSubject(); @@ -42,61 +104,43 @@ public async Task PublishAsync(TMessage message) throw new ApplicationException("Cannot publish message without subject"); } - return await PublishAsync(message, subject); + return await InternalPublishAsync(message, subject, new MessageOptions()); } public async Task PublishAsync(TMessage message, string subject) { - var db = _redis.GetDatabase(); - var res = await db.StreamAddAsync(subject, "Body", JsonSerializer.Serialize(message, _config.JsonSerializerOptions), maxLength: 100); - var redisChannel = new RedisChannel(subject, RedisChannel.PatternMode.Literal); - await db.PublishAsync(redisChannel, ""); - return res; + return await InternalPublishAsync(message, subject, new MessageOptions()); } public async Task PublishAsync(TMessage message, MessageOptions options) { - var db = _redis.GetDatabase(); var subject = MessageHelper.GetSubject(); + if (subject is null) { throw new ApplicationException("Cannot publish message without subject"); } - if (options.SessionId is not null) - { - var res = await db.StreamAddAsync(subject, new NameValueEntry[] { - new("Body", JsonSerializer.Serialize(message, _config.JsonSerializerOptions)), - new("ReplyTo", $"{subject}_{options.SessionId}") }, maxLength: 100); - - var redisChannel = new RedisChannel(subject, RedisChannel.PatternMode.Literal); - await db.PublishAsync(redisChannel, "1"); - return res; - } - - if (options.ScheduledEnqueueTime is null) + if (options.ScheduledEnqueueTime is not null) { - return await PublishAsync(message); + return await InternalSchedule(message, subject, options); } - var member = new RedisStreamScheduledMessageType(Guid.NewGuid(), message, subject); - - var json = JsonSerializer.Serialize(member, _config.JsonSerializerOptions); - var scheduledTime = options.ScheduledEnqueueTime.Value.ToUnixTimeMilliseconds(); - - await db.SortedSetAddAsync("SCHEDULED_MESSAGES", json, scheduledTime); - - return json; + return await InternalPublishAsync(message, subject, options); } - - public async Task RequestResponseAsync(T message, MessageOptions? options = null, ReplyOptions? replyOptions = null) + public async Task RequestResponseAsync(T body, MessageOptions? options = null, ReplyOptions? replyOptions = null) { var sessionId = options?.SessionId ?? Guid.NewGuid().ToString(); var subject = MessageHelper.GetSubject(); var uniqueName = $"{subject}_{sessionId}"; var db = _redis.GetDatabase(); + if (subject is null) + { + throw new ApplicationException("Cannot publish message without subject"); + } + var promise = new TaskCompletionSource(); var redisChannel = new RedisChannel(uniqueName, RedisChannel.PatternMode.Literal); @@ -107,10 +151,7 @@ public async Task RequestResponseAsync(T message, MessageOptions? optio promise.TrySetResult(response!); }); - await PublishAsync(message, new MessageOptions - { - SessionId = sessionId - }); + await InternalPublishAsync(body, subject, (options ?? new MessageOptions()) with { SessionId = sessionId }); var completed = await Task.WhenAny(promise.Task, Task.Delay(replyOptions?.Timeout ?? TimeSpan.FromSeconds(5))); diff --git a/Unitee.EventDriven.RedisStream/RedisStreamScheduledMessageType.cs b/Unitee.EventDriven.RedisStream/RedisStreamScheduledMessageType.cs index 48006ab..4a3681c 100644 --- a/Unitee.EventDriven.RedisStream/RedisStreamScheduledMessageType.cs +++ b/Unitee.EventDriven.RedisStream/RedisStreamScheduledMessageType.cs @@ -1,3 +1,3 @@ namespace Unitee.EventDriven.RedisStream.Models; -public record RedisStreamScheduledMessageType(Guid Id, T Body, string Subject); \ No newline at end of file +public record RedisStreamScheduledMessageType(Guid Id, string Body, string Subject); diff --git a/Unitee.EventDriven.RedisStream/Unitee.EventDriven.RedisStream.csproj b/Unitee.EventDriven.RedisStream/Unitee.EventDriven.RedisStream.csproj index 650d303..98ef9e9 100644 --- a/Unitee.EventDriven.RedisStream/Unitee.EventDriven.RedisStream.csproj +++ b/Unitee.EventDriven.RedisStream/Unitee.EventDriven.RedisStream.csproj @@ -6,6 +6,7 @@ + From 075579420824a5388d3064f256178a82eb545481 Mon Sep 17 00:00:00 2001 From: Titouan CREACH Date: Thu, 6 Jul 2023 17:35:03 +0200 Subject: [PATCH 3/7] chore: ttl -> expireAt --- .../Model/MessageOptions.cs | 2 +- .../IntegrationTests.cs | 8 ++++---- .../RedisStreamMessagesProcessor.cs | 12 ++++++------ .../RedisStreamPublisher.cs | 4 ++-- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/Unitee.EventDriven.Abstraction/Model/MessageOptions.cs b/Unitee.EventDriven.Abstraction/Model/MessageOptions.cs index d026530..57ba68a 100644 --- a/Unitee.EventDriven.Abstraction/Model/MessageOptions.cs +++ b/Unitee.EventDriven.Abstraction/Model/MessageOptions.cs @@ -6,6 +6,6 @@ public record MessageOptions() public DateTimeOffset? ScheduledEnqueueTime { get; init; } public string? MessageId { get; init; } public string? SessionId { get; set; } - public DateTimeOffset? TimeToLive { get; init; } + public DateTimeOffset? ExpireAt { get; init; } public string? Locale { get; init; } } diff --git a/Unitee.EventDriven.RedisStream.Tests/IntegrationTests.cs b/Unitee.EventDriven.RedisStream.Tests/IntegrationTests.cs index 8548db3..882691b 100644 --- a/Unitee.EventDriven.RedisStream.Tests/IntegrationTests.cs +++ b/Unitee.EventDriven.RedisStream.Tests/IntegrationTests.cs @@ -399,7 +399,7 @@ public async Task Json_ShouldBeConsumedEvenWithCustomJsonOptions() [Fact] - public async Task TTL_MessageWithATtlShouldBeProcessed() + public async Task TTL_MessageWithExpiredAtShouldBeProcessed() { var db = _redis.GetDatabase(); @@ -419,7 +419,7 @@ public async Task TTL_MessageWithATtlShouldBeProcessed() var publisher = provider.GetRequiredService(); var backgroundService = provider.GetService(); - await publisher.PublishAsync(new TestEvent14("World"), new MessageOptions() { TimeToLive = DateTimeOffset.UtcNow.AddSeconds(5) }); + await publisher.PublishAsync(new TestEvent14("World"), new MessageOptions() { ExpireAt = DateTimeOffset.UtcNow.AddSeconds(5) }); await backgroundService.StartAsync(CancellationToken.None); await Task.Delay(1000); await backgroundService.StopAsync(CancellationToken.None); @@ -429,7 +429,7 @@ public async Task TTL_MessageWithATtlShouldBeProcessed() } [Fact] - public async Task TTL_MessageWithAExpiredTTLShouldNotBeProcessed() + public async Task TTL_MessageWithAExpiredAtExpiredShouldNotBeProcessed() { var db = _redis.GetDatabase(); @@ -448,7 +448,7 @@ public async Task TTL_MessageWithAExpiredTTLShouldNotBeProcessed() var publisher = provider.GetRequiredService(); var backgroundService = provider.GetService(); - await publisher.PublishAsync(new TestEvent15("World"), new MessageOptions() { TimeToLive = DateTimeOffset.UtcNow.AddSeconds(1) }); + await publisher.PublishAsync(new TestEvent15("World"), new MessageOptions() { ExpireAt = DateTimeOffset.UtcNow.AddSeconds(1) }); await Task.Delay(2000); await backgroundService.StartAsync(CancellationToken.None); await Task.Delay(1000); diff --git a/Unitee.EventDriven.RedisStream/RedisStreamMessagesProcessor.cs b/Unitee.EventDriven.RedisStream/RedisStreamMessagesProcessor.cs index 3c31991..956c88b 100644 --- a/Unitee.EventDriven.RedisStream/RedisStreamMessagesProcessor.cs +++ b/Unitee.EventDriven.RedisStream/RedisStreamMessagesProcessor.cs @@ -12,7 +12,7 @@ namespace Unitee.EventDriven.RedisStream; -public record ParsedStreamEntry(string? Id, object? Body, string? ReplyTo, Maybe TTL, Maybe Locale); +public record ParsedStreamEntry(string? Id, object? Body, string? ReplyTo, Maybe ExpireAt, Maybe Locale); public class RedisStreamMessagesProcessor { @@ -110,8 +110,8 @@ private void ProcessStreamEntries(IEnumerable entries) { var processed = ParseStreamEntry(entry); - // TTL - var hasExpired = processed.TTL.Map(ttl => ttl < DateTimeOffset.UtcNow).GetValueOrDefault(false); + // Expiration + var hasExpired = processed.ExpireAt.Map(dt => dt < DateTimeOffset.UtcNow).GetValueOrDefault(false); if (hasExpired) { _logger.LogWarning("Message {Id} has expired", processed.Id); @@ -219,15 +219,15 @@ private ParsedStreamEntry ParseStreamEntry(StreamEntry entry) var body = entry["Body"]; var replyTo = entry["ReplyTo"]; - var ttl = AsMaybe(entry["TTL"]).Map(x => DateTimeOffset.FromUnixTimeMilliseconds(x)); + var expireAt = AsMaybe(entry["ExpireAt"]).Map(x => DateTimeOffset.FromUnixTimeMilliseconds(x)); var locale = AsMaybe(entry["Locale"]); if (body.IsNull) { - return new ParsedStreamEntry(entry.Id, null, replyTo, ttl, locale); + return new ParsedStreamEntry(entry.Id, null, replyTo, expireAt, locale); } - return new ParsedStreamEntry(entry.Id, JsonSerializer.Deserialize(body!, _config.JsonSerializerOptions), replyTo, ttl, locale); + return new ParsedStreamEntry(entry.Id, JsonSerializer.Deserialize(body!, _config.JsonSerializerOptions), replyTo, expireAt, locale); } private async Task TryConsume(IRedisStreamConsumer consumer, TMessage message) diff --git a/Unitee.EventDriven.RedisStream/RedisStreamPublisher.cs b/Unitee.EventDriven.RedisStream/RedisStreamPublisher.cs index ab5bd43..803b4cc 100644 --- a/Unitee.EventDriven.RedisStream/RedisStreamPublisher.cs +++ b/Unitee.EventDriven.RedisStream/RedisStreamPublisher.cs @@ -45,9 +45,9 @@ private async Task InternalPublishAsync(TMessage message, payload.Add(new("ReplyTo", $"{subject}_{options.SessionId}")); } - if (options.TimeToLive is not null) + if (options.ExpireAt is not null) { - payload.Add(new("TTL", options.TimeToLive.Value.ToUnixTimeMilliseconds().ToString())); + payload.Add(new("ExpireAt", options.ExpireAt.Value.ToUnixTimeMilliseconds().ToString())); } if (options.Locale is not null) From 05c4027d36591a20f2ace09dc70cf621bd5d0068 Mon Sep 17 00:00:00 2001 From: Titouan CREACH Date: Fri, 7 Jul 2023 10:05:44 +0200 Subject: [PATCH 4/7] feat(locale): start injecting custom data withing ConsumeWithContext. --- .../Abstractions/IConsumer.cs | 5 ++-- .../Abstractions/IMessageContext.cs | 2 +- .../Abstractions/IMessageHandler.cs | 11 +++++--- .../Unitee.EventDriven.Abstraction.csproj | 1 + .../RedisStreamConsumer.cs | 26 +++++++++++++++---- .../RedisStreamMessageContextFactory.cs | 20 +++++++++++--- .../RedisStreamMessagesProcessor.cs | 8 +++--- 7 files changed, 55 insertions(+), 18 deletions(-) diff --git a/Unitee.EventDriven.Abstraction/Abstractions/IConsumer.cs b/Unitee.EventDriven.Abstraction/Abstractions/IConsumer.cs index a2f2f20..9ae5877 100644 --- a/Unitee.EventDriven.Abstraction/Abstractions/IConsumer.cs +++ b/Unitee.EventDriven.Abstraction/Abstractions/IConsumer.cs @@ -1,8 +1,8 @@ namespace Unitee.EventDriven.Abstraction; /// -/// Juste pour pouvoir récupérer la liste de tous les consumers depuis l'injection de dépendances. -/// Ne doit jamais être implémenté directement sans passer par IConsummer. +/// Used to mark a class as a consumer. +/// Should never be implemented directly without passing through IConsummer / IConsumerWithContext or IConsumerWithMetadata /// public interface IConsumer { } @@ -15,3 +15,4 @@ public interface IConsumer : IConsumer { public Task ConsumeAsync(T message); } + diff --git a/Unitee.EventDriven.Abstraction/Abstractions/IMessageContext.cs b/Unitee.EventDriven.Abstraction/Abstractions/IMessageContext.cs index a7f3309..cd624b5 100644 --- a/Unitee.EventDriven.Abstraction/Abstractions/IMessageContext.cs +++ b/Unitee.EventDriven.Abstraction/Abstractions/IMessageContext.cs @@ -3,4 +3,4 @@ namespace Unitee.EventDriven.Abstraction; public interface IMessageContext { public Task ReplyAsync(TMessage message); -} \ No newline at end of file +} diff --git a/Unitee.EventDriven.Abstraction/Abstractions/IMessageHandler.cs b/Unitee.EventDriven.Abstraction/Abstractions/IMessageHandler.cs index 6f2fdfa..cc2e230 100644 --- a/Unitee.EventDriven.Abstraction/Abstractions/IMessageHandler.cs +++ b/Unitee.EventDriven.Abstraction/Abstractions/IMessageHandler.cs @@ -1,12 +1,17 @@ namespace Unitee.EventDriven.Abstraction; + +/// +/// +/// Used for Azure Service Bus messages only +/// public interface IMessageHandler { /// - /// Appelle le bon consumer enregistré pour le message de type T + /// Route to the right consumer. /// /// - /// True si un consumer a été trouvé et appelé, false sinon. + /// True if the message was handled, false otherwise. /// public Task HandleAsync(T originalMessage); -} \ No newline at end of file +} diff --git a/Unitee.EventDriven.Abstraction/Unitee.EventDriven.Abstraction.csproj b/Unitee.EventDriven.Abstraction/Unitee.EventDriven.Abstraction.csproj index 3c8e4ff..0aa111a 100644 --- a/Unitee.EventDriven.Abstraction/Unitee.EventDriven.Abstraction.csproj +++ b/Unitee.EventDriven.Abstraction/Unitee.EventDriven.Abstraction.csproj @@ -26,6 +26,7 @@ + diff --git a/Unitee.EventDriven.RedisStream/RedisStreamConsumer.cs b/Unitee.EventDriven.RedisStream/RedisStreamConsumer.cs index 0f0f0a9..41aad46 100644 --- a/Unitee.EventDriven.RedisStream/RedisStreamConsumer.cs +++ b/Unitee.EventDriven.RedisStream/RedisStreamConsumer.cs @@ -1,29 +1,45 @@ using System.Text.Json; +using CSharpFunctionalExtensions; +using Microsoft.Extensions.Logging; using StackExchange.Redis; using Unitee.EventDriven.Abstraction; namespace Unitee.EventDriven.RedisStream; +/// Precised interfaces for RedisStream public interface IRedisStreamConsumer : IConsumer { } - public interface IRedisStreamMessageContext : IMessageContext { }; - public interface IRedisStreamConsumerWithContext : IConsumerWithContext { } + +/// +/// This is created by the context factory when calling a IRedisStreamConsumerWithContext +/// public class RedisStreamMessageContext : IRedisStreamMessageContext { - private readonly string _replyTo; + private readonly Maybe _replyTo; private readonly IConnectionMultiplexer _redis; + private readonly ILogger _logger; - public RedisStreamMessageContext(IRedisStreamPublisher _, IConnectionMultiplexer redis, string replyTo) + public Maybe Locale { get; } + + public RedisStreamMessageContext(ILoggerFactory loggerFactory, IRedisStreamPublisher _, IConnectionMultiplexer redis, Maybe replyTo, Maybe locale) { _replyTo = replyTo; _redis = redis; + _logger = _logger = loggerFactory.CreateLogger(); + Locale = locale; } public async Task ReplyAsync(TMessage message) { - var redisChannel = new RedisChannel(_replyTo, RedisChannel.PatternMode.Literal); + if (_replyTo.HasNoValue) + { + _logger.LogError("You cannot because reply to this message because the value: replyTo is not defined. Please be sure to publish the message with RequestReply"); + return false; + } + + var redisChannel = new RedisChannel(_replyTo.GetValueOrThrow(), RedisChannel.PatternMode.Literal); await _redis.GetSubscriber().PublishAsync(redisChannel, JsonSerializer.Serialize(message)); return true; } diff --git a/Unitee.EventDriven.RedisStream/RedisStreamMessageContextFactory.cs b/Unitee.EventDriven.RedisStream/RedisStreamMessageContextFactory.cs index c33e18e..1a70605 100644 --- a/Unitee.EventDriven.RedisStream/RedisStreamMessageContextFactory.cs +++ b/Unitee.EventDriven.RedisStream/RedisStreamMessageContextFactory.cs @@ -1,20 +1,32 @@ +using CSharpFunctionalExtensions; +using Microsoft.Extensions.Logging; using StackExchange.Redis; namespace Unitee.EventDriven.RedisStream; +/// Injected automatically by the DI container public class RedisStreamMessageContextFactory { private readonly IRedisStreamPublisher _publisher; private readonly IConnectionMultiplexer _redis; + private readonly ILoggerFactory _loggerFactory; - public RedisStreamMessageContextFactory(IRedisStreamPublisher publisher, IConnectionMultiplexer redis) + public RedisStreamMessageContextFactory(IRedisStreamPublisher publisher, IConnectionMultiplexer redis, ILoggerFactory loggerFactory) { _publisher = publisher; _redis = redis; + _loggerFactory = loggerFactory; } - public IRedisStreamMessageContext Create(string replyTo) + /// TODO: Wrap reply to and locale in a record or tuple or something since it will probably grow + public IRedisStreamMessageContext Create(Maybe replyTo, Maybe locale) { - return new RedisStreamMessageContext(_publisher, _redis, replyTo); + return new RedisStreamMessageContext( + _loggerFactory, + _publisher, + _redis, + replyTo, + locale + ); } -} \ No newline at end of file +} diff --git a/Unitee.EventDriven.RedisStream/RedisStreamMessagesProcessor.cs b/Unitee.EventDriven.RedisStream/RedisStreamMessagesProcessor.cs index 956c88b..f1029b0 100644 --- a/Unitee.EventDriven.RedisStream/RedisStreamMessagesProcessor.cs +++ b/Unitee.EventDriven.RedisStream/RedisStreamMessagesProcessor.cs @@ -122,6 +122,7 @@ private void ProcessStreamEntries(IEnumerable entries) { var consumers = _services.GetServices(); + // Classics consumers var matchedConsumers = consumers.Where(c => MessageHelper.GetSubject(c.GetType()?.GetInterface("IRedisStreamConsumer`1")?.GenericTypeArguments?[0]) == MessageHelper.GetSubject()) .ToList(); @@ -153,6 +154,7 @@ private void ProcessStreamEntries(IEnumerable entries) taskList.Add(task); } + // Consumers with context var matchedConsumersWithContext = consumers .Where(c => MessageHelper.GetSubject( c.GetType()?.GetInterface("IRedisStreamConsumerWithContext`1")?.GenericTypeArguments?[0]) == MessageHelper.GetSubject()) @@ -164,7 +166,7 @@ private void ProcessStreamEntries(IEnumerable entries) { try { - await TryConsumeWithContext((dynamic)consumer, (TMessage)processed.Body, processed.ReplyTo); + await TryConsumeWithContext((dynamic)consumer, (TMessage)processed.Body, processed.ReplyTo, processed.Locale); await db.StreamAcknowledgeAsync(subject, _serviceName, processed.Id); } catch (Exception e) @@ -237,10 +239,10 @@ private async Task TryConsume(IRedisStreamConsumer con return true; } - private async Task TryConsumeWithContext(IRedisStreamConsumerWithContext consumer, TMessage message, string replyTo) + private async Task TryConsumeWithContext(IRedisStreamConsumerWithContext consumer, TMessage message, string replyTo, Maybe locale = default) { _logger.LogInformation("Consuming message {Payload} with subject: {Type}", message, MessageHelper.GetSubject()); - var ctx = _msgContextFactory.Create(replyTo); + var ctx = _msgContextFactory.Create(replyTo, locale); await consumer.ConsumeAsync(message, ctx); return true; } From 77316271a4ac7d22c2c52f7dd0ae06fd7f858b25 Mon Sep 17 00:00:00 2001 From: Titouan CREACH Date: Fri, 7 Jul 2023 10:24:35 +0200 Subject: [PATCH 5/7] feat: add tests --- .../EventsFixtures.cs | 4 ++ .../IntegrationTests.cs | 49 +++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/Unitee.EventDriven.RedisStream.Tests/EventsFixtures.cs b/Unitee.EventDriven.RedisStream.Tests/EventsFixtures.cs index 9f735c8..77ecd34 100644 --- a/Unitee.EventDriven.RedisStream.Tests/EventsFixtures.cs +++ b/Unitee.EventDriven.RedisStream.Tests/EventsFixtures.cs @@ -32,6 +32,10 @@ public record TestEvent13(string ATestString); public record TestEvent14(string ATestString); [Subject("TEST_EVENT_15")] public record TestEvent15(string ATestString); +[Subject("TEST_EVENT_16")] +public record TestEvent16(string ATestString); +[Subject("TEST_EVENT_17")] +public record TestEvent17(string ATestString); [Subject("DEAD_LETTER")] diff --git a/Unitee.EventDriven.RedisStream.Tests/IntegrationTests.cs b/Unitee.EventDriven.RedisStream.Tests/IntegrationTests.cs index 882691b..c9ec52c 100644 --- a/Unitee.EventDriven.RedisStream.Tests/IntegrationTests.cs +++ b/Unitee.EventDriven.RedisStream.Tests/IntegrationTests.cs @@ -458,6 +458,55 @@ public async Task TTL_MessageWithAExpiredAtExpiredShouldNotBeProcessed() consumerInstance.Verify(x => x.ConsumeAsync(new TestEvent15("World")), Times.Never()); } + [Fact] + public async Task Local_IfALocaleHasBeenSpecifiedItShouldBeAvailable() + { + var db = _redis.GetDatabase(); + + var services = GetServices(Guid.NewGuid().ToString()); + var consumerInstance = new Mock>(); + services.AddTransient(x => consumerInstance.Object); + + var provider = services.BuildServiceProvider(); + var publisher = provider.GetRequiredService(); + var backgroundService = provider.GetService(); + + await backgroundService.StartAsync(CancellationToken.None); + await Task.Delay(100); + await publisher.PublishAsync(new TestEvent16("World"), new MessageOptions() { Locale = "en-US" }); + await Task.Delay(500); + await backgroundService.StopAsync(CancellationToken.None); + + db.KeyDelete("TEST_EVENT_16"); + + consumerInstance.Verify(x => x.ConsumeAsync(new TestEvent16("World"), It.Is(x => x.Locale == "en-US")), Times.Once()); + } + + [Fact] + public async Task Local_AMessageWithoutReplyCantBeReplyed() + { + var db = _redis.GetDatabase(); + + var services = GetServices(Guid.NewGuid().ToString()); + var consumerInstance = new Mock>(); + services.AddTransient(x => consumerInstance.Object); + + var provider = services.BuildServiceProvider(); + var publisher = provider.GetRequiredService(); + var backgroundService = provider.GetService(); + + await backgroundService.StartAsync(CancellationToken.None); + await Task.Delay(100); + await publisher.PublishAsync(new TestEvent17("World"), new MessageOptions() { Locale = "en-US" }); + await Task.Delay(500); + await backgroundService.StopAsync(CancellationToken.None); + + db.KeyDelete("TEST_EVENT_17"); + + consumerInstance.Verify(x => x.ConsumeAsync(new TestEvent17("World"), It.Is(x => x.Locale == "en-US")), Times.Once()); + } + + } From 5d41ab2fb2df3ef9bacdeb4ed24ac9c565688557 Mon Sep 17 00:00:00 2001 From: Titouan CREACH Date: Fri, 7 Jul 2023 10:29:23 +0200 Subject: [PATCH 6/7] chore: removing test 17 --- .../IntegrationTests.cs | 28 ------------------- 1 file changed, 28 deletions(-) diff --git a/Unitee.EventDriven.RedisStream.Tests/IntegrationTests.cs b/Unitee.EventDriven.RedisStream.Tests/IntegrationTests.cs index c9ec52c..704cfc5 100644 --- a/Unitee.EventDriven.RedisStream.Tests/IntegrationTests.cs +++ b/Unitee.EventDriven.RedisStream.Tests/IntegrationTests.cs @@ -481,32 +481,4 @@ public async Task Local_IfALocaleHasBeenSpecifiedItShouldBeAvailable() consumerInstance.Verify(x => x.ConsumeAsync(new TestEvent16("World"), It.Is(x => x.Locale == "en-US")), Times.Once()); } - - [Fact] - public async Task Local_AMessageWithoutReplyCantBeReplyed() - { - var db = _redis.GetDatabase(); - - var services = GetServices(Guid.NewGuid().ToString()); - var consumerInstance = new Mock>(); - services.AddTransient(x => consumerInstance.Object); - - var provider = services.BuildServiceProvider(); - var publisher = provider.GetRequiredService(); - var backgroundService = provider.GetService(); - - await backgroundService.StartAsync(CancellationToken.None); - await Task.Delay(100); - await publisher.PublishAsync(new TestEvent17("World"), new MessageOptions() { Locale = "en-US" }); - await Task.Delay(500); - await backgroundService.StopAsync(CancellationToken.None); - - db.KeyDelete("TEST_EVENT_17"); - - consumerInstance.Verify(x => x.ConsumeAsync(new TestEvent17("World"), It.Is(x => x.Locale == "en-US")), Times.Once()); - } - - - - } From 08209b62b8fc2e1708a60f514f281d29163f145b Mon Sep 17 00:00:00 2001 From: Titouan CREACH Date: Fri, 7 Jul 2023 10:36:41 +0200 Subject: [PATCH 7/7] doc: update --- README.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 7be83d5..145230a 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ For now, we mainly focus on Redis as an event store because: - Scheduling messages - Treat pending messages at start - Recurring task (cron) + - Localized messages # How to use @@ -144,13 +145,15 @@ services.AddTransient(); All consumers should be added using `AddTransient`. So, they all have their own scope since they are executed concurrently. -If you want to your consumer to be able to reply, then, implement `IRedisStreamConsumerWithContext` instead. +If you want to your consumer to be able to reply or access metadata of the message, then, implement `IRedisStreamConsumerWithContext` instead. ```csharp -public class UserRegisteredConsumer : IRedisStreamConsumeWithContext +public class UserRegisteredConsumer : IRedisStreamConsumeWithContext // Use object or anything if you didn't plan to respond to the message. { public async Task ConsumeAsync(UserRegistered message, IRedisStreamMessageContext context) { + _logger.LogInformation(context.Locale); + await _email.Send(message.Email); await context.ReplyAsync(new MyResponse());