Skip to content

Commit

Permalink
updated rabbitmq
Browse files Browse the repository at this point in the history
Updated rabbitmq to version 7 connector that contained breaking changes from 6 to 7
  • Loading branch information
roger-castaldo committed Dec 18, 2024
1 parent 68c8af2 commit 02613a4
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 101 deletions.
129 changes: 61 additions & 68 deletions Connectors/RabbitMQ/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ namespace MQContract.RabbitMQ
/// <summary>
/// This is the MessageServiceConnection implemenation for using RabbitMQ
/// </summary>
public sealed class Connection : IInboxQueryableMessageServiceConnection, IDisposable
public sealed class Connection : IInboxQueryableMessageServiceConnection, IAsyncDisposable
{
private const string InboxExchange = "_Inbox";

private readonly ConnectionFactory factory;
private readonly IConnection conn;
private readonly IModel channel;
private readonly IChannel channel;
private readonly SemaphoreSlim semaphore = new(1, 1);
private readonly string inboxChannel;
private bool disposedValue;
Expand All @@ -26,12 +25,15 @@ public sealed class Connection : IInboxQueryableMessageServiceConnection, IDispo
/// <param name="factory">The connection factory to use that was built with required authentication and connection information</param>
public Connection(ConnectionFactory factory)
{
this.factory = factory;
if (string.IsNullOrWhiteSpace(this.factory.ClientProvidedName))
this.factory.ClientProvidedName = Guid.NewGuid().ToString();
conn = this.factory.CreateConnection();
channel = conn.CreateModel();
MaxMessageBodySize = factory.MaxMessageSize;
if (string.IsNullOrWhiteSpace(factory.ClientProvidedName))
factory.ClientProvidedName = Guid.NewGuid().ToString();
var connectionTask = factory.CreateConnectionAsync();
connectionTask.Wait();
conn = connectionTask.Result;
var channelTask = conn.CreateChannelAsync();
channelTask.Wait();
channel = channelTask.Result;
MaxMessageBodySize = factory.MaxInboundMessageBodySize;
inboxChannel = $"{InboxExchange}.{factory.ClientProvidedName}";
}

Expand All @@ -44,10 +46,10 @@ public Connection(ConnectionFactory factory)
/// <param name="autoDelete">Auto Delete queue when connection closed</param>
/// <param name="arguments">Additional arguements</param>
/// <returns>The connection to allow for chaining calls</returns>
public Connection QueueDeclare(string queue, bool durable = false, bool exclusive = false,
bool autoDelete = true, IDictionary<string, object>? arguments = null)
public async Task<Connection> QueueDeclareAsync(string queue, bool durable = false, bool exclusive = false,
bool autoDelete = true, IDictionary<string, object?>? arguments = null)
{
channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);
await channel.QueueDeclareAsync(queue, durable, exclusive, autoDelete, arguments:arguments);
return this;
}

