diff --git a/docs/preview/02-Features/06-general-messaging.md b/docs/preview/02-Features/06-general-messaging.md index ff390e29..bcdbb89c 100644 --- a/docs/preview/02-Features/06-general-messaging.md +++ b/docs/preview/02-Features/06-general-messaging.md @@ -24,21 +24,30 @@ To interact with the message processing system within your custom message handle using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling; using Arcus.Messaging.Pumps.Abstractions.Resiliency; -public class OrderMessageHandler : IAzureServiceBusMessageHandler +public class OrderMessageHandler : CircuitBreakerServiceBusMessageHandler { private readonly IMessagePumpCircuitBreaker _circuitBreaker; - public OrderMessageHandler(IMessagePumpCircuitBreaker circuitBreaker) + public OrderMessageHandler(IMessagePumpCircuitBreaker circuitBreaker, ILogger<...> logger) : base(circuitBreaker, logger) { _circuitBreaker = circuitBreaker; } - public async Task ProcessMessageAsync(Order message, AzureServiceBusMessageContext messageContext, ...) + public override async Task ProcessMessageAsync( + Order message, + AzureServiceBusMessageContext messageContext, + MessagePumpCircuitBreakerOptions options, + ...) { // Determine whether your dependent system is healthy... - - // If not, call the circuit breaker, processing will be halted temporarily. - await _circuitBreaker.PauseMessageProcessingAsync(messageContext.JobId); + if (!IsDependentSystemHealthy()) + { + throw new ...Exception("My dependency system is temporarily unavailable, please halt message processing for now"); + } + else + { + // Process your message... + } } } ``` @@ -47,24 +56,26 @@ The message pump will by default act in the following pattern: * Circuit breaker calls `Pause` * Message pump stops processing messages for a period of time (circuit is OPEN). * Message pump tries processing a single message (circuit is HALF-OPEN). - * Dependency still unhealthy? => Tries again later (circuit is OPEN) - * Dependency healthy? => Message pump starts receiving message in full again (circuit is CLOSED). + * Dependency still unhealthy? => circuit breaker pauses again (circuit is OPEN) + * Dependency healthy? => circuit breaker resumes, message pump starts receiving message in full again (circuit is CLOSED). Both the recovery period after the circuit is open and the interval between messages when the circuit is half-open is configurable when calling the circuit breaker. These time periods are related to your dependent system and could change by the type of transient connection failure. ```csharp -await _circuitBreaker.PauseMessageProcessingAsync( - messageContext.JobId, - options => - { - // Sets the time period the circuit breaker should wait before retrying to receive messages. - // A.k.a. the time period the circuit is closed (default: 30 seconds). - options.MessageRecoveryPeriod = TimeSpan.FromSeconds(15); - - // Sets the time period the circuit breaker should wait between each message after the circuit was closed, during recovery. - // A.k.a. the time interval to receive messages during which the circuit is half-open (default: 10 seconds). - options.MessageIntervalDuringRecovery = TimeSpan.FromSeconds(1.5); - }); + public override async Task ProcessMessageAsync( + Order message, + AzureServiceBusMessageContext messageContext, + MessagePumpCircuitBreakerOptions options, + ...) +{ + // Sets the time period the circuit breaker should wait before retrying to receive messages. + // A.k.a. the time period the circuit is closed (default: 30 seconds). + options.MessageRecoveryPeriod = TimeSpan.FromSeconds(15); + + // Sets the time period the circuit breaker should wait between each message after the circuit was closed, during recovery. + // A.k.a. the time interval to receive messages during which the circuit is half-open (default: 10 seconds). + options.MessageIntervalDuringRecovery = TimeSpan.FromSeconds(1.5); +} ``` ### Pause message processing for a fixed period of time diff --git a/src/Arcus.Messaging.Abstractions.ServiceBus/AzureServiceBusSystemProperties.cs b/src/Arcus.Messaging.Abstractions.ServiceBus/AzureServiceBusSystemProperties.cs index f903175c..ecb292cd 100644 --- a/src/Arcus.Messaging.Abstractions.ServiceBus/AzureServiceBusSystemProperties.cs +++ b/src/Arcus.Messaging.Abstractions.ServiceBus/AzureServiceBusSystemProperties.cs @@ -12,13 +12,13 @@ public class AzureServiceBusSystemProperties private AzureServiceBusSystemProperties(ServiceBusReceivedMessage message) { Guard.NotNull(message, nameof(message), "Requires an Azure Service Bus received message to construct a set of Azure Service Bus system properties"); - + DeadLetterSource = message.DeadLetterSource; DeliveryCount = message.DeliveryCount; EnqueuedSequenceNumber = message.EnqueuedSequenceNumber; EnqueuedTime = message.EnqueuedTime; LockToken = message.LockToken; - IsLockTokenSet = message.LockToken != null; + IsLockTokenSet = message.LockToken != null && message.LockToken != Guid.Empty.ToString(); LockedUntil = message.LockedUntil; IsReceived = message.SequenceNumber > -1; SequenceNumber = message.SequenceNumber; diff --git a/src/Arcus.Messaging.Abstractions.ServiceBus/Extensions/ProcessMessageEventArgsExtensions.cs b/src/Arcus.Messaging.Abstractions.ServiceBus/Extensions/ProcessMessageEventArgsExtensions.cs index 6d86e2e3..4ec58620 100644 --- a/src/Arcus.Messaging.Abstractions.ServiceBus/Extensions/ProcessMessageEventArgsExtensions.cs +++ b/src/Arcus.Messaging.Abstractions.ServiceBus/Extensions/ProcessMessageEventArgsExtensions.cs @@ -19,6 +19,7 @@ public static class ProcessMessageEventArgsExtensions /// Thrown when the no Azure Service Bus receiver could be found on the . /// Thrown when no value could be found for the Azure Service Bus receiver on the . /// Thrown when the value for the Azure Service Bus receiver on the wasn't the expected type. + [Obsolete("Service Bus receiver is used internally instead, no need to go via the processor")] public static ServiceBusReceiver GetServiceBusReceiver(this ProcessMessageEventArgs args) { Guard.NotNull(args, nameof(args), "Requires an event args instance to retrieve the original Service Bus message receiver"); diff --git a/src/Arcus.Messaging.Abstractions.ServiceBus/MessageHandling/AzureServiceBusMessageRouter.cs b/src/Arcus.Messaging.Abstractions.ServiceBus/MessageHandling/AzureServiceBusMessageRouter.cs index e1e99d2e..9e6dc7c2 100644 --- a/src/Arcus.Messaging.Abstractions.ServiceBus/MessageHandling/AzureServiceBusMessageRouter.cs +++ b/src/Arcus.Messaging.Abstractions.ServiceBus/MessageHandling/AzureServiceBusMessageRouter.cs @@ -13,6 +13,7 @@ using Microsoft.Extensions.Logging.Abstractions; using Serilog.Context; using ServiceBusFallbackMessageHandler = Arcus.Messaging.Abstractions.MessageHandling.FallbackMessageHandler; +using static Arcus.Messaging.Abstractions.MessageHandling.MessageProcessingError; namespace Arcus.Messaging.Abstractions.ServiceBus.MessageHandling { @@ -158,7 +159,7 @@ public override async Task RouteMessageAsync( /// Thrown when the , , or is null. /// /// Thrown when no message handlers or none matching message handlers are found to process the message. - public async Task RouteMessageAsync( + public async Task RouteMessageAsync( ServiceBusReceivedMessage message, AzureServiceBusMessageContext messageContext, MessageCorrelationInfo correlationInfo, @@ -168,7 +169,7 @@ public async Task RouteMessageAsync( Guard.NotNull(messageContext, nameof(messageContext), "Requires an Azure Service Bus message context in which the incoming message can be processed"); Guard.NotNull(correlationInfo, nameof(correlationInfo), "Requires an correlation information to correlate between incoming Azure Service Bus messages"); - await RouteMessageWithPotentialFallbackAsync( + return await RouteMessageWithPotentialFallbackAsync( messageReceiver: null, message: message, messageContext: messageContext, @@ -178,7 +179,7 @@ await RouteMessageWithPotentialFallbackAsync( /// /// Handle a new that was received by routing them through registered s - /// and optionally through an registered or + /// and optionally through a registered or /// if none of the message handlers were able to process the . /// /// @@ -193,7 +194,7 @@ await RouteMessageWithPotentialFallbackAsync( /// Thrown when the , , , or is null. /// /// Thrown when no message handlers or none matching message handlers are found to process the message. - public async Task RouteMessageAsync( + public async Task RouteMessageAsync( ServiceBusReceiver messageReceiver, ServiceBusReceivedMessage message, AzureServiceBusMessageContext messageContext, @@ -205,12 +206,12 @@ public async Task RouteMessageAsync( Guard.NotNull(messageContext, nameof(messageContext), "Requires an Azure Service Bus message context in which the incoming message can be processed"); Guard.NotNull(correlationInfo, nameof(correlationInfo), "Requires an correlation information to correlate between incoming Azure Service Bus messages"); - await RouteMessageWithPotentialFallbackAsync(messageReceiver, message, messageContext, correlationInfo, cancellationToken); + return await RouteMessageWithPotentialFallbackAsync(messageReceiver, message, messageContext, correlationInfo, cancellationToken); } /// /// Handle a new that was received by routing them through registered s - /// and optionally through an registered or + /// and optionally through a registered or /// if none of the message handlers were able to process the . /// /// @@ -225,7 +226,7 @@ public async Task RouteMessageAsync( /// Thrown when the , , , or is null. /// /// Thrown when no message handlers or none matching message handlers are found to process the message. - protected async Task RouteMessageWithPotentialFallbackAsync( + protected async Task RouteMessageWithPotentialFallbackAsync( ServiceBusReceiver messageReceiver, ServiceBusReceivedMessage message, AzureServiceBusMessageContext messageContext, @@ -236,29 +237,31 @@ protected async Task RouteMessageWithPotentialFallbackAsync( Guard.NotNull(messageContext, nameof(messageContext), "Requires an Azure Service Bus message context in which the incoming message can be processed"); Guard.NotNull(correlationInfo, nameof(correlationInfo), "Requires an correlation information to correlate between incoming Azure Service Bus messages"); - var isSuccessful = false; - using (DurationMeasurement measurement = DurationMeasurement.Start()) - using (IServiceScope serviceScope = ServiceProvider.CreateScope()) - using (LogContext.Push(new MessageCorrelationInfoEnricher(correlationInfo, Options.CorrelationEnricher))) + string entityName = messageReceiver?.EntityPath ?? ""; + string serviceBusNamespace = messageReceiver?.FullyQualifiedNamespace ?? ""; + + using DurationMeasurement measurement = DurationMeasurement.Start(); + using IServiceScope serviceScope = ServiceProvider.CreateScope(); + using IDisposable _ = LogContext.Push(new MessageCorrelationInfoEnricher(correlationInfo, Options.CorrelationEnricher)); + + try { - try - { - var accessor = serviceScope.ServiceProvider.GetService(); - accessor?.SetCorrelationInfo(correlationInfo); + var accessor = serviceScope.ServiceProvider.GetService(); + accessor?.SetCorrelationInfo(correlationInfo); - await TryRouteMessageWithPotentialFallbackAsync(serviceScope.ServiceProvider, messageReceiver, message, messageContext, correlationInfo, cancellationToken); - isSuccessful = true; - } - finally - { - string entityName = messageReceiver?.EntityPath ?? ""; - string serviceBusNamespace = messageReceiver?.FullyQualifiedNamespace ?? ""; - Logger.LogServiceBusRequest(serviceBusNamespace, entityName, Options.Telemetry.OperationName, isSuccessful, measurement, messageContext.EntityType); - } + MessageProcessingResult routingResult = await TryRouteMessageWithPotentialFallbackAsync(serviceScope.ServiceProvider, messageReceiver, message, messageContext, correlationInfo, cancellationToken); + + Logger.LogServiceBusRequest(serviceBusNamespace, entityName, Options.Telemetry.OperationName, routingResult.IsSuccessful, measurement, messageContext.EntityType); + return routingResult; + } + catch + { + Logger.LogServiceBusRequest(serviceBusNamespace, entityName, Options.Telemetry.OperationName, false, measurement, messageContext.EntityType); + throw; } } - private async Task TryRouteMessageWithPotentialFallbackAsync( + private async Task TryRouteMessageWithPotentialFallbackAsync( IServiceProvider serviceProvider, ServiceBusReceiver messageReceiver, ServiceBusReceivedMessage message, @@ -269,7 +272,11 @@ private async Task TryRouteMessageWithPotentialFallbackAsync( try { MessageHandler[] messageHandlers = GetRegisteredMessageHandlers(serviceProvider).ToArray(); - EnsureAnyMessageHandlerAvailable(messageHandlers, messageContext); + if (messageHandlers.Length <= 0) + { + await DeadLetterMessageNoHandlerRegisteredAsync(messageReceiver, message, messageContext); + return MessageProcessingResult.Failure(message.MessageId, CannotFindMatchedHandler, "Failed to process message in the message pump as no message handler is registered in the dependency container"); + } Encoding encoding = messageContext.GetMessageEncodingProperty(Logger); string messageBody = encoding.GetString(message.Body.ToArray()); @@ -288,59 +295,76 @@ private async Task TryRouteMessageWithPotentialFallbackAsync( hasGoneThroughMessageHandler = true; if (isProcessed) { - return; + return MessageProcessingResult.Success(message.MessageId); } } } - if (!hasGoneThroughMessageHandler) + ServiceBusFallbackMessageHandler[] serviceBusFallbackHandlers = + GetAvailableFallbackMessageHandlersByContext(messageContext); + + FallbackMessageHandler[] generalFallbackHandlers = + GetAvailableFallbackMessageHandlersByContext(messageContext); + + bool fallbackAvailable = serviceBusFallbackHandlers.Length > 0 || generalFallbackHandlers.Length > 0; + + if (hasGoneThroughMessageHandler && !fallbackAvailable) + { + await AbandonMessageMatchedHandlerFailedAsync(messageReceiver, message, messageContext); + return MessageProcessingResult.Failure(message.MessageId, MatchedHandlerFailed, "Failed to process Azure Service Bus message in pump as the matched handler did not successfully processed the message and no fallback message handlers were configured"); + } + + if (!hasGoneThroughMessageHandler && !fallbackAvailable) + { + await DeadLetterMessageNoHandlerMatchedAsync(messageReceiver, message, messageContext); + return MessageProcessingResult.Failure(message.MessageId, CannotFindMatchedHandler, "Failed to process message in pump as no message handler was matched against the message and no fallback message handlers were configured"); + } + + bool isProcessedByGeneralFallback = await TryFallbackProcessMessageAsync(messageBody, messageContext, correlationInfo, cancellationToken); + if (isProcessedByGeneralFallback) { - EnsureFallbackMessageHandlerAvailable(messageContext); + return MessageProcessingResult.Success(message.MessageId); } - await TryFallbackProcessMessageAsync(messageBody, messageContext, correlationInfo, cancellationToken); - await TryServiceBusFallbackMessageAsync(messageReceiver, message, messageContext, correlationInfo, cancellationToken); + return await TryServiceBusFallbackMessageAsync(messageReceiver, message, messageContext, correlationInfo, cancellationToken); + } catch (Exception exception) { Logger.LogCritical(exception, "Unable to process message with ID '{MessageId}'", message.MessageId); - throw; + if (messageReceiver != null) + { + await messageReceiver.AbandonMessageAsync(message); + } + + return MessageProcessingResult.Failure(message.MessageId, ProcessingInterrupted, "Failed to process message in pump as there was an unexpected critical problem during processing, please see the logs for more information", exception); } } - private void EnsureAnyMessageHandlerAvailable(MessageHandler[] messageHandlers, AzureServiceBusMessageContext messageContext) + private async Task DeadLetterMessageNoHandlerRegisteredAsync(ServiceBusReceiver messageReceiver, ServiceBusReceivedMessage message, AzureServiceBusMessageContext messageContext) { - ServiceBusFallbackMessageHandler[] serviceBusFallbackHandlers = - GetAvailableFallbackMessageHandlersByContext(messageContext); - - FallbackMessageHandler[] generalFallbackHandlers = - GetAvailableFallbackMessageHandlersByContext(messageContext); - - if (messageHandlers.Length <= 0 && serviceBusFallbackHandlers.Length <= 0 && generalFallbackHandlers.Length <= 0) + if (messageReceiver != null) { - throw new InvalidOperationException( - $"Azure Service Bus message router cannot correctly process the message in the '{nameof(AzureServiceBusMessageContext)}' " - + "because no 'IAzureServiceBusMessageHandler<>' was registered in the dependency injection container. " - + $"Make sure you call the correct 'WithServiceBusMessageHandler' extension on the {nameof(IServiceCollection)} " - + "during the registration of the Azure Service Bus message pump or message router to register a message handler"); + Logger.LogError("Failed to process Azure Service Bus message '{MessageId}' in pump '{JobId}' as no message handler was matched against the message and no fallback message handlers was configured, dead-lettering message!", message.MessageId, messageContext.JobId); + await messageReceiver.DeadLetterMessageAsync(message); } } - private void EnsureFallbackMessageHandlerAvailable(AzureServiceBusMessageContext messageContext) + private async Task DeadLetterMessageNoHandlerMatchedAsync(ServiceBusReceiver messageReceiver, ServiceBusReceivedMessage message, AzureServiceBusMessageContext messageContext) { - ServiceBusFallbackMessageHandler[] serviceBusFallbackHandlers = - GetAvailableFallbackMessageHandlersByContext(messageContext); - - FallbackMessageHandler[] generalFallbackHandlers = - GetAvailableFallbackMessageHandlersByContext(messageContext); + if (messageReceiver != null) + { + Logger.LogError("Failed to process Azure Service Bus message '{MessageId}' in pump '{JobId}' as no message handler was matched against the message and no fallback message handlers was configured, dead-lettering message!", message.MessageId, messageContext.JobId); + await messageReceiver.DeadLetterMessageAsync(message); + } + } - if (serviceBusFallbackHandlers.Length <= 0 && generalFallbackHandlers.Length <= 0) + private async Task AbandonMessageMatchedHandlerFailedAsync(ServiceBusReceiver messageReceiver, ServiceBusReceivedMessage message, AzureServiceBusMessageContext messageContext) + { + if (messageReceiver != null) { - throw new InvalidOperationException( - $"Azure Service Bus message router cannot correctly process the message in the '{nameof(AzureServiceBusMessageContext)}' " - + "because none of the registered 'IAzureServiceBusMessageHandler<,>' implementations in the dependency injection container matches the incoming message type and context; " - + $"and no '{nameof(IFallbackMessageHandler)}' or '{nameof(IAzureServiceBusFallbackMessageHandler)}' was registered to fall back to." - + $"Make sure you call the correct '.WithServiceBusMessageHandler' extension on the {nameof(IServiceCollection)} during the registration of the message pump or message router to register a message handler"); + Logger.LogError("Failed to process Azure Service Bus message '{MessageId}' in pump '{JobId}' as the matched message handler did not successfully processed the message and no fallback message handlers configured, abandoning message!", message.MessageId, messageContext.JobId); + await messageReceiver.AbandonMessageAsync(message); } } @@ -389,7 +413,7 @@ protected void SetServiceBusPropertiesForSpecificOperations( /// /// Thrown when the , , , or is null. /// - protected async Task TryServiceBusFallbackMessageAsync( + protected async Task TryServiceBusFallbackMessageAsync( ServiceBusReceiver messageReceiver, ServiceBusReceivedMessage message, AzureServiceBusMessageContext messageContext, @@ -402,12 +426,6 @@ protected async Task TryServiceBusFallbackMessageAsync( ServiceBusFallbackMessageHandler[] fallbackHandlers = GetAvailableFallbackMessageHandlersByContext(messageContext); - - if (fallbackHandlers.Length <= 0) - { - Logger.LogTrace("No Azure Service Bus message handlers found within message context (JobId: {JobId})", messageContext.JobId); - return; - } foreach (ServiceBusFallbackMessageHandler handler in fallbackHandlers) { @@ -431,11 +449,19 @@ protected async Task TryServiceBusFallbackMessageAsync( if (result) { Logger.LogTrace("Fallback message handler '{FallbackMessageHandlerType}' has processed the message", fallbackMessageHandlerTypeName); - break; + return MessageProcessingResult.Success(message.MessageId); } Logger.LogTrace("Fallback message handler '{FallbackMessageHandlerType}' was not able to process the message", fallbackMessageHandlerTypeName); } + + if (messageReceiver != null) + { + Logger.LogWarning("No fallback message handler processed the Azure Service Bus message '{MessageId}' in pump '{JobId}', abandoning message!", message.MessageId, messageContext.JobId); + await messageReceiver.AbandonMessageAsync(message); + } + + return MessageProcessingResult.Failure(message.MessageId, CannotFindMatchedHandler, "No fallback message handler processed the message"); } } } diff --git a/src/Arcus.Messaging.Abstractions.ServiceBus/MessageHandling/IAzureServiceBusMessageRouter.cs b/src/Arcus.Messaging.Abstractions.ServiceBus/MessageHandling/IAzureServiceBusMessageRouter.cs index 1f455f13..af4cd564 100644 --- a/src/Arcus.Messaging.Abstractions.ServiceBus/MessageHandling/IAzureServiceBusMessageRouter.cs +++ b/src/Arcus.Messaging.Abstractions.ServiceBus/MessageHandling/IAzureServiceBusMessageRouter.cs @@ -13,7 +13,7 @@ public interface IAzureServiceBusMessageRouter : IMessageRouter { /// /// Handle a new that was received by routing them through registered s - /// and optionally through an registered or + /// and optionally through a registered or /// if none of the message handlers were able to process the . /// /// The incoming message that needs to be routed through registered message handlers. @@ -29,7 +29,7 @@ public interface IAzureServiceBusMessageRouter : IMessageRouter /// Thrown when the , , or is null. /// /// Thrown when no message handlers or none matching message handlers are found to process the message. - Task RouteMessageAsync( + Task RouteMessageAsync( ServiceBusReceivedMessage message, AzureServiceBusMessageContext messageContext, MessageCorrelationInfo correlationInfo, @@ -37,7 +37,7 @@ Task RouteMessageAsync( /// /// Handle a new that was received by routing them through registered s - /// and optionally through an registered or + /// and optionally through a registered or /// if none of the message handlers were able to process the . /// /// @@ -52,7 +52,7 @@ Task RouteMessageAsync( /// Thrown when the , , , or is null. /// /// Thrown when no message handlers or none matching message handlers are found to process the message. - Task RouteMessageAsync( + Task RouteMessageAsync( ServiceBusReceiver messageReceiver, ServiceBusReceivedMessage message, AzureServiceBusMessageContext messageContext, diff --git a/src/Arcus.Messaging.Abstractions/MessageHandling/FallbackMessageHandler.cs b/src/Arcus.Messaging.Abstractions/MessageHandling/FallbackMessageHandler.cs index dc95fb4c..69e65a76 100644 --- a/src/Arcus.Messaging.Abstractions/MessageHandling/FallbackMessageHandler.cs +++ b/src/Arcus.Messaging.Abstractions/MessageHandling/FallbackMessageHandler.cs @@ -91,8 +91,16 @@ public async Task ProcessMessageAsync( if (CanProcessMessageBasedOnContext(messageContext)) { - await MessageHandlerInstance.ProcessMessageAsync(message, messageContext, correlationInfo, cancellationToken); - return true; + try + { + await MessageHandlerInstance.ProcessMessageAsync(message, messageContext, correlationInfo, cancellationToken); + return true; + } + catch (Exception exception) + { + _logger.LogError(exception, "Fallback message handler '{MessageHandlerType}' failed to handle '{MessageType}' message due to an exception: {Message}", MessageHandlerType.Name, typeof(TMessage).Name, exception.Message); + return false; + } } _logger.LogTrace("Fallback message handler '{FallbackMessageHandlerType}' cannot handle message because it was not registered in the correct message context (JobId: {JobId})", MessageHandlerType.Name, _jobId); diff --git a/src/Arcus.Messaging.Abstractions/MessageHandling/MessageProcessingResult.cs b/src/Arcus.Messaging.Abstractions/MessageHandling/MessageProcessingResult.cs index f8e503b5..e49290be 100644 --- a/src/Arcus.Messaging.Abstractions/MessageHandling/MessageProcessingResult.cs +++ b/src/Arcus.Messaging.Abstractions/MessageHandling/MessageProcessingResult.cs @@ -1,24 +1,49 @@ using System; -using GuardNet; namespace Arcus.Messaging.Abstractions.MessageHandling { + /// + /// Represents all the possible errors of a . + /// + public enum MessageProcessingError + { + /// + /// Defines an error that shows that the message processing was interrupted by some external cause, + /// unrelated to the message routing. + /// + ProcessingInterrupted, + + /// + /// Defines an error shows that no implementation was found + /// that was able to process the received message. + /// + CannotFindMatchedHandler, + + /// + /// Defines and error that shows that the matched implementation + /// was unable to process the received message. + /// + MatchedHandlerFailed, + } + /// /// Represents an outcome of a message that was processed by an implementation. /// public class MessageProcessingResult { - private MessageProcessingResult() + private MessageProcessingResult(string messageId) { + MessageId = messageId ?? throw new ArgumentNullException(nameof(messageId)); IsSuccessful = true; } - private MessageProcessingResult(Exception processingException) + private MessageProcessingResult(string messageId, MessageProcessingError error, string errorMessage, Exception processingException) { - Guard.NotNull(processingException, nameof(processingException)); - - IsSuccessful = false; + MessageId = messageId ?? throw new ArgumentNullException(nameof(messageId)); + Error = error; + ErrorMessage = errorMessage; ProcessingException = processingException; + IsSuccessful = false; } /// @@ -27,27 +52,57 @@ private MessageProcessingResult(Exception processingException) public bool IsSuccessful { get; } /// - /// Gets the exception that occurred during the message processing that represents the cause of the processing failure. + /// Gets the unique ID to identify the message for which this is a processing result. + /// + public string MessageId { get; } + + /// + /// Gets the error type that shows which kind of error the message processing failed. + /// + /// + /// Only available when this processing result represents an unsuccessful message processing result - when is false. + /// + public MessageProcessingError Error { get; } + + /// + /// Gets the description that explains the context of the . /// /// /// Only available when this processing result represents an unsuccessful message processing result - when is false. /// + public string ErrorMessage { get; } + + /// + /// Gets the exception that occurred during the message processing that represents the cause of the processing failure. + /// + /// + /// Only possibly available when this processing result represents an unsuccessful message processing result - when is false. + /// public Exception ProcessingException { get; } /// /// Gets an instance that represents a result of a message was successfully processed. /// - public static MessageProcessingResult Success => new MessageProcessingResult(); + public static MessageProcessingResult Success(string messageId) => new(messageId); + + /// + /// Creates an instance that represents a result of a message that was unsuccessfully processed. + /// + public static MessageProcessingResult Failure(string messageId, MessageProcessingError error, string errorMessage) + { + return new MessageProcessingResult(messageId, error, errorMessage, processingException: null); + } /// /// Creates an instance that represents a result of a message that was unsuccessfully processed. /// - /// The exception that occurred during the message processing that represents the cause of the processing failure. - /// Thrown when the is blank. - public static MessageProcessingResult Failure(Exception processingException) + public static MessageProcessingResult Failure(string messageId, MessageProcessingError error, string errorMessage, Exception processingException) { - Guard.NotNull(processingException, nameof(processingException)); - return new MessageProcessingResult(processingException); + return new MessageProcessingResult( + messageId, + error, + errorMessage, + processingException ?? throw new ArgumentNullException(nameof(processingException))); } } } diff --git a/src/Arcus.Messaging.EventHubs.Core/Arcus.Messaging.EventHubs.Core.csproj b/src/Arcus.Messaging.EventHubs.Core/Arcus.Messaging.EventHubs.Core.csproj index 12c761d8..425c7ade 100644 --- a/src/Arcus.Messaging.EventHubs.Core/Arcus.Messaging.EventHubs.Core.csproj +++ b/src/Arcus.Messaging.EventHubs.Core/Arcus.Messaging.EventHubs.Core.csproj @@ -28,8 +28,8 @@ - - + + diff --git a/src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs b/src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs index b1e471f5..12424ee1 100644 --- a/src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs +++ b/src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs @@ -2,7 +2,6 @@ using System.Text; using System.Threading; using System.Threading.Tasks; -using Arcus.Messaging.Abstractions.MessageHandling; using Arcus.Messaging.Pumps.Abstractions.Resiliency; using GuardNet; using Microsoft.Extensions.Configuration; @@ -42,9 +41,14 @@ protected MessagePump(IConfiguration configuration, IServiceProvider serviceProv public string JobId { get; protected set; } = Guid.NewGuid().ToString(); /// - /// Gets the boolean flag that indicates whether the message pump is started and receiving messages. + /// Gets the boolean flag that indicates whether the message pump is started. /// - public bool IsStarted { get; private set; } + public bool IsStarted { get; protected set; } + + /// + /// Gets the current state of the message pump within the circuit breaker context. + /// + public MessagePumpCircuitState CircuitState { get; private set; } = MessagePumpCircuitState.Closed; /// /// Gets hte ID of the client being used to connect to the messaging service. @@ -97,17 +101,6 @@ public override async Task StopAsync(CancellationToken cancellationToken) await base.StopAsync(cancellationToken); } - /// - /// Try to process a single message after the circuit was broken, a.k.a entering the half-open state. - /// - /// - /// [Success] when the related message handler can again process messages and the message pump can again start receive messages in full; [Failure] otherwise. - /// - public virtual Task TryProcessProcessSingleMessageAsync(MessagePumpCircuitBreakerOptions options) - { - return Task.FromResult(MessageProcessingResult.Success); - } - /// /// Start with receiving messages on this message pump. /// @@ -115,6 +108,8 @@ public virtual Task TryProcessProcessSingleMessageAsync public virtual Task StartProcessingMessagesAsync(CancellationToken cancellationToken) { IsStarted = true; + CircuitState = MessagePumpCircuitState.Closed; + return Task.CompletedTask; } @@ -125,9 +120,56 @@ public virtual Task StartProcessingMessagesAsync(CancellationToken cancellationT public virtual Task StopProcessingMessagesAsync(CancellationToken cancellationToken) { IsStarted = false; + CircuitState = CircuitState.TransitionTo(CircuitBreakerState.Open); + return Task.CompletedTask; } + /// + /// Waits a previously configured amount of time until the message pump is expected to be recovered (Closed to Open state). + /// + /// The token to cancel the wait period. + protected async Task WaitMessageRecoveryPeriodAsync(CancellationToken cancellationToken) + { + Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to wait message recovery period of '{Recovery}' during '{State}' state", JobId, CircuitState.Options.MessageRecoveryPeriod.ToString("g"), CircuitState); + await Task.Delay(CircuitState.Options.MessageRecoveryPeriod, cancellationToken); + + CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen); + } + + /// + /// Waits a previously configured amount of time until the next single message can be tried (Half-Open state). + /// + /// The token to cancel the wait period. + protected async Task WaitMessageIntervalDuringRecoveryAsync(CancellationToken cancellationToken) + { + Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to wait message interval during recovery of '{Interval}' during the '{State}' state", JobId, CircuitState.Options.MessageIntervalDuringRecovery.ToString("g"), CircuitState); + await Task.Delay(CircuitState.Options.MessageIntervalDuringRecovery, cancellationToken); + + CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen); + } + + /// + /// Notifies the message pump about the new state which pauses message retrieval. + /// + /// The additional accompanied options that goes with the new state. + internal void NotifyPauseReceiveMessages(MessagePumpCircuitBreakerOptions options) + { + Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition from a '{CurrentState}' an 'Open' state", JobId, CircuitState); + + CircuitState = CircuitState.TransitionTo(CircuitBreakerState.Open, options); + } + + /// + /// Notifies the message pump about the new state which resumes message retrieval. + /// + protected void NotifyResumeRetrievingMessages() + { + Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition back from '{CurrentState}' to a 'Closed' state, retrieving messages is resumed", JobId, CircuitState); + + CircuitState = MessagePumpCircuitState.Closed; + } + /// /// Register information about the client connected to the messaging service /// diff --git a/src/Arcus.Messaging.Pumps.Abstractions/Resiliency/DefaultMessagePumpCircuitBreaker.cs b/src/Arcus.Messaging.Pumps.Abstractions/Resiliency/DefaultMessagePumpCircuitBreaker.cs index f3d86ae5..dc3c6490 100644 --- a/src/Arcus.Messaging.Pumps.Abstractions/Resiliency/DefaultMessagePumpCircuitBreaker.cs +++ b/src/Arcus.Messaging.Pumps.Abstractions/Resiliency/DefaultMessagePumpCircuitBreaker.cs @@ -1,8 +1,6 @@ using System; using System.Linq; -using System.Threading; using System.Threading.Tasks; -using Arcus.Messaging.Abstractions.MessageHandling; using GuardNet; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -40,27 +38,41 @@ public DefaultMessagePumpCircuitBreaker(IServiceProvider serviceProvider, ILogge /// The unique identifier to distinguish the message pump in the application services. /// The optional user-configurable options to manipulate the workings of the message pump interaction. /// Thrown when the is blank. - public virtual async Task PauseMessageProcessingAsync(string jobId, Action configureOptions) + public virtual Task PauseMessageProcessingAsync(string jobId, Action configureOptions) { Guard.NotNullOrWhitespace(jobId, nameof(jobId)); - _logger.LogTrace("Open circuit by pausing message processing for message pump '{JobId}'...", jobId); - - var options = new MessagePumpCircuitBreakerOptions(); - configureOptions?.Invoke(options); - MessagePump messagePump = GetRegisteredMessagePump(jobId); - if (messagePump.IsStarted) + + if (!messagePump.IsStarted) { - await messagePump.StopProcessingMessagesAsync(CancellationToken.None); - await WaitRecoveryTimeAsync(messagePump, options); - await TryProcessSingleMessageAsync(messagePump, options); + _logger.LogWarning("Cannot pause message pump '{JobId}' because the pump has not been started", jobId); + return Task.CompletedTask; } - else + + if (!messagePump.CircuitState.IsClosed) { - await WaitMessageIntervalAsync(messagePump, options); - await TryProcessSingleMessageAsync(messagePump, options); + return Task.CompletedTask; } + + var options = new MessagePumpCircuitBreakerOptions(); + configureOptions?.Invoke(options); + + messagePump.NotifyPauseReceiveMessages(options); + return Task.CompletedTask; + } + + /// + /// Gets the current circuit breaker state of message processing in the given message pump. + /// + /// The unique identifier to distinguish the message pump in the application services. + /// Thrown when the is blank. + public MessagePumpCircuitState GetCircuitBreakerState(string jobId) + { + Guard.NotNullOrWhitespace(jobId, nameof(jobId)); + + MessagePump messagePump = GetRegisteredMessagePump(jobId); + return messagePump.CircuitState; } /// @@ -92,37 +104,5 @@ protected MessagePump GetRegisteredMessagePump(string jobId) return messagePumps[0]; } - - private async Task TryProcessSingleMessageAsync(MessagePump messagePump, MessagePumpCircuitBreakerOptions options) - { - bool isRecovered = false; - while (!isRecovered) - { - MessageProcessingResult processingResult = await messagePump.TryProcessProcessSingleMessageAsync(options); - isRecovered = processingResult.IsSuccessful; - - if (isRecovered) - { - _logger.LogTrace("Message pump '{JobId}' successfully handled a single message, closing circuit...", messagePump.JobId); - await messagePump.StartProcessingMessagesAsync(CancellationToken.None); - } - else - { - await WaitMessageIntervalAsync(messagePump, options); - } - } - } - - private async Task WaitMessageIntervalAsync(MessagePump messagePump, MessagePumpCircuitBreakerOptions options) - { - _logger.LogError("Message pump '{JobId}' failed to handle a single message, wait configured interval period ({IntervalPeriod}) before retrying...", messagePump.JobId, options.MessageIntervalDuringRecovery); - await Task.Delay(options.MessageIntervalDuringRecovery); - } - - private async Task WaitRecoveryTimeAsync(MessagePump messagePump, MessagePumpCircuitBreakerOptions options) - { - _logger.LogTrace("Wait configured recovery period ({RecoveryPeriod}) before trying to close circuit for message pump '{JobId}'", options.MessageRecoveryPeriod, messagePump.JobId); - await Task.Delay(options.MessageRecoveryPeriod); - } } } \ No newline at end of file diff --git a/src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs b/src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs index 44031384..9ac687a1 100644 --- a/src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs +++ b/src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs @@ -9,6 +9,13 @@ namespace Arcus.Messaging.Pumps.Abstractions.Resiliency /// public interface IMessagePumpCircuitBreaker { + /// + /// Gets the current circuit breaker state of message processing in the given message pump. + /// + /// The unique identifier to distinguish the message pump in the application services. + /// Thrown when the is blank. + MessagePumpCircuitState GetCircuitBreakerState(string jobId); + /// /// Pause the process of receiving messages in the message pump for a period of time before careful retrying again. /// @@ -17,4 +24,84 @@ public interface IMessagePumpCircuitBreaker /// Thrown when the is blank. Task PauseMessageProcessingAsync(string jobId, Action configureOptions); } + + /// + /// Represents the available states in the in which the message pump can transition into. + /// + internal enum CircuitBreakerState + { + Closed, HalfOpen, Open + } + + /// + /// Represents the available states in which the is presently in within the circuit breaker context + /// + public sealed class MessagePumpCircuitState + { + private readonly CircuitBreakerState _state; + + private MessagePumpCircuitState(CircuitBreakerState state) : this(state, new MessagePumpCircuitBreakerOptions()) + { + } + + private MessagePumpCircuitState(CircuitBreakerState state, MessagePumpCircuitBreakerOptions options) + { + _state = state; + + Options = options ?? throw new ArgumentNullException(nameof(options)); + } + + /// + /// Gets a boolean flag that indicates whether the message pump is in the 'Closed' circuit breaker state or not. + /// + /// + /// When the message pump is in the 'Closed' state, the pump is retrieving messages at a normal rate. + /// + public bool IsClosed => _state is CircuitBreakerState.Closed; + + /// + /// Gets a boolean flag that indicates whether the message pump is in the 'Open' circuit breaker state or not. + /// + /// + /// When the message pump is in the 'Open' state, the pump has stopped retrieving messages all together. + /// + public bool IsOpen => _state is CircuitBreakerState.Open; + + /// + /// Gets a boolean flag that indicates whether the message pump is in a 'Half-Open' state or not. + /// + /// + /// When the message pump is in the 'Half-Open' state, the pump retrieves a single message and verifies if the message can be processed. + /// If the message is processed successfully, the pump will transition back into the 'Closed' state. + /// + public bool IsHalfOpen => _state is CircuitBreakerState.HalfOpen; + + /// + /// Gets the accompanied additional options that manipulate the behavior of any given state. + /// + public MessagePumpCircuitBreakerOptions Options { get; } + + /// + /// Gets an instance of the class that represents a closed state, + /// in which the message pump is able to process messages normally. + /// + internal static MessagePumpCircuitState Closed => new(CircuitBreakerState.Closed); + + /// + /// Lets the current instance of the state transition to another state. + /// + internal MessagePumpCircuitState TransitionTo(CircuitBreakerState state, MessagePumpCircuitBreakerOptions options = null) + { + return new(state, options ?? Options); + } + + /// + /// Returns a string that represents the current object. + /// + /// A string that represents the current object. + public override string ToString() + { + return _state.ToString(); + } + } } diff --git a/src/Arcus.Messaging.Pumps.EventHubs/Arcus.Messaging.Pumps.EventHubs.csproj b/src/Arcus.Messaging.Pumps.EventHubs/Arcus.Messaging.Pumps.EventHubs.csproj index fc3b8df3..02497717 100644 --- a/src/Arcus.Messaging.Pumps.EventHubs/Arcus.Messaging.Pumps.EventHubs.csproj +++ b/src/Arcus.Messaging.Pumps.EventHubs/Arcus.Messaging.Pumps.EventHubs.csproj @@ -28,8 +28,8 @@ - - + + diff --git a/src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs b/src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs index 4ef35bb2..8bf17b2a 100644 --- a/src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs +++ b/src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs @@ -1,4 +1,6 @@ using System; +using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Arcus.Messaging.Abstractions; @@ -26,11 +28,10 @@ public class AzureServiceBusMessagePump : MessagePump, IRestartableMessagePump private readonly IAzureServiceBusMessageRouter _messageRouter; private readonly IDisposable _loggingScope; - private bool _ownsTopicSubscription = false; - private bool _isHostShuttingDown; - private ServiceBusProcessor _messageProcessor; - private ServiceBusReceiver _messageReceiver; + private bool _ownsTopicSubscription, _isHostShuttingDown; private int _unauthorizedExceptionCount; + private ServiceBusReceiver _messageReceiver; + private CancellationTokenSource _receiveMessagesCancellation; /// /// Initializes a new instance of the class. @@ -198,13 +199,76 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) } } - /// - /// Try to process a single message after the circuit was broken, a.k.a entering the half-open state. - /// - /// - /// [Success] when the related message handler can again process messages and the message pump can again start receive messages in full; [Failure] otherwise. - /// - public override async Task TryProcessProcessSingleMessageAsync(MessagePumpCircuitBreakerOptions options) + private static readonly TimeSpan MessagePollingWaitTime = TimeSpan.FromMilliseconds(300); + + /// + public override async Task StartProcessingMessagesAsync(CancellationToken cancellationToken) + { + if (IsStarted) + { + return; + } + + await base.StartProcessingMessagesAsync(cancellationToken); + + /* TODO: we can't support Azure Service Bus plug-ins yet because the new Azure SDK doesn't yet support this: + https://github.com/arcus-azure/arcus.messaging/issues/176 */ + + Logger.LogInformation("Azure Service Bus {EntityType} message pump '{JobId}' on entity path '{EntityPath}' in namespace '{Namespace}' started", Settings.ServiceBusEntity, JobId, EntityPath, Namespace); + + Namespace = _messageReceiver.FullyQualifiedNamespace; + RegisterClientInformation(JobId, _messageReceiver.EntityPath); + + _receiveMessagesCancellation = new CancellationTokenSource(); + while (CircuitState.IsClosed + && !_messageReceiver.IsClosed + && !_receiveMessagesCancellation.IsCancellationRequested) + { + try + { + await ProcessMultipleMessagesAsync(cancellationToken); + + if (CircuitState.IsOpen) + { + await WaitMessageRecoveryPeriodAsync(cancellationToken); + + MessageProcessingResult singleProcessingResult; + do + { + singleProcessingResult = await TryProcessProcessSingleMessageAsync(); + if (!singleProcessingResult.IsSuccessful) + { + await WaitMessageIntervalDuringRecoveryAsync(cancellationToken); + } + + } while (!singleProcessingResult.IsSuccessful); + + NotifyResumeRetrievingMessages(); + } + } + catch (Exception exception) when (exception is TaskCanceledException or OperationCanceledException or ObjectDisposedException) + { + IsStarted = false; + + Logger.LogTrace("Azure Service Bus {EntityType} message pump '{JobId}' on entity path '{EntityPath}' in '{Namespace}' was cancelled", Settings.ServiceBusEntity, JobId, EntityPath, Namespace); + return; + } + catch (Exception exception) + { + await ProcessErrorAsync(exception, cancellationToken); + } + } + } + + private async Task ProcessMultipleMessagesAsync(CancellationToken cancellationToken) + { + IReadOnlyList messages = + await _messageReceiver.ReceiveMessagesAsync(Settings.Options.MaxConcurrentCalls, cancellationToken: _receiveMessagesCancellation.Token); + + await Task.WhenAll(messages.Select(msg => ProcessMessageAsync(msg, cancellationToken))); + } + + private async Task TryProcessProcessSingleMessageAsync() { if (_messageReceiver is null) { @@ -219,49 +283,25 @@ public override async Task TryProcessProcessSingleMessa while (message is null) { message = await _messageReceiver.ReceiveMessageAsync(); + if (message is null) + { + Logger.LogTrace("Azure Service Bus {EntityType} message pump '{JobId}' on entity path '{EntityPath}' in namespace '{Namespace}' failed to receive a single message, trying again...", Settings.ServiceBusEntity, JobId, EntityPath, Namespace); + await Task.Delay(MessagePollingWaitTime); + } } try { - await ProcessMessageAsync(new ProcessMessageEventArgs(message, _messageReceiver, CancellationToken.None)); - return MessageProcessingResult.Success; + MessageProcessingResult isSuccessfullyProcessed = await ProcessMessageAsync(message, CancellationToken.None); + return isSuccessfullyProcessed; } catch (Exception exception) { Logger.LogError(exception, "Azure Service Bus {EntityType} message pump '{JobId}' on entity path '{EntityPath}' in namespace '{Namespace}' failed to process single message during half-open circuit, retrying after circuit delay", Settings.ServiceBusEntity, JobId, EntityPath, Namespace); - return MessageProcessingResult.Failure(exception); + return MessageProcessingResult.Failure(message.MessageId, MessageProcessingError.ProcessingInterrupted, "Failed to process single message during half-open circuit due to an unexpected exception", exception); } } - /// - public override async Task StartProcessingMessagesAsync(CancellationToken cancellationToken) - { - if (IsStarted) - { - return; - } - - await base.StartProcessingMessagesAsync(cancellationToken); - - if (_messageProcessor is null) - { - _messageProcessor = await Settings.CreateMessageProcessorAsync(); - } - - Namespace = _messageProcessor.FullyQualifiedNamespace; - - /* TODO: we can't support Azure Service Bus plug-ins yet because the new Azure SDK doesn't yet support this: - https://github.com/arcus-azure/arcus.messaging/issues/176 */ - - RegisterClientInformation(JobId, _messageProcessor.EntityPath); - - Logger.LogTrace("Starting Azure Service Bus {EntityType} message pump '{JobId}' on entity path '{EntityPath}' in namespace '{Namespace}'", Settings.ServiceBusEntity, JobId, EntityPath, Namespace); - _messageProcessor.ProcessErrorAsync += ProcessErrorAsync; - _messageProcessor.ProcessMessageAsync += ProcessMessageAsync; - await _messageProcessor.StartProcessingAsync(cancellationToken); - Logger.LogInformation("Azure Service Bus {EntityType} message pump '{JobId}' on entity path '{EntityPath}' in namespace '{Namespace}' started", Settings.ServiceBusEntity, JobId, EntityPath, Namespace); - } - /// public override async Task StopProcessingMessagesAsync(CancellationToken cancellationToken) { @@ -270,31 +310,16 @@ public override async Task StopProcessingMessagesAsync(CancellationToken cancell return; } - await base.StopProcessingMessagesAsync(cancellationToken); + Logger.LogInformation("Azure Service Bus {EntityType} message pump '{JobId}' on entity path '{EntityPath}' in '{Namespace}' closed : {Time}", Settings.ServiceBusEntity, JobId, EntityPath, Namespace, DateTimeOffset.UtcNow); - if (_messageProcessor is null) - { - return; - } - - try - { - Logger.LogTrace("Closing Azure Service Bus {EntityType} message pump '{JobId}' on entity path '{EntityPath}' in '{Namespace}'", Settings.ServiceBusEntity, JobId, EntityPath, Namespace); - _messageProcessor.ProcessMessageAsync -= ProcessMessageAsync; - _messageProcessor.ProcessErrorAsync -= ProcessErrorAsync; - await _messageProcessor.StopProcessingAsync(cancellationToken); + _receiveMessagesCancellation?.Cancel(); + await base.StopProcessingMessagesAsync(cancellationToken); - Logger.LogInformation("Azure Service Bus {EntityType} message pump '{JobId}' on entity path '{EntityPath}' in '{Namespace}' closed : {Time}", Settings.ServiceBusEntity, JobId, EntityPath, Namespace, DateTimeOffset.UtcNow); - } - catch (Exception exception) when (exception is not TaskCanceledException && exception is not OperationCanceledException) - { - Logger.LogWarning(exception, "Cannot correctly close the Azure Service Bus message pump '{JobId}' on entity path '{EntityPath}' in '{Namespace}': {Message}", JobId, EntityPath, Namespace, exception.Message); - } } - private async Task ProcessErrorAsync(ProcessErrorEventArgs args) + private async Task ProcessErrorAsync(Exception exception, CancellationToken cancellationToken) { - if (args?.Exception is null) + if (exception is null) { Logger.LogWarning("Thrown exception on Azure Service Bus {EntityType} message pump '{JobId}' was null, skipping", Settings.ServiceBusEntity, JobId); return; @@ -302,21 +327,20 @@ private async Task ProcessErrorAsync(ProcessErrorEventArgs args) try { - await HandleReceiveExceptionAsync(args.Exception); + await HandleReceiveExceptionAsync(exception); } finally { - if (args.Exception is UnauthorizedAccessException) + if (exception is UnauthorizedAccessException) { if (Interlocked.Increment(ref _unauthorizedExceptionCount) >= Settings.Options.MaximumUnauthorizedExceptionsBeforeRestart) { Logger.LogTrace("Unable to connect anymore to Azure Service Bus, trying to re-authenticate..."); - await RestartAsync(args.CancellationToken); + await RestartAsync(cancellationToken); } else { - Logger.LogWarning("Unable to connect anymore to Azure Service Bus ({CurrentCount}/{MaxCount})", - _unauthorizedExceptionCount, Settings.Options.MaximumUnauthorizedExceptionsBeforeRestart); + Logger.LogWarning("Unable to connect anymore to Azure Service Bus ({CurrentCount}/{MaxCount})", _unauthorizedExceptionCount, Settings.Options.MaximumUnauthorizedExceptionsBeforeRestart); } } } @@ -354,12 +378,6 @@ public async Task RestartAsync(CancellationToken cancellationToken) /// Indicates that the shutdown process should no longer be graceful. public override async Task StopAsync(CancellationToken cancellationToken) { - if (_messageProcessor != null) - { - await _messageProcessor.StopProcessingAsync(); - await _messageProcessor.CloseAsync(); - } - if (_messageReceiver != null) { await _messageReceiver.CloseAsync(); @@ -376,6 +394,7 @@ public override async Task StopAsync(CancellationToken cancellationToken) await base.StopAsync(cancellationToken); _isHostShuttingDown = true; _loggingScope?.Dispose(); + _receiveMessagesCancellation?.Dispose(); } private async Task DeleteTopicSubscriptionAsync(CancellationToken cancellationToken) @@ -404,20 +423,19 @@ private async Task DeleteTopicSubscriptionAsync(CancellationToken cancellationTo } } - private async Task ProcessMessageAsync(ProcessMessageEventArgs args) + private async Task ProcessMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken) { - ServiceBusReceivedMessage message = args?.Message; if (message is null) { Logger.LogWarning("Received message on Azure Service Bus {EntityType} message pump '{JobId}' was null, skipping", Settings.ServiceBusEntity, JobId); - return; + return MessageProcessingResult.Failure("", MessageProcessingError.ProcessingInterrupted, "Cannot process received message as the message is was 'null'"); } if (_isHostShuttingDown) { Logger.LogWarning("Abandoning message with ID '{MessageId}' as the Azure Service Bus {EntityType} message pump '{JobId}' is shutting down", message.MessageId, Settings.ServiceBusEntity, JobId); - await args.AbandonMessageAsync(message); - return; + await _messageReceiver.AbandonMessageAsync(message); + return MessageProcessingResult.Failure(message.MessageId, MessageProcessingError.ProcessingInterrupted, "Cannot process received message as the message pump is shutting down"); } if (string.IsNullOrEmpty(message.CorrelationId)) @@ -425,13 +443,29 @@ private async Task ProcessMessageAsync(ProcessMessageEventArgs args) Logger.LogTrace("No operation ID was found on the message '{MessageId}' during processing in the Azure Service Bus {EntityType} message pump '{JobId}'", message.MessageId, Settings.ServiceBusEntity, JobId); } - using (MessageCorrelationResult correlationResult = DetermineMessageCorrelation(message)) + using MessageCorrelationResult correlationResult = DetermineMessageCorrelation(message); + AzureServiceBusMessageContext messageContext = message.GetMessageContext(JobId, Settings.ServiceBusEntity); + + MessageProcessingResult routingResult = await _messageRouter.RouteMessageAsync(_messageReceiver, message, messageContext, correlationResult.CorrelationInfo, cancellationToken); + + if (routingResult.IsSuccessful && Settings.Options.AutoComplete) { - ServiceBusReceiver receiver = args.GetServiceBusReceiver(); - AzureServiceBusMessageContext messageContext = message.GetMessageContext(JobId, Settings.ServiceBusEntity); - - await _messageRouter.RouteMessageAsync(receiver, args.Message, messageContext, correlationResult.CorrelationInfo, args.CancellationToken); + try + { + Logger.LogTrace("Auto-complete message '{MessageId}' (if needed) after processing in Azure Service Bus {EntityType} message pump '{JobId}'", message.MessageId, Settings.ServiceBusEntity, JobId); + await _messageReceiver.CompleteMessageAsync(message); + } + catch (ServiceBusException exception) when ( + exception.Message.Contains("lock") + && exception.Message.Contains("expired") + && exception.Message.Contains("already") + && exception.Message.Contains("removed")) + { + Logger.LogTrace("Message '{MessageId}' on Azure Service Bus {EntityType} message pump '{JobId}' does not need to be auto-completed, because it was already settled", message.MessageId, Settings.ServiceBusEntity, JobId); + } } + + return routingResult; } private MessageCorrelationResult DetermineMessageCorrelation(ServiceBusReceivedMessage message) diff --git a/src/Arcus.Messaging.Pumps.ServiceBus/Configuration/AzureServiceBusMessagePumpSettings.cs b/src/Arcus.Messaging.Pumps.ServiceBus/Configuration/AzureServiceBusMessagePumpSettings.cs index b10722b6..083bd88f 100644 --- a/src/Arcus.Messaging.Pumps.ServiceBus/Configuration/AzureServiceBusMessagePumpSettings.cs +++ b/src/Arcus.Messaging.Pumps.ServiceBus/Configuration/AzureServiceBusMessagePumpSettings.cs @@ -220,34 +220,6 @@ internal async Task CreateMessageReceiverAsync() return client.CreateReceiver(entityPath, SubscriptionName); } - /// - /// Creates an instance based on the provided settings. - /// - internal async Task CreateMessageProcessorAsync() - { - var options = new ServiceBusClientOptions - { - RetryOptions = { TryTimeout = TimeSpan.FromSeconds(5) } - }; - if (_tokenCredential is null) - { - string rawConnectionString = await GetConnectionStringAsync(); - string entityPath = DetermineEntityPath(rawConnectionString); - - var client = new ServiceBusClient(rawConnectionString, options); - return CreateProcessor(client, entityPath, SubscriptionName); - } - else - { - var client = new ServiceBusClient(FullyQualifiedNamespace, _tokenCredential, options); - - string entityPath = DetermineEntityPath(); - ServiceBusProcessor processor = CreateProcessor(client, entityPath, SubscriptionName); - - return processor; - } - } - private string DetermineEntityPath(string connectionString = null) { if (_tokenCredential is null && !string.IsNullOrWhiteSpace(connectionString)) @@ -276,29 +248,15 @@ private string DetermineEntityPath(string connectionString = null) return EntityName; } - private ServiceBusProcessor CreateProcessor(ServiceBusClient client, string entityName, string subscriptionName) - { - ServiceBusProcessorOptions options = DetermineMessageProcessorOptions(); - - if (string.IsNullOrWhiteSpace(subscriptionName)) - { - return client.CreateProcessor(entityName, options); - } - - return client.CreateProcessor(entityName, subscriptionName, options); - } - - private ServiceBusProcessorOptions DetermineMessageProcessorOptions() + private ServiceBusReceiverOptions DetermineMessageReceiverOptions() { - var messageHandlerOptions = new ServiceBusProcessorOptions(); + var options = new ServiceBusReceiverOptions(); if (Options != null) { - messageHandlerOptions.AutoCompleteMessages = Options.AutoComplete; - messageHandlerOptions.MaxConcurrentCalls = Options.MaxConcurrentCalls; - messageHandlerOptions.PrefetchCount = Options.PrefetchCount; + options.PrefetchCount = Options.PrefetchCount; } - return messageHandlerOptions; + return options; } private async Task GetConnectionStringAsync() diff --git a/src/Arcus.Messaging.Pumps.ServiceBus/Resiliency/CircuitBreakerServiceBusMessageHandler.cs b/src/Arcus.Messaging.Pumps.ServiceBus/Resiliency/CircuitBreakerServiceBusMessageHandler.cs new file mode 100644 index 00000000..f23530db --- /dev/null +++ b/src/Arcus.Messaging.Pumps.ServiceBus/Resiliency/CircuitBreakerServiceBusMessageHandler.cs @@ -0,0 +1,106 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Arcus.Messaging.Abstractions; +using Arcus.Messaging.Abstractions.MessageHandling; +using Arcus.Messaging.Abstractions.ServiceBus; +using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling; +using Arcus.Messaging.Pumps.Abstractions.Resiliency; +using Microsoft.Extensions.Logging; + +namespace Arcus.Messaging.Pumps.ServiceBus.Resiliency +{ + /// + /// Represents a template for a message handler that interacts with an unstable dependency system that requires a circuit breaker to prevent overloading the system. + /// + /// The type of the message that this handler can process. + public abstract class CircuitBreakerServiceBusMessageHandler : AzureServiceBusMessageHandler + { + /// + /// Initializes a new instance of the class. + /// + /// The circuit breaker that controls the activation of the message pump. + /// The logger to write diagnostic messages during the processing of the message. + /// Thrown when the or is null. + protected CircuitBreakerServiceBusMessageHandler( + IMessagePumpCircuitBreaker circuitBreaker, + ILogger> logger) + : base(logger) + { + CircuitBreaker = circuitBreaker; + } + + /// + /// Gets the circuit breaker that controls the activation of the message pump. + /// + protected IMessagePumpCircuitBreaker CircuitBreaker { get; } + + /// + /// Process a new message that was received. + /// + /// The message that was received. + /// The context providing more information concerning the processing. + /// The information concerning correlation of telemetry and processes by using a variety of unique identifiers. + /// The token to cancel the processing. + /// + /// Thrown when the , , or the is null. + /// + public override async Task ProcessMessageAsync( + TMessage message, + AzureServiceBusMessageContext messageContext, + MessageCorrelationInfo correlationInfo, + CancellationToken cancellationToken) + { + var options = new MessagePumpCircuitBreakerOptions(); + MessageProcessingResult result = await TryProcessMessageAsync(message, messageContext, correlationInfo, options, cancellationToken); + + if (!result.IsSuccessful) + { + await CircuitBreaker.PauseMessageProcessingAsync(messageContext.JobId, opt => + { + opt.MessageIntervalDuringRecovery = options.MessageIntervalDuringRecovery; + opt.MessageRecoveryPeriod = options.MessageRecoveryPeriod; + }); + + throw result.ProcessingException; + } + } + + private async Task TryProcessMessageAsync( + TMessage message, + AzureServiceBusMessageContext messageContext, + MessageCorrelationInfo correlationInfo, + MessagePumpCircuitBreakerOptions options, + CancellationToken cancellationToken) + { + try + { + await ProcessMessageAsync(message, messageContext, correlationInfo, options, cancellationToken); + return MessageProcessingResult.Success(messageContext.MessageId); + } + catch (Exception exception) + { + Logger.LogError(exception, "Message Processing failed due to thrown exception: {Message}", exception.Message); + return MessageProcessingResult.Failure(messageContext.MessageId, MessageProcessingError.MatchedHandlerFailed, "Failed to process message due to an exception thrown by the message handler implementation", exception); + } + } + + /// + /// Process a new message that was received. + /// + /// The message that was received. + /// The context providing more information concerning the processing. + /// The information concerning correlation of telemetry and processes by using a variety of unique identifiers. + /// The additional options to manipulate the possible circuit breakage of the message pump for which a message is processed. + /// The token to cancel the processing. + /// + /// Thrown when the , , or the is null. + /// + protected abstract Task ProcessMessageAsync( + TMessage message, + AzureServiceBusMessageContext messageContext, + MessageCorrelationInfo correlationInfo, + MessagePumpCircuitBreakerOptions options, + CancellationToken cancellationToken); + } +} diff --git a/src/Arcus.Messaging.ServiceBus.Core/Arcus.Messaging.ServiceBus.Core.csproj b/src/Arcus.Messaging.ServiceBus.Core/Arcus.Messaging.ServiceBus.Core.csproj index df634604..49ff6cc3 100644 --- a/src/Arcus.Messaging.ServiceBus.Core/Arcus.Messaging.ServiceBus.Core.csproj +++ b/src/Arcus.Messaging.ServiceBus.Core/Arcus.Messaging.ServiceBus.Core.csproj @@ -29,8 +29,8 @@ - - + + diff --git a/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj b/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj index 0ffc6f30..c3588479 100644 --- a/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj +++ b/src/Arcus.Messaging.Tests.Integration/Arcus.Messaging.Tests.Integration.csproj @@ -8,20 +8,19 @@ - - + - + diff --git a/src/Arcus.Messaging.Tests.Integration/Fixture/TemporaryServiceBusEntity.cs b/src/Arcus.Messaging.Tests.Integration/Fixture/TemporaryServiceBusEntity.cs index d26942e4..ba81d4ac 100644 --- a/src/Arcus.Messaging.Tests.Integration/Fixture/TemporaryServiceBusEntity.cs +++ b/src/Arcus.Messaging.Tests.Integration/Fixture/TemporaryServiceBusEntity.cs @@ -1,19 +1,9 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Reflection; -using System.Text; using System.Threading.Tasks; using Azure; -using Azure.Identity; -using Azure.Messaging.ServiceBus; using Azure.ResourceManager; using Azure.ResourceManager.ServiceBus; -using Azure.ResourceManager.ServiceBus.Models; -using Microsoft.Azure.EventGrid.Models; using Microsoft.Extensions.Logging; -using Xunit; -using Xunit.Sdk; namespace Arcus.Messaging.Tests.Integration.Fixture { diff --git a/src/Arcus.Messaging.Tests.Integration/Fixture/WorkerOptions.cs b/src/Arcus.Messaging.Tests.Integration/Fixture/WorkerOptions.cs index 3d19ff9f..282a91dd 100644 --- a/src/Arcus.Messaging.Tests.Integration/Fixture/WorkerOptions.cs +++ b/src/Arcus.Messaging.Tests.Integration/Fixture/WorkerOptions.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections; using System.Collections.Generic; using System.Collections.ObjectModel; diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePumpTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePumpTests.cs index ebf8984d..4985b5b2 100644 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePumpTests.cs +++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/EventHubsMessagePumpTests.cs @@ -2,7 +2,6 @@ using System.Runtime.CompilerServices; using System.Text; using System.Threading.Tasks; -using Arcus.EventGrid.Testing.Logging; using Arcus.Messaging.Abstractions; using Arcus.Messaging.Abstractions.EventHubs.MessageHandling; using Arcus.Messaging.Abstractions.MessageHandling; diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBus/TestServiceBusMessageProducer.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBus/TestServiceBusMessageProducer.cs index 0f20ad22..d7130332 100644 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBus/TestServiceBusMessageProducer.cs +++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBus/TestServiceBusMessageProducer.cs @@ -17,7 +17,7 @@ public class TestServiceBusMessageProducer private readonly ServiceBusConfig _config; /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// public TestServiceBusMessageProducer(string entityName, ServiceBusConfig config) { diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBus/TestServiceMessageConsumer.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBus/TestServiceMessageConsumer.cs index b86c945c..76fd1cf5 100644 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBus/TestServiceMessageConsumer.cs +++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBus/TestServiceMessageConsumer.cs @@ -9,6 +9,7 @@ using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Xunit; +using Xunit.Sdk; namespace Arcus.Messaging.Tests.Integration.MessagePump.ServiceBus { @@ -41,37 +42,44 @@ public static TestServiceMessageConsumer CreateForQueue(string entityName, Servi /// public async Task AssertCompletedMessageAsync(string messageId) { - await using ServiceBusClient client = _config.GetClient(); - await using ServiceBusReceiver receiver = client.CreateReceiver(_entityName); + await Poll.UntilAvailableAsync(async () => + { + await using ServiceBusClient client = _config.GetClient(); + await using ServiceBusReceiver receiver = client.CreateReceiver(_entityName); - IReadOnlyList messages = await receiver.ReceiveMessagesAsync(1, maxWaitTime: TimeSpan.FromSeconds(2)); - Assert.DoesNotContain(messages, msg => msg.MessageId == messageId); + IReadOnlyList messages = await receiver.PeekMessagesAsync(100); + Assert.DoesNotContain(messages, msg => msg.MessageId == messageId); + + }, options => options.FailureMessage = $"Azure Service bus message '{messageId}' did not get completed in time"); } /// /// Tries receiving a single abandoned message on the Azure Service Bus queue. /// /// Thrown when no abandoned messages can be consumed within the configured time-out. - public async Task AssertAbandonMessageAsync(string messageId) + public async Task AssertAbandonMessageAsync(string messageId, bool completeUponReceive = false) { await using ServiceBusClient client = _config.GetClient(); await using ServiceBusReceiver receiver = client.CreateReceiver(_entityName); - ServiceBusReceivedMessage message = + ServiceBusReceivedMessage message = await Poll.Target(() => receiver.ReceiveMessageAsync()) .Until(msg => msg != null && msg.MessageId == messageId && msg.DeliveryCount > 1) .Every(TimeSpan.FromSeconds(1)) .Timeout(TimeSpan.FromMinutes(2)) .FailWith($"cannot receive abandoned message with the message ID: '{messageId}' in time"); - await receiver.CompleteMessageAsync(message); + if (completeUponReceive) + { + await receiver.CompleteMessageAsync(message); + } } /// /// Tries receiving a single dead lettered message on the Azure Service Bus dead letter queue. /// /// Thrown when no dead-lettered messages can be consumed within the configured time-out. - public async Task AssertDeadLetterMessageAsync(string messageId) + public async Task AssertDeadLetterMessageAsync(string messageId, TimeSpan? timeout = null) { var options = new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter }; @@ -112,7 +120,7 @@ await Poll.Target(ReceiveMessageAsync) } }) .Every(TimeSpan.FromSeconds(1)) - .Timeout(TimeSpan.FromMinutes(2)) + .Timeout(timeout ?? TimeSpan.FromMinutes(2)) .FailWith($"cannot receive dead-lettered message with message ID: '{messageId}' in time"); } } diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.ResiliencyTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.ResiliencyTests.cs index 46252e74..8fbed1ff 100644 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.ResiliencyTests.cs +++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.ResiliencyTests.cs @@ -1,113 +1,107 @@ using System; +using System.Collections.ObjectModel; using System.Linq; using System.Threading; using System.Threading.Tasks; -using Arcus.Messaging.Abstractions.MessageHandling; +using Arcus.Messaging.Abstractions; using Arcus.Messaging.Abstractions.ServiceBus; using Arcus.Messaging.Pumps.Abstractions; using Arcus.Messaging.Pumps.Abstractions.Resiliency; using Arcus.Messaging.Pumps.ServiceBus; +using Arcus.Messaging.Pumps.ServiceBus.Resiliency; using Arcus.Messaging.Tests.Core.Events.v1; using Arcus.Messaging.Tests.Core.Messages.v1; using Arcus.Messaging.Tests.Integration.Fixture; -using Arcus.Messaging.Tests.Integration.MessagePump.Fixture; using Arcus.Messaging.Tests.Integration.MessagePump.ServiceBus; using Arcus.Messaging.Tests.Workers.MessageHandlers; -using Arcus.Messaging.Tests.Workers.ServiceBus.MessageHandlers; +using Arcus.Testing; using Azure.Messaging.ServiceBus; -using Bogus; +using Azure.Messaging.ServiceBus.Administration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; -using Newtonsoft.Json; +using Microsoft.Extensions.Logging.Abstractions; using Xunit; using static Arcus.Messaging.Tests.Integration.MessagePump.ServiceBus.DiskMessageEventConsumer; +using static Arcus.Messaging.Tests.Integration.MessagePump.TestUnavailableDependencyAzureServiceBusMessageHandler; namespace Arcus.Messaging.Tests.Integration.MessagePump { public partial class ServiceBusMessagePumpTests { - [Fact(Skip = "Currently incomplete in testing as it is not fully validating correct start/stop of message pump")] - public async Task ServiceBusTopicMessagePump_PauseViaCircuitBreaker_RestartsAgainWithOneMessage() + [Fact] + public async Task ServiceBusMessageQueuePump_WithUnavailableDependencySystem_CircuitBreaksUntilDependencyBecomesAvailable() { // Arrange - var options = new WorkerOptions(); - ServiceBusMessage[] messages = GenerateShipmentMessages(3); - TimeSpan recoveryTime = TimeSpan.FromSeconds(10); - TimeSpan messageInterval = TimeSpan.FromSeconds(2); + var messageSink = new OrderMessageSink(); + + ServiceBusMessage messageBeforeBreak = CreateOrderServiceBusMessageForW3C(); + ServiceBusMessage messageAfterBreak = CreateOrderServiceBusMessageForW3C(); + var options = new WorkerOptions(); options.AddXunitTestLogging(_outputWriter) - .AddServiceBusTopicMessagePump( - subscriptionName: "circuit-breaker-" + Guid.NewGuid(), - _ => TopicConnectionString, - opt => opt.TopicSubscription = TopicSubscription.Automatic) - .WithServiceBusMessageHandler( - implementationFactory: provider => new CircuitBreakerAzureServiceBusMessageHandler( - targetMessageIds: messages.Select(m => m.MessageId).ToArray(), - configureOptions: opt => - { - opt.MessageRecoveryPeriod = recoveryTime; - opt.MessageIntervalDuringRecovery = messageInterval; - }, - provider.GetRequiredService())); - - var producer = TestServiceBusMessageProducer.CreateFor(QueueName, _config); + .AddSingleton(messageSink) + .AddServiceBusQueueMessagePumpUsingManagedIdentity(QueueName, HostName) + .WithServiceBusMessageHandler( + messageContextFilter: ctx => ctx.MessageId == messageBeforeBreak.MessageId + || ctx.MessageId == messageAfterBreak.MessageId); + + var producer = new TestServiceBusMessageProducer(QueueName, _config.GetServiceBus()); await using var worker = await Worker.StartNewAsync(options); // Act - await producer.ProduceAsync(messages); + await producer.ProduceAsync(messageBeforeBreak); // Assert - var handler = GetMessageHandler(worker); - AssertX.RetryAssertUntil(() => - { - DateTimeOffset[] arrivals = handler.GetMessageArrivals(); - Assert.Equal(messages.Length, arrivals.Length); - - _outputWriter.WriteLine("Arrivals: {0}", string.Join(", ", arrivals)); - TimeSpan faultMargin = TimeSpan.FromSeconds(1); - Assert.Collection(arrivals.SkipLast(1).Zip(arrivals.Skip(1)), - dates => AssertDateDiff(dates.First, dates.Second, recoveryTime, recoveryTime.Add(faultMargin)), - dates => AssertDateDiff(dates.First, dates.Second, messageInterval, messageInterval.Add(faultMargin))); + await messageSink.ShouldReceiveOrdersDuringBreakAsync(messageBeforeBreak.MessageId); - }, timeout: TimeSpan.FromMinutes(2), _logger); + await producer.ProduceAsync(messageAfterBreak); + await messageSink.ShouldReceiveOrdersAfterBreakAsync(messageAfterBreak.MessageId); } - private static TMessageHandler GetMessageHandler(Worker worker) + [Fact] + public async Task ServiceBusMessageTopicPump_WithUnavailableDependencySystem_CircuitBreaksUntilDependencyBecomesAvailable() { - return Assert.IsType( - worker.Services.GetRequiredService() - .GetMessageHandlerInstance()); - } + // Arrange + ServiceBusMessage messageBeforeBreak = CreateOrderServiceBusMessageForW3C(); + ServiceBusMessage messageAfterBreak = CreateOrderServiceBusMessageForW3C(); + + var messageSink = new OrderMessageSink(); + await using TemporaryTopicSubscription subscription = await CreateTopicSubscriptionForMessageAsync(messageBeforeBreak, messageAfterBreak); - private static void AssertDateDiff(DateTimeOffset left, DateTimeOffset right, TimeSpan expectedMin, TimeSpan expectedMax) - { - left = new DateTimeOffset(left.Year, left.Month, left.Day, left.Hour, left.Minute, left.Second, 0, left.Offset); - right = new DateTimeOffset(right.Year, right.Month, right.Day, right.Hour, right.Minute, right.Second, 0, right.Offset); + var options = new WorkerOptions(); + options.AddXunitTestLogging(_outputWriter) + .ConfigureSerilog(logging => logging.MinimumLevel.Verbose()) + .AddSingleton(messageSink) + .AddServiceBusTopicMessagePumpUsingManagedIdentity(TopicName, subscription.Name, HostName) + .WithServiceBusMessageHandler(); + + var producer = new TestServiceBusMessageProducer(TopicName, _config.GetServiceBus()); + await using var worker = await Worker.StartNewAsync(options); + + // Act + await producer.ProduceAsync(messageBeforeBreak); - TimeSpan actual = right - left; - Assert.InRange(actual, expectedMin, expectedMax); + // Assert + await messageSink.ShouldReceiveOrdersDuringBreakAsync(messageBeforeBreak.MessageId); + + await producer.ProduceAsync(messageAfterBreak); + await messageSink.ShouldReceiveOrdersAfterBreakAsync(messageAfterBreak.MessageId); } - private static ServiceBusMessage[] GenerateShipmentMessages(int count) + private async Task CreateTopicSubscriptionForMessageAsync(params ServiceBusMessage[] messages) { - var generator = new Faker() - .RuleFor(s => s.Id, f => f.Random.Guid().ToString()) - .RuleFor(s => s.Code, f => f.Random.Int(1, 100)) - .RuleFor(s => s.Date, f => f.Date.RecentOffset()) - .RuleFor(s => s.Description, f => f.Lorem.Sentence()); + var client = new ServiceBusAdministrationClient(NamespaceConnectionString); - return Enumerable.Repeat(generator, count).Select(g => - { - Shipment shipment = g.Generate(); - string json = JsonConvert.SerializeObject(shipment); - return new ServiceBusMessage(json) - { - MessageId = shipment.Id - }; - }).ToArray(); + return await TemporaryTopicSubscription.CreateIfNotExistsAsync( + client, + TopicName, + $"circuit-breaker-{Guid.NewGuid().ToString("N")[..10]}", + _logger, + configureOptions: null, + rule: new CreateRuleOptions("MessageId", new SqlRuleFilter($"sys.messageid in ({string.Join(", ", messages.Select(m => $"'{m.MessageId}'"))})"))); } - [Fact(Skip = "TODO: fixed upon circuit breaker changes")] + [Fact] public async Task ServiceBusMessagePump_PauseViaLifetime_RestartsAgain() { // Arrange @@ -116,7 +110,7 @@ public async Task ServiceBusMessagePump_PauseViaLifetime_RestartsAgain() options.AddXunitTestLogging(_outputWriter) .AddServiceBusTopicMessagePumpUsingManagedIdentity( TopicName, - subscriptionName: Guid.NewGuid().ToString(), + subscriptionName: Guid.NewGuid().ToString(), HostName, configureMessagePump: opt => { @@ -128,9 +122,9 @@ public async Task ServiceBusMessagePump_PauseViaLifetime_RestartsAgain() ServiceBusMessage message = CreateOrderServiceBusMessageForW3C(); - var producer = TestServiceBusMessageProducer.CreateFor(QueueName, _config); + var producer = TestServiceBusMessageProducer.CreateFor(TopicName, _config); await using var worker = await Worker.StartNewAsync(options); - + var lifetime = worker.Services.GetRequiredService(); await lifetime.PauseProcessingMessagesAsync(jobId, TimeSpan.FromSeconds(5), CancellationToken.None); @@ -138,8 +132,193 @@ public async Task ServiceBusMessagePump_PauseViaLifetime_RestartsAgain() await producer.ProduceAsync(message); // Assert - OrderCreatedEventData eventData = await ConsumeOrderCreatedAsync(message.MessageId); + OrderCreatedEventData eventData = await ConsumeOrderCreatedAsync(message.MessageId, TimeSpan.FromSeconds(10)); AssertReceivedOrderEventDataForW3C(message, eventData); } } + + public class TestUnavailableDependencyAzureServiceBusMessageHandler : CircuitBreakerServiceBusMessageHandler + { + public const int RequiredAttempts = 3; + + private readonly OrderMessageSink _messageSink; + + public TestUnavailableDependencyAzureServiceBusMessageHandler( + OrderMessageSink messageSink, + IMessagePumpCircuitBreaker circuitBreaker, + ILogger> logger) : base(circuitBreaker, logger) + { + _messageSink = messageSink; + } + + protected override Task ProcessMessageAsync( + Order message, + AzureServiceBusMessageContext messageContext, + MessageCorrelationInfo correlationInfo, + MessagePumpCircuitBreakerOptions options, + CancellationToken cancellationToken) + { + options.MessageRecoveryPeriod = TimeSpan.FromSeconds(5); + options.MessageIntervalDuringRecovery = TimeSpan.FromSeconds(1); + + Logger.LogDebug("Process order for the {DeliveryCount}nd time", messageContext.DeliveryCount); + _messageSink.SendOrder(message); + + if (_messageSink.ReceiveOrders().Length < RequiredAttempts) + { + throw new InvalidOperationException( + $"Simulate an unhealthy dependency system! (DeliveryCount: {messageContext.DeliveryCount})"); + } + + return Task.CompletedTask; + } + } + + public class OrderMessageSink + { + private readonly Collection _orders = new(); + + public void SendOrder(Order message) + { + _orders.Add(message); + } + + public Order[] ReceiveOrders() => _orders.ToArray(); + + public async Task ShouldReceiveOrdersDuringBreakAsync(string messageId) + { + TimeSpan _1s = TimeSpan.FromSeconds(1), _1min = TimeSpan.FromMinutes(1); + + await Poll.Target(() => + { + Assert.Equal(RequiredAttempts, _orders.Count(o => o.Id == messageId)); + + }).Every(_1s).Timeout(_1min).FailWith($"message should be retried {RequiredAttempts} times with the help of the circuit breaker"); + } + + public async Task ShouldReceiveOrdersAfterBreakAsync(string messageId) + { + TimeSpan _1s = TimeSpan.FromSeconds(1), _1min = TimeSpan.FromMinutes(1); + + await Poll.Target(() => + { + Assert.Equal(RequiredAttempts + 1, _orders.Count); + Assert.Single(_orders.Where(o => o.Id == messageId)); + + }).Every(_1s).Timeout(_1min).FailWith("pump should continue normal message processing after the message was retried"); + } + } + + /// + /// Represents a temporary Azure Service Bus topic subscription that will be deleted when the instance is disposed. + /// + public class TemporaryTopicSubscription : IAsyncDisposable + { + private readonly ServiceBusAdministrationClient _client; + private readonly string _serviceBusNamespace; + private readonly CreateSubscriptionOptions _options; + private readonly bool _createdByUs; + private readonly ILogger _logger; + + private TemporaryTopicSubscription( + ServiceBusAdministrationClient client, + CreateSubscriptionOptions options, + bool createdByUs, + ILogger logger) + { + ArgumentNullException.ThrowIfNull(client); + ArgumentNullException.ThrowIfNull(options); + + _client = client; + _options = options; + _createdByUs = createdByUs; + _logger = logger; + + Name = _options.SubscriptionName; + } + + /// + /// Gets the name of the Azure Service Bus topic subscription that is possibly created by the test fixture. + /// + public string Name { get; } + + /// + /// Creates a new instance of the which creates a new Azure Service Bus topic subscription if it doesn't exist yet. + /// + /// The administration client to interact with the Azure Service Bus resource where the topic subscription should be created. + /// The name of the Azure Service Bus topic in which the subscription should be created. + /// The name of the subscription in the configured Azure Service Bus topic. + /// The logger to write diagnostic messages during the lifetime of the Azure Service Bus topic subscription. + /// + /// The function to configure the additional options that describes how the Azure Service Bus topic subscription should be created. + /// + /// Thrown when the is null. + /// Thrown when one of the passed arguments is blank. + /// + /// Thrown when the no Azure Service Bus topic exists with the provided + /// in the given namespace where the given points to. + /// + public static async Task CreateIfNotExistsAsync( + ServiceBusAdministrationClient adminClient, + string topicName, + string subscriptionName, + ILogger logger, + Action configureOptions, + CreateRuleOptions rule) + { + ArgumentNullException.ThrowIfNull(adminClient); + + if (string.IsNullOrWhiteSpace(topicName)) + { + throw new ArgumentException( + "Requires a non-blank Azure Service bus topic name to create a temporary topic subscription", nameof(topicName)); + } + + if (string.IsNullOrWhiteSpace(subscriptionName)) + { + throw new ArgumentException( + "Requires a non-blank Azure Service bus topic subscription name to create a temporary topic subscription", nameof(subscriptionName)); + } + + logger ??= NullLogger.Instance; + + var options = new CreateSubscriptionOptions(topicName, subscriptionName); + configureOptions?.Invoke(options); + + + if (!await adminClient.TopicExistsAsync(options.TopicName)) + { + throw new InvalidOperationException( + $"[Test:Setup] cannot create temporary subscription '{options.SubscriptionName}' on Azure Service Bus topic '{options.TopicName}' " + + $"because the topic '{options.TopicName}' does not exists in the provided Azure Service Bus namespace. " + + $"Please make sure to have an available Azure Service Bus topic before using the temporary topic subscription test fixture"); + } + + if (await adminClient.SubscriptionExistsAsync(options.TopicName, options.SubscriptionName)) + { + logger.LogTrace("[Test:Setup] Use already existing Azure Service Bus topic subscription '{SubscriptionName}' in '{TopicName}'", options.SubscriptionName, options.TopicName); + return new TemporaryTopicSubscription(adminClient, options, createdByUs: false, logger); + } + + logger.LogTrace("[Test:Setup] Create new Azure Service Bus topic subscription '{SubscriptionName}' in '{TopicName}'", options.SubscriptionName, options.TopicName); + await adminClient.CreateSubscriptionAsync(options, rule); + + return new TemporaryTopicSubscription(adminClient, options, createdByUs: true, logger); + } + + /// + /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources asynchronously. + /// + /// A task that represents the asynchronous dispose operation. + public async ValueTask DisposeAsync() + { + GC.SuppressFinalize(this); + + if (_createdByUs && await _client.SubscriptionExistsAsync(_options.TopicName, _options.SubscriptionName)) + { + _logger.LogTrace("[Test:Teardown] Delete Azure Service Bus topic subscription '{SubscriptionName}' in '{Namespace}/{TopicName}'", _options.SubscriptionName, _serviceBusNamespace, _options.TopicName); + await _client.DeleteSubscriptionAsync(_options.TopicName, _options.SubscriptionName); + } + } + } } diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.ServiceBusTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.ServiceBusTests.cs index afcf52bb..42603ca0 100644 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.ServiceBusTests.cs +++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePump.ServiceBusTests.cs @@ -1,16 +1,22 @@ -using System; -using System.Threading.Tasks; +using System; + using System.Linq; + using System.Threading.Tasks; using Arcus.Messaging.Abstractions.ServiceBus; +using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling; using Arcus.Messaging.Pumps.ServiceBus; +using Arcus.Messaging.Tests.Core.Events.v1; using Arcus.Messaging.Tests.Core.Messages.v1; using Arcus.Messaging.Tests.Integration.Fixture; using Arcus.Messaging.Tests.Integration.MessagePump.ServiceBus; using Arcus.Messaging.Tests.Workers.MessageHandlers; +using Arcus.Messaging.Tests.Workers.ServiceBus.MessageHandlers; using Azure; using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; using Xunit; +using static Arcus.Messaging.Tests.Integration.MessagePump.ServiceBus.DiskMessageEventConsumer; using static Microsoft.Extensions.Logging.ServiceBusEntityType; namespace Arcus.Messaging.Tests.Integration.MessagePump @@ -133,6 +139,252 @@ await TestServiceBusMessageHandlingAsync(options, Queue, message, async () => }); } + [Fact] + public async Task ServiceBusQueueMessagePumpWithAutoComplete_WhenNoMessageHandlerRegistered_ThenMessageShouldBeDeadLettered() + { + // Arrange + var options = new WorkerOptions(); + options.AddServiceBusQueueMessagePumpUsingManagedIdentity(QueueName, HostName, configureMessagePump: opt => opt.AutoComplete = true); + + ServiceBusMessage message = CreateOrderServiceBusMessageForW3C(); + + // Act + await TestServiceBusMessageHandlingAsync(options, Queue, message, async () => + { + // Assert + TestServiceMessageConsumer consumer = CreateQueueConsumer(); + await consumer.AssertDeadLetterMessageAsync(message.MessageId); + }); + } + + [Fact] + public async Task ServiceBusQueueMessagePumpWithAutoComplete_WhenNoMessageHandlerAbleToHandle_ThenMessageShouldBeDeadLettered() + { + // Arrange + var options = new WorkerOptions(); + options.AddServiceBusQueueMessagePumpUsingManagedIdentity(QueueName, HostName, configureMessagePump: opt => opt.AutoComplete = true) + .WithServiceBusMessageHandler(); + + ServiceBusMessage message = CreateOrderServiceBusMessageForW3C(); + + // Act + await TestServiceBusMessageHandlingAsync(options, Queue, message, async () => + { + // Assert + TestServiceMessageConsumer consumer = CreateQueueConsumer(); + await consumer.AssertDeadLetterMessageAsync(message.MessageId); + }); + } + + [Fact] + public async Task ServiceBusQueueMessagePumpWithAutoComplete_PublishServiceBusMessage_MessageSuccessfullyCompleted() + { + // Arrange + var options = new WorkerOptions(); + options.AddServiceBusQueueMessagePumpUsingManagedIdentity(QueueName, HostName, configureMessagePump: opt => opt.AutoComplete = true) + .WithServiceBusMessageHandler(); + + ServiceBusMessage message = CreateOrderServiceBusMessageForW3C(); + + // Act + await TestServiceBusMessageHandlingAsync(options, Queue, message, async () => + { + // Assert + TestServiceMessageConsumer consumer = CreateQueueConsumer(); + await consumer.AssertCompletedMessageAsync(message.MessageId); + }); + } + + [Fact] + public async Task ServiceBusTopicMessagePumpWithMultipleMessages_PublishesServiceBusMessages_AllMessagesSuccessfullyHandled() + { + // Arrange + var options = new WorkerOptions(); + options.AddXunitTestLogging(_outputWriter); + options.AddServiceBusTopicMessagePumpUsingManagedIdentity(TopicName, HostName) + .WithServiceBusMessageHandler(); + + ServiceBusMessage[] messages = + Bogus.Make(50, () => CreateOrderServiceBusMessageForW3C()).ToArray(); + + await using var worker = await Worker.StartNewAsync(options); + var producer = TestServiceBusMessageProducer.CreateFor(TopicName, _config); + + // Act + await producer.ProduceAsync(messages); + + // Assert + foreach (ServiceBusMessage msg in messages) + { + OrderCreatedEventData eventData = await ConsumeOrderCreatedAsync(msg.MessageId); + AssertReceivedOrderEventDataForW3C(msg, eventData); + } + } + + [Fact] + public async Task ServiceBusMessagePump_WithServiceBusDeadLetterDuringProcessing_ThenMessageShouldBeDeadLettered() + { + await TestServiceBusMessageHandlingAsync(pump => + { + pump.WithServiceBusMessageHandler(context => context.Properties["Topic"].ToString() == "Customers") + .WithServiceBusMessageHandler() + .WithMessageHandler((AzureServiceBusMessageContext _) => false); + }, + async (message, consumer) => + { + await consumer.AssertDeadLetterMessageAsync(message.MessageId); + }); + } + + [Fact] + public async Task ServiceBusMessagePump_WithServiceBusDeadLetterOnFallback_ThenMessageShouldBeDeadLettered() + { + await TestServiceBusMessageHandlingAsync( + pump => + { + pump.WithServiceBusMessageHandler((AzureServiceBusMessageContext _) => true) + .WithServiceBusFallbackMessageHandler(); + }, + async (message, consumer) => + { + await consumer.AssertDeadLetterMessageAsync(message.MessageId); + }); + } + + [Fact] + public async Task ServiceBusMessagePump_WithServiceBusAbandonInProcessing_ThenMessageShouldBeAbandoned() + { + await TestServiceBusMessageHandlingAsync( + pump => + { + pump.WithServiceBusMessageHandler((AzureServiceBusMessageContext _) => false) + .WithServiceBusMessageHandler((AzureServiceBusMessageContext _) => true); + }, + async (message, consumer) => + { + await consumer.AssertAbandonMessageAsync(message.MessageId); + }); + } + + [Fact] + public async Task ServiceBusMessagePump_WithServiceBusAbandonOnFallback_ThenMessageSuccessShouldBeAbandoned() + { + await TestServiceBusMessageHandlingAsync(pump => + { + pump.WithServiceBusMessageHandler() + .WithServiceBusFallbackMessageHandler(); + }, + async (message, consumer) => + { + await consumer.AssertAbandonMessageAsync(message.MessageId); + }); + } + + [Fact] + public async Task ServiceBusMessagePumpWithAutoComplete_WithMatchedMessageHandler_ThenMessageShouldBeCompleted() + { + await TestServiceBusMessageHandlingAsync( + pump => + { + pump.WithServiceBusMessageHandler(); + }, + async (message, consumer) => + { + await consumer.AssertCompletedMessageAsync(message.MessageId); + }); + } + + [Fact] + public async Task ServiceBusMessagePumpWithAutoComplete_WhenNoMessageHandlerRegistered_ThenMessageShouldBeDeadLettered() + { + await TestServiceBusMessageHandlingAsync( + pump => + { + }, + async (message, consumer) => + { + await consumer.AssertDeadLetterMessageAsync(message.MessageId); + }); + } + + [Fact] + public async Task ServiceBusMessagePump_WhenMessageHandlerIsSelectedButFailsToProcess_ThenMessageShouldBeAbandonedUntilDeadLettered() + { + await TestServiceBusMessageHandlingAsync( + pump => + { + pump.WithServiceBusMessageHandler(); + }, + async (message, consumer) => + { + await consumer.AssertAbandonMessageAsync(message.MessageId); + await consumer.AssertDeadLetterMessageAsync(message.MessageId); + }); + } + + [Fact] + public async Task ServiceBusMessagePump_WhenMessageHandlerIsNotSelectedWithoutFallback_ThenMessageShouldBeDeadLettered() + { + await TestServiceBusMessageHandlingAsync( + pump => + { + pump.WithServiceBusMessageHandler(); + }, + async (message, consumer) => + { + await consumer.AssertDeadLetterMessageAsync(message.MessageId); + }); + } + + [Fact] + public async Task ServiceBusMessagePump_WhenFallbackMessageHandlerSelectedSucceedsProcessing_ThenMessageShouldBeCompleted() + { + await TestServiceBusMessageHandlingAsync( + pump => + { + pump.WithServiceBusMessageHandler() + .WithServiceBusFallbackMessageHandler(); + }, + async (message, consumer) => + { + await consumer.AssertCompletedMessageAsync(message.MessageId); + }); + } + + [Fact] + public async Task ServiceBusMessagePump_WhenFallbackMessageHandlerSelectedButFailsToProcess_ThenMessageShouldBeAbandonedUntilDeadLettered() + { + await TestServiceBusMessageHandlingAsync( + pump => + { + pump.WithServiceBusMessageHandler() + .WithServiceBusFallbackMessageHandler(); + }, + async (message, consumer) => + { + await consumer.AssertAbandonMessageAsync(message.MessageId); + await consumer.AssertDeadLetterMessageAsync(message.MessageId); + }); + } + + private async Task TestServiceBusMessageHandlingAsync( + Action configurePump, + Func assertion) + { + var options = new WorkerOptions(); + ServiceBusMessageHandlerCollection collection = options.AddServiceBusQueueMessagePumpUsingManagedIdentity(QueueName, HostName); + + configurePump(collection); + + ServiceBusMessage message = CreateOrderServiceBusMessageForW3C(); + + await TestServiceBusMessageHandlingAsync(options, Queue, message, async () => + { + TestServiceMessageConsumer consumer = CreateQueueConsumer(); + await assertion(message, consumer); + }); + } + private TestServiceMessageConsumer CreateQueueConsumer() { var consumer = TestServiceMessageConsumer.CreateForQueue(QueueName, _serviceBusConfig, _logger); @@ -181,4 +433,4 @@ await TestServiceBusMessageHandlingAsync(Topic, options => }); } } -} +} \ No newline at end of file diff --git a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePumpTests.cs b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePumpTests.cs index 55bf56b2..8d3f04ba 100644 --- a/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePumpTests.cs +++ b/src/Arcus.Messaging.Tests.Integration/MessagePump/ServiceBusMessagePumpTests.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Linq; using System.Runtime.CompilerServices; using System.Text; @@ -16,6 +16,7 @@ using Arcus.Messaging.Tests.Workers.MessageHandlers; using Arcus.Testing; using Azure.Messaging.ServiceBus; +using Bogus; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -38,6 +39,8 @@ public partial class ServiceBusMessagePumpTests : IClassFixture /// Initializes a new instance of the class. /// diff --git a/src/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Queue/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Queue.csproj b/src/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Queue/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Queue.csproj index b862df1c..b1fc95ba 100644 --- a/src/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Queue/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Queue.csproj +++ b/src/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Queue/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Queue.csproj @@ -9,7 +9,7 @@ - + diff --git a/src/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Topic/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Topic.csproj b/src/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Topic/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Topic.csproj index 5da4a69f..f3bde973 100644 --- a/src/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Topic/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Topic.csproj +++ b/src/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Topic/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Topic.csproj @@ -12,9 +12,9 @@ - - - + + + diff --git a/src/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Topic/OrderProcessingFunction.cs b/src/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Topic/OrderProcessingFunction.cs index bfb72a2b..1ef4dfca 100644 --- a/src/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Topic/OrderProcessingFunction.cs +++ b/src/Arcus.Messaging.Tests.Runtimes.AzureFunction.ServiceBus.Topic/OrderProcessingFunction.cs @@ -1,4 +1,3 @@ -using System.Text.Json; using Arcus.Messaging.Abstractions; using Arcus.Messaging.Abstractions.ServiceBus; using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling; @@ -25,10 +24,9 @@ public OrderProcessingFunction( [Function("order-processing")] public async Task Run( - [ServiceBusTrigger("docker-az-func-topic", "TestSubscription", Connection = "ARCUS_SERVICEBUS_CONNECTIONSTRING")] byte[] messageBody, + [ServiceBusTrigger("docker-az-func-topic", "TestSubscription", Connection = "ARCUS_SERVICEBUS_CONNECTIONSTRING")] ServiceBusReceivedMessage message, FunctionContext executionContext) { - ServiceBusReceivedMessage message = ConvertToServiceBusMessage(messageBody, executionContext); _logger.LogInformation("C# ServiceBus topic trigger function processed message: {MessageId}", message.MessageId); AzureServiceBusMessageContext messageContext = message.GetMessageContext(_jobId); @@ -37,23 +35,5 @@ public async Task Run( await _messageRouter.RouteMessageAsync(message, messageContext, result.CorrelationInfo, CancellationToken.None); } } - - private static ServiceBusReceivedMessage ConvertToServiceBusMessage(byte[] messageBody, FunctionContext context) - { - var applicationProperties = new Dictionary(); - if (context.BindingContext.BindingData.TryGetValue("ApplicationProperties", out object applicationPropertiesObj)) - { - var json = applicationPropertiesObj.ToString(); - applicationProperties = JsonSerializer.Deserialize>(json); - } - - var message = ServiceBusModelFactory.ServiceBusReceivedMessage( - body: BinaryData.FromBytes(messageBody), - messageId: context.BindingContext.BindingData["MessageId"]?.ToString(), - correlationId: context.BindingContext.BindingData["CorrelationId"]?.ToString(), - properties: applicationProperties); - - return message; - } } } diff --git a/src/Arcus.Messaging.Tests.Unit/Extensions/ObjectExtensions.cs b/src/Arcus.Messaging.Tests.Unit/Extensions/ObjectExtensions.cs index 5f6afdfc..44a644fd 100644 --- a/src/Arcus.Messaging.Tests.Unit/Extensions/ObjectExtensions.cs +++ b/src/Arcus.Messaging.Tests.Unit/Extensions/ObjectExtensions.cs @@ -34,33 +34,12 @@ public static ServiceBusReceivedMessage AsServiceBusReceivedMessage( string serializedMessageBody = JsonConvert.SerializeObject(messageBody); byte[] rawMessage = Encoding.UTF8.GetBytes(serializedMessageBody); - var amqp = new AmqpAnnotatedMessage(new AmqpMessageBody(new[] {new ReadOnlyMemory(rawMessage)})); - amqp.Header.DeliveryCount = BogusGenerator.Random.UInt(); - - if (operationId is null) - { - amqp.Properties.CorrelationId = new AmqpMessageId(); - } - else - { - amqp.Properties.CorrelationId = new AmqpMessageId(operationId); - } - - if (applicationProperties != null) - { - foreach (KeyValuePair applicationProperty in applicationProperties) - { - amqp.ApplicationProperties[applicationProperty.Key] = applicationProperty.Value; - } - } - var serviceBusMessage = (ServiceBusReceivedMessage) Activator.CreateInstance( - type: typeof(ServiceBusReceivedMessage), - bindingAttr: BindingFlags.NonPublic | BindingFlags.Instance, - binder: null, - args: new object[] {amqp}, - culture: null, - activationAttributes: null); + var serviceBusMessage = ServiceBusModelFactory.ServiceBusReceivedMessage( + body: BinaryData.FromBytes(rawMessage), + messageId: BogusGenerator.Random.Guid().ToString(), + correlationId: operationId, + properties: applicationProperties); return serviceBusMessage; } diff --git a/src/Arcus.Messaging.Tests.Unit/MessageHandling/ServiceBus/AzureServiceBusMessageRouterTests.cs b/src/Arcus.Messaging.Tests.Unit/MessageHandling/ServiceBus/AzureServiceBusMessageRouterTests.cs index 3f6b5b37..07791420 100644 --- a/src/Arcus.Messaging.Tests.Unit/MessageHandling/ServiceBus/AzureServiceBusMessageRouterTests.cs +++ b/src/Arcus.Messaging.Tests.Unit/MessageHandling/ServiceBus/AzureServiceBusMessageRouterTests.cs @@ -19,6 +19,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; +using Moq; using Newtonsoft.Json; using Xunit; using Order = Arcus.Messaging.Tests.Core.Messages.v1.Order; @@ -52,38 +53,15 @@ public async Task RouteMessage_WithDifferentMessageContext_SucceedsWithSameJobId await router.RouteMessageAsync(message, context, correlationInfo, CancellationToken.None); } - [Fact] - public async Task RouteMessage_WithDifferentMessageContext_FailsWithDifferentJobId() - { - // Arrange - var services = new ServiceCollection(); - ServiceBusMessageHandlerCollection collection = services.AddServiceBusMessageRouting(); - collection.JobId = Guid.NewGuid().ToString(); - - // Act - collection.WithServiceBusMessageHandler(); - - // Assert - IServiceProvider provider = services.BuildServiceProvider(); - var router = provider.GetRequiredService(); - - var order = OrderGenerator.Generate(); - var message = ServiceBusModelFactory.ServiceBusReceivedMessage(BinaryData.FromObjectAsJson(order), messageId: "message-id"); - AzureServiceBusMessageContext context = message.GetMessageContext("other-job-id"); - MessageCorrelationInfo correlationInfo = message.GetCorrelationInfo(); - - await Assert.ThrowsAsync(() => router.RouteMessageAsync(message, context, correlationInfo, CancellationToken.None)); - } - [Fact] public async Task RouteMessage_WithMultipleFallbackHandlers_UsesCorrectHandlerByJobId() { // Arrange var services = new ServiceCollection(); - ServiceBusMessageHandlerCollection collection1 = services.AddServiceBusMessageRouting(); + ServiceBusMessageHandlerCollection collection1 = services.AddServiceBusMessageRouting().WithServiceBusMessageHandler(); collection1.JobId = Guid.NewGuid().ToString(); - ServiceBusMessageHandlerCollection collection2 = services.AddServiceBusMessageRouting(); + ServiceBusMessageHandlerCollection collection2 = services.AddServiceBusMessageRouting().WithServiceBusMessageHandler(); collection2.JobId = Guid.NewGuid().ToString(); var handler1 = new PassThruServiceBusFallbackMessageHandler(); @@ -121,46 +99,19 @@ public async Task RouteMessage_WithoutFallbackWithFailingButMatchingMessageHandl IServiceProvider provider = services.BuildServiceProvider(); var router = provider.GetRequiredService(); + var receiver = new Mock(); var order = OrderGenerator.Generate(); var message = ServiceBusModelFactory.ServiceBusReceivedMessage(BinaryData.FromObjectAsJson(order), messageId: "message-id"); var context = AzureServiceBusMessageContextFactory.Generate(); var correlationInfo = message.GetCorrelationInfo(); // Act / Assert - await router.RouteMessageAsync(message, context, correlationInfo, CancellationToken.None); + await router.RouteMessageAsync(receiver.Object, message, context, correlationInfo, CancellationToken.None); // Assert Assert.True(sabotageHandler.IsProcessed, "sabotage message handler should be tried"); } - [Fact] - public async Task RouteMessage_WithoutFallbackWithFailingButMismatchingMessageHandler_FailsBecauseNoFallback() - { - // Arrange - var services = new ServiceCollection(); - var sabotageHandler = new OrdersSabotageAzureServiceBusMessageHandler(); - - services.AddServiceBusMessageRouting() - .WithServiceBusMessageHandler() - .WithServiceBusMessageHandler(serviceProvider => sabotageHandler); - - IServiceProvider provider = services.BuildServiceProvider(); - var router = provider.GetRequiredService(); - - var orderV2 = OrderV2Generator.Generate(); - var message = ServiceBusModelFactory.ServiceBusReceivedMessage(BinaryData.FromObjectAsJson(orderV2), messageId: "message-id"); - var context = AzureServiceBusMessageContextFactory.Generate(); - var correlationInfo = message.GetCorrelationInfo(); - - // Act / Assert - var exception = await Assert.ThrowsAsync(() => router.RouteMessageAsync(message, context, correlationInfo, CancellationToken.None)); - - // Assert - Assert.Contains("none of the registered", exception.Message, StringComparison.OrdinalIgnoreCase); - Assert.Contains("fallback", exception.Message, StringComparison.OrdinalIgnoreCase); - Assert.False(sabotageHandler.IsProcessed, "sabotage message handler should not be tried"); - } - [Fact] public async Task RouteMessage_WithGeneralRouting_GoesThroughRegisteredMessageHandler() { @@ -520,31 +471,6 @@ await router.RouteMessageAsync(message, Assert.Equal(messageCount, spyOrderV2MessageHandler.ProcessedMessages.Length); } - [Fact] - public async Task RouteMessage_WithMessageHandlerWithDifferentJobId_Fails() - { - // Arrange - var services = new ServiceCollection(); - var collection = new ServiceBusMessageHandlerCollection(services) { JobId = Guid.NewGuid().ToString() }; - collection.WithServiceBusMessageHandler(); - - // Act - services.AddServiceBusMessageRouting(); - - // Assert - IServiceProvider provider = services.BuildServiceProvider(); - var router = provider.GetRequiredService(); - - Order order = OrderGenerator.Generate(); - var message = ServiceBusModelFactory.ServiceBusReceivedMessage(BinaryData.FromObjectAsJson(order), messageId: Guid.NewGuid().ToString()); - AzureServiceBusMessageContext context = message.GetMessageContext(jobId: Guid.NewGuid().ToString()); - var correlationInfo = new MessageCorrelationInfo("operation-id", "transaction-id"); - - var exception = await Assert.ThrowsAsync( - () => router.RouteMessageAsync(message, context, correlationInfo, CancellationToken.None)); - Assert.Contains("because none", exception.Message); - } - [Fact] public void CreateWithoutOptionsAndLogger_WithoutServiceProvider_Fails() { diff --git a/src/Arcus.Messaging.Tests.Unit/MessagePump/DefaultMessagePumpCircuitBreakerTests.cs b/src/Arcus.Messaging.Tests.Unit/MessagePump/DefaultMessagePumpCircuitBreakerTests.cs index b00ed073..4e9cf1b0 100644 --- a/src/Arcus.Messaging.Tests.Unit/MessagePump/DefaultMessagePumpCircuitBreakerTests.cs +++ b/src/Arcus.Messaging.Tests.Unit/MessagePump/DefaultMessagePumpCircuitBreakerTests.cs @@ -44,7 +44,7 @@ public async Task PauseMessagePump_WithManyRegisteredMessagePump_Fails() { // Arrange var services = new ServiceCollection(); - services.AddMessagePump(p => (Pumps.Abstractions.MessagePump) new TestMessagePump("same-job-id", Mock.Of(), p, _logger)); + services.AddMessagePump(p => (Pumps.Abstractions.MessagePump)new TestMessagePump("same-job-id", Mock.Of(), p, _logger)); services.AddMessagePump(p => new TestMessagePump("same-job-id", Mock.Of(), p, _logger)); IServiceProvider provider = services.BuildServiceProvider(); diff --git a/src/Arcus.Messaging.Tests.Unit/ServiceBus/AzureServiceBusSystemPropertiesTests.cs b/src/Arcus.Messaging.Tests.Unit/ServiceBus/AzureServiceBusSystemPropertiesTests.cs index dbc3477a..60030fd3 100644 --- a/src/Arcus.Messaging.Tests.Unit/ServiceBus/AzureServiceBusSystemPropertiesTests.cs +++ b/src/Arcus.Messaging.Tests.Unit/ServiceBus/AzureServiceBusSystemPropertiesTests.cs @@ -16,7 +16,7 @@ namespace Arcus.Messaging.Tests.Unit.ServiceBus public class AzureServiceBusSystemPropertiesTests { private static readonly Faker BogusGenerator = new Faker(); - + [Fact] public void CreateProperties_WithoutServiceBusMessage_Fails() { @@ -33,14 +33,14 @@ public void CreateProperties_FromMessage_AssignsDeliveryCountCorrectly() AmqpAnnotatedMessage amqpMessage = CreateAmqpMessage(); amqpMessage.Header.DeliveryCount = expected; ServiceBusReceivedMessage message = CreateServiceBusReceivedMessage(amqpMessage); - + // Act var systemProperties = AzureServiceBusSystemProperties.CreateFrom(message); - + // Assert - Assert.Equal((int) expected, systemProperties.DeliveryCount); + Assert.Equal((int)expected, systemProperties.DeliveryCount); } - + [Fact] public void CreateProperties_FromMessage_AssignsContentTypeCorrectly() { @@ -50,14 +50,14 @@ public void CreateProperties_FromMessage_AssignsContentTypeCorrectly() AmqpAnnotatedMessage amqpMessage = CreateAmqpMessage(); amqpMessage.Properties.ContentType = expected; ServiceBusReceivedMessage message = CreateServiceBusReceivedMessage(amqpMessage); - + // Act var systemProperties = AzureServiceBusSystemProperties.CreateFrom(message); - + // Assert Assert.Equal(expected, systemProperties.ContentType); } - + [Fact] public void CreateProperties_FromMessage_AssignsDeadLetterSourceCorrectly() { @@ -67,14 +67,14 @@ public void CreateProperties_FromMessage_AssignsDeadLetterSourceCorrectly() AmqpAnnotatedMessage amqpMessage = CreateAmqpMessage(); amqpMessage.MessageAnnotations["x-opt-deadletter-source"] = expected; ServiceBusReceivedMessage message = CreateServiceBusReceivedMessage(amqpMessage); - + // Act var systemProperties = AzureServiceBusSystemProperties.CreateFrom(message); - + // Assert Assert.Equal(expected, systemProperties.DeadLetterSource); } - + [Fact] public void CreateProperties_FromMessage_AssignsEnqueuedSequenceNumberCorrectly() { @@ -84,14 +84,14 @@ public void CreateProperties_FromMessage_AssignsEnqueuedSequenceNumberCorrectly( AmqpAnnotatedMessage amqpMessage = CreateAmqpMessage(); amqpMessage.MessageAnnotations["x-opt-enqueue-sequence-number"] = expected; ServiceBusReceivedMessage message = CreateServiceBusReceivedMessage(amqpMessage); - + // Act var systemProperties = AzureServiceBusSystemProperties.CreateFrom(message); - + // Assert Assert.Equal(expected, systemProperties.EnqueuedSequenceNumber); } - + [Fact] public void CreateProperties_FromMessage_AssignsIsReceivedCorrectly() { @@ -101,14 +101,14 @@ public void CreateProperties_FromMessage_AssignsIsReceivedCorrectly() AmqpAnnotatedMessage amqpMessage = CreateAmqpMessage(); amqpMessage.MessageAnnotations["x-opt-enqueue-sequence-number"] = expected; ServiceBusReceivedMessage message = CreateServiceBusReceivedMessage(amqpMessage); - + // Act var systemProperties = AzureServiceBusSystemProperties.CreateFrom(message); - + // Assert Assert.True(systemProperties.IsReceived); } - + [Fact] public void CreateProperties_FromMessage_AssignsEnqueuedTimeCorrectly() { @@ -118,14 +118,14 @@ public void CreateProperties_FromMessage_AssignsEnqueuedTimeCorrectly() AmqpAnnotatedMessage amqpMessage = CreateAmqpMessage(); amqpMessage.MessageAnnotations["x-opt-enqueued-time"] = expected; ServiceBusReceivedMessage message = CreateServiceBusReceivedMessage(amqpMessage); - + // Act var systemProperties = AzureServiceBusSystemProperties.CreateFrom(message); - + // Assert Assert.Equal(expected, systemProperties.EnqueuedTime); } - + [Theory] [InlineData(null)] [InlineData("465924F2-D386-407F-826A-573144085682")] @@ -139,12 +139,12 @@ public void CreateProperties_FromMessage_AssignsLockTokenCorrectly(string lockTo // Act var systemProperties = AzureServiceBusSystemProperties.CreateFrom(message); - + // Assert Assert.Equal(expected, Guid.Parse(systemProperties.LockToken)); - Assert.Equal(expected != null, systemProperties.IsLockTokenSet); + Assert.Equal(lockToken != null, systemProperties.IsLockTokenSet); } - + [Fact] public void CreateProperties_FromMessage_AssignsLockedUntilCorrectly() { @@ -154,14 +154,14 @@ public void CreateProperties_FromMessage_AssignsLockedUntilCorrectly() AmqpAnnotatedMessage amqpMessage = CreateAmqpMessage(); amqpMessage.MessageAnnotations["x-opt-locked-until"] = expected; ServiceBusReceivedMessage message = CreateServiceBusReceivedMessage(amqpMessage); - + // Act var systemProperties = AzureServiceBusSystemProperties.CreateFrom(message); - + // Assert Assert.Equal(expected, systemProperties.LockedUntil); } - + [Fact] public void CreateProperties_FromMessage_AssignsSequenceNumberCorrectly() { @@ -171,10 +171,10 @@ public void CreateProperties_FromMessage_AssignsSequenceNumberCorrectly() AmqpAnnotatedMessage amqpMessage = CreateAmqpMessage(); amqpMessage.MessageAnnotations["x-opt-sequence-number"] = expected; ServiceBusReceivedMessage message = CreateServiceBusReceivedMessage(amqpMessage); - + // Act var systemProperties = AzureServiceBusSystemProperties.CreateFrom(message); - + // Assert Assert.Equal(expected, systemProperties.SequenceNumber); Assert.Equal(expected > -1, systemProperties.IsReceived); @@ -182,6 +182,11 @@ public void CreateProperties_FromMessage_AssignsSequenceNumberCorrectly() private static void SetLockToken(ServiceBusReceivedMessage message, Guid? expected) { + if (expected == null || expected == Guid.Empty) + { + return; + } + PropertyInfo lockTokenGuid = message.GetType().GetProperty("LockTokenGuid", BindingFlags.Instance | BindingFlags.NonPublic); Assert.NotNull(lockTokenGuid); lockTokenGuid.SetValue(message, expected); @@ -193,22 +198,22 @@ private static AmqpAnnotatedMessage CreateAmqpMessage() string json = JsonConvert.SerializeObject(order); byte[] bytes = Encoding.UTF8.GetBytes(json); - var message = new AmqpAnnotatedMessage(new AmqpMessageBody(new[] {new ReadOnlyMemory(bytes)})); + var message = new AmqpAnnotatedMessage(new AmqpMessageBody(new[] { new ReadOnlyMemory(bytes) })); message.Header.DeliveryCount = BogusGenerator.Random.UInt(); - + return message; } private static ServiceBusReceivedMessage CreateServiceBusReceivedMessage(AmqpAnnotatedMessage amqpMessage) { - var serviceBusMessage = (ServiceBusReceivedMessage) Activator.CreateInstance( + var serviceBusMessage = (ServiceBusReceivedMessage)Activator.CreateInstance( type: typeof(ServiceBusReceivedMessage), bindingAttr: BindingFlags.NonPublic | BindingFlags.Instance, binder: null, - args: new object[] {amqpMessage}, + args: new object[] { amqpMessage }, culture: null, activationAttributes: null); - + return serviceBusMessage; } } diff --git a/src/Arcus.Messaging.Tests.Unit/ServiceBus/ObjectExtensionsTests.cs b/src/Arcus.Messaging.Tests.Unit/ServiceBus/ObjectExtensionsTests.cs index bd3683b9..221cc7ac 100644 --- a/src/Arcus.Messaging.Tests.Unit/ServiceBus/ObjectExtensionsTests.cs +++ b/src/Arcus.Messaging.Tests.Unit/ServiceBus/ObjectExtensionsTests.cs @@ -6,7 +6,6 @@ using Arcus.Messaging.Tests.Core.Generators; using Arcus.Messaging.Tests.Core.Messages.v1; using Azure.Messaging.ServiceBus; -using Microsoft.Azure.ServiceBus; using Newtonsoft.Json; using Xunit; @@ -29,7 +28,7 @@ public void WrapInServiceBusMessage_BasicWithoutOptions_ReturnsValidServiceBusMe // Assert Assert.NotNull(serviceBusMessage); - Assert.Empty(serviceBusMessage.CorrelationId); + Assert.Null(serviceBusMessage.CorrelationId); IDictionary userProperties = serviceBusMessage.ApplicationProperties; Assert.True(userProperties.ContainsKey(PropertyNames.ContentType)); Assert.True(userProperties.ContainsKey(PropertyNames.Encoding)); @@ -77,7 +76,7 @@ public void WrapInServiceBusMessage_BasicWithTransactionId_ReturnsValidServiceBu // Assert Assert.NotNull(serviceBusMessage); - Assert.Empty(serviceBusMessage.CorrelationId); + Assert.Null(serviceBusMessage.CorrelationId); IDictionary userProperties = serviceBusMessage.ApplicationProperties; Assert.True(userProperties.ContainsKey(PropertyNames.ContentType)); Assert.True(userProperties.ContainsKey(PropertyNames.Encoding)); @@ -101,7 +100,7 @@ public void WrapInServiceBusMessage_BasicWithEncoding_ReturnsValidServiceBusMess // Assert Assert.NotNull(serviceBusMessage); - Assert.Empty(serviceBusMessage.CorrelationId); + Assert.Null(serviceBusMessage.CorrelationId); IDictionary userProperties = serviceBusMessage.ApplicationProperties; Assert.True(userProperties.ContainsKey(PropertyNames.ContentType)); Assert.True(userProperties.ContainsKey(PropertyNames.Encoding)); diff --git a/src/Arcus.Messaging.Tests.Unit/ServiceBus/ServiceBusMessageBuilderTests.cs b/src/Arcus.Messaging.Tests.Unit/ServiceBus/ServiceBusMessageBuilderTests.cs index f157c4f5..fa6769c4 100644 --- a/src/Arcus.Messaging.Tests.Unit/ServiceBus/ServiceBusMessageBuilderTests.cs +++ b/src/Arcus.Messaging.Tests.Unit/ServiceBus/ServiceBusMessageBuilderTests.cs @@ -218,7 +218,7 @@ public void Create_WithoutOperationId_Succeeds() // Assert ServiceBusMessage message = builder.Build(); AssertEqualOrder(expected, message); - Assert.Empty(message.CorrelationId); + Assert.Null(message.CorrelationId); } [Theory] @@ -235,7 +235,7 @@ public void CreateWithPropertyName_WithoutOperationId_Succeeds(string operationI // Assert ServiceBusMessage message = builder.Build(); AssertEqualOrder(expected, message); - Assert.Empty(message.CorrelationId); + Assert.Null(message.CorrelationId); } [Fact] diff --git a/src/Arcus.Messaging.Tests.Workers.EventHubs.Core/Arcus.Messaging.Tests.Workers.EventHubs.Core.csproj b/src/Arcus.Messaging.Tests.Workers.EventHubs.Core/Arcus.Messaging.Tests.Workers.EventHubs.Core.csproj index 1a32467f..367527e1 100644 --- a/src/Arcus.Messaging.Tests.Workers.EventHubs.Core/Arcus.Messaging.Tests.Workers.EventHubs.Core.csproj +++ b/src/Arcus.Messaging.Tests.Workers.EventHubs.Core/Arcus.Messaging.Tests.Workers.EventHubs.Core.csproj @@ -6,8 +6,6 @@ - - diff --git a/src/Arcus.Messaging.Tests.Workers.EventHubs.Core/MessageHandlers/WriteSensorToDiskEventHubsMessageHandler.cs b/src/Arcus.Messaging.Tests.Workers.EventHubs.Core/MessageHandlers/WriteSensorToDiskEventHubsMessageHandler.cs index e1ac0fb4..10d9bf40 100644 --- a/src/Arcus.Messaging.Tests.Workers.EventHubs.Core/MessageHandlers/WriteSensorToDiskEventHubsMessageHandler.cs +++ b/src/Arcus.Messaging.Tests.Workers.EventHubs.Core/MessageHandlers/WriteSensorToDiskEventHubsMessageHandler.cs @@ -55,7 +55,7 @@ private async Task PublishReadingAsync( MessageCorrelationInfo correlationInfo, CancellationToken cancellationToken) { - _logger.LogTrace("Write order v1 message to disk: {MessageId}", message.SensorId); + _logger.LogTrace("Write sensor reading message to disk: {MessageId}", message.SensorId); string fileName = message.SensorId + ".json"; string dirPath = Directory.GetCurrentDirectory(); diff --git a/src/Arcus.Messaging.Tests.Workers.ServiceBus/Arcus.Messaging.Tests.Workers.ServiceBus.csproj b/src/Arcus.Messaging.Tests.Workers.ServiceBus/Arcus.Messaging.Tests.Workers.ServiceBus.csproj index a8aac47f..046e3267 100644 --- a/src/Arcus.Messaging.Tests.Workers.ServiceBus/Arcus.Messaging.Tests.Workers.ServiceBus.csproj +++ b/src/Arcus.Messaging.Tests.Workers.ServiceBus/Arcus.Messaging.Tests.Workers.ServiceBus.csproj @@ -6,8 +6,6 @@ - - diff --git a/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/OrdersAzureServiceBusMessageHandler.cs b/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/OrdersAzureServiceBusMessageHandler.cs index 108015ce..252111fa 100644 --- a/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/OrdersAzureServiceBusMessageHandler.cs +++ b/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/OrdersAzureServiceBusMessageHandler.cs @@ -1,9 +1,12 @@ -using System.Threading; +using System.IO; +using System.Text.Json; +using System.Threading; using System.Threading.Tasks; using Arcus.EventGrid.Publishing.Interfaces; using Arcus.Messaging.Abstractions; using Arcus.Messaging.Abstractions.ServiceBus; using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling; +using Arcus.Messaging.Tests.Core.Events.v1; using Arcus.Messaging.Tests.Core.Messages.v1; using Azure.Messaging.EventGrid; using GuardNet; @@ -50,6 +53,18 @@ public async Task ProcessMessageAsync( { EnsureSameCorrelation(correlationInfo); await _eventGridPublisher.PublishOrderAsync(order, correlationInfo, _logger); + + string filePath = Path.Combine(Directory.GetCurrentDirectory(), $"{correlationInfo.TransactionId}.json"); + + var eventData = new OrderCreatedEventData( + order.Id, + order.Amount, + order.ArticleNumber, + $"{order.Customer.FirstName} {order.Customer.LastName}", + correlationInfo); + string json = JsonSerializer.Serialize(eventData); + + await File.WriteAllTextAsync(filePath, json, cancellationToken); } private void EnsureSameCorrelation(MessageCorrelationInfo correlationInfo) diff --git a/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/PassThruAzureServiceBusFallbackMessageHandler.cs b/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/PassThruAzureServiceBusFallbackMessageHandler.cs new file mode 100644 index 00000000..ee320004 --- /dev/null +++ b/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/PassThruAzureServiceBusFallbackMessageHandler.cs @@ -0,0 +1,25 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Arcus.Messaging.Abstractions; +using Arcus.Messaging.Abstractions.ServiceBus; +using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling; +using Azure.Messaging.ServiceBus; + +namespace Arcus.Messaging.Tests.Workers.ServiceBus.MessageHandlers +{ + public class PassThruAzureServiceBusFallbackMessageHandler : IAzureServiceBusFallbackMessageHandler + { + public Task ProcessMessageAsync( + ServiceBusReceivedMessage message, + AzureServiceBusMessageContext messageContext, + MessageCorrelationInfo correlationInfo, + CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + } +} diff --git a/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/SabotageAzureServiceBusFallbackMessageHandler.cs b/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/SabotageAzureServiceBusFallbackMessageHandler.cs new file mode 100644 index 00000000..19e997fe --- /dev/null +++ b/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/SabotageAzureServiceBusFallbackMessageHandler.cs @@ -0,0 +1,54 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Arcus.Messaging.Abstractions; +using Arcus.Messaging.Abstractions.MessageHandling; +using Arcus.Messaging.Abstractions.ServiceBus; +using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling; +using Arcus.Messaging.Tests.Core.Messages.v1; +using Azure.Messaging.ServiceBus; +using InvalidOperationException = System.InvalidOperationException; + +namespace Arcus.Messaging.Tests.Workers.ServiceBus.MessageHandlers +{ + public class SabotageAzureServiceBusFallbackMessageHandler : + IAzureServiceBusFallbackMessageHandler, + IFallbackMessageHandler, + IFallbackMessageHandler + { + /// + /// Initializes a new instance of the class. + /// + public SabotageAzureServiceBusFallbackMessageHandler() + { + + } + + public Task ProcessMessageAsync( + ServiceBusReceivedMessage message, + AzureServiceBusMessageContext messageContext, + MessageCorrelationInfo correlationInfo, + CancellationToken cancellationToken) + { + throw new InvalidOperationException("Sabotage fallback!"); + } + + public Task ProcessMessageAsync( + Order message, + AzureServiceBusMessageContext messageContext, + MessageCorrelationInfo correlationInfo, + CancellationToken cancellationToken) + { + throw new InvalidOperationException("Sabotage fallback!"); + } + + public Task ProcessMessageAsync( + string message, + MessageContext messageContext, + MessageCorrelationInfo correlationInfo, + CancellationToken cancellationToken) + { + throw new InvalidOperationException("Sabotage fallback!"); + } + } +} diff --git a/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/CircuitBreakerAzureServiceBusMessageHandler.cs b/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/TestCircuitBreakerAzureServiceBusMessageHandler.cs similarity index 60% rename from src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/CircuitBreakerAzureServiceBusMessageHandler.cs rename to src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/TestCircuitBreakerAzureServiceBusMessageHandler.cs index 723d3135..a343d4d6 100644 --- a/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/CircuitBreakerAzureServiceBusMessageHandler.cs +++ b/src/Arcus.Messaging.Tests.Workers.ServiceBus/MessageHandlers/TestCircuitBreakerAzureServiceBusMessageHandler.cs @@ -5,11 +5,14 @@ using System.Threading; using System.Threading.Tasks; using Arcus.Messaging.Abstractions; +using Arcus.Messaging.Abstractions.MessageHandling; using Arcus.Messaging.Abstractions.ServiceBus; -using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling; using Arcus.Messaging.Pumps.Abstractions.Resiliency; +using Arcus.Messaging.Pumps.ServiceBus.Resiliency; using Arcus.Messaging.Tests.Core.Messages.v1; using GuardNet; +using Microsoft.Extensions.Logging; +using Xunit; namespace Arcus.Messaging.Tests.Workers.ServiceBus.MessageHandlers { @@ -17,21 +20,21 @@ namespace Arcus.Messaging.Tests.Workers.ServiceBus.MessageHandlers /// Represents a message handler that interacts with the /// in providing simulated failures which starts and stops the related message pump. /// - public class CircuitBreakerAzureServiceBusMessageHandler : IAzureServiceBusMessageHandler + public class TestCircuitBreakerAzureServiceBusMessageHandler : CircuitBreakerServiceBusMessageHandler { private readonly string[] _targetMessageIds; private readonly Action _configureOptions; - private readonly IMessagePumpCircuitBreaker _circuitBreaker; - - private static readonly ICollection<(Shipment, DateTimeOffset)> MessageArrivals = new Collection<(Shipment, DateTimeOffset)>(); + + private static readonly ICollection<(Shipment message, DateTimeOffset arrival)> MessageArrivals = new Collection<(Shipment, DateTimeOffset)>(); /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// - public CircuitBreakerAzureServiceBusMessageHandler( + public TestCircuitBreakerAzureServiceBusMessageHandler( string[] targetMessageIds, Action configureOptions, - IMessagePumpCircuitBreaker circuitBreaker) + IMessagePumpCircuitBreaker circuitBreaker, + ILogger logger) : base(circuitBreaker, logger) { Guard.NotNull(targetMessageIds, nameof(targetMessageIds)); Guard.NotAny(targetMessageIds, nameof(targetMessageIds)); @@ -39,7 +42,6 @@ public CircuitBreakerAzureServiceBusMessageHandler( _targetMessageIds = targetMessageIds; _configureOptions = configureOptions; - _circuitBreaker = circuitBreaker; } /// @@ -48,32 +50,42 @@ public CircuitBreakerAzureServiceBusMessageHandler( /// The message that was received. /// The context providing more information concerning the processing. /// The information concerning correlation of telemetry and processes by using a variety of unique identifiers. + /// The additional options to manipulate the possible circuit breakage of the message pump for which a message is processed. /// The token to cancel the processing. /// /// Thrown when the , , or the is null. /// - public async Task ProcessMessageAsync( + protected override Task ProcessMessageAsync( Shipment message, AzureServiceBusMessageContext messageContext, MessageCorrelationInfo correlationInfo, + MessagePumpCircuitBreakerOptions options, CancellationToken cancellationToken) { + _configureOptions(options); + if (!_targetMessageIds.Contains(message.Id)) { - return; + return Task.CompletedTask; } MessageArrivals.Add((message, DateTimeOffset.UtcNow)); - if (MessageArrivals.Count < _targetMessageIds.Length) + if (MessageArrivals.Count < 3) { - await _circuitBreaker.PauseMessageProcessingAsync(messageContext.JobId, _configureOptions); + Logger.LogError("Sabotage unavailable dependency system"); + throw new InvalidOperationException("Simulated sabotage of unavailable dependency system"); } + + Logger.LogInformation("Recovered from simulated unavailable dependency system"); + return Task.CompletedTask; } public DateTimeOffset[] GetMessageArrivals() { - return _targetMessageIds.Select(id => MessageArrivals.FirstOrDefault(a => a.Item1.Id == id).Item2).Where(a => a != default) - .ToArray(); + var arrivals = MessageArrivals.Select(a => a.arrival).ToArray(); + Assert.Equal(3, arrivals.Length); + + return arrivals; } } } diff --git a/src/Arcus.Messaging.Tests.Workers/Arcus.Messaging.Tests.Workers.csproj b/src/Arcus.Messaging.Tests.Workers/Arcus.Messaging.Tests.Workers.csproj index 2cee2437..cc848aac 100644 --- a/src/Arcus.Messaging.Tests.Workers/Arcus.Messaging.Tests.Workers.csproj +++ b/src/Arcus.Messaging.Tests.Workers/Arcus.Messaging.Tests.Workers.csproj @@ -6,11 +6,10 @@ - - +