diff --git a/Connectors/RabbitMQ/Connection.cs b/Connectors/RabbitMQ/Connection.cs index 454824d..7b15984 100644 --- a/Connectors/RabbitMQ/Connection.cs +++ b/Connectors/RabbitMQ/Connection.cs @@ -9,13 +9,12 @@ namespace MQContract.RabbitMQ /// /// This is the MessageServiceConnection implemenation for using RabbitMQ /// - public sealed class Connection : IInboxQueryableMessageServiceConnection, IDisposable + public sealed class Connection : IInboxQueryableMessageServiceConnection, IAsyncDisposable { private const string InboxExchange = "_Inbox"; - private readonly ConnectionFactory factory; private readonly IConnection conn; - private readonly IModel channel; + private readonly IChannel channel; private readonly SemaphoreSlim semaphore = new(1, 1); private readonly string inboxChannel; private bool disposedValue; @@ -26,12 +25,15 @@ public sealed class Connection : IInboxQueryableMessageServiceConnection, IDispo /// The connection factory to use that was built with required authentication and connection information public Connection(ConnectionFactory factory) { - this.factory = factory; - if (string.IsNullOrWhiteSpace(this.factory.ClientProvidedName)) - this.factory.ClientProvidedName = Guid.NewGuid().ToString(); - conn = this.factory.CreateConnection(); - channel = conn.CreateModel(); - MaxMessageBodySize = factory.MaxMessageSize; + if (string.IsNullOrWhiteSpace(factory.ClientProvidedName)) + factory.ClientProvidedName = Guid.NewGuid().ToString(); + var connectionTask = factory.CreateConnectionAsync(); + connectionTask.Wait(); + conn = connectionTask.Result; + var channelTask = conn.CreateChannelAsync(); + channelTask.Wait(); + channel = channelTask.Result; + MaxMessageBodySize = factory.MaxInboundMessageBodySize; inboxChannel = $"{InboxExchange}.{factory.ClientProvidedName}"; } @@ -44,10 +46,10 @@ public Connection(ConnectionFactory factory) /// Auto Delete queue when connection closed /// Additional arguements /// The connection to allow for chaining calls - public Connection QueueDeclare(string queue, bool durable = false, bool exclusive = false, - bool autoDelete = true, IDictionary? arguments = null) + public async Task QueueDeclareAsync(string queue, bool durable = false, bool exclusive = false, + bool autoDelete = true, IDictionary? arguments = null) { - channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments); + await channel.QueueDeclareAsync(queue, durable, exclusive, autoDelete, arguments:arguments); return this; } @@ -60,10 +62,10 @@ public Connection QueueDeclare(string queue, bool durable = false, bool exclusiv /// Auto Delete when connection closed /// Additional arguements /// The connection to allow for chaining calls - public Connection ExchangeDeclare(string exchange, string type, bool durable = false, bool autoDelete = false, - IDictionary? arguments = null) + public async Task ExchangeDeclareAsync(string exchange, string type, bool durable = false, bool autoDelete = false, + IDictionary? arguments = null) { - channel.ExchangeDeclare(exchange,type,durable,autoDelete,arguments); + await channel.ExchangeDeclareAsync(exchange,type,durable,autoDelete,arguments); return this; } @@ -73,8 +75,8 @@ public Connection ExchangeDeclare(string exchange, string type, bool durable = f /// The name of the queue /// Is unused /// Is Empty - public void QueueDelete(string queue, bool ifUnused, bool ifEmpty) - => channel.QueueDelete(queue,ifUnused,ifEmpty); + public async Task QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty) + => await channel.QueueDeclareAsync(queue, ifUnused, ifEmpty); /// /// The maximum message body size allowed @@ -87,11 +89,13 @@ public void QueueDelete(string queue, bool ifUnused, bool ifEmpty) /// public TimeSpan DefaultTimeout { get; init; } = TimeSpan.FromMinutes(1); - internal static (IBasicProperties props, ReadOnlyMemory) ConvertMessage(ServiceMessage message, IModel channel, Guid? messageId = null) + internal static (BasicProperties props, ReadOnlyMemory) ConvertMessage(ServiceMessage message, Guid? messageId = null) { - var props = channel.CreateBasicProperties(); - props.MessageId=message.ID; - props.Type = message.MessageTypeID; + var props = new BasicProperties + { + MessageId=message.ID, + Type = message.MessageTypeID + }; using var ms = new MemoryStream(); using var bw = new BinaryWriter(ms); if (messageId!=null) @@ -103,7 +107,7 @@ internal static (IBasicProperties props, ReadOnlyMemory) ConvertMessage(Se bw.Write((byte)0); bw.Write(message.Data.Length); bw.Write(message.Data.ToArray()); - foreach(var key in message.Header.Keys) + foreach (var key in message.Header.Keys) { var bytes = UTF8Encoding.UTF8.GetBytes(key); bw.Write(bytes.Length); @@ -134,8 +138,8 @@ internal static ReceivedServiceMessage ConvertMessage(BasicDeliverEventArgs even header.Add(key, value); } return new( - eventArgs.BasicProperties.MessageId, - eventArgs.BasicProperties.Type, + eventArgs.BasicProperties.MessageId!, + eventArgs.BasicProperties.Type!, channel, new(header), data.ToArray(), @@ -149,8 +153,8 @@ async ValueTask IMessageServiceConnection.PublishAsync(Servi TransmissionResult result; try { - (var props, var data) = ConvertMessage(message, this.channel); - channel.BasicPublish(message.Channel,string.Empty,props,data); + (var props, var data) = ConvertMessage(message); + await channel.BasicPublishAsync(message.Channel,string.Empty,true,props,data,cancellationToken); result = new TransmissionResult(message.ID); }catch(Exception e) { @@ -160,37 +164,39 @@ async ValueTask IMessageServiceConnection.PublishAsync(Servi return result; } - private Subscription ProduceSubscription(IConnection conn, string channel, string? group, Action> messageReceived, Action errorReceived) + private async Task ProduceSubscriptionAsync(IConnection conn, string channel, string? group, Action> messageReceived, Action errorReceived) { if (group==null) { group = Guid.NewGuid().ToString(); - this.channel.QueueDeclare(queue:group, durable:false, exclusive:false, autoDelete:true); + await this.channel.QueueDeclareAsync(queue:group, durable:false, exclusive:false, autoDelete:true); }else { try { - this.channel.QueueDeclare(queue: group); + await this.channel.QueueDeclareAsync(queue: group); + } + catch (Exception) { + //this may throw an error is the queue already exists but checking for it fails } - catch (Exception) { } } - return new Subscription(conn, channel, group,messageReceived,errorReceived); + return await Subscription.ProduceInstanceAsync(conn, channel, group,messageReceived,errorReceived); } - ValueTask IMessageServiceConnection.SubscribeAsync(Action messageReceived, Action errorReceived, string channel, string? group, CancellationToken cancellationToken) - => ValueTask.FromResult(ProduceSubscription(conn, channel, group, + async ValueTask IMessageServiceConnection.SubscribeAsync(Action messageReceived, Action errorReceived, string channel, string? group, CancellationToken cancellationToken) + => await ProduceSubscriptionAsync(conn, channel, group, (@event,modelChannel, acknowledge) => { messageReceived(ConvertMessage(@event, channel, acknowledge,out _)); }, errorReceived - )); + ); - ValueTask IInboxQueryableMessageServiceConnection.EstablishInboxSubscriptionAsync(Action messageReceived, CancellationToken cancellationToken) + async ValueTask IInboxQueryableMessageServiceConnection.EstablishInboxSubscriptionAsync(Action messageReceived, CancellationToken cancellationToken) { - channel.ExchangeDeclare(InboxExchange, ExchangeType.Direct, durable: false, autoDelete: true); - channel.QueueDeclare(inboxChannel, durable: false, exclusive: false, autoDelete: true); - return ValueTask.FromResult(new Subscription( + await channel.ExchangeDeclareAsync(InboxExchange, ExchangeType.Direct, durable: false, autoDelete: true, cancellationToken:cancellationToken); + await channel.QueueDeclareAsync(inboxChannel, durable: false, exclusive: false, autoDelete: true, cancellationToken: cancellationToken); + return await Subscription.ProduceInstanceAsync( conn, InboxExchange, inboxChannel, @@ -210,18 +216,18 @@ ValueTask IInboxQueryableMessageServiceConnection.Establis }, (error) => { }, routingKey:inboxChannel - )); + ); } async ValueTask IInboxQueryableMessageServiceConnection.QueryAsync(ServiceMessage message, Guid correlationID, CancellationToken cancellationToken) { - (var props, var data) = ConvertMessage(message, this.channel, correlationID); + (var props, var data) = ConvertMessage(message, correlationID); props.ReplyTo = inboxChannel; await semaphore.WaitAsync(cancellationToken); TransmissionResult result; try { - channel.BasicPublish(message.Channel, string.Empty, props, data); + await channel.BasicPublishAsync(message.Channel, string.Empty, true, props, data,cancellationToken:cancellationToken); result = new TransmissionResult(message.ID); } catch (Exception e) @@ -232,16 +238,16 @@ async ValueTask IInboxQueryableMessageServiceConnection.Quer return result; } - ValueTask IQueryableMessageServiceConnection.SubscribeQueryAsync(Func> messageReceived, Action errorReceived, string channel, string? group, CancellationToken cancellationToken) - => ValueTask.FromResult(ProduceSubscription(conn, channel, group, + async ValueTask IQueryableMessageServiceConnection.SubscribeQueryAsync(Func> messageReceived, Action errorReceived, string channel, string? group, CancellationToken cancellationToken) + => await ProduceSubscriptionAsync(conn, channel, group, async (@event, model, acknowledge) => { var result = await messageReceived(ConvertMessage(@event, channel, acknowledge, out var messageID)); - (var props, var data) = ConvertMessage(result, model, messageID); + (var props, var data) = ConvertMessage(result, messageID); await semaphore.WaitAsync(cancellationToken); try { - this.channel.BasicPublish(InboxExchange, @event.BasicProperties.ReplyTo, props, data); + await this.channel.BasicPublishAsync(InboxExchange, @event.BasicProperties.ReplyTo!, true , props, data); } catch (Exception e) { @@ -250,37 +256,24 @@ async ValueTask IInboxQueryableMessageServiceConnection.Quer semaphore.Release(); }, errorReceived - )); + ); ValueTask IMessageServiceConnection.CloseAsync() - { - Dispose(true); - return ValueTask.CompletedTask; - } + => ((IAsyncDisposable)this).DisposeAsync(); - private void Dispose(bool disposing) + async ValueTask IAsyncDisposable.DisposeAsync() { if (!disposedValue) { - if (disposing) - { - semaphore.Wait(); - channel.Close(); - channel.Dispose(); - conn.Close(); - conn.Dispose(); - semaphore.Release(); - semaphore.Dispose(); - } disposedValue=true; + await semaphore.WaitAsync(); + await channel.CloseAsync(); + await channel.DisposeAsync(); + await conn.CloseAsync(); + await conn.DisposeAsync(); + semaphore.Release(); + semaphore.Dispose(); } } - - void IDisposable.Dispose() - { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method - Dispose(disposing: true); - GC.SuppressFinalize(this); - } } } diff --git a/Connectors/RabbitMQ/RabbitMQ.csproj b/Connectors/RabbitMQ/RabbitMQ.csproj index 66bd87f..2ccbdb5 100644 --- a/Connectors/RabbitMQ/RabbitMQ.csproj +++ b/Connectors/RabbitMQ/RabbitMQ.csproj @@ -16,7 +16,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - + diff --git a/Connectors/RabbitMQ/Readme.md b/Connectors/RabbitMQ/Readme.md index 244380a..be62165 100644 --- a/Connectors/RabbitMQ/Readme.md +++ b/Connectors/RabbitMQ/Readme.md @@ -7,9 +7,9 @@ - [#ctor(factory)](#M-MQContract-RabbitMQ-Connection-#ctor-RabbitMQ-Client-ConnectionFactory- 'MQContract.RabbitMQ.Connection.#ctor(RabbitMQ.Client.ConnectionFactory)') - [DefaultTimeout](#P-MQContract-RabbitMQ-Connection-DefaultTimeout 'MQContract.RabbitMQ.Connection.DefaultTimeout') - [MaxMessageBodySize](#P-MQContract-RabbitMQ-Connection-MaxMessageBodySize 'MQContract.RabbitMQ.Connection.MaxMessageBodySize') - - [ExchangeDeclare(exchange,type,durable,autoDelete,arguments)](#M-MQContract-RabbitMQ-Connection-ExchangeDeclare-System-String,System-String,System-Boolean,System-Boolean,System-Collections-Generic-IDictionary{System-String,System-Object}- 'MQContract.RabbitMQ.Connection.ExchangeDeclare(System.String,System.String,System.Boolean,System.Boolean,System.Collections.Generic.IDictionary{System.String,System.Object})') - - [QueueDeclare(queue,durable,exclusive,autoDelete,arguments)](#M-MQContract-RabbitMQ-Connection-QueueDeclare-System-String,System-Boolean,System-Boolean,System-Boolean,System-Collections-Generic-IDictionary{System-String,System-Object}- 'MQContract.RabbitMQ.Connection.QueueDeclare(System.String,System.Boolean,System.Boolean,System.Boolean,System.Collections.Generic.IDictionary{System.String,System.Object})') - - [QueueDelete(queue,ifUnused,ifEmpty)](#M-MQContract-RabbitMQ-Connection-QueueDelete-System-String,System-Boolean,System-Boolean- 'MQContract.RabbitMQ.Connection.QueueDelete(System.String,System.Boolean,System.Boolean)') + - [ExchangeDeclareAsync(exchange,type,durable,autoDelete,arguments)](#M-MQContract-RabbitMQ-Connection-ExchangeDeclareAsync-System-String,System-String,System-Boolean,System-Boolean,System-Collections-Generic-IDictionary{System-String,System-Object}- 'MQContract.RabbitMQ.Connection.ExchangeDeclareAsync(System.String,System.String,System.Boolean,System.Boolean,System.Collections.Generic.IDictionary{System.String,System.Object})') + - [QueueDeclareAsync(queue,durable,exclusive,autoDelete,arguments)](#M-MQContract-RabbitMQ-Connection-QueueDeclareAsync-System-String,System-Boolean,System-Boolean,System-Boolean,System-Collections-Generic-IDictionary{System-String,System-Object}- 'MQContract.RabbitMQ.Connection.QueueDeclareAsync(System.String,System.Boolean,System.Boolean,System.Boolean,System.Collections.Generic.IDictionary{System.String,System.Object})') + - [QueueDeleteAsync(queue,ifUnused,ifEmpty)](#M-MQContract-RabbitMQ-Connection-QueueDeleteAsync-System-String,System-Boolean,System-Boolean- 'MQContract.RabbitMQ.Connection.QueueDeleteAsync(System.String,System.Boolean,System.Boolean)') ## Connection `type` @@ -50,8 +50,8 @@ DEFAULT: 1 minute The maximum message body size allowed - -### ExchangeDeclare(exchange,type,durable,autoDelete,arguments) `method` + +### ExchangeDeclareAsync(exchange,type,durable,autoDelete,arguments) `method` ##### Summary @@ -71,8 +71,8 @@ The connection to allow for chaining calls | autoDelete | [System.Boolean](http://msdn.microsoft.com/query/dev14.query?appId=Dev14IDEF1&l=EN-US&k=k:System.Boolean 'System.Boolean') | Auto Delete when connection closed | | arguments | [System.Collections.Generic.IDictionary{System.String,System.Object}](http://msdn.microsoft.com/query/dev14.query?appId=Dev14IDEF1&l=EN-US&k=k:System.Collections.Generic.IDictionary 'System.Collections.Generic.IDictionary{System.String,System.Object}') | Additional arguements | - -### QueueDeclare(queue,durable,exclusive,autoDelete,arguments) `method` + +### QueueDeclareAsync(queue,durable,exclusive,autoDelete,arguments) `method` ##### Summary @@ -92,8 +92,8 @@ The connection to allow for chaining calls | autoDelete | [System.Boolean](http://msdn.microsoft.com/query/dev14.query?appId=Dev14IDEF1&l=EN-US&k=k:System.Boolean 'System.Boolean') | Auto Delete queue when connection closed | | arguments | [System.Collections.Generic.IDictionary{System.String,System.Object}](http://msdn.microsoft.com/query/dev14.query?appId=Dev14IDEF1&l=EN-US&k=k:System.Collections.Generic.IDictionary 'System.Collections.Generic.IDictionary{System.String,System.Object}') | Additional arguements | - -### QueueDelete(queue,ifUnused,ifEmpty) `method` + +### QueueDeleteAsync(queue,ifUnused,ifEmpty) `method` ##### Summary diff --git a/Connectors/RabbitMQ/Subscription.cs b/Connectors/RabbitMQ/Subscription.cs index 409e592..beea9e8 100644 --- a/Connectors/RabbitMQ/Subscription.cs +++ b/Connectors/RabbitMQ/Subscription.cs @@ -6,37 +6,41 @@ namespace MQContract.RabbitMQ { internal class Subscription : IServiceSubscription { - private readonly IModel channel; - private readonly Guid subscriptionID = Guid.NewGuid(); + private readonly IChannel channel; private readonly string consumerTag; - public Subscription(IConnection conn,string channel,string group, Action> messageReceived, Action errorReceived,string? routingKey=null) + public static async ValueTask ProduceInstanceAsync(IConnection conn, string channel, string group, Action> messageReceived, Action errorReceived, string? routingKey = null) { - this.channel = conn.CreateModel(); - this.channel.QueueBind(group, channel, routingKey??subscriptionID.ToString()); - this.channel.BasicQos(0, 1, false); - var consumer = new EventingBasicConsumer(this.channel); - consumer.Received+=(sender, @event) => + var connectionChannel = await conn.CreateChannelAsync(); + await connectionChannel.QueueBindAsync(group, channel, routingKey??Guid.NewGuid().ToString()); + await connectionChannel.BasicQosAsync(0, 1, false); + var consumer = new AsyncEventingBasicConsumer(connectionChannel); + consumer.ReceivedAsync+= (sender, @event) => { messageReceived( @event, - this.channel, - () => + connectionChannel, + async () => { - this.channel.BasicAck(@event.DeliveryTag, false); - return ValueTask.CompletedTask; + await connectionChannel.BasicAckAsync(@event.DeliveryTag, false); } ); + return Task.CompletedTask; }; - consumerTag = this.channel.BasicConsume(group, false, consumer); + return new Subscription(connectionChannel, await connectionChannel.BasicConsumeAsync(group, false, consumer)); } - public ValueTask EndAsync() + private Subscription(IChannel channel, string consumerTag) { - channel.BasicCancel(consumerTag); - channel.Close(); - return ValueTask.CompletedTask; + this.channel=channel; + this.consumerTag=consumerTag; + } + + public async ValueTask EndAsync() + { + await channel.BasicCancelAsync(consumerTag); + await channel.CloseAsync(); } } } diff --git a/Samples/RabbitMQSample/Program.cs b/Samples/RabbitMQSample/Program.cs index 79671ea..0a82aeb 100644 --- a/Samples/RabbitMQSample/Program.cs +++ b/Samples/RabbitMQSample/Program.cs @@ -8,12 +8,12 @@ Port = 5672, UserName="guest", Password="guest", - MaxMessageSize=1024*1024*4 + MaxInboundMessageBodySize=1024*1024*4 }; -var serviceConnection = new Connection(factory) - .ExchangeDeclare("Greeting", ExchangeType.Fanout) - .ExchangeDeclare("StoredArrivals", ExchangeType.Fanout,true) - .ExchangeDeclare("Arrivals", ExchangeType.Fanout); +var serviceConnection = new Connection(factory); +await serviceConnection.ExchangeDeclareAsync("Greeting", ExchangeType.Fanout); +await serviceConnection.ExchangeDeclareAsync("StoredArrivals", ExchangeType.Fanout, true); +await serviceConnection.ExchangeDeclareAsync("Arrivals", ExchangeType.Fanout); await SampleExecution.ExecuteSample(serviceConnection, "RabbitMQ"); \ No newline at end of file diff --git a/Shared.props b/Shared.props index 0fa27bd..f8c838f 100644 --- a/Shared.props +++ b/Shared.props @@ -1,6 +1,6 @@ - 2.0 + 2.0.1 https://github.com/roger-castaldo/MQContract Readme.md https://github.com/roger-castaldo/MQContract