From b719d4b21f8eb25c80fa0f3e76e011101f5f603a Mon Sep 17 00:00:00 2001 From: mircotamburini <37830748+mircotamburini@users.noreply.github.com> Date: Tue, 26 Dec 2023 11:16:00 +0100 Subject: [PATCH] Elsa.Activities.Mqtt.Activities.MqttMessage substitute MQTTnet.MqttApplicationMessage problem of serialization (#4675) some classes bcome sealed for better performance. --- .../Activities/MqttMessage.cs | 10 ++++++++++ .../MqttMessageReceived.cs | 7 +++---- .../Options/MqttClientOptions.cs | 2 +- .../Services/BusClientFactory.cs | 7 +++---- .../Services/IMqttClientWrapper.cs | 5 ++--- .../Services/MqttClientWrapper.cs | 19 +++++++++++++------ .../Services/MqttTopicsStarter.cs | 14 ++++++-------- .../Elsa.Activities.Mqtt/Services/Worker.cs | 8 ++++---- 8 files changed, 42 insertions(+), 30 deletions(-) create mode 100644 src/activities/Elsa.Activities.Mqtt/Activities/MqttMessage.cs diff --git a/src/activities/Elsa.Activities.Mqtt/Activities/MqttMessage.cs b/src/activities/Elsa.Activities.Mqtt/Activities/MqttMessage.cs new file mode 100644 index 0000000000..68f0aca471 --- /dev/null +++ b/src/activities/Elsa.Activities.Mqtt/Activities/MqttMessage.cs @@ -0,0 +1,10 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Elsa.Activities.Mqtt.Activities +{ + public record MqttMessage ( string Topic, string Payload); +} diff --git a/src/activities/Elsa.Activities.Mqtt/Activities/MqttMessageReceived/MqttMessageReceived.cs b/src/activities/Elsa.Activities.Mqtt/Activities/MqttMessageReceived/MqttMessageReceived.cs index e726f04294..7d825e2e4c 100644 --- a/src/activities/Elsa.Activities.Mqtt/Activities/MqttMessageReceived/MqttMessageReceived.cs +++ b/src/activities/Elsa.Activities.Mqtt/Activities/MqttMessageReceived/MqttMessageReceived.cs @@ -2,7 +2,6 @@ using Elsa.ActivityResults; using Elsa.Attributes; using Elsa.Services.Models; -using MQTTnet; namespace Elsa.Activities.Mqtt.Activities.MqttMessageReceived @@ -34,10 +33,10 @@ public MqttMessageReceived(IMessageReceiverClientFactory messageReceiver) private IActivityExecutionResult ExecuteInternalAsync(ActivityExecutionContext context) { - if (context.Input != null && context.Input.GetType() == typeof(MqttApplicationMessage)) + if (context.Input != null && context.Input.GetType() == typeof(MqttMessage)) { - var message = (MqttApplicationMessage)context.Input; - Output = System.Text.Encoding.UTF8.GetString(message.Payload); + var message = (MqttMessage)context.Input; + Output = message.Payload; TopicReceived = message.Topic; } diff --git a/src/activities/Elsa.Activities.Mqtt/Options/MqttClientOptions.cs b/src/activities/Elsa.Activities.Mqtt/Options/MqttClientOptions.cs index cbf531dc89..7504e176a4 100644 --- a/src/activities/Elsa.Activities.Mqtt/Options/MqttClientOptions.cs +++ b/src/activities/Elsa.Activities.Mqtt/Options/MqttClientOptions.cs @@ -4,7 +4,7 @@ namespace Elsa.Activities.Mqtt.Options { - public class MqttClientOptions + public sealed class MqttClientOptions { public string Topic { get; } public string Host { get; } diff --git a/src/activities/Elsa.Activities.Mqtt/Services/BusClientFactory.cs b/src/activities/Elsa.Activities.Mqtt/Services/BusClientFactory.cs index a525e750f0..e7ed06c705 100644 --- a/src/activities/Elsa.Activities.Mqtt/Services/BusClientFactory.cs +++ b/src/activities/Elsa.Activities.Mqtt/Services/BusClientFactory.cs @@ -1,6 +1,5 @@ using Microsoft.Extensions.DependencyInjection; using MQTTnet; -using MQTTnet.Client; using System; using System.Collections.Generic; using System.Threading; @@ -8,7 +7,7 @@ namespace Elsa.Activities.Mqtt.Services { - public class BusClientFactory : IMessageReceiverClientFactory, IMessageSenderClientFactory + public sealed class BusClientFactory : IMessageReceiverClientFactory, IMessageSenderClientFactory { private readonly IServiceProvider _serviceProvider; private readonly IDictionary _senders = new Dictionary(); @@ -53,13 +52,13 @@ public async Task GetReceiverAsync(Options.MqttClientOptions { return messageReceiverDateTime; } - + var mqttFactory = new MqttFactory(); var newClient = mqttFactory.CreateMqttClient(); var newMessageReceiver = ActivatorUtilities.CreateInstance(_serviceProvider, newClient, options); - _receivers.Add(options.GetHashCode(), newMessageReceiver ); + _receivers.Add(options.GetHashCode(), newMessageReceiver); return newMessageReceiver; } finally diff --git a/src/activities/Elsa.Activities.Mqtt/Services/IMqttClientWrapper.cs b/src/activities/Elsa.Activities.Mqtt/Services/IMqttClientWrapper.cs index cc30025d0d..661e30d316 100644 --- a/src/activities/Elsa.Activities.Mqtt/Services/IMqttClientWrapper.cs +++ b/src/activities/Elsa.Activities.Mqtt/Services/IMqttClientWrapper.cs @@ -1,5 +1,4 @@ -using Elsa.Activities.Mqtt.Options; -using MQTTnet; +using Elsa.Activities.Mqtt.Activities; using MQTTnet.Client; using System; using System.Threading.Tasks; @@ -11,6 +10,6 @@ public interface IMqttClientWrapper : IDisposable IMqttClient Client { get; } Options.MqttClientOptions Options { get; } Task PublishMessageAsync(string topic, string message); - Task SetMessageHandlerAsync(Func handler); + Task SetMessageHandlerAsync(Func handler); } } diff --git a/src/activities/Elsa.Activities.Mqtt/Services/MqttClientWrapper.cs b/src/activities/Elsa.Activities.Mqtt/Services/MqttClientWrapper.cs index cdba1344d3..b5e5af3035 100644 --- a/src/activities/Elsa.Activities.Mqtt/Services/MqttClientWrapper.cs +++ b/src/activities/Elsa.Activities.Mqtt/Services/MqttClientWrapper.cs @@ -1,3 +1,4 @@ +using Elsa.Activities.Mqtt.Activities; using Microsoft.Extensions.Logging; using MQTTnet; using MQTTnet.Client; @@ -10,10 +11,10 @@ namespace Elsa.Activities.Mqtt.Services { - public class MqttClientWrapper : IMqttClientWrapper + public sealed class MqttClientWrapper : IMqttClientWrapper { private readonly ILogger _logger; - private Func? _messageHandler; + private Func? _messageHandler; private readonly SemaphoreSlim _semaphore = new(1); public IMqttClient Client { get; } public Options.MqttClientOptions Options { get; } @@ -25,8 +26,8 @@ public MqttClientWrapper(IMqttClient client, Options.MqttClientOptions options, _logger = logger; } - - private async Task SubscribeAsync(string topic, Func handler) + + private async Task SubscribeAsync(string topic, Func handler) { if (!Client.IsConnected) { @@ -39,7 +40,13 @@ private async Task SubscribeAsync(string topic, Func { if (_messageHandler != null) - await _messageHandler(e.ApplicationMessage); + { + var appmsg = e.ApplicationMessage; + var msg = new MqttMessage( + appmsg.Topic, + appmsg.Payload == null ? null : System.Text.Encoding.UTF8.GetString(appmsg.Payload)); + await _messageHandler(msg); + } else _logger.LogWarning("Attempted to subscribe to topic {Topic}, but no message handler was set.", Options.Topic); }; @@ -57,7 +64,7 @@ public async Task PublishMessageAsync(string topic, string message) await DisconnectAsync(); } - public async Task SetMessageHandlerAsync(Func handler) + public async Task SetMessageHandlerAsync(Func handler) { await SubscribeAsync(Options.Topic, handler); } diff --git a/src/activities/Elsa.Activities.Mqtt/Services/MqttTopicsStarter.cs b/src/activities/Elsa.Activities.Mqtt/Services/MqttTopicsStarter.cs index e955762500..422ba67bdb 100644 --- a/src/activities/Elsa.Activities.Mqtt/Services/MqttTopicsStarter.cs +++ b/src/activities/Elsa.Activities.Mqtt/Services/MqttTopicsStarter.cs @@ -1,4 +1,6 @@ +using Elsa.Activities.Mqtt.Bookmarks; using Elsa.Activities.Mqtt.Options; +using Elsa.Models; using Elsa.Services; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -8,14 +10,10 @@ using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; -using Elsa.Activities.Mqtt.Bookmarks; -using Elsa.Models; -using Elsa.Services.Models; -using Elsa.Services.WorkflowStorage; namespace Elsa.Activities.Mqtt.Services { - public class MqttTopicsStarter : IMqttTopicsStarter + public sealed class MqttTopicsStarter : IMqttTopicsStarter { private readonly SemaphoreSlim _semaphore = new(1); private readonly IMessageReceiverClientFactory _receiverFactory; @@ -23,7 +21,7 @@ public class MqttTopicsStarter : IMqttTopicsStarter private readonly IServiceScopeFactory _scopeFactory; private readonly IServiceProvider _serviceProvider; private readonly ILogger _logger; - private readonly IDictionary _workers; + private readonly IDictionary _workers; public MqttTopicsStarter( IMessageReceiverClientFactory receiverFactory, @@ -127,7 +125,7 @@ public async IAsyncEnumerable GetConfigurationsAsync([Enumera private MqttClientOptions CreateConfigurationFromBookmark(MessageReceivedBookmark bookmark, string activityId) { - return new MqttClientOptions(bookmark.Topic,bookmark.Host,bookmark.Port,bookmark.Username,bookmark.Password,bookmark.Qos); + return new MqttClientOptions(bookmark.Topic, bookmark.Host, bookmark.Port, bookmark.Username, bookmark.Password, bookmark.Qos); } private async Task DisposeExistingWorkersAsync() @@ -141,6 +139,6 @@ private async Task DisposeExistingWorkersAsync() private async Task DisposeReceiverAsync(IMqttClientWrapper messageReceiver) => await _receiverFactory.DisposeReceiverAsync(messageReceiver); - + } } \ No newline at end of file diff --git a/src/activities/Elsa.Activities.Mqtt/Services/Worker.cs b/src/activities/Elsa.Activities.Mqtt/Services/Worker.cs index 9bf41fc255..287d5ac6c6 100644 --- a/src/activities/Elsa.Activities.Mqtt/Services/Worker.cs +++ b/src/activities/Elsa.Activities.Mqtt/Services/Worker.cs @@ -1,3 +1,4 @@ +using Elsa.Activities.Mqtt.Activities; using Elsa.Activities.Mqtt.Activities.MqttMessageReceived; using Elsa.Activities.Mqtt.Bookmarks; using Elsa.Activities.Mqtt.Options; @@ -5,14 +6,13 @@ using Elsa.Services; using Elsa.Services.Models; using Microsoft.Extensions.DependencyInjection; -using MQTTnet; using System; using System.Threading; using System.Threading.Tasks; namespace Elsa.Activities.Mqtt.Services { - public class Worker + public sealed class Worker { private readonly Func _disposeReceiverAction; private readonly IMqttClientWrapper _receiverClient; @@ -40,7 +40,7 @@ public void Ping() private IBookmark CreateBookmark(MqttClientOptions options) => new MessageReceivedBookmark(options.Topic, options.Host, options.Port, options.Username, options.Password, options.QualityOfService); - private async Task TriggerWorkflowsAsync(MqttApplicationMessage message, CancellationToken cancellationToken) + private async Task TriggerWorkflowsAsync(MqttMessage message, CancellationToken cancellationToken) { var bookmark = CreateBookmark(_receiverClient.Options); var launchContext = new WorkflowsQuery(ActivityType, bookmark); @@ -49,7 +49,7 @@ private async Task TriggerWorkflowsAsync(MqttApplicationMessage message, Cancell await workflowLaunchpad.CollectAndDispatchWorkflowsAsync(launchContext, new WorkflowInput(message), cancellationToken); } - private async Task OnMessageReceived(MqttApplicationMessage message) => await TriggerWorkflowsAsync(message, CancellationToken.None); + private async Task OnMessageReceived(MqttMessage message) => await TriggerWorkflowsAsync(message, CancellationToken.None); } } \ No newline at end of file