Skip to content

Commit

Permalink
fix: use message receiver io processor to correctly stop message pump…
Browse files Browse the repository at this point in the history
… during circuit breaker (#444)

* Upgrade packages

* update package

* package updates

* remove duplicate package ref

* detach eventhandlers before stopping ServiceBusProcessor

* pr-fix: fully use message receiver

* pr-fix: use newest features in messaging package

* pr-fix: update az packages + message pump accessibility check

* pr-fix: set is-started in receive messages

* pr-fix: correct accessibility on message pump

* pr-fix: introduce resume functionality

* pr-fix: mark the interval processing as long-running

* pr-fix: intro circuit breaker message handler

* pr-fix: correct worker logging

* pr-fix: stabelize with file system as event source

* temp commit

* pr-sug: use private set for circuit breaker state

* pr-fix: correctly auto-complete message

* pr-fix: add additional test verifications

* Update src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs

Co-authored-by: Frederik Gheysels <frederik.gheysels@telenet.be>

* pr-fix: auto-dead-letter when no message handler can process message

* pr-fix: add exception handling for fallback registrations

* pr-fix: correct receiver tests

* pr-fix: receive on sensors

* pr-fix: auto-abandon & -dead-letter in router

* pr-fix: null-check and updated unit tests

* pr-fix: transient complete + safeguard missing message

* pr-fix: correct namespace connection string

* pr-fix: use managed identity

* pr-fix: skip for now unavailable system tests

* pr-fix: use dedicated namespace connection string

* pr-fix: correct appsettings.json

* pr-fix: remove delay 1 day

* pr-fix: activate circuit-breaker tests

* pr-sug: rename get circuit-breaker state expose method

* Improve circuit-breaker handling

* safeguards in pause

* remove unused private

* fix compiler warnings

* fix test

* code layout

* feat: change circuitbreaker approach (#453)

* processing spike

* code cleanup

* pr-sug: use message processing result io boolean

* pr-sug: promote circuit breaker state enum to class

* pr-fix: throw-if-null is not available in net-standard

* pr-fix: correct usings in az service bus message pump

* pr-sug: add message id context to the processing result

* pr-fix: correct time-out for resiliency tests

* pr-fix: remove useless dev-test

* pr-fix: correct recieved message creation in unit tests

* pr-fix: more stable post-assertion resilence

* pr-fix: use back the message id for the message processing result

* pr-sug: finishing touches on circuit breaker state transitioning

* pr-fix: streamline equalization in circuit breaker state

* pr-fix: let router abbandon message io circuit breaker handler

* pr-sug: rename wait interval method + fix wait recovery period log

* pr-fix: complete renaming in message pump

* pr-sug: use transition method for open state

* pr-sug: add half-open state boolean flag

* pr-fix: limit processing of single message on queue

* Update src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs

Co-authored-by: Frederik Gheysels <frederik.gheysels@telenet.be>

* pr-sug: reframe summary and remarks wording in circuit breaker states

* Update src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs

Co-authored-by: Frederik Gheysels <frederik.gheysels@telenet.be>

* Update src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs

Co-authored-by: Frederik Gheysels <frederik.gheysels@telenet.be>

* Update src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs

---------

Co-authored-by: stijnmoreels <9039753+stijnmoreels@users.noreply.github.com>

* pr-sug: correct is started description

---------

Co-authored-by: Frederik Gheysels <frederik.gheysels@codit.eu>
Co-authored-by: Frederik Gheysels <frederik.gheysels@telenet.be>
  • Loading branch information
3 people authored Dec 13, 2024
1 parent a83a3b3 commit 0c93d3c
Show file tree
Hide file tree
Showing 42 changed files with 1,323 additions and 595 deletions.
51 changes: 31 additions & 20 deletions docs/preview/02-Features/06-general-messaging.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,30 @@ To interact with the message processing system within your custom message handle
using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling;
using Arcus.Messaging.Pumps.Abstractions.Resiliency;

public class OrderMessageHandler : IAzureServiceBusMessageHandler<Order>
public class OrderMessageHandler : CircuitBreakerServiceBusMessageHandler<Order>
{
private readonly IMessagePumpCircuitBreaker _circuitBreaker;

public OrderMessageHandler(IMessagePumpCircuitBreaker circuitBreaker)
public OrderMessageHandler(IMessagePumpCircuitBreaker circuitBreaker, ILogger<...> logger) : base(circuitBreaker, logger)
{
_circuitBreaker = circuitBreaker;
}

public async Task ProcessMessageAsync(Order message, AzureServiceBusMessageContext messageContext, ...)
public override async Task ProcessMessageAsync(
Order message,
AzureServiceBusMessageContext messageContext,
MessagePumpCircuitBreakerOptions options,
...)
{
// Determine whether your dependent system is healthy...
// If not, call the circuit breaker, processing will be halted temporarily.
await _circuitBreaker.PauseMessageProcessingAsync(messageContext.JobId);
if (!IsDependentSystemHealthy())
{
throw new ...Exception("My dependency system is temporarily unavailable, please halt message processing for now");
}
else
{
// Process your message...
}
}
}
```
Expand All @@ -47,24 +56,26 @@ The message pump will by default act in the following pattern:
* Circuit breaker calls `Pause`
* Message pump stops processing messages for a period of time (circuit is OPEN).
* Message pump tries processing a single message (circuit is HALF-OPEN).
* Dependency still unhealthy? => Tries again later (circuit is OPEN)
* Dependency healthy? => Message pump starts receiving message in full again (circuit is CLOSED).
* Dependency still unhealthy? => circuit breaker pauses again (circuit is OPEN)
* Dependency healthy? => circuit breaker resumes, message pump starts receiving message in full again (circuit is CLOSED).

Both the recovery period after the circuit is open and the interval between messages when the circuit is half-open is configurable when calling the circuit breaker. These time periods are related to your dependent system and could change by the type of transient connection failure.

```csharp
await _circuitBreaker.PauseMessageProcessingAsync(
messageContext.JobId,
options =>
{
// Sets the time period the circuit breaker should wait before retrying to receive messages.
// A.k.a. the time period the circuit is closed (default: 30 seconds).
options.MessageRecoveryPeriod = TimeSpan.FromSeconds(15);

// Sets the time period the circuit breaker should wait between each message after the circuit was closed, during recovery.
// A.k.a. the time interval to receive messages during which the circuit is half-open (default: 10 seconds).
options.MessageIntervalDuringRecovery = TimeSpan.FromSeconds(1.5);
});
public override async Task ProcessMessageAsync(
Order message,
AzureServiceBusMessageContext messageContext,
MessagePumpCircuitBreakerOptions options,
...)
{
// Sets the time period the circuit breaker should wait before retrying to receive messages.
// A.k.a. the time period the circuit is closed (default: 30 seconds).
options.MessageRecoveryPeriod = TimeSpan.FromSeconds(15);

// Sets the time period the circuit breaker should wait between each message after the circuit was closed, during recovery.
// A.k.a. the time interval to receive messages during which the circuit is half-open (default: 10 seconds).
options.MessageIntervalDuringRecovery = TimeSpan.FromSeconds(1.5);
}
```

### Pause message processing for a fixed period of time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ public class AzureServiceBusSystemProperties
private AzureServiceBusSystemProperties(ServiceBusReceivedMessage message)
{
Guard.NotNull(message, nameof(message), "Requires an Azure Service Bus received message to construct a set of Azure Service Bus system properties");

DeadLetterSource = message.DeadLetterSource;
DeliveryCount = message.DeliveryCount;
EnqueuedSequenceNumber = message.EnqueuedSequenceNumber;
EnqueuedTime = message.EnqueuedTime;
LockToken = message.LockToken;
IsLockTokenSet = message.LockToken != null;
IsLockTokenSet = message.LockToken != null && message.LockToken != Guid.Empty.ToString();
LockedUntil = message.LockedUntil;
IsReceived = message.SequenceNumber > -1;
SequenceNumber = message.SequenceNumber;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public static class ProcessMessageEventArgsExtensions
/// <exception cref="TypeNotFoundException">Thrown when the no Azure Service Bus receiver could be found on the <paramref name="args"/>.</exception>
/// <exception cref="InvalidOperationException">Thrown when no value could be found for the Azure Service Bus receiver on the <paramref name="args"/>.</exception>
/// <exception cref="InvalidCastException">Thrown when the value for the Azure Service Bus receiver on the <paramref name="args"/> wasn't the expected type.</exception>
[Obsolete("Service Bus receiver is used internally instead, no need to go via the processor")]
public static ServiceBusReceiver GetServiceBusReceiver(this ProcessMessageEventArgs args)
{
Guard.NotNull(args, nameof(args), "Requires an event args instance to retrieve the original Service Bus message receiver");
Expand Down
Loading

0 comments on commit 0c93d3c

Please sign in to comment.