Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Sync Over Async Improvements #3409

Merged
merged 64 commits into from
Dec 29, 2024
Merged

Fix: Sync Over Async Improvements #3409

merged 64 commits into from
Dec 29, 2024

Conversation

iancooper
Copy link
Member

We have some outstanding issues, to tackle in V10 (and perhaps backport to V9) that concern sync over async. For example see #3309. This has surfaced again through the V7 release of RabbitMQ, which is moving to async. This then raises a problem in PRs like #3408. It is also worth noting others identifying this problem in discussions such as this one.

Some goals:

  • Move the synchronization context to Brighter to make it available in producer scenarios if needed
  • Avoid using .Wait, .Result, or GetAwaiter().GetResult() in hot paths (probably does not matter in configuration at startup)
  • Use synchronizationcontext where appropriate to push continuations back onto a single thread to avoid thread pool starvation issues.

This is a set of fixes for some of the most problematic issues that we know of. I suspect this will be a whack-a-mole problem over time. They can be hard to spot in review, and the context for introducing an additional thread matters a lot, so it's not a one-size fits all answer.

We also need to add some tests, using tools from AsyncEx if needed

@iancooper iancooper added 2 - In Progress Bug v10 Allocal to a v10 release v9 Required for v9 release .NET Pull requests that update .net code labels Dec 5, 2024
@iancooper iancooper self-assigned this Dec 5, 2024
Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: OK

Code Health of new files: 10.00

View detailed results in CodeScene

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: OK

Change in average Code Health of affected files: +0.03 (9.61 -> 9.65)

View detailed results in CodeScene

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: OK

Change in average Code Health of affected files: +0.03 (8.62 -> 8.65)

View detailed results in CodeScene

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: OK

Change in average Code Health of affected files: +0.01 (9.12 -> 9.13)

  • Improving Code Health: 2 findings(s) ✅

View detailed results in CodeScene

