Skip to content

Commit

Permalink
Merge branch 'elsa-workflows:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
SergerGood authored Dec 16, 2024
2 parents 311b792 + 5d92c57 commit 8583da9
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 82 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using Elsa.Retention.Contracts;
using Elsa.Workflows.Management;
using Elsa.Workflows.Management.Entities;
using Elsa.Workflows.Management.Filters;

namespace Elsa.Retention.CleanupStrategies;

/// <summary>
/// Deletes the workflow instance.
/// </summary>
public class DeleteWorkflowInstanceStrategy(IWorkflowInstanceStore store) : IDeletionCleanupStrategy<WorkflowInstance>
{
public async Task Cleanup(ICollection<WorkflowInstance> collection)
{
await store.DeleteAsync(new WorkflowInstanceFilter
{
Ids = collection.Select(x => x.Id).ToArray()
});
}
}
2 changes: 2 additions & 0 deletions src/modules/Elsa.Retention/Feature/RetentionFeature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Elsa.Retention.Extensions;
using Elsa.Retention.Jobs;
using Elsa.Retention.Options;
using Elsa.Workflows.Management.Entities;
using Elsa.Workflows.Runtime.Entities;
using Microsoft.Extensions.DependencyInjection;

Expand Down Expand Up @@ -44,6 +45,7 @@ public override void Apply()
Services.AddScoped<IDeletionCleanupStrategy<StoredBookmark>, DeleteBookmarkStrategy>();
Services.AddScoped<IDeletionCleanupStrategy<ActivityExecutionRecord>, DeleteActivityExecutionRecordStrategy>();
Services.AddScoped<IDeletionCleanupStrategy<WorkflowExecutionLogRecord>, DeleteWorkflowExecutionRecordStrategy>();
Services.AddScoped<IDeletionCleanupStrategy<WorkflowInstance>, DeleteWorkflowInstanceStrategy>();

