Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Sync Over Async Improvements #3409

Merged
merged 64 commits into from
Dec 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
67f7608
fix: add support for Send to our synchronization context, to allow it…
iancooper Dec 5, 2024
f677893
fix: remove blocking wait from consumers that do not have a built in …
iancooper Dec 5, 2024
195df8b
fix: document where wait completes on the synchronizationcontext via …
iancooper Dec 5, 2024
024accf
fix: update comments for sync and async, fix one where appropriate
iancooper Dec 5, 2024
041a845
feat: add adr describing approach; rename to use reactor and proactor…
iancooper Dec 6, 2024
ba9c2a7
feat: improve the adr about Brighter's usage of threading.
iancooper Dec 6, 2024
84418ed
fix: use GetAwaiter().GetResult() for better call stack
iancooper Dec 6, 2024
2cfc787
feat: Update the ADR for IAmAMessageConsumerAsync
iancooper Dec 6, 2024
9d746b7
feat: add IAmAMessageConsumerAsync.cs and initial tests.
iancooper Dec 6, 2024
540defe
feat: additional tests for InMemoryConsumerAsync
iancooper Dec 6, 2024
3b78962
feat: allow proactor to use async methods on transports, if they exist
iancooper Dec 7, 2024
7684bd5
feat: expose async methods from AWS to Proactor.cs
iancooper Dec 7, 2024
e2549e9
fix: nullability of AWS transport
iancooper Dec 12, 2024
ddd04ac
fix: add async options to ASB
iancooper Dec 15, 2024
4013fb3
chore: branch switch
iancooper Dec 16, 2024
c1220a6
feat: add async support to Kafka
iancooper Dec 18, 2024
64323dd
chore: separate summary from remarks
iancooper Dec 18, 2024
756fca4
feat: add async to RMQ; update to RMQ V7
iancooper Dec 18, 2024
95d4b71
feat: add nullability to RMQ transport
iancooper Dec 19, 2024
d77adca
feat: add async operations to mssql transport
iancooper Dec 19, 2024
ca1cb53
chore: fix nullable issues for MS SQL transport
iancooper Dec 20, 2024
e52710a
chore: move Redis to using ChannelName and RoutingKey over string, to…
iancooper Dec 20, 2024
8964bbf
feat: upgrade the sychronizationcontext to work with the scheduler, d…
iancooper Dec 21, 2024
3c3ba3b
chore: reorg tests around reactor and proactor
iancooper Dec 21, 2024
684de10
Shift to run whole loop through synchronizationcontext over parts
iancooper Dec 21, 2024
ff37660
Make the choice of proactor/reactor easier to understand
iancooper Dec 21, 2024
6bf7ada
fix: switch to close implementation of Stephen Cleary's Nito as a sta…
iancooper Dec 21, 2024
8e8947f
fix: update ADR to reflect sources changes; update sources to reflect…
iancooper Dec 21, 2024
4ba1ee0
fix: need to be explicit about your pump type if not proactor now
iancooper Dec 21, 2024
e86fc83
fix: license file URI for Stephen Cleary was wrong
iancooper Dec 21, 2024
98b8465
feat: update ADR to show native support for proactor or reactor
iancooper Dec 21, 2024
3ba0678
fix: match proactor tests to reactor tests to flush issues
iancooper Dec 22, 2024
f008175
fix: add duplicates for the tests within Reactor, not in Proactor
iancooper Dec 22, 2024
8735ea6
fix: make rmq broker creation async; add cancellationtoken to sendasync
iancooper Dec 22, 2024
8e54725
fix: add async tests for RMQ, use to flush out issues with synchroniz…
iancooper Dec 23, 2024
c484c04
feat: add tests for internal syncrhonizationcontext, derived from Ste…
iancooper Dec 23, 2024
fb20ef8
fix: tests show that we should remove the reentrancy check; there may…
iancooper Dec 23, 2024
e90cb36
fix: add async disposable pattern to consumer
iancooper Dec 23, 2024
0e3098d
feat: support IAsyncDisposable
iancooper Dec 23, 2024
e1acf37
fix: remove GetAwaiter().GetResult() from hot paths
iancooper Dec 24, 2024
3cb7fdb
fix: Improve the ADR to reflect results; attribution in the README.md…
iancooper Dec 24, 2024
b7fdc34
chore: add some debugging support for thornier issues
iancooper Dec 24, 2024
111495f
chore: add better debug statements; helps to diagnose scheduler & con…
iancooper Dec 24, 2024
e1f416b
chore: add async versions of AWS tests
iancooper Dec 26, 2024
4046776
chore: fix missing interface member
iancooper Dec 27, 2024
d298561
chore: add missing interface methods
iancooper Dec 27, 2024
bd5a493
chore: async service bus tests
iancooper Dec 27, 2024
60e64a0
fix: confirm that if task scheduler reset correctly, we don't get a s…
iancooper Dec 27, 2024
efe687a
fix: note concerns on TaskScheduler and ConfigureAwait
iancooper Dec 27, 2024
eef18d4
fix: another pass at exploring the edge case, that causes work to be …
iancooper Dec 27, 2024
a609951
fix: some fallback approaches in Post
iancooper Dec 27, 2024
472126c
Pull one issue with blocking in the Proactor pipeline out.
iancooper Dec 27, 2024
701caa3
fix: Notes on TaskScheduler and ConfigureAwait
iancooper Dec 27, 2024
c71ce3f
fix: update ADR link
iancooper Dec 27, 2024
7c40cfa
fix: allow asynchronous producer creation (mainly useful for AWS now)
iancooper Dec 28, 2024
b95d99c
fix: add async tests to Kafka; remove spurious finalize call
iancooper Dec 28, 2024
933433e
fix: add async to mqtt
iancooper Dec 29, 2024
8ba6e67
fix: add async for mssql
iancooper Dec 29, 2024
4733e0d
fix: Add async Redis tests
iancooper Dec 29, 2024
122caad
chore: syntax modernization
iancooper Dec 29, 2024
aa01361
Merge branch 'master' into synccontext
iancooper Dec 29, 2024
b01ce53
fix: accidental sample drop
iancooper Dec 29, 2024
841e472
Merge remote-tracking branch 'origin/synccontext' into synccontext
iancooper Dec 29, 2024
4557375
chore: merge branch 'master' into synccontext
iancooper Dec 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

