diff --git a/Brighter.sln b/Brighter.sln index d96bbe130d..75beff5339 100644 --- a/Brighter.sln +++ b/Brighter.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio Version 16 -VisualStudioVersion = 16.0.28922.388 +# Visual Studio Version 17 +VisualStudioVersion = 17.0.32112.339 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{235DE1F1-E71B-4817-8E27-3B34FF006E4C}" EndProject @@ -223,17 +223,17 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GreetingsWorker", "samples\ EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Paramore.Brighter.PostgreSql", "src\Paramore.Brighter.PostgreSql\Paramore.Brighter.PostgreSql.csproj", "{08E6D0F8-B6CE-454F-8761-77731D99F743}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.PostgreSql.EntityFrameworkCore", "src\Paramore.Brighter.PostgreSql.EntityFrameworkCore\Paramore.Brighter.PostgreSql.EntityFrameworkCore.csproj", "{AA85493A-4120-4DA0-BAA5-CBF34D238A64}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Paramore.Brighter.PostgreSql.EntityFrameworkCore", "src\Paramore.Brighter.PostgreSql.EntityFrameworkCore\Paramore.Brighter.PostgreSql.EntityFrameworkCore.csproj", "{AA85493A-4120-4DA0-BAA5-CBF34D238A64}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GreetingsWeb", "samples\WebAPI_EFCore\GreetingsWeb\GreetingsWeb.csproj", "{BC4C3DF7-52C0-41EA-98DD-70E402AD5116}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GreetingsWeb", "samples\WebAPI_EFCore\GreetingsWeb\GreetingsWeb.csproj", "{BC4C3DF7-52C0-41EA-98DD-70E402AD5116}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SalutationPorts", "samples\WebAPI_EFCore\SalutationPorts\SalutationPorts.csproj", "{5A6A95C0-F82E-49DD-B4C6-98D7A765ECFF}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SalutationPorts", "samples\WebAPI_EFCore\SalutationPorts\SalutationPorts.csproj", "{5A6A95C0-F82E-49DD-B4C6-98D7A765ECFF}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SalutationEntities", "samples\WebAPI_EFCore\SalutationEntities\SalutationEntities.csproj", "{F701369D-EDA3-4407-8655-6B81DD6EBCBA}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SalutationEntities", "samples\WebAPI_EFCore\SalutationEntities\SalutationEntities.csproj", "{F701369D-EDA3-4407-8655-6B81DD6EBCBA}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Salutations_MySqlMigrations", "samples\WebAPI_EFCore\Salutations_MySqlMigrations\Salutations_MySqlMigrations.csproj", "{82E64F30-8D74-4E01-A974-5A78EBAD916C}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Salutations_MySqlMigrations", "samples\WebAPI_EFCore\Salutations_MySqlMigrations\Salutations_MySqlMigrations.csproj", "{82E64F30-8D74-4E01-A974-5A78EBAD916C}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Salutations_SqliteMigrations", "samples\WebAPI_EFCore\Salutations_SqliteMigrations\Salutations_SqliteMigrations.csproj", "{05647D1B-87A3-4440-B468-82866B206E49}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Salutations_SqliteMigrations", "samples\WebAPI_EFCore\Salutations_SqliteMigrations\Salutations_SqliteMigrations.csproj", "{05647D1B-87A3-4440-B468-82866B206E49}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -1229,6 +1229,30 @@ Global {93589653-2B49-4818-BE98-FE6F16EC72EC}.Release|Mixed Platforms.Build.0 = Release|Any CPU {93589653-2B49-4818-BE98-FE6F16EC72EC}.Release|x86.ActiveCfg = Release|Any CPU {93589653-2B49-4818-BE98-FE6F16EC72EC}.Release|x86.Build.0 = Release|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Debug|Any CPU.Build.0 = Debug|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Debug|x86.ActiveCfg = Debug|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Debug|x86.Build.0 = Debug|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Release|Any CPU.ActiveCfg = Release|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Release|Any CPU.Build.0 = Release|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Release|Mixed Platforms.Build.0 = Release|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Release|x86.ActiveCfg = Release|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Release|x86.Build.0 = Release|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Debug|Any CPU.Build.0 = Debug|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Debug|x86.ActiveCfg = Debug|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Debug|x86.Build.0 = Debug|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Release|Any CPU.ActiveCfg = Release|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Release|Any CPU.Build.0 = Release|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Release|Mixed Platforms.Build.0 = Release|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Release|x86.ActiveCfg = Release|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Release|x86.Build.0 = Release|Any CPU {BC4C3DF7-52C0-41EA-98DD-70E402AD5116}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {BC4C3DF7-52C0-41EA-98DD-70E402AD5116}.Debug|Any CPU.Build.0 = Debug|Any CPU {BC4C3DF7-52C0-41EA-98DD-70E402AD5116}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU diff --git a/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs b/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs index 9b09195856..bd6731bc71 100644 --- a/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs +++ b/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs @@ -36,19 +36,31 @@ private void DoWork(object state) s_logger.LogInformation("Outbox Sweeper looking for unsent messages"); var scope = _serviceScopeFactory.CreateScope(); - IAmACommandProcessor commandProcessor = scope.ServiceProvider.GetService(); + try + { + IAmACommandProcessor commandProcessor = scope.ServiceProvider.GetService(); - var outBoxSweeper = new OutboxSweeper( - milliSecondsSinceSent: _options.MinimumMessageAge, - commandProcessor: commandProcessor, - _options.BatchSize, - _options.UseBulk); - - if(_options.UseBulk) - outBoxSweeper.SweepAsync(CancellationToken.None).RunSynchronously(); - else - outBoxSweeper.Sweep(); + var outBoxSweeper = new OutboxSweeper( + millisecondsSinceSent: _options.MinimumMessageAge, + commandProcessor: commandProcessor, + _options.BatchSize, + _options.UseBulk); + if (_options.UseBulk) + outBoxSweeper.SweepAsyncOutbox(); + else + outBoxSweeper.Sweep(); + } + catch (Exception e) + { + s_logger.LogError(e, "Error while sweeping the outbox."); + throw; + } + finally + { + scope.Dispose(); + } + s_logger.LogInformation("Outbox Sweeper sleeping"); } diff --git a/src/Paramore.Brighter/ExternalBusServices.cs b/src/Paramore.Brighter/ExternalBusServices.cs index 9e64c0483b..b6b9a059d8 100644 --- a/src/Paramore.Brighter/ExternalBusServices.cs +++ b/src/Paramore.Brighter/ExternalBusServices.cs @@ -26,13 +26,13 @@ internal class ExternalBusServices : IDisposable internal IAmAProducerRegistry ProducerRegistry { get; set; } - private static SemaphoreSlim _clearSemaphoreToken = new SemaphoreSlim(1, 1); + private static readonly SemaphoreSlim _clearSemaphoreToken = new SemaphoreSlim(1, 1); + private static readonly SemaphoreSlim _backgroundClearSemaphoreToken = new SemaphoreSlim(1, 1); private DateTime _lastOutStandingMessageCheckAt = DateTime.UtcNow; //Used to checking the limit on outstanding messages for an Outbox. We throw at that point. Writes to the static bool should be made thread-safe by locking the object private readonly object _checkOutStandingMessagesObject = new object(); - private readonly object _implicitClearMessagesObject = new object(); //Uses -1 to indicate no outbox and will thus force a throw on a failed publish private int _outStandingCount; @@ -198,7 +198,7 @@ internal void ClearOutbox(int amountToClear, int minimumAge, bool useAsync, bool private async Task BackgroundDispatchUsingSync(int amountToClear, int minimumAge) { - if (Monitor.TryEnter(_implicitClearMessagesObject)) + if (await _backgroundClearSemaphoreToken.WaitAsync(TimeSpan.Zero)) { await _clearSemaphoreToken.WaitAsync(CancellationToken.None); try @@ -209,26 +209,35 @@ private async Task BackgroundDispatchUsingSync(int amountToClear, int minimumAge Dispatch(messages); s_logger.LogInformation("Messages have been cleared"); } + catch (Exception e) + { + s_logger.LogError(e, "Error while dispatching from outbox"); + } finally { _clearSemaphoreToken.Release(); - Monitor.Exit(_implicitClearMessagesObject); + _backgroundClearSemaphoreToken.Release(); } CheckOutstandingMessages(); } + else + { + s_logger.LogInformation("Skipping dispatch of messages as another thread is running"); + } } private async Task BackgroundDispatchUsingAsync(int amountToClear, int minimumAge, bool useBulk) { - if (Monitor.TryEnter(_implicitClearMessagesObject)) + + if (await _backgroundClearSemaphoreToken.WaitAsync(TimeSpan.Zero)) { await _clearSemaphoreToken.WaitAsync(CancellationToken.None); try { var messages = await AsyncOutbox.OutstandingMessagesAsync(minimumAge, amountToClear); - + s_logger.LogInformation("Found {NumberOfMessages} to clear out of amount {AmountToClear}", messages.Count(), amountToClear); @@ -236,17 +245,25 @@ private async Task BackgroundDispatchUsingAsync(int amountToClear, int minimumAg await BulkDispatchAsync(messages, CancellationToken.None); else await DispatchAsync(messages, false, CancellationToken.None); - + s_logger.LogInformation("Messages have been cleared"); } + catch (Exception e) + { + s_logger.LogError(e, "Error while dispatching from outbox"); + } finally { _clearSemaphoreToken.Release(); - Monitor.Exit(_implicitClearMessagesObject); + _backgroundClearSemaphoreToken.Release(); } CheckOutstandingMessages(); } + else + { + s_logger.LogInformation("Skipping dispatch of messages as another thread is running"); + } } private void Dispatch(IEnumerable posts) @@ -323,7 +340,7 @@ private async Task BulkDispatchAsync(IEnumerable posts, CancellationTok foreach (var topicBatch in messagesByTopic) { - var producer = ProducerRegistry.LookupBy(topicBatch.Key); + var producer = ProducerRegistry.LookupByOrDefault(topicBatch.Key); if (producer is IAmABulkMessageProducerAsync bulkMessageProducer) { diff --git a/src/Paramore.Brighter/OutboxSweeper.cs b/src/Paramore.Brighter/OutboxSweeper.cs index c0b9a63280..aa69330bc8 100644 --- a/src/Paramore.Brighter/OutboxSweeper.cs +++ b/src/Paramore.Brighter/OutboxSweeper.cs @@ -1,13 +1,8 @@ -using System; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; - -namespace Paramore.Brighter +namespace Paramore.Brighter { public class OutboxSweeper { - private readonly int _milliSecondsSinceSent; + private readonly int _millisecondsSinceSent; private readonly IAmACommandProcessor _commandProcessor; private readonly int _batchSize; private readonly bool _useBulk; @@ -15,29 +10,33 @@ public class OutboxSweeper /// /// This sweeper clears an outbox of any outstanding messages within the time interval /// - /// How long can a message sit in the box before we attempt to resend + /// How long can a message sit in the box before we attempt to resend /// Who should post the messages /// The maximum number of messages to dispatch. /// Use the producers bulk dispatch functionality. - public OutboxSweeper(int milliSecondsSinceSent, IAmACommandProcessor commandProcessor, int batchSize = 100, + public OutboxSweeper(int millisecondsSinceSent, IAmACommandProcessor commandProcessor, int batchSize = 100, bool useBulk = false) { - _milliSecondsSinceSent = milliSecondsSinceSent; + _millisecondsSinceSent = millisecondsSinceSent; _commandProcessor = commandProcessor; _batchSize = batchSize; _useBulk = useBulk; } + /// + /// Dispatches the oldest un-dispatched messages from the outbox in a background thread. + /// public void Sweep() { - _commandProcessor.ClearOutbox(_batchSize, _milliSecondsSinceSent); + _commandProcessor.ClearOutbox(_batchSize, _millisecondsSinceSent); } - public Task SweepAsync(CancellationToken cancellationToken = default) + /// + /// Dispatches the oldest un-dispatched messages from the asynchronous outbox in a background thread. + /// + public void SweepAsyncOutbox() { - _commandProcessor.ClearAsyncOutbox(_batchSize, _milliSecondsSinceSent, _useBulk); - - return Task.CompletedTask; + _commandProcessor.ClearAsyncOutbox(_batchSize, _millisecondsSinceSent, _useBulk); } } } diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeMessageProducer.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeMessageProducer.cs index b659cdcfd9..b2c05648c6 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeMessageProducer.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeMessageProducer.cs @@ -30,15 +30,14 @@ THE SOFTWARE. */ namespace Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles { - public class FakeMessageProducer : IAmAMessageProducerSync, IAmAMessageProducerAsync, ISupportPublishConfirmation, IAmABulkMessageProducerAsync + public class FakeMessageProducer : IAmAMessageProducerSync, IAmAMessageProducerAsync, IAmABulkMessageProducerAsync { - public event Action OnMessagePublished; public int MaxOutStandingMessages { get; set; } = -1; public int MaxOutStandingCheckIntervalMilliSeconds { get; set; } = 0; public List SentMessages = new List(); public bool MessageWasSent { get; set; } - + public void Dispose() { } public Task SendAsync(Message message) @@ -52,7 +51,7 @@ public async IAsyncEnumerable SendAsync(IEnumerable messages, [ { foreach (var msg in messages) { - yield return new[] {msg.Id}; + yield return new[] { msg.Id }; } MessageWasSent = true; SentMessages.AddRange(messages); @@ -62,13 +61,11 @@ public void Send(Message message) { MessageWasSent = true; SentMessages.Add(message); - OnMessagePublished?.Invoke(true, message.Id); } - + public void SendWithDelay(Message message, int delayMilliseconds = 0) { Send(message); } - - } + } } diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeMessageProducerWithPublishConfirmation.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeMessageProducerWithPublishConfirmation.cs new file mode 100644 index 0000000000..ad20ce3ec7 --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeMessageProducerWithPublishConfirmation.cs @@ -0,0 +1,40 @@ +#region Licence +/* The MIT License (MIT) +Copyright © 2015 Toby Henderson + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System; + +namespace Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles +{ + public class FakeMessageProducerWithPublishConfirmation : FakeMessageProducer, ISupportPublishConfirmation + { + public event Action OnMessagePublished; + + public new void Send(Message message) + { + MessageWasSent = true; + SentMessages.Add(message); + OnMessagePublished?.Invoke(true, message.Id); + } + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeOutboxSync.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeOutboxSync.cs index bb47b3bd0b..7e6b2110b2 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeOutboxSync.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/TestDoubles/FakeOutboxSync.cs @@ -122,10 +122,13 @@ public Task MarkDispatchedAsync(Guid id, DateTime? dispatchedAt = null, Dictiona return tcs.Task; } - public Task MarkDispatchedAsync(IEnumerable ids, DateTime? dispatchedAt = null, Dictionary args = null, + public async Task MarkDispatchedAsync(IEnumerable ids, DateTime? dispatchedAt = null, Dictionary args = null, CancellationToken cancellationToken = default(CancellationToken)) { - throw new NotImplementedException(); + foreach (var id in ids) + { + await MarkDispatchedAsync(id, dispatchedAt, args, cancellationToken); + } } public Task> OutstandingMessagesAsync(double millSecondsSinceSent, int pageSize = 100, int pageNumber = 1, diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Bulk_Clearing_The_PostBox_On_The_Command_Processor _Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Bulk_Clearing_The_PostBox_On_The_Command_Processor _Async.cs index 0fb5ffb4be..6d9787d0c7 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Bulk_Clearing_The_PostBox_On_The_Command_Processor _Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Bulk_Clearing_The_PostBox_On_The_Command_Processor _Async.cs @@ -43,7 +43,7 @@ public class CommandProcessorPostBoxBulkClearAsyncTests : IDisposable private readonly Message _message; private readonly Message _message2; private readonly FakeOutboxSync _fakeOutboxSync; - private readonly FakeMessageProducer _fakeMessageProducer; + private readonly FakeMessageProducerWithPublishConfirmation _fakeMessageProducerWithPublishConfirmation; public CommandProcessorPostBoxBulkClearAsyncTests() { @@ -51,7 +51,7 @@ public CommandProcessorPostBoxBulkClearAsyncTests() var myCommand2 = new MyCommand { Value = "Hello World 2" }; _fakeOutboxSync = new FakeOutboxSync(); - _fakeMessageProducer = new FakeMessageProducer(); + _fakeMessageProducerWithPublishConfirmation = new FakeMessageProducerWithPublishConfirmation(); var topic = "MyCommand"; var topic2 = "MyCommand2"; @@ -82,7 +82,7 @@ public CommandProcessorPostBoxBulkClearAsyncTests() new PolicyRegistry { { CommandProcessor.RETRYPOLICYASYNC, retryPolicy }, { CommandProcessor.CIRCUITBREAKERASYNC, circuitBreakerPolicy } }, messageMapperRegistry, _fakeOutboxSync, - new ProducerRegistry(new Dictionary() { { topic, _fakeMessageProducer }, { topic2, _fakeMessageProducer } })); + new ProducerRegistry(new Dictionary() { { topic, _fakeMessageProducerWithPublishConfirmation }, { topic2, _fakeMessageProducerWithPublishConfirmation } })); } [Fact] @@ -96,15 +96,15 @@ public async Task When_Clearing_The_PostBox_On_The_Command_Processor_Async() await Task.Delay(1000); //_should_send_a_message_via_the_messaging_gateway - _fakeMessageProducer.MessageWasSent.Should().BeTrue(); + _fakeMessageProducerWithPublishConfirmation.MessageWasSent.Should().BeTrue(); - var sentMessage = _fakeMessageProducer.SentMessages[0]; + var sentMessage = _fakeMessageProducerWithPublishConfirmation.SentMessages[0]; sentMessage.Should().NotBeNull(); sentMessage.Id.Should().Be(_message.Id); sentMessage.Header.Topic.Should().Be(_message.Header.Topic); sentMessage.Body.Value.Should().Be(_message.Body.Value); - var sentMessage2 = _fakeMessageProducer.SentMessages[1]; + var sentMessage2 = _fakeMessageProducerWithPublishConfirmation.SentMessages[1]; sentMessage2.Should().NotBeNull(); sentMessage2.Id.Should().Be(_message2.Id); sentMessage2.Header.Topic.Should().Be(_message2.Header.Topic); diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Calling_A_Server_Via_The_Command_Processor.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Calling_A_Server_Via_The_Command_Processor.cs index c0e7c23f08..b16248d358 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Calling_A_Server_Via_The_Command_Processor.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Calling_A_Server_Via_The_Command_Processor.cs @@ -16,14 +16,14 @@ public class CommandProcessorCallTests : IDisposable private readonly CommandProcessor _commandProcessor; private readonly MyRequest _myRequest = new MyRequest(); private readonly Message _message; - private readonly FakeMessageProducer _fakeMessageProducer; + private readonly FakeMessageProducerWithPublishConfirmation _fakeMessageProducerWithPublishConfirmation; public CommandProcessorCallTests() { _myRequest.RequestValue = "Hello World"; - _fakeMessageProducer = new FakeMessageProducer(); + _fakeMessageProducerWithPublishConfirmation = new FakeMessageProducerWithPublishConfirmation(); const string topic = "MyRequest"; var header = new MessageHeader( @@ -76,7 +76,7 @@ public CommandProcessorCallTests() new PolicyRegistry { { CommandProcessor.RETRYPOLICY, retryPolicy }, { CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy } }, messageMapperRegistry, new InMemoryOutbox(), - new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducer},}), + new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducerWithPublishConfirmation},}), replySubs, responseChannelFactory: inMemoryChannelFactory); @@ -91,10 +91,10 @@ public void When_Calling_A_Server_Via_The_Command_Processor() _commandProcessor.Call(_myRequest, 500); //should send a message via the messaging gateway - _fakeMessageProducer.MessageWasSent.Should().BeTrue(); + _fakeMessageProducerWithPublishConfirmation.MessageWasSent.Should().BeTrue(); //should convert the command into a message - _fakeMessageProducer.SentMessages[0].Should().Be(_message); + _fakeMessageProducerWithPublishConfirmation.SentMessages[0].Should().Be(_message); //should forward response to a handler MyResponseHandler.ShouldReceive(new MyResponse(_myRequest.ReplyAddress) {Id = _myRequest.Id}); diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Calling_A_Server_Via_The_Command_Processor_With_No_In_Mapper.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Calling_A_Server_Via_The_Command_Processor_With_No_In_Mapper.cs index 783a4e6039..b592ac0ccd 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Calling_A_Server_Via_The_Command_Processor_With_No_In_Mapper.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Calling_A_Server_Via_The_Command_Processor_With_No_In_Mapper.cs @@ -54,7 +54,7 @@ public CommandProcessorNoInMapperTests() }, messageMapperRegistry, new InMemoryOutbox(), - new ProducerRegistry(new Dictionary() {{"MyRequest", new FakeMessageProducer()},}), + new ProducerRegistry(new Dictionary() {{"MyRequest", new FakeMessageProducerWithPublishConfirmation()},}), replySubscriptions, responseChannelFactory: new InMemoryChannelFactory()); diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Calling_A_Server_Via_The_Command_Processor_With_No_Out_Mapper.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Calling_A_Server_Via_The_Command_Processor_With_No_Out_Mapper.cs index 11a2870b01..7c7c9b6291 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Calling_A_Server_Via_The_Command_Processor_With_No_Out_Mapper.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Calling_A_Server_Via_The_Command_Processor_With_No_Out_Mapper.cs @@ -57,7 +57,7 @@ public CommandProcessorMissingOutMapperTests() }, messageMapperRegistry, new InMemoryOutbox(), - new ProducerRegistry(new Dictionary() {{"MyRequest", new FakeMessageProducer()},}), + new ProducerRegistry(new Dictionary() {{"MyRequest", new FakeMessageProducerWithPublishConfirmation()},}), replySubs, responseChannelFactory: new InMemoryChannelFactory()); diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Calling_A_Server_Via_The_Command_Processor_With_No_Timeout.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Calling_A_Server_Via_The_Command_Processor_With_No_Timeout.cs index a97d384021..2ce925cd7b 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Calling_A_Server_Via_The_Command_Processor_With_No_Timeout.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Calling_A_Server_Via_The_Command_Processor_With_No_Timeout.cs @@ -60,7 +60,7 @@ public CommandProcessorCallTestsNoTimeout() }, messageMapperRegistry, new InMemoryOutbox(), - new ProducerRegistry(new Dictionary() {{"MyRequest", new FakeMessageProducer()},}), + new ProducerRegistry(new Dictionary() {{"MyRequest", new FakeMessageProducerWithPublishConfirmation()},}), replySubs, responseChannelFactory: new InMemoryChannelFactory()); diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Clearing_The_PostBox_On_The_Command_Processor _Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Clearing_The_PostBox_On_The_Command_Processor _Async.cs index 6fde860cab..ca1ed6b83e 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Clearing_The_PostBox_On_The_Command_Processor _Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Clearing_The_PostBox_On_The_Command_Processor _Async.cs @@ -41,14 +41,14 @@ public class CommandProcessorPostBoxClearAsyncTests : IDisposable private readonly CommandProcessor _commandProcessor; private readonly Message _message; private readonly FakeOutboxSync _fakeOutboxSync; - private readonly FakeMessageProducer _fakeMessageProducer; + private readonly FakeMessageProducerWithPublishConfirmation _fakeMessageProducerWithPublishConfirmation; public CommandProcessorPostBoxClearAsyncTests() { var myCommand = new MyCommand{ Value = "Hello World"}; _fakeOutboxSync = new FakeOutboxSync(); - _fakeMessageProducer = new FakeMessageProducer(); + _fakeMessageProducerWithPublishConfirmation = new FakeMessageProducerWithPublishConfirmation(); const string topic = "MyCommand"; _message = new Message( @@ -72,7 +72,7 @@ public CommandProcessorPostBoxClearAsyncTests() new PolicyRegistry { { CommandProcessor.RETRYPOLICYASYNC, retryPolicy }, { CommandProcessor.CIRCUITBREAKERASYNC, circuitBreakerPolicy } }, messageMapperRegistry, _fakeOutboxSync, - new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducer},})); + new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducerWithPublishConfirmation},})); } [Fact] @@ -83,9 +83,9 @@ public async Task When_Clearing_The_PostBox_On_The_Command_Processor_Async() await _commandProcessor.ClearOutboxAsync(new []{_message.Id}); //_should_send_a_message_via_the_messaging_gateway - _fakeMessageProducer.MessageWasSent.Should().BeTrue(); + _fakeMessageProducerWithPublishConfirmation.MessageWasSent.Should().BeTrue(); - var sentMessage = _fakeMessageProducer.SentMessages.FirstOrDefault(); + var sentMessage = _fakeMessageProducerWithPublishConfirmation.SentMessages.FirstOrDefault(); sentMessage.Should().NotBeNull(); sentMessage.Id.Should().Be(_message.Id); sentMessage.Header.Topic.Should().Be(_message.Header.Topic); diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Clearing_The_PostBox_On_The_Command_Processor.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Clearing_The_PostBox_On_The_Command_Processor.cs index 99e5910cad..9084d606eb 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Clearing_The_PostBox_On_The_Command_Processor.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Clearing_The_PostBox_On_The_Command_Processor.cs @@ -40,14 +40,14 @@ public class CommandProcessorPostBoxClearTests : IDisposable private readonly CommandProcessor _commandProcessor; private readonly Message _message; private readonly FakeOutboxSync _fakeOutbox; - private readonly FakeMessageProducer _fakeMessageProducer; + private readonly FakeMessageProducerWithPublishConfirmation _fakeMessageProducerWithPublishConfirmation; public CommandProcessorPostBoxClearTests() { var myCommand = new MyCommand{ Value = "Hello World"}; _fakeOutbox = new FakeOutboxSync(); - _fakeMessageProducer = new FakeMessageProducer(); + _fakeMessageProducerWithPublishConfirmation = new FakeMessageProducerWithPublishConfirmation(); var topic = "MyCommand"; _message = new Message( @@ -71,7 +71,7 @@ public CommandProcessorPostBoxClearTests() new PolicyRegistry { { CommandProcessor.RETRYPOLICY, retryPolicy }, { CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy } }, messageMapperRegistry, _fakeOutbox, - new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducer},})); + new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducerWithPublishConfirmation},})); } [Fact] @@ -82,9 +82,9 @@ public void When_Clearing_The_PostBox_On_The_Command_Processor() _commandProcessor.ClearOutbox(_message.Id); //_should_send_a_message_via_the_messaging_gateway - _fakeMessageProducer.MessageWasSent.Should().BeTrue(); + _fakeMessageProducerWithPublishConfirmation.MessageWasSent.Should().BeTrue(); - var sentMessage = _fakeMessageProducer.SentMessages.FirstOrDefault(); + var sentMessage = _fakeMessageProducerWithPublishConfirmation.SentMessages.FirstOrDefault(); sentMessage.Should().NotBeNull(); sentMessage.Id.Should().Be(_message.Id); sentMessage.Header.Topic.Should().Be(_message.Header.Topic); diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Depositing_A_Message_In_The_Message_Store.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Depositing_A_Message_In_The_Message_Store.cs index 6d88d2add5..4f2d852bc7 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Depositing_A_Message_In_The_Message_Store.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Depositing_A_Message_In_The_Message_Store.cs @@ -20,14 +20,14 @@ public class CommandProcessorDepositPostTests : IDisposable private readonly MyCommand _myCommand = new MyCommand(); private readonly Message _message; private readonly FakeOutboxSync _fakeOutbox; - private readonly FakeMessageProducer _fakeMessageProducer; + private readonly FakeMessageProducerWithPublishConfirmation _fakeMessageProducerWithPublishConfirmation; public CommandProcessorDepositPostTests() { _myCommand.Value = "Hello World"; _fakeOutbox = new FakeOutboxSync(); - _fakeMessageProducer = new FakeMessageProducer(); + _fakeMessageProducerWithPublishConfirmation = new FakeMessageProducerWithPublishConfirmation(); const string topic = "MyCommand"; _message = new Message( @@ -51,7 +51,7 @@ public CommandProcessorDepositPostTests() new PolicyRegistry { { CommandProcessor.RETRYPOLICY, retryPolicy }, { CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy } }, messageMapperRegistry, _fakeOutbox, - new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducer},})); + new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducerWithPublishConfirmation},})); } @@ -64,7 +64,7 @@ public void When_depositing_a_message_in_the_outbox() //assert //message should not be posted - _fakeMessageProducer.MessageWasSent.Should().BeFalse(); + _fakeMessageProducerWithPublishConfirmation.MessageWasSent.Should().BeFalse(); //message should correspond to the command var depositedPost = _fakeOutbox.Get(postedMessageId); diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Depositing_A_Message_In_The_Message_StoreAsync.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Depositing_A_Message_In_The_Message_StoreAsync.cs index 9333a081b5..39cb09116c 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Depositing_A_Message_In_The_Message_StoreAsync.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Depositing_A_Message_In_The_Message_StoreAsync.cs @@ -20,14 +20,14 @@ public class CommandProcessorDepositPostTestsAsync: IDisposable private readonly MyCommand _myCommand = new MyCommand(); private readonly Message _message; private readonly FakeOutboxSync _fakeOutboxSync; - private readonly FakeMessageProducer _fakeMessageProducer; + private readonly FakeMessageProducerWithPublishConfirmation _fakeMessageProducerWithPublishConfirmation; public CommandProcessorDepositPostTestsAsync() { _myCommand.Value = "Hello World"; _fakeOutboxSync = new FakeOutboxSync(); - _fakeMessageProducer = new FakeMessageProducer(); + _fakeMessageProducerWithPublishConfirmation = new FakeMessageProducerWithPublishConfirmation(); var topic = "MyCommand"; _message = new Message( @@ -52,7 +52,7 @@ public CommandProcessorDepositPostTestsAsync() policyRegistry, messageMapperRegistry, _fakeOutboxSync, - new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducer},})); + new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducerWithPublishConfirmation},})); } @@ -64,7 +64,7 @@ public async Task When_depositing_a_message_in_the_outbox() //assert //message should not be posted - _fakeMessageProducer.MessageWasSent.Should().BeFalse(); + _fakeMessageProducerWithPublishConfirmation.MessageWasSent.Should().BeFalse(); //message should be in the store var depositedPost = _fakeOutboxSync diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Implicit_Clearing_The_PostBox_On_The_Command_Processor _Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Implicit_Clearing_The_PostBox_On_The_Command_Processor _Async.cs new file mode 100644 index 0000000000..59cab6c3b3 --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Implicit_Clearing_The_PostBox_On_The_Command_Processor _Async.cs @@ -0,0 +1,131 @@ +#region Licence +/* The MIT License (MIT) +Copyright © 2015 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using FluentAssertions; +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Polly; +using Polly.Registry; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.CommandProcessors +{ + [Collection("CommandProcessor")] + public class CommandProcessorPostBoxImplicitClearAsyncTests : IDisposable + { + private readonly CommandProcessor _commandProcessor; + private readonly Message _message; + private readonly Message _message2; + private readonly FakeOutboxSync _fakeOutboxSync; + private readonly FakeMessageProducer _fakeMessageProducer; + + public CommandProcessorPostBoxImplicitClearAsyncTests() + { + var myCommand = new MyCommand{ Value = "Hello World"}; + + _fakeOutboxSync = new FakeOutboxSync(); + _fakeMessageProducer = new FakeMessageProducer(); + + const string topic = "MyCommand"; + _message = new Message( + new MessageHeader(myCommand.Id, topic, MessageType.MT_COMMAND), + new MessageBody(JsonSerializer.Serialize(myCommand, JsonSerialisationOptions.Options)) + ); + + _message2 = new Message( + new MessageHeader(Guid.NewGuid(), topic, MessageType.MT_COMMAND), + new MessageBody(JsonSerializer.Serialize(myCommand, JsonSerialisationOptions.Options)) + ); + + var messageMapperRegistry = new MessageMapperRegistry(new SimpleMessageMapperFactory((_) => new MyCommandMessageMapper())); + messageMapperRegistry.Register(); + + var retryPolicy = Policy + .Handle() + .RetryAsync(); + + var circuitBreakerPolicy = Policy + .Handle() + .CircuitBreakerAsync(1, TimeSpan.FromMilliseconds(1)); + + _commandProcessor = new CommandProcessor( + new InMemoryRequestContextFactory(), + new PolicyRegistry { { CommandProcessor.RETRYPOLICYASYNC, retryPolicy }, { CommandProcessor.CIRCUITBREAKERASYNC, circuitBreakerPolicy } }, + messageMapperRegistry, + _fakeOutboxSync, + new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducer},})); + } + + [Fact] + public async Task When_Implicit_Clearing_The_PostBox_On_The_Command_Processor_Async() + { + await _fakeOutboxSync.AddAsync(_message); + await _fakeOutboxSync.AddAsync(_message2); + + _commandProcessor.ClearAsyncOutbox(1,1); + + for (var i = 1; i <= 10; i++) + { + if (_fakeMessageProducer.SentMessages.Count == 1) break; + await Task.Delay(i * 100); + } + + _commandProcessor.ClearAsyncOutbox(1, 1); + + //Try again and kick off another background thread + for (var i = 1; i <= 10; i++) + { + if (_fakeMessageProducer.SentMessages.Count == 2) + break; + await Task.Delay(i * 100); + _commandProcessor.ClearAsyncOutbox(1, 1); + } + + //_should_send_a_message_via_the_messaging_gateway + _fakeMessageProducer.MessageWasSent.Should().BeTrue(); + + var sentMessage = _fakeMessageProducer.SentMessages.FirstOrDefault(m => m.Id == _message.Id); + sentMessage.Should().NotBeNull(); + sentMessage.Id.Should().Be(_message.Id); + sentMessage.Header.Topic.Should().Be(_message.Header.Topic); + sentMessage.Body.Value.Should().Be(_message.Body.Value); + + var sentMessage2 = _fakeMessageProducer.SentMessages.FirstOrDefault(m => m.Id == _message2.Id); + sentMessage2.Should().NotBeNull(); + sentMessage2.Id.Should().Be(_message2.Id); + sentMessage2.Header.Topic.Should().Be(_message2.Header.Topic); + sentMessage2.Body.Value.Should().Be(_message2.Body.Value); + } + + public void Dispose() + { + CommandProcessor.ClearExtServiceBus(); + } + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Implicit_Clearing_The_PostBox_On_The_Command_Processor.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Implicit_Clearing_The_PostBox_On_The_Command_Processor.cs new file mode 100644 index 0000000000..4f9b03f69e --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Implicit_Clearing_The_PostBox_On_The_Command_Processor.cs @@ -0,0 +1,129 @@ +#region Licence +/* The MIT License (MIT) +Copyright © 2015 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the “Software”), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text.Json; +using System.Threading; +using FluentAssertions; +using Paramore.Brighter.Core.Tests.CommandProcessors.TestDoubles; +using Polly; +using Polly.Registry; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.CommandProcessors +{ + [Collection("CommandProcessor")] + public class CommandProcessorPostBoxImplicitClearTests : IDisposable + { + private readonly CommandProcessor _commandProcessor; + private readonly Message _message; + private readonly Message _message2; + private readonly FakeOutboxSync _fakeOutbox; + private readonly FakeMessageProducer _fakeMessageProducer; + + public CommandProcessorPostBoxImplicitClearTests() + { + var myCommand = new MyCommand{ Value = "Hello World"}; + + _fakeOutbox = new FakeOutboxSync(); + _fakeMessageProducer = new FakeMessageProducer(); + + var topic = "MyCommand"; + _message = new Message( + new MessageHeader(myCommand.Id, topic, MessageType.MT_COMMAND), + new MessageBody(JsonSerializer.Serialize(myCommand, JsonSerialisationOptions.Options)) + ); + + _message2 = new Message( + new MessageHeader(Guid.NewGuid(), topic, MessageType.MT_COMMAND), + new MessageBody(JsonSerializer.Serialize(myCommand, JsonSerialisationOptions.Options)) + ); + + var messageMapperRegistry = new MessageMapperRegistry(new SimpleMessageMapperFactory((_) => new MyCommandMessageMapper())); + messageMapperRegistry.Register(); + + var retryPolicy = Policy + .Handle() + .Retry(); + + var circuitBreakerPolicy = Policy + .Handle() + .CircuitBreaker(1, TimeSpan.FromMilliseconds(1)); + + _commandProcessor = new CommandProcessor( + new InMemoryRequestContextFactory(), + new PolicyRegistry { { CommandProcessor.RETRYPOLICY, retryPolicy }, { CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy } }, + messageMapperRegistry, + _fakeOutbox, + new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducer},})); + } + + [Fact] + public void When_Implicit_Clearing_The_PostBox_On_The_Command_Processor() + { + _fakeOutbox.Add(_message); + _fakeOutbox.Add(_message2); + + _commandProcessor.ClearOutbox(1,1); + + for (var i = 1; i <= 10; i++) + { + if (_fakeMessageProducer.SentMessages.Count == 1) + break; + Thread.Sleep(i * 100); + } + + //Try again and kick off another background thread + for (var i = 1; i <= 10; i++) + { + if (_fakeMessageProducer.SentMessages.Count == 2) + break; + Thread.Sleep(i * 100); + _commandProcessor.ClearOutbox(1, 1); + } + + //_should_send_a_message_via_the_messaging_gateway + _fakeMessageProducer.MessageWasSent.Should().BeTrue(); + + var sentMessage = _fakeMessageProducer.SentMessages.FirstOrDefault(m => m.Id == _message.Id); + sentMessage.Should().NotBeNull(); + sentMessage.Id.Should().Be(_message.Id); + sentMessage.Header.Topic.Should().Be(_message.Header.Topic); + sentMessage.Body.Value.Should().Be(_message.Body.Value); + + var sentMessage2 = _fakeMessageProducer.SentMessages.FirstOrDefault(m => m.Id == _message2.Id); + sentMessage2.Should().NotBeNull(); + sentMessage2.Id.Should().Be(_message2.Id); + sentMessage2.Header.Topic.Should().Be(_message2.Header.Topic); + sentMessage2.Body.Value.Should().Be(_message2.Body.Value); + } + + public void Dispose() + { + CommandProcessor.ClearExtServiceBus(); + } + } +} diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_And_There_Is_No_Message_Mapper_Registry.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_And_There_Is_No_Message_Mapper_Registry.cs index 7e08770b3c..c90ef097a7 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_And_There_Is_No_Message_Mapper_Registry.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_And_There_Is_No_Message_Mapper_Registry.cs @@ -40,7 +40,7 @@ public class CommandProcessorNoMessageMapperTests : IDisposable private readonly MyCommand _myCommand = new MyCommand(); private Message _message; private readonly FakeOutboxSync _fakeOutbox; - private readonly FakeMessageProducer _fakeMessageProducer; + private readonly FakeMessageProducerWithPublishConfirmation _fakeMessageProducerWithPublishConfirmation; private Exception _exception; public CommandProcessorNoMessageMapperTests() @@ -48,7 +48,7 @@ public CommandProcessorNoMessageMapperTests() _myCommand.Value = "Hello World"; _fakeOutbox = new FakeOutboxSync(); - _fakeMessageProducer = new FakeMessageProducer(); + _fakeMessageProducerWithPublishConfirmation = new FakeMessageProducerWithPublishConfirmation(); const string topic = "MyCommand"; _message = new Message( @@ -71,7 +71,7 @@ public CommandProcessorNoMessageMapperTests() new PolicyRegistry { { CommandProcessor.RETRYPOLICY, retryPolicy }, { CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy } }, messageMapperRegistry, _fakeOutbox, - new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducer},})); + new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducerWithPublishConfirmation},})); } public void When_Posting_A_Message_And_There_Is_No_Message_Mapper_Registry() diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_And_There_Is_No_Message_Mapper_Registry_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_And_There_Is_No_Message_Mapper_Registry_Async.cs index 7c3c3b1b63..718ffd0573 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_And_There_Is_No_Message_Mapper_Registry_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_And_There_Is_No_Message_Mapper_Registry_Async.cs @@ -41,7 +41,7 @@ public class CommandProcessorNoMessageMapperAsyncTests : IDisposable private readonly MyCommand _myCommand = new MyCommand(); private Message _message; private readonly FakeOutboxSync _fakeOutboxSync; - private readonly FakeMessageProducer _fakeMessageProducer; + private readonly FakeMessageProducerWithPublishConfirmation _fakeMessageProducerWithPublishConfirmation; private Exception _exception; public CommandProcessorNoMessageMapperAsyncTests() @@ -49,7 +49,7 @@ public CommandProcessorNoMessageMapperAsyncTests() _myCommand.Value = "Hello World"; _fakeOutboxSync = new FakeOutboxSync(); - _fakeMessageProducer = new FakeMessageProducer(); + _fakeMessageProducerWithPublishConfirmation = new FakeMessageProducerWithPublishConfirmation(); const string topic = "MyCommand"; _message = new Message( @@ -72,7 +72,7 @@ public CommandProcessorNoMessageMapperAsyncTests() new PolicyRegistry { { CommandProcessor.RETRYPOLICY, retryPolicy }, { CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy } }, messageMapperRegistry, _fakeOutboxSync, - new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducer},})); + new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducerWithPublishConfirmation},})); } [Fact] diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_And_There_Is_No_Message_Store.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_And_There_Is_No_Message_Store.cs index b42857b431..c014dcc09b 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_And_There_Is_No_Message_Store.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_And_There_Is_No_Message_Store.cs @@ -37,14 +37,14 @@ public class CommandProcessorNoOutboxTests : IDisposable { private readonly CommandProcessor _commandProcessor; private readonly MyCommand _myCommand = new MyCommand(); - private readonly FakeMessageProducer _fakeMessageProducer; + private readonly FakeMessageProducerWithPublishConfirmation _fakeMessageProducerWithPublishConfirmation; private Exception _exception; public CommandProcessorNoOutboxTests() { _myCommand.Value = "Hello World"; - _fakeMessageProducer = new FakeMessageProducer(); + _fakeMessageProducerWithPublishConfirmation = new FakeMessageProducerWithPublishConfirmation(); var messageMapperRegistry = new MessageMapperRegistry(new SimpleMessageMapperFactory((_) => new MyCommandMessageMapper())); messageMapperRegistry.Register(); @@ -62,7 +62,7 @@ public CommandProcessorNoOutboxTests() new PolicyRegistry { { CommandProcessor.RETRYPOLICY, retryPolicy }, { CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy } }, messageMapperRegistry, null, - new ProducerRegistry(new Dictionary() {{"MyCommand", _fakeMessageProducer},})); + new ProducerRegistry(new Dictionary() {{"MyCommand", _fakeMessageProducerWithPublishConfirmation},})); } [Fact] diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_And_There_Is_No_Message_Store_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_And_There_Is_No_Message_Store_Async.cs index 8fae855f5a..68532fde9a 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_And_There_Is_No_Message_Store_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_And_There_Is_No_Message_Store_Async.cs @@ -38,14 +38,14 @@ public class CommandProcessorNoOutboxAsyncTests : IDisposable { private readonly CommandProcessor _commandProcessor; private readonly MyCommand _myCommand = new MyCommand(); - private readonly FakeMessageProducer _fakeMessageProducer; + private readonly FakeMessageProducerWithPublishConfirmation _fakeMessageProducerWithPublishConfirmation; private Exception _exception; public CommandProcessorNoOutboxAsyncTests() { _myCommand.Value = "Hello World"; - _fakeMessageProducer = new FakeMessageProducer(); + _fakeMessageProducerWithPublishConfirmation = new FakeMessageProducerWithPublishConfirmation(); var messageMapperRegistry = new MessageMapperRegistry(new SimpleMessageMapperFactory((_) => new MyCommandMessageMapper())); messageMapperRegistry.Register(); @@ -63,7 +63,7 @@ public CommandProcessorNoOutboxAsyncTests() new PolicyRegistry { { CommandProcessor.RETRYPOLICY, retryPolicy }, { CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy } }, messageMapperRegistry, null, - new ProducerRegistry(new Dictionary() {{"MyCommand", _fakeMessageProducer},})); + new ProducerRegistry(new Dictionary() {{"MyCommand", _fakeMessageProducerWithPublishConfirmation},})); } [Fact] diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_To_The_Command_Processor.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_To_The_Command_Processor.cs index fee0999e0d..767341c95c 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_To_The_Command_Processor.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_To_The_Command_Processor.cs @@ -41,14 +41,14 @@ public class CommandProcessorPostCommandTests : IDisposable private readonly MyCommand _myCommand = new MyCommand(); private readonly Message _message; private readonly FakeOutboxSync _fakeOutbox; - private readonly FakeMessageProducer _fakeMessageProducer; + private readonly FakeMessageProducerWithPublishConfirmation _fakeMessageProducerWithPublishConfirmation; public CommandProcessorPostCommandTests() { _myCommand.Value = "Hello World"; _fakeOutbox = new FakeOutboxSync(); - _fakeMessageProducer = new FakeMessageProducer(); + _fakeMessageProducerWithPublishConfirmation = new FakeMessageProducerWithPublishConfirmation(); const string topic = "MyCommand"; _message = new Message( @@ -72,7 +72,7 @@ public CommandProcessorPostCommandTests() new PolicyRegistry { { CommandProcessor.RETRYPOLICY, retryPolicy }, { CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy } }, messageMapperRegistry, _fakeOutbox, - new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducer},})); + new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducerWithPublishConfirmation},})); } [Fact] @@ -86,7 +86,7 @@ public void When_Posting_A_Message_To_The_Command_Processor() .SingleOrDefault(msg => msg.Id == _message.Id) .Should().NotBeNull(); //_should_send_a_message_via_the_messaging_gateway - _fakeMessageProducer.MessageWasSent.Should().BeTrue(); + _fakeMessageProducerWithPublishConfirmation.MessageWasSent.Should().BeTrue(); //_should_convert_the_command_into_a_message _fakeOutbox.Get().First().Should().Be(_message); } diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_To_The_Command_Processor_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_To_The_Command_Processor_Async.cs index a52a31bc62..2f013fd79b 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_To_The_Command_Processor_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_A_Message_To_The_Command_Processor_Async.cs @@ -42,14 +42,14 @@ public class CommandProcessorPostCommandAsyncTests : IDisposable private readonly MyCommand _myCommand = new MyCommand(); private Message _message; private readonly FakeOutboxSync _fakeOutboxSync; - private readonly FakeMessageProducer _fakeMessageProducer; + private readonly FakeMessageProducerWithPublishConfirmation _fakeMessageProducerWithPublishConfirmation; public CommandProcessorPostCommandAsyncTests() { _myCommand.Value = "Hello World"; _fakeOutboxSync = new FakeOutboxSync(); - _fakeMessageProducer = new FakeMessageProducer(); + _fakeMessageProducerWithPublishConfirmation = new FakeMessageProducerWithPublishConfirmation(); const string topic = "MyCommand"; _message = new Message( @@ -73,7 +73,7 @@ public CommandProcessorPostCommandAsyncTests() new PolicyRegistry { { CommandProcessor.RETRYPOLICYASYNC, retryPolicy }, { CommandProcessor.CIRCUITBREAKERASYNC, circuitBreakerPolicy } }, messageMapperRegistry, _fakeOutboxSync, - new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducer},})); + new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducerWithPublishConfirmation},})); } [Fact] @@ -87,7 +87,7 @@ public async Task When_Posting_A_Message_To_The_Command_Processor_Async() .SingleOrDefault(msg => msg.Id == _message.Id) .Should().NotBeNull(); //_should_send_a_message_via_the_messaging_gateway - _fakeMessageProducer.MessageWasSent.Should().BeTrue(); + _fakeMessageProducerWithPublishConfirmation.MessageWasSent.Should().BeTrue(); //_should_convert_the_command_into_a_message } diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_Via_A_Control_Bus_Sender_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_Via_A_Control_Bus_Sender_Async.cs index 29993d5538..5c69a9e85a 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_Via_A_Control_Bus_Sender_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_Via_A_Control_Bus_Sender_Async.cs @@ -43,14 +43,14 @@ public class ControlBusSenderPostMessageAsyncTests : IDisposable private readonly MyCommand _myCommand = new MyCommand(); private readonly Message _message; private readonly IAmAnOutboxSync _outbox; - private readonly FakeMessageProducer _fakeMessageProducer; + private readonly FakeMessageProducerWithPublishConfirmation _fakeMessageProducerWithPublishConfirmation; public ControlBusSenderPostMessageAsyncTests() { _myCommand.Value = "Hello World"; _outbox = new InMemoryOutbox(); - _fakeMessageProducer = new FakeMessageProducer(); + _fakeMessageProducerWithPublishConfirmation = new FakeMessageProducerWithPublishConfirmation(); const string topic = "MyCommand"; _message = new Message( @@ -74,7 +74,7 @@ public ControlBusSenderPostMessageAsyncTests() new PolicyRegistry { { CommandProcessor.RETRYPOLICYASYNC, retryPolicy }, { CommandProcessor.CIRCUITBREAKERASYNC, circuitBreakerPolicy } }, messageMapperRegistry, _outbox, - new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducer},})); + new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducerWithPublishConfirmation},})); _controlBusSender = new ControlBusSender(_commandProcessor); } @@ -85,7 +85,7 @@ public async Task When_Posting_Via_A_Control_Bus_Sender_Async() await _controlBusSender.PostAsync(_myCommand); //_should_send_a_message_via_the_messaging_gateway - _fakeMessageProducer.MessageWasSent.Should().BeTrue(); + _fakeMessageProducerWithPublishConfirmation.MessageWasSent.Should().BeTrue(); //_should_store_the_message_in_the_sent_command_message_repository var message = _outbox diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_With_A_Default_Policy.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_With_A_Default_Policy.cs index da809eb364..3e56ba3596 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_With_A_Default_Policy.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_With_A_Default_Policy.cs @@ -39,14 +39,14 @@ public class PostCommandTests : IDisposable private readonly MyCommand _myCommand = new MyCommand(); private readonly Message _message; private readonly FakeOutboxSync _fakeOutboxSync; - private readonly FakeMessageProducer _fakeMessageProducer; + private readonly FakeMessageProducerWithPublishConfirmation _fakeMessageProducerWithPublishConfirmation; public PostCommandTests() { _myCommand.Value = "Hello World"; _fakeOutboxSync = new FakeOutboxSync(); - _fakeMessageProducer = new FakeMessageProducer(); + _fakeMessageProducerWithPublishConfirmation = new FakeMessageProducerWithPublishConfirmation(); const string topic = "MyCommand"; _message = new Message( @@ -62,7 +62,7 @@ public PostCommandTests() .Handlers(new HandlerConfiguration(new SubscriberRegistry(), new EmptyHandlerFactorySync())) .DefaultPolicy() .ExternalBus(new MessagingConfiguration( - new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducer},}), + new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducerWithPublishConfirmation},}), messageMapperRegistry), _fakeOutboxSync) .RequestContextFactory(new InMemoryRequestContextFactory()) @@ -80,7 +80,7 @@ public void When_Posting_With_A_Default_Policy() .SingleOrDefault(msg => msg.Id == _message.Id) .Should().NotBeNull(); //should send a message via the messaging gateway - _fakeMessageProducer.MessageWasSent.Should().BeTrue(); + _fakeMessageProducerWithPublishConfirmation.MessageWasSent.Should().BeTrue(); // should convert the command into a message _fakeOutboxSync.Get().First().Should().Be(_message); } diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_With_An_In_Memory_Message_Store.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_With_An_In_Memory_Message_Store.cs index 6e20bcc33f..53cbcf5daf 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_With_An_In_Memory_Message_Store.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_With_An_In_Memory_Message_Store.cs @@ -40,7 +40,7 @@ public class CommandProcessorWithInMemoryOutboxTests : IDisposable private readonly MyCommand _myCommand = new MyCommand(); private readonly Message _message; private readonly InMemoryOutbox _outbox = new InMemoryOutbox(); - private readonly FakeMessageProducer _fakeMessageProducer = new FakeMessageProducer(); + private readonly FakeMessageProducerWithPublishConfirmation _fakeMessageProducerWithPublishConfirmation = new FakeMessageProducerWithPublishConfirmation(); public CommandProcessorWithInMemoryOutboxTests() { @@ -68,7 +68,7 @@ public CommandProcessorWithInMemoryOutboxTests() new PolicyRegistry { { CommandProcessor.RETRYPOLICY, retryPolicy }, { CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy } }, messageMapperRegistry, _outbox, - new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducer},})); + new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducerWithPublishConfirmation},})); } [Fact] @@ -79,7 +79,7 @@ public void When_Posting_With_An_In_Memory_Outbox() //_should_store_the_message_in_the_sent_command_message_repository _outbox.Get(_myCommand.Id).Should().NotBeNull(); //_should_send_a_message_via_the_messaging_gateway - _fakeMessageProducer.MessageWasSent.Should().BeTrue(); + _fakeMessageProducerWithPublishConfirmation.MessageWasSent.Should().BeTrue(); // _should_convert_the_command_into_a_message _outbox.Get(_myCommand.Id).Should().Be(_message); } diff --git a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_With_An_In_Memory_Message_Store_Async.cs b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_With_An_In_Memory_Message_Store_Async.cs index 3b8739261f..1a06526f88 100644 --- a/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_With_An_In_Memory_Message_Store_Async.cs +++ b/tests/Paramore.Brighter.Core.Tests/CommandProcessors/When_Posting_With_An_In_Memory_Message_Store_Async.cs @@ -41,14 +41,14 @@ public class CommandProcessorWithInMemoryOutboxAscyncTests : IDisposable private readonly MyCommand _myCommand = new MyCommand(); private readonly Message _message; private readonly InMemoryOutbox _outbox; - private readonly FakeMessageProducer _fakeMessageProducer; + private readonly FakeMessageProducerWithPublishConfirmation _fakeMessageProducerWithPublishConfirmation; public CommandProcessorWithInMemoryOutboxAscyncTests() { _myCommand.Value = "Hello World"; _outbox = new InMemoryOutbox(); - _fakeMessageProducer = new FakeMessageProducer(); + _fakeMessageProducerWithPublishConfirmation = new FakeMessageProducerWithPublishConfirmation(); const string topic = "MyCommand"; _message = new Message( @@ -72,7 +72,7 @@ public CommandProcessorWithInMemoryOutboxAscyncTests() new PolicyRegistry { { CommandProcessor.RETRYPOLICYASYNC, retryPolicy }, { CommandProcessor.CIRCUITBREAKERASYNC, circuitBreakerPolicy } }, messageMapperRegistry, _outbox, - new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducer},})); + new ProducerRegistry(new Dictionary() {{topic, _fakeMessageProducerWithPublishConfirmation},})); } @@ -85,7 +85,7 @@ public async Task When_Posting_With_An_In_Memory_Outbox_Async() //_should_store_the_message_in_the_sent_command_message_repository message.Should().NotBeNull(); //_should_send_a_message_via_the_messaging_gateway - _fakeMessageProducer.MessageWasSent.Should().BeTrue(); + _fakeMessageProducerWithPublishConfirmation.MessageWasSent.Should().BeTrue(); //_should_convert_the_command_into_a_message message.Should().Be(_message); } diff --git a/tests/Paramore.Brighter.Core.Tests/ControlBus/When_configuring_a_control_bus.cs b/tests/Paramore.Brighter.Core.Tests/ControlBus/When_configuring_a_control_bus.cs index 0bc4cf8d30..a3d1e99f83 100644 --- a/tests/Paramore.Brighter.Core.Tests/ControlBus/When_configuring_a_control_bus.cs +++ b/tests/Paramore.Brighter.Core.Tests/ControlBus/When_configuring_a_control_bus.cs @@ -45,7 +45,7 @@ public ControlBusBuilderTests() var messageProducerFactory = A.Fake(); A.CallTo(() => messageProducerFactory.Create()) - .Returns(new ProducerRegistry(new Dictionary() {{"MyTopic", new FakeMessageProducer()},})); + .Returns(new ProducerRegistry(new Dictionary() {{"MyTopic", new FakeMessageProducerWithPublishConfirmation()},})); _busReceiverBuilder = ControlBusReceiverBuilder .With() diff --git a/tests/Paramore.Brighter.InMemory.Tests/Sweeper/When_sweeping_the_outbox.cs b/tests/Paramore.Brighter.InMemory.Tests/Sweeper/When_sweeping_the_outbox.cs index acf2a17309..e0121e76de 100644 --- a/tests/Paramore.Brighter.InMemory.Tests/Sweeper/When_sweeping_the_outbox.cs +++ b/tests/Paramore.Brighter.InMemory.Tests/Sweeper/When_sweeping_the_outbox.cs @@ -35,6 +35,8 @@ public void When_outstanding_in_outbox_sweep_clears_them() sweeper.Sweep(); + Thread.Sleep(200); + //Assert outbox.EntryCount.Should().Be(3); commandProcessor.Dispatched.Count.Should().Be(3); @@ -63,7 +65,9 @@ public async Task When_outstanding_in_outbox_sweep_clears_them_async() //Act await Task.Delay(milliSecondsSinceSent * 2); // -- let the messages expire - await sweeper.SweepAsync(); + sweeper.SweepAsyncOutbox(); + + await Task.Delay(200); //Assert outbox.EntryCount.Should().Be(3); @@ -96,8 +100,10 @@ public void When_too_new_to_sweep_leaves_them() //Act sweeper.Sweep(); - //Assert - commandProcessor.Dispatched.Count.Should().Be(1); + Thread.Sleep(200); + + //Assert + commandProcessor.Dispatched.Count.Should().Be(1); commandProcessor.Deposited.Count.Should().Be(4); } @@ -124,7 +130,9 @@ public async Task When_too_new_to_sweep_leaves_them_async() } //Act - await sweeper.SweepAsync(); + sweeper.SweepAsyncOutbox(); + + await Task.Delay(200); //Assert commandProcessor.Deposited.Count.Should().Be(4);