Skip to content

Commit

Permalink
WIP: Add preprocessing child workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
jraddaoui committed Feb 8, 2024
1 parent 987c5ca commit 7e71df8
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 23 deletions.
28 changes: 27 additions & 1 deletion Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,34 @@ KUBE_OVERLAY = 'hack/kube/overlays/dev-a3m'
if PRES_SYS == 'am':
KUBE_OVERLAY = 'hack/kube/overlays/dev-am'

# Load Kustomize YAML
yaml = kustomize(KUBE_OVERLAY)

# Preprocessing
PREP_PATH = os.environ.get("PREPROCESSING_PATH", "")
if PREP_PATH != "":
# Load preprocessing Tiltfile for Enduro
load_dynamic(PREP_PATH + "/Tiltfile.enduro")
# Deploying with shared filesystem
PREP_SHARE = os.environ.get("PREPROCESSING_SHARING_FS", "").lower() in true
if PREP_SHARE:
# Get Enduro a3m/am worker k8s manifest
if PRES_SYS == "a3m":
pres_yaml, yaml = filter_yaml(yaml, name="^enduro-a3m$", kind="StatefulSet")
else:
pres_yaml, yaml = filter_yaml(yaml, name="^enduro-am$", kind="Deployment")
# Append the preprocessing volume and volume mount to the worker container.
# This only works in single-node k8s cluster deployments.
volume = {"name": "shared-dir", "persistentVolumeClaim": {"claimName": "preprocessing-pvc"}}
volume_mount = {"name": "shared-dir", "mountPath": "/tmp"}
pres_obj = decode_yaml(pres_yaml)
pres_obj["spec"]["template"]["spec"]["volumes"].append(volume)
pres_obj["spec"]["template"]["spec"]["containers"][0]["volumeMounts"].append(volume_mount)
pres_yaml = encode_yaml(pres_obj)
yaml = [yaml, pres_yaml]

# Load Kubernetes resources
k8s_yaml(kustomize(KUBE_OVERLAY))
k8s_yaml(yaml)

# Configure trigger mode
trigger_mode = TRIGGER_MODE_MANUAL
Expand Down
9 changes: 9 additions & 0 deletions enduro.toml
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,12 @@ bucket = "sips"
enabled = false
address = ""
samplingRatio = 1.0

[preprocessing]
enabled = true
sharedPath = "/tmp"

[preprocessing.temporal]
namespace = "default"
taskQueue = "preprocessing"
workflowName = "preprocessing"
24 changes: 13 additions & 11 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/artefactual-sdps/enduro/internal/api"
"github.com/artefactual-sdps/enduro/internal/db"
"github.com/artefactual-sdps/enduro/internal/event"
"github.com/artefactual-sdps/enduro/internal/preprocessing"
"github.com/artefactual-sdps/enduro/internal/pres"
"github.com/artefactual-sdps/enduro/internal/storage"
"github.com/artefactual-sdps/enduro/internal/telemetry"
Expand All @@ -30,17 +31,18 @@ type Configuration struct {
Debug bool
DebugListen string

A3m a3m.Config
AM am.Config
API api.Config
Database db.Config
Event event.Config
Preservation pres.Config
Storage storage.Config
Temporal temporal.Config
Upload upload.Config
Watcher watcher.Config
Telemetry telemetry.Config
A3m a3m.Config
AM am.Config
API api.Config
Database db.Config
Event event.Config
Preservation pres.Config
Storage storage.Config
Temporal temporal.Config
Upload upload.Config
Watcher watcher.Config
Telemetry telemetry.Config
Preprocessing preprocessing.Config
}

