Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Sync Over Async Improvements #3409

Merged
merged 64 commits into from
Dec 29, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
67f7608
fix: add support for Send to our synchronization context, to allow it…
iancooper Dec 5, 2024
f677893
fix: remove blocking wait from consumers that do not have a built in …
iancooper Dec 5, 2024
195df8b
fix: document where wait completes on the synchronizationcontext via …
iancooper Dec 5, 2024
024accf
fix: update comments for sync and async, fix one where appropriate
iancooper Dec 5, 2024
041a845
feat: add adr describing approach; rename to use reactor and proactor…
iancooper Dec 6, 2024
ba9c2a7
feat: improve the adr about Brighter's usage of threading.
iancooper Dec 6, 2024
84418ed
fix: use GetAwaiter().GetResult() for better call stack
iancooper Dec 6, 2024
2cfc787
feat: Update the ADR for IAmAMessageConsumerAsync
iancooper Dec 6, 2024
9d746b7
feat: add IAmAMessageConsumerAsync.cs and initial tests.
iancooper Dec 6, 2024
540defe
feat: additional tests for InMemoryConsumerAsync
iancooper Dec 6, 2024
3b78962
feat: allow proactor to use async methods on transports, if they exist
iancooper Dec 7, 2024
7684bd5
feat: expose async methods from AWS to Proactor.cs
iancooper Dec 7, 2024
e2549e9
fix: nullability of AWS transport
iancooper Dec 12, 2024
ddd04ac
fix: add async options to ASB
iancooper Dec 15, 2024
4013fb3
chore: branch switch
iancooper Dec 16, 2024
c1220a6
feat: add async support to Kafka
iancooper Dec 18, 2024
64323dd
chore: separate summary from remarks
iancooper Dec 18, 2024
756fca4
feat: add async to RMQ; update to RMQ V7
iancooper Dec 18, 2024
95d4b71
feat: add nullability to RMQ transport
iancooper Dec 19, 2024
d77adca
feat: add async operations to mssql transport
iancooper Dec 19, 2024
ca1cb53
chore: fix nullable issues for MS SQL transport
iancooper Dec 20, 2024
e52710a
chore: move Redis to using ChannelName and RoutingKey over string, to…
iancooper Dec 20, 2024
8964bbf
feat: upgrade the sychronizationcontext to work with the scheduler, d…
iancooper Dec 21, 2024
3c3ba3b
chore: reorg tests around reactor and proactor
iancooper Dec 21, 2024
684de10
Shift to run whole loop through synchronizationcontext over parts
iancooper Dec 21, 2024
ff37660
Make the choice of proactor/reactor easier to understand
iancooper Dec 21, 2024
6bf7ada
fix: switch to close implementation of Stephen Cleary's Nito as a sta…
iancooper Dec 21, 2024
8e8947f
fix: update ADR to reflect sources changes; update sources to reflect…
iancooper Dec 21, 2024
4ba1ee0
fix: need to be explicit about your pump type if not proactor now
iancooper Dec 21, 2024
e86fc83
fix: license file URI for Stephen Cleary was wrong
iancooper Dec 21, 2024
98b8465
feat: update ADR to show native support for proactor or reactor
iancooper Dec 21, 2024
3ba0678
fix: match proactor tests to reactor tests to flush issues
iancooper Dec 22, 2024
f008175
fix: add duplicates for the tests within Reactor, not in Proactor
iancooper Dec 22, 2024
8735ea6
fix: make rmq broker creation async; add cancellationtoken to sendasync
iancooper Dec 22, 2024
8e54725
fix: add async tests for RMQ, use to flush out issues with synchroniz…
iancooper Dec 23, 2024
c484c04
feat: add tests for internal syncrhonizationcontext, derived from Ste…
iancooper Dec 23, 2024
fb20ef8
fix: tests show that we should remove the reentrancy check; there may…
iancooper Dec 23, 2024
e90cb36
fix: add async disposable pattern to consumer
iancooper Dec 23, 2024
0e3098d
feat: support IAsyncDisposable
iancooper Dec 23, 2024
e1acf37
fix: remove GetAwaiter().GetResult() from hot paths
iancooper Dec 24, 2024
3cb7fdb
fix: Improve the ADR to reflect results; attribution in the README.md…
iancooper Dec 24, 2024
b7fdc34
chore: add some debugging support for thornier issues
iancooper Dec 24, 2024
111495f
chore: add better debug statements; helps to diagnose scheduler & con…
iancooper Dec 24, 2024
e1f416b
chore: add async versions of AWS tests
iancooper Dec 26, 2024
4046776
chore: fix missing interface member
iancooper Dec 27, 2024
d298561
chore: add missing interface methods
iancooper Dec 27, 2024
bd5a493
chore: async service bus tests
iancooper Dec 27, 2024
60e64a0
fix: confirm that if task scheduler reset correctly, we don't get a s…
iancooper Dec 27, 2024
efe687a
fix: note concerns on TaskScheduler and ConfigureAwait
iancooper Dec 27, 2024
eef18d4
fix: another pass at exploring the edge case, that causes work to be …
iancooper Dec 27, 2024
a609951
fix: some fallback approaches in Post
iancooper Dec 27, 2024
472126c
Pull one issue with blocking in the Proactor pipeline out.
iancooper Dec 27, 2024
701caa3
fix: Notes on TaskScheduler and ConfigureAwait
iancooper Dec 27, 2024
c71ce3f
fix: update ADR link
iancooper Dec 27, 2024
7c40cfa
fix: allow asynchronous producer creation (mainly useful for AWS now)
iancooper Dec 28, 2024
b95d99c
fix: add async tests to Kafka; remove spurious finalize call
iancooper Dec 28, 2024
933433e
fix: add async to mqtt
iancooper Dec 29, 2024
8ba6e67
fix: add async for mssql
iancooper Dec 29, 2024
4733e0d
fix: Add async Redis tests
iancooper Dec 29, 2024
122caad
chore: syntax modernization
iancooper Dec 29, 2024
aa01361
Merge branch 'master' into synccontext
iancooper Dec 29, 2024
b01ce53
fix: accidental sample drop
iancooper Dec 29, 2024
841e472
Merge remote-tracking branch 'origin/synccontext' into synccontext
iancooper Dec 29, 2024
4557375
chore: merge branch 'master' into synccontext
iancooper Dec 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions docs/adr/0002-use-a-single-threaded-message-pump.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ Accepted

