From 6b11115743652ee61ef600a373ae749e73ce5572 Mon Sep 17 00:00:00 2001 From: Bob Hauser Date: Thu, 19 Dec 2024 15:47:27 -0500 Subject: [PATCH 1/2] Updated output handling of asynchronously run workflows to be the same as when run synchronously --- .../TestWorkflow.cs | 18 ++ .../WorkflowExtensions.cs | 89 ++++++++++ .../BackgroundActivityInvokerMiddleware.cs | 2 +- .../Services/BackgroundActivityInvoker.cs | 33 +--- .../Tests.cs | 3 +- .../Activities/SampleActivity.cs | 28 +++ .../RunAsynchronousActivityOutput/Tests.cs | 164 ++++++++++++++++++ 7 files changed, 304 insertions(+), 33 deletions(-) create mode 100644 src/common/Elsa.Testing.Shared.Integration/TestWorkflow.cs create mode 100644 src/common/Elsa.Testing.Shared.Integration/WorkflowExtensions.cs create mode 100644 test/integration/Elsa.Workflows.IntegrationTests/Scenarios/RunAsynchronousActivityOutput/Activities/SampleActivity.cs create mode 100644 test/integration/Elsa.Workflows.IntegrationTests/Scenarios/RunAsynchronousActivityOutput/Tests.cs diff --git a/src/common/Elsa.Testing.Shared.Integration/TestWorkflow.cs b/src/common/Elsa.Testing.Shared.Integration/TestWorkflow.cs new file mode 100644 index 0000000000..002e98ff8f --- /dev/null +++ b/src/common/Elsa.Testing.Shared.Integration/TestWorkflow.cs @@ -0,0 +1,18 @@ +using Elsa.Workflows; + +namespace Elsa.Testing.Shared; + +public class TestWorkflow : WorkflowBase +{ + private readonly Action _buildWorkflow; + + public TestWorkflow(Action buildWorkflow) + { + _buildWorkflow = buildWorkflow; + } + + protected override void Build(IWorkflowBuilder workflowBuilder) + { + _buildWorkflow(workflowBuilder); + } +} diff --git a/src/common/Elsa.Testing.Shared.Integration/WorkflowExtensions.cs b/src/common/Elsa.Testing.Shared.Integration/WorkflowExtensions.cs new file mode 100644 index 0000000000..b9e5cb67b2 --- /dev/null +++ b/src/common/Elsa.Testing.Shared.Integration/WorkflowExtensions.cs @@ -0,0 +1,89 @@ +using Elsa.Extensions; +using Elsa.Features.Services; +using Elsa.Mediator.Contracts; +using Elsa.Workflows; +using Elsa.Workflows.Notifications; +using Elsa.Workflows.Runtime; +using Elsa.Workflows.Runtime.Contracts; +using Elsa.Workflows.Runtime.Requests; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace Elsa.Testing.Shared; + +public static class WorkflowExtensions +{ + public static async Task DispatchWorkflowAndRunToCompletion( + this IWorkflow workflowDefinition, + Action? configureServices = null, + Action? configureElsa = null, + string? instanceid = null, + string? definitionVersionId = null, + TimeSpan? timeout = default) + { + SemaphoreSlim semaphore = new (0, 1); + WorkflowFinished? workflowFinishedRecord = null; + + var host = Host.CreateDefaultBuilder() + .ConfigureServices(services => + { + configureServices?.Invoke(services); + + // This notification handler will capture the WorkflowFinished record (to be returned) and release the semaphore + services.AddNotificationHandler(sp => new WorkflowFinishedAction(notification => + { + workflowFinishedRecord = notification; + semaphore.Release(); + } + )); + + services.AddElsa(elsa => { + configureElsa?.Invoke(elsa); + }); + }) + .Build(); + + try + { + // Start the Host + await host.StartAsync(CancellationToken.None); + + await host.Services.PopulateRegistriesAsync(); + + // Build the workflow + IWorkflowBuilderFactory workflowBuilderFactory = host.Services.GetRequiredService(); + var workflow = await workflowBuilderFactory.CreateBuilder().BuildWorkflowAsync(workflowDefinition); + + // Register the workflow + var workflowRegistry = host.Services.GetRequiredService(); + await workflowRegistry.RegisterAsync(workflow); + + // Dispatch the workflow + var workflowDispatcher = host.Services.GetRequiredService(); + var dispatchWorkflowResponse = await workflowDispatcher.DispatchAsync(new DispatchWorkflowDefinitionRequest() + { + DefinitionVersionId = "sampleWorkflow", + InstanceId = "instanceId" + }); + dispatchWorkflowResponse.ThrowIfFailed(); + + // Wait for the workflow to complete, and then return the WorkflowFinished notification + bool signaled = await semaphore.WaitAsync(timeout ?? TimeSpan.FromSeconds(5)); + return signaled ? workflowFinishedRecord : null; + } + finally + { + // Stop the Host + await host.StopAsync(CancellationToken.None); + } + } + + class WorkflowFinishedAction(Action action) : INotificationHandler + { + public Task HandleAsync(WorkflowFinished notification, CancellationToken cancellationToken) + { + action(notification); + return Task.CompletedTask; + } + } +} diff --git a/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs b/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs index e0ed60f80d..98681722c6 100644 --- a/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs +++ b/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs @@ -122,7 +122,7 @@ private static void CaptureOutputIfAny(ActivityExecutionContext context) continue; var output = (Output?)outputDescriptor.ValueGetter(activity); - context.Set(output, outputEntry.Value); + context.Set(output, outputEntry.Value, outputDescriptor.Name); } } diff --git a/src/modules/Elsa.Workflows.Runtime/Services/BackgroundActivityInvoker.cs b/src/modules/Elsa.Workflows.Runtime/Services/BackgroundActivityInvoker.cs index c2541fe79b..ab2badf954 100644 --- a/src/modules/Elsa.Workflows.Runtime/Services/BackgroundActivityInvoker.cs +++ b/src/modules/Elsa.Workflows.Runtime/Services/BackgroundActivityInvoker.cs @@ -90,35 +90,8 @@ private async Task ResumeWorkflowAsync(ActivityExecutionContext activityExecutio } private IDictionary ExtractActivityOutput(ActivityExecutionContext activityExecutionContext) - { - var outputDescriptors = activityExecutionContext.ActivityDescriptor.Outputs; - var outputValues = new Dictionary(); - - foreach (var outputDescriptor in outputDescriptors) - { - var output = (Output?)outputDescriptor.ValueGetter(activityExecutionContext.Activity); - - if (output == null) - continue; - - var memoryBlockReference = output.MemoryBlockReference(); - - if (!activityExecutionContext.ExpressionExecutionContext.TryGetBlock(memoryBlockReference, out var memoryBlock)) - continue; - - var variableMetadata = memoryBlock.Metadata as VariableBlockMetadata; - var driver = variableMetadata?.StorageDriverType; - - // We only capture output written to the workflow itself. Other drivers like blob storage, etc. will be ignored since the foreground context will be loading those. - if (driver != typeof(WorkflowStorageDriver) && driver != typeof(WorkflowInstanceStorageDriver) && driver != null) - continue; - - var outputValue = activityExecutionContext.Get(memoryBlockReference); - - if (outputValue != null) - outputValues[outputDescriptor.Name] = outputValue; - } - - return outputValues; + { + var activityOutputRegister = activityExecutionContext.WorkflowExecutionContext.GetActivityOutputRegister(); + return activityOutputRegister.FindMany(a => a.ActivityInstanceId == activityExecutionContext.Id).ToDictionary(o => o.OutputName, o => o.Value!); } } \ No newline at end of file diff --git a/test/integration/Elsa.Workflows.IntegrationTests/Scenarios/JsonObjectToObjectRemainsJsonObject/Tests.cs b/test/integration/Elsa.Workflows.IntegrationTests/Scenarios/JsonObjectToObjectRemainsJsonObject/Tests.cs index c89a2db5cf..04151f9f9d 100644 --- a/test/integration/Elsa.Workflows.IntegrationTests/Scenarios/JsonObjectToObjectRemainsJsonObject/Tests.cs +++ b/test/integration/Elsa.Workflows.IntegrationTests/Scenarios/JsonObjectToObjectRemainsJsonObject/Tests.cs @@ -1,5 +1,4 @@ using Elsa.Testing.Shared; -using Elsa.Workflows.IntegrationTests.Scenarios.JsonObjectToObjectRemainsJsonObject.Workflows; using Microsoft.Extensions.DependencyInjection; using Xunit.Abstractions; @@ -22,7 +21,7 @@ public JsonObjectJintTests(ITestOutputHelper testOutputHelper) public async Task Test1() { await _services.PopulateRegistriesAsync(); - await _workflowRunner.RunAsync(); + await _workflowRunner.RunAsync(); var lines = _capturingTextWriter.Lines.ToList(); Assert.Equal(new[] { "Baz" }, lines); } diff --git a/test/integration/Elsa.Workflows.IntegrationTests/Scenarios/RunAsynchronousActivityOutput/Activities/SampleActivity.cs b/test/integration/Elsa.Workflows.IntegrationTests/Scenarios/RunAsynchronousActivityOutput/Activities/SampleActivity.cs new file mode 100644 index 0000000000..b8fe5ae6f3 --- /dev/null +++ b/test/integration/Elsa.Workflows.IntegrationTests/Scenarios/RunAsynchronousActivityOutput/Activities/SampleActivity.cs @@ -0,0 +1,28 @@ +using Elsa.Workflows.Attributes; +using Elsa.Workflows.Models; + +namespace Elsa.Workflows.IntegrationTests.Scenarios.RunAsynchronousActivityOutput.Activities; + +[Activity("Elsa.Test", Kind = ActivityKind.Task)] +internal class SampleActivity : CodeActivity +{ + + public SampleActivity() + { + RunAsynchronously = true; + } + + public Input? Number1 { get; set; } = default; + public Input? Number2 { get; set; } = default; + public Output? Sum { get; set; } = default; + public Output? Product { get; set; } = default; + + protected override void Execute(ActivityExecutionContext context) + { + int number1 = context.Get(Number1); + int number2 = context.Get(Number2); + + context.Set(Sum, number1 + number2); + context.Set(Product, number1 * number2); + } +} diff --git a/test/integration/Elsa.Workflows.IntegrationTests/Scenarios/RunAsynchronousActivityOutput/Tests.cs b/test/integration/Elsa.Workflows.IntegrationTests/Scenarios/RunAsynchronousActivityOutput/Tests.cs new file mode 100644 index 0000000000..88fb19f748 --- /dev/null +++ b/test/integration/Elsa.Workflows.IntegrationTests/Scenarios/RunAsynchronousActivityOutput/Tests.cs @@ -0,0 +1,164 @@ +using Elsa.Common.Services; +using Elsa.Extensions; +using Elsa.Testing.Shared; +using Elsa.Workflows.Activities; +using Elsa.Workflows.IntegrationTests.Scenarios.RunAsynchronousActivityOutput.Activities; +using Elsa.Workflows.Memory; +using Elsa.Workflows.Notifications; +using Elsa.Workflows.Runtime.Distributed; +using Elsa.Workflows.Runtime.Entities; +using Elsa.Workflows.Runtime.Stores; +using Microsoft.Extensions.DependencyInjection; + +namespace Elsa.Workflows.IntegrationTests.Scenarios.RunAsynchronousActivityOutput; + +public class Tests +{ + [Theory(DisplayName = "Activity outputs captured in activity execution record")] + [InlineData(true)] + [InlineData(false)] + public async Task ActivityOutputCaptureTest(bool runAsynchronously) + { + // Arrange + + TestWorkflow workflow = new(workflowBuilder => + { + workflowBuilder.DefinitionId = "sampleWorkflow"; + workflowBuilder.Id = "sampleWorkflow"; + + var variable1 = new Variable(); + workflowBuilder.Root = new Sequence + { + Variables = + { + variable1 + }, + Activities = + { + new SampleActivity() + { + Id = "SampleActivity1", + RunAsynchronously = runAsynchronously, + Number1 = new(4), + Number2 = new(8), + Sum = new(variable1), + Product = null + } + } + }; + } + ); + + var activityExecutionStore = new MemoryActivityExecutionStore(new MemoryStore()); + + // Act + + WorkflowFinished? workflowFinishedRecord = await workflow.DispatchWorkflowAndRunToCompletion( + configureElsa: elsa => + { + elsa.UseWorkflowRuntime(workflowRuntime => { + workflowRuntime.ActivityExecutionLogStore = sp => activityExecutionStore; + }); + } + ); + + // Assert + + Assert.NotNull(workflowFinishedRecord); + Assert.Equal(WorkflowStatus.Finished, workflowFinishedRecord.WorkflowState.Status); + Assert.Equal(WorkflowSubStatus.Finished, workflowFinishedRecord.WorkflowState.SubStatus); + + var activityExecutionRecord = await activityExecutionStore.FindAsync(new() { ActivityId = "SampleActivity1" }); + Assert.NotNull(activityExecutionRecord?.Outputs); + Assert.Equal(2, activityExecutionRecord.Outputs!.Count); + Assert.Equal(12, activityExecutionRecord.Outputs!.GetValue("Sum")); + Assert.Equal(32, activityExecutionRecord.Outputs!.GetValue("Product")); + + var activityOutputRegister = workflowFinishedRecord.WorkflowExecutionContext.GetActivityOutputRegister(); + Assert.Equal(2, activityOutputRegister.FindMany(p => true).Count()); + Assert.Equal(12, activityOutputRegister.FindOutputByActivityId("SampleActivity1", "Sum")); + Assert.Equal(32, activityOutputRegister.FindOutputByActivityId("SampleActivity1", "Product")); + } + + [Theory(DisplayName = "Activity outputs captured in activity execution record")] + [InlineData(true)] + [InlineData(false)] + public async Task ActivityOutputCaptureParallelTest(bool runAsynchronously) + { + + // Arrange + + TestWorkflow workflow = new(workflowBuilder => + { + workflowBuilder.DefinitionId = "sampleWorkflow"; + workflowBuilder.Id = "sampleWorkflow"; + + var variable1 = new Variable(); + var variable2 = new Variable(); + workflowBuilder.Root = new Elsa.Workflows.Activities.Parallel() + { + Variables = + { + variable1, + variable2, + }, + Activities = + { + new SampleActivity() + { + Id = "SampleActivity1", + RunAsynchronously = runAsynchronously, + Number1 = new(4), + Number2 = new(8), + Sum = new(variable1), + }, + new SampleActivity() + { + Id = "SampleActivity2", + RunAsynchronously = runAsynchronously, + Number1 = new(2), + Number2 = new(7), + Product = new(variable2), + } + } + }; + } + ); + + var activityExecutionStore = new MemoryActivityExecutionStore(new MemoryStore()); + + // Act + + WorkflowFinished? workflowFinishedRecord = await workflow.DispatchWorkflowAndRunToCompletion( + configureServices: services => + { + services.AddScoped(); + }, + configureElsa: elsa => + { + elsa.UseWorkflowRuntime(workflowRuntime => { + workflowRuntime.ActivityExecutionLogStore = sp => activityExecutionStore; + workflowRuntime.WorkflowRuntime = sp => sp.GetRequiredService(); + }); + } + ); + + // Assert + + Assert.NotNull(workflowFinishedRecord); + Assert.Equal(WorkflowStatus.Finished, workflowFinishedRecord.WorkflowState.Status); + Assert.Equal(WorkflowSubStatus.Finished, workflowFinishedRecord.WorkflowState.SubStatus); + + var activityExecutionRecord1 = await activityExecutionStore.FindAsync(new() { ActivityId = "SampleActivity1" }); + Assert.NotNull(activityExecutionRecord1?.Outputs); + Assert.Equal(2, activityExecutionRecord1.Outputs!.Count); + Assert.Equal(12, activityExecutionRecord1.Outputs!.GetValue("Sum")); + Assert.Equal(32, activityExecutionRecord1.Outputs!.GetValue("Product")); + + var activityExecutionRecord2 = await activityExecutionStore.FindAsync(new() { ActivityId = "SampleActivity2" }); + Assert.NotNull(activityExecutionRecord2?.Outputs); + Assert.Equal(2, activityExecutionRecord2.Outputs!.Count); + Assert.Equal(9, activityExecutionRecord2.Outputs!.GetValue("Sum")); + Assert.Equal(14, activityExecutionRecord2.Outputs!.GetValue("Product")); + } +} \ No newline at end of file From 6ffc7be19b0a16203a36c4d1330c7a14504cc59c Mon Sep 17 00:00:00 2001 From: Bob Hauser Date: Thu, 19 Dec 2024 16:22:29 -0500 Subject: [PATCH 2/2] updated tests --- src/common/Elsa.Testing.Shared.Integration/TestWorkflow.cs | 5 +++++ .../Elsa.Testing.Shared.Integration/WorkflowExtensions.cs | 7 +++---- .../Scenarios/RunAsynchronousActivityOutput/Tests.cs | 6 ------ 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/common/Elsa.Testing.Shared.Integration/TestWorkflow.cs b/src/common/Elsa.Testing.Shared.Integration/TestWorkflow.cs index 002e98ff8f..6dcc93d6d0 100644 --- a/src/common/Elsa.Testing.Shared.Integration/TestWorkflow.cs +++ b/src/common/Elsa.Testing.Shared.Integration/TestWorkflow.cs @@ -14,5 +14,10 @@ public TestWorkflow(Action buildWorkflow) protected override void Build(IWorkflowBuilder workflowBuilder) { _buildWorkflow(workflowBuilder); + + if (string.IsNullOrEmpty(workflowBuilder.Id)) + { + workflowBuilder.Id = Guid.NewGuid().ToString(); + } } } diff --git a/src/common/Elsa.Testing.Shared.Integration/WorkflowExtensions.cs b/src/common/Elsa.Testing.Shared.Integration/WorkflowExtensions.cs index b9e5cb67b2..dc64ba1078 100644 --- a/src/common/Elsa.Testing.Shared.Integration/WorkflowExtensions.cs +++ b/src/common/Elsa.Testing.Shared.Integration/WorkflowExtensions.cs @@ -17,8 +17,7 @@ public static class WorkflowExtensions this IWorkflow workflowDefinition, Action? configureServices = null, Action? configureElsa = null, - string? instanceid = null, - string? definitionVersionId = null, + string? instanceId = null, TimeSpan? timeout = default) { SemaphoreSlim semaphore = new (0, 1); @@ -62,8 +61,8 @@ public static class WorkflowExtensions var workflowDispatcher = host.Services.GetRequiredService(); var dispatchWorkflowResponse = await workflowDispatcher.DispatchAsync(new DispatchWorkflowDefinitionRequest() { - DefinitionVersionId = "sampleWorkflow", - InstanceId = "instanceId" + DefinitionVersionId = workflow.DefinitionHandle.DefinitionVersionId, + InstanceId = instanceId ?? Guid.NewGuid().ToString(), }); dispatchWorkflowResponse.ThrowIfFailed(); diff --git a/test/integration/Elsa.Workflows.IntegrationTests/Scenarios/RunAsynchronousActivityOutput/Tests.cs b/test/integration/Elsa.Workflows.IntegrationTests/Scenarios/RunAsynchronousActivityOutput/Tests.cs index 88fb19f748..c0c9561719 100644 --- a/test/integration/Elsa.Workflows.IntegrationTests/Scenarios/RunAsynchronousActivityOutput/Tests.cs +++ b/test/integration/Elsa.Workflows.IntegrationTests/Scenarios/RunAsynchronousActivityOutput/Tests.cs @@ -23,9 +23,6 @@ public async Task ActivityOutputCaptureTest(bool runAsynchronously) TestWorkflow workflow = new(workflowBuilder => { - workflowBuilder.DefinitionId = "sampleWorkflow"; - workflowBuilder.Id = "sampleWorkflow"; - var variable1 = new Variable(); workflowBuilder.Root = new Sequence { @@ -90,9 +87,6 @@ public async Task ActivityOutputCaptureParallelTest(bool runAsynchronously) TestWorkflow workflow = new(workflowBuilder => { - workflowBuilder.DefinitionId = "sampleWorkflow"; - workflowBuilder.Id = "sampleWorkflow"; - var variable1 = new Variable(); var variable2 = new Variable(); workflowBuilder.Root = new Elsa.Workflows.Activities.Parallel()