Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Sync Over Async Improvements #3409

Merged
merged 64 commits into from
Dec 29, 2024
Merged
Show file tree
Hide file tree
Changes from 58 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
67f7608
fix: add support for Send to our synchronization context, to allow it…
iancooper Dec 5, 2024
f677893
fix: remove blocking wait from consumers that do not have a built in …
iancooper Dec 5, 2024
195df8b
fix: document where wait completes on the synchronizationcontext via …
iancooper Dec 5, 2024
024accf
fix: update comments for sync and async, fix one where appropriate
iancooper Dec 5, 2024
041a845
feat: add adr describing approach; rename to use reactor and proactor…
iancooper Dec 6, 2024
ba9c2a7
feat: improve the adr about Brighter's usage of threading.
iancooper Dec 6, 2024
84418ed
fix: use GetAwaiter().GetResult() for better call stack
iancooper Dec 6, 2024
2cfc787
feat: Update the ADR for IAmAMessageConsumerAsync
iancooper Dec 6, 2024
9d746b7
feat: add IAmAMessageConsumerAsync.cs and initial tests.
iancooper Dec 6, 2024
540defe
feat: additional tests for InMemoryConsumerAsync
iancooper Dec 6, 2024
3b78962
feat: allow proactor to use async methods on transports, if they exist
iancooper Dec 7, 2024
7684bd5
feat: expose async methods from AWS to Proactor.cs
iancooper Dec 7, 2024
e2549e9
fix: nullability of AWS transport
iancooper Dec 12, 2024
ddd04ac
fix: add async options to ASB
iancooper Dec 15, 2024
4013fb3
chore: branch switch
iancooper Dec 16, 2024
c1220a6
feat: add async support to Kafka
iancooper Dec 18, 2024
64323dd
chore: separate summary from remarks
iancooper Dec 18, 2024
756fca4
feat: add async to RMQ; update to RMQ V7
iancooper Dec 18, 2024
95d4b71
feat: add nullability to RMQ transport
iancooper Dec 19, 2024
d77adca
feat: add async operations to mssql transport
iancooper Dec 19, 2024
ca1cb53
chore: fix nullable issues for MS SQL transport
iancooper Dec 20, 2024
e52710a
chore: move Redis to using ChannelName and RoutingKey over string, to…
iancooper Dec 20, 2024
8964bbf
feat: upgrade the sychronizationcontext to work with the scheduler, d…
iancooper Dec 21, 2024
3c3ba3b
chore: reorg tests around reactor and proactor
iancooper Dec 21, 2024
684de10
Shift to run whole loop through synchronizationcontext over parts
iancooper Dec 21, 2024
ff37660
Make the choice of proactor/reactor easier to understand
iancooper Dec 21, 2024
6bf7ada
fix: switch to close implementation of Stephen Cleary's Nito as a sta…
iancooper Dec 21, 2024
8e8947f
fix: update ADR to reflect sources changes; update sources to reflect…
iancooper Dec 21, 2024
4ba1ee0
fix: need to be explicit about your pump type if not proactor now
iancooper Dec 21, 2024
e86fc83
fix: license file URI for Stephen Cleary was wrong
iancooper Dec 21, 2024
98b8465
feat: update ADR to show native support for proactor or reactor
iancooper Dec 21, 2024
3ba0678
fix: match proactor tests to reactor tests to flush issues
iancooper Dec 22, 2024
f008175
fix: add duplicates for the tests within Reactor, not in Proactor
iancooper Dec 22, 2024
8735ea6
fix: make rmq broker creation async; add cancellationtoken to sendasync
iancooper Dec 22, 2024
8e54725
fix: add async tests for RMQ, use to flush out issues with synchroniz…
iancooper Dec 23, 2024
c484c04
feat: add tests for internal syncrhonizationcontext, derived from Ste…
iancooper Dec 23, 2024
fb20ef8
fix: tests show that we should remove the reentrancy check; there may…
iancooper Dec 23, 2024
e90cb36
fix: add async disposable pattern to consumer
iancooper Dec 23, 2024
0e3098d
feat: support IAsyncDisposable
iancooper Dec 23, 2024
e1acf37
fix: remove GetAwaiter().GetResult() from hot paths
iancooper Dec 24, 2024
3cb7fdb
fix: Improve the ADR to reflect results; attribution in the README.md…
iancooper Dec 24, 2024
b7fdc34
chore: add some debugging support for thornier issues
iancooper Dec 24, 2024
111495f
chore: add better debug statements; helps to diagnose scheduler & con…
iancooper Dec 24, 2024
e1f416b
chore: add async versions of AWS tests
iancooper Dec 26, 2024
4046776
chore: fix missing interface member
iancooper Dec 27, 2024
d298561
chore: add missing interface methods
iancooper Dec 27, 2024
bd5a493
chore: async service bus tests
iancooper Dec 27, 2024
60e64a0
fix: confirm that if task scheduler reset correctly, we don't get a s…
iancooper Dec 27, 2024
efe687a
fix: note concerns on TaskScheduler and ConfigureAwait
iancooper Dec 27, 2024
eef18d4
fix: another pass at exploring the edge case, that causes work to be …
iancooper Dec 27, 2024
a609951
fix: some fallback approaches in Post
iancooper Dec 27, 2024
472126c
Pull one issue with blocking in the Proactor pipeline out.
iancooper Dec 27, 2024
701caa3
fix: Notes on TaskScheduler and ConfigureAwait
iancooper Dec 27, 2024
c71ce3f
fix: update ADR link
iancooper Dec 27, 2024
7c40cfa
fix: allow asynchronous producer creation (mainly useful for AWS now)
iancooper Dec 28, 2024
b95d99c
fix: add async tests to Kafka; remove spurious finalize call
iancooper Dec 28, 2024
933433e
fix: add async to mqtt
iancooper Dec 29, 2024
8ba6e67
fix: add async for mssql
iancooper Dec 29, 2024
4733e0d
fix: Add async Redis tests
iancooper Dec 29, 2024
122caad
chore: syntax modernization
iancooper Dec 29, 2024
aa01361
Merge branch 'master' into synccontext
iancooper Dec 29, 2024
b01ce53
fix: accidental sample drop
iancooper Dec 29, 2024
841e472
Merge remote-tracking branch 'origin/synccontext' into synccontext
iancooper Dec 29, 2024
4557375
chore: merge branch 'master' into synccontext
iancooper Dec 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 2 additions & 2 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
<PackageVersion Include="Polly" Version="8.5.0" />
<PackageVersion Include="Polly.Contrib.WaitAndRetry" Version="1.1.1" />
<PackageVersion Include="Polly.Extensions.Http" Version="3.0.0" />
<PackageVersion Include="RabbitMQ.Client" Version="6.8.1" />
<PackageVersion Include="RabbitMQ.Client" Version="7.0.0" />
<PackageVersion Include="Serilog" Version="4.1.0" />
<PackageVersion Include="Serilog.Extensions.Hosting" Version="8.0.0" />
<PackageVersion Include="Serilog.Extensions.Logging" Version="8.0.0" />
Expand All @@ -90,7 +90,7 @@
<PackageVersion Include="System.Reflection.TypeExtensions" Version="4.7.0" />
<PackageVersion Include="System.Net.Http" Version="4.3.4" />
<PackageVersion Include="System.Net.Security" Version="4.3.2" />
<PackageVersion Include="System.Security.Cryptography.X509Certificates" Version="4.3.2"/>
<PackageVersion Include="System.Security.Cryptography.X509Certificates" Version="4.3.2" />
<PackageVersion Include="System.Text.Json" Version="9.0.0" />
<PackageVersion Include="System.Text.RegularExpressions" Version="4.3.1" />
<PackageVersion Include="xunit" Version="2.9.2" />
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ We release the build artefacts (NuGet packages) to [NuGet](http://nuget.org) on
src="https://scan.coverity.com/projects/2900/badge.svg"/>
</a>

## Sources

Portions of this code are based on Stephen Cleary's [AsyncEx's AsyncContext](https://github.com/StephenCleary/AsyncEx/blob/master/doc/AsyncContext.md)

![CodeScene Code Health](https://codescene.io/projects/32198/status-badges/code-health)

Expand Down
8 changes: 5 additions & 3 deletions docs/adr/0002-use-a-single-threaded-message-pump.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ Accepted

Any service activator pattern will have a message pump, which reads from a queue.

There are different strategies we could use, a common one for example is to use a BlockingCollection to hold messages read from the queue, and then use threads from the thread pool to process those messages. However, a multi-threaded pump has the issue that it will de-order an otherwise ordered queue, as the threads will pull items from the blocking collection in parallel, not sequentially. In addition, where we have multiple threads it becomes difficult to create resources used by the pump without protecting them from race conditions.
There are different strategies we could use, a common one for example is to use a BlockingCollection to hold messages read from the queue, and then use threads from the thread pool to process those messages. However, a multithreaded pump has the issue that it will de-order an otherwise ordered queue, as the threads will pull items from the blocking collection in parallel, not sequentially. In addition, where we have multiple threads it becomes difficult to create resources used by the pump without protecting them from race conditions.

The other option would be to use the thread pool to service requests, creating a thread for each incoming message. This would not scale, as we would quickly run out of threads in the pool. To avoid this issue, solutions that rely on the thread pool typically have to govern the number of thread pool threads that can be used for concurrent requests. The problem becomes that at scale the semaphore that governs the number of threads becomes a bottleneck.

The alternative to these multi-threaded approaches is to use a single-threaded message pump that reads from the queue, processes the message, and only when it has processed that message, processes the next item. This prevents de-ordering of the queue, because items are read in sequence.
The alternative to these multithreaded approaches is to use a single-threaded message pump that reads from the queue, processes the message, and only when it has processed that message, processes the next item. This prevents de-ordering of the queue, because items are read in sequence.

This approach is the [Reactor](https://en.wikipedia.org/wiki/Reactor_pattern) pattern. The [Reactor](http://reactors.io/tutorialdocs//reactors/why-reactors/index.html) pattern uses a single thread to read from the queue, and then dispatches the message to a handler. If a higher throughput is desired with a single threaded pump, then you can create multiple pumps. In essence, this is the competing consumers pattern, each performer is its own message pump.

To make the Reactor pattern more performant, we can choose not to block on I/O by using asynchronous handlers. This is the [Proactor](https://en.wikipedia.org/wiki/Proactor_pattern) pattern. Brighter provides a SynchronizationContext so that asynchronous handlers can be used, and the message pump will not block on I/O, whilst still preserving ordering. Using an asynchronous handler switches you to a Proactor approach from Reactor for [performance](https://www.artima.com/articles/comparing-two-high-performance-io-design-patterns#part2).
To give the Reactor pattern higher throughput, we can choose not to block on I/O by using asynchronous handlers. This is the [Proactor](https://en.wikipedia.org/wiki/Proactor_pattern) pattern. Brighter provides a SynchronizationContext so that asynchronous handlers can be used, and the message pump will not block on I/O, whilst still preserving ordering. Using an asynchronous handler switches you to a Proactor approach from Reactor for [performance](https://www.artima.com/articles/comparing-two-high-performance-io-design-patterns#part2).

Note, that the Reactor pattern may be more performant, because it does not require the overhead of the thread pool, and the context switching that occurs when using the thread pool. The Reactor pattern is also more predictable, as it does not rely on the thread pool, which can be unpredictable in terms of the number of threads available. The Proactor pattern however may offer greater throughput because it does not block on I/O.

The message pump performs the usual sequence of actions:

Expand Down
Loading