Skip to content

Commit

Permalink
Fix Archiver (#2620)
Browse files Browse the repository at this point in the history
* Fix Archviver

  - Update outbox to not attempt to delete if no Ids are passed in

  - Have archvier mark activity as failed if there is an issue

* Updare Azure archive provider

  - Add Sync Tests

  - Ensure we dont try to upload a blob that is already uploaded
  • Loading branch information
preardon authored Apr 14, 2023
1 parent e3a43b2 commit 919f96d
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 9 deletions.
28 changes: 22 additions & 6 deletions src/Paramore.Brighter.Archive.Azure/AzureBlobArchiveProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,14 @@ public AzureBlobArchiveProvider(AzureBlobArchiveProviderOptions options)
public void ArchiveMessage(Message message)
{
var blobClient = _containerClient.GetBlobClient(message.Id.ToString());

blobClient.Upload(BinaryData.FromBytes(message.Body.Bytes));

var alreadyUploaded = blobClient.Exists();

if (!alreadyUploaded.Value)
{
var opts = GetUploadOptions(message);
blobClient.Upload(BinaryData.FromBytes(message.Body.Bytes), opts);
}
}

/// <summary>
Expand All @@ -33,10 +39,20 @@ public void ArchiveMessage(Message message)
public async Task ArchiveMessageAsync(Message message, CancellationToken cancellationToken)
{
var blobClient = _containerClient.GetBlobClient(message.Id.ToString());


var alreadyUploaded = await blobClient.ExistsAsync(cancellationToken);
if (!alreadyUploaded.Value)
{
var opts = GetUploadOptions(message);
await blobClient.UploadAsync(BinaryData.FromBytes(message.Body.Bytes), opts, cancellationToken);
}
}

private BlobUploadOptions GetUploadOptions(Message message)
{
var opts = new BlobUploadOptions()
{
AccessTier = _options.AccessTier,
AccessTier = _options.AccessTier
};

if (_options.TagBlobs)
Expand All @@ -50,7 +66,7 @@ public async Task ArchiveMessageAsync(Message message, CancellationToken cancell
{ "content_type", message.Header.ContentType }
};
}
await blobClient.UploadAsync(BinaryData.FromBytes(message.Body.Bytes), opts, cancellationToken);

return opts;
}
}
4 changes: 4 additions & 0 deletions src/Paramore.Brighter/OutboxArchiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public void Archive(int minimumAge)
{
var messages = _outboxSync.DispatchedMessages(age.Milliseconds, _batchSize);

if (!messages.Any()) return;
foreach (var message in messages)
{
_archiveProvider.ArchiveMessage(message);
Expand All @@ -52,6 +53,7 @@ public void Archive(int minimumAge)
}
catch (Exception e)
{
activity?.SetStatus(ActivityStatusCode.Error, e.Message);
_logger.LogError(e, "Error while archiving from the outbox");
throw;
}
Expand All @@ -78,6 +80,7 @@ public async Task ArchiveAsync(int minimumAge, CancellationToken cancellationTok
var messages = await _outboxAsync.DispatchedMessagesAsync(age.Milliseconds, _batchSize,
cancellationToken: cancellationToken);

if (!messages.Any()) return;
foreach (var message in messages)
{
await _archiveProvider.ArchiveMessageAsync(message, cancellationToken);
Expand All @@ -87,6 +90,7 @@ public async Task ArchiveAsync(int minimumAge, CancellationToken cancellationTok
}
catch (Exception e)
{
activity?.SetStatus(ActivityStatusCode.Error, e.Message);
_logger.LogError(e, "Error while archiving from the outbox");
throw;
}
Expand Down
6 changes: 5 additions & 1 deletion src/Paramore.Brighter/RelationDatabaseOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public void Add(IEnumerable<Message> messages, int outBoxTimeout = -1,
/// <param name="messageIds">The id of the message to delete</param>
public void Delete(params Guid[] messageIds)
{
WriteToStore(null, connection => InitDeleteDispatchedCommand(connection, messageIds), null);
if(messageIds.Any())
WriteToStore(null, connection => InitDeleteDispatchedCommand(connection, messageIds), null);
}

/// <summary>
Expand Down Expand Up @@ -317,6 +318,9 @@ public Task<IEnumerable<Message>> OutstandingMessagesAsync(
/// <param name="messageIds">The id of the message to delete</param>
public Task DeleteAsync(CancellationToken cancellationToken, params Guid[] messageIds)
{
if(!messageIds.Any())
return Task.CompletedTask;

return WriteToStoreAsync(null, connection => InitDeleteDispatchedCommand(connection, messageIds), null,
cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ public void Setup()
_commandMapper = new JsonBodyMessageMapper<SuperAwesomeCommand>(_topicDirectory);
_eventMapper = new JsonBodyMessageMapper<SuperAwesomeEvent>(_topicDirectory);
}

[Test]
public async Task GivenARequestToArchiveAMessage_TheMessageIsArchived()
{
var commandMessage = _commandMapper.MapToMessage(_command);

var blobClient = GetClient(AccessTier.Cool).GetBlobClient(commandMessage.Id.ToString());

await _provider.ArchiveMessageAsync(commandMessage, CancellationToken.None);
_provider.ArchiveMessage(commandMessage);

Assert.That((bool)await blobClient.ExistsAsync(), Is.True);

Expand All @@ -65,6 +65,50 @@ public async Task GivenARequestToArchiveAMessage_WhenTagsAreTurnedOn_ThenTagsAre

var blobClient = GetClient(AccessTier.Hot, true).GetBlobClient(eventMessage.Id.ToString());

_provider.ArchiveMessage(eventMessage);

var tier = await blobClient.GetPropertiesAsync();
Assert.That(tier.Value.AccessTier, Is.EqualTo(AccessTier.Hot.ToString()));

var tags = (await blobClient.GetTagsAsync()).Value.Tags;

Assert.That(tags["topic"], Is.EqualTo(eventMessage.Header.Topic));
Assert.That(Guid.Parse(tags["correlationId"]), Is.EqualTo(eventMessage.Header.CorrelationId));
Assert.That(tags["message_type"], Is.EqualTo(eventMessage.Header.MessageType.ToString()));
Assert.That(DateTime.Parse(tags["timestamp"]), Is.EqualTo(eventMessage.Header.TimeStamp));
Assert.That(tags["content_type"], Is.EqualTo(eventMessage.Header.ContentType));
}

[Test]
public async Task GivenARequestToArchiveAMessageAsync_TheMessageIsArchived()
{
var commandMessage = _commandMapper.MapToMessage(_command);

var blobClient = GetClient(AccessTier.Cool).GetBlobClient(commandMessage.Id.ToString());

await _provider.ArchiveMessageAsync(commandMessage, CancellationToken.None);

Assert.That((bool)await blobClient.ExistsAsync(), Is.True);

var tags = (await blobClient.GetTagsAsync()).Value.Tags;
Assert.That(tags.Count, Is.EqualTo(0));

var body = (await blobClient.DownloadContentAsync()).Value.Content.ToString();

Assert.That(body, Is.EqualTo(commandMessage.Body.Value));

var tier = await blobClient.GetPropertiesAsync();
Assert.That(tier.Value.AccessTier, Is.EqualTo(AccessTier.Cool.ToString()));

}

[Test]
public async Task GivenARequestToArchiveAMessageAsync_WhenTagsAreTurnedOn_ThenTagsAreWritten()
{
var eventMessage = _eventMapper.MapToMessage(_event);

var blobClient = GetClient(AccessTier.Hot, true).GetBlobClient(eventMessage.Id.ToString());

await _provider.ArchiveMessageAsync(eventMessage, CancellationToken.None);

var tier = await blobClient.GetPropertiesAsync();
Expand Down

0 comments on commit 919f96d

Please sign in to comment.