Consume RabbitMQ messages like ASP.NET Minimal API's, opinionated and resilient first consumers, ensuring that no crucial configurations are hidden

luizcarlosfaria/Oragon.RabbitMQ

Oragon.RabbitMQ

Opinionated and Simplified Minimal APIs for Consuming Messages from RabbitMQ, Ensuring No Crucial Configurations Are Hidden.

What is Oragon.RabbitMQ?

Oragon.RabbitMQ is a Minimal API implementation to consume RabbitMQ queues. It provides everything you need to create resilient RabbitMQ consumers without the need to study numerous books and articles or introduce unknown risks to your environment.

Get Started

Add Consumer Package

dotnet add package Oragon.RabbitMQ

Choose Serialization

System.Text.Json

dotnet add package Oragon.RabbitMQ.Serializer.SystemTextJson

Newtonsoft Json .Net

dotnet add package Oragon.RabbitMQ.Serializer.NewtonsoftJson

Configuring Dependency Injection

Basic Setup

var builder = WebApplication.CreateBuilder(args); //or Host.CreateApplicationBuilder(args);

// ...existing code...

builder.AddRabbitMQConsumer();

/*Pick only one*/

// For System.Text.Json
builder.Services.AddSingleton<IAMQPSerializer>(sp => new SystemTextJsonAMQPSerializer(new JsonSerializerOptions(JsonSerializerDefaults.General){ ... }));

// For Newtonsoft Json .Net
builder.Services.AddSingleton<IAMQPSerializer>(sp => new NewtonsoftAMQPSerializer(new JsonSerializerSettings(){ ... }));

// ...existing code...

Configuring IConnectionFactory and IConnection

The consumer uses dependency injection to resolve a valid instance of RabbitMQ.Client.IConnection. If your application does not already register one, you can register a connection factory and a connection as shown below.

// ...existing code...
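builder.Services.AddSingleton<IConnectionFactory>(sp => new ConnectionFactory()
{
    // DispatchConsumersAsync is omitted here: RabbitMQ.Client 7.x removed that property because consumers are always dispatched asynchronously.
    Uri = new Uri("amqp://rabbitmq:5672")
});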

builder.Services.AddSingleton(sp => sp.GetRequiredService<IConnectionFactory>().CreateConnectionAsync().GetAwaiter().GetResult());
// ...existing code...

.NET Aspire

If you are using .NET Aspire, replace Aspire.RabbitMQ.Client with the Oragon.RabbitMQ.AspireClient package.

Today, Oragon.RabbitMQ.AspireClient supports RabbitMQ.Client 7.x, while Aspire.RabbitMQ.Client supports 6.x. When Aspire.RabbitMQ.Client supports RabbitMQ.Client 7.x, the Oragon.RabbitMQ.AspireClient package will be marked as deprecated.

// ...existing code...
builder.AddRabbitMQClient("rabbitmq");
// ...existing code...

🎯 Map your Queue 🎯

To map your queue using this package, follow these steps:

  1. Build the application: First, build your application using the builder pattern. This initializes the application and prepares it for further configuration.

    var app = builder.Build();
  2. Map the queue: Next, map your queue to a specific service and command/event. This step involves configuring how the service will handle incoming messages from the queue.

    Example 1

    app.MapQueue("queueName", ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) => 
        svc.DoSomethingAsync(msg));

    Example 2

    app.MapQueue("queueName", async ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) => 
        await svc.DoSomethingAsync(msg).ConfigureAwait(false));

    Example 3

    app.MapQueue("queueName", async ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) => {
        
        IAMQPResult returnValue;
    
        if (svc.CanXpto(msg))
        {
            await svc.DoXptoAsync(msg);
            returnValue = new AckResult();
        } 
        else 
        {
            returnValue = new RejectResult(requeue: true);
        }
        return returnValue;
    });

    Example 4

    app.MapQueue("queueName", async ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) => {
        
        IAMQPResult returnValue;
    
        try
        {
            await svc.DoXptoAsync(msg);
            returnValue = new AckResult();
        }
        catch(Exception ex)
        {
            // Log this exception
            returnValue = new NackResult(requeue: true);
        }
        
        return returnValue;
    });
  3. Run the application: Finally, run the application to start processing messages from the queue.

    app.Run();

Full Example

var builder = WebApplication.CreateBuilder(args);

builder.AddRabbitMQConsumer();

builder.Services.AddSingleton<BusinessService>();

builder.Services.AddSingleton<IAMQPSerializer>(sp => new SystemTextJsonAMQPSerializer(new System.Text.Json.JsonSerializerOptions(System.Text.Json.JsonSerializerDefaults.General){ ... }));

builder.Services.AddSingleton<IConnectionFactory>(sp => new ConnectionFactory(){ Uri = new Uri("amqp://rabbitmq:5672") });

builder.Services.AddSingleton(sp => sp.GetRequiredService<IConnectionFactory>().CreateConnectionAsync().GetAwaiter().GetResult());

var app = builder.Build();

app.MapQueue("queueName", ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) => 
    svc.DoSomethingAsync(msg));

app.Run();

Concepts

Decoupling Business Logic from Infrastructure

This approach is designed to decouple RabbitMQ consumers from business logic, ensuring that business code remains unaware of the queue consumption context. The result is incredibly simple, decoupled, agnostic, more reusable, and highly testable code.

Opinionated Design: Why?

This library focuses on building resilient consumers that use manual acknowledgments.

  • The automatic flow produces a BasicReject without requeue on serialization failures (e.g., incorrectly formatted messages). You must use dead-lettering to ensure that your message will not be lost.
  • The automatic flow produces a BasicNack without requeue for processing failures. You must use dead-lettering to ensure that your message will not be lost.
  • The automatic flow produces a BasicAck for success. If you need more control, return an instance of IAMQPResult to control this behavior.
  • Minimal API design style made with minimum and cached reflection
  • Extensible with support for custom serializers and encoders
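
Because the automatic flow rejects or nacks failed messages without requeue, the queue itself needs a dead-letter target or those messages are dropped by the broker. The sketch below is not part of Oragon.RabbitMQ; it only builds the standard RabbitMQ queue arguments `x-dead-letter-exchange` and `x-dead-letter-routing-key`. The class name `DeadLetterArguments` and the exchange name `orders.dlx` are hypothetical and used for illustration only.

```csharp
#nullable enable
using System.Collections.Generic;

// Hypothetical helper (not part of Oragon.RabbitMQ or RabbitMQ.Client).
public static class DeadLetterArguments
{
    // Builds the optional queue arguments that tell RabbitMQ where to route
    // messages that are rejected or nacked with requeue set to false.
    public static Dictionary<string, object?> For(string deadLetterExchange, string? deadLetterRoutingKey = null)
    {
        var arguments = new Dictionary<string, object?>
        {
            ["x-dead-letter-exchange"] = deadLetterExchange,
        };

        // When no routing key is supplied, RabbitMQ reuses the original routing key of the message.
        if (deadLetterRoutingKey is not null)
        {
            arguments["x-dead-letter-routing-key"] = deadLetterRoutingKey;
        }

        return arguments;
    }
}
```

The resulting dictionary would typically be passed as the `arguments` parameter when the queue is declared (for example through `IChannel.QueueDeclareAsync` in RabbitMQ.Client 7.x), as in `DeadLetterArguments.For("orders.dlx")`. Queue arguments are fixed at declaration time, so an existing queue has to be recreated, or configured through a broker policy instead.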

Flexible

AMQP Flow Control

Autoflow uses Ack, Nack, and Reject automatically, but you can control the flow.
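
The automatic flow described under Opinionated Design can be summarized as a small decision table. The following is an illustrative model only: `HandlerOutcome`, `AmqpAction`, and `AutomaticFlow` are hypothetical names that do not exist in Oragon.RabbitMQ, and the code mirrors the documented rules rather than the library's implementation.

```csharp
using System;

// Hypothetical types for illustration; not part of Oragon.RabbitMQ.
public enum HandlerOutcome { Success, SerializationFailure, ProcessingFailure }

public readonly record struct AmqpAction(string Method, bool Requeue);

public static class AutomaticFlow
{
    // Maps each outcome to the AMQP method the README says the automatic flow produces.
    public static AmqpAction Decide(HandlerOutcome outcome) => outcome switch
    {
        HandlerOutcome.Success => new AmqpAction("BasicAck", Requeue: false),
        HandlerOutcome.SerializationFailure => new AmqpAction("BasicReject", Requeue: false),
        HandlerOutcome.ProcessingFailure => new AmqpAction("BasicNack", Requeue: false),
        _ => throw new ArgumentOutOfRangeException(nameof(outcome)),
    };
}
```

Returning an IAMQPResult from a handler replaces this default decision with the one the handler chooses.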

Inside the Oragon.RabbitMQ.Consumer.Actions namespace, you can find some results:

  • AckResult (new AckResult();)
  • ComposableResult (new ComposableResult(params IAMQPResult[] results);)
  • NackResult (new NackResult(bool requeue);)
  • RejectResult (new RejectResult(bool requeue);)
  • ReplyResult (new ReplyResult(object objectToReply);) ⚠️EXPERIMENTAL⚠️

Example:

app.MapQueue("queueName", ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) => {
    
    IAMQPResult returnValue;

    if (svc.CanXpto(msg))
    {
        svc.DoXpto(msg);
        returnValue = new AckResult();
    } 
    else 
    {
        returnValue = new RejectResult(requeue: true);
    }
    return returnValue;
})
.WithPrefetch(2000)
.WithDispatchConcurrency(4);

Async or Not

app.MapQueue("queueName", async ([FromServices] BusinessService svc, BusinessCommandOrEvent msg) => {

    if (await svc.CanXpto(msg))
    {
       await svc.DoXpto(msg);
       return new AckResult();
    } 
    else 
    {
        return new RejectResult(requeue: true);
    }
})
.WithPrefetch(2000)
.WithDispatchConcurrency(4);

Model Binder Examples

Special Types

For these types, the model binder injects the current instance automatically; no special attribute is needed.

  • RabbitMQ.Client.IConnection
  • RabbitMQ.Client.IChannel
  • RabbitMQ.Client.Events.BasicDeliverEventArgs
  • RabbitMQ.Client.DeliveryModes
  • RabbitMQ.Client.IReadOnlyBasicProperties
  • System.IServiceProvider (scoped)

Special Names

Some string parameters are treated as special: the model binder uses the parameter name to decide which value from the consumer to inject.

Queue Name

The model binder will set the name of the queue that the consumer is consuming.

  • queue
  • queueName

Routing Key

The model binder will set a routing key from the AMQP message.

  • routing
  • routingKey

Exchange Name

The model binder will set an exchange name from the AMQP message.

  • exchange
  • exchangeName

Consumer Tag

The model binder will set a consumer tag from the actual consumer.

  • consumer
  • consumerTag
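
The name-based rules above can be pictured as a lookup from parameter name to a value of the current delivery. This is a minimal sketch under stated assumptions, not the library's binder: `DeliveryContext` and `SpecialNameBinder` are hypothetical, and exact, case-sensitive name matching is an assumption the README does not confirm.

```csharp
#nullable enable

// Hypothetical snapshot of the values the README says can be bound by name.
public sealed record DeliveryContext(string QueueName, string RoutingKey, string ExchangeName, string ConsumerTag);

// Hypothetical resolver; not part of Oragon.RabbitMQ.
public static class SpecialNameBinder
{
    // Returns the value for a special parameter name, or null when the name is not special.
    public static string? Resolve(string parameterName, DeliveryContext context) => parameterName switch
    {
        "queue" or "queueName" => context.QueueName,
        "routing" or "routingKey" => context.RoutingKey,
        "exchange" or "exchangeName" => context.ExchangeName,
        "consumer" or "consumerTag" => context.ConsumerTag,
        _ => null,
    };
}
```

Under these rules, a handler that declares parameters such as `string queueName` and `string routingKey` would receive the name of the consumed queue and the routing key of the current message.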

Telemetry

For version 1.0.0, I've removed all implementations of automatic telemetry and OpenTelemetry. It will be available as soon as possible.

Stages and Requirements for Launch

  • Migrate Demo to Library Project
  • Core: Queue Consumer
  • Core: Rpc Queue Consumer
  • Core: Support Keyed Services
  • Core: Support of new design of RabbitMQ.Client
  • Create Samples
  • Review All SuppressMessageAttribute
  • Create Docs
  • Benchmarks
  • Automate Badges
  • Add SonarCloud
  • Code Coverage > 80%
  • Add CI/CD
  • Add Unit Tests
  • Add Integrated Tests with TestContainers
  • Test CI/CD Flow: MyGet Alpha Packages with Symbols
  • Test CI/CD Flow: MyGet Packages without Symbols
  • Test CI/CD Flow: Nuget Packages without Symbols
  • Change original behavior based on lambda expressions to dynamic delegate.