@@ -53,7 +53,7 @@ public class MessagePumpAsync<TRequest> : MessagePump<TRequest> where TRequest :
/// <param name="requestContextFactory">A factory to create instances of request context, used to add context to a pipeline</param>
/// <param name="tracer">What is the tracer we will use for telemetry</param>
/// <param name="instrumentationOptions">When creating a span for <see cref="CommandProcessor"/> operations how noisy should the attributes be</param>
public MessagePumpAsync(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ No longer an issue: Constructor Over-Injection
MessagePumpAsync is no longer above the threshold for number of arguments

@@ -53,7 +53,7 @@ public class MessagePumpAsync<TRequest> : MessagePump<TRequest> where TRequest :
/// <param name="requestContextFactory">A factory to create instances of request context, used to add context to a pipeline</param>
/// <param name="tracer">What is the tracer we will use for telemetry</param>
/// <param name="instrumentationOptions">When creating a span for <see cref="CommandProcessor"/> operations how noisy should the attributes be</param>
public MessagePumpAsync(
public Proactor(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ New issue: Constructor Over-Injection
Proactor has 7 arguments, threshold = 5

@@ -50,7 +50,7 @@ public class MessagePumpBlocking<TRequest> : MessagePump<TRequest> where TReques
/// <param name="requestContextFactory">A factory to create instances of request context, used to add context to a pipeline</param>
/// <param name="tracer">What is the tracer we will use for telemetry</param>
/// <param name="instrumentationOptions">When creating a span for <see cref="CommandProcessor"/> operations how noisy should the attributes be</param>
public MessagePumpBlocking(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ No longer an issue: Constructor Over-Injection
MessagePumpBlocking is no longer above the threshold for number of arguments

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: OK

Change in average Code Health of affected files: +0.01 (9.12 -> 9.13)

  • Improving Code Health: 2 findings(s) ✅

View detailed results in CodeScene

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: OK

Change in average Code Health of affected files: +0.01 (9.17 -> 9.19)

  • Improving Code Health: 2 findings(s) ✅

View detailed results in CodeScene

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: OK

Change in average Code Health of affected files: +0.01 (9.17 -> 9.19)

  • Improving Code Health: 2 findings(s) ✅

View detailed results in CodeScene

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: OK

Change in average Code Health of affected files: +0.02 (9.18 -> 9.20)

  • Improving Code Health: 2 findings(s) ✅

View detailed results in CodeScene

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: OK

Change in average Code Health of affected files: +0.03 (9.19 -> 9.21)

  • Improving Code Health: 2 findings(s) ✅

View detailed results in CodeScene

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: -0.04 (9.30 -> 9.26)

  • Declining Code Health: 6 findings(s) 🚩
  • Improving Code Health: 5 findings(s) ✅
  • Affected Hotspots: 1 files(s) 🔥

View detailed results in CodeScene

@@ -48,15 +48,15 @@ namespace Paramore.Brighter.ServiceActivator
/// Retry and circuit breaker should be provided by exception policy using an attribute on the handler
/// Timeout on the handler should be provided by timeout policy using an attribute on the handler
/// </summary>
public abstract class MessagePump<TRequest> : IAmAMessagePump where TRequest : class, IRequest
public abstract class MessagePump<TRequest> where TRequest : class, IRequest

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ No longer an issue: Overall Code Complexity
The mean cyclomatic complexity in this module is no longer above the threshold

Comment on lines +133 to +316
{
RequestContext context = InitRequestContext(span, message);

var request = TranslateMessage(message, context);

CommandProcessorProvider.CreateScope();

DispatchRequest(message.Header, request, context);

span?.SetStatus(ActivityStatusCode.Ok);
}
catch (AggregateException aggregateException)
{
var stop = false;
var defer = false;

foreach (var exception in aggregateException.InnerExceptions)
{
if (exception is ConfigurationException configurationException)
{
s_logger.LogCritical(configurationException, "MessagePump: Stopping receiving of messages from {ChannelName} with {RoutingKey} on thread # {ManagementThreadId}", Channel.Name, Channel.RoutingKey, Environment.CurrentManagedThreadId);
stop = true;
break;
}

if (exception is DeferMessageAction)
{
defer = true;
continue;
}

s_logger.LogError(exception, "MessagePump: Failed to dispatch message {Id} from {ChannelName} with {RoutingKey} on thread # {ManagementThreadId}", message.Id, Channel.Name, Channel.RoutingKey, Environment.CurrentManagedThreadId);
}

if (defer)
{
s_logger.LogDebug("MessagePump: Deferring message {Id} from {ChannelName} with {RoutingKey} on thread # {ManagementThreadId}", message.Id, Channel.Name, Channel.RoutingKey, Environment.CurrentManagedThreadId);
span?.SetStatus(ActivityStatusCode.Error, $"Deferring message {message.Id} for later action");
if (RequeueMessage(message))
continue;
}

if (stop)
{
RejectMessage(message);
span?.SetStatus(ActivityStatusCode.Error, $"MessagePump: Stopping receiving of messages from {Channel.Name} with {Channel.RoutingKey} on thread # {Environment.CurrentManagedThreadId}");
Channel.Dispose();
break;
}

span?.SetStatus(ActivityStatusCode.Error, $"MessagePump: Failed to dispatch message {message.Id} from {Channel.Name} with {Channel.RoutingKey} on thread # {Environment.CurrentManagedThreadId}");
}
catch (ConfigurationException configurationException)
{
s_logger.LogCritical(configurationException,"MessagePump: Stopping receiving of messages from {ChannelName} with {RoutingKey} on thread # {ManagementThreadId}", Channel.Name, Channel.RoutingKey, Environment.CurrentManagedThreadId);
RejectMessage(message);
span?.SetStatus(ActivityStatusCode.Error, $"MessagePump: Stopping receiving of messages from {Channel.Name} on thread # {Environment.CurrentManagedThreadId}");
Channel.Dispose();
break;
}
catch (DeferMessageAction)
{
s_logger.LogDebug("MessagePump: Deferring message {Id} from {ChannelName} with {RoutingKey} on thread # {ManagementThreadId}", message.Id, Channel.Name, Channel.RoutingKey, Environment.CurrentManagedThreadId);

span?.SetStatus(ActivityStatusCode.Error, $"Deferring message {message.Id} for later action");

if (RequeueMessage(message)) continue;
}
catch (MessageMappingException messageMappingException)
{
s_logger.LogWarning(messageMappingException, "MessagePump: Failed to map message {Id} from {ChannelName} with {RoutingKey} on thread # {ManagementThreadId}", message.Id, Channel.Name, Channel.RoutingKey, Environment.CurrentManagedThreadId);

IncrementUnacceptableMessageLimit();

span?.SetStatus(ActivityStatusCode.Error, $"MessagePump: Failed to map message {message.Id} from {Channel.Name} with {Channel.RoutingKey} on thread # {Thread.CurrentThread.ManagedThreadId}");
}
catch (Exception e)
{
s_logger.LogError(e,
"MessagePump: Failed to dispatch message '{Id}' from {ChannelName} with {RoutingKey} on thread # {ManagementThreadId}",
message.Id, Channel.Name, Channel.RoutingKey, Environment.CurrentManagedThreadId);

span?.SetStatus(ActivityStatusCode.Error,$"MessagePump: Failed to dispatch message '{message.Id}' from {Channel.Name} with {Channel.RoutingKey} on thread # {Environment.CurrentManagedThreadId}");
}
finally
{
Tracer?.EndSpan(span);
CommandProcessorProvider.ReleaseScope();
}

AcknowledgeMessage(message);

} while (true);

s_logger.LogInformation(
"MessagePump0: Finished running message loop, no longer receiving messages from {ChannelName} with {RoutingKey} on thread # {ManagementThreadId}",
Channel.Name, Channel.RoutingKey, Thread.CurrentThread.ManagedThreadId);
Tracer?.EndSpan(pumpSpan);

}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Complex Method
Run has a cyclomatic complexity of 29, threshold = 9

Suppress

Comment on lines +133 to +316
{
RequestContext context = InitRequestContext(span, message);

var request = TranslateMessage(message, context);

CommandProcessorProvider.CreateScope();

DispatchRequest(message.Header, request, context);

span?.SetStatus(ActivityStatusCode.Ok);
}
catch (AggregateException aggregateException)
{
var stop = false;
var defer = false;

foreach (var exception in aggregateException.InnerExceptions)
{
if (exception is ConfigurationException configurationException)
{
s_logger.LogCritical(configurationException, "MessagePump: Stopping receiving of messages from {ChannelName} with {RoutingKey} on thread # {ManagementThreadId}", Channel.Name, Channel.RoutingKey, Environment.CurrentManagedThreadId);
stop = true;
break;
}

if (exception is DeferMessageAction)
{
defer = true;
continue;
}

s_logger.LogError(exception, "MessagePump: Failed to dispatch message {Id} from {ChannelName} with {RoutingKey} on thread # {ManagementThreadId}", message.Id, Channel.Name, Channel.RoutingKey, Environment.CurrentManagedThreadId);
}

if (defer)
{
s_logger.LogDebug("MessagePump: Deferring message {Id} from {ChannelName} with {RoutingKey} on thread # {ManagementThreadId}", message.Id, Channel.Name, Channel.RoutingKey, Environment.CurrentManagedThreadId);
span?.SetStatus(ActivityStatusCode.Error, $"Deferring message {message.Id} for later action");
if (RequeueMessage(message))
continue;
}

if (stop)
{
RejectMessage(message);
span?.SetStatus(ActivityStatusCode.Error, $"MessagePump: Stopping receiving of messages from {Channel.Name} with {Channel.RoutingKey} on thread # {Environment.CurrentManagedThreadId}");
Channel.Dispose();
break;
}

span?.SetStatus(ActivityStatusCode.Error, $"MessagePump: Failed to dispatch message {message.Id} from {Channel.Name} with {Channel.RoutingKey} on thread # {Environment.CurrentManagedThreadId}");
}
catch (ConfigurationException configurationException)
{
s_logger.LogCritical(configurationException,"MessagePump: Stopping receiving of messages from {ChannelName} with {RoutingKey} on thread # {ManagementThreadId}", Channel.Name, Channel.RoutingKey, Environment.CurrentManagedThreadId);
RejectMessage(message);
span?.SetStatus(ActivityStatusCode.Error, $"MessagePump: Stopping receiving of messages from {Channel.Name} on thread # {Environment.CurrentManagedThreadId}");
Channel.Dispose();
break;
}
catch (DeferMessageAction)
{
s_logger.LogDebug("MessagePump: Deferring message {Id} from {ChannelName} with {RoutingKey} on thread # {ManagementThreadId}", message.Id, Channel.Name, Channel.RoutingKey, Environment.CurrentManagedThreadId);

span?.SetStatus(ActivityStatusCode.Error, $"Deferring message {message.Id} for later action");

if (RequeueMessage(message)) continue;
}
catch (MessageMappingException messageMappingException)
{
s_logger.LogWarning(messageMappingException, "MessagePump: Failed to map message {Id} from {ChannelName} with {RoutingKey} on thread # {ManagementThreadId}", message.Id, Channel.Name, Channel.RoutingKey, Environment.CurrentManagedThreadId);

IncrementUnacceptableMessageLimit();

span?.SetStatus(ActivityStatusCode.Error, $"MessagePump: Failed to map message {message.Id} from {Channel.Name} with {Channel.RoutingKey} on thread # {Thread.CurrentThread.ManagedThreadId}");
}
catch (Exception e)
{
s_logger.LogError(e,
"MessagePump: Failed to dispatch message '{Id}' from {ChannelName} with {RoutingKey} on thread # {ManagementThreadId}",
message.Id, Channel.Name, Channel.RoutingKey, Environment.CurrentManagedThreadId);

span?.SetStatus(ActivityStatusCode.Error,$"MessagePump: Failed to dispatch message '{message.Id}' from {Channel.Name} with {Channel.RoutingKey} on thread # {Environment.CurrentManagedThreadId}");
}
finally
{
Tracer?.EndSpan(span);
CommandProcessorProvider.ReleaseScope();
}

AcknowledgeMessage(message);

} while (true);

s_logger.LogInformation(
"MessagePump0: Finished running message loop, no longer receiving messages from {ChannelName} with {RoutingKey} on thread # {ManagementThreadId}",
Channel.Name, Channel.RoutingKey, Thread.CurrentThread.ManagedThreadId);
Tracer?.EndSpan(pumpSpan);

}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Bumpy Road Ahead
Run has 3 blocks with nested conditional logic. Any nesting of 2 or deeper is considered. Threshold is one single, nested block per function