113 changes: 113 additions & 0 deletions src/Paramore.Brighter/BrighterSynchronizationContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
using System;
using System.Collections.Concurrent;
using System.Reflection;
using System.Threading;

//Based on:
// https://devblogs.microsoft.com/pfxteam/await-synchronizationcontext-and-console-apps/
// https://www.codeproject.com/Articles/5274751/Understanding-the-SynchronizationContext-in-NET-wi
// https://raw.githubusercontent.com/Microsoft/vs-threading/refs/heads/main/src/Microsoft.VisualStudio.Threading/SingleThreadedSynchronizationContext.cs

namespace Paramore.Brighter
{
public class BrighterSynchronizationContext : SynchronizationContext
{
private readonly BlockingCollection<Message> _queue = new();
private int _operationCount;
private readonly int _ownedThreadId = Environment.CurrentManagedThreadId;

/// <inheritdoc/>
public override void OperationCompleted()
{
if (Interlocked.Decrement(ref _operationCount) == 0)
Complete();
}

/// <inheritdoc/>
public override void OperationStarted()
{
Interlocked.Increment(ref _operationCount);
}

/// <inheritdoc/>
public override void Post(SendOrPostCallback d, object? state)
{
if (d == null) throw new ArgumentNullException(nameof(d));
_queue.Add(new Message(d, state));
}

/// <inheritdoc/>
public override void Send(SendOrPostCallback d, object? state)
{
if (_ownedThreadId == Environment.CurrentManagedThreadId)
{
try
{
d(state);
}
catch (Exception ex)
{
throw new TargetInvocationException(ex);
}
}
else
{
Exception? caughtException = null;
var evt = new ManualResetEventSlim();
try
{
_queue.Add(new Message(
s =>
{
try
{
d(state);
}
catch (Exception ex)
{
caughtException = ex;
}
finally
{
evt.Set();
}
},
state,
evt));

evt.Wait();


if (caughtException != null)
{
throw new TargetInvocationException(caughtException);
}
}
finally
{
evt.Dispose();
}
}
}

/// <summary>Runs a loop to process all queued work items.</summary>
public void RunOnCurrentThread()
{
foreach (var message in _queue.GetConsumingEnumerable())
{
message.Callback(message.State);
message.FinishedEvent?.Set();
}
}

/// <summary>Notifies the context that no more work will arrive.</summary>
private void Complete() { _queue.CompleteAdding(); }

private struct Message(SendOrPostCallback callback, object? state, ManualResetEventSlim? finishedEvent = null)
{
public readonly SendOrPostCallback Callback = callback;
public readonly object? State = state;
public readonly ManualResetEventSlim? FinishedEvent = finishedEvent;
}
}
}
Loading