Opinionated and Simplified Minimal APIs for Consuming Messages from RabbitMQ, Ensuring No Crucial Configurations Are Hidden.
Oragon.RabbitMQ is a Minimal API implementation to consume RabbitMQ queues. It provides everything you need to create resilient RabbitMQ consumers without the need to study numerous books and articles or introduce unknown risks to your environment.
dotnet add package Oragon.RabbitMQ
dotnet add package Oragon.RabbitMQ.Serializer.SystemTextJson
dotnet add package Oragon.RabbitMQ.Serializer.NewtonsoftJson
var builder = WebApplication.CreateBuilder(args); //or Host.CreateApplicationBuilder(args);
// ...existing code...
builder.AddRabbitMQConsumer();
/*Pick only one*/
// For System.Text.Json
builder.Services.AddSingleton<IAMQPSerializer>(sp => new SystemTextJsonAMQPSerializer(new JsonSerializerOptions(JsonSerializerDefaults.General){ ... }));
// For Newtonsoft Json.NET
builder.Services.AddSingleton<IAMQPSerializer>(sp => new NewtonsoftAMQPSerializer(new JsonSerializerSettings(){ ... }));
// ...existing code...
The consumer uses dependency injection to resolve an instance of RabbitMQ.Client.IConnection. If your application has not registered one yet, you can register a connection factory and a connection as shown below.
// ...existing code...
builder.Services.AddSingleton<IConnectionFactory>(sp => new ConnectionFactory()
{
Uri = new Uri("amqp://rabbitmq:5672"),
DispatchConsumersAsync = true
});
builder.Services.AddSingleton(sp => sp.GetRequiredService<IConnectionFactory>().CreateConnectionAsync().GetAwaiter().GetResult());
// ...existing code...
If you are using .NET Aspire, replace Aspire.RabbitMQ.Client with the Oragon.RabbitMQ.AspireClient package.
Today, Oragon.RabbitMQ.AspireClient supports RabbitMQ.Client 7.x, while Aspire.RabbitMQ.Client supports 6.x. When Aspire.RabbitMQ.Client supports RabbitMQ.Client 7.x, the Oragon.RabbitMQ.AspireClient package will be marked as deprecated.
// ...existing code...
builder.AddRabbitMQClient("rabbitmq");
// ...existing code...
To map your queue using this package, follow these steps:
- Build the application: First, build your application using the builder pattern. This initializes the application and prepares it for further configuration.
var app = builder.Build();
- Map the queue: Next, map your queue to a specific service and command/event. This step configures how the service handles incoming messages from the queue.
app.MapQueue("queueName", ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) => svc.DoSomethingAsync(msg));
app.MapQueue("queueName", async ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) => await svc.DoSomethingAsync(msg).ConfigureAwait(false));
// Returning an IAMQPResult to choose between Ack and Reject explicitly
app.MapQueue("queueName", async ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) =>
{
    IAMQPResult returnValue;
    if (svc.CanXpto(msg))
    {
        await svc.DoXptoAsync(msg);
        returnValue = new AckResult();
    }
    else
    {
        returnValue = new RejectResult(requeue: true);
    }
    return returnValue;
});
// Handling exceptions yourself and answering with a Nack
app.MapQueue("queueName", async ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) =>
{
    IAMQPResult returnValue;
    try
    {
        await svc.DoXptoAsync(msg);
        returnValue = new AckResult();
    }
    catch (Exception ex)
    {
        // Log this exception
        returnValue = new NackResult(requeue: true);
    }
    return returnValue;
});
- Run the application: Finally, run the application to start processing messages from the queue.
app.Run();
var builder = WebApplication.CreateBuilder(args);
builder.AddRabbitMQConsumer();
builder.Services.AddSingleton<BusinessService>();
builder.Services.AddSingleton<IAMQPSerializer>(sp => new SystemTextJsonAMQPSerializer(new System.Text.Json.JsonSerializerOptions(System.Text.Json.JsonSerializerDefaults.General){ ... }));
builder.Services.AddSingleton<IConnectionFactory>(sp => new ConnectionFactory(){ Uri = new Uri("amqp://rabbitmq:5672"), DispatchConsumersAsync = true });
builder.Services.AddSingleton(sp => sp.GetRequiredService<IConnectionFactory>().CreateConnectionAsync().GetAwaiter().GetResult());
var app = builder.Build();
app.MapQueue("queueName", ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) =>
svc.DoSomethingAsync(msg));
app.Run();
This approach is designed to decouple RabbitMQ consumers from business logic, ensuring that business code remains unaware of the queue consumption context. The result is incredibly simple, decoupled, agnostic, more reusable, and highly testable code.
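To make the testability claim concrete, here is a minimal sketch of a business service that knows nothing about RabbitMQ and can be exercised directly. The BusinessService and BusinessCommandOrEvent shapes below are hypothetical stand-ins for your own types (the OrderId and Processed members are assumptions added for illustration), and BusinessServiceCheck is only a tiny harness, not part of the package.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical message type; in a real application this is your own command or event.
public record BusinessCommandOrEvent(string OrderId);

// Hypothetical business service: no reference to queues, channels, or acknowledgments.
public class BusinessService
{
    // Counts handled messages so the behavior can be observed in a test.
    public int Processed { get; private set; }

    public Task DoSomethingAsync(BusinessCommandOrEvent msg)
    {
        if (string.IsNullOrWhiteSpace(msg.OrderId))
        {
            throw new ArgumentException("OrderId is required", nameof(msg));
        }

        Processed++;
        return Task.CompletedTask;
    }
}

// Tiny harness: calls the service exactly as a mapped queue handler would, without a broker.
public static class BusinessServiceCheck
{
    public static async Task<int> RunAsync()
    {
        var svc = new BusinessService();
        await svc.DoSomethingAsync(new BusinessCommandOrEvent("order-1"));
        return svc.Processed;
    }
}
```

Because the handler passed to MapQueue only forwards the deserialized message to the service, a unit test can cover the business rule without starting RabbitMQ.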
This package focuses on building resilient consumers that use manual acknowledgments.
- The automatic flow produces a BasicReject without requeue on serialization failures (e.g., incorrectly formatted messages). You must use dead-lettering to ensure that the message is not lost.
- The automatic flow produces a BasicNack without requeue for processing failures. You must use dead-lettering to ensure that the message is not lost.
- The automatic flow produces a BasicAck for success. If you need more control, return an instance of IAMQPResult to control this behavior.
- Minimal API design style built with minimal, cached reflection
- Extensible with support for custom serializers and encoders
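Since rejected and nacked messages are not requeued, the queue needs a dead-letter exchange or those messages are dropped by the broker. The sketch below shows one way to declare such a topology with RabbitMQ.Client 7.x directly. It is not part of Oragon.RabbitMQ, and the names orders, orders.dlx, and orders.dead are assumptions chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using RabbitMQ.Client;

// Assumed broker address, matching the connection example in this README.
var factory = new ConnectionFactory { Uri = new Uri("amqp://rabbitmq:5672") };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();

// Dead-letter exchange and the queue that collects rejected or nacked messages.
await channel.ExchangeDeclareAsync("orders.dlx", ExchangeType.Fanout, durable: true, autoDelete: false);
await channel.QueueDeclareAsync("orders.dead", durable: true, exclusive: false, autoDelete: false);
await channel.QueueBindAsync("orders.dead", "orders.dlx", routingKey: "");

// Work queue: "x-dead-letter-exchange" tells RabbitMQ where to route messages
// that are rejected or nacked without requeue.
var arguments = new Dictionary<string, object?>
{
    ["x-dead-letter-exchange"] = "orders.dlx"
};
await channel.QueueDeclareAsync("orders", durable: true, exclusive: false, autoDelete: false, arguments: arguments);
```

A fanout dead-letter exchange keeps the sketch short; a direct or topic exchange works as well if you want separate dead-letter queues per source queue.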
Autoflow uses Ack, Nack, and Reject automatically, but you can control the flow.
Inside the Oragon.RabbitMQ.Consumer.Actions namespace, you can find these results:
- AckResult ( new AckResult(); )
- ComposableResult ( new ComposableResult(params IAMQPResult[] results); )
- NackResult ( new NackResult(bool requeue); )
- RejectResult ( new RejectResult(bool requeue); )
- ReplyResult ( new ReplyResult(object objectToReply); ) ⚠️ EXPERIMENTAL ⚠️
Example:
app.MapQueue("queueName", ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) => {
IAMQPResult returnValue;
if (svc.CanXpto(msg))
{
svc.DoXpto(msg);
returnValue = new AckResult();
}
else
{
returnValue = new RejectResult(requeue: true);
}
return returnValue;
})
.WithPrefetch(2000)
.WithDispatchConcurrency(4);
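app.MapQueue("queueName", async ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) => {
    // Declare the result as IAMQPResult so both branches share one return type;
    // returning AckResult and RejectResult directly leaves the lambda without an inferable type.
    IAMQPResult returnValue;
    if (await svc.CanXpto(msg))
    {
        await svc.DoXpto(msg);
        returnValue = new AckResult();
    }
    else
    {
        returnValue = new RejectResult(requeue: true);
    }
    return returnValue;
})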
.WithPrefetch(2000)
.WithDispatchConcurrency(4);
For these types, the model binder will set the correct current instance without needing a special attribute.
- RabbitMQ.Client.IConnection
- RabbitMQ.Client.IChannel
- RabbitMQ.Client.Events.BasicDeliverEventArgs
- RabbitMQ.Client.DeliveryModes
- RabbitMQ.Client.IReadOnlyBasicProperties
- System.IServiceProvider (scoped)
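As an illustration of the list above, a handler can simply declare parameters of these types next to the message. This is a fragment rather than a full program: it assumes the app, BusinessService, and BusinessCommandOrEvent from the earlier examples, and the logging line is only there to show the bound values being used.

```csharp
// Fragment: assumes `app`, BusinessService and BusinessCommandOrEvent from the earlier examples.
app.MapQueue("queueName", ([FromServices] BusinessService svc, BusinessCommandOrEvent msg,
    BasicDeliverEventArgs delivery, IReadOnlyBasicProperties properties) =>
{
    // Both parameters are bound from the current delivery, with no attribute required.
    if (delivery.Redelivered)
    {
        var messageId = properties.MessageId ?? "(none)";
        Console.WriteLine($"Redelivered message: tag={delivery.DeliveryTag}, id={messageId}");
    }

    return svc.DoSomethingAsync(msg);
});
```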
Some string parameters are considered special, and the model binder will use a name to set the correct current string from the consumer.
The model binder will set the name of the queue that the consumer is consuming.
- queue
- queueName
The model binder will set a routing key from the AMQP message.
- routing
- routingKey
The model binder will set an exchange name from the AMQP message.
- exchange
- exchangeName
The model binder will set a consumer tag from the actual consumer.
- consumer
- consumerTag
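The name-based binding described above can be sketched as follows. This is again a fragment that assumes the app, BusinessService, and BusinessCommandOrEvent from the earlier examples; the parameter names are taken from the lists above, and the Console.WriteLine call is purely illustrative.

```csharp
// Fragment: assumes `app`, BusinessService and BusinessCommandOrEvent from the earlier examples.
app.MapQueue("queueName", ([FromServices] BusinessService svc, BusinessCommandOrEvent msg,
    string queueName, string routingKey, string exchangeName, string consumerTag) =>
{
    // Each string is matched by parameter name, not by position.
    Console.WriteLine($"queue={queueName} exchange={exchangeName} routingKey={routingKey} consumer={consumerTag}");

    return svc.DoSomethingAsync(msg);
});
```

Because the match is by name, renaming one of these parameters to something outside the lists above would presumably stop it from being treated as special.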
For version 1.0.0, I've removed all implementations of automatic telemetry and OpenTelemetry. Telemetry support will be reintroduced as soon as possible.
- Migrate Demo to Library Project
- Core: Queue Consumer
- Core: Rpc Queue Consumer
- Core: Support Keyed Services
- Core: Support of new design of RabbitMQ.Client
- Create Samples
- Review All SuppressMessageAttribute
- Create Docs
- Benchmarks
- Automate Badges
- Add SonarCloud
- Code Coverage > 80%
- Add CI/CD
- Add Unit Tests
- Add Integrated Tests with TestContainers
- Test CI/CD Flow: MyGet Alpha Packages with Symbols
- Test CI/CD Flow: MyGet Packages without Symbols
- Test CI/CD Flow: Nuget Packages without Symbols
- Change original behavior based on lambda expressions to dynamic delegate.