Skip to content

Commit

Permalink
Merge pull request #406 from serverlessworkflow/fix-ignore-skipped-ta…
Browse files Browse the repository at this point in the history
…sk-directive

Fixed the WorkflowExecutor and DoTaskExecutor to ignore the flow directive of skipped tasks
  • Loading branch information
cdavernas authored Sep 11, 2024
2 parents 2b7a777 + 185bcaa commit 761bc43
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public static class WorkflowDefinitionExtensions
public static MapEntry<string, TaskDefinition>? GetTaskAfter(this WorkflowDefinition workflow, TaskInstance after)
{
ArgumentNullException.ThrowIfNull(after);
switch (after.Next)
switch (after.Status == TaskInstanceStatus.Skipped ? FlowDirective.Continue : after.Next)
{
case FlowDirective.Continue:
var afterTask = workflow.Do[after.Name!];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,19 @@ protected virtual async Task OnSubTaskFaultAsync(ITaskExecutor executor, Cancell
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task OnSubtaskCompletedAsync(ITaskExecutor executor, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(executor);
var last = executor.Task.Instance;
var output = executor.Task.Output!;
var nextDefinition = this.Task.Definition.Do.GetTaskAfter(last);
this.Executors.Remove(executor);
if (this.Task.ContextData != executor.Task.ContextData) await this.Task.SetContextDataAsync(executor.Task.ContextData, cancellationToken).ConfigureAwait(false);
if (nextDefinition == null || nextDefinition.Value == null)
{
await this.SetResultAsync(output, last.Next == FlowDirective.End ? FlowDirective.End : this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
await this.SetResultAsync(output, last.Status != TaskInstanceStatus.Skipped && last.Next == FlowDirective.End ? FlowDirective.End : this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
return;
}
var nextDefinitionIndex = this.Task.Definition.Do.Keys.ToList().IndexOf(nextDefinition.Key);
TaskInstance next;
switch (executor.Task.Instance.Next)
switch (executor.Task.Instance.Status == TaskInstanceStatus.Skipped ? FlowDirective.Continue : executor.Task.Instance.Next)
{
case FlowDirective.End:
await this.SetResultAsync(output, FlowDirective.End, cancellationToken).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,16 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo
this.OpenApi = (OpenApiCallDefinition)this.JsonSerializer.Convert(this.Task.Definition.With, typeof(OpenApiCallDefinition))!;
using var httpClient = this.HttpClientFactory.CreateClient();
await httpClient.ConfigureAuthenticationAsync(this.Task.Workflow.Definition, this.OpenApi.Document.Endpoint.Authentication, this.ServiceProvider, cancellationToken).ConfigureAwait(false);
var uri = StringFormatter.NamedFormat(this.OpenApi.Document.EndpointUri.OriginalString, this.Task.Input.ToDictionary());
if (uri.IsRuntimeExpression()) uri = await this.Task.Workflow.Expressions.EvaluateAsync<string>(uri, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false);
using var request = new HttpRequestMessage(HttpMethod.Get, uri);
var uriString = StringFormatter.NamedFormat(this.OpenApi.Document.EndpointUri.OriginalString, this.Task.Input.ToDictionary());
if (uriString.IsRuntimeExpression()) uriString = await this.Task.Workflow.Expressions.EvaluateAsync<string>(uriString, this.Task.Input, this.GetExpressionEvaluationArguments(), cancellationToken).ConfigureAwait(false);
if (string.IsNullOrWhiteSpace(uriString)) throw new NullReferenceException("The OpenAPI endpoint URI cannot be null or whitespace");
if (!Uri.TryCreate(uriString, UriKind.RelativeOrAbsolute, out var uri) || uri == null) throw new Exception($"Failed to parse the specified string '{uriString}' into a new URI");
using var request = new HttpRequestMessage(HttpMethod.Get, uriString);
using var response = await httpClient.SendAsync(request, cancellationToken).ConfigureAwait(false);
if (!response.IsSuccessStatusCode)
{
var responseContent = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);
this.Logger.LogInformation("Failed to retrieve the OpenAPI document at location '{uri}'. The remote server responded with a non-success status code '{statusCode}'.", this.OpenApi.Document.EndpointUri, response.StatusCode);
this.Logger.LogInformation("Failed to retrieve the OpenAPI document at location '{uri}'. The remote server responded with a non-success status code '{statusCode}'.", uri, response.StatusCode);
this.Logger.LogDebug("Response content:\r\n{responseContent}", responseContent ?? "None");
response.EnsureSuccessStatusCode();
}
Expand All @@ -125,11 +127,11 @@ protected override async Task DoInitializeAsync(CancellationToken cancellationTo
var operation = this.Document.Paths
.SelectMany(p => p.Value.Operations)
.FirstOrDefault(o => o.Value.OperationId == this.OpenApi.OperationId);
if (operation.Value == null) throw new NullReferenceException($"Failed to find an operation with id '{this.OpenApi.OperationId}' in OpenAPI document at '{this.OpenApi.Document.EndpointUri}'");
if (operation.Value == null) throw new NullReferenceException($"Failed to find an operation with id '{this.OpenApi.OperationId}' in OpenAPI document at '{uri}'");
this.HttpMethod = operation.Key.ToHttpMethod();
this.Operation = operation.Value;
this.Servers = this.Document.Servers.Select(s => s.Url).ToList();
if (this.Servers.Count == 0) this.Servers.Add(this.OpenApi.Document.EndpointUri.OriginalString.Replace(this.OpenApi.Document.EndpointUri.PathAndQuery, string.Empty));
if (this.Servers.Count == 0) this.Servers.Add(uri.OriginalString.Replace(uri.PathAndQuery, string.Empty));
var path = this.Document.Paths.Single(p => p.Value.Operations.Any(o => o.Value.OperationId == operation.Value.OperationId));
this.Path = path.Key;
await this.BuildParametersAsync(cancellationToken).ConfigureAwait(false);
Expand Down
2 changes: 1 addition & 1 deletion src/runner/Synapse.Runner/Services/WorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ protected virtual async Task OnTaskFaultedAsync(ITaskExecutor executor, Cancella
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task OnTaskCompletedAsync(ITaskExecutor executor, CancellationToken cancellationToken)
{
var nextDefinition = executor.Task.Instance.Next switch
var nextDefinition = (executor.Task.Instance.Status == TaskInstanceStatus.Skipped ? FlowDirective.Continue : executor.Task.Instance.Next) switch
{
FlowDirective.End or FlowDirective.Exit => null,
_ => this.Workflow.Definition.GetTaskAfter(executor.Task.Instance)
Expand Down

0 comments on commit 761bc43

Please sign in to comment.