From ae5f3d157e11aff6850184c1849b6b9e5140e9cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Raddaoui=20Mar=C3=ADn?= Date: Thu, 7 Nov 2024 08:14:44 +0100 Subject: [PATCH] WIP: Add poststorage child workflows [skip-codecov] --- enduro.toml | 6 +++++ internal/config/config.go | 2 ++ internal/poststorage/poststorage.go | 11 ++++++++ internal/workflow/processing.go | 39 +++++++++++++++++++++++++++++ 4 files changed, 58 insertions(+) create mode 100644 internal/poststorage/poststorage.go diff --git a/enduro.toml b/enduro.toml index fe07ccc28..fc642c06f 100644 --- a/enduro.toml +++ b/enduro.toml @@ -232,6 +232,12 @@ namespace = "default" taskQueue = "preprocessing" workflowName = "preprocessing" +# Temporal configurations to trigger poststorage child workflows, allows multiple sections. +# [[poststorage]] +# namespace = "default" +# taskQueue = "poststorage" +# workflowName = "poststorage" + [failedSips] endpoint = "http://minio.enduro-sdps:9000" pathStyle = true diff --git a/internal/config/config.go b/internal/config/config.go index 51df5cba0..6e4a0b8da 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -22,6 +22,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/db" "github.com/artefactual-sdps/enduro/internal/event" "github.com/artefactual-sdps/enduro/internal/package_" + "github.com/artefactual-sdps/enduro/internal/poststorage" "github.com/artefactual-sdps/enduro/internal/preprocessing" "github.com/artefactual-sdps/enduro/internal/pres" "github.com/artefactual-sdps/enduro/internal/storage" @@ -47,6 +48,7 @@ type Configuration struct { Database db.Config Event event.Config ExtractActivity archiveextract.Config + Poststorage []poststorage.Config Preprocessing preprocessing.Config Preservation pres.Config Storage storage.Config diff --git a/internal/poststorage/poststorage.go b/internal/poststorage/poststorage.go new file mode 100644 index 000000000..f41b657d8 --- /dev/null +++ b/internal/poststorage/poststorage.go @@ -0,0 +1,11 @@ +package poststorage + +type Config struct { + Namespace string + TaskQueue string + WorkflowName string +} + +type WorkflowParams struct { + AIPUUID string +} diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index c3d9dd742..76fd00fb8 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -35,6 +35,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/enums" "github.com/artefactual-sdps/enduro/internal/fsutil" "github.com/artefactual-sdps/enduro/internal/package_" + "github.com/artefactual-sdps/enduro/internal/poststorage" "github.com/artefactual-sdps/enduro/internal/preprocessing" "github.com/artefactual-sdps/enduro/internal/temporal" "github.com/artefactual-sdps/enduro/internal/watcher" @@ -760,6 +761,10 @@ func (w *ProcessingWorkflow) SessionHandler( return err } } + + if err := w.poststorage(sessCtx, tinfo.SIPID); err != nil { + return err + } } else if !tinfo.req.AutoApproveAIP { // Record package rejection in review preservation task { @@ -1020,6 +1025,10 @@ func (w *ProcessingWorkflow) transferAM(ctx temporalsdk_workflow.Context, tinfo } } + if err := w.poststorage(ctx, tinfo.SIPID); err != nil { + return err + } + // Delete transfer. activityOpts = withActivityOptsForRequest(ctx) err = temporalsdk_workflow.ExecuteActivity(activityOpts, am.DeleteTransferActivityName, am.DeleteTransferActivityParams{ @@ -1094,6 +1103,36 @@ func (w *ProcessingWorkflow) preprocessing(ctx temporalsdk_workflow.Context, tin } } +// poststorage executes the configured poststorage child workflows. It uses +// a disconnected context, abandon as parent close policy and only waits +// until the workflows are started, ignoring their results. +func (w *ProcessingWorkflow) poststorage(ctx temporalsdk_workflow.Context, aipUUID string) error { + var err error + disconnectedCtx, _ := temporalsdk_workflow.NewDisconnectedContext(ctx) + + for _, cfg := range w.cfg.Poststorage { + psCtx := temporalsdk_workflow.WithChildOptions( + disconnectedCtx, + temporalsdk_workflow.ChildWorkflowOptions{ + Namespace: cfg.Namespace, + TaskQueue: cfg.TaskQueue, + WorkflowID: fmt.Sprintf("%s-%s", cfg.WorkflowName, aipUUID), + ParentClosePolicy: temporalapi_enums.PARENT_CLOSE_POLICY_ABANDON, + }, + ) + err = errors.Join( + err, + temporalsdk_workflow.ExecuteChildWorkflow( + psCtx, + cfg.WorkflowName, + poststorage.WorkflowParams{AIPUUID: aipUUID}, + ).GetChildWorkflowExecution().Get(psCtx, nil), + ) + } + + return err +} + func (w *ProcessingWorkflow) createPreservationTask( ctx temporalsdk_workflow.Context, pt datatypes.PreservationTask,