Skip to content

Commit

Permalink
Merge pull request #459 from serverlessworkflow/fix-try-catch-context…
Browse files Browse the repository at this point in the history
…-handling

Fix try catch context handling
  • Loading branch information
cdavernas authored Dec 5, 2024
2 parents 7fd2983 + 9951627 commit 40914e3
Show file tree
Hide file tree
Showing 16 changed files with 122 additions and 67 deletions.
24 changes: 16 additions & 8 deletions src/api/Synapse.Api.Http/ClusterResourceController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,16 @@ public virtual async Task<IActionResult> WatchResourcesUsingSSE(string? labelSel
this.Response.Headers.CacheControl = "no-cache";
this.Response.Headers.Connection = "keep-alive";
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
try
{
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
{
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) { }
return this.Ok();
}

Expand Down Expand Up @@ -149,12 +153,16 @@ public virtual async Task<IActionResult> MonitorResourceUsingSSE(string name, Ca
this.Response.Headers.CacheControl = "no-cache";
this.Response.Headers.Connection = "keep-alive";
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
try
{
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
{
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) { }
return this.Ok();
}

Expand Down
26 changes: 16 additions & 10 deletions src/api/Synapse.Api.Http/NamespacedResourceController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Neuroglia.Data.Infrastructure.ResourceOriented;

namespace Synapse.Api.Http;

/// <summary>
Expand Down Expand Up @@ -164,12 +162,16 @@ public virtual async Task<IActionResult> WatchResourcesUsingSSE(string @namespac
this.Response.Headers.CacheControl = "no-cache";
this.Response.Headers.Connection = "keep-alive";
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
try
{
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
{
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex) when(ex is TaskCanceledException || ex is OperationCanceledException) { }
return this.Ok();
}

Expand Down Expand Up @@ -206,12 +208,16 @@ public virtual async Task<IActionResult> MonitorResourceUsingSSE(string name, st
this.Response.Headers.CacheControl = "no-cache";
this.Response.Headers.Connection = "keep-alive";
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
await foreach(var e in response.Data!.WithCancellation(cancellationToken))
try
{
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
await foreach (var e in response.Data!.WithCancellation(cancellationToken))
{
var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n";
await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false);
await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) { }
return this.Ok();
}

Expand Down
6 changes: 3 additions & 3 deletions src/api/Synapse.Api.Http/Synapse.Api.Http.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Neuroglia.Mediation.AspNetCore" Version="4.16.0" />
<PackageReference Include="Neuroglia.Security.AspNetCore" Version="4.16.0" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="7.0.0" />
<PackageReference Include="Neuroglia.Mediation.AspNetCore" Version="4.16.2" />
<PackageReference Include="Neuroglia.Security.AspNetCore" Version="4.16.2" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="7.1.0" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/api/Synapse.Api.Server/Synapse.Api.Server.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<PackageReference Include="Microsoft.AspNetCore.Authentication.OpenIdConnect" Version="9.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.Server" Version="9.0.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.0" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="7.0.0" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="7.1.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@

<ItemGroup>
<PackageReference Include="IdentityModel" Version="7.0.0" />
<PackageReference Include="Microsoft.IdentityModel.JsonWebTokens" Version="8.2.0" />
<PackageReference Include="Neuroglia.Data.Expressions.Abstractions" Version="4.16.0" />
<PackageReference Include="Neuroglia.Data.Infrastructure.Redis" Version="4.16.0" />
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented.Redis" Version="4.16.0" />
<PackageReference Include="Neuroglia.Mediation" Version="4.16.0" />
<PackageReference Include="Neuroglia.Plugins" Version="4.16.0" />
<PackageReference Include="Microsoft.IdentityModel.JsonWebTokens" Version="8.3.0" />
<PackageReference Include="Neuroglia.Data.Expressions.Abstractions" Version="4.16.2" />
<PackageReference Include="Neuroglia.Data.Infrastructure.Redis" Version="4.16.2" />
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented.Redis" Version="4.16.2" />
<PackageReference Include="Neuroglia.Mediation" Version="4.16.2" />
<PackageReference Include="Neuroglia.Plugins" Version="4.16.2" />
<PackageReference Include="ServerlessWorkflow.Sdk.IO" Version="1.0.0-alpha5.1" />
</ItemGroup>

Expand Down
4 changes: 2 additions & 2 deletions src/core/Synapse.Core/Synapse.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@
<ItemGroup>
<PackageReference Include="Docker.DotNet" Version="3.125.15" />
<PackageReference Include="KubernetesClient" Version="15.0.1" />
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented" Version="4.16.0" />
<PackageReference Include="Neuroglia.Eventing.CloudEvents" Version="4.16.0" />
<PackageReference Include="Neuroglia.Data.Infrastructure.ResourceOriented" Version="4.16.2" />
<PackageReference Include="Neuroglia.Eventing.CloudEvents" Version="4.16.2" />
<PackageReference Include="Semver" Version="3.0.0" />
<PackageReference Include="ServerlessWorkflow.Sdk" Version="1.0.0-alpha5.1" />
</ItemGroup>
Expand Down
16 changes: 8 additions & 8 deletions src/correlator/Synapse.Correlator/Synapse.Correlator.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@
<PackageReference Include="Microsoft.Extensions.Configuration.KeyPerFile" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.0" />
<PackageReference Include="Neuroglia.Data.Expressions.JavaScript" Version="4.16.0" />
<PackageReference Include="Neuroglia.Data.Expressions.JQ" Version="4.16.0" />
<PackageReference Include="Neuroglia.Eventing.CloudEvents.AspNetCore" Version="4.16.0" />
<PackageReference Include="Neuroglia.Mediation.AspNetCore" Version="4.16.0" />
<PackageReference Include="Neuroglia.Eventing.CloudEvents.Infrastructure" Version="4.16.0" />
<PackageReference Include="Neuroglia.Security.AspNetCore" Version="4.16.0" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="7.0.0" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="7.0.0" />
<PackageReference Include="Neuroglia.Data.Expressions.JavaScript" Version="4.16.2" />
<PackageReference Include="Neuroglia.Data.Expressions.JQ" Version="4.16.2" />
<PackageReference Include="Neuroglia.Eventing.CloudEvents.AspNetCore" Version="4.16.2" />
<PackageReference Include="Neuroglia.Mediation.AspNetCore" Version="4.16.2" />
<PackageReference Include="Neuroglia.Eventing.CloudEvents.Infrastructure" Version="4.16.2" />
<PackageReference Include="Neuroglia.Security.AspNetCore" Version="4.16.2" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="7.1.0" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="7.1.0" />
</ItemGroup>

<ItemGroup>
Expand Down
4 changes: 2 additions & 2 deletions src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Blazor.Bootstrap" Version="3.1.1" />
<PackageReference Include="Blazor.Bootstrap" Version="3.2.0" />
<PackageReference Include="BlazorMonaco" Version="3.2.0" />
<PackageReference Include="IdentityModel" Version="7.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly" Version="9.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.Authentication" Version="9.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.DevServer" Version="9.0.0" PrivateAssets="all" />
<PackageReference Include="moment.net" Version="1.3.4" />
<PackageReference Include="Neuroglia.Blazor.Dagre" Version="4.16.0" />
<PackageReference Include="Neuroglia.Blazor.Dagre" Version="4.16.2" />
</ItemGroup>

<ItemGroup>
Expand Down
3 changes: 2 additions & 1 deletion src/operator/Synapse.Operator/Services/WorkflowController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public override async Task StartAsync(CancellationToken cancellationToken)
this.Operator!.Select(b => b.Resource.Spec.Selector).DistinctUntilChanged().SubscribeAsync(this.OnResourceSelectorChangedAsync, cancellationToken: cancellationToken);
this.Where(e => e.Type == ResourceWatchEventType.Updated)
.Select(e => new { Workflow = e.Resource, e.Resource.Spec.Versions })
.DistinctUntilChanged()
.DistinctUntilChanged(s => s.Versions)
.Scan((Previous: (EquatableList<WorkflowDefinition>)null!, Current: (EquatableList<WorkflowDefinition>)null!, Workflow: (Workflow)null!), (accumulator, current) => (accumulator.Current, current.Versions, current.Workflow))
.SubscribeAsync(async value => await this.OnWorkflowVersionChangedAsync(value.Workflow, value.Previous, value.Current).ConfigureAwait(false), cancellationToken: cancellationToken);
await this.OnResourceSelectorChangedAsync(this.Operator!.Resource.Spec.Selector).ConfigureAwait(false);
Expand Down Expand Up @@ -190,6 +190,7 @@ protected virtual async Task OnWorkflowVersionChangedAsync(Workflow workflow, Eq
if (workflow.Metadata.Labels == null || !workflow.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out _)) if (!await this.TryClaimAsync(workflow, this.CancellationTokenSource.Token).ConfigureAwait(false)) return;
if (workflow.Metadata.Labels?[SynapseDefaults.Resources.Labels.Operator] != this.Operator.Resource.GetQualifiedName()) return;
var diffPatch = JsonPatchUtility.CreateJsonPatchFromDiff(previous, current);
if (diffPatch.Operations.Count < 1) return;
var operation = diffPatch.Operations[0].Op;
if (this.Schedulers.TryRemove(workflow.GetQualifiedName(), out var scheduler)) await scheduler.DisposeAsync().ConfigureAwait(false);
if (operation == OperationType.Remove) return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using Neuroglia.Data.Infrastructure.Services;

namespace Synapse.Operator.Services;

/// <summary>
Expand All @@ -21,7 +23,8 @@ namespace Synapse.Operator.Services;
/// <param name="controllerOptions">The service used to access the current <see cref="IOptions{TOptions}"/></param>
/// <param name="repository">The service used to manage <see cref="IResource"/>s</param>
/// <param name="operatorController">The service used to access the current <see cref="Resources.Operator"/></param>
public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<WorkflowInstance>> controllerOptions, IResourceRepository repository, IOperatorController operatorController)
/// <param name="documents">The <see cref="IRepository"/> used to manage <see cref="Document"/>s</param>
public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<WorkflowInstance>> controllerOptions, IResourceRepository repository, IOperatorController operatorController, IRepository<Document, string> documents)
: ResourceController<WorkflowInstance>(loggerFactory, controllerOptions, repository)
{

Expand All @@ -35,6 +38,11 @@ public class WorkflowInstanceController(IServiceProvider serviceProvider, ILogge
/// </summary>
protected IResourceMonitor<Resources.Operator> Operator => operatorController.Operator;

/// <summary>
/// Gets the <see cref="IRepository"/> used to manage <see cref="Document"/>s
/// </summary>
protected IRepository<Document, string> Documents => documents;

/// <summary>
/// Gets a <see cref="ConcurrentDictionary{TKey, TValue}"/> that contains current <see cref="WorkflowInstanceHandler"/>es
/// </summary>
Expand Down Expand Up @@ -139,24 +147,54 @@ public override async Task StopAsync(CancellationToken cancellationToken)
/// <inheritdoc/>
protected override async Task OnResourceCreatedAsync(WorkflowInstance workflowInstance, CancellationToken cancellationToken = default)
{
await base.OnResourceCreatedAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
if (!await this.TryClaimAsync(workflowInstance, cancellationToken).ConfigureAwait(false)) return;
var handler = await this.CreateWorkflowInstanceHandlerAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
await handler.HandleAsync(cancellationToken).ConfigureAwait(false);
try
{
await base.OnResourceCreatedAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
if (!await this.TryClaimAsync(workflowInstance, cancellationToken).ConfigureAwait(false)) return;
var handler = await this.CreateWorkflowInstanceHandlerAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
await handler.HandleAsync(cancellationToken).ConfigureAwait(false);
}
catch(Exception ex)
{
this.Logger.LogError("An error occured while handling the creation of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex);
}
}

/// <inheritdoc/>
protected override async Task OnResourceDeletedAsync(WorkflowInstance workflowInstance, CancellationToken cancellationToken = default)
{
await base.OnResourceDeletedAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
if (this.Handlers.TryRemove(workflowInstance.GetQualifiedName(), out var process)) await process.DisposeAsync().ConfigureAwait(false);
var selectors = new LabelSelector[]
try
{
await base.OnResourceDeletedAsync(workflowInstance, cancellationToken).ConfigureAwait(false);
if (this.Handlers.TryRemove(workflowInstance.GetQualifiedName(), out var process)) await process.DisposeAsync().ConfigureAwait(false);
var selectors = new LabelSelector[]
{
new(SynapseDefaults.Resources.Labels.WorkflowInstance, LabelSelectionOperator.Equals, workflowInstance.GetQualifiedName())
};
await foreach (var correlation in this.Repository.GetAllAsync<Correlation>(null, selectors, cancellationToken: cancellationToken))
};
await foreach (var correlation in this.Repository.GetAllAsync<Correlation>(null, selectors, cancellationToken: cancellationToken))
{
await this.Repository.RemoveAsync<Correlation>(correlation.GetName(), correlation.GetNamespace(), false, cancellationToken).ConfigureAwait(false);
}
if (workflowInstance.Status != null)
{
var documentReferences = new List<string>();
if (!string.IsNullOrWhiteSpace(workflowInstance.Status.ContextReference)) documentReferences.Add(workflowInstance.Status.ContextReference);
if (!string.IsNullOrWhiteSpace(workflowInstance.Status.OutputReference)) documentReferences.Add(workflowInstance.Status.OutputReference);
if (workflowInstance.Status.Tasks != null)
{
foreach (var task in workflowInstance.Status.Tasks)
{
if (!string.IsNullOrWhiteSpace(task.ContextReference)) documentReferences.Add(task.ContextReference);
if (!string.IsNullOrWhiteSpace(task.InputReference)) documentReferences.Add(task.InputReference);
if (!string.IsNullOrWhiteSpace(task.OutputReference)) documentReferences.Add(task.OutputReference);
}
}
foreach (var documentReference in documentReferences.Distinct()) await this.Documents.RemoveAsync(documentReference, cancellationToken).ConfigureAwait(false);
}
}
catch(Exception ex)
{
await this.Repository.RemoveAsync<Correlation>(correlation.GetName(), correlation.GetNamespace(), false, cancellationToken).ConfigureAwait(false);
this.Logger.LogError("An error occured while handling the deletion of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/operator/Synapse.Operator/appsettings.Development.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
},
"Runtime": {
"Native": {
"Directory": "..\\..\\..\\..\\..\\runner\\Synapse.Runner\\bin\\Debug\\net8.0\\",
"Directory": "..\\..\\..\\..\\..\\runner\\Synapse.Runner\\bin\\Debug\\net9.0\\",
"Executable": "Synapse.Runner.exe"
}
}
Expand Down
Loading

0 comments on commit 40914e3

Please sign in to comment.