Skip to content

Commit

Permalink
Add error handling and ensure that traces are written (#2618)
Browse files Browse the repository at this point in the history
  • Loading branch information
preardon authored Apr 13, 2023
1 parent 7298899 commit e3a43b2
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ private void Archive(object state)
catch (Exception e)
{
s_logger.LogError(e, "Error while sweeping the outbox.");
throw;
}

s_logger.LogInformation("Outbox Sweeper sleeping");
Expand Down
63 changes: 43 additions & 20 deletions src/Paramore.Brighter/OutboxArchiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Paramore.Brighter.Logging;

namespace Paramore.Brighter
{
Expand All @@ -14,6 +16,7 @@ public class OutboxArchiver
private IAmAnOutboxSync<Message> _outboxSync;
private IAmAnOutboxAsync<Message> _outboxAsync;
private IAmAnArchiveProvider _archiveProvider;
private readonly ILogger _logger = ApplicationLogging.CreateLogger<OutboxArchiver>();

public OutboxArchiver(IAmAnOutbox<Message> outbox,IAmAnArchiveProvider archiveProvider, int batchSize = 100)
{
Expand All @@ -34,20 +37,29 @@ public OutboxArchiver(IAmAnOutbox<Message> outbox,IAmAnArchiveProvider archivePr
public void Archive(int minimumAge)
{
var activity = ApplicationTelemetry.ActivitySource.StartActivity(ARCHIVEOUTBOX, ActivityKind.Server);

var age = TimeSpan.FromHours(minimumAge);

var messages = _outboxSync.DispatchedMessages(age.Milliseconds, _batchSize);

foreach (var message in messages)

try
{
_archiveProvider.ArchiveMessage(message);
var messages = _outboxSync.DispatchedMessages(age.Milliseconds, _batchSize);

foreach (var message in messages)
{
_archiveProvider.ArchiveMessage(message);
}

_outboxSync.Delete(messages.Select(e => e.Id).ToArray());
}
catch (Exception e)
{
_logger.LogError(e, "Error while archiving from the outbox");
throw;
}
finally
{
if(activity?.DisplayName == ARCHIVEOUTBOX)
activity.Dispose();
}

_outboxSync.Delete(messages.Select(e => e.Id).ToArray());

if(activity?.DisplayName == ARCHIVEOUTBOX)
activity.Dispose();
}

/// <summary>
Expand All @@ -60,18 +72,29 @@ public async Task ArchiveAsync(int minimumAge, CancellationToken cancellationTok
var activity = ApplicationTelemetry.ActivitySource.StartActivity(ARCHIVEOUTBOX, ActivityKind.Server);

var age = TimeSpan.FromHours(minimumAge);

var messages = await _outboxAsync.DispatchedMessagesAsync(age.Milliseconds, _batchSize, cancellationToken: cancellationToken);

foreach (var message in messages)
try
{
await _archiveProvider.ArchiveMessageAsync(message, cancellationToken);
var messages = await _outboxAsync.DispatchedMessagesAsync(age.Milliseconds, _batchSize,
cancellationToken: cancellationToken);

foreach (var message in messages)
{
await _archiveProvider.ArchiveMessageAsync(message, cancellationToken);
}

await _outboxAsync.DeleteAsync(cancellationToken, messages.Select(e => e.Id).ToArray());
}
catch (Exception e)
{
_logger.LogError(e, "Error while archiving from the outbox");
throw;
}
finally
{
if(activity?.DisplayName == ARCHIVEOUTBOX)
activity.Dispose();
}

await _outboxAsync.DeleteAsync(cancellationToken, messages.Select(e => e.Id).ToArray());

if(activity?.DisplayName == ARCHIVEOUTBOX)
activity.Dispose();
}
}
}

0 comments on commit e3a43b2

Please sign in to comment.