Skip to content

Commit

Permalink
Fix - include correlation information when tracking exceptions (#133)
Browse files Browse the repository at this point in the history
* Fix - include correlation information when tracking exceptions

* pr-test: add integration test to verify if the unhandled exception includes tracking correlation information

* pr-fix:use API key to connect to application insights

* pr-sug: assert on operation ID in application insights specific operation ID
  • Loading branch information
stijnmoreels authored Oct 29, 2020
1 parent 05f1808 commit 1b93c04
Show file tree
Hide file tree
Showing 9 changed files with 310 additions and 66 deletions.
124 changes: 60 additions & 64 deletions src/Arcus.Messaging.Pumps.ServiceBus/AzureServiceBusMessagePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -415,32 +415,21 @@ private async Task DeleteTopicSubscriptionAsync(CancellationToken cancellationTo

try
{
bool subscriptionExists =
await serviceBusClient.SubscriptionExistsAsync(serviceBusConnectionString.EntityPath, SubscriptionName, cancellationToken);

bool subscriptionExists = await serviceBusClient.SubscriptionExistsAsync(serviceBusConnectionString.EntityPath, SubscriptionName, cancellationToken);
if (subscriptionExists)
{
Logger.LogTrace(
"Deleting subscription '{SubscriptionName}' on topic '{Path}'...",
SubscriptionName, serviceBusConnectionString.EntityPath);

Logger.LogTrace("Deleting subscription '{SubscriptionName}' on topic '{Path}'...", SubscriptionName, serviceBusConnectionString.EntityPath);
await serviceBusClient.DeleteSubscriptionAsync(serviceBusConnectionString.EntityPath, SubscriptionName, cancellationToken);

Logger.LogTrace(
"Subscription '{SubscriptionName}' deleted on topic '{Path}'",
SubscriptionName, serviceBusConnectionString.EntityPath);
Logger.LogTrace("Subscription '{SubscriptionName}' deleted on topic '{Path}'", SubscriptionName, serviceBusConnectionString.EntityPath);
}
else
{
Logger.LogTrace(
"Cannot delete topic subscription with name '{SubscriptionName}' because no subscription exists on Service Bus resource",
SubscriptionName); }
Logger.LogTrace("Cannot delete topic subscription with name '{SubscriptionName}' because no subscription exists on Service Bus resource", SubscriptionName);
}
}
catch (Exception exception)
{
Logger.LogWarning(exception,
"Failed to delete topic subscription with name '{SubscriptionName}' on Service Bus resource",
SubscriptionName);
Logger.LogWarning(exception, "Failed to delete topic subscription with name '{SubscriptionName}' on Service Bus resource", SubscriptionName);
}
finally
{
Expand Down Expand Up @@ -479,34 +468,24 @@ private async Task HandleMessageAsync(Message message, CancellationToken cancell
Logger.LogInformation("No operation ID was found on the message");
}

try
MessageCorrelationInfo correlationInfo = message.GetCorrelationInfo();
using (IServiceScope serviceScope = ServiceProvider.CreateScope())
{
MessageCorrelationInfo correlationInfo = message.GetCorrelationInfo();
using (IServiceScope serviceScope = ServiceProvider.CreateScope())
var correlationInfoAccessor = serviceScope.ServiceProvider.GetService<ICorrelationInfoAccessor<MessageCorrelationInfo>>();
if (correlationInfoAccessor is null)
{
var correlationInfoAccessor = serviceScope.ServiceProvider.GetService<ICorrelationInfoAccessor<MessageCorrelationInfo>>();
if (correlationInfoAccessor is null)
{
Logger.LogTrace("No message correlation configured");
await ProcessMessageAsync(message, cancellationToken, correlationInfo);
}
else
Logger.LogTrace("No message correlation configured");
await ProcessMessageWithFallbackAsync(message, cancellationToken, correlationInfo);
}
else
{
correlationInfoAccessor.SetCorrelationInfo(correlationInfo);
using (LogContext.Push(new MessageCorrelationInfoEnricher(correlationInfoAccessor)))
{
correlationInfoAccessor.SetCorrelationInfo(correlationInfo);
using (LogContext.Push(new MessageCorrelationInfoEnricher(correlationInfoAccessor)))
{
await ProcessMessageAsync(message, cancellationToken, correlationInfo);
}
await ProcessMessageWithFallbackAsync(message, cancellationToken, correlationInfo);
}
}
}
catch (Exception ex)
{
Logger.LogCritical(ex, "Unable to process message with ID '{MessageId}'", message.MessageId);
await HandleReceiveExceptionAsync(ex);

throw;
}
}

/// <summary>
Expand Down Expand Up @@ -535,41 +514,58 @@ protected override Task PreProcessMessageAsync<TMessageContext>(MessageHandler m
return Task.CompletedTask;
}

private async Task ProcessMessageAsync(
Message message,
CancellationToken cancellationToken,
MessageCorrelationInfo correlationInfo)
private async Task ProcessMessageWithFallbackAsync(Message message, CancellationToken cancellationToken, MessageCorrelationInfo correlationInfo)
{
Logger.LogTrace("Received message '{MessageId}'", message.MessageId);
try
{
Logger.LogTrace("Received message '{MessageId}'", message.MessageId);

var messageContext = new AzureServiceBusMessageContext(message.MessageId, message.SystemProperties, message.UserProperties);
Encoding encoding = messageContext.GetMessageEncodingProperty(Logger);
string messageBody = encoding.GetString(message.Body);
var messageContext = new AzureServiceBusMessageContext(message.MessageId, message.SystemProperties, message.UserProperties);
Encoding encoding = messageContext.GetMessageEncodingProperty(Logger);
string messageBody = encoding.GetString(message.Body);

if (_fallbackMessageHandler is null)
{
await ProcessMessageAsync(messageBody, messageContext, correlationInfo, cancellationToken);
}
else
{
if (_fallbackMessageHandler is AzureServiceBusMessageHandlerTemplate specificMessageHandler)
if (_fallbackMessageHandler is null)
{
specificMessageHandler.SetMessageReceiver(_messageReceiver);
await ProcessMessageAsync(messageBody, messageContext, correlationInfo, cancellationToken);
}

MessageHandlerResult result = await ProcessMessageAndCaptureAsync(messageBody, messageContext, correlationInfo, cancellationToken);
if (result.Exception != null)
else
{
throw result.Exception;
await FallbackProcessMessageAsync(message, messageBody, messageContext, correlationInfo, cancellationToken);
}

if (!result.IsProcessed)
{
await _fallbackMessageHandler.ProcessMessageAsync(message, messageContext, correlationInfo, cancellationToken);
}
Logger.LogTrace("Message '{MessageId}' processed", message.MessageId);
}
catch (Exception exception)
{
Logger.LogCritical(exception, "Unable to process message with ID '{MessageId}'", message.MessageId);
await HandleReceiveExceptionAsync(exception);

Logger.LogTrace("Message '{MessageId}' processed", message.MessageId);
throw;
}
}

private async Task FallbackProcessMessageAsync(
Message message,
string messageBody,
AzureServiceBusMessageContext messageContext,
MessageCorrelationInfo correlationInfo,
CancellationToken cancellationToken)
{
if (_fallbackMessageHandler is AzureServiceBusMessageHandlerTemplate specificMessageHandler)
{
specificMessageHandler.SetMessageReceiver(_messageReceiver);
}

MessageHandlerResult result = await ProcessMessageAndCaptureAsync(messageBody, messageContext, correlationInfo, cancellationToken);
if (result.Exception != null)
{
throw result.Exception;
}

if (!result.IsProcessed)
{
await _fallbackMessageHandler.ProcessMessageAsync(message, messageContext, correlationInfo, cancellationToken);
}
}

private static async Task UntilCancelledAsync(CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
<ItemGroup>
<PackageReference Include="Arcus.EventGrid.Publishing" Version="3.0.0" />
<PackageReference Include="Arcus.EventGrid.Testing" Version="3.0.0" />
<PackageReference Include="Arcus.Observability.Telemetry.Core" Version="0.4.0" />
<PackageReference Include="Arcus.Observability.Telemetry.Serilog.Sinks.ApplicationInsights" Version="0.4.0" />
<PackageReference Include="Arcus.Security.Providers.AzureKeyVault" Version="1.3.0" />
<PackageReference Include="Guard.Net" Version="1.2.0" />
<PackageReference Include="Microsoft.Azure.ApplicationInsights.Query" Version="1.0.0" />
<PackageReference Include="Microsoft.Azure.Management.ServiceBus" Version="2.1.0" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="4.1.1" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="3.0.0" />
Expand All @@ -29,6 +32,9 @@
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks.Abstractions" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="3.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.8.0" />
<PackageReference Include="Serilog" Version="2.10.0" />
<PackageReference Include="Serilog.Extensions.Hosting" Version="3.1.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="3.1.1" />
<PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using GuardNet;

namespace Arcus.Messaging.Tests.Integration.Fixture
{
/// <summary>
/// Represents an application configuration section related to information regarding Azure Application Insights.
/// </summary>
public class ApplicationInsightsConfig
{
/// <summary>
/// Initializes a new instance of the <see cref="ApplicationInsightsConfig"/> class.
/// </summary>
/// <param name="instrumentationKey">The instrumentation key of the Azure Application Insights resource.</param>
/// <param name="applicationId">The application ID that has API access to the Azure Application Insights resource.</param>
/// <param name="apiKey">The application API key that has API access to the Azure Application Insights resource.</param>
/// <exception cref="System.ArgumentException">Thrown when the <paramref name="instrumentationKey"/> or <paramref name="apiKey"/> or <paramref name="apiKey"/> is blank.</exception>
public ApplicationInsightsConfig(string instrumentationKey, string applicationId, string apiKey)
{
Guard.NotNullOrWhitespace(instrumentationKey, nameof(instrumentationKey), "Requires a non-blank Application Insights instrumentation key");
Guard.NotNullOrWhitespace(apiKey, nameof(apiKey), "Requires a non-blank Application Insights application application ID");
Guard.NotNullOrWhitespace(apiKey, nameof(apiKey), "Requires a non-blank Application Insights application API key");

InstrumentationKey = instrumentationKey;
ApplicationId = applicationId;
ApiKey = apiKey;
}

/// <summary>
/// Gets the instrumentation key to connect to the Application Insights resource.
/// </summary>
public string InstrumentationKey { get; }

/// <summary>
/// Gets the application ID which has API access to the Application Insights resource.
/// </summary>
public string ApplicationId { get; }

/// <summary>
/// Gets the application API key which has API access to the Application Insights resource.
/// </summary>
public string ApiKey { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
using System;
using Arcus.EventGrid.Publishing;
using Arcus.Messaging.Tests.Core.Messages.v1;
using Arcus.Messaging.Tests.Workers.MessageHandlers;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Serilog;
using Serilog.Configuration;
using Serilog.Events;

// ReSharper disable once CheckNamespace
namespace Arcus.Messaging.Tests.Workers.ServiceBus
{
public class ServiceBusQueueTrackCorrelationOnExceptionProgram
{
public static void main(string[] args)
{
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Debug()
.WriteTo.Console()
.CreateLogger();

try
{
CreateHostBuilder(args)
.Build()
.Run();
}
catch (Exception exception)
{
Log.Fatal(exception, "Host terminated unexpectedly");
}
finally
{
Log.CloseAndFlush();
}
}

public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureAppConfiguration(configuration =>
{
configuration.AddCommandLine(args);
configuration.AddEnvironmentVariables();
})
.UseSerilog(UpdateLoggerConfiguration)
.ConfigureServices((hostContext, services) =>
{
services.AddServiceBusQueueMessagePump(configuration => configuration["ARCUS_SERVICEBUS_CONNECTIONSTRING"])
.WithServiceBusMessageHandler<OrdersSabotageAzureServiceBusMessageHandler, Order>();

services.AddTcpHealthProbes("ARCUS_HEALTH_PORT", builder => builder.AddCheck("sample", () => HealthCheckResult.Healthy()));
});

private static void UpdateLoggerConfiguration(
HostBuilderContext hostContext,
LoggerConfiguration currentLoggerConfiguration)
{
var instrumentationKey = hostContext.Configuration.GetValue<string>("APPLICATIONINSIGHTS_INSTRUMENTATIONKEY");

currentLoggerConfiguration
.MinimumLevel.Debug()
.MinimumLevel.Override("Microsoft", LogEventLevel.Information)
.Enrich.FromLogContext()
.Enrich.WithVersion()
.Enrich.WithComponentName("Service Bus Queue Worker")
.WriteTo.Console()
.WriteTo.AzureApplicationInsights(instrumentationKey);
}
}
}
13 changes: 13 additions & 0 deletions src/Arcus.Messaging.Tests.Integration/Fixture/TestConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ public string GetServiceBusConnectionString(ServiceBusEntity entity)
}
}

/// <summary>
/// Gets the Application Insights configuration from the application configuration.
/// </summary>
/// <exception cref="ArgumentException">Thrown when one of the Application Insights configuration values is blank.</exception>
public ApplicationInsightsConfig GetApplicationInsightsConfig()
{
var instrumentationKey = _config.GetValue<string>("Arcus:ApplicationInsights:InstrumentationKey");
var applicationId = _config.GetValue<string>("Arcus:ApplicationInsights:ApplicationId");
var apiKey = _config.GetValue<string>("Arcus:ApplicationInsights:ApiKey");

return new ApplicationInsightsConfig(instrumentationKey, applicationId, apiKey);
}

/// <summary>
/// Gets the project directory where the fixtures are located.
/// </summary>
Expand Down
Loading

0 comments on commit 1b93c04

Please sign in to comment.