Skip to content

Commit

Permalink
Fix: Sync Over Async Improvements (#3409)
Browse files Browse the repository at this point in the history
* fix: add support for Send to our synchronization context, to allow it to work in more use cases

* fix: remove blocking wait from consumers that do not have a built in delay function.

* fix: document where wait completes on the synchronizationcontext via the performer thread.

* fix: update comments for sync and async, fix one where appropriate

* feat: add adr describing approach; rename to use reactor and proactor to align with documentation.

* feat: improve the adr about Brighter's usage of threading.

* fix: use GetAwaiter().GetResult() for better call stack

* feat: Update the ADR for IAmAMessageConsumerAsync

* feat: add IAmAMessageConsumerAsync.cs and initial tests.

* feat: additional tests for InMemoryConsumerAsync

* feat: allow proactor to use async methods on transports, if they exist

* feat: expose async methods from AWS to Proactor.cs

* fix: nullability of AWS transport

* fix: add async options to ASB

* chore: branch switch

* feat: add async support to Kafka

* chore: separate summary from remarks

* feat: add async to RMQ; update to RMQ V7

* feat: add nullability to RMQ transport

* feat: add async operations to mssql transport

* chore: fix nullable issues for MS SQL transport

* chore: move Redis to using ChannelName and RoutingKey over string, to reduce primitive obsession.

* feat: upgrade the sychronizationcontext to work with the scheduler, derived from Stephen Cleary work

* chore: reorg tests around reactor and proactor

* Shift to run whole loop through synchronizationcontext over parts

* Make the choice of proactor/reactor easier to understand

* fix: switch to close implementation of Stephen Cleary's Nito as a starting point; can't use direct as not strong named, and adds depenency we can't fix

* fix: update ADR to reflect sources changes; update sources to reflect code origins

* fix: need to be explicit about your pump type if not proactor now

* fix: license file URI for Stephen Cleary was wrong

* feat: update ADR to show native support for proactor or reactor

* fix: match proactor tests to reactor tests to flush issues

* fix: add duplicates for the tests within Reactor, not in Proactor

* fix: make rmq broker creation async; add cancellationtoken to sendasync

* fix: add async tests for RMQ, use to flush out issues with synchronization context, particularly in test scenarios (as runner and framework also have contexts)

* feat: add tests for internal syncrhonizationcontext, derived from Stephen Cleary

* fix: tests show that we should remove the reentrancy check; there may be an issue, but this was not how to solve

* fix: add async disposable pattern to consumer

* feat: support IAsyncDisposable

* fix: remove GetAwaiter().GetResult() from hot paths

* fix: Improve the ADR to reflect results; attribution in the README.md file

* chore: add some debugging support for thornier issues

* chore: add better debug statements; helps to diagnose scheduler & context issues. Issue with scheduler being used outside of helper context.

* chore: add async versions of AWS tests

* chore: fix missing interface member

* chore: add missing interface methods

* chore: async service bus tests

* fix: confirm that if task scheduler reset correctly, we don't get a spurious callback

* fix: note concerns on TaskScheduler and ConfigureAwait

* fix: another pass at exploring the edge case, that causes work to be accidentally queued to our scheduler

* fix: some fallback approaches in Post

* Pull one issue with blocking in the Proactor pipeline out.

* fix: Notes on TaskScheduler and ConfigureAwait

* fix: update ADR link

* fix: allow asynchronous producer creation (mainly useful for AWS now)

* fix: add async tests to Kafka; remove spurious finalize call

* fix: add async to mqtt

* fix: add async for mssql

* fix: Add async Redis tests

* chore: syntax modernization

* fix: accidental sample drop
  • Loading branch information
iancooper authored Dec 29, 2024
1 parent 8438dd9 commit 9af3c0b
Show file tree
Hide file tree
Showing 393 changed files with 20,931 additions and 8,861 deletions.
10 changes: 5 additions & 5 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@
<PackageVersion Include="Polly" Version="8.5.0" />
<PackageVersion Include="Polly.Contrib.WaitAndRetry" Version="1.1.1" />
<PackageVersion Include="Polly.Extensions.Http" Version="3.0.0" />
<PackageVersion Include="RabbitMQ.Client" Version="6.8.1" />
<PackageVersion Include="Serilog" Version="4.2.0" />
<PackageVersion Include="Serilog.Extensions.Hosting" Version="9.0.0" />
<PackageVersion Include="Serilog.Extensions.Logging" Version="9.0.0" />
<PackageVersion Include="RabbitMQ.Client" Version="7.0.0" />
<PackageVersion Include="Serilog" Version="4.1.0" />
<PackageVersion Include="Serilog.Extensions.Hosting" Version="8.0.0" />
<PackageVersion Include="Serilog.Extensions.Logging" Version="8.0.0" />
<PackageVersion Include="Serilog.Sinks.Console" Version="6.0.0" />
<PackageVersion Include="Serilog.Sinks.TestCorrelator" Version="4.0.0" />
<PackageVersion Include="ServiceStack.Redis.Core" Version="8.5.2" />
Expand All @@ -90,7 +90,7 @@
<PackageVersion Include="System.Reflection.TypeExtensions" Version="4.7.0" />
<PackageVersion Include="System.Net.Http" Version="4.3.4" />
<PackageVersion Include="System.Net.Security" Version="4.3.2" />
<PackageVersion Include="System.Security.Cryptography.X509Certificates" Version="4.3.2"/>
<PackageVersion Include="System.Security.Cryptography.X509Certificates" Version="4.3.2" />
<PackageVersion Include="System.Text.Json" Version="9.0.0" />
<PackageVersion Include="System.Text.RegularExpressions" Version="4.3.1" />
<PackageVersion Include="xunit" Version="2.9.2" />
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ We release the build artefacts (NuGet packages) to [NuGet](http://nuget.org) on
src="https://scan.coverity.com/projects/2900/badge.svg"/>
</a>

## Sources

Portions of this code are based on Stephen Cleary's [AsyncEx's AsyncContext](https://github.com/StephenCleary/AsyncEx/blob/master/doc/AsyncContext.md)

![CodeScene Code Health](https://codescene.io/projects/32198/status-badges/code-health)

Expand Down
8 changes: 5 additions & 3 deletions docs/adr/0002-use-a-single-threaded-message-pump.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ Accepted

Any service activator pattern will have a message pump, which reads from a queue.

There are different strategies we could use, a common one for example is to use a BlockingCollection to hold messages read from the queue, and then use threads from the thread pool to process those messages. However, a multi-threaded pump has the issue that it will de-order an otherwise ordered queue, as the threads will pull items from the blocking collection in parallel, not sequentially. In addition, where we have multiple threads it becomes difficult to create resources used by the pump without protecting them from race conditions.
There are different strategies we could use, a common one for example is to use a BlockingCollection to hold messages read from the queue, and then use threads from the thread pool to process those messages. However, a multithreaded pump has the issue that it will de-order an otherwise ordered queue, as the threads will pull items from the blocking collection in parallel, not sequentially. In addition, where we have multiple threads it becomes difficult to create resources used by the pump without protecting them from race conditions.

The other option would be to use the thread pool to service requests, creating a thread for each incoming message. This would not scale, as we would quickly run out of threads in the pool. To avoid this issue, solutions that rely on the thread pool typically have to govern the number of thread pool threads that can be used for concurrent requests. The problem becomes that at scale the semaphore that governs the number of threads becomes a bottleneck.

The alternative to these multi-threaded approaches is to use a single-threaded message pump that reads from the queue, processes the message, and only when it has processed that message, processes the next item. This prevents de-ordering of the queue, because items are read in sequence.
The alternative to these multithreaded approaches is to use a single-threaded message pump that reads from the queue, processes the message, and only when it has processed that message, processes the next item. This prevents de-ordering of the queue, because items are read in sequence.

This approach is the [Reactor](https://en.wikipedia.org/wiki/Reactor_pattern) pattern. The [Reactor](http://reactors.io/tutorialdocs//reactors/why-reactors/index.html) pattern uses a single thread to read from the queue, and then dispatches the message to a handler. If a higher throughput is desired with a single threaded pump, then you can create multiple pumps. In essence, this is the competing consumers pattern, each performer is its own message pump.

To make the Reactor pattern more performant, we can choose not to block on I/O by using asynchronous handlers. This is the [Proactor](https://en.wikipedia.org/wiki/Proactor_pattern) pattern. Brighter provides a SynchronizationContext so that asynchronous handlers can be used, and the message pump will not block on I/O, whilst still preserving ordering. Using an asynchronous handler switches you to a Proactor approach from Reactor for [performance](https://www.artima.com/articles/comparing-two-high-performance-io-design-patterns#part2).
To give the Reactor pattern higher throughput, we can choose not to block on I/O by using asynchronous handlers. This is the [Proactor](https://en.wikipedia.org/wiki/Proactor_pattern) pattern. Brighter provides a SynchronizationContext so that asynchronous handlers can be used, and the message pump will not block on I/O, whilst still preserving ordering. Using an asynchronous handler switches you to a Proactor approach from Reactor for [performance](https://www.artima.com/articles/comparing-two-high-performance-io-design-patterns#part2).

Note, that the Reactor pattern may be more performant, because it does not require the overhead of the thread pool, and the context switching that occurs when using the thread pool. The Reactor pattern is also more predictable, as it does not rely on the thread pool, which can be unpredictable in terms of the number of threads available. The Proactor pattern however may offer greater throughput because it does not block on I/O.

The message pump performs the usual sequence of actions:

Expand Down
Loading

0 comments on commit 9af3c0b

Please sign in to comment.