From 14b99201da6d80bab2ac366f14f897d7f5f2f678 Mon Sep 17 00:00:00 2001 From: roger-castaldo Date: Tue, 30 Jul 2024 09:25:29 -0400 Subject: [PATCH] code cleanup cleaned up some code as well as modified the NATS.io connector to use headers to hide the additional message properties to avoid using additional external libraries and for cleaner usage. --- .../Interfaces/Messages/IEncodedMessage.cs | 5 - Abstractions/Messages/MessageHeader.cs | 8 +- Abstractions/Messages/PingResult.cs | 8 +- Abstractions/Messages/QueryResult.cs | 8 +- .../Messages/RecievedServiceMessage.cs | 8 +- Abstractions/Messages/ServiceMessage.cs | 5 - Abstractions/Messages/ServiceQueryResult.cs | 5 - Abstractions/Messages/TransmissionResult.cs | 8 +- AutomatedTesting/ChannelMapperTests.cs | 6 - .../ContractConnectionTests/PublishTests.cs | 1 - .../ContractConnectionTests/QueryTests.cs | 1 - AutomatedTesting/Helper.cs | 3 - Connectors/KubeMQ/Connection.cs | 1 - .../KubeMQ/Interfaces/IKubeMQPingResult.cs | 9 +- .../KubeMQ/Subscriptions/PubSubscription.cs | 1 - .../KubeMQ/Subscriptions/QuerySubscription.cs | 1 - Connectors/NATS/Connection.cs | 103 +++++++++--------- Connectors/NATS/Exceptions.cs | 8 +- Connectors/NATS/Messages/MessageHeader.cs | 19 ---- Connectors/NATS/Messages/NatsMessage.cs | 19 ---- .../NATS/Messages/NatsQueryResponseMessage.cs | 22 ---- Connectors/NATS/NATS.csproj | 1 - .../Options/StreamPublishChannelOptions.cs | 5 - .../Options/StreamPublishSubscriberOptions.cs | 5 - .../NATS/Serialization/MessageSerializer.cs | 18 --- .../Serialization/SerializationRegistry.cs | 19 ---- .../IInternalServiceSubscription.cs | 5 - .../NATS/Subscriptions/PublishSubscription.cs | 17 +-- .../NATS/Subscriptions/QuerySubscription.cs | 45 ++------ .../NATS/Subscriptions/StreamSubscription.cs | 20 +--- .../NATS/Subscriptions/SubscriptionBase.cs | 27 ++++- Core/ChannelMapper.cs | 8 +- Core/Factories/ConversionPath.cs | 1 - Core/Interfaces/Conversion/IConversionPath.cs | 1 - Core/Messages/ErrorServiceMessage.cs | 10 +- 35 files changed, 99 insertions(+), 332 deletions(-) delete mode 100644 Connectors/NATS/Messages/MessageHeader.cs delete mode 100644 Connectors/NATS/Messages/NatsMessage.cs delete mode 100644 Connectors/NATS/Messages/NatsQueryResponseMessage.cs delete mode 100644 Connectors/NATS/Serialization/MessageSerializer.cs delete mode 100644 Connectors/NATS/Serialization/SerializationRegistry.cs diff --git a/Abstractions/Interfaces/Messages/IEncodedMessage.cs b/Abstractions/Interfaces/Messages/IEncodedMessage.cs index c876b56..ecc37bc 100644 --- a/Abstractions/Interfaces/Messages/IEncodedMessage.cs +++ b/Abstractions/Interfaces/Messages/IEncodedMessage.cs @@ -1,9 +1,4 @@ using MQContract.Messages; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace MQContract.Interfaces.Messages { diff --git a/Abstractions/Messages/MessageHeader.cs b/Abstractions/Messages/MessageHeader.cs index d31be5f..d1d281e 100644 --- a/Abstractions/Messages/MessageHeader.cs +++ b/Abstractions/Messages/MessageHeader.cs @@ -1,10 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace MQContract.Messages +namespace MQContract.Messages { public sealed class MessageHeader(IEnumerable> data) : IMessageHeader diff --git a/Abstractions/Messages/PingResult.cs b/Abstractions/Messages/PingResult.cs index 9a6e0ce..363e2d3 100644 --- a/Abstractions/Messages/PingResult.cs +++ b/Abstractions/Messages/PingResult.cs @@ -1,10 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace MQContract.Messages +namespace MQContract.Messages { public record PingResult(string Host,string Version, TimeSpan ResponseTime) {} diff --git a/Abstractions/Messages/QueryResult.cs b/Abstractions/Messages/QueryResult.cs index ab685a7..132dc69 100644 --- a/Abstractions/Messages/QueryResult.cs +++ b/Abstractions/Messages/QueryResult.cs @@ -1,10 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace MQContract.Messages +namespace MQContract.Messages { public record QueryResult(string ID,IMessageHeader Header,T? Result=null,string? Error=null) : TransmissionResult(ID,Error) diff --git a/Abstractions/Messages/RecievedServiceMessage.cs b/Abstractions/Messages/RecievedServiceMessage.cs index 4f107d9..84f168b 100644 --- a/Abstractions/Messages/RecievedServiceMessage.cs +++ b/Abstractions/Messages/RecievedServiceMessage.cs @@ -1,10 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace MQContract.Messages +namespace MQContract.Messages { public record RecievedServiceMessage(string ID, string MessageTypeID, string Channel, IMessageHeader Header, ReadOnlyMemory Data) : ServiceMessage(ID,MessageTypeID,Channel,Header,Data) diff --git a/Abstractions/Messages/ServiceMessage.cs b/Abstractions/Messages/ServiceMessage.cs index eef3232..5e73633 100644 --- a/Abstractions/Messages/ServiceMessage.cs +++ b/Abstractions/Messages/ServiceMessage.cs @@ -1,9 +1,4 @@ using MQContract.Interfaces.Messages; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace MQContract.Messages { diff --git a/Abstractions/Messages/ServiceQueryResult.cs b/Abstractions/Messages/ServiceQueryResult.cs index 5f6c5b7..6f0bd8b 100644 --- a/Abstractions/Messages/ServiceQueryResult.cs +++ b/Abstractions/Messages/ServiceQueryResult.cs @@ -1,9 +1,4 @@ using MQContract.Interfaces.Messages; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace MQContract.Messages { diff --git a/Abstractions/Messages/TransmissionResult.cs b/Abstractions/Messages/TransmissionResult.cs index 9f55040..af50dbf 100644 --- a/Abstractions/Messages/TransmissionResult.cs +++ b/Abstractions/Messages/TransmissionResult.cs @@ -1,10 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace MQContract.Messages +namespace MQContract.Messages { public record TransmissionResult(string ID,string? Error=null) { diff --git a/AutomatedTesting/ChannelMapperTests.cs b/AutomatedTesting/ChannelMapperTests.cs index 4df49d7..22df693 100644 --- a/AutomatedTesting/ChannelMapperTests.cs +++ b/AutomatedTesting/ChannelMapperTests.cs @@ -3,14 +3,8 @@ using MQContract.Attributes; using MQContract.Interfaces.Service; using MQContract; -using System; -using System.Collections.Generic; using System.Diagnostics; -using System.Linq; -using System.Text; -using System.Threading.Tasks; using System.Reflection; -using MQContract.Interfaces; using System.Text.Json; namespace AutomatedTesting diff --git a/AutomatedTesting/ContractConnectionTests/PublishTests.cs b/AutomatedTesting/ContractConnectionTests/PublishTests.cs index be632b4..294241f 100644 --- a/AutomatedTesting/ContractConnectionTests/PublishTests.cs +++ b/AutomatedTesting/ContractConnectionTests/PublishTests.cs @@ -6,7 +6,6 @@ using Moq; using MQContract; using MQContract.Attributes; -using MQContract.Interfaces; using MQContract.Interfaces.Encoding; using MQContract.Interfaces.Encrypting; using MQContract.Interfaces.Service; diff --git a/AutomatedTesting/ContractConnectionTests/QueryTests.cs b/AutomatedTesting/ContractConnectionTests/QueryTests.cs index 405169d..c3e508f 100644 --- a/AutomatedTesting/ContractConnectionTests/QueryTests.cs +++ b/AutomatedTesting/ContractConnectionTests/QueryTests.cs @@ -9,7 +9,6 @@ using MQContract.Interfaces.Encoding; using MQContract.Interfaces.Encrypting; using MQContract.Interfaces.Service; -using MQContract.Messages; using System.Diagnostics; using System.IO.Compression; using System.Reflection; diff --git a/AutomatedTesting/Helper.cs b/AutomatedTesting/Helper.cs index 8389e40..6a60c08 100644 --- a/AutomatedTesting/Helper.cs +++ b/AutomatedTesting/Helper.cs @@ -1,8 +1,5 @@ using AutomatedTesting.ServiceInjection; using Microsoft.Extensions.DependencyInjection; -using Moq; -using System.Threading.Tasks; -using System.Threading; namespace AutomatedTesting { diff --git a/Connectors/KubeMQ/Connection.cs b/Connectors/KubeMQ/Connection.cs index 0a6a128..3d18603 100644 --- a/Connectors/KubeMQ/Connection.cs +++ b/Connectors/KubeMQ/Connection.cs @@ -10,7 +10,6 @@ using MQContract.KubeMQ.Subscriptions; using MQContract.Messages; using System.Diagnostics; -using System.IO.MemoryMappedFiles; using System.Text.RegularExpressions; namespace MQContract.KubeMQ diff --git a/Connectors/KubeMQ/Interfaces/IKubeMQPingResult.cs b/Connectors/KubeMQ/Interfaces/IKubeMQPingResult.cs index 95f1874..e00389e 100644 --- a/Connectors/KubeMQ/Interfaces/IKubeMQPingResult.cs +++ b/Connectors/KubeMQ/Interfaces/IKubeMQPingResult.cs @@ -1,11 +1,4 @@ -using MQContract.Messages; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace MQContract.KubeMQ.Interfaces +namespace MQContract.KubeMQ.Interfaces { public interface IKubeMQPingResult { diff --git a/Connectors/KubeMQ/Subscriptions/PubSubscription.cs b/Connectors/KubeMQ/Subscriptions/PubSubscription.cs index ec53d80..3e16391 100644 --- a/Connectors/KubeMQ/Subscriptions/PubSubscription.cs +++ b/Connectors/KubeMQ/Subscriptions/PubSubscription.cs @@ -1,6 +1,5 @@ using Grpc.Core; using Microsoft.Extensions.Logging; -using MQContract.KubeMQ.Messages; using MQContract.KubeMQ.Options; using MQContract.KubeMQ.SDK.Connection; using MQContract.KubeMQ.SDK.Grpc; diff --git a/Connectors/KubeMQ/Subscriptions/QuerySubscription.cs b/Connectors/KubeMQ/Subscriptions/QuerySubscription.cs index 19cea9a..e42f9ba 100644 --- a/Connectors/KubeMQ/Subscriptions/QuerySubscription.cs +++ b/Connectors/KubeMQ/Subscriptions/QuerySubscription.cs @@ -1,6 +1,5 @@ using Grpc.Core; using Microsoft.Extensions.Logging; -using MQContract.KubeMQ.Messages; using MQContract.KubeMQ.SDK.Connection; using MQContract.KubeMQ.SDK.Grpc; using MQContract.Messages; diff --git a/Connectors/NATS/Connection.cs b/Connectors/NATS/Connection.cs index df510fc..b69a943 100644 --- a/Connectors/NATS/Connection.cs +++ b/Connectors/NATS/Connection.cs @@ -1,26 +1,21 @@ using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Logging.Abstractions; -using Microsoft.Extensions.Options; using MQContract.Interfaces.Service; using MQContract.Messages; -using MQContract.NATS.Messages; using MQContract.NATS.Options; -using MQContract.NATS.Serialization; using MQContract.NATS.Subscriptions; using NATS.Client.Core; using NATS.Client.JetStream; using NATS.Client.JetStream.Models; -using System; -using System.Collections.Generic; -using System.Linq; using System.Text; -using System.Threading.Channels; -using System.Threading.Tasks; namespace MQContract.NATS { public class Connection : IMessageServiceConnection,IDisposable { + private const string MESSAGE_IDENTIFIER_HEADER = "_MessageID"; + private const string MESSAGE_TYPE_HEADER = "_MessageTypeID"; + private const string QUERY_RESPONSE_ERROR_TYPE = "NatsQueryError"; + private readonly NatsConnection natsConnection; private readonly NatsJSContext natsJSContext; private readonly ILogger? logger; @@ -101,14 +96,42 @@ public async Task PingAsync() await natsConnection.PingAsync() ); - public static NatsHeaders ExtractHeader(IMessageHeader header) + internal static NatsHeaders ExtractHeader(ServiceMessage message) => new(new Dictionary( - header.Keys.Select(k=> + message.Header.Keys.Select(k=> new KeyValuePair( k, - new Microsoft.Extensions.Primitives.StringValues(header[k])) + new Microsoft.Extensions.Primitives.StringValues(message.Header[k])) ) + .Concat([ + new(MESSAGE_IDENTIFIER_HEADER,message.ID), + new(MESSAGE_TYPE_HEADER,message.MessageTypeID) + ]) )); + internal static IMessageHeader ExtractHeader(NatsHeaders? header,out string? messageID,out string? messageTypeID) + { + if (header?.TryGetValue(MESSAGE_IDENTIFIER_HEADER, out var mid)??false) + messageID = mid.ToString(); + else + messageID=null; + if (header?.TryGetValue(MESSAGE_TYPE_HEADER, out var mti)??false) + messageTypeID=mti.ToString(); + else + messageTypeID=null; + return new MessageHeader(header? + .Where(pair=>!Equals(pair.Key,MESSAGE_IDENTIFIER_HEADER)&&!Equals(pair.Key,MESSAGE_TYPE_HEADER)) + .Select(pair => new KeyValuePair(pair.Key, pair.Value.ToString()))?? [] + ); + } + + internal static NatsHeaders ProduceQueryError(Exception exception,string messageID,out byte[] data) + { + data = UTF8Encoding.UTF8.GetBytes(exception.Message); + return new(new Dictionary([ + new KeyValuePair(MESSAGE_IDENTIFIER_HEADER,messageID), + new KeyValuePair(MESSAGE_TYPE_HEADER,QUERY_RESPONSE_ERROR_TYPE) + ])); + } public async Task PublishAsync(ServiceMessage message, TimeSpan timeout, IServiceChannelOptions? options = null, CancellationToken cancellationToken = default) { @@ -120,32 +143,20 @@ public async Task PublishAsync(ServiceMessage message, TimeS { if (publishChannelOptions.Config!=null) await CreateStreamAsync(publishChannelOptions.Config, cancellationToken); - var ack = await natsJSContext.PublishAsync( + var ack = await natsJSContext.PublishAsync( message.Channel, - new NatsMessage() - { - ID=message.ID, - MessageTypeID=message.MessageTypeID, - Data=message.Data - }, - headers: ExtractHeader(message.Header), - serializer: MessageSerializer.Default, + message.Data.ToArray(), + headers: ExtractHeader(message), cancellationToken: cancellationToken ); return new TransmissionResult(message.ID, (ack.Error!=null ? $"{ack.Error.Code}:{ack.Error.Description}" : null)); } else { - await natsConnection.PublishAsync( + await natsConnection.PublishAsync( message.Channel, - new NatsMessage() - { - ID=message.ID, - MessageTypeID=message.MessageTypeID, - Data=message.Data - }, - headers: ExtractHeader(message.Header), - serializer: MessageSerializer.Default, + message.Data.ToArray(), + headers: ExtractHeader(message), cancellationToken: cancellationToken ); return new TransmissionResult(message.ID); @@ -158,26 +169,20 @@ await natsConnection.PublishAsync( public async Task QueryAsync(ServiceMessage message, TimeSpan timeout, IServiceChannelOptions? options = null, CancellationToken cancellationToken = default) { - var result = await natsConnection.RequestAsync( + var result = await natsConnection.RequestAsync( message.Channel, - new NatsMessage() - { - ID=message.ID, - MessageTypeID=message.MessageTypeID, - Data=message.Data - }, - headers: ExtractHeader(message.Header), - requestSerializer: MessageSerializer.Default, - replySerializer: MessageSerializer.Default, + message.Data.ToArray(), + headers: ExtractHeader(message), cancellationToken: cancellationToken ); - if (!string.IsNullOrWhiteSpace(result.Data?.Error)) - throw new QueryAsyncReponseException(result.Data?.Error); + if (Equals(result.Headers?[MESSAGE_TYPE_HEADER], QUERY_RESPONSE_ERROR_TYPE)) + throw new QueryAsyncReponseException(UTF8Encoding.UTF8.GetString(result.Data!)); + var headers = ExtractHeader(result.Headers, out var messageID, out var messageTypeID); return new ServiceQueryResult( - result.Data?.ID??string.Empty, - new MQContract.NATS.Messages.MessageHeader(result.Headers), - result.Data?.MessageTypeID??string.Empty, - result.Data?.Data??new ReadOnlyMemory() + messageID??string.Empty, + headers, + messageTypeID??string.Empty, + result.Data??new ReadOnlyMemory() ); } @@ -195,10 +200,9 @@ public async Task QueryAsync(ServiceMessage message, TimeSpa } else subscription = new PublishSubscription( - natsConnection.SubscribeAsync( + natsConnection.SubscribeAsync( channel, queueGroup: group, - serializer: MessageSerializer.Default, cancellationToken: cancellationToken ), messageRecieved, @@ -215,10 +219,9 @@ public async Task QueryAsync(ServiceMessage message, TimeSpa public async Task SubscribeQueryAsync(Func> messageRecieved, Action errorRecieved, string channel, string group, IServiceChannelOptions? options = null, CancellationToken cancellationToken = default) { var sub = new QuerySubscription( - natsConnection.SubscribeAsync( + natsConnection.SubscribeAsync( channel, queueGroup: group, - serializer: MessageSerializer.Default, cancellationToken: cancellationToken ), messageRecieved, diff --git a/Connectors/NATS/Exceptions.cs b/Connectors/NATS/Exceptions.cs index 5573b9f..d9332eb 100644 --- a/Connectors/NATS/Exceptions.cs +++ b/Connectors/NATS/Exceptions.cs @@ -1,10 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace MQContract.NATS +namespace MQContract.NATS { /// /// Thrown when an error occurs attempting to connect to the NATS server. diff --git a/Connectors/NATS/Messages/MessageHeader.cs b/Connectors/NATS/Messages/MessageHeader.cs deleted file mode 100644 index dcf7fb1..0000000 --- a/Connectors/NATS/Messages/MessageHeader.cs +++ /dev/null @@ -1,19 +0,0 @@ -using MQContract.Messages; -using NATS.Client.Core; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace MQContract.NATS.Messages -{ - internal class MessageHeader(NatsHeaders? headers) : IMessageHeader - { - public string? this[string tagKey] - => (headers?.TryGetValue(tagKey,out var value)??false ? value.ToString() : null); - - public IEnumerable Keys - => headers?.Keys?? []; - } -} diff --git a/Connectors/NATS/Messages/NatsMessage.cs b/Connectors/NATS/Messages/NatsMessage.cs deleted file mode 100644 index 25b8ff9..0000000 --- a/Connectors/NATS/Messages/NatsMessage.cs +++ /dev/null @@ -1,19 +0,0 @@ -using ProtoBuf; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace MQContract.NATS.Messages -{ - [ProtoContract] - internal record NatsMessage { - [ProtoMember(1)] - public string ID { get; set; } = string.Empty; - [ProtoMember(2)] - public string MessageTypeID { get; set; } = string.Empty; - [ProtoMember(3)] - public ReadOnlyMemory Data { get; set; } = new(); - } -} diff --git a/Connectors/NATS/Messages/NatsQueryResponseMessage.cs b/Connectors/NATS/Messages/NatsQueryResponseMessage.cs deleted file mode 100644 index 17005a2..0000000 --- a/Connectors/NATS/Messages/NatsQueryResponseMessage.cs +++ /dev/null @@ -1,22 +0,0 @@ -using ProtoBuf; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace MQContract.NATS.Messages -{ - [ProtoContract] - internal record NatsQueryResponseMessage - { - [ProtoMember(1)] - public string ID { get; set; } = string.Empty; - [ProtoMember(2)] - public string MessageTypeID { get; set; } = string.Empty; - [ProtoMember(3)] - public ReadOnlyMemory? Data { get; set; } = null; - [ProtoMember(4)] - public string? Error { get; set; } = null; - } -} diff --git a/Connectors/NATS/NATS.csproj b/Connectors/NATS/NATS.csproj index e7d8cfe..1552d06 100644 --- a/Connectors/NATS/NATS.csproj +++ b/Connectors/NATS/NATS.csproj @@ -10,7 +10,6 @@ - diff --git a/Connectors/NATS/Options/StreamPublishChannelOptions.cs b/Connectors/NATS/Options/StreamPublishChannelOptions.cs index f7afb68..fe4fc0d 100644 --- a/Connectors/NATS/Options/StreamPublishChannelOptions.cs +++ b/Connectors/NATS/Options/StreamPublishChannelOptions.cs @@ -1,10 +1,5 @@ using MQContract.Interfaces.Service; using NATS.Client.JetStream.Models; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace MQContract.NATS.Options { diff --git a/Connectors/NATS/Options/StreamPublishSubscriberOptions.cs b/Connectors/NATS/Options/StreamPublishSubscriberOptions.cs index ec71ee4..bf4a76f 100644 --- a/Connectors/NATS/Options/StreamPublishSubscriberOptions.cs +++ b/Connectors/NATS/Options/StreamPublishSubscriberOptions.cs @@ -1,10 +1,5 @@ using MQContract.Interfaces.Service; using NATS.Client.JetStream.Models; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace MQContract.NATS.Options { diff --git a/Connectors/NATS/Serialization/MessageSerializer.cs b/Connectors/NATS/Serialization/MessageSerializer.cs deleted file mode 100644 index 30c0689..0000000 --- a/Connectors/NATS/Serialization/MessageSerializer.cs +++ /dev/null @@ -1,18 +0,0 @@ -using NATS.Client.Core; -using System.Buffers; -using ProtoBuf; -using MQContract.NATS.Messages; - -namespace MQContract.NATS.Serialization -{ - internal class MessageSerializer : INatsSerializer - { - public static readonly INatsSerializer Default = new MessageSerializer(); - - public T? Deserialize(in ReadOnlySequence buffer) - => Serializer.Deserialize(buffer); - - public void Serialize(IBufferWriter bufferWriter, T value) - => Serializer.Serialize(bufferWriter, value); - } -} diff --git a/Connectors/NATS/Serialization/SerializationRegistry.cs b/Connectors/NATS/Serialization/SerializationRegistry.cs deleted file mode 100644 index a4877a7..0000000 --- a/Connectors/NATS/Serialization/SerializationRegistry.cs +++ /dev/null @@ -1,19 +0,0 @@ -using NATS.Client.Core; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace MQContract.NATS.Serialization -{ - internal class SerializationRegistry : INatsSerializerRegistry - { - public static readonly INatsSerializerRegistry Default = new SerializationRegistry(); - public INatsDeserialize GetDeserializer() - => MessageSerializer.Default; - - public INatsSerialize GetSerializer() - => MessageSerializer.Default; - } -} diff --git a/Connectors/NATS/Subscriptions/IInternalServiceSubscription.cs b/Connectors/NATS/Subscriptions/IInternalServiceSubscription.cs index 67a184d..0bb1b80 100644 --- a/Connectors/NATS/Subscriptions/IInternalServiceSubscription.cs +++ b/Connectors/NATS/Subscriptions/IInternalServiceSubscription.cs @@ -1,9 +1,4 @@ using MQContract.Interfaces.Service; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace MQContract.NATS.Subscriptions { diff --git a/Connectors/NATS/Subscriptions/PublishSubscription.cs b/Connectors/NATS/Subscriptions/PublishSubscription.cs index 862ad89..94fce0d 100644 --- a/Connectors/NATS/Subscriptions/PublishSubscription.cs +++ b/Connectors/NATS/Subscriptions/PublishSubscription.cs @@ -1,16 +1,9 @@ using MQContract.Messages; -using MQContract.NATS.Messages; -using MQContract.NATS.Serialization; using NATS.Client.Core; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace MQContract.NATS.Subscriptions { - internal class PublishSubscription(IAsyncEnumerable> asyncEnumerable, + internal class PublishSubscription(IAsyncEnumerable> asyncEnumerable, Action messageRecieved, Action errorRecieved, CancellationToken cancellationToken) : SubscriptionBase(cancellationToken) { @@ -20,13 +13,7 @@ protected override async Task RunAction() { try { - messageRecieved(new( - msg.Data?.ID??string.Empty, - msg.Data?.MessageTypeID??string.Empty, - msg.Subject, - new MQContract.NATS.Messages.MessageHeader(msg.Headers), - msg.Data?.Data??new ReadOnlyMemory() - )); + messageRecieved(ExtractMessage(msg)); } catch (Exception ex) { diff --git a/Connectors/NATS/Subscriptions/QuerySubscription.cs b/Connectors/NATS/Subscriptions/QuerySubscription.cs index 08c80ea..217b8ec 100644 --- a/Connectors/NATS/Subscriptions/QuerySubscription.cs +++ b/Connectors/NATS/Subscriptions/QuerySubscription.cs @@ -1,17 +1,9 @@ -using MQContract.Interfaces.Service; -using MQContract.Messages; -using MQContract.NATS.Messages; -using MQContract.NATS.Serialization; +using MQContract.Messages; using NATS.Client.Core; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace MQContract.NATS.Subscriptions { - internal class QuerySubscription(IAsyncEnumerable> asyncEnumerable, + internal class QuerySubscription(IAsyncEnumerable> asyncEnumerable, Func> messageRecieved, Action errorRecieved, CancellationToken cancellationToken) : SubscriptionBase(cancellationToken) { @@ -19,40 +11,25 @@ protected override async Task RunAction() { await foreach (var msg in asyncEnumerable.WithCancellation(cancelToken.Token)) { + var recievedMessage = ExtractMessage(msg); try { - var result = await messageRecieved(new( - msg.Data?.ID??string.Empty, - msg.Data?.MessageTypeID??string.Empty, - msg.Subject, - new MQContract.NATS.Messages.MessageHeader(msg.Headers), - msg.Data?.Data??new ReadOnlyMemory() - )); - await msg.ReplyAsync( - new NatsQueryResponseMessage() - { - ID=result.ID, - MessageTypeID=result.MessageTypeID, - Data= result.Data - }, - headers: Connection.ExtractHeader(result.Header), + var result = await messageRecieved(recievedMessage); + await msg.ReplyAsync( + result.Data.ToArray(), + headers: Connection.ExtractHeader(result), replyTo: msg.ReplyTo, - serializer: MessageSerializer.Default, cancellationToken: cancelToken.Token ); } catch (Exception ex) { errorRecieved(ex); - await msg.ReplyAsync( - new NatsQueryResponseMessage() - { - ID=msg.Data?.ID??string.Empty, - MessageTypeID=msg.Data?.MessageTypeID??string.Empty, - Error= ex.Message - }, + var headers = Connection.ProduceQueryError(ex, recievedMessage.ID, out var responseData); + await msg.ReplyAsync( + responseData, replyTo: msg.ReplyTo, - serializer: MessageSerializer.Default, + headers:headers, cancellationToken: cancelToken.Token ); } diff --git a/Connectors/NATS/Subscriptions/StreamSubscription.cs b/Connectors/NATS/Subscriptions/StreamSubscription.cs index ddbe5c3..63dcd2f 100644 --- a/Connectors/NATS/Subscriptions/StreamSubscription.cs +++ b/Connectors/NATS/Subscriptions/StreamSubscription.cs @@ -1,13 +1,5 @@ -using MQContract.Interfaces.Service; -using MQContract.Messages; -using MQContract.NATS.Messages; -using MQContract.NATS.Serialization; +using MQContract.Messages; using NATS.Client.JetStream; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace MQContract.NATS.Subscriptions { @@ -23,18 +15,12 @@ protected override async Task RunAction() { await consumer.RefreshAsync(cancelToken.Token); // or try to recreate consumer - await foreach (var msg in consumer.ConsumeAsync(serializer: MessageSerializer.Default).WithCancellation(cancelToken.Token)) + await foreach (var msg in consumer.ConsumeAsync().WithCancellation(cancelToken.Token)) { var success = true; try { - messageRecieved(new( - msg.Data?.ID??string.Empty, - msg.Data?.MessageTypeID??string.Empty, - msg.Subject, - new MQContract.NATS.Messages.MessageHeader(msg.Headers), - msg.Data?.Data??new ReadOnlyMemory() - )); + messageRecieved(ExtractMessage(msg)); } catch (Exception ex) { diff --git a/Connectors/NATS/Subscriptions/SubscriptionBase.cs b/Connectors/NATS/Subscriptions/SubscriptionBase.cs index fbbe53f..cb8999f 100644 --- a/Connectors/NATS/Subscriptions/SubscriptionBase.cs +++ b/Connectors/NATS/Subscriptions/SubscriptionBase.cs @@ -1,9 +1,6 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; +using MQContract.Messages; +using NATS.Client.Core; +using NATS.Client.JetStream; namespace MQContract.NATS.Subscriptions { @@ -12,6 +9,24 @@ internal abstract class SubscriptionBase(CancellationToken cancellationToken) : private bool disposedValue; protected readonly CancellationTokenSource cancelToken = new(); + protected static RecievedServiceMessage ExtractMessage(NatsJSMsg recievedMessage) + => ExtractMessage(recievedMessage.Headers, recievedMessage.Subject, recievedMessage.Data); + + protected static RecievedServiceMessage ExtractMessage(NatsMsg recievedMessage) + => ExtractMessage(recievedMessage.Headers, recievedMessage.Subject, recievedMessage.Data); + + private static RecievedServiceMessage ExtractMessage(NatsHeaders? headers, string subject, byte[]? data) + { + var convertedHeaders = Connection.ExtractHeader(headers, out var messageID, out var messageTypeID); + return new( + messageID??string.Empty, + messageTypeID??string.Empty, + subject, + convertedHeaders, + data??new ReadOnlyMemory() + ); + } + protected abstract Task RunAction(); public void Run() { diff --git a/Core/ChannelMapper.cs b/Core/ChannelMapper.cs index 5f541f9..6dc5d0a 100644 --- a/Core/ChannelMapper.cs +++ b/Core/ChannelMapper.cs @@ -1,10 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace MQContract +namespace MQContract { public class ChannelMapper { diff --git a/Core/Factories/ConversionPath.cs b/Core/Factories/ConversionPath.cs index d9a89d6..c53744c 100644 --- a/Core/Factories/ConversionPath.cs +++ b/Core/Factories/ConversionPath.cs @@ -5,7 +5,6 @@ using MQContract.Interfaces.Encoding; using MQContract.Interfaces.Encrypting; using MQContract.Interfaces.Messages; -using MQContract.Messages; namespace MQContract.Factories { diff --git a/Core/Interfaces/Conversion/IConversionPath.cs b/Core/Interfaces/Conversion/IConversionPath.cs index 7623471..63b9a2a 100644 --- a/Core/Interfaces/Conversion/IConversionPath.cs +++ b/Core/Interfaces/Conversion/IConversionPath.cs @@ -1,6 +1,5 @@ using Microsoft.Extensions.Logging; using MQContract.Interfaces.Messages; -using MQContract.Messages; namespace MQContract.Interfaces.Conversion { diff --git a/Core/Messages/ErrorServiceMessage.cs b/Core/Messages/ErrorServiceMessage.cs index 2f0b38f..c8dc888 100644 --- a/Core/Messages/ErrorServiceMessage.cs +++ b/Core/Messages/ErrorServiceMessage.cs @@ -1,12 +1,4 @@ -using MQContract.Attributes; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Text.Unicode; -using System.Threading.Channels; -using System.Threading.Tasks; -using static System.Runtime.InteropServices.JavaScript.JSType; +using System.Text; namespace MQContract.Messages {