Skip to content

Commit

Permalink
code cleanup
Browse files Browse the repository at this point in the history
cleaned up some code as well as modified the NATS.io connector to use headers to hide the additional message properties to avoid using additional external libraries and for cleaner usage.
  • Loading branch information
roger-castaldo committed Jul 30, 2024
1 parent eb9afe4 commit 14b9920
Show file tree
Hide file tree
Showing 35 changed files with 99 additions and 332 deletions.
5 changes: 0 additions & 5 deletions Abstractions/Interfaces/Messages/IEncodedMessage.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
using MQContract.Messages;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQContract.Interfaces.Messages
{
Expand Down
8 changes: 1 addition & 7 deletions Abstractions/Messages/MessageHeader.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQContract.Messages
namespace MQContract.Messages
{
public sealed class MessageHeader(IEnumerable<KeyValuePair<string,string>> data)
: IMessageHeader
Expand Down
8 changes: 1 addition & 7 deletions Abstractions/Messages/PingResult.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQContract.Messages
namespace MQContract.Messages
{
public record PingResult(string Host,string Version, TimeSpan ResponseTime)
{}
Expand Down
8 changes: 1 addition & 7 deletions Abstractions/Messages/QueryResult.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQContract.Messages
namespace MQContract.Messages
{
public record QueryResult<T>(string ID,IMessageHeader Header,T? Result=null,string? Error=null)
: TransmissionResult(ID,Error)
Expand Down
8 changes: 1 addition & 7 deletions Abstractions/Messages/RecievedServiceMessage.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQContract.Messages
namespace MQContract.Messages
{
public record RecievedServiceMessage(string ID, string MessageTypeID, string Channel, IMessageHeader Header, ReadOnlyMemory<byte> Data)
: ServiceMessage(ID,MessageTypeID,Channel,Header,Data)
Expand Down
5 changes: 0 additions & 5 deletions Abstractions/Messages/ServiceMessage.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
using MQContract.Interfaces.Messages;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQContract.Messages
{
Expand Down
5 changes: 0 additions & 5 deletions Abstractions/Messages/ServiceQueryResult.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
using MQContract.Interfaces.Messages;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQContract.Messages
{
Expand Down
8 changes: 1 addition & 7 deletions Abstractions/Messages/TransmissionResult.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQContract.Messages
namespace MQContract.Messages
{
public record TransmissionResult(string ID,string? Error=null)
{
Expand Down
6 changes: 0 additions & 6 deletions AutomatedTesting/ChannelMapperTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,8 @@
using MQContract.Attributes;
using MQContract.Interfaces.Service;
using MQContract;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Reflection;
using MQContract.Interfaces;
using System.Text.Json;

namespace AutomatedTesting
Expand Down
1 change: 0 additions & 1 deletion AutomatedTesting/ContractConnectionTests/PublishTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using Moq;
using MQContract;
using MQContract.Attributes;
using MQContract.Interfaces;
using MQContract.Interfaces.Encoding;
using MQContract.Interfaces.Encrypting;
using MQContract.Interfaces.Service;
Expand Down
1 change: 0 additions & 1 deletion AutomatedTesting/ContractConnectionTests/QueryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using MQContract.Interfaces.Encoding;
using MQContract.Interfaces.Encrypting;
using MQContract.Interfaces.Service;
using MQContract.Messages;
using System.Diagnostics;
using System.IO.Compression;
using System.Reflection;
Expand Down
3 changes: 0 additions & 3 deletions AutomatedTesting/Helper.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
using AutomatedTesting.ServiceInjection;
using Microsoft.Extensions.DependencyInjection;
using Moq;
using System.Threading.Tasks;
using System.Threading;

namespace AutomatedTesting
{
Expand Down
1 change: 0 additions & 1 deletion Connectors/KubeMQ/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using MQContract.KubeMQ.Subscriptions;
using MQContract.Messages;
using System.Diagnostics;
using System.IO.MemoryMappedFiles;
using System.Text.RegularExpressions;

namespace MQContract.KubeMQ
Expand Down
9 changes: 1 addition & 8 deletions Connectors/KubeMQ/Interfaces/IKubeMQPingResult.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
using MQContract.Messages;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQContract.KubeMQ.Interfaces
namespace MQContract.KubeMQ.Interfaces
{
public interface IKubeMQPingResult
{
Expand Down
1 change: 0 additions & 1 deletion Connectors/KubeMQ/Subscriptions/PubSubscription.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using Grpc.Core;
using Microsoft.Extensions.Logging;
using MQContract.KubeMQ.Messages;
using MQContract.KubeMQ.Options;
using MQContract.KubeMQ.SDK.Connection;
using MQContract.KubeMQ.SDK.Grpc;
Expand Down
1 change: 0 additions & 1 deletion Connectors/KubeMQ/Subscriptions/QuerySubscription.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using Grpc.Core;
using Microsoft.Extensions.Logging;
using MQContract.KubeMQ.Messages;
using MQContract.KubeMQ.SDK.Connection;
using MQContract.KubeMQ.SDK.Grpc;
using MQContract.Messages;
Expand Down
103 changes: 53 additions & 50 deletions Connectors/NATS/Connection.cs
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using MQContract.Interfaces.Service;
using MQContract.Messages;
using MQContract.NATS.Messages;
using MQContract.NATS.Options;
using MQContract.NATS.Serialization;
using MQContract.NATS.Subscriptions;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace MQContract.NATS
{
public class Connection : IMessageServiceConnection,IDisposable
{
private const string MESSAGE_IDENTIFIER_HEADER = "_MessageID";
private const string MESSAGE_TYPE_HEADER = "_MessageTypeID";
private const string QUERY_RESPONSE_ERROR_TYPE = "NatsQueryError";

private readonly NatsConnection natsConnection;
private readonly NatsJSContext natsJSContext;
private readonly ILogger? logger;
Expand Down Expand Up @@ -101,14 +96,42 @@ public async Task<PingResult> PingAsync()
await natsConnection.PingAsync()
);

public static NatsHeaders ExtractHeader(IMessageHeader header)
internal static NatsHeaders ExtractHeader(ServiceMessage message)
=> new(new Dictionary<string, Microsoft.Extensions.Primitives.StringValues>(
header.Keys.Select(k=>
message.Header.Keys.Select(k=>
new KeyValuePair<string, Microsoft.Extensions.Primitives.StringValues>(
k,
new Microsoft.Extensions.Primitives.StringValues(header[k]))
new Microsoft.Extensions.Primitives.StringValues(message.Header[k]))
)
.Concat([
new(MESSAGE_IDENTIFIER_HEADER,message.ID),
new(MESSAGE_TYPE_HEADER,message.MessageTypeID)
])
));
internal static IMessageHeader ExtractHeader(NatsHeaders? header,out string? messageID,out string? messageTypeID)
{
if (header?.TryGetValue(MESSAGE_IDENTIFIER_HEADER, out var mid)??false)
messageID = mid.ToString();
else
messageID=null;
if (header?.TryGetValue(MESSAGE_TYPE_HEADER, out var mti)??false)
messageTypeID=mti.ToString();
else
messageTypeID=null;
return new MessageHeader(header?
.Where(pair=>!Equals(pair.Key,MESSAGE_IDENTIFIER_HEADER)&&!Equals(pair.Key,MESSAGE_TYPE_HEADER))
.Select(pair => new KeyValuePair<string, string>(pair.Key, pair.Value.ToString()))?? []
);
}

internal static NatsHeaders ProduceQueryError(Exception exception,string messageID,out byte[] data)
{
data = UTF8Encoding.UTF8.GetBytes(exception.Message);
return new(new Dictionary<string,Microsoft.Extensions.Primitives.StringValues>([
new KeyValuePair<string,Microsoft.Extensions.Primitives.StringValues>(MESSAGE_IDENTIFIER_HEADER,messageID),
new KeyValuePair<string,Microsoft.Extensions.Primitives.StringValues>(MESSAGE_TYPE_HEADER,QUERY_RESPONSE_ERROR_TYPE)
]));
}

public async Task<TransmissionResult> PublishAsync(ServiceMessage message, TimeSpan timeout, IServiceChannelOptions? options = null, CancellationToken cancellationToken = default)
{
Expand All @@ -120,32 +143,20 @@ public async Task<TransmissionResult> PublishAsync(ServiceMessage message, TimeS
{
if (publishChannelOptions.Config!=null)
await CreateStreamAsync(publishChannelOptions.Config, cancellationToken);
var ack = await natsJSContext.PublishAsync<NatsMessage>(
var ack = await natsJSContext.PublishAsync<byte[]>(
message.Channel,
new NatsMessage()
{
ID=message.ID,
MessageTypeID=message.MessageTypeID,
Data=message.Data
},
headers: ExtractHeader(message.Header),
serializer: MessageSerializer<NatsMessage>.Default,
message.Data.ToArray(),
headers: ExtractHeader(message),
cancellationToken: cancellationToken
);
return new TransmissionResult(message.ID, (ack.Error!=null ? $"{ack.Error.Code}:{ack.Error.Description}" : null));
}
else
{
await natsConnection.PublishAsync<NatsMessage>(
await natsConnection.PublishAsync<byte[]>(
message.Channel,
new NatsMessage()
{
ID=message.ID,
MessageTypeID=message.MessageTypeID,
Data=message.Data
},
headers: ExtractHeader(message.Header),
serializer: MessageSerializer<NatsMessage>.Default,
message.Data.ToArray(),
headers: ExtractHeader(message),
cancellationToken: cancellationToken
);
return new TransmissionResult(message.ID);
Expand All @@ -158,26 +169,20 @@ await natsConnection.PublishAsync<NatsMessage>(

public async Task<ServiceQueryResult> QueryAsync(ServiceMessage message, TimeSpan timeout, IServiceChannelOptions? options = null, CancellationToken cancellationToken = default)
{
var result = await natsConnection.RequestAsync<NatsMessage, NatsQueryResponseMessage>(
var result = await natsConnection.RequestAsync<byte[], byte[]>(
message.Channel,
new NatsMessage()
{
ID=message.ID,
MessageTypeID=message.MessageTypeID,
Data=message.Data
},
headers: ExtractHeader(message.Header),
requestSerializer: MessageSerializer<NatsMessage>.Default,
replySerializer: MessageSerializer<NatsQueryResponseMessage>.Default,
message.Data.ToArray(),
headers: ExtractHeader(message),
cancellationToken: cancellationToken
);
if (!string.IsNullOrWhiteSpace(result.Data?.Error))
throw new QueryAsyncReponseException(result.Data?.Error);
if (Equals(result.Headers?[MESSAGE_TYPE_HEADER], QUERY_RESPONSE_ERROR_TYPE))
throw new QueryAsyncReponseException(UTF8Encoding.UTF8.GetString(result.Data!));
var headers = ExtractHeader(result.Headers, out var messageID, out var messageTypeID);
return new ServiceQueryResult(
result.Data?.ID??string.Empty,
new MQContract.NATS.Messages.MessageHeader(result.Headers),
result.Data?.MessageTypeID??string.Empty,
result.Data?.Data??new ReadOnlyMemory<byte>()
messageID??string.Empty,
headers,
messageTypeID??string.Empty,
result.Data??new ReadOnlyMemory<byte>()
);
}

Expand All @@ -195,10 +200,9 @@ public async Task<ServiceQueryResult> QueryAsync(ServiceMessage message, TimeSpa
}
else
subscription = new PublishSubscription(
natsConnection.SubscribeAsync<NatsMessage>(
natsConnection.SubscribeAsync<byte[]>(
channel,
queueGroup: group,
serializer: MessageSerializer<NatsMessage>.Default,
cancellationToken: cancellationToken
),
messageRecieved,
Expand All @@ -215,10 +219,9 @@ public async Task<ServiceQueryResult> QueryAsync(ServiceMessage message, TimeSpa
public async Task<IServiceSubscription?> SubscribeQueryAsync(Func<RecievedServiceMessage, Task<ServiceMessage>> messageRecieved, Action<Exception> errorRecieved, string channel, string group, IServiceChannelOptions? options = null, CancellationToken cancellationToken = default)
{
var sub = new QuerySubscription(
natsConnection.SubscribeAsync<NatsMessage>(
natsConnection.SubscribeAsync<byte[]>(
channel,
queueGroup: group,
serializer: MessageSerializer<NatsMessage>.Default,
cancellationToken: cancellationToken
),
messageRecieved,
Expand Down
8 changes: 1 addition & 7 deletions Connectors/NATS/Exceptions.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQContract.NATS
namespace MQContract.NATS
{
/// <summary>
/// Thrown when an error occurs attempting to connect to the NATS server.
Expand Down
19 changes: 0 additions & 19 deletions Connectors/NATS/Messages/MessageHeader.cs

This file was deleted.

19 changes: 0 additions & 19 deletions Connectors/NATS/Messages/NatsMessage.cs

This file was deleted.

Loading

0 comments on commit 14b9920

Please sign in to comment.