Skip to content

Commit

Permalink
Implement a sparse outstanding index for the dynamo db outbox [v9] (#…
Browse files Browse the repository at this point in the history
…3295)

* Implement a sparse outstanding index for the dynamo db outbox

* Replace existing dynamodb outbox with new implementation

* Add tests covering removal of dispatched messages from the outstanding index

* Remove unused query key expression

* Remove launch settings
  • Loading branch information
dhickie authored Sep 4, 2024
1 parent b65112a commit 70137f7
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using Amazon;
using Amazon.Runtime;

Expand Down
19 changes: 9 additions & 10 deletions src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ await _context.SaveAsync(
message,
_dynamoOverwriteTableConfig,
cancellationToken);
}
}

public async Task MarkDispatchedAsync(IEnumerable<Guid> ids, DateTime? dispatchedAt = null, Dictionary<string, object> args = null,
CancellationToken cancellationToken = default)
Expand All @@ -296,13 +296,16 @@ public void MarkDispatched(Guid id, DateTime? dispatchedAt = null, Dictionary<st
message,
_dynamoOverwriteTableConfig)
.Wait(_configuration.Timeout);

}

private static void MarkMessageDispatched(DateTime? dispatchedAt, MessageItem message)
{
message.DeliveryTime = dispatchedAt.Value.Ticks;
message.DeliveredAt = $"{dispatchedAt:yyyy-MM-dd}";

// Set the outstanding created time to null to remove the attribute
// from the item in dynamo
message.OutstandingCreatedTime = null;
}

/// <summary>
Expand Down Expand Up @@ -386,7 +389,7 @@ private Task<TransactWriteItemsRequest> AddToTransactionWrite(MessageItem messag

private async Task<Message> GetMessage(Guid id, CancellationToken cancellationToken = default)
{
MessageItem messageItem = await _context.LoadAsync<MessageItem>(id.ToString(), _dynamoOverwriteTableConfig, cancellationToken);
var messageItem = await _context.LoadAsync<MessageItem>(id.ToString(), _dynamoOverwriteTableConfig, cancellationToken);
return messageItem?.ConvertToMessage() ?? new Message();
}

Expand Down Expand Up @@ -460,9 +463,9 @@ private async Task<IEnumerable<Message>> OutstandingMessagesForAllTopicsAsync(do
private async Task<IEnumerable<Message>> OutstandingMessagesForTopicAsync(double millisecondsDispatchedSince,
string topicName, CancellationToken cancellationToken)
{
var olrderThan = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince));
var olderThan = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince));

var messages = (await QueryAllOutstandingShardsAsync(topicName, olrderThan, cancellationToken)).ToList();
var messages = (await QueryAllOutstandingShardsAsync(topicName, olderThan, cancellationToken)).ToList();
return messages.Select(msg => msg.ConvertToMessage());
}

