Skip to content

Commit

Permalink
WIP: Add poststorage child workflows
Browse files Browse the repository at this point in the history
[skip-codecov]
  • Loading branch information
jraddaoui committed Nov 7, 2024
1 parent cfb6643 commit ae5f3d1
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 0 deletions.
6 changes: 6 additions & 0 deletions enduro.toml
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,12 @@ namespace = "default"
taskQueue = "preprocessing"
workflowName = "preprocessing"

# Temporal configurations to trigger poststorage child workflows, allows multiple sections.
# [[poststorage]]
# namespace = "default"
# taskQueue = "poststorage"
# workflowName = "poststorage"

[failedSips]
endpoint = "http://minio.enduro-sdps:9000"
pathStyle = true
Expand Down
2 changes: 2 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/artefactual-sdps/enduro/internal/db"
"github.com/artefactual-sdps/enduro/internal/event"
"github.com/artefactual-sdps/enduro/internal/package_"
"github.com/artefactual-sdps/enduro/internal/poststorage"
"github.com/artefactual-sdps/enduro/internal/preprocessing"
"github.com/artefactual-sdps/enduro/internal/pres"
"github.com/artefactual-sdps/enduro/internal/storage"
Expand All @@ -47,6 +48,7 @@ type Configuration struct {
Database db.Config
Event event.Config
ExtractActivity archiveextract.Config
Poststorage []poststorage.Config
Preprocessing preprocessing.Config
Preservation pres.Config
Storage storage.Config
Expand Down
11 changes: 11 additions & 0 deletions internal/poststorage/poststorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package poststorage

type Config struct {
Namespace string
TaskQueue string
WorkflowName string
}

type WorkflowParams struct {
AIPUUID string
}
39 changes: 39 additions & 0 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/artefactual-sdps/enduro/internal/enums"
"github.com/artefactual-sdps/enduro/internal/fsutil"
"github.com/artefactual-sdps/enduro/internal/package_"
"github.com/artefactual-sdps/enduro/internal/poststorage"
"github.com/artefactual-sdps/enduro/internal/preprocessing"
"github.com/artefactual-sdps/enduro/internal/temporal"
"github.com/artefactual-sdps/enduro/internal/watcher"
Expand Down Expand Up @@ -760,6 +761,10 @@ func (w *ProcessingWorkflow) SessionHandler(
return err
}
}

if err := w.poststorage(sessCtx, tinfo.SIPID); err != nil {
return err
}
} else if !tinfo.req.AutoApproveAIP {
// Record package rejection in review preservation task
{
Expand Down Expand Up @@ -1020,6 +1025,10 @@ func (w *ProcessingWorkflow) transferAM(ctx temporalsdk_workflow.Context, tinfo
}
}

if err := w.poststorage(ctx, tinfo.SIPID); err != nil {
return err
}

// Delete transfer.
activityOpts = withActivityOptsForRequest(ctx)
err = temporalsdk_workflow.ExecuteActivity(activityOpts, am.DeleteTransferActivityName, am.DeleteTransferActivityParams{
Expand Down Expand Up @@ -1094,6 +1103,36 @@ func (w *ProcessingWorkflow) preprocessing(ctx temporalsdk_workflow.Context, tin
}
}

// poststorage executes the configured poststorage child workflows. It uses
// a disconnected context, abandon as parent close policy and only waits
// until the workflows are started, ignoring their results.
func (w *ProcessingWorkflow) poststorage(ctx temporalsdk_workflow.Context, aipUUID string) error {
var err error
disconnectedCtx, _ := temporalsdk_workflow.NewDisconnectedContext(ctx)

for _, cfg := range w.cfg.Poststorage {
psCtx := temporalsdk_workflow.WithChildOptions(
disconnectedCtx,
temporalsdk_workflow.ChildWorkflowOptions{
Namespace: cfg.Namespace,
TaskQueue: cfg.TaskQueue,
WorkflowID: fmt.Sprintf("%s-%s", cfg.WorkflowName, aipUUID),
ParentClosePolicy: temporalapi_enums.PARENT_CLOSE_POLICY_ABANDON,
},
)
err = errors.Join(
err,
temporalsdk_workflow.ExecuteChildWorkflow(
psCtx,
cfg.WorkflowName,
poststorage.WorkflowParams{AIPUUID: aipUUID},
).GetChildWorkflowExecution().Get(psCtx, nil),
)
}

return err
}

func (w *ProcessingWorkflow) createPreservationTask(
ctx temporalsdk_workflow.Context,
pt datatypes.PreservationTask,
Expand Down

0 comments on commit ae5f3d1

Please sign in to comment.