Any service activator pattern will have a message pump, which reads from a queue.

There are different strategies we could use, a common one for example is to use a BlockingCollection to hold messages read from the queue, and then use threads from the thread pool to process those messages. However, a multi-threaded pump has the issue that it will de-order an otherwise ordered queue, as the threads will pull items from the blocking collection in parallel, not sequentially. In addition, where we have multiple threads it becomes difficult to create resources used by the pump without protecting them from race conditions.
There are different strategies we could use, a common one for example is to use a BlockingCollection to hold messages read from the queue, and then use threads from the thread pool to process those messages. However, a multithreaded pump has the issue that it will de-order an otherwise ordered queue, as the threads will pull items from the blocking collection in parallel, not sequentially. In addition, where we have multiple threads it becomes difficult to create resources used by the pump without protecting them from race conditions.

The other option would be to use the thread pool to service requests, creating a thread for each incoming message. This would not scale, as we would quickly run out of threads in the pool. To avoid this issue, solutions that rely on the thread pool typically have to govern the number of thread pool threads that can be used for concurrent requests. The problem becomes that at scale the semaphore that governs the number of threads becomes a bottleneck.

The alternative to these multi-threaded approaches is to use a single-threaded message pump that reads from the queue, processes the message, and only when it has processed that message, processes the next item. This prevents de-ordering of the queue, because items are read in sequence.
The alternative to these multithreaded approaches is to use a single-threaded message pump that reads from the queue, processes the message, and only when it has processed that message, processes the next item. This prevents de-ordering of the queue, because items are read in sequence.