func (c Configuration) Validate() error {
Expand Down
33 changes: 33 additions & 0 deletions internal/preprocessing/preprocessing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package preprocessing

// Config holds the settings for the optional preprocessing step that the
// processing workflow runs as a Temporal child workflow.
type Config struct {
// Enabled turns the preprocessing child workflow on; when false the
// processing workflow skips preprocessing entirely.
Enabled bool
// SharedPath is the base directory against which the transfer path sent to
// the child workflow, and the path it returns, are made relative.
SharedPath string
// Temporal identifies where and how the child workflow is started.
Temporal Temporal
// InputBucket and OutputBucket are not referenced by the processing
// workflow yet.
// NOTE(review): presumably meant for the "remote options" TODO — confirm.
InputBucket Bucket
OutputBucket Bucket
}

// Temporal holds the Temporal settings used to start the preprocessing child
// workflow.
type Temporal struct {
// Namespace is the Temporal namespace the child workflow is started in.
Namespace string
// TaskQueue is the task queue the child workflow is scheduled on.
TaskQueue string
// WorkflowName is the registered name of the child workflow; it is also
// used as the prefix of the generated child workflow ID.
WorkflowName string
}

// Bucket groups the connection settings for a storage bucket.
//
// NOTE(review): no visible code reads these fields yet; presumably URL is an
// alternative to the Address/Bucket/AccessKey/SecretKey set — confirm before
// relying on either form.
type Bucket struct {
URL string
Address string
Bucket string
AccessKey string
SecretKey string
}

// Params is the input passed to the preprocessing child workflow.
type Params struct {
// RelativePath is the transfer location relative to Config.SharedPath.
RelativePath string
// ObjectKey is not set by the processing workflow yet.
// NOTE(review): presumably the bucket object key for remote transfers — confirm.
ObjectKey string
}

// Result is the output returned by the preprocessing child workflow.
type Result struct {
// RelativePath is the location of the preprocessed transfer relative to
// Config.SharedPath; the processing workflow joins it with SharedPath to
// obtain the new transfer path.
RelativePath string
// ObjectKey is not read by the processing workflow yet.
// NOTE(review): presumably the bucket object key for remote transfers — confirm.
ObjectKey string
}
64 changes: 55 additions & 9 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,41 @@ package workflow
import (
"errors"
"fmt"
"path/filepath"
"time"

"github.com/go-logr/logr"
"github.com/google/uuid"
"go.artefactual.dev/tools/ref"
temporalapi_enums "go.temporal.io/api/enums/v1"
temporalsdk_temporal "go.temporal.io/sdk/temporal"
temporalsdk_workflow "go.temporal.io/sdk/workflow"

"github.com/artefactual-sdps/enduro/internal/a3m"
"github.com/artefactual-sdps/enduro/internal/am"
"github.com/artefactual-sdps/enduro/internal/fsutil"
"github.com/artefactual-sdps/enduro/internal/package_"
"github.com/artefactual-sdps/enduro/internal/preprocessing"
"github.com/artefactual-sdps/enduro/internal/temporal"
"github.com/artefactual-sdps/enduro/internal/watcher"
"github.com/artefactual-sdps/enduro/internal/workflow/activities"
)

// ProcessingWorkflow bundles the dependencies of the processing workflow: a
// logger, the package and watcher services, the task queue name and the
// preprocessing configuration.
type ProcessingWorkflow struct {
logger logr.Logger
pkgsvc package_.Service
wsvc watcher.Service
taskQueue string
logger logr.Logger
pkgsvc package_.Service
wsvc watcher.Service
taskQueue string
prepConfig preprocessing.Config
}

// NewProcessingWorkflow returns a ProcessingWorkflow populated with the given
// logger, services, task queue name and preprocessing configuration.
func NewProcessingWorkflow(logger logr.Logger, pkgsvc package_.Service, wsvc watcher.Service, taskQueue string) *ProcessingWorkflow {
func NewProcessingWorkflow(logger logr.Logger, pkgsvc package_.Service, wsvc watcher.Service, taskQueue string, prepConfig preprocessing.Config) *ProcessingWorkflow {
return &ProcessingWorkflow{
logger: logger,
pkgsvc: pkgsvc,
wsvc: wsvc,
taskQueue: taskQueue,
logger: logger,
pkgsvc: pkgsvc,
wsvc: wsvc,
taskQueue: taskQueue,
prepConfig: prepConfig,
}
}

Expand Down Expand Up @@ -312,6 +317,11 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
}
}

// Preprocessing child workflow.
if err := w.preprocessing(sessCtx, tinfo); err != nil {
return err
}

