Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a3m shareDir config value in processing #865

Merged
merged 1 commit into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/artefactual-sdps/enduro/internal/a3m"
"github.com/artefactual-sdps/enduro/internal/am"
"github.com/artefactual-sdps/enduro/internal/config"
"github.com/artefactual-sdps/enduro/internal/fsutil"
"github.com/artefactual-sdps/enduro/internal/package_"
"github.com/artefactual-sdps/enduro/internal/temporal"
Expand All @@ -26,18 +27,23 @@ import (
)

type ProcessingWorkflow struct {
logger logr.Logger
pkgsvc package_.Service
wsvc watcher.Service
taskQueue string
logger logr.Logger
cfg config.Configuration
djjuhasz marked this conversation as resolved.
Show resolved Hide resolved
pkgsvc package_.Service
wsvc watcher.Service
}

func NewProcessingWorkflow(logger logr.Logger, pkgsvc package_.Service, wsvc watcher.Service, taskQueue string) *ProcessingWorkflow {
func NewProcessingWorkflow(
logger logr.Logger,
cfg config.Configuration,
pkgsvc package_.Service,
wsvc watcher.Service,
) *ProcessingWorkflow {
return &ProcessingWorkflow{
logger: logger,
pkgsvc: pkgsvc,
wsvc: wsvc,
taskQueue: taskQueue,
logger: logger,
cfg: cfg,
pkgsvc: pkgsvc,
wsvc: wsvc,
}
}

Expand Down Expand Up @@ -176,7 +182,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack

activityOpts := temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
TaskQueue: w.taskQueue,
TaskQueue: w.cfg.Preservation.TaskQueue,
})
for attempt := 1; attempt <= maxAttempts; attempt++ {
sessCtx, err := temporalsdk_workflow.CreateSession(activityOpts, &temporalsdk_workflow.SessionOptions{
Expand Down Expand Up @@ -317,8 +323,8 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
// For the a3m workflow bundle the transfer to a directory shared with
// the a3m container.
var transferDir string
if w.taskQueue == temporal.A3mWorkerTaskQueue {
transferDir = "/home/a3m/.local/share/a3m/share"
if w.cfg.Preservation.TaskQueue == temporal.A3mWorkerTaskQueue {
transferDir = w.cfg.A3m.ShareDir
}

activityOpts := withActivityOptsForLongLivedRequest(sessCtx)
Expand Down Expand Up @@ -356,7 +362,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
// Do preservation activities.
{
var err error
if w.taskQueue == temporal.AmWorkerTaskQueue {
if w.cfg.Preservation.TaskQueue == temporal.AmWorkerTaskQueue {
err = w.transferAM(sessCtx, tinfo)
} else {
err = w.transferA3m(sessCtx, tinfo)
Expand All @@ -379,7 +385,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
}

// Stop here for the Archivematica workflow.
if w.taskQueue == temporal.AmWorkerTaskQueue {
if w.cfg.Preservation.TaskQueue == temporal.AmWorkerTaskQueue {
return nil
}

Expand Down
25 changes: 18 additions & 7 deletions internal/workflow/processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (

"github.com/artefactual-sdps/enduro/internal/a3m"
"github.com/artefactual-sdps/enduro/internal/am"
"github.com/artefactual-sdps/enduro/internal/config"
"github.com/artefactual-sdps/enduro/internal/package_"
packagefake "github.com/artefactual-sdps/enduro/internal/package_/fake"
"github.com/artefactual-sdps/enduro/internal/pres"
sftp_fake "github.com/artefactual-sdps/enduro/internal/sftp/fake"
"github.com/artefactual-sdps/enduro/internal/temporal"
watcherfake "github.com/artefactual-sdps/enduro/internal/watcher/fake"
Expand All @@ -32,7 +34,11 @@ type ProcessingWorkflowTestSuite struct {

env *temporalsdk_testsuite.TestWorkflowEnvironment

// Each test registers the workflow with a different name to avoid dups.
// Each test creates its own temporary transfer directory.
transferDir string

// Each test registers the workflow with a different name to avoid
// duplicates.
workflow *ProcessingWorkflow
}

Expand All @@ -47,10 +53,15 @@ func TestTransferInfo_Name(t *testing.T) {
func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) {
s.env = s.NewTestWorkflowEnvironment()
s.env.SetWorkerOptions(temporalsdk_worker.Options{EnableSessionWorker: true})
s.transferDir = s.T().TempDir()

clock := clockwork.NewFakeClock()
ctrl := gomock.NewController(s.T())
logger := logr.Discard()
cfg := config.Configuration{
Preservation: pres.Config{TaskQueue: taskQueue},
A3m: a3m.Config{ShareDir: s.transferDir},
}
pkgsvc := packagefake.NewMockService(ctrl)
wsvc := watcherfake.NewMockService(ctrl)
sftpc := sftp_fake.NewMockClient(ctrl)
Expand Down Expand Up @@ -87,7 +98,7 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) {
s.env.RegisterActivityWithOptions(
am.NewPollTransferActivity(
logger,
&am.Config{},
&cfg.AM,
clock,
amclienttest.NewMockTransferService(ctrl),
amclienttest.NewMockJobsService(ctrl),
Expand All @@ -98,7 +109,7 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) {
s.env.RegisterActivityWithOptions(
am.NewPollIngestActivity(
logger,
&am.Config{},
&cfg.AM,
clock,
amclienttest.NewMockIngestService(ctrl),
amclienttest.NewMockJobsService(ctrl),
Expand All @@ -107,7 +118,7 @@ func (s *ProcessingWorkflowTestSuite) SetupWorkflowTest(taskQueue string) {
temporalsdk_activity.RegisterOptions{Name: am.PollIngestActivityName},
)

s.workflow = NewProcessingWorkflow(logger, pkgsvc, wsvc, taskQueue)
s.workflow = NewProcessingWorkflow(logger, cfg, pkgsvc, wsvc)
}

func (s *ProcessingWorkflowTestSuite) AfterTest(suiteName, testName string) {
Expand Down Expand Up @@ -154,7 +165,7 @@ func (s *ProcessingWorkflowTestSuite) TestPackageConfirmation() {
s.env.OnActivity(activities.BundleActivityName, sessionCtx,
&activities.BundleActivityParams{
WatcherName: watcherName,
TransferDir: "/home/a3m/.local/share/a3m/share",
TransferDir: s.transferDir,
Key: key,
TempFile: "/tmp/enduro123456/" + key,
},
Expand Down Expand Up @@ -229,7 +240,7 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() {
s.env.OnActivity(activities.BundleActivityName, sessionCtx,
&activities.BundleActivityParams{
WatcherName: watcherName,
TransferDir: "/home/a3m/.local/share/a3m/share",
TransferDir: s.transferDir,
Key: key,
TempFile: "/tmp/enduro123456/" + key,
},
Expand Down Expand Up @@ -412,7 +423,7 @@ func (s *ProcessingWorkflowTestSuite) TestPackageRejection() {
s.env.OnActivity(activities.BundleActivityName, sessionCtx,
&activities.BundleActivityParams{
WatcherName: watcherName,
TransferDir: "/home/a3m/.local/share/a3m/share",
TransferDir: s.transferDir,
Key: key,
TempFile: "/tmp/enduro123456/" + key,
},
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@
os.Exit(1)
}

w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(logger, pkgsvc, wsvc, cfg.Preservation.TaskQueue).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.ProcessingWorkflowName})
w.RegisterWorkflowWithOptions(workflow.NewProcessingWorkflow(logger, cfg, pkgsvc, wsvc).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.ProcessingWorkflowName})

Check warning on line 336 in main.go

View check run for this annotation

Codecov / codecov/patch

main.go#L336

Added line #L336 was not covered by tests
w.RegisterActivityWithOptions(activities.NewDeleteOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DeleteOriginalActivityName})
w.RegisterActivityWithOptions(activities.NewDisposeOriginalActivity(wsvc).Execute, temporalsdk_activity.RegisterOptions{Name: activities.DisposeOriginalActivityName})

Expand Down
Loading