From 919f96dee6e2698c281d1cfe872c15a8a9bc73ad Mon Sep 17 00:00:00 2001 From: Paul Reardon Date: Fri, 14 Apr 2023 06:46:18 +0100 Subject: [PATCH] Fix Archiver (#2620) * Fix Archviver - Update outbox to not attempt to delete if no Ids are passed in - Have archvier mark activity as failed if there is an issue * Updare Azure archive provider - Add Sync Tests - Ensure we dont try to upload a blob that is already uploaded --- .../AzureBlobArchiveProvider.cs | 28 ++++++++--- src/Paramore.Brighter/OutboxArchiver.cs | 4 ++ .../RelationDatabaseOutbox.cs | 6 ++- .../AzureBlobArchiveProviderTests.cs | 48 ++++++++++++++++++- 4 files changed, 77 insertions(+), 9 deletions(-) diff --git a/src/Paramore.Brighter.Archive.Azure/AzureBlobArchiveProvider.cs b/src/Paramore.Brighter.Archive.Azure/AzureBlobArchiveProvider.cs index b6e311cf70..727b0dce37 100644 --- a/src/Paramore.Brighter.Archive.Azure/AzureBlobArchiveProvider.cs +++ b/src/Paramore.Brighter.Archive.Azure/AzureBlobArchiveProvider.cs @@ -21,8 +21,14 @@ public AzureBlobArchiveProvider(AzureBlobArchiveProviderOptions options) public void ArchiveMessage(Message message) { var blobClient = _containerClient.GetBlobClient(message.Id.ToString()); - - blobClient.Upload(BinaryData.FromBytes(message.Body.Bytes)); + + var alreadyUploaded = blobClient.Exists(); + + if (!alreadyUploaded.Value) + { + var opts = GetUploadOptions(message); + blobClient.Upload(BinaryData.FromBytes(message.Body.Bytes), opts); + } } /// @@ -33,10 +39,20 @@ public void ArchiveMessage(Message message) public async Task ArchiveMessageAsync(Message message, CancellationToken cancellationToken) { var blobClient = _containerClient.GetBlobClient(message.Id.ToString()); - + + var alreadyUploaded = await blobClient.ExistsAsync(cancellationToken); + if (!alreadyUploaded.Value) + { + var opts = GetUploadOptions(message); + await blobClient.UploadAsync(BinaryData.FromBytes(message.Body.Bytes), opts, cancellationToken); + } + } + + private BlobUploadOptions GetUploadOptions(Message message) + { var opts = new BlobUploadOptions() { - AccessTier = _options.AccessTier, + AccessTier = _options.AccessTier }; if (_options.TagBlobs) @@ -50,7 +66,7 @@ public async Task ArchiveMessageAsync(Message message, CancellationToken cancell { "content_type", message.Header.ContentType } }; } - - await blobClient.UploadAsync(BinaryData.FromBytes(message.Body.Bytes), opts, cancellationToken); + + return opts; } } diff --git a/src/Paramore.Brighter/OutboxArchiver.cs b/src/Paramore.Brighter/OutboxArchiver.cs index 90f2419ef4..6d4ac21855 100644 --- a/src/Paramore.Brighter/OutboxArchiver.cs +++ b/src/Paramore.Brighter/OutboxArchiver.cs @@ -43,6 +43,7 @@ public void Archive(int minimumAge) { var messages = _outboxSync.DispatchedMessages(age.Milliseconds, _batchSize); + if (!messages.Any()) return; foreach (var message in messages) { _archiveProvider.ArchiveMessage(message); @@ -52,6 +53,7 @@ public void Archive(int minimumAge) } catch (Exception e) { + activity?.SetStatus(ActivityStatusCode.Error, e.Message); _logger.LogError(e, "Error while archiving from the outbox"); throw; } @@ -78,6 +80,7 @@ public async Task ArchiveAsync(int minimumAge, CancellationToken cancellationTok var messages = await _outboxAsync.DispatchedMessagesAsync(age.Milliseconds, _batchSize, cancellationToken: cancellationToken); + if (!messages.Any()) return; foreach (var message in messages) { await _archiveProvider.ArchiveMessageAsync(message, cancellationToken); @@ -87,6 +90,7 @@ public async Task ArchiveAsync(int minimumAge, CancellationToken cancellationTok } catch (Exception e) { + activity?.SetStatus(ActivityStatusCode.Error, e.Message); _logger.LogError(e, "Error while archiving from the outbox"); throw; } diff --git a/src/Paramore.Brighter/RelationDatabaseOutbox.cs b/src/Paramore.Brighter/RelationDatabaseOutbox.cs index 91315bb8f0..77b4578617 100644 --- a/src/Paramore.Brighter/RelationDatabaseOutbox.cs +++ b/src/Paramore.Brighter/RelationDatabaseOutbox.cs @@ -74,7 +74,8 @@ public void Add(IEnumerable messages, int outBoxTimeout = -1, /// The id of the message to delete public void Delete(params Guid[] messageIds) { - WriteToStore(null, connection => InitDeleteDispatchedCommand(connection, messageIds), null); + if(messageIds.Any()) + WriteToStore(null, connection => InitDeleteDispatchedCommand(connection, messageIds), null); } /// @@ -317,6 +318,9 @@ public Task> OutstandingMessagesAsync( /// The id of the message to delete public Task DeleteAsync(CancellationToken cancellationToken, params Guid[] messageIds) { + if(!messageIds.Any()) + return Task.CompletedTask; + return WriteToStoreAsync(null, connection => InitDeleteDispatchedCommand(connection, messageIds), null, cancellationToken); } diff --git a/tests/Paramore.Brighter.Azure.Tests/AzureBlobArchiveProviderTests.cs b/tests/Paramore.Brighter.Azure.Tests/AzureBlobArchiveProviderTests.cs index cf4acc7e66..1bd88b4055 100644 --- a/tests/Paramore.Brighter.Azure.Tests/AzureBlobArchiveProviderTests.cs +++ b/tests/Paramore.Brighter.Azure.Tests/AzureBlobArchiveProviderTests.cs @@ -34,7 +34,7 @@ public void Setup() _commandMapper = new JsonBodyMessageMapper(_topicDirectory); _eventMapper = new JsonBodyMessageMapper(_topicDirectory); } - + [Test] public async Task GivenARequestToArchiveAMessage_TheMessageIsArchived() { @@ -42,7 +42,7 @@ public async Task GivenARequestToArchiveAMessage_TheMessageIsArchived() var blobClient = GetClient(AccessTier.Cool).GetBlobClient(commandMessage.Id.ToString()); - await _provider.ArchiveMessageAsync(commandMessage, CancellationToken.None); + _provider.ArchiveMessage(commandMessage); Assert.That((bool)await blobClient.ExistsAsync(), Is.True); @@ -65,6 +65,50 @@ public async Task GivenARequestToArchiveAMessage_WhenTagsAreTurnedOn_ThenTagsAre var blobClient = GetClient(AccessTier.Hot, true).GetBlobClient(eventMessage.Id.ToString()); + _provider.ArchiveMessage(eventMessage); + + var tier = await blobClient.GetPropertiesAsync(); + Assert.That(tier.Value.AccessTier, Is.EqualTo(AccessTier.Hot.ToString())); + + var tags = (await blobClient.GetTagsAsync()).Value.Tags; + + Assert.That(tags["topic"], Is.EqualTo(eventMessage.Header.Topic)); + Assert.That(Guid.Parse(tags["correlationId"]), Is.EqualTo(eventMessage.Header.CorrelationId)); + Assert.That(tags["message_type"], Is.EqualTo(eventMessage.Header.MessageType.ToString())); + Assert.That(DateTime.Parse(tags["timestamp"]), Is.EqualTo(eventMessage.Header.TimeStamp)); + Assert.That(tags["content_type"], Is.EqualTo(eventMessage.Header.ContentType)); + } + + [Test] + public async Task GivenARequestToArchiveAMessageAsync_TheMessageIsArchived() + { + var commandMessage = _commandMapper.MapToMessage(_command); + + var blobClient = GetClient(AccessTier.Cool).GetBlobClient(commandMessage.Id.ToString()); + + await _provider.ArchiveMessageAsync(commandMessage, CancellationToken.None); + + Assert.That((bool)await blobClient.ExistsAsync(), Is.True); + + var tags = (await blobClient.GetTagsAsync()).Value.Tags; + Assert.That(tags.Count, Is.EqualTo(0)); + + var body = (await blobClient.DownloadContentAsync()).Value.Content.ToString(); + + Assert.That(body, Is.EqualTo(commandMessage.Body.Value)); + + var tier = await blobClient.GetPropertiesAsync(); + Assert.That(tier.Value.AccessTier, Is.EqualTo(AccessTier.Cool.ToString())); + + } + + [Test] + public async Task GivenARequestToArchiveAMessageAsync_WhenTagsAreTurnedOn_ThenTagsAreWritten() + { + var eventMessage = _eventMapper.MapToMessage(_event); + + var blobClient = GetClient(AccessTier.Hot, true).GetBlobClient(eventMessage.Id.ToString()); + await _provider.ArchiveMessageAsync(eventMessage, CancellationToken.None); var tier = await blobClient.GetPropertiesAsync();