Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated output handling of asynchronously run workflows to be the same as when run synchronously #6228

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/common/Elsa.Testing.Shared.Integration/TestWorkflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using Elsa.Workflows;

namespace Elsa.Testing.Shared;

public class TestWorkflow : WorkflowBase
{
private readonly Action<IWorkflowBuilder> _buildWorkflow;

public TestWorkflow(Action<IWorkflowBuilder> buildWorkflow)
{
_buildWorkflow = buildWorkflow;
}

protected override void Build(IWorkflowBuilder workflowBuilder)
{
_buildWorkflow(workflowBuilder);
}
}
89 changes: 89 additions & 0 deletions src/common/Elsa.Testing.Shared.Integration/WorkflowExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
using Elsa.Extensions;
using Elsa.Features.Services;
using Elsa.Mediator.Contracts;
using Elsa.Workflows;
using Elsa.Workflows.Notifications;
using Elsa.Workflows.Runtime;
using Elsa.Workflows.Runtime.Contracts;
using Elsa.Workflows.Runtime.Requests;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace Elsa.Testing.Shared;

public static class WorkflowExtensions
{
public static async Task<WorkflowFinished?> DispatchWorkflowAndRunToCompletion(
this IWorkflow workflowDefinition,
Action<IServiceCollection>? configureServices = null,
Action<IModule>? configureElsa = null,
string? instanceid = null,
string? definitionVersionId = null,
TimeSpan? timeout = default)
{
SemaphoreSlim semaphore = new (0, 1);
WorkflowFinished? workflowFinishedRecord = null;

var host = Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
configureServices?.Invoke(services);

// This notification handler will capture the WorkflowFinished record (to be returned) and release the semaphore
services.AddNotificationHandler<WorkflowFinishedAction, WorkflowFinished>(sp => new WorkflowFinishedAction(notification =>
{
workflowFinishedRecord = notification;
semaphore.Release();
}
));

services.AddElsa(elsa => {
configureElsa?.Invoke(elsa);
});
})
.Build();

try
{
// Start the Host
await host.StartAsync(CancellationToken.None);

await host.Services.PopulateRegistriesAsync();

// Build the workflow
IWorkflowBuilderFactory workflowBuilderFactory = host.Services.GetRequiredService<IWorkflowBuilderFactory>();
var workflow = await workflowBuilderFactory.CreateBuilder().BuildWorkflowAsync(workflowDefinition);

// Register the workflow
var workflowRegistry = host.Services.GetRequiredService<IWorkflowRegistry>();
await workflowRegistry.RegisterAsync(workflow);

// Dispatch the workflow
var workflowDispatcher = host.Services.GetRequiredService<IWorkflowDispatcher>();
var dispatchWorkflowResponse = await workflowDispatcher.DispatchAsync(new DispatchWorkflowDefinitionRequest()
{
DefinitionVersionId = "sampleWorkflow",
InstanceId = "instanceId"
});
dispatchWorkflowResponse.ThrowIfFailed();

// Wait for the workflow to complete, and then return the WorkflowFinished notification
bool signaled = await semaphore.WaitAsync(timeout ?? TimeSpan.FromSeconds(5));
return signaled ? workflowFinishedRecord : null;
}
finally
{
// Stop the Host
await host.StopAsync(CancellationToken.None);
}
}