Expand All @@ -60,10 +62,10 @@ public Connection QueueDeclare(string queue, bool durable = false, bool exclusiv
/// <param name="autoDelete">Auto Delete when connection closed</param>
/// <param name="arguments">Additional arguements</param>
/// <returns>The connection to allow for chaining calls</returns>
public Connection ExchangeDeclare(string exchange, string type, bool durable = false, bool autoDelete = false,
IDictionary<string, object>? arguments = null)
public async Task<Connection> ExchangeDeclareAsync(string exchange, string type, bool durable = false, bool autoDelete = false,
IDictionary<string, object?>? arguments = null)
{
channel.ExchangeDeclare(exchange,type,durable,autoDelete,arguments);
await channel.ExchangeDeclareAsync(exchange,type,durable,autoDelete,arguments);
return this;
}

Expand All @@ -73,8 +75,8 @@ public Connection ExchangeDeclare(string exchange, string type, bool durable = f
/// <param name="queue">The name of the queue</param>
/// <param name="ifUnused">Is unused</param>
/// <param name="ifEmpty">Is Empty</param>
public void QueueDelete(string queue, bool ifUnused, bool ifEmpty)
=> channel.QueueDelete(queue,ifUnused,ifEmpty);
public async Task QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty)
=> await channel.QueueDeclareAsync(queue, ifUnused, ifEmpty);

/// <summary>
/// The maximum message body size allowed
Expand All @@ -87,11 +89,13 @@ public void QueueDelete(string queue, bool ifUnused, bool ifEmpty)
/// </summary>
public TimeSpan DefaultTimeout { get; init; } = TimeSpan.FromMinutes(1);

internal static (IBasicProperties props, ReadOnlyMemory<byte>) ConvertMessage(ServiceMessage message, IModel channel, Guid? messageId = null)
internal static (BasicProperties props, ReadOnlyMemory<byte>) ConvertMessage(ServiceMessage message, Guid? messageId = null)
{
var props = channel.CreateBasicProperties();
props.MessageId=message.ID;
props.Type = message.MessageTypeID;
var props = new BasicProperties
{
MessageId=message.ID,
Type = message.MessageTypeID
};
using var ms = new MemoryStream();
using var bw = new BinaryWriter(ms);
if (messageId!=null)
Expand All @@ -103,7 +107,7 @@ internal static (IBasicProperties props, ReadOnlyMemory<byte>) ConvertMessage(Se
bw.Write((byte)0);
bw.Write(message.Data.Length);
bw.Write(message.Data.ToArray());
foreach(var key in message.Header.Keys)
foreach (var key in message.Header.Keys)
{
var bytes = UTF8Encoding.UTF8.GetBytes(key);
bw.Write(bytes.Length);
Expand Down Expand Up @@ -134,8 +138,8 @@ internal static ReceivedServiceMessage ConvertMessage(BasicDeliverEventArgs even
header.Add(key, value);
}
return new(
eventArgs.BasicProperties.MessageId,
eventArgs.BasicProperties.Type,
eventArgs.BasicProperties.MessageId!,
eventArgs.BasicProperties.Type!,
channel,
new(header),
data.ToArray(),
Expand All @@ -149,8 +153,8 @@ async ValueTask<TransmissionResult> IMessageServiceConnection.PublishAsync(Servi
TransmissionResult result;
try
{
(var props, var data) = ConvertMessage(message, this.channel);
channel.BasicPublish(message.Channel,string.Empty,props,data);
(var props, var data) = ConvertMessage(message);
await channel.BasicPublishAsync<BasicProperties>(message.Channel,string.Empty,true,props,data,cancellationToken);
result = new TransmissionResult(message.ID);
}catch(Exception e)
{
Expand All @@ -160,37 +164,39 @@ async ValueTask<TransmissionResult> IMessageServiceConnection.PublishAsync(Servi
return result;
}

private Subscription ProduceSubscription(IConnection conn, string channel, string? group, Action<BasicDeliverEventArgs,IModel, Func<ValueTask>> messageReceived, Action<Exception> errorReceived)
private async Task<Subscription> ProduceSubscriptionAsync(IConnection conn, string channel, string? group, Action<BasicDeliverEventArgs,IChannel, Func<ValueTask>> messageReceived, Action<Exception> errorReceived)
{
if (group==null)
{
group = Guid.NewGuid().ToString();
this.channel.QueueDeclare(queue:group, durable:false, exclusive:false, autoDelete:true);
await this.channel.QueueDeclareAsync(queue:group, durable:false, exclusive:false, autoDelete:true);
}else
{
try
{
this.channel.QueueDeclare(queue: group);
await this.channel.QueueDeclareAsync(queue: group);
}
catch (Exception) {
//this may throw an error is the queue already exists but checking for it fails
}
catch (Exception) { }
}
return new Subscription(conn, channel, group,messageReceived,errorReceived);
return await Subscription.ProduceInstanceAsync(conn, channel, group,messageReceived,errorReceived);
}

ValueTask<IServiceSubscription?> IMessageServiceConnection.SubscribeAsync(Action<ReceivedServiceMessage> messageReceived, Action<Exception> errorReceived, string channel, string? group, CancellationToken cancellationToken)
=> ValueTask.FromResult<IServiceSubscription?>(ProduceSubscription(conn, channel, group,
async ValueTask<IServiceSubscription?> IMessageServiceConnection.SubscribeAsync(Action<ReceivedServiceMessage> messageReceived, Action<Exception> errorReceived, string channel, string? group, CancellationToken cancellationToken)
=> await ProduceSubscriptionAsync(conn, channel, group,
(@event,modelChannel, acknowledge) =>
{
messageReceived(ConvertMessage(@event, channel, acknowledge,out _));
},
errorReceived
));
);

ValueTask<IServiceSubscription> IInboxQueryableMessageServiceConnection.EstablishInboxSubscriptionAsync(Action<ReceivedInboxServiceMessage> messageReceived, CancellationToken cancellationToken)
async ValueTask<IServiceSubscription> IInboxQueryableMessageServiceConnection.EstablishInboxSubscriptionAsync(Action<ReceivedInboxServiceMessage> messageReceived, CancellationToken cancellationToken)
{
channel.ExchangeDeclare(InboxExchange, ExchangeType.Direct, durable: false, autoDelete: true);
channel.QueueDeclare(inboxChannel, durable: false, exclusive: false, autoDelete: true);
return ValueTask.FromResult<IServiceSubscription>(new Subscription(
await channel.ExchangeDeclareAsync(InboxExchange, ExchangeType.Direct, durable: false, autoDelete: true, cancellationToken:cancellationToken);
await channel.QueueDeclareAsync(inboxChannel, durable: false, exclusive: false, autoDelete: true, cancellationToken: cancellationToken);
return await Subscription.ProduceInstanceAsync(
conn,
InboxExchange,
inboxChannel,
Expand All @@ -210,18 +216,18 @@ ValueTask<IServiceSubscription> IInboxQueryableMessageServiceConnection.Establis
},
(error) => { },
routingKey:inboxChannel
));
);
}

async ValueTask<TransmissionResult> IInboxQueryableMessageServiceConnection.QueryAsync(ServiceMessage message, Guid correlationID, CancellationToken cancellationToken)
{
(var props, var data) = ConvertMessage(message, this.channel, correlationID);
(var props, var data) = ConvertMessage(message, correlationID);
props.ReplyTo = inboxChannel;
await semaphore.WaitAsync(cancellationToken);
TransmissionResult result;
try
{
channel.BasicPublish(message.Channel, string.Empty, props, data);
await channel.BasicPublishAsync<BasicProperties>(message.Channel, string.Empty, true, props, data,cancellationToken:cancellationToken);
result = new TransmissionResult(message.ID);
}
catch (Exception e)
Expand All @@ -232,16 +238,16 @@ async ValueTask<TransmissionResult> IInboxQueryableMessageServiceConnection.Quer
return result;
}

ValueTask<IServiceSubscription?> IQueryableMessageServiceConnection.SubscribeQueryAsync(Func<ReceivedServiceMessage, ValueTask<ServiceMessage>> messageReceived, Action<Exception> errorReceived, string channel, string? group, CancellationToken cancellationToken)
=> ValueTask.FromResult<IServiceSubscription?>(ProduceSubscription(conn, channel, group,
async ValueTask<IServiceSubscription?> IQueryableMessageServiceConnection.SubscribeQueryAsync(Func<ReceivedServiceMessage, ValueTask<ServiceMessage>> messageReceived, Action<Exception> errorReceived, string channel, string? group, CancellationToken cancellationToken)
=> await ProduceSubscriptionAsync(conn, channel, group,
async (@event, model, acknowledge) =>
{
var result = await messageReceived(ConvertMessage(@event, channel, acknowledge, out var messageID));
(var props, var data) = ConvertMessage(result, model, messageID);
(var props, var data) = ConvertMessage(result, messageID);
await semaphore.WaitAsync(cancellationToken);
try
{
this.channel.BasicPublish(InboxExchange, @event.BasicProperties.ReplyTo, props, data);
await this.channel.BasicPublishAsync<BasicProperties>(InboxExchange, @event.BasicProperties.ReplyTo!, true , props, data);
}
catch (Exception e)
{
Expand All @@ -250,37 +256,24 @@ async ValueTask<TransmissionResult> IInboxQueryableMessageServiceConnection.Quer
semaphore.Release();
},
errorReceived
));
);

ValueTask IMessageServiceConnection.CloseAsync()
{
Dispose(true);
return ValueTask.CompletedTask;
}
=> ((IAsyncDisposable)this).DisposeAsync();

private void Dispose(bool disposing)
async ValueTask IAsyncDisposable.DisposeAsync()
{
if (!disposedValue)
{
if (disposing)
{
semaphore.Wait();
channel.Close();
channel.Dispose();
conn.Close();
conn.Dispose();
semaphore.Release();
semaphore.Dispose();
}
disposedValue=true;
await semaphore.WaitAsync();
await channel.CloseAsync();
await channel.DisposeAsync();
await conn.CloseAsync();
await conn.DisposeAsync();
semaphore.Release();
semaphore.Dispose();
}
}

void IDisposable.Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
}
}
2 changes: 1 addition & 1 deletion Connectors/RabbitMQ/RabbitMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
</ItemGroup>


Expand Down
Loading

0 comments on commit 02613a4

Please sign in to comment.