From 89421769e844d13020f15df297a84b35ee730b1a Mon Sep 17 00:00:00 2001 From: Mike Cantelon Date: Wed, 28 Aug 2024 14:02:13 -0700 Subject: [PATCH] Check PREMIS validity before preservation [skip-codecov] --- .dockerignore | 1 + Dockerfile | 15 + cmd/enduro-a3m-worker/main.go | 5 + cmd/enduro-am-worker/main.go | 5 + enduro.toml | 4 + go.mod | 2 +- go.sum | 4 +- hack/kube/base/enduro.yaml | 2 + hack/xsd/premis.xsd | 1223 ++++++++++++++++++++++++++ internal/config/config.go | 3 + internal/config/config_test.go | 2 + internal/premis/config.go | 26 + internal/premis/config_test.go | 68 ++ internal/workflow/processing.go | 94 ++ internal/workflow/processing_test.go | 152 +++- 15 files changed, 1584 insertions(+), 22 deletions(-) create mode 100644 hack/xsd/premis.xsd create mode 100644 internal/premis/config.go create mode 100644 internal/premis/config_test.go diff --git a/.dockerignore b/.dockerignore index 205d1379..12cf9203 100644 --- a/.dockerignore +++ b/.dockerignore @@ -5,3 +5,4 @@ !go.mod !go.sum !main.go +!hack/xsd/premis.xsd diff --git a/Dockerfile b/Dockerfile index fb09f437..998b65fe 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,6 +3,9 @@ ARG TARGET=enduro ARG GO_VERSION +FROM alpine:3.20 AS build-libxml +RUN apk add --no-cache libxml2-utils + FROM golang:${GO_VERSION}-alpine AS build-go WORKDIR /src ENV CGO_ENABLED=0 @@ -61,10 +64,22 @@ FROM base AS enduro-a3m-worker COPY --from=build-enduro-a3m-worker --link /out/enduro-a3m-worker /home/enduro/bin/enduro-a3m-worker COPY --from=build-enduro-a3m-worker --link /src/enduro.toml /home/enduro/.config/enduro.toml CMD ["/home/enduro/bin/enduro-a3m-worker", "--config", "/home/enduro/.config/enduro.toml"] +COPY hack/xsd/premis.xsd /home/enduro/premis.xsd +COPY --from=build-libxml /usr/bin/xmllint /usr/bin/xmllint +COPY --from=build-libxml /usr/lib/libxml2.so.2 /usr/lib/libxml2.so.2 +COPY --from=build-libxml /lib/ld-musl-x86_64.so.1 /lib/ld-musl-x86_64.so.1 +COPY --from=build-libxml /lib/libz.so.1 /lib/libz.so.1 +COPY --from=build-libxml /usr/lib/liblzma.so.5 /usr/lib/liblzma.so.5 FROM base AS enduro-am-worker COPY --from=build-enduro-am-worker --link /out/enduro-am-worker /home/enduro/bin/enduro-am-worker COPY --from=build-enduro-am-worker --link /src/enduro.toml /home/enduro/.config/enduro.toml CMD ["/home/enduro/bin/enduro-am-worker", "--config", "/home/enduro/.config/enduro.toml"] +COPY hack/xsd/premis.xsd /home/enduro/premis.xsd +COPY --from=build-libxml /usr/bin/xmllint /usr/bin/xmllint +COPY --from=build-libxml /usr/lib/libxml2.so.2 /usr/lib/libxml2.so.2 +COPY --from=build-libxml /lib/ld-musl-x86_64.so.1 /lib/ld-musl-x86_64.so.1 +COPY --from=build-libxml /lib/libz.so.1 /lib/libz.so.1 +COPY --from=build-libxml /usr/lib/liblzma.so.5 /usr/lib/liblzma.so.5 FROM ${TARGET} diff --git a/cmd/enduro-a3m-worker/main.go b/cmd/enduro-a3m-worker/main.go index 3bd0134a..2a4a63ef 100644 --- a/cmd/enduro-a3m-worker/main.go +++ b/cmd/enduro-a3m-worker/main.go @@ -19,6 +19,7 @@ import ( "github.com/artefactual-sdps/temporal-activities/bagvalidate" "github.com/artefactual-sdps/temporal-activities/bucketupload" "github.com/artefactual-sdps/temporal-activities/removepaths" + "github.com/artefactual-sdps/temporal-activities/xmlvalidate" "github.com/hashicorp/go-cleanhttp" "github.com/oklog/run" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -252,6 +253,10 @@ func main() { archiveextract.New(cfg.ExtractActivity).Execute, temporalsdk_activity.RegisterOptions{Name: archiveextract.Name}, ) + w.RegisterActivityWithOptions( + xmlvalidate.New(xmlvalidate.NewXMLLintValidator()).Execute, + temporalsdk_activity.RegisterOptions{Name: xmlvalidate.Name}, + ) w.RegisterActivityWithOptions( activities.NewClassifyPackageActivity().Execute, temporalsdk_activity.RegisterOptions{Name: activities.ClassifyPackageActivityName}, diff --git a/cmd/enduro-am-worker/main.go b/cmd/enduro-am-worker/main.go index ae325bdb..b8e48a73 100644 --- a/cmd/enduro-am-worker/main.go +++ b/cmd/enduro-am-worker/main.go @@ -20,6 +20,7 @@ import ( "github.com/artefactual-sdps/temporal-activities/bagvalidate" "github.com/artefactual-sdps/temporal-activities/bucketupload" "github.com/artefactual-sdps/temporal-activities/removepaths" + "github.com/artefactual-sdps/temporal-activities/xmlvalidate" "github.com/hashicorp/go-cleanhttp" "github.com/jonboulle/clockwork" "github.com/oklog/run" @@ -329,6 +330,10 @@ func main() { bucketupload.New(failedPIPs).Execute, temporalsdk_activity.RegisterOptions{Name: activities.SendToFailedPIPsName}, ) + w.RegisterActivityWithOptions( + xmlvalidate.New(xmlvalidate.NewXMLLintValidator()).Execute, + temporalsdk_activity.RegisterOptions{Name: xmlvalidate.Name}, + ) g.Add( func() error { diff --git a/enduro.toml b/enduro.toml index fe07ccc2..e6027002 100644 --- a/enduro.toml +++ b/enduro.toml @@ -232,6 +232,10 @@ namespace = "default" taskQueue = "preprocessing" workflowName = "preprocessing" +[validatePremis] +enabled = true +xsdPath = "/home/enduro/premis.xsd" + [failedSips] endpoint = "http://minio.enduro-sdps:9000" pathStyle = true diff --git a/go.mod b/go.mod index f352e0ca..55a67746 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/XSAM/otelsql v0.29.0 github.com/alicebob/miniredis/v2 v2.32.1 github.com/artefactual-labs/bagit-gython v0.2.0 - github.com/artefactual-sdps/temporal-activities v0.0.0-20240821162351-47302711bc7b + github.com/artefactual-sdps/temporal-activities v0.0.0-20241018212855-8ea34d29bdf4 github.com/coreos/go-oidc/v3 v3.10.0 github.com/cyphar/filepath-securejoin v0.2.4 github.com/dolmen-go/contextio v1.0.0 diff --git a/go.sum b/go.sum index 48ccbb6b..072bcca2 100644 --- a/go.sum +++ b/go.sum @@ -442,8 +442,8 @@ github.com/apparentlymart/go-textseg/v15 v15.0.0 h1:uYvfpb3DyLSCGWnctWKGj857c6ew github.com/apparentlymart/go-textseg/v15 v15.0.0/go.mod h1:K8XmNZdhEBkdlyDdvbmmsvpAG721bKi0joRfFdHIWJ4= github.com/artefactual-labs/bagit-gython v0.2.0 h1:Zje4Lb1goZVUPoxpc/k65sWtYpNgK9Rvphvaok5cYzE= github.com/artefactual-labs/bagit-gython v0.2.0/go.mod h1:C+hFZQMDnji1hjGt3nrlMK3BahaBhvo/hU2uqd+Q9Z4= -github.com/artefactual-sdps/temporal-activities v0.0.0-20240821162351-47302711bc7b h1:kTOc2pbkdII6/Z84Bus1q52z5KAOaT8vLpfRoOs1l1I= -github.com/artefactual-sdps/temporal-activities v0.0.0-20240821162351-47302711bc7b/go.mod h1:FVh79rCGNlUU1QnioAU+lrSjLqrA1PJFYKIhWPsmyug= +github.com/artefactual-sdps/temporal-activities v0.0.0-20241018212855-8ea34d29bdf4 h1:WF95IOkZRVSCST/26SAqPYsUrtUuJpavBht6lvdeKl0= +github.com/artefactual-sdps/temporal-activities v0.0.0-20241018212855-8ea34d29bdf4/go.mod h1:FVh79rCGNlUU1QnioAU+lrSjLqrA1PJFYKIhWPsmyug= github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU= github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= github.com/aws/aws-sdk-go-v2 v1.30.3 h1:jUeBtG0Ih+ZIFH0F4UkmL9w3cSpaMv9tYYDbzILP8dY= diff --git a/hack/kube/base/enduro.yaml b/hack/kube/base/enduro.yaml index 836bdeaf..0c640a7c 100644 --- a/hack/kube/base/enduro.yaml +++ b/hack/kube/base/enduro.yaml @@ -70,6 +70,8 @@ spec: value: "grafana-agent.enduro-sdps:4317" - name: ENDURO_TELEMETRY_TRACES_SAMPLING_RATIO value: "1.0" + - name: ENDURO_VALIDATEPREMIS_ENABLED + value: "false" ports: - containerPort: 9000 - containerPort: 9002 diff --git a/hack/xsd/premis.xsd b/hack/xsd/premis.xsd new file mode 100644 index 00000000..be6122b6 --- /dev/null +++ b/hack/xsd/premis.xsd @@ -0,0 +1,1223 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/internal/config/config.go b/internal/config/config.go index 51df5cba..0c301b21 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -22,6 +22,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/db" "github.com/artefactual-sdps/enduro/internal/event" "github.com/artefactual-sdps/enduro/internal/package_" + "github.com/artefactual-sdps/enduro/internal/premis" "github.com/artefactual-sdps/enduro/internal/preprocessing" "github.com/artefactual-sdps/enduro/internal/pres" "github.com/artefactual-sdps/enduro/internal/storage" @@ -54,6 +55,7 @@ type Configuration struct { Upload package_.UploadConfig Watcher watcher.Config Telemetry telemetry.Config + ValidatePREMIS premis.Config FailedSIPs bucket.Config FailedPIPs bucket.Config @@ -67,6 +69,7 @@ func (c *Configuration) Validate() error { c.BagIt.Validate(), c.Preprocessing.Validate(), c.Upload.Validate(), + c.ValidatePREMIS.Validate(), ) } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 85d16a28..f1fdee60 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -87,5 +87,7 @@ func TestConfig(t *testing.T) { assert.Equal(t, c.Preservation.TaskQueue, temporal.A3mWorkerTaskQueue) assert.Equal(t, c.Storage.TaskQueue, temporal.GlobalTaskQueue) assert.Equal(t, c.Temporal.TaskQueue, temporal.GlobalTaskQueue) + assert.Equal(t, c.ValidatePREMIS.Enabled, false) + assert.Equal(t, c.ValidatePREMIS.XSDPath, "") }) } diff --git a/internal/premis/config.go b/internal/premis/config.go new file mode 100644 index 00000000..e8ce84eb --- /dev/null +++ b/internal/premis/config.go @@ -0,0 +1,26 @@ +package premis + +import ( + "errors" + "fmt" + "os" +) + +type Config struct { + Enabled bool + XSDPath string +} + +// Validate implements config.ConfigurationValidator. +func (c Config) Validate() error { + if !c.Enabled { + return nil + } + if c.XSDPath == "" { + return errors.New("xsdPath is required in the [validatePremis] configuration when enabled") + } + if _, err := os.Stat(c.XSDPath); err != nil { + return fmt.Errorf("xsdPath in [validatePremis] not found: %v", err) + } + return nil +} diff --git a/internal/premis/config_test.go b/internal/premis/config_test.go new file mode 100644 index 00000000..d91607ce --- /dev/null +++ b/internal/premis/config_test.go @@ -0,0 +1,68 @@ +package premis_test + +import ( + "fmt" + "testing" + + "gotest.tools/v3/assert" + "gotest.tools/v3/fs" + + "github.com/artefactual-sdps/enduro/internal/premis" +) + +func TestConfig(t *testing.T) { + t.Parallel() + + type test struct { + name string + config *premis.Config + wantErr string + } + + // Non-existent XSD path. + badXSDfs := fs.NewDir(t, "", fs.WithFile("missing.xsd", "")) + badXSDPath := badXSDfs.Join("missing.xsd") + badXSDfs.Remove() + + for _, tt := range []test{ + { + name: "Passes validation (disabled)", + config: &premis.Config{ + Enabled: false, + }, + }, + { + name: "Fails validation (missing XSD path)", + config: &premis.Config{ + Enabled: true, + }, + wantErr: "xsdPath is required in the [validatePremis] configuration when enabled", + }, + { + name: "Fails validation (missing XSD file)", + config: &premis.Config{ + Enabled: true, + XSDPath: badXSDPath, + }, + wantErr: fmt.Sprintf("xsdPath in [validatePremis] not found: stat %s: no such file or directory", badXSDPath), + }, + { + name: "Passes validation (enabled)", + config: &premis.Config{ + Enabled: true, + XSDPath: fs.NewDir(t, "", fs.WithFile("empty.xsd", "")).Join("empty.xsd"), + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + err := tt.config.Validate() + if tt.wantErr != "" { + assert.Error(t, err, tt.wantErr) + return + } + assert.NilError(t, err) + }) + } +} diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index c3d9dd74..f66bc7df 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -12,6 +12,7 @@ import ( "io" "path/filepath" "slices" + "strings" "time" "github.com/artefactual-sdps/temporal-activities/archiveextract" @@ -20,6 +21,7 @@ import ( "github.com/artefactual-sdps/temporal-activities/bagvalidate" "github.com/artefactual-sdps/temporal-activities/bucketupload" "github.com/artefactual-sdps/temporal-activities/removepaths" + "github.com/artefactual-sdps/temporal-activities/xmlvalidate" "github.com/google/uuid" "go.artefactual.dev/tools/ref" temporal_tools "go.artefactual.dev/tools/temporal" @@ -834,6 +836,15 @@ func (w *ProcessingWorkflow) transferA3m( tinfo.SendToFailed.ActivityName = activities.SendToFailedPIPsName tinfo.SendToFailed.NeedsZipping = true + err := w.validatePREMIS( + sessCtx, + filepath.Join(tinfo.Bundle.FullPath, "metadata", "premis.xml"), + tinfo.PreservationActionID, + ) + if err != nil { + return err + } + // Send PIP to a3m for preservation. { activityOpts := temporalsdk_workflow.WithActivityOptions(sessCtx, temporalsdk_workflow.ActivityOptions{ @@ -883,6 +894,15 @@ func (w *ProcessingWorkflow) transferAM(ctx temporalsdk_workflow.Context, tinfo tinfo.PackageType = enums.PackageTypeBagIt } + err = w.validatePREMIS( + ctx, + filepath.Join(tinfo.TempPath, "data", "metadata", "premis.xml"), + tinfo.PreservationActionID, + ) + if err != nil { + return err + } + // Zip PIP. activityOpts := withActivityOptsForLocalAction(ctx) var zipResult archivezip.Result @@ -1188,3 +1208,77 @@ func (w *ProcessingWorkflow) sendToFailedBucket( return nil } + +func (w *ProcessingWorkflow) validatePREMIS( + ctx temporalsdk_workflow.Context, + xmlPath string, + paID int, +) error { + if !w.cfg.ValidatePREMIS.Enabled { + return nil + } + + // Create preservation task for PREMIS validation. + id, err := w.createPreservationTask( + ctx, + datatypes.PreservationTask{ + Name: "Validate PREMIS", + Status: enums.PreservationTaskStatusInProgress, + PreservationActionID: paID, + }, + ) + if err != nil { + return fmt.Errorf("create validate PREMIS task: %v", err) + } + + // Set preservation task default status and note. + pt := datatypes.PreservationTask{ + ID: id, + Status: enums.PreservationTaskStatusDone, + Note: "PREMIS is valid", + } + + // Attempt to validate PREMIS. + var xmlvalidateResult xmlvalidate.Result + activityOpts := withActivityOptsForLocalAction(ctx) + err = temporalsdk_workflow.ExecuteActivity(activityOpts, xmlvalidate.Name, xmlvalidate.Params{ + XMLPath: xmlPath, + XSDPath: w.cfg.ValidatePREMIS.XSDPath, + }).Get(activityOpts, &xmlvalidateResult) + + if err != nil { + if strings.Contains(err.Error(), fmt.Sprintf("%s: no such file or directory", xmlPath)) { + // Allow PREMIS XML to not exist without failing workflow. + err = nil + } else { + pt.Status = enums.PreservationTaskStatusError + pt.Note = "System error" + } + } + + // Set preservation task status to error and add PREMIS validation failures to note. + if len(xmlvalidateResult.Failures) > 0 { + pt.Status = enums.PreservationTaskStatusError + pt.Note = "PREMIS is invalid" + + for _, failure := range xmlvalidateResult.Failures { + pt.Note += fmt.Sprintf("\n%s", failure) + } + + err = errors.New(pt.Note) + } + + // Mark preservation task as complete. + if e := w.completePreservationTask(ctx, pt); e != nil { + return errors.Join( + err, + fmt.Errorf("complete validate PREMIS task: %v", e), + ) + } + + if err != nil { + return fmt.Errorf("validate PREMIS: %v", err) + } + + return nil +} diff --git a/internal/workflow/processing_test.go b/internal/workflow/processing_test.go index 9c61fac3..0c4af23e 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -4,6 +4,7 @@ import ( "database/sql" "fmt" "math/rand" + "path/filepath" "strings" "testing" "time" @@ -14,6 +15,7 @@ import ( "github.com/artefactual-sdps/temporal-activities/bagvalidate" "github.com/artefactual-sdps/temporal-activities/bucketupload" "github.com/artefactual-sdps/temporal-activities/removepaths" + "github.com/artefactual-sdps/temporal-activities/xmlvalidate" "github.com/google/uuid" "github.com/jonboulle/clockwork" "github.com/stretchr/testify/mock" @@ -37,6 +39,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/enums" "github.com/artefactual-sdps/enduro/internal/package_" packagefake "github.com/artefactual-sdps/enduro/internal/package_/fake" + "github.com/artefactual-sdps/enduro/internal/premis" "github.com/artefactual-sdps/enduro/internal/preprocessing" "github.com/artefactual-sdps/enduro/internal/pres" sftp_fake "github.com/artefactual-sdps/enduro/internal/sftp/fake" @@ -115,6 +118,10 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(cfg config.Configuration archiveextract.New(cfg.ExtractActivity).Execute, temporalsdk_activity.RegisterOptions{Name: archiveextract.Name}, ) + s.env.RegisterActivityWithOptions( + xmlvalidate.New(xmlvalidate.NewXMLLintValidator()).Execute, + temporalsdk_activity.RegisterOptions{Name: xmlvalidate.Name}, + ) s.env.RegisterActivityWithOptions( bagvalidate.New(bagvalidate.NewNoopValidator()).Execute, temporalsdk_activity.RegisterOptions{Name: bagvalidate.Name}, @@ -258,10 +265,16 @@ func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() { Storage: storage.Config{ DefaultPermanentLocationID: locationID, }, + ValidatePREMIS: premis.Config{ + Enabled: true, + XSDPath: "/home/enduro/premis.xsd", + }, } s.SetupWorkflowTest(cfg) pkgID := 1 + paID := 1 + valPREMISTaskID := 102 ctx := mock.AnythingOfType("*context.valueCtx") sessionCtx := mock.AnythingOfType("*context.timerCtx") key := "transfer.zip" @@ -296,7 +309,7 @@ func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() { StartedAt: startTime, PackageID: 1, }, - ).Return(0, nil) + ).Return(paID, nil) s.env.OnActivity(activities.DownloadActivityName, sessionCtx, &activities.DownloadActivityParams{Key: key, WatcherName: watcherName}, @@ -321,6 +334,47 @@ func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() { nil, ) + s.env.OnActivity( + createPreservationTaskLocalActivity, + ctx, + &createPreservationTaskLocalActivityParams{ + PkgSvc: pkgsvc, + RNG: s.workflow.rng, + PreservationTask: datatypes.PreservationTask{ + Name: "Validate PREMIS", + Status: enums.PreservationTaskStatusInProgress, + StartedAt: sql.NullTime{ + Time: startTime, + Valid: true, + }, + PreservationActionID: paID, + }, + }, + ).Return(valPREMISTaskID, nil) + + s.env.OnActivity( + xmlvalidate.Name, + sessionCtx, + &xmlvalidate.Params{ + XMLPath: filepath.Join(transferPath, "metadata", "premis.xml"), + XSDPath: cfg.ValidatePREMIS.XSDPath, + }, + ).Return( + &xmlvalidate.Result{Failures: []string{}}, nil, + ) + + s.env.OnActivity( + completePreservationTaskLocalActivity, + ctx, + pkgsvc, + &completePreservationTaskLocalActivityParams{ + ID: valPREMISTaskID, + Status: enums.PreservationTaskStatusDone, + CompletedAt: startTime, + Note: ref.New("PREMIS is valid"), + }, + ).Return(&completePreservationTaskLocalActivityResult{}, nil) + s.env.OnActivity(a3m.CreateAIPActivityName, mock.Anything, mock.Anything).Return(nil, nil) s.env.OnActivity(updatePackageLocalActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(nil, nil) @@ -376,6 +430,9 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { s.SetupWorkflowTest(cfg) pkgID := 1 + paID := 1 + valBagTaskID := 101 + moveAIPTaskID := 103 watcherName := "watcher" key := "transfer.zip" retentionPeriod := 1 * time.Second @@ -410,7 +467,7 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { StartedAt: startTime, PackageID: 1, }, - ).Return(0, nil) + ).Return(paID, nil) s.env.OnActivity(activities.DownloadActivityName, sessionCtx, &activities.DownloadActivityParams{Key: key, WatcherName: watcherName}, @@ -445,10 +502,10 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { Time: startTime, Valid: true, }, - PreservationActionID: 0, + PreservationActionID: paID, }, }, - ).Return(101, nil) + ).Return(valBagTaskID, nil) s.env.OnActivity( bagvalidate.Name, @@ -464,7 +521,7 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { ctx, pkgsvc, &completePreservationTaskLocalActivityParams{ - ID: 101, + ID: valBagTaskID, Status: enums.PreservationTaskStatusDone, CompletedAt: startTime, Note: ref.New("Bag is valid"), @@ -500,10 +557,10 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { Status: enums.PreservationTaskStatusInProgress, StartedAt: sql.NullTime{Time: startTime, Valid: true}, Note: "Moving to permanent storage", - PreservationActionID: 0, + PreservationActionID: paID, }, }, - ).Return(102, nil) + ).Return(moveAIPTaskID, nil) s.env.OnActivity(activities.UploadActivityName, sessionCtx, mock.AnythingOfType("*activities.UploadActivityParams")). Return(nil, nil). @@ -520,7 +577,7 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { ctx, pkgsvc, &completePreservationTaskLocalActivityParams{ - ID: 102, + ID: moveAIPTaskID, Status: enums.PreservationTaskStatusDone, CompletedAt: startTime, Note: ref.New("Moved to location f2cc963f-c14d-4eaa-b950-bd207189a1f1"), @@ -561,6 +618,8 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { pkgID := 1 + paID := 1 + valPREMISTaskID := 102 watcherName := "watcher" key := "transfer.zip" retentionPeriod := 1 * time.Second @@ -571,6 +630,10 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { A3m: a3m.Config{ShareDir: s.CreateTransferDir()}, Preservation: pres.Config{TaskQueue: temporal.AmWorkerTaskQueue}, Storage: storage.Config{DefaultPermanentLocationID: amssLocationID}, + ValidatePREMIS: premis.Config{ + Enabled: true, + XSDPath: "/home/enduro/premis.xsd", + }, } s.SetupWorkflowTest(cfg) @@ -587,7 +650,7 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { s.env.OnActivity(createPreservationActionLocalActivity, ctx, pkgsvc, mock.AnythingOfType("*workflow.createPreservationActionLocalActivityParams"), - ).Return(0, nil) + ).Return(paID, nil) s.env.OnActivity(activities.DownloadActivityName, sessionCtx, &activities.DownloadActivityParams{Key: key, WatcherName: watcherName}, @@ -616,6 +679,47 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { &bagcreate.Result{BagPath: extractPath}, nil, ) + s.env.OnActivity( + createPreservationTaskLocalActivity, + ctx, + &createPreservationTaskLocalActivityParams{ + PkgSvc: pkgsvc, + RNG: s.workflow.rng, + PreservationTask: datatypes.PreservationTask{ + Name: "Validate PREMIS", + Status: enums.PreservationTaskStatusInProgress, + StartedAt: sql.NullTime{ + Time: startTime, + Valid: true, + }, + PreservationActionID: paID, + }, + }, + ).Return(valPREMISTaskID, nil) + + s.env.OnActivity( + xmlvalidate.Name, + sessionCtx, + &xmlvalidate.Params{ + XMLPath: filepath.Join(extractPath, "data", "metadata", "premis.xml"), + XSDPath: "/home/enduro/premis.xsd", + }, + ).Return( + &xmlvalidate.Result{Failures: []string{}}, nil, + ) + + s.env.OnActivity( + completePreservationTaskLocalActivity, + ctx, + pkgsvc, + &completePreservationTaskLocalActivityParams{ + ID: valPREMISTaskID, + Status: enums.PreservationTaskStatusDone, + CompletedAt: startTime, + Note: ref.New("PREMIS is valid"), + }, + ).Return(&completePreservationTaskLocalActivityResult{}, nil) + s.env.OnActivity(archivezip.Name, sessionCtx, &archivezip.Params{SourceDir: extractPath}, ).Return( @@ -638,13 +742,13 @@ func (s *ProcessingWorkflowTestSuite) TestAMWorkflow() { ) s.env.OnActivity(am.PollTransferActivityName, sessionCtx, - &am.PollTransferActivityParams{TransferID: transferID.String()}, + &am.PollTransferActivityParams{TransferID: transferID.String(), PresActionID: paID}, ).Return( &am.PollTransferActivityResult{SIPID: sipID.String()}, nil, ) s.env.OnActivity(am.PollIngestActivityName, sessionCtx, - &am.PollIngestActivityParams{SIPID: sipID.String()}, + &am.PollIngestActivityParams{SIPID: sipID.String(), PresActionID: paID}, ).Return( &am.PollIngestActivityResult{Status: "COMPLETE"}, nil, ) @@ -836,6 +940,8 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { s.SetupWorkflowTest(cfg) pkgID := 1 + valBagTaskID := 101 + moveAIPTaskID := 103 watcherName := "watcher" key := "transfer.zip" retentionPeriod := 1 * time.Second @@ -954,7 +1060,7 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { PreservationActionID: 1, }, }, - ).Return(101, nil) + ).Return(valBagTaskID, nil) s.env.OnActivity( bagvalidate.Name, @@ -970,7 +1076,7 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { ctx, pkgsvc, &completePreservationTaskLocalActivityParams{ - ID: 101, + ID: valBagTaskID, Status: enums.PreservationTaskStatusDone, CompletedAt: startTime, Note: ref.New("Bag is valid"), @@ -1009,7 +1115,7 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { Note: "Moving to permanent storage", }, }, - ).Return(102, nil) + ).Return(moveAIPTaskID, nil) s.env.OnActivity( activities.UploadActivityName, @@ -1022,7 +1128,7 @@ func (s *ProcessingWorkflowTestSuite) TestPreprocessingChildWorkflow() { ctx, pkgsvc, &completePreservationTaskLocalActivityParams{ - ID: 102, + ID: moveAIPTaskID, Status: enums.PreservationTaskStatusDone, CompletedAt: startTime, Note: ref.New("Moved to location f2cc963f-c14d-4eaa-b950-bd207189a1f1"), @@ -1207,6 +1313,7 @@ func (s *ProcessingWorkflowTestSuite) TestFailedPIPA3m() { s.SetupWorkflowTest(cfg) pkgID := 1 + valBagTaskID := 101 watcherName := "watcher" key := "transfer.zip" retentionPeriod := 1 * time.Second @@ -1281,7 +1388,7 @@ func (s *ProcessingWorkflowTestSuite) TestFailedPIPA3m() { PreservationActionID: 1, }, }, - ).Return(101, nil) + ).Return(valBagTaskID, nil) s.env.OnActivity( bagvalidate.Name, @@ -1294,7 +1401,7 @@ func (s *ProcessingWorkflowTestSuite) TestFailedPIPA3m() { ctx, pkgsvc, &completePreservationTaskLocalActivityParams{ - ID: 101, + ID: valBagTaskID, Status: enums.PreservationTaskStatusDone, CompletedAt: startTime, Note: ref.New("Bag is valid"), @@ -1370,6 +1477,7 @@ func (s *ProcessingWorkflowTestSuite) TestFailedPIPAM() { s.SetupWorkflowTest(cfg) pkgID := 1 + paID := 1 watcherName := "watcher" key := "transfer.zip" retentionPeriod := 1 * time.Second @@ -1391,8 +1499,14 @@ func (s *ProcessingWorkflowTestSuite) TestFailedPIPAM() { createPreservationActionLocalActivity, ctx, pkgsvc, - mock.AnythingOfType("*workflow.createPreservationActionLocalActivityParams"), - ).Return(0, nil) + &createPreservationActionLocalActivityParams{ + WorkflowID: "default-test-workflow-id", + Type: enums.PreservationActionTypeCreateAip, + Status: enums.PreservationActionStatusInProgress, + StartedAt: startTime, + PackageID: 1, + }, + ).Return(paID, nil) s.env.OnActivity( activities.DownloadActivityName,