Suppress

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Overall Code Complexity
This module has a mean cyclomatic complexity of 5.33 across 9 functions. The mean complexity threshold is 4

Suppress

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: +0.01 (9.31 -> 9.32)

  • Declining Code Health: 6 findings(s) 🚩
  • Improving Code Health: 6 findings(s) ✅
  • Affected Hotspots: 1 files(s) 🔥

View detailed results in CodeScene

@iancooper
Copy link
Member Author

Added #3418

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: +0.03 (9.47 -> 9.50)

  • Declining Code Health: 26 findings(s) 🚩
  • Improving Code Health: 22 findings(s) ✅
  • Affected Hotspots: 5 files(s) 🔥

View detailed results in CodeScene

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: +0.03 (9.48 -> 9.52)

  • Declining Code Health: 28 findings(s) 🚩
  • Improving Code Health: 22 findings(s) ✅
  • Affected Hotspots: 5 files(s) 🔥

View detailed results in CodeScene

Comment on lines +93 to +117
if (s_pool is null)
throw new ChannelFailureException("RedisMessageProducer: Connection pool has not been initialized");

using var client = s_pool.Value.GetClient();
Topic = message.Header.Topic;

s_logger.LogDebug("RedisMessageProducer: Preparing to send message");

var redisMessage = CreateRedisMessage(message);