// Bundle.
{
// For the a3m workflow bundle the transfer to a directory shared with
Expand Down Expand Up @@ -763,3 +773,39 @@ func (w *ProcessingWorkflow) transferAM(sessCtx temporalsdk_workflow.Context, ti

return nil
}

// preprocessing runs the preprocessing child workflow for the transfer at
// tinfo.TempFile and, on success, points tinfo.TempFile at the location
// reported back by the child workflow. It is a no-op when preprocessing is
// disabled in the configuration.
//
// The child workflow receives the transfer path relative to
// w.prepConfig.SharedPath, and the relative path in its result is resolved
// against that same directory.
//
// TODO:
// - Allow using a different Temporal instance?
// - Make retry policy and timeouts configurable?
// - Enable remote options.
// - Move transfer if tinfo.TempFile is not inside w.prepConfig.SharedPath.
func (w *ProcessingWorkflow) preprocessing(ctx temporalsdk_workflow.Context, tinfo *TransferInfo) error {
	if !w.prepConfig.Enabled {
		return nil
	}

	relPath, err := filepath.Rel(w.prepConfig.SharedPath, tinfo.TempFile)
	if err != nil {
		return err
	}

	// Workflow code must be deterministic. Calling uuid.New() inline would
	// produce a different child workflow ID every time the workflow history
	// is replayed, which the SDK reports as a non-determinism error.
	// SideEffect records the generated ID in the history on first execution
	// and returns the recorded value on replay.
	var childID string
	err = temporalsdk_workflow.SideEffect(ctx, func(temporalsdk_workflow.Context) interface{} {
		return fmt.Sprintf("%s-%s", w.prepConfig.Temporal.WorkflowName, uuid.New().String())
	}).Get(&childID)
	if err != nil {
		return err
	}

	preCtx := temporalsdk_workflow.WithChildOptions(ctx, temporalsdk_workflow.ChildWorkflowOptions{
		Namespace:         w.prepConfig.Temporal.Namespace,
		TaskQueue:         w.prepConfig.Temporal.TaskQueue,
		WorkflowID:        childID,
		ParentClosePolicy: temporalapi_enums.PARENT_CLOSE_POLICY_TERMINATE,
	})

	var result preprocessing.Result
	err = temporalsdk_workflow.ExecuteChildWorkflow(
		preCtx,
		w.prepConfig.Temporal.WorkflowName,
		preprocessing.Params{RelativePath: relPath},
	).Get(preCtx, &result)
	if err != nil {
		return err
	}

	// Continue the rest of the processing workflow from the preprocessed
	// transfer location.
	tinfo.TempFile = filepath.Join(w.prepConfig.SharedPath, filepath.Clean(result.RelativePath))

	return nil
}
3 changes: 2 additions & 1 deletion internal/workflow/processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/artefactual-sdps/enduro/internal/am"
"github.com/artefactual-sdps/enduro/internal/package_"
packagefake "github.com/artefactual-sdps/enduro/internal/package_/fake"
"github.com/artefactual-sdps/enduro/internal/preprocessing"
sftp_fake "github.com/artefactual-sdps/enduro/internal/sftp/fake"
"github.com/artefactual-sdps/enduro/internal/temporal"
watcherfake "github.com/artefactual-sdps/enduro/internal/watcher/fake"
Expand Down Expand Up @@ -107,7 +108,7 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) {
temporalsdk_activity.RegisterOptions{Name: am.PollIngestActivityName},
)

s.workflow = NewProcessingWorkflow(logger, pkgsvc, wsvc, taskQueue)
s.workflow = NewProcessingWorkflow(logger, pkgsvc, wsvc, taskQueue, preprocessing.Config{Enabled: false})
}

func (s *ProcessingWorkflowTestSuite) AfterTest(suiteName, testName string) {
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func main() {
os.Exit(1)
}

w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(logger, pkgsvc, wsvc, cfg.Preservation.TaskQueue).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.ProcessingWorkflowName})
w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(logger, pkgsvc, wsvc, cfg.Preservation.TaskQueue, cfg.Preprocessing).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.ProcessingWorkflowName})
w.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName})
w.RegisterActivityWithOptions(activities.NewDisposeOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName})

Expand Down

0 comments on commit 7e71df8

Please sign in to comment.