Expand All @@ -485,14 +488,10 @@ private async Task<IEnumerable<MessageItem>> QueryAllOutstandingShardsAsync(stri

for (int shard = 0; shard < _configuration.NumberOfShards; shard++)
{
// We get all the messages for topic, added within a time range
// There should be few enough of those that we can efficiently filter for those
// that don't have a delivery date.
var queryConfig = new QueryOperationConfig
{
IndexName = _configuration.OutstandingIndexName,
KeyExpression = new KeyTopicCreatedTimeExpression().Generate(topic, minimumAge, shard),
FilterExpression = new NoDispatchTimeExpression().Generate(),
KeyExpression = new KeyTopicOutstandingCreatedTimeExpression().Generate(topic, minimumAge, shard),
ConsistentRead = false
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
using System;
using System;
using System.Collections.Generic;
using Amazon.DynamoDBv2.DocumentModel;

namespace Paramore.Brighter.Outbox.DynamoDB
{
internal class KeyTopicCreatedTimeExpression
internal class KeyTopicOutstandingCreatedTimeExpression
{
private readonly Expression _expression;

public KeyTopicCreatedTimeExpression()
public KeyTopicOutstandingCreatedTimeExpression()
{
_expression = new Expression { ExpressionStatement = "TopicShard = :v_TopicShard and CreatedTime < :v_CreatedTime" };
_expression = new Expression { ExpressionStatement = "TopicShard = :v_TopicShard and OutstandingCreatedTime < :v_OutstandingCreatedTime" };
}

public override string ToString()
Expand All @@ -22,7 +22,7 @@ public Expression Generate(string topicName, DateTime createdTime, int shard)
{
var values = new Dictionary<string, DynamoDBEntry>();
values.Add(":v_TopicShard", $"{topicName}_{shard}");
values.Add(":v_CreatedTime", createdTime.Ticks);
values.Add(":v_OutstandingCreatedTime", createdTime.Ticks);

_expression.ExpressionAttributeValues = values;

Expand Down
26 changes: 14 additions & 12 deletions src/Paramore.Brighter.Outbox.DynamoDB/MessageItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,16 @@ public class MessageItem
/// <summary>
/// The time at which the message was created, in ticks
/// </summary>
[DynamoDBGlobalSecondaryIndexRangeKey(indexName: "Outstanding")]
[DynamoDBProperty]
public long CreatedTime { get; set; }

/// <summary>
/// The time at which the message was created, in ticks. Null if the message has been dispatched.
/// </summary>
[DynamoDBGlobalSecondaryIndexRangeKey(indexName: "Outstanding")]
[DynamoDBProperty]
public long? OutstandingCreatedTime { get; set; }

/// <summary>
/// The time at which the message was delivered, formatted as a string yyyy-MM-dd
/// </summary>
Expand Down Expand Up @@ -122,6 +128,7 @@ public MessageItem(Message message, int shard = 0, long? expiresAt = null)
CharacterEncoding = message.Body.CharacterEncoding.ToString();
CreatedAt = $"{date}";
CreatedTime = date.Ticks;
OutstandingCreatedTime = date.Ticks;
DeliveryTime = 0;
HeaderBag = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);
MessageId = message.Id.ToString();
Expand Down Expand Up @@ -163,42 +170,37 @@ public Message ConvertToMessage()

return new Message(header, body);
}

public void MarkMessageDelivered(DateTime deliveredAt)
{
DeliveryTime = deliveredAt.Ticks;
DeliveredAt = $"{deliveredAt:yyyy-MM-dd}";
}
}

public class MessageItemBodyConverter : IPropertyConverter
{
public DynamoDBEntry ToEntry(object value)
{
byte[] body = value as byte[];
if (body == null) throw new ArgumentOutOfRangeException("Expected the body to be a byte array");
if (body == null)
throw new ArgumentOutOfRangeException("Expected the body to be a byte array");

DynamoDBEntry entry = new Primitive
{
Value = body,
Type = DynamoDBEntryType.Binary

};

return entry;
}

public object FromEntry(DynamoDBEntry entry)
{
byte[] data = Array.Empty<byte>();
Primitive primitive = entry as Primitive;
Primitive primitive = entry as Primitive;
if (primitive?.Value is byte[] bytes)
data = bytes;
if (primitive?.Value is string text) //for historical data that used UTF-8 strings
data = Encoding.UTF8.GetBytes(text);
if (primitive == null || !(primitive.Value is string || primitive.Value is byte[]))
throw new ArgumentOutOfRangeException("Expected Dynamo to have stored a byte array");

return data;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,54 @@ public async Task When_there_are_outstanding_messages_for_multiple_topics()
}
}

[Fact]
public async Task When_an_outstanding_message_is_dispatched_async()
{
await _dynamoDbOutbox.AddAsync(_message);

await Task.Delay(1000);

var args = new Dictionary<string, object> { { "Topic", "test_topic" } };

var messages = await _dynamoDbOutbox.OutstandingMessagesAsync(0, 100, 1, args);

//Other tests may leave messages, so make sure that we grab ours
var message = messages.Single(m => m.Id == _message.Id);
message.Should().NotBeNull();

await _dynamoDbOutbox.MarkDispatchedAsync(_message.Id);

// Give the GSI a second to catch up
await Task.Delay(1000);

messages = await _dynamoDbOutbox.OutstandingMessagesAsync(0, 100, 1, args);
messages.All(m => m.Id != _message.Id);
}

[Fact]
public async Task When_an_outstanding_message_is_dispatched()
{
_dynamoDbOutbox.Add(_message);

await Task.Delay(1000);

var args = new Dictionary<string, object> { { "Topic", "test_topic" } };

var messages = _dynamoDbOutbox.OutstandingMessages(0, 100, 1, args);

//Other tests may leave messages, so make sure that we grab ours
var message = messages.Single(m => m.Id == _message.Id);
message.Should().NotBeNull();

_dynamoDbOutbox.MarkDispatched(_message.Id);

// Give the GSI a second to catch up
await Task.Delay(1000);

messages = _dynamoDbOutbox.OutstandingMessages(0, 100, 1, args);
messages.All(m => m.Id != _message.Id);
}

private Message CreateMessage(string topic)
{
return new Message(
Expand Down

0 comments on commit 70137f7

Please sign in to comment.