This approach is the [Reactor](https://en.wikipedia.org/wiki/Reactor_pattern) pattern. The [Reactor](http://reactors.io/tutorialdocs//reactors/why-reactors/index.html) pattern uses a single thread to read from the queue, and then dispatches the message to a handler. If a higher throughput is desired with a single threaded pump, then you can create multiple pumps. In essence, this is the competing consumers pattern, each performer is its own message pump.

To make the Reactor pattern more performant, we can choose not to block on I/O by using asynchronous handlers. This is the [Proactor](https://en.wikipedia.org/wiki/Proactor_pattern) pattern. Brighter provides a SynchronizationContext so that asynchronous handlers can be used, and the message pump will not block on I/O, whilst still preserving ordering. Using an asynchronous handler switches you to a Proactor approach from Reactor for [performance](https://www.artima.com/articles/comparing-two-high-performance-io-design-patterns#part2).
To give the Reactor pattern higher throughput, we can choose not to block on I/O by using asynchronous handlers. This is the [Proactor](https://en.wikipedia.org/wiki/Proactor_pattern) pattern. Brighter provides a SynchronizationContext so that asynchronous handlers can be used, and the message pump will not block on I/O, whilst still preserving ordering. Using an asynchronous handler switches you to a Proactor approach from Reactor for [performance](https://www.artima.com/articles/comparing-two-high-performance-io-design-patterns#part2).

Note, that the Reactor pattern may be more performant, because it does not require the overhead of the thread pool, and the context switching that occurs when using the thread pool. The Reactor pattern is also more predictable, as it does not rely on the thread pool, which can be unpredictable in terms of the number of threads available. The Proactor pattern however may offer greater throughput because it does not block on I/O.

The message pump performs the usual sequence of actions:

Expand Down
71 changes: 71 additions & 0 deletions docs/adr/0022-reactor-and-nonblocking-io.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# 22. Reactor and Nonblocking IO, Proactor and Blocking IO

Date: 2019-08-01

## Status

Accepted

## Context

### Reactor and Proactor

As [outlined](0002-use-a-single-threaded-message-pump.md), Brighter offers two concurrency models, a Reactor model and a Proactor model.

The Reactor model uses a single-threaded message pump to read from the queue, and then dispatches the message to a handler. If a higher throughput is desired with a single threaded pump, then you can create multiple pumps (Peformers in Brighter's taxonomy). For a queue this is the competing consumers pattern, each Performer is its own message pump and another consumer; for a stream this is the partition worker pattern, each Performer is a single thread reading from one of your stream's partitions.

The Proactor model uses the same single-threaded message pump, or Performer, but uses non-blocking I/O. As the message pump waits for the non-blocking I/O to complete, it will not process additional messages whilst waiting for the I/O to complete; instead it will yield to other Peformers, which will process messages whilst the I/O is waiting to complete.

The benefit of the Proactor approach is throughput, as the Performer shares resources better. If you run multiple performers, each in their own thread, such as competing consumers of a queue, or consumers of individual partitions of a stream, the Proactor model ensures that when one is awaiting I/O, the others can continue to process messages.

The trade-off here is the Reactor model can offer better performance, as it does not require the overhead of waiting for I/O completion.

Of course, this assumes that Brighter can implement the Reactor and Proactor preference for blocking I/O vs non-blocking I/O top-to-bottom. What happens for the Proactor model the underlying SDK does not support non-blocking I/O or the Proactor model if the underlying SDK does not support non-blocking I/O.

### Thread Pool vs. Long Running Threads

A web server, receiving HTTP requests can schedule an incoming request to a pool of threads, and threads can be returned to the pool to service other requests whilst I/O is occuring. This allows it to make the most efficient usage of resources because non-blocking I/O returns threads to the pool to service new requests.

If I want to maintain ordering, I need to use a single-threaded message pump. Nothing else guarantees that I will read and process those messages in sequence. This is particularly important if I am doing stream processing, as I need to maintain the order of messages in the stream.

A consumer of a stream has a constrained choice if it needs to maintain its sequence. In that case, only one thread can consume the stream at a time. When a consumer is processing a record, we block consuming other records on the same stream so that we process them sequentially. To scale, we partition the stream, and allow up to as many threads as we have partitions. Kafka is a good example of this, where the consumer can choose to read from multiple partitions, but within a partition, it can only use a single thread to process the stream.

When consuming messages from a queue, where we do not care about ordering, we can use the competing consumers pattern, where each consumer is a single-threaded message pump. However, we do want to be able to throttle the rate at which we read from the queue, in order to be able to apply backpressure, and slow the rate of consumption. So again, we only tend to use a limited number of threads, and we can find value in being able to explicitly choose that value.

As our Performer, message pump, threads are long-running, we do not use a thread pool thread for them. The danger here is that work could become stuck in a message pump thread's local queue, and not be processed.

As a result we do not use the thread pool for our Performers and those threads are never returned to the pool. So the only thread pool threads we have are those being used for non-blocking I/O.

Non-blocking I/O may be useful if the handler called by the message pump thread performs I/O, when we can yield to another Performer.

Brighter has a custom SynchronizationContext, BrighterSynchronizationContext, that forces continuations to run on the message pump thread. This prevents non-blocking i/o waiting on the thread pool, with potential deadlocks. This synchronization context is used within the Performer for both the non-blocking i/o of the message pump and the non-blocking i/o in the transformer pipeline. Because our performer's only thread processes a single message at a time, there is no danger of this synchronization context deadlocking.

## Decision

If the underlying SDK does not support non-blocking I/O, then the Proactor model is forced to use blocking I/O. If the underlying SDK does not support blocking I/O, then the Reactor model is forced to use non-blocking I/O.

We support both the Reactor and Proactor models across all of our transports. We do this to avoid forcing a concurrency model onto users of Brighter. As we cannot know your context, we do not want to make decisions for you: the performace of blocking i/o or the throughput of non-blocking I/O.

To provide a common programming model, within our setup code our API uses blocking I/O. Where the underlying SDK only supports non-blocking I/O, we use non-blocking I/O and then use GetAwaiter().GetResult() to block on that. We prefer GetAwaiter().GetResult() to .Wait() as it will rework the stack trace to take all the asynchronous context into account.

Although this uses an extra thread, the impact for an application starting up on the thread pool is minimal. We will not starve the thread pool and deadlock during start-up.

For the Performer, within the message pump, we use non-blocking I/O if the transport supports it.

Currently, Brighter only supports only an IAmAMessageConsumer interface and does not support an IAmAMessageConsumerAsync interface. This means that within a Proactor, we cannot take advantage of the non-blocking I/O and we are forced to block on the non-blocking I/O. We will address this by adding that interface, so as to allow a Proactor to take advantage of non-blocking I/O.

To avoid duplicated code we will use the same code IAmAMessageConsumer implementations can use the [Flag Argument Hack](https://learn.microsoft.com/en-us/archive/msdn-magazine/2015/july/async-programming-brownfield-async-development) to share code where useful.

We will need to make changes to the Proactor to use async code within the pump, and to use the non-blocking I/O where possible.

## Consequences

Because setup is only run at application startup, the performance impact of blocking on non-blocking i/o is minimal, using .GetAwaiter().GetResult() normally an additional thread from the pool.

For the Reactor model there is a cost to using non-blocking I/O, that is an additional thread will be needed to run the continuation. This is because the message pump thread is blocked on I/O, and cannot run the continuation. As our message pump is single-threaded, this will be the maximum number of threads required though for the Reactor model. With the message pump thread suspended, awaiting, during non-blocking I/O, there will be no additional messages processed, until after the I/O completes.

This is not a significant issue but if you use an SDK that does not support blocking I/O natively (Azure Service Bus, SNS/SQS, RabbitMQ), then you need to be aware of the additional cost for those SDKs (an additional thread pool thread). You may be better off explicity using the Proactor model with these transports, unless your own application cannot support that concurrency model.

Brighter offers you explicit control, through the number of Performers you run, over how many threads are required, instead of implicit scaling through the pool. This has significant advantages for messaging consumers, as it allows you to maintain ordering, such as when consuming a stream instead of a queue.

For the Proactor model this is less cost in using a transport that only supports blocking I/O.
26 changes: 25 additions & 1 deletion src/Paramore.Brighter.DynamoDb/DynamoDbUnitOfWork.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,27 @@
using System;
#region Licence
/* The MIT License (MIT)
Copyright © 2022 Ian Cooper <ian_hammond_cooper@yahoo.co.uk>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the “Software”), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE. */
#endregion

using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
Expand Down Expand Up @@ -34,6 +57,7 @@ public void Close()

/// <summary>
/// Commit a transaction, performing all associated write actions
/// Will block thread and use second thread for callback
/// </summary>
public void Commit()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,10 @@ private void Sweep(object state)
}
finally
{
_distributedLock.ReleaseLockAsync(LockingResourceName, lockId, CancellationToken.None).Wait();
//on a timer thread, so blocking is OK
_distributedLock.ReleaseLockAsync(LockingResourceName, lockId, CancellationToken.None)
.GetAwaiter()
.GetResult();
scope.Dispose();
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/Paramore.Brighter.Inbox.DynamoDB/DynamoDbInbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public DynamoDbInbox(IAmazonDynamoDB client, DynamoDbInboxConfiguration configur
}

/// <summary>
/// Adds a command to the store
/// Adds a command to the store
/// Will block, and consume another thread for callback on threadpool; use within sync pipeline only
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="command">The command to be stored</param>
Expand All @@ -71,7 +72,7 @@ public void Add<T>(T command, string contextKey, int timeoutInMilliseconds = -1)
}

/// <summary>
/// Finds a command with the specified identifier.
/// Finds a command with the specified identifier.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="id">The identifier.</param>
Expand Down
Loading
Loading