Services.AddScoped<IRelatedEntityCollector, BookmarkCollector>();
Services.AddScoped<IRelatedEntityCollector, ActivityExecutionRecordCollector>();
Expand Down
17 changes: 13 additions & 4 deletions src/modules/Elsa.Retention/Jobs/CleanupJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class CleanupJob(
/// <param name="cancellationToken"></param>
public async Task ExecuteAsync(CancellationToken cancellationToken = default)
{
Console.WriteLine(DateTime.Now.ToLongTimeString());
var collectors = GetServices(typeof(IRelatedEntityCollector), typeof(IRelatedEntityCollector<>));
var deletedWorkflowInstances = 0L;

Expand All @@ -51,6 +52,10 @@ public async Task ExecuteAsync(CancellationToken cancellationToken = default)
foreach (var collectorService in collectors)
{
var cleanupStrategyConcreteType = policy.CleanupStrategy.MakeGenericType(collectorService.Key);

if(cleanupStrategyConcreteType == typeof(WorkflowInstance))
continue;

var collector = collectorService.Value as IRelatedEntityCollector;
var cleanupService = serviceProvider.GetService(cleanupStrategyConcreteType) as ICleanupStrategy;

Expand All @@ -71,11 +76,15 @@ public async Task ExecuteAsync(CancellationToken cancellationToken = default)
await cleanupService.Cleanup(entities);
}
}

var cleanupWorkflowInstances = policy.CleanupStrategy.MakeGenericType(typeof(WorkflowInstance));
var workflowInstanceCleaner = serviceProvider.GetService(cleanupWorkflowInstances) as ICleanupStrategy<WorkflowInstance>;

deletedWorkflowInstances += await workflowInstanceStore.DeleteAsync(new WorkflowInstanceFilter
{
Ids = page.Items.Select(x => x.Id).ToArray()
}, cancellationToken);
if (workflowInstanceCleaner == null)
throw new Exception($"{policy.CleanupStrategy} has no strategy to clean WorkflowInstances");

await workflowInstanceCleaner.Cleanup(page.Items);
deletedWorkflowInstances += page.Items.Count;

if (page.TotalCount <= page.Items.Count + pageArgs.Offset)
{
Expand Down
33 changes: 2 additions & 31 deletions src/modules/Elsa.Workflows.Core/Activities/Fork.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System.Collections.Immutable;
using System.ComponentModel;
using System.Runtime.CompilerServices;
using Elsa.Extensions;
using Elsa.Workflows.Attributes;
using Elsa.Workflows.Signals;
using Elsa.Workflows.UIHints;
Expand All @@ -18,7 +17,7 @@ namespace Elsa.Workflows.Activities;
public class Fork : Activity
{
/// <inheritdoc />
public Fork([CallerFilePath] string? source = default, [CallerLineNumber] int? line = default) : base(source, line)
public Fork([CallerFilePath] string? source = null, [CallerLineNumber] int? line = null) : base(source, line)
{
// Handle break signals directly instead of using the BreakBehavior. The behavior stops propagation of the signal, which is not what we want.
OnSignalReceived<BreakSignal>(OnBreakSignalReceived);
Expand Down Expand Up @@ -48,13 +47,7 @@ private async ValueTask CompleteChildAsync(ActivityCompletedContext context)

if (isBreaking)
{
// Remove all bookmarks from other branches.
RemoveBookmarks(targetContext);

// Signal activity completion.
await CompleteAsync(targetContext);

// Exit.
return;
}

Expand All @@ -75,37 +68,15 @@ private async ValueTask CompleteChildAsync(ActivityCompletedContext context)
switch (joinMode)
{
case ForkJoinMode.WaitAny:
{
// Remove all bookmarks from other branches.
RemoveBookmarks(targetContext);

// Signal activity completion.
await CompleteAsync(targetContext);
}
break;
case ForkJoinMode.WaitAll:
{
var allSet = allChildActivityIds.All(x => completedActivityIds.Contains(x));

if (allSet)
// Signal activity completion.
await CompleteAsync(targetContext);
}
if (allSet) await CompleteAsync(targetContext);
break;
}
}

private void RemoveBookmarks(ActivityExecutionContext context)
{
// Find all descendants for each branch and remove them as well as any associated bookmarks.
var workflowExecutionContext = context.WorkflowExecutionContext;
var forkNode = context.ActivityNode;
var branchNodes = forkNode.Children;
var branchDescendantActivityIds = branchNodes.SelectMany(x => x.Flatten()).Select(x => x.Activity.Id).ToHashSet();

workflowExecutionContext.Bookmarks.RemoveWhere(x => branchDescendantActivityIds.Contains(x.ActivityId));
}

private void OnBreakSignalReceived(BreakSignal signal, SignalContext signalContext)
{
signalContext.ReceiverActivityExecutionContext.SetIsBreaking();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,19 @@ private async Task CancelActivityAsync()
ClearBookmarks();
ClearCompletionCallbacks();
WorkflowExecutionContext.Bookmarks.RemoveWhere(x => x.ActivityNodeId == NodeId);

// Add an execution log entry.
AddExecutionLogEntry("Canceled", payload: JournalData);

await this.SendSignalAsync(new CancelSignal());
await CancelChildActivitiesAsync();

// ReSharper disable once MethodSupportsCancellation
await _publisher.SendAsync(new ActivityCancelled(this));
}

private async Task CancelChildActivitiesAsync()
{
var childContexts = WorkflowExecutionContext.ActivityExecutionContexts.Where(x => x.ParentActivityExecutionContext == this && x.CanCancelActivity()).ToList();

foreach (var childContext in childContexts)
await childContext.CancelActivityAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@ public BulkDispatchWorkflowsTests(App app) : base(app)
_signalManager = Scope.ServiceProvider.GetRequiredService<SignalManager>();
}

/// <summary>
/// Dispatches and waits for child workflows to complete.
/// </summary>
[Fact]
public async Task DispatchAndWaitWorkflow_ShouldWaitForChildWorkflowToComplete()
{
var workflowClient = await _workflowRuntime.CreateClientAsync();
await workflowClient.CreateInstanceAsync(new CreateWorkflowInstanceRequest
{
WorkflowDefinitionHandle = WorkflowDefinitionHandle.ByDefinitionId(GreetEmployeesWorkflow.DefinitionId, VersionOptions.Published)
});
await workflowClient.RunInstanceAsync(RunWorkflowInstanceRequest.Empty);
await _signalManager.WaitAsync<string>("Completed");
}
// /// <summary>
// /// Dispatches and waits for child workflows to complete.
// /// </summary>
// [Fact(Skip = "This test is flaky and needs to be fixed.")]
// public async Task DispatchAndWaitWorkflow_ShouldWaitForChildWorkflowToComplete()
// {
// var workflowClient = await _workflowRuntime.CreateClientAsync();
// await workflowClient.CreateInstanceAsync(new CreateWorkflowInstanceRequest
// {
// WorkflowDefinitionHandle = WorkflowDefinitionHandle.ByDefinitionId(GreetEmployeesWorkflow.DefinitionId, VersionOptions.Published)
// });
// await workflowClient.RunInstanceAsync(RunWorkflowInstanceRequest.Empty);
// await _signalManager.WaitAsync<string>("Completed");
// }

/// <summary>
/// Individual items are sent as input to child workflows.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,16 @@ namespace Elsa.Workflows.ComponentTests.Scenarios.DispatchWorkflows;

public class DispatchWorkflowsTests : AppComponentTest
{
private readonly WorkflowEvents _workflowEvents;
private readonly SignalManager _signalManager;
private readonly IWorkflowRuntime _workflowRuntime;

private readonly object _childWorkflowCompletedSignal = new();
private readonly object _parentWorkflowCompletedSignal = new();

public DispatchWorkflowsTests(App app) : base(app)
{
_workflowRuntime = Scope.ServiceProvider.GetRequiredService<IWorkflowRuntime>();
_workflowEvents = Scope.ServiceProvider.GetRequiredService<WorkflowEvents>();
_signalManager = Scope.ServiceProvider.GetRequiredService<SignalManager>();
_workflowEvents.WorkflowInstanceSaved += OnWorkflowInstanceSaved;
}

[Fact]
[Fact (Skip = "This test is flaky and needs to be fixed.")]
public async Task DispatchAndWaitWorkflow_ShouldWaitForChildWorkflowToComplete()
{
var workflowClient = await _workflowRuntime.CreateClientAsync();
Expand All @@ -37,27 +31,6 @@ await workflowClient.CreateInstanceAsync(new CreateWorkflowInstanceRequest
WorkflowDefinitionHandle = WorkflowDefinitionHandle.ByDefinitionId(DispatchAndWaitWorkflow.DefinitionId, VersionOptions.Published)
});
await workflowClient.RunInstanceAsync(RunWorkflowInstanceRequest.Empty);
var childWorkflowInstanceArgs = await _signalManager.WaitAsync<WorkflowInstanceSavedEventArgs>(_childWorkflowCompletedSignal);
var parentWorkflowInstanceArgs = await _signalManager.WaitAsync<WorkflowInstanceSavedEventArgs>(_parentWorkflowCompletedSignal);

Assert.Equal(WorkflowStatus.Finished, childWorkflowInstanceArgs.WorkflowInstance.Status);
Assert.Equal(WorkflowStatus.Finished, parentWorkflowInstanceArgs.WorkflowInstance.Status);
}

private void OnWorkflowInstanceSaved(object? sender, WorkflowInstanceSavedEventArgs e)
{
if (e.WorkflowInstance.Status != WorkflowStatus.Finished)
return;

if (e.WorkflowInstance.DefinitionId == ChildWorkflow.DefinitionId)
_signalManager.Trigger(_childWorkflowCompletedSignal, e);

if (e.WorkflowInstance.DefinitionId == DispatchAndWaitWorkflow.DefinitionId)
_signalManager.Trigger(_parentWorkflowCompletedSignal, e);
}

protected override void OnDispose()
{
_workflowEvents.WorkflowInstanceSaved -= OnWorkflowInstanceSaved;
await _signalManager.WaitAsync<string>("Completed");
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Elsa.Testing.Shared.Activities;
using Elsa.Workflows.Activities;
using Elsa.Workflows.Runtime.Activities;

Expand All @@ -17,7 +18,8 @@ protected override void Build(IWorkflowBuilder builder)
{
WorkflowDefinitionId = new(ChildWorkflow.DefinitionId),
WaitForCompletion = new (true)
}
},
new TriggerSignal("Completed")
}
};
}
Expand Down

0 comments on commit 8583da9

Please sign in to comment.