Skip to content

Commit

Permalink
refactor: ♻️ enhance masstransit messaging registration
Browse files Browse the repository at this point in the history
  • Loading branch information
mehdihadeli committed Aug 20, 2024
1 parent 99135f1 commit b9b0bef
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
using BuildingBlocks.Core.Extensions.ServiceCollection;
using BuildingBlocks.Core.Messaging;
using BuildingBlocks.Core.Reflection;
using BuildingBlocks.Core.Reflection.Extensions;
using Humanizer;
using MassTransit;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Hosting;
Expand Down Expand Up @@ -96,7 +98,7 @@ void ConfiguratorAction(IBusRegistrationConfigurator busRegistrationConfigurator
cfg.MessageTopology.SetEntityNameFormatter(new CustomEntityNameFormatter());

ApplyMessagesPublishTopology(cfg.PublishTopology, assemblies);
ApplyMessagesConsumeTopology(cfg.ConsumeTopology, assemblies);
ApplyMessagesConsumeTopology(cfg, context, assemblies);
ApplyMessagesSendTopology(cfg.SendTopology, assemblies);

configureReceiveEndpoints?.Invoke(context, cfg);
Expand Down Expand Up @@ -131,9 +133,104 @@ Assembly[] assemblies
) { }

private static void ApplyMessagesConsumeTopology(
IConsumeTopologyConfigurator consumeTopology,
IRabbitMqBusFactoryConfigurator rabbitMqBusFactoryConfigurator,
IBusRegistrationContext context,
Assembly[] assemblies
) { }
)
{
var consumeTopology = rabbitMqBusFactoryConfigurator.ConsumeTopology;

var messageTypes = ReflectionUtilities
.GetAllTypesImplementingInterface<IIntegrationEvent>(assemblies)
.Where(x => !x.IsGenericType);

foreach (var messageType in messageTypes)
{
var eventEnvelopeInterfaceMessageType = typeof(IEventEnvelope<>).MakeGenericType(messageType);
var eventEnvelopeInterfaceConfigurator = consumeTopology.GetMessageTopology(
eventEnvelopeInterfaceMessageType
);
eventEnvelopeInterfaceConfigurator.ConfigureConsumeTopology = true;

// none event-envelope message types
var messageConfigurator = consumeTopology.GetMessageTopology(messageType);
messageConfigurator.ConfigureConsumeTopology = true;

var eventEnvelopeConsumerInterface = typeof(IConsumer<>).MakeGenericType(eventEnvelopeInterfaceMessageType);
var envelopeConsumerConcretedTypes = eventEnvelopeConsumerInterface
.GetAllTypesImplementingInterface(assemblies)
.Where(x => !x.FullName!.Contains(nameof(MassTransit)));

var consumerType = envelopeConsumerConcretedTypes.SingleOrDefault();

if (consumerType is null)
{
var messageTypeConsumerInterface = typeof(IConsumer<>).MakeGenericType(messageType);
var messageTypeConsumerConcretedTypes = messageTypeConsumerInterface
.GetAllTypesImplementingInterface(assemblies)
.Where(x => !x.FullName!.Contains(nameof(MassTransit)));
var messageTypeConsumerType = messageTypeConsumerConcretedTypes.SingleOrDefault();

if (messageTypeConsumerType is null)
{
continue;
}

consumerType = messageTypeConsumerType;
}

ConfigureMessageReceiveEndpoint(rabbitMqBusFactoryConfigurator, context, messageType, consumerType);
}
}

private static void ConfigureMessageReceiveEndpoint(
IRabbitMqBusFactoryConfigurator rabbitMqBusFactoryConfigurator,
IBusRegistrationContext context,
Type messageType,
Type consumerType
)
{
// https://github.com/MassTransit/MassTransit/blob/eb3c9ee1007cea313deb39dc7c4eb796b7e61579/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqReceiveEndpointBuilder.cs#L70
// https://spring.io/blog/2011/04/01/routing-topologies-for-performance-and-scalability-with-rabbitmq

// https://masstransit.io/documentation/transports/rabbitmq
// This `queueName` creates an `intermediary exchange` (default is Fanout, but we can change it with re.ExchangeType) with the same queue named which bound to this exchange
rabbitMqBusFactoryConfigurator.ReceiveEndpoint(
queueName: messageType.Name.Underscore(),
re =>
{
re.Durable = true;

// set intermediate exchange type
// intermediate exchange name will be the same as queue name
re.ExchangeType = ExchangeType.Fanout;

// a replicated queue to provide high availability and data safety. available in RMQ 3.8+
re.SetQuorumQueue();

// with setting `ConfigureConsumeTopology` to `false`, we should create `primary exchange` and its bounded exchange manually with using `re.Bind` otherwise with `ConfigureConsumeTopology=true` it get publish topology for message type `T` with `_publishTopology.GetMessageTopology<T>()` and use its ExchangeType and ExchangeName based ofo default EntityFormatter
re.ConfigureConsumeTopology = true;

// // https://spring.io/blog/2011/04/01/routing-topologies-for-performance-and-scalability-with-rabbitmq
// // masstransit uses `wire-tapping` pattern for defining exchanges. Primary exchange will send the message to intermediary fanout exchange
// // setup primary exchange and its type
// re.Bind(
// $"{type.Name.Underscore()}{MessagingConstants.PrimaryExchangePostfix}",
// e =>
// {
// e.RoutingKey = type.Name.Underscore();
// e.ExchangeType = ExchangeType.Direct;
// }
// );

// https://github.com/MassTransit/MassTransit/discussions/3117
// https://masstransit-project.com/usage/configuration.html#receive-endpoints
re.ConfigureConsumer(context, consumerType);

re.RethrowFaultedMessages();
}
);
}

