diff --git a/src/api/Synapse.Api.Http/ClusterResourceController.cs b/src/api/Synapse.Api.Http/ClusterResourceController.cs index d7c985964..4f0b26b66 100644 --- a/src/api/Synapse.Api.Http/ClusterResourceController.cs +++ b/src/api/Synapse.Api.Http/ClusterResourceController.cs @@ -109,12 +109,16 @@ public virtual async Task WatchResourcesUsingSSE(string? labelSel this.Response.Headers.CacheControl = "no-cache"; this.Response.Headers.Connection = "keep-alive"; await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); - await foreach (var e in response.Data!.WithCancellation(cancellationToken)) + try { - var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n"; - await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false); - await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); + await foreach (var e in response.Data!.WithCancellation(cancellationToken)) + { + var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n"; + await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false); + await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); + } } + catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) { } return this.Ok(); } @@ -149,12 +153,16 @@ public virtual async Task MonitorResourceUsingSSE(string name, Ca this.Response.Headers.CacheControl = "no-cache"; this.Response.Headers.Connection = "keep-alive"; await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); - await foreach (var e in response.Data!.WithCancellation(cancellationToken)) + try { - var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n"; - await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false); - await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); + await foreach (var e in response.Data!.WithCancellation(cancellationToken)) + { + var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n"; + await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false); + await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); + } } + catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) { } return this.Ok(); } diff --git a/src/api/Synapse.Api.Http/NamespacedResourceController.cs b/src/api/Synapse.Api.Http/NamespacedResourceController.cs index f9bad3771..9c38896d0 100644 --- a/src/api/Synapse.Api.Http/NamespacedResourceController.cs +++ b/src/api/Synapse.Api.Http/NamespacedResourceController.cs @@ -11,8 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -using Neuroglia.Data.Infrastructure.ResourceOriented; - namespace Synapse.Api.Http; /// @@ -164,12 +162,16 @@ public virtual async Task WatchResourcesUsingSSE(string @namespac this.Response.Headers.CacheControl = "no-cache"; this.Response.Headers.Connection = "keep-alive"; await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); - await foreach (var e in response.Data!.WithCancellation(cancellationToken)) + try { - var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n"; - await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false); - await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); + await foreach (var e in response.Data!.WithCancellation(cancellationToken)) + { + var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n"; + await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false); + await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); + } } + catch (Exception ex) when(ex is TaskCanceledException || ex is OperationCanceledException) { } return this.Ok(); } @@ -206,12 +208,16 @@ public virtual async Task MonitorResourceUsingSSE(string name, st this.Response.Headers.CacheControl = "no-cache"; this.Response.Headers.Connection = "keep-alive"; await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); - await foreach(var e in response.Data!.WithCancellation(cancellationToken)) + try { - var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n"; - await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false); - await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); + await foreach (var e in response.Data!.WithCancellation(cancellationToken)) + { + var sseMessage = $"data: {this.JsonSerializer.SerializeToText(e)}\n\n"; + await this.Response.Body.WriteAsync(Encoding.UTF8.GetBytes(sseMessage), cancellationToken).ConfigureAwait(false); + await this.Response.Body.FlushAsync(cancellationToken).ConfigureAwait(false); + } } + catch (Exception ex) when (ex is TaskCanceledException || ex is OperationCanceledException) { } return this.Ok(); } diff --git a/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj b/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj index c33ae9ac4..769e4a1b8 100644 --- a/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj +++ b/src/api/Synapse.Api.Http/Synapse.Api.Http.csproj @@ -43,9 +43,9 @@ - - - + + + diff --git a/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj b/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj index 18c7c68d6..9a4087b57 100644 --- a/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj +++ b/src/api/Synapse.Api.Server/Synapse.Api.Server.csproj @@ -35,7 +35,7 @@ - + diff --git a/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj b/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj index 08d09127e..9f22ba4fc 100644 --- a/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj +++ b/src/core/Synapse.Core.Infrastructure/Synapse.Core.Infrastructure.csproj @@ -44,12 +44,12 @@ - - - - - - + + + + + + diff --git a/src/core/Synapse.Core/Synapse.Core.csproj b/src/core/Synapse.Core/Synapse.Core.csproj index 7c3daa1da..55f54259e 100644 --- a/src/core/Synapse.Core/Synapse.Core.csproj +++ b/src/core/Synapse.Core/Synapse.Core.csproj @@ -66,8 +66,8 @@ - - + + diff --git a/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj b/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj index 695f1c4a6..0f78b1ae6 100644 --- a/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj +++ b/src/correlator/Synapse.Correlator/Synapse.Correlator.csproj @@ -36,14 +36,14 @@ - - - - - - - - + + + + + + + + diff --git a/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj b/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj index e8f75430b..3b5bd5242 100644 --- a/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj +++ b/src/dashboard/Synapse.Dashboard/Synapse.Dashboard.csproj @@ -10,14 +10,14 @@ - + - + diff --git a/src/operator/Synapse.Operator/Services/WorkflowController.cs b/src/operator/Synapse.Operator/Services/WorkflowController.cs index b5976828e..651101c30 100644 --- a/src/operator/Synapse.Operator/Services/WorkflowController.cs +++ b/src/operator/Synapse.Operator/Services/WorkflowController.cs @@ -55,7 +55,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) this.Operator!.Select(b => b.Resource.Spec.Selector).DistinctUntilChanged().SubscribeAsync(this.OnResourceSelectorChangedAsync, cancellationToken: cancellationToken); this.Where(e => e.Type == ResourceWatchEventType.Updated) .Select(e => new { Workflow = e.Resource, e.Resource.Spec.Versions }) - .DistinctUntilChanged() + .DistinctUntilChanged(s => s.Versions) .Scan((Previous: (EquatableList)null!, Current: (EquatableList)null!, Workflow: (Workflow)null!), (accumulator, current) => (accumulator.Current, current.Versions, current.Workflow)) .SubscribeAsync(async value => await this.OnWorkflowVersionChangedAsync(value.Workflow, value.Previous, value.Current).ConfigureAwait(false), cancellationToken: cancellationToken); await this.OnResourceSelectorChangedAsync(this.Operator!.Resource.Spec.Selector).ConfigureAwait(false); @@ -190,6 +190,7 @@ protected virtual async Task OnWorkflowVersionChangedAsync(Workflow workflow, Eq if (workflow.Metadata.Labels == null || !workflow.Metadata.Labels.TryGetValue(SynapseDefaults.Resources.Labels.Operator, out _)) if (!await this.TryClaimAsync(workflow, this.CancellationTokenSource.Token).ConfigureAwait(false)) return; if (workflow.Metadata.Labels?[SynapseDefaults.Resources.Labels.Operator] != this.Operator.Resource.GetQualifiedName()) return; var diffPatch = JsonPatchUtility.CreateJsonPatchFromDiff(previous, current); + if (diffPatch.Operations.Count < 1) return; var operation = diffPatch.Operations[0].Op; if (this.Schedulers.TryRemove(workflow.GetQualifiedName(), out var scheduler)) await scheduler.DisposeAsync().ConfigureAwait(false); if (operation == OperationType.Remove) return; diff --git a/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs b/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs index 69ad83104..60c97a5fa 100644 --- a/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs +++ b/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs @@ -11,6 +11,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +using Neuroglia.Data.Infrastructure.Services; + namespace Synapse.Operator.Services; /// @@ -21,7 +23,8 @@ namespace Synapse.Operator.Services; /// The service used to access the current /// The service used to manage s /// The service used to access the current -public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions> controllerOptions, IResourceRepository repository, IOperatorController operatorController) +/// The used to manage s +public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions> controllerOptions, IResourceRepository repository, IOperatorController operatorController, IRepository documents) : ResourceController(loggerFactory, controllerOptions, repository) { @@ -35,6 +38,11 @@ public class WorkflowInstanceController(IServiceProvider serviceProvider, ILogge /// protected IResourceMonitor Operator => operatorController.Operator; + /// + /// Gets the used to manage s + /// + protected IRepository Documents => documents; + /// /// Gets a that contains current es /// @@ -139,24 +147,54 @@ public override async Task StopAsync(CancellationToken cancellationToken) /// protected override async Task OnResourceCreatedAsync(WorkflowInstance workflowInstance, CancellationToken cancellationToken = default) { - await base.OnResourceCreatedAsync(workflowInstance, cancellationToken).ConfigureAwait(false); - if (!await this.TryClaimAsync(workflowInstance, cancellationToken).ConfigureAwait(false)) return; - var handler = await this.CreateWorkflowInstanceHandlerAsync(workflowInstance, cancellationToken).ConfigureAwait(false); - await handler.HandleAsync(cancellationToken).ConfigureAwait(false); + try + { + await base.OnResourceCreatedAsync(workflowInstance, cancellationToken).ConfigureAwait(false); + if (!await this.TryClaimAsync(workflowInstance, cancellationToken).ConfigureAwait(false)) return; + var handler = await this.CreateWorkflowInstanceHandlerAsync(workflowInstance, cancellationToken).ConfigureAwait(false); + await handler.HandleAsync(cancellationToken).ConfigureAwait(false); + } + catch(Exception ex) + { + this.Logger.LogError("An error occured while handling the creation of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex); + } } /// protected override async Task OnResourceDeletedAsync(WorkflowInstance workflowInstance, CancellationToken cancellationToken = default) { - await base.OnResourceDeletedAsync(workflowInstance, cancellationToken).ConfigureAwait(false); - if (this.Handlers.TryRemove(workflowInstance.GetQualifiedName(), out var process)) await process.DisposeAsync().ConfigureAwait(false); - var selectors = new LabelSelector[] + try { + await base.OnResourceDeletedAsync(workflowInstance, cancellationToken).ConfigureAwait(false); + if (this.Handlers.TryRemove(workflowInstance.GetQualifiedName(), out var process)) await process.DisposeAsync().ConfigureAwait(false); + var selectors = new LabelSelector[] + { new(SynapseDefaults.Resources.Labels.WorkflowInstance, LabelSelectionOperator.Equals, workflowInstance.GetQualifiedName()) - }; - await foreach (var correlation in this.Repository.GetAllAsync(null, selectors, cancellationToken: cancellationToken)) + }; + await foreach (var correlation in this.Repository.GetAllAsync(null, selectors, cancellationToken: cancellationToken)) + { + await this.Repository.RemoveAsync(correlation.GetName(), correlation.GetNamespace(), false, cancellationToken).ConfigureAwait(false); + } + if (workflowInstance.Status != null) + { + var documentReferences = new List(); + if (!string.IsNullOrWhiteSpace(workflowInstance.Status.ContextReference)) documentReferences.Add(workflowInstance.Status.ContextReference); + if (!string.IsNullOrWhiteSpace(workflowInstance.Status.OutputReference)) documentReferences.Add(workflowInstance.Status.OutputReference); + if (workflowInstance.Status.Tasks != null) + { + foreach (var task in workflowInstance.Status.Tasks) + { + if (!string.IsNullOrWhiteSpace(task.ContextReference)) documentReferences.Add(task.ContextReference); + if (!string.IsNullOrWhiteSpace(task.InputReference)) documentReferences.Add(task.InputReference); + if (!string.IsNullOrWhiteSpace(task.OutputReference)) documentReferences.Add(task.OutputReference); + } + } + foreach (var documentReference in documentReferences.Distinct()) await this.Documents.RemoveAsync(documentReference, cancellationToken).ConfigureAwait(false); + } + } + catch(Exception ex) { - await this.Repository.RemoveAsync(correlation.GetName(), correlation.GetNamespace(), false, cancellationToken).ConfigureAwait(false); + this.Logger.LogError("An error occured while handling the deletion of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex); } } diff --git a/src/operator/Synapse.Operator/appsettings.Development.json b/src/operator/Synapse.Operator/appsettings.Development.json index 60cfec5b3..5bdc5c484 100644 --- a/src/operator/Synapse.Operator/appsettings.Development.json +++ b/src/operator/Synapse.Operator/appsettings.Development.json @@ -16,7 +16,7 @@ }, "Runtime": { "Native": { - "Directory": "..\\..\\..\\..\\..\\runner\\Synapse.Runner\\bin\\Debug\\net8.0\\", + "Directory": "..\\..\\..\\..\\..\\runner\\Synapse.Runner\\bin\\Debug\\net9.0\\", "Executable": "Synapse.Runner.exe" } } diff --git a/src/runner/Synapse.Runner/Services/Executors/TryTaskExecutor.cs b/src/runner/Synapse.Runner/Services/Executors/TryTaskExecutor.cs index 0186c153d..44f2b9fa5 100644 --- a/src/runner/Synapse.Runner/Services/Executors/TryTaskExecutor.cs +++ b/src/runner/Synapse.Runner/Services/Executors/TryTaskExecutor.cs @@ -158,6 +158,7 @@ protected virtual async Task OnTryFaultedAsync(ITaskExecutor executor, Exception protected virtual async Task OnTryCompletedAsync(ITaskExecutor executor, CancellationToken cancellationToken) { ArgumentNullException.ThrowIfNull(executor); + if (this.Task.ContextData != executor.Task.ContextData) await this.Task.SetContextDataAsync(executor.Task.ContextData, cancellationToken).ConfigureAwait(false); var last = executor.Task.Instance; var output = executor.Task.Output!; this.Executors.Remove(executor); @@ -188,6 +189,7 @@ protected virtual async Task OnHandlerFaultAsync(ITaskExecutor executor, Excepti protected virtual async Task OnHandlerCompletedAsync(ITaskExecutor executor, CancellationToken cancellationToken) { ArgumentNullException.ThrowIfNull(executor); + if (this.Task.ContextData != executor.Task.ContextData) await this.Task.SetContextDataAsync(executor.Task.ContextData, cancellationToken).ConfigureAwait(false); var last = executor.Task.Instance; var output = executor.Task.Output!; this.Executors.Remove(executor); diff --git a/src/runner/Synapse.Runner/Synapse.Runner.csproj b/src/runner/Synapse.Runner/Synapse.Runner.csproj index 9d36430f9..f1157e34e 100644 --- a/src/runner/Synapse.Runner/Synapse.Runner.csproj +++ b/src/runner/Synapse.Runner/Synapse.Runner.csproj @@ -52,7 +52,7 @@ - + @@ -60,11 +60,11 @@ - - - - - + + + + + diff --git a/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj b/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj index e358fce95..b5efd4e24 100644 --- a/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj +++ b/tests/Synapse.IntegrationTests/Synapse.IntegrationTests.csproj @@ -14,9 +14,9 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - + - + diff --git a/tests/Synapse.UnitTests/Cases/Runtime/NativeRuntimeTests.cs b/tests/Synapse.UnitTests/Cases/Runtime/NativeRuntimeTests.cs index 6dab2a37e..96fd043b6 100644 --- a/tests/Synapse.UnitTests/Cases/Runtime/NativeRuntimeTests.cs +++ b/tests/Synapse.UnitTests/Cases/Runtime/NativeRuntimeTests.cs @@ -31,7 +31,7 @@ protected override void ConfigureServices(IServiceCollection services) }; options.Runtime.Native = new() { - Directory = Path.Combine("..", "..", "..", "..", "..", "src", "runner", "Synapse.Runner", "bin", "Debug", "net8.0"), + Directory = Path.Combine("..", "..", "..", "..", "..", "src", "runner", "Synapse.Runner", "bin", "Debug", "net9.0"), Executable = "Synapse.Runner" }; }); diff --git a/tests/Synapse.UnitTests/Synapse.UnitTests.csproj b/tests/Synapse.UnitTests/Synapse.UnitTests.csproj index 22a363337..83862dd25 100644 --- a/tests/Synapse.UnitTests/Synapse.UnitTests.csproj +++ b/tests/Synapse.UnitTests/Synapse.UnitTests.csproj @@ -14,14 +14,14 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - + - + - - - + + +