From bd4635e4f73aa0f054e8ce4ee5a49878fa1c9db0 Mon Sep 17 00:00:00 2001 From: Tim Salva Date: Wed, 28 Aug 2024 13:28:11 +0100 Subject: [PATCH] feature: Backport #3270 (#3272) --- .../SqsInlineMessageCreator.cs | 39 ++++++++++++++----- .../SqsMessagePublisher.cs | 14 ++++++- ...ing_a_message_via_the_messaging_gateway.cs | 23 ++++++++--- 3 files changed, 60 insertions(+), 16 deletions(-) diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsInlineMessageCreator.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsInlineMessageCreator.cs index dbdba581cc..db920f9315 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsInlineMessageCreator.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsInlineMessageCreator.cs @@ -46,11 +46,13 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage) var timeStamp = HeaderResult.Empty(); var receiptHandle = HeaderResult.Empty(); var replyTo = HeaderResult.Empty(); + var subject = HeaderResult.Empty(); Message message; try { - _messageAttributes = ReadMessageAttributes(sqsMessage); + var jsonDocument = JsonDocument.Parse(sqsMessage.Body); + _messageAttributes = ReadMessageAttributes(jsonDocument); topic = ReadTopic(); messageId = ReadMessageId(); @@ -60,6 +62,7 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage) messageType = ReadMessageType(); timeStamp = ReadTimestamp(); replyTo = ReadReplyTo(); + subject = ReadMessageSubject(jsonDocument); receiptHandle = ReadReceiptHandle(sqsMessage); var messageHeader = timeStamp.Success @@ -72,11 +75,16 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage) if (replyTo.Success) messageHeader.ReplyTo = replyTo.Result; + if (subject.Result != null) + { + messageHeader.Bag.Add("Subject", subject.Result); + } + if (contentType.Success) messageHeader.ContentType = contentType.Result; - message = new Message(messageHeader, ReadMessageBody(sqsMessage)); - + message = new Message(messageHeader, ReadMessageBody(jsonDocument)); + //deserialize the bag var bag = ReadMessageBag(); foreach (var key in bag.Keys) @@ -97,14 +105,12 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage) return message; } - private Dictionary ReadMessageAttributes(Amazon.SQS.Model.Message sqsMessage) + private static Dictionary ReadMessageAttributes(JsonDocument jsonDocument) { var messageAttributes = new Dictionary(); try { - var jsonDocument = JsonDocument.Parse(sqsMessage.Body); - if (jsonDocument.RootElement.TryGetProperty("MessageAttributes", out var attributes)) { messageAttributes = JsonSerializer.Deserialize>( @@ -239,13 +245,28 @@ private HeaderResult ReadTopic() return new HeaderResult(string.Empty, true); } - - private MessageBody ReadMessageBody(Amazon.SQS.Model.Message sqsMessage) + + private static HeaderResult ReadMessageSubject(JsonDocument jsonDocument) { try { - var jsonDocument = JsonDocument.Parse(sqsMessage.Body); + if (jsonDocument.RootElement.TryGetProperty("Subject", out var value)) + { + return new HeaderResult(value.GetString(), true); + } + } + catch (Exception ex) + { + s_logger.LogWarning($"Failed to parse Sqs Message Body to valid Json Document, ex: {ex}"); + } + return new HeaderResult(null, true); + } + + private static MessageBody ReadMessageBody(JsonDocument jsonDocument) + { + try + { if (jsonDocument.RootElement.TryGetProperty("Message", out var value)) { return new MessageBody(value.GetString()); diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs index fd80fa9f5e..467d373a0d 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessagePublisher.cs @@ -44,7 +44,8 @@ public SqsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClien public async Task PublishAsync(Message message) { var messageString = message.Body.Value; - var publishRequest = new PublishRequest(_topicArn, messageString); + var subject = GetSubject(message); + var publishRequest = new PublishRequest(_topicArn, messageString, subject); var messageAttributes = new Dictionary(); messageAttributes.Add(HeaderNames.Id, new MessageAttributeValue{StringValue = Convert.ToString(message.Header.Id), DataType = "String"}); @@ -73,6 +74,17 @@ public async Task PublishAsync(Message message) return null; } + private static string GetSubject(Message message) + { + var subjectExists = message.Header.Bag.TryGetValue("Subject", out var subject); + if (subjectExists) + { + message.Header.Bag.Remove("Subject"); + } + + return subject?.ToString(); + } + public string Publish(Message message) { return PublishAsync(message).GetAwaiter().GetResult(); diff --git a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs index 1289482712..c42689a10f 100644 --- a/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs +++ b/tests/Paramore.Brighter.AWS.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs @@ -37,7 +37,8 @@ public SqsMessageProducerSendTests() SqsSubscription subscription = new( name: new SubscriptionName(channelName), channelName: new ChannelName(channelName), - routingKey: routingKey + routingKey: routingKey, + rawMessageDelivery: false ); _message = new Message( @@ -58,14 +59,21 @@ public SqsMessageProducerSendTests() [Theory] - [InlineData(true)] - [InlineData(false)] - public async Task When_posting_a_message_via_the_producer(bool sendAsync) + [InlineData("test subject", true)] + [InlineData(null, true)] + [InlineData("test subject", false)] + [InlineData(null, false)] + public async Task When_posting_a_message_via_the_producer(string subject, bool sendAsync) { //arrange + if (subject != null) + { + _message.Header.Bag.Add("Subject", subject); + } + if (sendAsync) { - _messageProducer.SendAsync(_message).Wait(); + await _messageProducer.SendAsync(_message); } else { @@ -95,6 +103,9 @@ public async Task When_posting_a_message_via_the_producer(bool sendAsync) message.Header.DelayedMilliseconds.Should().Be(0); //{"Id":"cd581ced-c066-4322-aeaf-d40944de8edd","Value":"Test","WasCancelled":false,"TaskCompleted":false} message.Body.Value.Should().Be(_message.Body.Value); + + message.Header.Bag.TryGetValue("Subject", out var actualSubject).Should().Be(subject != null); + actualSubject.Should().Be(subject); } public void Dispose() @@ -104,7 +115,7 @@ public void Dispose() _messageProducer?.Dispose(); } - private DateTime RoundToSeconds(DateTime dateTime) + private static DateTime RoundToSeconds(DateTime dateTime) { return new DateTime(dateTime.Ticks - (dateTime.Ticks % TimeSpan.TicksPerSecond), dateTime.Kind); }