Skip to content

Commit

Permalink
feature: Backport #3270 (#3272)
Browse files Browse the repository at this point in the history
  • Loading branch information
jtsalva authored Aug 28, 2024
1 parent d3558a1 commit bd4635e
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage)
var timeStamp = HeaderResult<DateTime>.Empty();
var receiptHandle = HeaderResult<string>.Empty();
var replyTo = HeaderResult<string>.Empty();
var subject = HeaderResult<string>.Empty();

Message message;
try
{
_messageAttributes = ReadMessageAttributes(sqsMessage);
var jsonDocument = JsonDocument.Parse(sqsMessage.Body);
_messageAttributes = ReadMessageAttributes(jsonDocument);

topic = ReadTopic();
messageId = ReadMessageId();
Expand All @@ -60,6 +62,7 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage)
messageType = ReadMessageType();
timeStamp = ReadTimestamp();
replyTo = ReadReplyTo();
subject = ReadMessageSubject(jsonDocument);
receiptHandle = ReadReceiptHandle(sqsMessage);

var messageHeader = timeStamp.Success
Expand All @@ -72,11 +75,16 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage)
if (replyTo.Success)
messageHeader.ReplyTo = replyTo.Result;

if (subject.Result != null)
{
messageHeader.Bag.Add("Subject", subject.Result);
}

if (contentType.Success)
messageHeader.ContentType = contentType.Result;

message = new Message(messageHeader, ReadMessageBody(sqsMessage));

message = new Message(messageHeader, ReadMessageBody(jsonDocument));
//deserialize the bag
var bag = ReadMessageBag();
foreach (var key in bag.Keys)
Expand All @@ -97,14 +105,12 @@ public Message CreateMessage(Amazon.SQS.Model.Message sqsMessage)
return message;
}

private Dictionary<string, JsonElement> ReadMessageAttributes(Amazon.SQS.Model.Message sqsMessage)
private static Dictionary<string, JsonElement> ReadMessageAttributes(JsonDocument jsonDocument)
{
var messageAttributes = new Dictionary<string, JsonElement>();

try
{
var jsonDocument = JsonDocument.Parse(sqsMessage.Body);

if (jsonDocument.RootElement.TryGetProperty("MessageAttributes", out var attributes))
{
messageAttributes = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(
Expand Down Expand Up @@ -239,13 +245,28 @@ private HeaderResult<string> ReadTopic()

return new HeaderResult<string>(string.Empty, true);
}

private MessageBody ReadMessageBody(Amazon.SQS.Model.Message sqsMessage)
private static HeaderResult<string> ReadMessageSubject(JsonDocument jsonDocument)
{
try
{
var jsonDocument = JsonDocument.Parse(sqsMessage.Body);
if (jsonDocument.RootElement.TryGetProperty("Subject", out var value))
{
return new HeaderResult<string>(value.GetString(), true);
}
}
catch (Exception ex)
{
s_logger.LogWarning($"Failed to parse Sqs Message Body to valid Json Document, ex: {ex}");
}

return new HeaderResult<string>(null, true);
}

private static MessageBody ReadMessageBody(JsonDocument jsonDocument)
{
try
{
if (jsonDocument.RootElement.TryGetProperty("Message", out var value))
{
return new MessageBody(value.GetString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public SqsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClien
public async Task<string> PublishAsync(Message message)
{
var messageString = message.Body.Value;
var publishRequest = new PublishRequest(_topicArn, messageString);
var subject = GetSubject(message);
var publishRequest = new PublishRequest(_topicArn, messageString, subject);

var messageAttributes = new Dictionary<string, MessageAttributeValue>();
messageAttributes.Add(HeaderNames.Id, new MessageAttributeValue{StringValue = Convert.ToString(message.Header.Id), DataType = "String"});
Expand Down Expand Up @@ -73,6 +74,17 @@ public async Task<string> PublishAsync(Message message)
return null;
}

private static string GetSubject(Message message)
{
var subjectExists = message.Header.Bag.TryGetValue("Subject", out var subject);
if (subjectExists)
{
message.Header.Bag.Remove("Subject");
}

return subject?.ToString();
}

public string Publish(Message message)
{
return PublishAsync(message).GetAwaiter().GetResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public SqsMessageProducerSendTests()
SqsSubscription<MyCommand> subscription = new(
name: new SubscriptionName(channelName),
channelName: new ChannelName(channelName),
routingKey: routingKey
routingKey: routingKey,
rawMessageDelivery: false
);

_message = new Message(
Expand All @@ -58,14 +59,21 @@ public SqsMessageProducerSendTests()


[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task When_posting_a_message_via_the_producer(bool sendAsync)
[InlineData("test subject", true)]
[InlineData(null, true)]
[InlineData("test subject", false)]
[InlineData(null, false)]
public async Task When_posting_a_message_via_the_producer(string subject, bool sendAsync)
{
//arrange
if (subject != null)
{
_message.Header.Bag.Add("Subject", subject);
}

if (sendAsync)
{
_messageProducer.SendAsync(_message).Wait();
await _messageProducer.SendAsync(_message);
}
else
{
Expand Down Expand Up @@ -95,6 +103,9 @@ public async Task When_posting_a_message_via_the_producer(bool sendAsync)
message.Header.DelayedMilliseconds.Should().Be(0);
//{"Id":"cd581ced-c066-4322-aeaf-d40944de8edd","Value":"Test","WasCancelled":false,"TaskCompleted":false}
message.Body.Value.Should().Be(_message.Body.Value);

message.Header.Bag.TryGetValue("Subject", out var actualSubject).Should().Be(subject != null);
actualSubject.Should().Be(subject);
}

public void Dispose()
Expand All @@ -104,7 +115,7 @@ public void Dispose()
_messageProducer?.Dispose();
}

private DateTime RoundToSeconds(DateTime dateTime)
private static DateTime RoundToSeconds(DateTime dateTime)
{
return new DateTime(dateTime.Ticks - (dateTime.Ticks % TimeSpan.TicksPerSecond), dateTime.Kind);
}
Expand Down

0 comments on commit bd4635e

Please sign in to comment.