class WorkflowFinishedAction(Action<WorkflowFinished> action) : INotificationHandler<WorkflowFinished>
{
public Task HandleAsync(WorkflowFinished notification, CancellationToken cancellationToken)
{
action(notification);
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private static void CaptureOutputIfAny(ActivityExecutionContext context)
continue;

var output = (Output?)outputDescriptor.ValueGetter(activity);
context.Set(output, outputEntry.Value);
context.Set(output, outputEntry.Value, outputDescriptor.Name);
sfmskywalker marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,35 +90,8 @@ private async Task ResumeWorkflowAsync(ActivityExecutionContext activityExecutio
}

private IDictionary<string, object> ExtractActivityOutput(ActivityExecutionContext activityExecutionContext)
{
var outputDescriptors = activityExecutionContext.ActivityDescriptor.Outputs;
var outputValues = new Dictionary<string, object>();

foreach (var outputDescriptor in outputDescriptors)
{
var output = (Output?)outputDescriptor.ValueGetter(activityExecutionContext.Activity);

if (output == null)
continue;

var memoryBlockReference = output.MemoryBlockReference();

if (!activityExecutionContext.ExpressionExecutionContext.TryGetBlock(memoryBlockReference, out var memoryBlock))
continue;

var variableMetadata = memoryBlock.Metadata as VariableBlockMetadata;
var driver = variableMetadata?.StorageDriverType;

// We only capture output written to the workflow itself. Other drivers like blob storage, etc. will be ignored since the foreground context will be loading those.
if (driver != typeof(WorkflowStorageDriver) && driver != typeof(WorkflowInstanceStorageDriver) && driver != null)
continue;

var outputValue = activityExecutionContext.Get(memoryBlockReference);

if (outputValue != null)
outputValues[outputDescriptor.Name] = outputValue;
}

return outputValues;
{
var activityOutputRegister = activityExecutionContext.WorkflowExecutionContext.GetActivityOutputRegister();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fear that this "solution" may be too simplistic since the old code clearly went out of it's way to only return values that are backed by WorkflowInstanceStorageDriver. I'm not sure that this is a concern now considering that the parent workflow seems to be now resumed in the same thread. Anyway - I'd welcome discussion in this particular change (as well as any other change!).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The concern is that when an activity produces a large object, e.g. a file or JSON string of 5 MB, the user might use a different variable storage driver, such as "Blob Storage Driver" that stores the data in Azure Storage.

For this reason, we want to avoid those large values to become part of the output register, given that it will be serialized as well.

Obviously, the current implementation is rather crude and intended to be a temporary hack.
The ultimate solution is to allow for better control of how output is recorded.

For example, for small values, it might be OK to be recorded. The assumption here is that if the storage driver is "Workflow Instance", which stores the value as part of the workflow instance, it's safe to also include it as part of the output register.

For large values, the assumption is that the user will have used the Memory storage driver or a custom one such as "Blob Storage Driver". In this case, we definitely do not want the value to be recorded in the output register.

Now, we actually do want to record something in the output register. In the case of large blobs, a download link could be appropriate.

So perhaps a cleaner approach would be to leverage the associated storage driver to have it produce a value suitable for storing in the output register. This would clean up this code because now we no longer need to test against the storage driver type.

Pending that solution, perhaps for now it might be best to keep the code to only include variables into the output if those variables are associated with a workflow instance driver.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm. Having inclusion in the output register dependent on the associated storage driver implies that unmapped (unbound) outputs would continue to not be registered in the output register when activities are run asynchronously. And having synchronously executed activities behave differently than asynchronously executed activities also feels odd (for synchronously executed activities I think all outputs are registered regardless of being bound or not, and regardless of storage driver). And, in fact, for bound outputs I think it is less important to be included in the output register since down stream activities can just map to the variable.

Perhaps the best solution would be for me to introduce a IBackgroundActivityOutputExtractor service where the default implementation would do exactly what the existing (prior to my PR) BackgroundActivityInvoker.ExtractActivityOutput method does and would allow me to use an implementation that instead returns all outputs (basically what I have in this PR). What do you think of this @sfmskywalker?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, considering the bug is actually more about the logging of outputs in the ActivityExecutionRecord and not about the activity output register (although there is certainly overlap)... how about we use the activities specified persistence mode to determine whether to return the output from the background activity (also taking into account any outputs which are bound to WorkflowInstanceStorageDriver variables)? So, if an activity executed asynchronously, we would return outputs that are either configured to LogPersistenceMode.Include or bound to a WorkflowInstanceStorageDriver variable.

return activityOutputRegister.FindMany(a => a.ActivityInstanceId == activityExecutionContext.Id).ToDictionary(o => o.OutputName, o => o.Value!);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Elsa.Testing.Shared;
using Elsa.Workflows.IntegrationTests.Scenarios.JsonObjectToObjectRemainsJsonObject.Workflows;
using Microsoft.Extensions.DependencyInjection;
using Xunit.Abstractions;

Expand All @@ -22,7 +21,7 @@ public JsonObjectJintTests(ITestOutputHelper testOutputHelper)
public async Task Test1()
{
await _services.PopulateRegistriesAsync();
await _workflowRunner.RunAsync<TestWorkflow>();
await _workflowRunner.RunAsync<Workflows.TestWorkflow>();
var lines = _capturingTextWriter.Lines.ToList();
Assert.Equal(new[] { "Baz" }, lines);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using Elsa.Workflows.Attributes;
using Elsa.Workflows.Models;

namespace Elsa.Workflows.IntegrationTests.Scenarios.RunAsynchronousActivityOutput.Activities;

[Activity("Elsa.Test", Kind = ActivityKind.Task)]
internal class SampleActivity : CodeActivity
{

public SampleActivity()
{
RunAsynchronously = true;
}

public Input<int>? Number1 { get; set; } = default;
public Input<int>? Number2 { get; set; } = default;
public Output<int>? Sum { get; set; } = default;
public Output<int>? Product { get; set; } = default;

protected override void Execute(ActivityExecutionContext context)
{
int number1 = context.Get(Number1);
int number2 = context.Get(Number2);

context.Set(Sum, number1 + number2);
context.Set(Product, number1 * number2);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
using Elsa.Common.Services;
using Elsa.Extensions;
using Elsa.Testing.Shared;
using Elsa.Workflows.Activities;
using Elsa.Workflows.IntegrationTests.Scenarios.RunAsynchronousActivityOutput.Activities;
using Elsa.Workflows.Memory;
using Elsa.Workflows.Notifications;
using Elsa.Workflows.Runtime.Distributed;
using Elsa.Workflows.Runtime.Entities;
using Elsa.Workflows.Runtime.Stores;
using Microsoft.Extensions.DependencyInjection;

namespace Elsa.Workflows.IntegrationTests.Scenarios.RunAsynchronousActivityOutput;

public class Tests
{
[Theory(DisplayName = "Activity outputs captured in activity execution record")]
[InlineData(true)]
[InlineData(false)]
public async Task ActivityOutputCaptureTest(bool runAsynchronously)
{
// Arrange

TestWorkflow workflow = new(workflowBuilder =>
{
workflowBuilder.DefinitionId = "sampleWorkflow";
workflowBuilder.Id = "sampleWorkflow";

var variable1 = new Variable<int>();
workflowBuilder.Root = new Sequence
{
Variables =
{
variable1
},
Activities =
{
new SampleActivity()
{
Id = "SampleActivity1",
RunAsynchronously = runAsynchronously,
Number1 = new(4),
Number2 = new(8),
Sum = new(variable1),
Product = null
}
}
};
}
);

var activityExecutionStore = new MemoryActivityExecutionStore(new MemoryStore<ActivityExecutionRecord>());

// Act

WorkflowFinished? workflowFinishedRecord = await workflow.DispatchWorkflowAndRunToCompletion(
configureElsa: elsa =>
{
elsa.UseWorkflowRuntime(workflowRuntime => {
workflowRuntime.ActivityExecutionLogStore = sp => activityExecutionStore;
});
}
);

// Assert

Assert.NotNull(workflowFinishedRecord);
Assert.Equal(WorkflowStatus.Finished, workflowFinishedRecord.WorkflowState.Status);
Assert.Equal(WorkflowSubStatus.Finished, workflowFinishedRecord.WorkflowState.SubStatus);

var activityExecutionRecord = await activityExecutionStore.FindAsync(new() { ActivityId = "SampleActivity1" });
Assert.NotNull(activityExecutionRecord?.Outputs);
Assert.Equal(2, activityExecutionRecord.Outputs!.Count);
Assert.Equal(12, activityExecutionRecord.Outputs!.GetValue<int>("Sum"));
Assert.Equal(32, activityExecutionRecord.Outputs!.GetValue<int>("Product"));

var activityOutputRegister = workflowFinishedRecord.WorkflowExecutionContext.GetActivityOutputRegister();
Assert.Equal(2, activityOutputRegister.FindMany(p => true).Count());
Assert.Equal(12, activityOutputRegister.FindOutputByActivityId("SampleActivity1", "Sum"));
Assert.Equal(32, activityOutputRegister.FindOutputByActivityId("SampleActivity1", "Product"));
}

[Theory(DisplayName = "Activity outputs captured in activity execution record")]
[InlineData(true)]
[InlineData(false)]
public async Task ActivityOutputCaptureParallelTest(bool runAsynchronously)
{

// Arrange

TestWorkflow workflow = new(workflowBuilder =>
{
workflowBuilder.DefinitionId = "sampleWorkflow";
workflowBuilder.Id = "sampleWorkflow";

var variable1 = new Variable<int>();
var variable2 = new Variable<int>();
workflowBuilder.Root = new Elsa.Workflows.Activities.Parallel()
{
Variables =
{
variable1,
variable2,
},
Activities =
{
new SampleActivity()
{
Id = "SampleActivity1",
RunAsynchronously = runAsynchronously,
Number1 = new(4),
Number2 = new(8),
Sum = new(variable1),
},
new SampleActivity()
{
Id = "SampleActivity2",
RunAsynchronously = runAsynchronously,
Number1 = new(2),
Number2 = new(7),
Product = new(variable2),
}
}
};
}
);

var activityExecutionStore = new MemoryActivityExecutionStore(new MemoryStore<ActivityExecutionRecord>());

// Act

WorkflowFinished? workflowFinishedRecord = await workflow.DispatchWorkflowAndRunToCompletion(
configureServices: services =>
{
services.AddScoped<DistributedWorkflowRuntime>();
},
configureElsa: elsa =>
{
elsa.UseWorkflowRuntime(workflowRuntime => {
workflowRuntime.ActivityExecutionLogStore = sp => activityExecutionStore;
workflowRuntime.WorkflowRuntime = sp => sp.GetRequiredService<DistributedWorkflowRuntime>();
Copy link
Contributor Author

@bobhauser bobhauser Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is a separate bug or not... but to be able to run multiple asynchronous activities in parallel (the only way it makes sense to me) the new DistributedWorkflowRuntime workflow runtime must be used. I don't believe that this was not the case with 3.2.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was indeed intentional to require the DistributedWorkflowRuntime or ProtoActorWorkflowRuntime to have support for parallel activity execution. To be honest, I'd love it to be able to still have support for parallel activity execution using the LocalWorkflowRuntime. Originally, the DistributedWorkflowRuntime was created for hosting Elsa on multiple nodes, which requires distributed locking. But it's fair to expect that when running on a single node, you should still be able to run activities in parallel. Perhaps this could be added easily by updating the LocalWorkflowRuntime with a semaphore to synchronise access to the workflow instance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it should be possible to run activities in parallel when running on a single node. I'll likely look at this as a separate PR.

Copy link
Contributor Author

@bobhauser bobhauser Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My solution for this (running activities in parallel running on a single node) was to use a memory based implementation of distributed lock, and then to just use DistributedWorkflowRuntime. The main snag I ran into when I tried a difference approach is that DistributedWorkflowClient internally uses LocalWorkflowClient. If you are interested in the memory based implementation of distributed lock let me know, otherwise I'll just keep it local.

});
}
);

// Assert

Assert.NotNull(workflowFinishedRecord);
Assert.Equal(WorkflowStatus.Finished, workflowFinishedRecord.WorkflowState.Status);
Assert.Equal(WorkflowSubStatus.Finished, workflowFinishedRecord.WorkflowState.SubStatus);

var activityExecutionRecord1 = await activityExecutionStore.FindAsync(new() { ActivityId = "SampleActivity1" });
Assert.NotNull(activityExecutionRecord1?.Outputs);
Assert.Equal(2, activityExecutionRecord1.Outputs!.Count);
Assert.Equal(12, activityExecutionRecord1.Outputs!.GetValue<int>("Sum"));
Assert.Equal(32, activityExecutionRecord1.Outputs!.GetValue<int>("Product"));

var activityExecutionRecord2 = await activityExecutionStore.FindAsync(new() { ActivityId = "SampleActivity2" });
Assert.NotNull(activityExecutionRecord2?.Outputs);
Assert.Equal(2, activityExecutionRecord2.Outputs!.Count);
Assert.Equal(9, activityExecutionRecord2.Outputs!.GetValue<int>("Sum"));
Assert.Equal(14, activityExecutionRecord2.Outputs!.GetValue<int>("Product"));
}
}
Loading