From a8191dfa3875400c80c12ed235fda11e0f47416e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Raddaoui=20Mar=C3=ADn?= Date: Wed, 13 Nov 2024 03:45:57 +0100 Subject: [PATCH] Add poststorage child workflows Allow to configure a set of poststorage child workflows that will be started after AIP storage. These workflows will receive the AIPUUID as a parameter and the parent workflow will only wait for them to be started by Temporal. They are started with a disconnected context and using the abandon parent close policy, so they can continue running after the parent workflow finishes, therefore their results are ignored. --- enduro.toml | 6 +++ internal/config/config.go | 2 + internal/poststorage/poststorage.go | 11 +++++ internal/workflow/processing.go | 39 ++++++++++++++++++ internal/workflow/processing_test.go | 60 ++++++++++++++++++++++++---- 5 files changed, 111 insertions(+), 7 deletions(-) create mode 100644 internal/poststorage/poststorage.go diff --git a/enduro.toml b/enduro.toml index e6027002..cc9ebf83 100644 --- a/enduro.toml +++ b/enduro.toml @@ -236,6 +236,12 @@ workflowName = "preprocessing" enabled = true xsdPath = "/home/enduro/premis.xsd" +# Temporal configurations to trigger poststorage child workflows, allows multiple sections. +# [[poststorage]] +# namespace = "default" +# taskQueue = "poststorage" +# workflowName = "poststorage" + [failedSips] endpoint = "http://minio.enduro-sdps:9000" pathStyle = true diff --git a/internal/config/config.go b/internal/config/config.go index 0c301b21..33e3f04a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -22,6 +22,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/db" "github.com/artefactual-sdps/enduro/internal/event" "github.com/artefactual-sdps/enduro/internal/package_" + "github.com/artefactual-sdps/enduro/internal/poststorage" "github.com/artefactual-sdps/enduro/internal/premis" "github.com/artefactual-sdps/enduro/internal/preprocessing" "github.com/artefactual-sdps/enduro/internal/pres" @@ -48,6 +49,7 @@ type Configuration struct { Database db.Config Event event.Config ExtractActivity archiveextract.Config + Poststorage []poststorage.Config Preprocessing preprocessing.Config Preservation pres.Config Storage storage.Config diff --git a/internal/poststorage/poststorage.go b/internal/poststorage/poststorage.go new file mode 100644 index 00000000..f41b657d --- /dev/null +++ b/internal/poststorage/poststorage.go @@ -0,0 +1,11 @@ +package poststorage + +type Config struct { + Namespace string + TaskQueue string + WorkflowName string +} + +type WorkflowParams struct { + AIPUUID string +} diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index f66bc7df..035e273e 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -37,6 +37,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/enums" "github.com/artefactual-sdps/enduro/internal/fsutil" "github.com/artefactual-sdps/enduro/internal/package_" + "github.com/artefactual-sdps/enduro/internal/poststorage" "github.com/artefactual-sdps/enduro/internal/preprocessing" "github.com/artefactual-sdps/enduro/internal/temporal" "github.com/artefactual-sdps/enduro/internal/watcher" @@ -762,6 +763,10 @@ func (w *ProcessingWorkflow) SessionHandler( return err } } + + if err := w.poststorage(sessCtx, tinfo.SIPID); err != nil { + return err + } } else if !tinfo.req.AutoApproveAIP { // Record package rejection in review preservation task { @@ -1040,6 +1045,10 @@ func (w *ProcessingWorkflow) transferAM(ctx temporalsdk_workflow.Context, tinfo } } + if err := w.poststorage(ctx, tinfo.SIPID); err != nil { + return err + } + // Delete transfer. activityOpts = withActivityOptsForRequest(ctx) err = temporalsdk_workflow.ExecuteActivity(activityOpts, am.DeleteTransferActivityName, am.DeleteTransferActivityParams{ @@ -1114,6 +1123,36 @@ func (w *ProcessingWorkflow) preprocessing(ctx temporalsdk_workflow.Context, tin } } +// poststorage executes the configured poststorage child workflows. It uses +// a disconnected context, abandon as parent close policy and only waits +// until the workflows are started, ignoring their results. +func (w *ProcessingWorkflow) poststorage(ctx temporalsdk_workflow.Context, aipUUID string) error { + var err error + disconnectedCtx, _ := temporalsdk_workflow.NewDisconnectedContext(ctx) + + for _, cfg := range w.cfg.Poststorage { + psCtx := temporalsdk_workflow.WithChildOptions( + disconnectedCtx, + temporalsdk_workflow.ChildWorkflowOptions{ + Namespace: cfg.Namespace, + TaskQueue: cfg.TaskQueue, + WorkflowID: fmt.Sprintf("%s-%s", cfg.WorkflowName, aipUUID), + ParentClosePolicy: temporalapi_enums.PARENT_CLOSE_POLICY_ABANDON, + }, + ) + err = errors.Join( + err, + temporalsdk_workflow.ExecuteChildWorkflow( + psCtx, + cfg.WorkflowName, + poststorage.WorkflowParams{AIPUUID: aipUUID}, + ).GetChildWorkflowExecution().Get(psCtx, nil), + ) + } + + return err +} + func (w *ProcessingWorkflow) createPreservationTask( ctx temporalsdk_workflow.Context, pt datatypes.PreservationTask, diff --git a/internal/workflow/processing_test.go b/internal/workflow/processing_test.go index 0c4af23e..1a7cd04e 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -39,6 +39,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/enums" "github.com/artefactual-sdps/enduro/internal/package_" packagefake "github.com/artefactual-sdps/enduro/internal/package_/fake" + "github.com/artefactual-sdps/enduro/internal/poststorage" "github.com/artefactual-sdps/enduro/internal/premis" "github.com/artefactual-sdps/enduro/internal/preprocessing" "github.com/artefactual-sdps/enduro/internal/pres" @@ -94,6 +95,13 @@ func preprocessingChildWorkflow( return nil, nil } +func poststorageChildWorkflow( + ctx temporalsdk_workflow.Context, + params *poststorage.WorkflowParams, +) (*interface{}, error) { + return nil, nil +} + func (s *ProcessingWorkflowTestSuite) CreateTransferDir() string { s.transferDir = s.T().TempDir() @@ -138,10 +146,6 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(cfg config.Configuration s.setupA3mWorkflowTest(ctrl, pkgsvc) } - s.env.RegisterWorkflowWithOptions( - preprocessingChildWorkflow, - temporalsdk_workflow.RegisterOptions{Name: "preprocessing"}, - ) s.env.RegisterActivityWithOptions( removepaths.New().Execute, temporalsdk_activity.RegisterOptions{Name: removepaths.Name}, @@ -163,6 +167,19 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(cfg config.Configuration temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedPIPsName}, ) + s.env.RegisterWorkflowWithOptions( + preprocessingChildWorkflow, + temporalsdk_workflow.RegisterOptions{Name: "preprocessing"}, + ) + s.env.RegisterWorkflowWithOptions( + poststorageChildWorkflow, + temporalsdk_workflow.RegisterOptions{Name: "poststorage_1"}, + ) + s.env.RegisterWorkflowWithOptions( + poststorageChildWorkflow, + temporalsdk_workflow.RegisterOptions{Name: "poststorage_2"}, + ) + s.workflow = NewProcessingWorkflow(cfg, rng, pkgsvc, wsvc) } @@ -919,7 +936,7 @@ func (s *ProcessingWorkflowTestSuite) TestPackageRejection() { s.NoError(s.env.GetWorkflowResult(nil)) } -func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { +func (s *ProcessingWorkflowTestSuite) TestChildWorkflows() { cfg := config.Configuration{ A3m: a3m.Config{ShareDir: s.CreateTransferDir()}, Preservation: pres.Config{TaskQueue: temporal.A3mWorkerTaskQueue}, @@ -933,6 +950,18 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { WorkflowName: "preprocessing", }, }, + Poststorage: []poststorage.Config{ + { + Namespace: "default", + TaskQueue: "poststorage", + WorkflowName: "poststorage_1", + }, + { + Namespace: "default", + TaskQueue: "poststorage", + WorkflowName: "poststorage_2", + }, + }, Storage: storage.Config{ DefaultPermanentLocationID: locationID, }, @@ -948,6 +977,7 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { ctx := mock.AnythingOfType("*context.valueCtx") sessionCtx := mock.AnythingOfType("*context.timerCtx") pkgsvc := s.workflow.pkgsvc + aipUUID := "56eebd45-5600-4768-a8c2-ec0114555a3d" downloadDir := strings.Replace(tempPath, "/tmp/", cfg.Preprocessing.SharedPath, 1) prepDest := strings.Replace(extractPath, "/tmp/", cfg.Preprocessing.SharedPath, 1) @@ -1095,8 +1125,8 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { ) s.env.OnActivity(a3m.CreateAIPActivityName, sessionCtx, mock.AnythingOfType("*a3m.CreateAIPActivityParams")). - Return(nil, nil). - Once() + Return(&a3m.CreateAIPActivityResult{UUID: aipUUID}, nil) + s.env.OnActivity(updatePackageLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.updatePackageLocalActivityParams")). Return(nil, nil). Times(2) @@ -1156,6 +1186,22 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { Return(nil, nil). Once() + s.env.OnWorkflow( + "poststorage_1", + mock.AnythingOfType("*internal.valueCtx"), + &poststorage.WorkflowParams{ + AIPUUID: aipUUID, + }, + ).Return(nil, nil) + + s.env.OnWorkflow( + "poststorage_2", + mock.AnythingOfType("*internal.valueCtx"), + &poststorage.WorkflowParams{ + AIPUUID: aipUUID, + }, + ).Return(nil, nil) + s.env.OnActivity( removepaths.Name, sessionCtx,