private static void ApplyMessagesPublishTopology(
IRabbitMqPublishTopologyConfigurator publishTopology,
Expand All @@ -145,10 +242,9 @@ Assembly[] assemblies
.GetAllTypesImplementingInterface<IIntegrationEvent>(assemblies)
.Where(x => !x.IsGenericType);

// Print the results
foreach (var type in messageTypes)
foreach (var messageType in messageTypes)
{
var eventEnvelopeInterfaceMessageType = typeof(IEventEnvelope<>).MakeGenericType(type);
var eventEnvelopeInterfaceMessageType = typeof(IEventEnvelope<>).MakeGenericType(messageType);
var eventEnvelopeInterfaceConfigurator = publishTopology.GetMessageTopology(
eventEnvelopeInterfaceMessageType
);
Expand All @@ -157,12 +253,13 @@ Assembly[] assemblies
eventEnvelopeInterfaceConfigurator.Durable = true;
eventEnvelopeInterfaceConfigurator.ExchangeType = ExchangeType.Direct;

var eventEnvelopeMessageType = typeof(EventEnvelope<>).MakeGenericType(type);
var eventEnvelopeMessageType = typeof(EventEnvelope<>).MakeGenericType(messageType);
var eventEnvelopeMessageTypeConfigurator = publishTopology.GetMessageTopology(eventEnvelopeMessageType);
eventEnvelopeMessageTypeConfigurator.Durable = true;
eventEnvelopeMessageTypeConfigurator.ExchangeType = ExchangeType.Direct;

var messageConfigurator = publishTopology.GetMessageTopology(type);
// none event-envelope message types
var messageConfigurator = publishTopology.GetMessageTopology(messageType);
messageConfigurator.Durable = true;
messageConfigurator.ExchangeType = ExchangeType.Direct;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using FoodDelivery.Services.Shared.Customers.Customers.Events.V1.Integration;
using Humanizer;
using MassTransit;
using RabbitMQ.Client;

namespace FoodDelivery.Services.Customers.Customers;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
</PropertyGroup>

<ItemGroup>
<Folder Include="Customers\Extensions\" />
<Folder Include="Shared\Data\Migrations" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,77 +12,79 @@ internal static class MassTransitExtensions
{
internal static void AddProductEndpoints(this IRabbitMqBusFactoryConfigurator cfg, IBusRegistrationContext context)
{
// https://github.com/MassTransit/MassTransit/blob/eb3c9ee1007cea313deb39dc7c4eb796b7e61579/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqReceiveEndpointBuilder.cs#L70
// https://spring.io/blog/2011/04/01/routing-topologies-for-performance-and-scalability-with-rabbitmq

// https://masstransit.io/documentation/transports/rabbitmq
// This `queueName` creates an `intermediary exchange` (default is Fanout, but we can change it with re.ExchangeType) with the same queue named which bound to this exchange
cfg.ReceiveEndpoint(
nameof(ProductStockReplenishedV1).Underscore(),
re =>
{
re.Durable = true;

// set intermediate exchange type
// intermediate exchange name will be the same as queue name
re.ExchangeType = ExchangeType.Fanout;

// a replicated queue to provide high availability and data safety. available in RMQ 3.8+
re.SetQuorumQueue();

// with setting `ConfigureConsumeTopology` to `false`, we should create `primary exchange` and its bounded exchange manually with using `re.Bind` otherwise with `ConfigureConsumeTopology=true` it get publish topology for message type `T` with `_publishTopology.GetMessageTopology<T>()` and use its ExchangeType and ExchangeName based ofo default EntityFormatter
re.ConfigureConsumeTopology = false;

// https://spring.io/blog/2011/04/01/routing-topologies-for-performance-and-scalability-with-rabbitmq
// masstransit uses `wire-tapping` pattern for defining exchanges. Primary exchange will send the message to intermediary fanout exchange
// setup primary exchange and its type
re.Bind<IEventEnvelope<ProductStockReplenishedV1>>(e =>
{
e.RoutingKey = nameof(ProductStockReplenishedV1).Underscore();
e.ExchangeType = ExchangeType.Direct;
});

// https://github.com/MassTransit/MassTransit/discussions/3117
// https://masstransit-project.com/usage/configuration.html#receive-endpoints
re.ConfigureConsumer<ProductStockReplenishedConsumer>(context);

re.RethrowFaultedMessages();
}
);

// https://masstransit.io/documentation/transports/rabbitmq
// This `queueName` creates an `intermediary exchange` (default is Fanout, but we can change it with re.ExchangeType) with the same queue named which bound to this exchange
cfg.ReceiveEndpoint(
nameof(ProductCreatedV1).Underscore(),
re =>
{
re.Durable = true;

// set intermediate exchange type
// intermediate exchange name will be the same as queue name
re.ExchangeType = ExchangeType.Fanout;

// a replicated queue to provide high availability and data safety. available in RMQ 3.8+
re.SetQuorumQueue();

// with setting `ConfigureConsumeTopology` to `false`, we should create `primary exchange` and its bounded exchange manually with using `re.Bind` otherwise with `ConfigureConsumeTopology=true` it get publish topology for message type `T` with `_publishTopology.GetMessageTopology<T>()` and use its ExchangeType and ExchangeName based ofo default EntityFormatter
re.ConfigureConsumeTopology = false;

// https://spring.io/blog/2011/04/01/routing-topologies-for-performance-and-scalability-with-rabbitmq
// masstransit uses `wire-tapping` pattern for defining exchanges. Primary exchange will send the message to intermediary fanout exchange
// setup primary exchange and its type
re.Bind<IEventEnvelope<ProductCreatedV1>>(e =>
{
e.RoutingKey = nameof(ProductCreatedV1).Underscore();
e.ExchangeType = ExchangeType.Direct;
});

// https://github.com/MassTransit/MassTransit/discussions/3117
// https://masstransit-project.com/usage/configuration.html#receive-endpoints
re.ConfigureConsumer<ProductCreatedConsumer>(context);

re.RethrowFaultedMessages();
}
);
// we configured some shared settings for all receive endpoint message in masstransit consuming topologies

// // https://github.com/MassTransit/MassTransit/blob/eb3c9ee1007cea313deb39dc7c4eb796b7e61579/src/Transports/MassTransit.RabbitMqTransport/RabbitMqTransport/Configuration/RabbitMqReceiveEndpointBuilder.cs#L70
// // https://spring.io/blog/2011/04/01/routing-topologies-for-performance-and-scalability-with-rabbitmq
//
// // https://masstransit.io/documentation/transports/rabbitmq
// // This `queueName` creates an `intermediary exchange` (default is Fanout, but we can change it with re.ExchangeType) with the same queue named which bound to this exchange
// cfg.ReceiveEndpoint(
// nameof(ProductStockReplenishedV1).Underscore(),
// re =>
// {
// re.Durable = true;
//
// // set intermediate exchange type
// // intermediate exchange name will be the same as queue name
// re.ExchangeType = ExchangeType.Fanout;
//
// // a replicated queue to provide high availability and data safety. available in RMQ 3.8+
// re.SetQuorumQueue();
//
// // with setting `ConfigureConsumeTopology` to `false`, we should create `primary exchange` and its bounded exchange manually with using `re.Bind` otherwise with `ConfigureConsumeTopology=true` it get publish topology for message type `T` with `_publishTopology.GetMessageTopology<T>()` and use its ExchangeType and ExchangeName based ofo default EntityFormatter
// re.ConfigureConsumeTopology = false;
//
// // https://spring.io/blog/2011/04/01/routing-topologies-for-performance-and-scalability-with-rabbitmq
// // masstransit uses `wire-tapping` pattern for defining exchanges. Primary exchange will send the message to intermediary fanout exchange
// // setup primary exchange and its type
// re.Bind<IEventEnvelope<ProductStockReplenishedV1>>(e =>
// {
// e.RoutingKey = nameof(ProductStockReplenishedV1).Underscore();
// e.ExchangeType = ExchangeType.Direct;
// });
//
// // https://github.com/MassTransit/MassTransit/discussions/3117
// // https://masstransit-project.com/usage/configuration.html#receive-endpoints
// re.ConfigureConsumer<ProductStockReplenishedConsumer>(context);
//
// re.RethrowFaultedMessages();
// }
// );
//
// // https://masstransit.io/documentation/transports/rabbitmq
// // This `queueName` creates an `intermediary exchange` (default is Fanout, but we can change it with re.ExchangeType) with the same queue named which bound to this exchange
// cfg.ReceiveEndpoint(
// nameof(ProductCreatedV1).Underscore(),
// re =>
// {
// re.Durable = true;
//
// // set intermediate exchange type
// // intermediate exchange name will be the same as queue name
// re.ExchangeType = ExchangeType.Fanout;
//
// // a replicated queue to provide high availability and data safety. available in RMQ 3.8+
// re.SetQuorumQueue();
//
// // with setting `ConfigureConsumeTopology` to `false`, we should create `primary exchange` and its bounded exchange manually with using `re.Bind` otherwise with `ConfigureConsumeTopology=true` it get publish topology for message type `T` with `_publishTopology.GetMessageTopology<T>()` and use its ExchangeType and ExchangeName based ofo default EntityFormatter
// re.ConfigureConsumeTopology = false;
//
// // https://spring.io/blog/2011/04/01/routing-topologies-for-performance-and-scalability-with-rabbitmq
// // masstransit uses `wire-tapping` pattern for defining exchanges. Primary exchange will send the message to intermediary fanout exchange
// // setup primary exchange and its type
// re.Bind<IEventEnvelope<ProductCreatedV1>>(e =>
// {
// e.RoutingKey = nameof(ProductCreatedV1).Underscore();
// e.ExchangeType = ExchangeType.Direct;
// });
//
// // https://github.com/MassTransit/MassTransit/discussions/3117
// // https://masstransit-project.com/usage/configuration.html#receive-endpoints
// re.ConfigureConsumer<ProductCreatedConsumer>(context);
//
// re.RethrowFaultedMessages();
// }
// );
}
}
Loading

0 comments on commit b9b0bef

Please sign in to comment.