Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Default Message Mapper #3407

Merged
merged 1 commit into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public IBrighterBuilder MapperRegistry(Action<ServiceCollectionMessageMapperRegi
if (registerMappers == null) throw new ArgumentNullException(nameof(registerMappers));

registerMappers(_mapperRegistry);
_mapperRegistry.EnsureDefaultMessageMapperIsRegistered();

return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,9 @@ public static MessageMapperRegistry MessageMapperRegistry(IServiceProvider provi

var messageMapperRegistry = new MessageMapperRegistry(
new ServiceProviderMapperFactory(provider),
new ServiceProviderMapperFactoryAsync(provider)
new ServiceProviderMapperFactoryAsync(provider),
preardon marked this conversation as resolved.
Show resolved Hide resolved
serviceCollectionMessageMapperRegistry.DefaultMessageMapper,
serviceCollectionMessageMapperRegistry.DefaultMessageMapperAsync
);

foreach (var messageMapper in serviceCollectionMessageMapperRegistry.Mappers)
Expand All @@ -391,6 +393,7 @@ public static MessageMapperRegistry MessageMapperRegistry(IServiceProvider provi
{
messageMapperRegistry.RegisterAsync(messageMapper.Key, messageMapper.Value);
}


return messageMapperRegistry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,23 @@ THE SOFTWARE. */
using System.Collections.Generic;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Paramore.Brighter.MessageMappers;

namespace Paramore.Brighter.Extensions.DependencyInjection
{
/// <summary>
/// When parsing for message mappers in assemblies, stores any found message mappers. A later step will add these to the message mapper registry
/// Not used directly
/// </summary>
public class ServiceCollectionMessageMapperRegistry
public class ServiceCollectionMessageMapperRegistry(
IServiceCollection serviceCollection,
ServiceLifetime lifetime = ServiceLifetime.Singleton)
{
private readonly IServiceCollection _serviceCollection;
private readonly ServiceLifetime _lifetime;

public Dictionary<Type, Type> Mappers { get; } = new Dictionary<Type, Type>();
public Type DefaultMessageMapper { get; private set; } = typeof(JsonMessageMapper<>);
preardon marked this conversation as resolved.
Show resolved Hide resolved
public Dictionary<Type, Type> AsyncMappers { get; } = new Dictionary<Type, Type>();

public ServiceCollectionMessageMapperRegistry(IServiceCollection serviceCollection, ServiceLifetime lifetime = ServiceLifetime.Singleton)
{
_serviceCollection = serviceCollection;
_lifetime = lifetime;
}

public Type DefaultMessageMapperAsync { get; private set; } = typeof(JsonMessageMapper<>);

/// <summary>
/// Register a mapper with the collection (generic version)
/// </summary>
Expand Down Expand Up @@ -76,7 +72,7 @@ public void RegisterAsync<TRequest, TMessageMapper>() where TRequest : class, IR
/// <param name="mapper">The type of the mapper</param>
public void Add(Type message, Type mapper)
{
_serviceCollection.TryAdd(new ServiceDescriptor(mapper, mapper, _lifetime));
serviceCollection.TryAdd(new ServiceDescriptor(mapper, mapper, lifetime));
Mappers.Add(message, mapper);
}

Expand All @@ -87,8 +83,40 @@ public void Add(Type message, Type mapper)
/// <param name="mapper">The type of the mapper</param>
public void AddAsync(Type message, Type mapper)
{
_serviceCollection.TryAdd(new ServiceDescriptor(mapper, mapper, _lifetime));
serviceCollection.TryAdd(new ServiceDescriptor(mapper, mapper, lifetime));
AsyncMappers.Add(message, mapper);
}

/// <summary>
/// Set the Default Message Mapper
/// </summary>
/// <param name="defaultMapper">Type of default Message Mapper</param>
public void SetDefaultMessageMapper(Type defaultMapper)
{
serviceCollection.TryAdd(new ServiceDescriptor(defaultMapper, defaultMapper, lifetime));
DefaultMessageMapper = defaultMapper;
}

/// <summary>
/// Set the Default Async Message Mapper
/// </summary>
/// <param name="defaultMapper">Type of default async message mapper</param>
public void SetDefaultMessageMapperAsync(Type defaultMapper)
{
serviceCollection.TryAdd(new ServiceDescriptor(defaultMapper, defaultMapper, lifetime));
DefaultMessageMapperAsync = defaultMapper;
}

/// <summary>
/// Ensure that the default mappers are registered with Dependency Injection
/// </summary>
public void EnsureDefaultMessageMapperIsRegistered()
{
if(DefaultMessageMapper != null)
serviceCollection.TryAdd(new ServiceDescriptor(DefaultMessageMapper, DefaultMessageMapper, lifetime));
if (DefaultMessageMapperAsync != null)
serviceCollection.TryAdd(new ServiceDescriptor(DefaultMessageMapperAsync, DefaultMessageMapperAsync,
lifetime));
}
}
}
47 changes: 29 additions & 18 deletions src/Paramore.Brighter/MessageMapperRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ THE SOFTWARE. */

using System;
using System.Collections.Generic;
using Paramore.Brighter.MessageMappers;

namespace Paramore.Brighter
{
Expand All @@ -40,16 +41,24 @@ public class MessageMapperRegistry : IAmAMessageMapperRegistry, IAmAMessageMappe
private readonly IAmAMessageMapperFactoryAsync? _messageMapperFactoryAsync;
private readonly Dictionary<Type, Type> _messageMappers = new Dictionary<Type, Type>();
private readonly Dictionary<Type, Type> _asyncMessageMappers = new Dictionary<Type, Type>();
private readonly Type? _defaultMessageMapper;
private readonly Type? _defaultMessageMapperAsync;

/// <summary>
/// Initializes a new instance of the <see cref="MessageMapperRegistry"/> class.
/// </summary>
/// <param name="messageMapperFactory">The message mapper factory.</param>
/// <param name="messageMapperFactoryAsync">The async message mapper factory</param>
public MessageMapperRegistry(IAmAMessageMapperFactory? messageMapperFactory, IAmAMessageMapperFactoryAsync? messageMapperFactoryAsync)
/// <param name="defaultMessageMapper">The default message Mapper</param>
/// <param name="defaultMessageMapperAsync">The default Async Message Mapper</param>
public MessageMapperRegistry(IAmAMessageMapperFactory? messageMapperFactory,
IAmAMessageMapperFactoryAsync? messageMapperFactoryAsync, Type? defaultMessageMapper = null,
Type? defaultMessageMapperAsync = null)
{
_messageMapperFactory = messageMapperFactory;
_messageMapperFactoryAsync = messageMapperFactoryAsync;
_defaultMessageMapper = defaultMessageMapper;
_defaultMessageMapperAsync = defaultMessageMapperAsync;

if (messageMapperFactory == null && messageMapperFactoryAsync == null)
throw new ConfigurationException("Should have at least one factory");
Expand All @@ -62,33 +71,35 @@ public MessageMapperRegistry(IAmAMessageMapperFactory? messageMapperFactory, IAm
/// <returns>IAmAMessageMapper&lt;TRequest&gt;.</returns>
public IAmAMessageMapper<TRequest>? Get<TRequest>() where TRequest : class, IRequest
{
if ( _messageMapperFactory is not null && _messageMappers.ContainsKey(typeof(TRequest)))
{
var messageMapperType = _messageMappers[typeof(TRequest)];
return (IAmAMessageMapper<TRequest>)_messageMapperFactory.Create(messageMapperType);
}
else
{
if (_messageMapperFactory is null)
return null;
}

var messageMapperType = _messageMappers.ContainsKey(typeof(TRequest))
? _messageMappers[typeof(TRequest)]
: _defaultMessageMapper;

if (messageMapperType is null) return null;

return (IAmAMessageMapper<TRequest>)_messageMapperFactory.Create(messageMapperType);
}

/// <summary>
/// Gets this instance.
/// </summary>
/// <typeparam name="TRequest">The type of the t request.</typeparam>
/// <returns>IAmAMessageMapperAsync&lt;TRequest&gt;.</returns>
public IAmAMessageMapperAsync<TRequest>? GetAsync<TRequest>() where TRequest : class, IRequest
{
if (_messageMapperFactoryAsync is not null && _asyncMessageMappers.ContainsKey(typeof(TRequest)))
{
var messageMapperType = _asyncMessageMappers[typeof(TRequest)];
return (IAmAMessageMapperAsync<TRequest>)_messageMapperFactoryAsync.Create(messageMapperType);
}
else
{
if (_messageMapperFactoryAsync is null)
return null;
}

var messageMapperType = _asyncMessageMappers.ContainsKey(typeof(TRequest))
? _asyncMessageMappers[typeof(TRequest)]
: _defaultMessageMapperAsync;

if (messageMapperType is null) return null;

return (IAmAMessageMapperAsync<TRequest>)_messageMapperFactoryAsync.Create(messageMapperType);
}

/// <summary>
Expand Down
47 changes: 47 additions & 0 deletions src/Paramore.Brighter/MessageMappers/JsonMessageMapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace Paramore.Brighter.MessageMappers;

public class JsonMessageMapper<TRequest>(RequestContext? context) : IAmAMessageMapper<TRequest>, IAmAMessageMapperAsync<TRequest> where TRequest : class, IRequest
{
public IRequestContext? Context { get; set; } = context;

public Task<Message> MapToMessageAsync(TRequest request, Publication publication,
CancellationToken cancellationToken = default)
=> Task.FromResult(MapToMessage(request, publication));

public Task<TRequest> MapToRequestAsync(Message message, CancellationToken cancellationToken = default)
=> Task.FromResult(MapToRequest(message));

public Message MapToMessage(TRequest request, Publication publication)
{
MessageType messageType = request switch
{
ICommand => MessageType.MT_COMMAND,
IEvent => MessageType.MT_EVENT,
_ => throw new ArgumentException(@"This message mapper can only map Commands and Events", nameof(request))
};

if(publication.Topic is null)
throw new ArgumentException($"No Topic Defined for {publication}");

var header = new MessageHeader(messageId: request.Id, topic: publication.Topic, messageType: messageType);

var body = new MessageBody(JsonSerializer.Serialize(request, JsonSerialisationOptions.Options));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In principle there is an async serializer. I am not sure if it actually has meaning. But we should investigate. The MS route here is to pass an enum flag to the common method and then just use sync or async code in the body, based on that flag as needed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var message = new Message(header, body);
return message;
}

public TRequest MapToRequest(Message message)
{
var request = JsonSerializer.Deserialize<TRequest>(message.Body.Value, JsonSerialisationOptions.Options);

if (request is null)
throw new ArgumentException($"Unable to deseralise message body for {message.Header.Topic}");

return request;
}
}
Loading