s_logger.LogDebug(
"RedisMessageProducer: Publishing message with topic {Topic} and id {Id} and body: {Request}",
message.Header.Topic, message.Id.ToString(), message.Body.Value
);
//increment a counter to get the next message id
var nextMsgId = IncrementMessageCounter(client);
//store the message, against that id
StoreMessage(client, redisMessage, nextMsgId);
//If there are subscriber queues, push the message to the subscriber queues
var pushedTo = PushToQueues(client, nextMsgId);
s_logger.LogDebug(
"RedisMessageProducer: Published message with topic {Topic} and id {Id} and body: {Request} to queues: {3}",
message.Header.Topic, message.Id.ToString(), message.Body.Value, string.Join(", ", pushedTo)
);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Code Duplication
The module contains 4 functions with similar structure: PushToQueues,PushToQueuesAsync,Send,SendAsync

Suppress

Comment on lines +40 to +58
[Fact]
public async Task When_the_topic_exists_and_sending_a_message_with_no_delay_it_should_send_the_message_to_the_correct_topicclient()
{
var messageBody = Encoding.UTF8.GetBytes("A message body");

_nameSpaceManagerWrapper.ResetState();
_nameSpaceManagerWrapper.Topics.Add("topic", []);

await _producer.SendAsync(new Message(
new MessageHeader(Guid.NewGuid().ToString(), new RoutingKey("topic"), MessageType.MT_EVENT),
new MessageBody(messageBody, "JSON"))
);

ServiceBusMessage sentMessage = _topicClient.SentMessages.First();

Assert.Equal(messageBody, sentMessage.Body.ToArray());
Assert.Equal("MT_EVENT", sentMessage.ApplicationProperties["MessageType"]);
Assert.Equal(1, _topicClient.ClosedCount);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Code Duplication
The module contains 6 functions with similar structure: When_sending_a_command_message_type_message_with_delay_it_should_set_the_correct_messagetype_property,When_sending_a_command_message_type_message_with_no_delay_it_should_set_the_correct_messagetype_property,When_the_topic_does_not_exist_and_sending_a_message_with_a_delay_it_should_send_the_message_to_the_correct_topicclient,When_the_topic_does_not_exist_it_should_be_created_and_the_message_is_sent_to_the_correct_topicclient and 2 more functions

Suppress

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: +0.04 (9.49 -> 9.54)

  • Declining Code Health: 28 findings(s) 🚩
  • Improving Code Health: 22 findings(s) ✅
  • Affected Hotspots: 5 files(s) 🔥

View detailed results in CodeScene

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: +0.05 (9.49 -> 9.54)

  • Declining Code Health: 28 findings(s) 🚩
  • Improving Code Health: 22 findings(s) ✅
  • Affected Hotspots: 5 files(s) 🔥

View detailed results in CodeScene

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: +0.05 (9.49 -> 9.54)

  • Declining Code Health: 28 findings(s) 🚩
  • Improving Code Health: 22 findings(s) ✅
  • Affected Hotspots: 5 files(s) 🔥

View detailed results in CodeScene

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: +0.05 (9.50 -> 9.55)

  • Declining Code Health: 28 findings(s) 🚩
  • Improving Code Health: 22 findings(s) ✅
  • Affected Hotspots: 5 files(s) 🔥

View detailed results in CodeScene

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: +0.05 (9.50 -> 9.55)

  • Declining Code Health: 28 findings(s) 🚩
  • Improving Code Health: 22 findings(s) ✅
  • Affected Hotspots: 5 files(s) 🔥

View detailed results in CodeScene

@iancooper iancooper marked this pull request as ready for review December 29, 2024 13:22
Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: +0.05 (9.50 -> 9.55)

  • Declining Code Health: 28 findings(s) 🚩
  • Improving Code Health: 22 findings(s) ✅
  • Affected Hotspots: 5 files(s) 🔥

View detailed results in CodeScene

@iancooper
Copy link
Member Author

This is a complex PR to review, as it makes changes throughout the codebase. It has one main goal: make the Reactor model blocking, and the Proactor model non-blocking, top-to-bottom.

There is an ADR associated with this change, it may be worth reading that ADR to understand what has been done. A summary:

  • Make Proactor async throughout, not just for user defined code
  • Use a version of Stephen Cleary's AsyncEx over our old synchronizationcontext
  • Use the AsyncEx variation to provide sync over async (so not Wait, Result, GetAwaiter().GetResult()
  • Wrap the Proactor message pump in that context/scheduler to allow us to terminate async in the thread
  • Provide async versions of all existing tests for the transports

Issues will emerge, but it may be better to merge and fix, as this PR is problematic sitting outside too long. Some issues:

  • We get some issues with AsyncEx where work is queued back to a stored ExecutionContext, following the SynchronizationContext being disposed. We may be able to use some prior art to solve this. It is possible we could switch back to our old context - but a trial of that proved it handled some scenarios worse that Stephen Cleary's or switch to Visual Studio's SingleThreadedSynchronizationContext. I am inclined to understand and fix.

It is in the ADR but we should document the "native" choice where that exists i.e. Kafka is Reactor and RMQ v7 is Proactor

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: +0.05 (9.50 -> 9.55)

  • Declining Code Health: 28 findings(s) 🚩
  • Improving Code Health: 22 findings(s) ✅
  • Affected Hotspots: 5 files(s) 🔥

View detailed results in CodeScene

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: +0.05 (9.50 -> 9.55)

  • Declining Code Health: 28 findings(s) 🚩
  • Improving Code Health: 22 findings(s) ✅
  • Affected Hotspots: 5 files(s) 🔥

View detailed results in CodeScene

@iancooper iancooper merged commit 9af3c0b into master Dec 29, 2024
11 of 20 checks passed
@iancooper iancooper deleted the synccontext branch December 29, 2024 17:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
3 - Done Bug .NET Pull requests that update .net code v9 Required for v9 release v10 Allocal to a v10 release
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant