feat: add circuit breaker support to service bus message pump (#416)
* feat: add circuit breaker support to service bus message pump

* mend

* pr-fix: update w/ correct assert-x renaming

* pr-sug: increase default time periods

* pr-sug: add a warning to not being available for event hubs message pumps

* Update MessagePumpCircuitBreakerOptions.cs

* pr-sug: rename sub-namespace to 'resiliency'

* pr-sug: add message processing result to the message pump try single msg functionality

* pr-fix: unit test ctor circuit breaker

* pr-fix: upgrade az identity
stijnmoreels authored Dec 7, 2023
1 parent 0c11f1e commit 7a0b04f
Showing 19 changed files with 775 additions and 19 deletions.
59 changes: 59 additions & 0 deletions docs/preview/02-Features/06-general-messaging.md
Original file line number Diff line number Diff line change
@@ -9,6 +9,65 @@ Both and future systems also support some general functionality that will be exp

## Stop message pump when downstream is unable to keep up

### Pause message processing with a circuit breaker
When your message handler interacts with a dependency on an external resource, that resource may become unavailable. In that case you want to temporarily stop processing messages.

⚠️ This functionality is currently only available for the Azure Service Bus message pump.

⚠️ This functionality is not supported for the Azure Event Hubs message pump.

⚠️ This functionality is only available when interacting with message pumps, not in message router-only scenarios like Azure Functions.

To interact with the message processing system within your custom message handler, you can inject the `IMessagePumpCircuitBreaker`:

```csharp
using System.Threading.Tasks;
using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling;
using Arcus.Messaging.Pumps.Abstractions.Resiliency;

public class OrderMessageHandler : IAzureServiceBusMessageHandler<Order>
{
    private readonly IMessagePumpCircuitBreaker _circuitBreaker;

    public OrderMessageHandler(IMessagePumpCircuitBreaker circuitBreaker)
    {
        _circuitBreaker = circuitBreaker;
    }

    public async Task ProcessMessageAsync(Order message, AzureServiceBusMessageContext messageContext, ...)
    {
        // Determine whether your dependent system is healthy...
        // If not, call the circuit breaker; message processing will be halted temporarily.
        await _circuitBreaker.PauseMessageProcessingAsync(messageContext.JobId);
    }
}
```

By default, the message pump acts in the following pattern:
* Circuit breaker calls `Pause`.
* Message pump stops processing messages for a period of time (circuit is OPEN).
* Message pump tries processing a single message (circuit is HALF-OPEN).
  * Dependency still unhealthy? => Tries again later (circuit is OPEN).
  * Dependency healthy? => Message pump starts receiving messages in full again (circuit is CLOSED).
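
The pattern above can be sketched as a small, self-contained loop. This is only an illustration of the OPEN / HALF-OPEN / CLOSED transitions, not the library's implementation: the `CircuitState` enum, the `CircuitBreakerSketch` class and the `tryProcessSingleMessage` probe are hypothetical names introduced here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the circuit states described above; not a type from the library.
public enum CircuitState { Closed, Open, HalfOpen }

public static class CircuitBreakerSketch
{
    // Walks through OPEN -> HALF-OPEN -> (OPEN | CLOSED) and records every state visited.
    // 'tryProcessSingleMessage' is a hypothetical probe that returns true
    // once the dependency is healthy again.
    public static async Task<List<CircuitState>> PauseAsync(
        Func<Task<bool>> tryProcessSingleMessage,
        TimeSpan recoveryPeriod,
        TimeSpan intervalDuringRecovery)
    {
        // Pausing opens the circuit: no messages are processed during the recovery period.
        var visited = new List<CircuitState> { CircuitState.Open };
        await Task.Delay(recoveryPeriod);

        while (true)
        {
            // Half-open: try a single message to probe the dependency.
            visited.Add(CircuitState.HalfOpen);
            if (await tryProcessSingleMessage())
            {
                // Healthy again: close the circuit and resume processing in full.
                visited.Add(CircuitState.Closed);
                return visited;
            }

            // Still unhealthy: back to open, wait for the interval, then probe again.
            visited.Add(CircuitState.Open);
            await Task.Delay(intervalDuringRecovery);
        }
    }
}
```

With a probe that only succeeds on its third call, the recorded sequence would be OPEN, HALF-OPEN, OPEN, HALF-OPEN, OPEN, HALF-OPEN, CLOSED.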

Both the recovery period after the circuit is opened and the interval between messages while the circuit is half-open are configurable when calling the circuit breaker. These time periods depend on your dependent system and may differ by the type of transient connection failure.

```csharp
await _circuitBreaker.PauseMessageProcessingAsync(
    messageContext.JobId,
    options =>
    {
        // Sets the time period the circuit breaker should wait before retrying to receive messages.
        // A.k.a. the time period the circuit is open (default: 30 seconds).
        options.MessageRecoveryPeriod = TimeSpan.FromSeconds(15);

        // Sets the time period the circuit breaker should wait between each message after the circuit was opened, during recovery.
        // A.k.a. the time interval to receive messages during which the circuit is half-open (default: 10 seconds).
        options.MessageIntervalDuringRecovery = TimeSpan.FromSeconds(1.5);
    });
```
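
Since these periods may differ by the type of failure, one possible approach is to derive them from the exception that signalled the unhealthy dependency. The sketch below is an assumption-laden illustration: the `RecoveryPeriods` class, the chosen exception types and all durations other than the 30 second / 10 second defaults are hypothetical.

```csharp
using System;
using System.Net.Http;

public static class RecoveryPeriods
{
    // Hypothetical mapping from failure type to (recovery period, interval during recovery).
    // Both values are always positive, as the circuit breaker options require.
    public static (TimeSpan Recovery, TimeSpan Interval) For(Exception failure)
    {
        switch (failure)
        {
            // Assumption: timeouts tend to clear quickly, so retry sooner.
            case TimeoutException _:
                return (TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(2));

            // Assumption: a failing HTTP dependency needs more time to recover.
            case HttpRequestException _:
                return (TimeSpan.FromMinutes(1), TimeSpan.FromSeconds(10));

            // Otherwise fall back to the documented defaults.
            default:
                return (TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(10));
        }
    }
}
```

The resulting tuple could then be assigned to `options.MessageRecoveryPeriod` and `options.MessageIntervalDuringRecovery` inside the options callback.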

### Pause message processing for a fixed period of time
⚡ If you use one of the message-type specific packages like `Arcus.Messaging.Pumps.EventHubs`, you will automatically get this functionality. If you implement your own message pump, please use the `services.AddMessagePump(...)` extension, which makes sure that you also register this functionality.

When messages are processed by a system that works slower than the rate at which the message pump receives them, a rate problem could occur.
@@ -0,0 +1,53 @@
using System;
using GuardNet;

namespace Arcus.Messaging.Abstractions.MessageHandling
{
/// <summary>
/// Represents an outcome of a message that was processed by an <see cref="IMessageHandler{TMessage,TMessageContext}"/> implementation.
/// </summary>
public class MessageProcessingResult
{
private MessageProcessingResult()
{
IsSuccessful = true;
}

private MessageProcessingResult(Exception processingException)
{
Guard.NotNull(processingException, nameof(processingException));

IsSuccessful = false;
ProcessingException = processingException;
}

/// <summary>
/// Gets the boolean flag that indicates whether this result represents a successful or unsuccessful outcome of a processed message.
/// </summary>
public bool IsSuccessful { get; }

/// <summary>
/// Gets the exception that occurred during the message processing that represents the cause of the processing failure.
/// </summary>
/// <remarks>
/// Only available when this processing result represents an unsuccessful message processing result - when <see cref="IsSuccessful"/> is <c>false</c>.
/// </remarks>
public Exception ProcessingException { get; }

/// <summary>
/// Gets a <see cref="MessageProcessingResult"/> instance that represents a result of a message that was successfully processed.
/// </summary>
public static MessageProcessingResult Success => new MessageProcessingResult();

/// <summary>
/// Creates a <see cref="MessageProcessingResult"/> instance that represents a result of a message that was unsuccessfully processed.
/// </summary>
/// <param name="processingException">The exception that occurred during the message processing that represents the cause of the processing failure.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="processingException"/> is <c>null</c>.</exception>
public static MessageProcessingResult Failure(Exception processingException)
{
Guard.NotNull(processingException, nameof(processingException));
return new MessageProcessingResult(processingException);
}
}
}
@@ -0,0 +1,30 @@
using System;
using System.Threading.Tasks;
using GuardNet;

// ReSharper disable once CheckNamespace
namespace Arcus.Messaging.Pumps.Abstractions.Resiliency
{
/// <summary>
/// Extensions on the <see cref="IMessagePumpCircuitBreaker"/> for more dev-friendly interaction.
/// </summary>
// ReSharper disable once InconsistentNaming
public static class IMessagePumpCircuitBreakerExtensions
{
/// <summary>
/// Pause the process of receiving messages in the message pump for a period of time before carefully retrying.
/// </summary>
/// <param name="circuitBreaker">The instance to interact with.</param>
/// <param name="jobId">The unique identifier to distinguish the message pump in the application services.</param>
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
public static async Task PauseMessageProcessingAsync(
this IMessagePumpCircuitBreaker circuitBreaker,
string jobId)
{
Guard.NotNull(circuitBreaker, nameof(circuitBreaker));
Guard.NotNullOrWhitespace(jobId, nameof(jobId));

await circuitBreaker.PauseMessageProcessingAsync(jobId, _ => { });
}
}
}
@@ -1,7 +1,9 @@
using System;
using Arcus.Messaging.Pumps.Abstractions;
using Arcus.Messaging.Pumps.Abstractions.Resiliency;
using GuardNet;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
@@ -28,6 +30,9 @@ public static IServiceCollection AddMessagePump<TMessagePump>(
Guard.NotNull(implementationFactory, nameof(implementationFactory), "Requires a factory implementation function to create the custom message pump instance");

services.TryAddSingleton<IMessagePumpLifetime, DefaultMessagePumpLifetime>();
services.TryAddSingleton<IMessagePumpCircuitBreaker>(
provider => new DefaultMessagePumpCircuitBreaker(provider, provider.GetService<ILogger<DefaultMessagePumpCircuitBreaker>>()));

return services.AddHostedService(implementationFactory);
}
}
13 changes: 13 additions & 0 deletions src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs
@@ -2,6 +2,8 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Arcus.Messaging.Abstractions.MessageHandling;
using Arcus.Messaging.Pumps.Abstractions.Resiliency;
using GuardNet;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
@@ -96,6 +98,17 @@ public override async Task StopAsync(CancellationToken cancellationToken)
await base.StopAsync(cancellationToken);
}

/// <summary>
/// Try to process a single message after the circuit was broken, a.k.a. entering the half-open state.
/// </summary>
/// <returns>
/// [Success] when the related message handler can again process messages and the message pump can again start receiving messages in full; [Failure] otherwise.
/// </returns>
public virtual Task<MessageProcessingResult> TryProcessProcessSingleMessageAsync(MessagePumpCircuitBreakerOptions options)
{
return Task.FromResult(MessageProcessingResult.Success);
}

/// <summary>
/// Start with receiving messages on this message pump.
/// </summary>
@@ -0,0 +1,112 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Arcus.Messaging.Abstractions.MessageHandling;
using GuardNet;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

namespace Arcus.Messaging.Pumps.Abstractions.Resiliency
{
/// <summary>
/// Represents a default implementation of the <see cref="IMessagePumpCircuitBreaker"/>
/// that starts and stops a configured message pump by its configured <see cref="MessagePumpCircuitBreakerOptions"/>.
/// </summary>
public class DefaultMessagePumpCircuitBreaker : IMessagePumpCircuitBreaker
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger _logger;

/// <summary>
/// Initializes a new instance of the <see cref="DefaultMessagePumpCircuitBreaker" /> class.
/// </summary>
/// <param name="serviceProvider">The application services to retrieve the registered <see cref="MessagePump"/>.</param>
/// <param name="logger">The logger instance to write diagnostic messages during the inspection of healthy message pumps.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="serviceProvider"/> is <c>null</c>.</exception>
public DefaultMessagePumpCircuitBreaker(IServiceProvider serviceProvider, ILogger<DefaultMessagePumpCircuitBreaker> logger)
{
Guard.NotNull(serviceProvider, nameof(serviceProvider));

_serviceProvider = serviceProvider;
_logger = logger ?? NullLogger<DefaultMessagePumpCircuitBreaker>.Instance;
}

/// <summary>
/// Pause the process of receiving messages in the message pump for a period of time before carefully retrying.
/// </summary>
/// <param name="jobId">The unique identifier to distinguish the message pump in the application services.</param>
/// <param name="configureOptions">The optional user-configurable options to manipulate the workings of the message pump interaction.</param>
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
public virtual async Task PauseMessageProcessingAsync(string jobId, Action<MessagePumpCircuitBreakerOptions> configureOptions)
{
Guard.NotNullOrWhitespace(jobId, nameof(jobId));

_logger.LogTrace("Open circuit by pausing message processing for message pump '{JobId}'...", jobId);

var options = new MessagePumpCircuitBreakerOptions();
configureOptions?.Invoke(options);

MessagePump messagePump = GetRegisteredMessagePump(jobId);
await messagePump.StopProcessingMessagesAsync(CancellationToken.None);

await WaitUntilRecoveredAsync(messagePump, options);
await messagePump.StartProcessingMessagesAsync(CancellationToken.None);
}

/// <summary>
/// Get the registered <see cref="MessagePump"/> from the application services
/// for which to pause the process of receiving messages.
/// </summary>
/// <exception cref="InvalidOperationException">Thrown when no message pump or more than one message pump could be found by the configured job ID.</exception>
protected MessagePump GetRegisteredMessagePump(string jobId)
{
Guard.NotNullOrWhitespace(jobId, nameof(jobId));

MessagePump[] messagePumps =
_serviceProvider.GetServices<IHostedService>()
.OfType<MessagePump>()
.Where(p => p.JobId == jobId)
.ToArray();

if (messagePumps.Length == 0)
{
throw new InvalidOperationException(
$"Cannot find one correct registered message pump with job ID: '{jobId}', please make sure to register a single message pump instance in the application services with this job ID");
}

if (messagePumps.Length > 1)
{
throw new InvalidOperationException(
$"Cannot find one correct registered message pump as multiple pump instances were registered with the same job ID: '{jobId}', please make sure to only register a single message pump instance in the application services with this job ID");
}

return messagePumps.First();
}

private async Task WaitUntilRecoveredAsync(MessagePump messagePump, MessagePumpCircuitBreakerOptions options)
{
_logger.LogTrace("Wait configured recovery period ({RecoveryPeriod}) before trying to close circuit for message pump '{JobId}'", options.MessageRecoveryPeriod, messagePump.JobId);
await Task.Delay(options.MessageRecoveryPeriod);

bool isRecovered = false;
while (!isRecovered)
{
MessageProcessingResult processingResult = await messagePump.TryProcessProcessSingleMessageAsync(options);
isRecovered = processingResult.IsSuccessful;

if (isRecovered)
{
_logger.LogTrace("Message pump '{JobId}' successfully handled a single message, closing circuit...", messagePump.JobId);
}
else
{
_logger.LogError(processingResult.ProcessingException, "Message pump '{JobId}' failed to handle a single message: {Message}, wait configured interval period ({IntervalPeriod}) before retrying...", messagePump.JobId, processingResult.ProcessingException.Message, options.MessageIntervalDuringRecovery);
await Task.Delay(options.MessageIntervalDuringRecovery);
}
}
}
}
}
@@ -0,0 +1,20 @@
using System;
using System.Threading.Tasks;

namespace Arcus.Messaging.Pumps.Abstractions.Resiliency
{
/// <summary>
/// Represents an instance to pause the process of receiving messages in the message pump until the message handler can process the messages again.
/// Usually injected in the message handler to handle transient connection failures with dependencies.
/// </summary>
public interface IMessagePumpCircuitBreaker
{
/// <summary>
/// Pause the process of receiving messages in the message pump for a period of time before carefully retrying.
/// </summary>
/// <param name="jobId">The unique identifier to distinguish the message pump in the application services.</param>
/// <param name="configureOptions">The optional user-configurable options to manipulate the workings of the message pump interaction.</param>
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
Task PauseMessageProcessingAsync(string jobId, Action<MessagePumpCircuitBreakerOptions> configureOptions);
}
}
@@ -0,0 +1,50 @@
using System;
using GuardNet;

namespace Arcus.Messaging.Pumps.Abstractions.Resiliency
{
/// <summary>
/// Represents user-configurable options to manipulate the <see cref="IMessagePumpCircuitBreaker"/> functionality.
/// </summary>
public class MessagePumpCircuitBreakerOptions
{
private TimeSpan _messageRecoveryPeriod = TimeSpan.FromSeconds(30),
_messageIntervalDuringRecovery = TimeSpan.FromSeconds(10);

/// <summary>
/// Gets or sets the time period the circuit breaker should wait before retrying to receive messages.
/// A.k.a. the time period the circuit is open.
/// </summary>
/// <remarks>
/// Default uses 30 seconds recovery period.
/// </remarks>
/// <exception cref="ArgumentOutOfRangeException">Thrown when the <paramref name="value"/> does not represent a positive time period.</exception>
public TimeSpan MessageRecoveryPeriod
{
get => _messageRecoveryPeriod;
set
{
Guard.NotLessThanOrEqualTo(value, TimeSpan.Zero, nameof(value));
_messageRecoveryPeriod = value;
}
}

/// <summary>
/// Gets or sets the time period the circuit breaker should wait between each message after the circuit was opened, during recovery.
/// A.k.a. the time interval to receive messages during which the circuit is half-open.
/// </summary>
/// <remarks>
/// Default uses 10 seconds interval period.
/// </remarks>
/// <exception cref="ArgumentOutOfRangeException">Thrown when the <paramref name="value"/> does not represent a positive time period.</exception>
public TimeSpan MessageIntervalDuringRecovery
{
get => _messageIntervalDuringRecovery;
set
{
Guard.NotLessThanOrEqualTo(value, TimeSpan.Zero, nameof(value));
_messageIntervalDuringRecovery = value;
}
}
}
}
@@ -47,7 +47,7 @@
</ItemGroup>

<ItemGroup>
-<PackageReference Include="Azure.Identity" Version="1.6.0" />
+<PackageReference Include="Azure.Identity" Version="1.10.2" />
<PackageReference Include="Microsoft.Azure.Management.ServiceBus" Version="2.1.0" />
</ItemGroup>
