Skip to content

Commit

Permalink
migrated interfaces/added channel mapping
Browse files Browse the repository at this point in the history
removed a bunch of the message interfaces in favor of using record objects instead to make implemenetation of connectors simpler as well as clean up the code
added in a channel mapper class that allows for the channels specificed to be mapped/changed during run time for the different messaging styles in case there is a need to do so.
  • Loading branch information
roger-castaldo committed Jul 29, 2024
1 parent dbd19f7 commit eb9afe4
Show file tree
Hide file tree
Showing 63 changed files with 1,949 additions and 1,233 deletions.
8 changes: 4 additions & 4 deletions Abstractions/Interfaces/IContractConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ namespace MQContract.Interfaces
{
public interface IContractConnection
{
Task<IPingResult> PingAsync();
Task<ITransmissionResult> PublishAsync<T>(T message, TimeSpan? timeout = null, string? channel = null, IMessageHeader? messageHeader = null, IServiceChannelOptions? options = null, CancellationToken cancellationToken = new CancellationToken())
Task<PingResult> PingAsync();
Task<TransmissionResult> PublishAsync<T>(T message, TimeSpan? timeout = null, string? channel = null, IMessageHeader? messageHeader = null, IServiceChannelOptions? options = null, CancellationToken cancellationToken = new CancellationToken())
where T : class;
Task<ISubscription> SubscribeAsync<T>(Func<IMessage<T>,Task> messageRecieved, Action<Exception> errorRecieved, string? channel = null, string? group = null, bool ignoreMessageHeader = false,bool synchronous=false, IServiceChannelOptions? options = null, CancellationToken cancellationToken = new CancellationToken())
where T : class;
Task<IQueryResult<R>> QueryAsync<Q, R>(Q message, TimeSpan? timeout = null, string? channel = null, IMessageHeader? messageHeader = null, IServiceChannelOptions? options = null, CancellationToken cancellationToken = new CancellationToken())
Task<QueryResult<R>> QueryAsync<Q, R>(Q message, TimeSpan? timeout = null, string? channel = null, IMessageHeader? messageHeader = null, IServiceChannelOptions? options = null, CancellationToken cancellationToken = new CancellationToken())
where Q : class
where R : class;
Task<IQueryResult<object>> QueryAsync<Q>(Q message, TimeSpan? timeout = null, string? channel = null, IMessageHeader? messageHeader = null, IServiceChannelOptions? options = null, CancellationToken cancellationToken = new CancellationToken())
Task<QueryResult<object>> QueryAsync<Q>(Q message, TimeSpan? timeout = null, string? channel = null, IMessageHeader? messageHeader = null, IServiceChannelOptions? options = null, CancellationToken cancellationToken = new CancellationToken())
where Q : class;
Task<ISubscription> SubscribeQueryResponseAsync<Q,R>(Func<IMessage<Q>,Task<QueryResponseMessage<R>>> messageRecieved, Action<Exception> errorRecieved, string? channel = null, string? group = null, bool ignoreMessageHeader = false, bool synchronous = false, IServiceChannelOptions? options = null, CancellationToken cancellationToken = new CancellationToken())
where Q : class
Expand Down
10 changes: 0 additions & 10 deletions Abstractions/Interfaces/IQueryResult.cs

This file was deleted.

16 changes: 16 additions & 0 deletions Abstractions/Interfaces/Messages/IEncodedMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using MQContract.Messages;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQContract.Interfaces.Messages
{
public interface IEncodedMessage
{
IMessageHeader Header { get; }
string MessageTypeID { get; }
ReadOnlyMemory<byte> Data { get; }
}
}
File renamed without changes.
10 changes: 5 additions & 5 deletions Abstractions/Interfaces/Service/IMessageServiceConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ public interface IMessageServiceConnection
{
int? MaxMessageBodySize { get; }
TimeSpan DefaultTimout { get; }
Task<IPingResult> PingAsync();
Task<ITransmissionResult> PublishAsync(IServiceMessage message, TimeSpan timeout, IServiceChannelOptions? options = null, CancellationToken cancellationToken = new CancellationToken());
Task<IServiceSubscription?> SubscribeAsync(Action<IRecievedServiceMessage> messageRecieved, Action<Exception> errorRecieved, string channel, string group, IServiceChannelOptions? options = null, CancellationToken cancellationToken = new CancellationToken());
Task<IServiceQueryResult> QueryAsync(IServiceMessage message, TimeSpan timeout, IServiceChannelOptions? options = null, CancellationToken cancellationToken = new CancellationToken());
Task<IServiceSubscription?> SubscribeQueryAsync(Func<IRecievedServiceMessage, Task<IServiceMessage>> messageRecieved, Action<Exception> errorRecieved, string channel, string group, IServiceChannelOptions? options = null, CancellationToken cancellationToken = new CancellationToken());
Task<PingResult> PingAsync();
Task<TransmissionResult> PublishAsync(ServiceMessage message, TimeSpan timeout, IServiceChannelOptions? options = null, CancellationToken cancellationToken = new CancellationToken());
Task<IServiceSubscription?> SubscribeAsync(Action<RecievedServiceMessage> messageRecieved, Action<Exception> errorRecieved, string channel, string group, IServiceChannelOptions? options = null, CancellationToken cancellationToken = new CancellationToken());
Task<ServiceQueryResult> QueryAsync(ServiceMessage message, TimeSpan timeout, IServiceChannelOptions? options = null, CancellationToken cancellationToken = new CancellationToken());
Task<IServiceSubscription?> SubscribeQueryAsync(Func<RecievedServiceMessage, Task<ServiceMessage>> messageRecieved, Action<Exception> errorRecieved, string channel, string group, IServiceChannelOptions? options = null, CancellationToken cancellationToken = new CancellationToken());
}
}
7 changes: 0 additions & 7 deletions Abstractions/Messages/IErrorServiceMessage.cs

This file was deleted.

16 changes: 0 additions & 16 deletions Abstractions/Messages/IMessage.cs

This file was deleted.

21 changes: 0 additions & 21 deletions Abstractions/Messages/IPingResult.cs

This file was deleted.

7 changes: 0 additions & 7 deletions Abstractions/Messages/IRecievedServiceMessage.cs

This file was deleted.

10 changes: 0 additions & 10 deletions Abstractions/Messages/IServiceMessage.cs

This file was deleted.

6 changes: 0 additions & 6 deletions Abstractions/Messages/IServiceQueryResult.cs

This file was deleted.

17 changes: 0 additions & 17 deletions Abstractions/Messages/ITransmissionResult.cs

This file was deleted.

31 changes: 31 additions & 0 deletions Abstractions/Messages/MessageHeader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQContract.Messages
{
public sealed class MessageHeader(IEnumerable<KeyValuePair<string,string>> data)
: IMessageHeader
{
public MessageHeader(Dictionary<string, string?>? headers)
: this(headers?.AsEnumerable().Select(pair=>new KeyValuePair<string,string>(pair.Key,pair.Value??string.Empty))?? []) { }

public MessageHeader(IMessageHeader? originalHeader, Dictionary<string, string?>? appendedHeader)
: this(
(appendedHeader?.AsEnumerable().Select(pair => new KeyValuePair<string, string>(pair.Key, pair.Value??string.Empty))?? [])
.Concat(originalHeader?.Keys.Select(k => new KeyValuePair<string, string>(k, originalHeader?[k]!))?? [])
.DistinctBy(k => k.Key)
)
{ }

public string? this[string tagKey]
=> data.Where(p=>Equals(p.Key,tagKey))
.Select(p=>p.Value)
.FirstOrDefault();

public IEnumerable<string> Keys
=> data.Select(p=>p.Key);
}
}
11 changes: 11 additions & 0 deletions Abstractions/Messages/PingResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQContract.Messages
{
public record PingResult(string Host,string Version, TimeSpan ResponseTime)
{}
}
13 changes: 13 additions & 0 deletions Abstractions/Messages/QueryResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQContract.Messages
{
public record QueryResult<T>(string ID,IMessageHeader Header,T? Result=null,string? Error=null)
: TransmissionResult(ID,Error)
where T : class
{}
}
14 changes: 14 additions & 0 deletions Abstractions/Messages/RecievedServiceMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQContract.Messages
{
public record RecievedServiceMessage(string ID, string MessageTypeID, string Channel, IMessageHeader Header, ReadOnlyMemory<byte> Data)
: ServiceMessage(ID,MessageTypeID,Channel,Header,Data)
{
public DateTime RecievedTimestamp { get; private init; } = DateTime.Now;
}
}
13 changes: 13 additions & 0 deletions Abstractions/Messages/ServiceMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using MQContract.Interfaces.Messages;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQContract.Messages
{
public record ServiceMessage(string ID,string MessageTypeID,string Channel,IMessageHeader Header,ReadOnlyMemory<byte> Data)
: IEncodedMessage
{ }
}
14 changes: 14 additions & 0 deletions Abstractions/Messages/ServiceQueryResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using MQContract.Interfaces.Messages;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQContract.Messages
{
public record ServiceQueryResult(string ID, IMessageHeader Header,string MessageTypeID,ReadOnlyMemory<byte> Data)
: IEncodedMessage
{
}
}
13 changes: 13 additions & 0 deletions Abstractions/Messages/TransmissionResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MQContract.Messages
{
public record TransmissionResult(string ID,string? Error=null)
{
public bool IsError=>!string.IsNullOrWhiteSpace(Error);
}
}
Loading

0 comments on commit eb9afe4

Please sign in to comment.