Skip to content

Commit

Permalink
Fixes #6675: Update IEventHubDataAdapter to support StreamId to parti…
Browse files Browse the repository at this point in the history
…tion mapping (#6676) (#6731)

* Fixes #6675: Include stream Namespace in PartitionKey for Event Hub messages

* Update IEventHubDataAdapter to support partition to StreamId mapping

Co-authored-by: Alex Meyer-Gleaves <alex.meyergleaves@gmail.com>
  • Loading branch information
benjaminpetit and alexmg authored Sep 4, 2020
1 parent 47a1fa6 commit baa1dc8
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ public virtual Task QueueMessageBatchAsync<T>(Guid streamGuid, string streamName
Dictionary<string, object> requestContext)
{
EventData eventData = this.dataAdapter.ToQueueMessage(streamGuid, streamNamespace, events, token, requestContext);
return this.client.SendAsync(eventData, streamGuid.ToString());
string partitionKey = this.dataAdapter.GetPartitionKey(streamGuid, streamNamespace);
return this.client.SendAsync(eventData, partitionKey);
}

/// <summary>
Expand Down Expand Up @@ -321,4 +322,4 @@ public static EventHubAdapterFactory Create(IServiceProvider services, string na
return factory;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,10 @@ public virtual CachedMessage FromQueueMessage(StreamPosition streamPosition, Eve

public virtual StreamPosition GetStreamPosition(string partition, EventData queueMessage)
{
Guid streamGuid =
Guid.Parse(queueMessage.SystemProperties.PartitionKey);
string streamNamespace = queueMessage.GetStreamNamespaceProperty();
IStreamIdentity stremIdentity = new StreamIdentity(streamGuid, streamNamespace);
IStreamIdentity streamIdentity = this.GetStreamIdentity(queueMessage);
StreamSequenceToken token =
new EventHubSequenceTokenV2(queueMessage.SystemProperties.Offset, queueMessage.SystemProperties.SequenceNumber, 0);
return new StreamPosition(stremIdentity, token);
return new StreamPosition(streamIdentity, token);
}

/// <summary>
Expand All @@ -95,6 +92,26 @@ public virtual string GetOffset(CachedMessage lastItemPurged)
return SegmentBuilder.ReadNextString(lastItemPurged.Segment, ref readOffset); // read offset
}

/// <summary>
/// Get the Event Hub partition key to use for a stream.
/// </summary>
/// <param name="streamGuid">The stream Guid.</param>
/// <param name="streamNamespace">The stream Namespace.</param>
/// <returns>The partition key to use for the stream.</returns>
public virtual string GetPartitionKey(Guid streamGuid, string streamNamespace) => streamGuid.ToString();

/// <summary>
/// Get the <see cref="IStreamIdentity"/> for an event message.
/// </summary>
/// <param name="queueMessage">The event message.</param>
/// <returns>The stream identity.</returns>
public virtual IStreamIdentity GetStreamIdentity(EventData queueMessage)
{
Guid streamGuid = Guid.Parse(queueMessage.SystemProperties.PartitionKey);
string streamNamespace = queueMessage.GetStreamNamespaceProperty();
return new StreamIdentity(streamGuid, streamNamespace);
}

// Placed object message payload into a segment.
protected virtual ArraySegment<byte> EncodeMessageIntoSegment(EventData queueMessage, Func<int, ArraySegment<byte>> getSegment)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,9 @@ public interface IEventHubDataAdapter : IQueueDataAdapter<EventData>, ICacheData
StreamPosition GetStreamPosition(string partition, EventData queueMessage);

string GetOffset(CachedMessage cachedMessage);

string GetPartitionKey(Guid streamGuid, string streamNamespace);

IStreamIdentity GetStreamIdentity(EventData queueMessage);
}
}

0 comments on commit baa1dc8

Please sign in to comment.