diff --git a/cmd/enduro-a3m-worker/main.go b/cmd/enduro-a3m-worker/main.go index 98625b378..45a315237 100644 --- a/cmd/enduro-a3m-worker/main.go +++ b/cmd/enduro-a3m-worker/main.go @@ -103,7 +103,7 @@ func main() { // Set up the package service. var pkgsvc package_.Service { - pkgsvc = package_.NewService(logger.WithName("package"), enduroDatabase, temporalClient, evsvc, &auth.NoopTokenVerifier{}, nil) + pkgsvc = package_.NewService(logger.WithName("package"), enduroDatabase, temporalClient, evsvc, &auth.NoopTokenVerifier{}, nil, cfg.Temporal.TaskQueue) } // Set up the watcher service. @@ -127,7 +127,7 @@ func main() { MaxConcurrentSessionExecutionSize: 1000, MaxConcurrentActivityExecutionSize: 1, } - w := temporalsdk_worker.New(temporalClient, temporal.A3mWorkerTaskQueue, workerOpts) + w := temporalsdk_worker.New(temporalClient, cfg.A3m.TaskQueue, workerOpts) if err != nil { logger.Error(err, "Error creating Temporal worker.") os.Exit(1) diff --git a/internal/a3m/config.go b/internal/a3m/config.go index dd34157e1..2dec6f110 100644 --- a/internal/a3m/config.go +++ b/internal/a3m/config.go @@ -3,10 +3,11 @@ package a3m import transferservice "buf.build/gen/go/artefactual/a3m/protocolbuffers/go/a3m/api/transferservice/v1beta1" type Config struct { - Address string - Name string + Name string + ShareDir string + TaskQueue string + Address string Processing - ShareDir string } // The `Processing` struct represents a configuration for processing various tasks in the transferservice. diff --git a/internal/config/config.go b/internal/config/config.go index 940fc62cd..7bf8d984d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -60,6 +60,8 @@ func Read(config *Configuration, configFile string) (found bool, configFileUsed v.AddConfigPath("/etc") v.SetConfigName("enduro") v.SetDefault("api.processing", a3m.ProcessingDefault) + v.SetDefault("a3m.taskqueue", temporal.A3mWorkerTaskQueue) + v.SetDefault("temporal.taskqueue", temporal.GlobalTaskQueue) v.SetDefault("debugListen", "127.0.0.1:9001") v.SetDefault("api.listen", "127.0.0.1:9000") v.SetEnvPrefix("enduro") diff --git a/internal/package_/goa.go b/internal/package_/goa.go index 53e0ee62e..b065b2af1 100644 --- a/internal/package_/goa.go +++ b/internal/package_/goa.go @@ -254,6 +254,7 @@ func (w *goaWrapper) Move(ctx context.Context, payload *goapackage.MovePayload) ID: payload.ID, AIPID: *goapkg.AipID, LocationID: payload.LocationID, + TaskQueue: w.taskQueue, }) if err != nil { w.logger.Error(err, "error initializing move workflow") diff --git a/internal/package_/package_.go b/internal/package_/package_.go index 016bc44c5..a6688cdee 100644 --- a/internal/package_/package_.go +++ b/internal/package_/package_.go @@ -39,11 +39,12 @@ type packageImpl struct { evsvc event.EventService tokenVerifier auth.TokenVerifier ticketProvider *auth.TicketProvider + taskQueue string } var _ Service = (*packageImpl)(nil) -func NewService(logger logr.Logger, db *sql.DB, tc temporalsdk_client.Client, evsvc event.EventService, tokenVerifier auth.TokenVerifier, ticketProvider *auth.TicketProvider) *packageImpl { +func NewService(logger logr.Logger, db *sql.DB, tc temporalsdk_client.Client, evsvc event.EventService, tokenVerifier auth.TokenVerifier, ticketProvider *auth.TicketProvider, taskQueue string) *packageImpl { return &packageImpl{ logger: logger, db: sqlx.NewDb(db, "mysql"), @@ -51,6 +52,7 @@ func NewService(logger logr.Logger, db *sql.DB, tc temporalsdk_client.Client, ev evsvc: evsvc, tokenVerifier: tokenVerifier, ticketProvider: ticketProvider, + taskQueue: taskQueue, } } diff --git a/internal/package_/workflow.go b/internal/package_/workflow.go index 3101a418c..587373c93 100644 --- a/internal/package_/workflow.go +++ b/internal/package_/workflow.go @@ -8,8 +8,6 @@ import ( "github.com/google/uuid" temporalsdk_api_enums "go.temporal.io/api/enums/v1" temporalsdk_client "go.temporal.io/sdk/client" - - "github.com/artefactual-sdps/enduro/internal/temporal" ) const ( @@ -60,6 +58,10 @@ type ProcessingWorkflowRequest struct { // Location identifier for storing auto approved AIPs. DefaultPermanentLocationID *uuid.UUID + + // Task queues used for starting new workflows. + TaskQueue string + A3mTaskQueue string } func InitProcessingWorkflow(ctx context.Context, tc temporalsdk_client.Client, req *ProcessingWorkflowRequest) error { @@ -72,7 +74,7 @@ func InitProcessingWorkflow(ctx context.Context, tc temporalsdk_client.Client, r opts := temporalsdk_client.StartWorkflowOptions{ ID: req.WorkflowID, - TaskQueue: temporal.GlobalTaskQueue, + TaskQueue: req.TaskQueue, WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, } _, err := tc.ExecuteWorkflow(ctx, opts, ProcessingWorkflowName, req) @@ -84,6 +86,7 @@ type MoveWorkflowRequest struct { ID uint AIPID string LocationID uuid.UUID + TaskQueue string } func InitMoveWorkflow(ctx context.Context, tc temporalsdk_client.Client, req *MoveWorkflowRequest) (temporalsdk_client.WorkflowRun, error) { @@ -92,7 +95,7 @@ func InitMoveWorkflow(ctx context.Context, tc temporalsdk_client.Client, req *Mo opts := temporalsdk_client.StartWorkflowOptions{ ID: fmt.Sprintf("%s-%s", MoveWorkflowName, req.AIPID), - TaskQueue: temporal.GlobalTaskQueue, + TaskQueue: req.TaskQueue, WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, } exec, err := tc.ExecuteWorkflow(ctx, opts, MoveWorkflowName, req) diff --git a/internal/storage/config.go b/internal/storage/config.go index f70bfc706..5dc3c4d12 100644 --- a/internal/storage/config.go +++ b/internal/storage/config.go @@ -1,6 +1,7 @@ package storage type Config struct { + TaskQueue string EnduroAddress string Internal LocationConfig Database Database diff --git a/internal/storage/service.go b/internal/storage/service.go index a605528e7..dd2955b6d 100644 --- a/internal/storage/service.go +++ b/internal/storage/service.go @@ -117,7 +117,10 @@ func (s *serviceImpl) Submit(ctx context.Context, payload *goastorage.SubmitPayl return nil, goastorage.MakeNotValid(errors.New("cannot perform operation")) } - _, err = InitStorageUploadWorkflow(ctx, s.tc, &StorageUploadWorkflowRequest{AIPID: aipID}) + _, err = InitStorageUploadWorkflow(ctx, s.tc, &StorageUploadWorkflowRequest{ + AIPID: aipID, + TaskQueue: s.config.TaskQueue, + }) if err != nil { return nil, goastorage.MakeNotAvailable(errors.New("cannot perform operation")) } @@ -198,6 +201,7 @@ func (s *serviceImpl) Move(ctx context.Context, payload *goastorage.MovePayload) _, err = InitStorageMoveWorkflow(ctx, s.tc, &StorageMoveWorkflowRequest{ AIPID: pkg.AipID, LocationID: payload.LocationID, + TaskQueue: s.config.TaskQueue, }) if err != nil { s.logger.Error(err, "error initializing move workflow") diff --git a/internal/storage/service_test.go b/internal/storage/service_test.go index 3a2c552eb..6aeeb2258 100644 --- a/internal/storage/service_test.go +++ b/internal/storage/service_test.go @@ -56,6 +56,7 @@ func setUpService(t *testing.T, attrs *setUpAttrs) storage.Service { params := setUpAttrs{ logger: ref.New(logr.Discard()), config: &storage.Config{ + TaskQueue: "global", Internal: storage.LocationConfig{ URL: "file://" + td.Path(), }, @@ -168,7 +169,10 @@ func TestServiceSubmit(t *testing.T) { WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, }, "storage-upload-workflow", - &storage.StorageUploadWorkflowRequest{AIPID: AIPID}, + &storage.StorageUploadWorkflowRequest{ + AIPID: AIPID, + TaskQueue: "global", + }, ). Return( nil, @@ -191,7 +195,6 @@ func TestServiceSubmit(t *testing.T) { attrs := &setUpAttrs{} svc := setUpService(t, attrs) ctx := context.Background() - attrs.temporalClientMock. On( "ExecuteWorkflow", @@ -202,7 +205,10 @@ func TestServiceSubmit(t *testing.T) { WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, }, "storage-upload-workflow", - &storage.StorageUploadWorkflowRequest{AIPID: AIPID}, + &storage.StorageUploadWorkflowRequest{ + AIPID: AIPID, + TaskQueue: "global", + }, ). Return( &temporalsdk_mocks.WorkflowRun{}, @@ -249,7 +255,10 @@ func TestServiceSubmit(t *testing.T) { WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, }, "storage-upload-workflow", - &storage.StorageUploadWorkflowRequest{AIPID: AIPID}, + &storage.StorageUploadWorkflowRequest{ + AIPID: AIPID, + TaskQueue: "global", + }, ). Return( &temporalsdk_mocks.WorkflowRun{}, @@ -285,6 +294,7 @@ func TestServiceSubmit(t *testing.T) { attrs := setUpAttrs{ config: &storage.Config{ + TaskQueue: "global", Internal: storage.LocationConfig{ URL: fmt.Sprintf( "file://%s?base_url=file://tmp/dir&secret_key_path=fake/signing.key", @@ -307,7 +317,10 @@ func TestServiceSubmit(t *testing.T) { WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, }, "storage-upload-workflow", - &storage.StorageUploadWorkflowRequest{AIPID: aipID}, + &storage.StorageUploadWorkflowRequest{ + AIPID: aipID, + TaskQueue: "global", + }, ). Return( &temporalsdk_mocks.WorkflowRun{}, @@ -1085,7 +1098,7 @@ func TestServiceMove(t *testing.T) { WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, }, "storage-move-workflow", - &storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID}, + &storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID, TaskQueue: "global"}, ). Return( nil, @@ -1132,7 +1145,7 @@ func TestServiceMove(t *testing.T) { WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, }, "storage-move-workflow", - &storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID}, + &storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID, TaskQueue: "global"}, ). Return( &temporalsdk_mocks.WorkflowRun{}, @@ -1212,6 +1225,7 @@ func TestServiceMoveStatus(t *testing.T) { attrs := &setUpAttrs{} svc := setUpService(t, attrs) ctx := context.Background() + attrs.config.TaskQueue = "global" attrs.temporalClientMock. On( @@ -1253,6 +1267,7 @@ func TestServiceMoveStatus(t *testing.T) { attrs := &setUpAttrs{} svc := setUpService(t, attrs) ctx := context.Background() + attrs.config.TaskQueue = "global" attrs.temporalClientMock. On( diff --git a/internal/storage/workflow.go b/internal/storage/workflow.go index b788d3e68..ad5c7df92 100644 --- a/internal/storage/workflow.go +++ b/internal/storage/workflow.go @@ -8,8 +8,6 @@ import ( "github.com/google/uuid" temporalsdk_api_enums "go.temporal.io/api/enums/v1" temporalsdk_client "go.temporal.io/sdk/client" - - "github.com/artefactual-sdps/enduro/internal/temporal" ) const ( @@ -21,12 +19,14 @@ const ( ) type StorageUploadWorkflowRequest struct { - AIPID uuid.UUID + AIPID uuid.UUID + TaskQueue string } type StorageMoveWorkflowRequest struct { AIPID uuid.UUID LocationID uuid.UUID + TaskQueue string } type CopyToPermanentLocationActivityParams struct { @@ -42,7 +42,7 @@ func InitStorageUploadWorkflow(ctx context.Context, tc temporalsdk_client.Client opts := temporalsdk_client.StartWorkflowOptions{ ID: fmt.Sprintf("%s-%s", StorageUploadWorkflowName, req.AIPID), - TaskQueue: temporal.GlobalTaskQueue, + TaskQueue: req.TaskQueue, WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, } return tc.ExecuteWorkflow(ctx, opts, StorageUploadWorkflowName, req) @@ -54,7 +54,7 @@ func InitStorageMoveWorkflow(ctx context.Context, tc temporalsdk_client.Client, opts := temporalsdk_client.StartWorkflowOptions{ ID: fmt.Sprintf("%s-%s", StorageMoveWorkflowName, req.AIPID), - TaskQueue: temporal.GlobalTaskQueue, + TaskQueue: req.TaskQueue, WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, } return tc.ExecuteWorkflow(ctx, opts, StorageMoveWorkflowName, req) diff --git a/internal/storage/workflows/move_test.go b/internal/storage/workflows/move_test.go index cd6c22a05..1af0c8706 100644 --- a/internal/storage/workflows/move_test.go +++ b/internal/storage/workflows/move_test.go @@ -40,6 +40,7 @@ func TestStorageMoveWorkflow(t *testing.T) { storage.StorageMoveWorkflowRequest{ AIPID: aipID, LocationID: locationID, + TaskQueue: "global", }, ) diff --git a/internal/temporal/temporal.go b/internal/temporal/temporal.go index c8d3f8674..30200e1d2 100644 --- a/internal/temporal/temporal.go +++ b/internal/temporal/temporal.go @@ -7,8 +7,6 @@ import ( ) const ( - // There are task queues used by our workflow and activity workers. It may - // be convenient to make these configurable in the future . GlobalTaskQueue = "global" A3mWorkerTaskQueue = "a3m" ) diff --git a/internal/workflow/move.go b/internal/workflow/move.go index b575de7b5..e89ae81bb 100644 --- a/internal/workflow/move.go +++ b/internal/workflow/move.go @@ -9,14 +9,16 @@ import ( ) type MoveWorkflow struct { - logger logr.Logger - pkgsvc package_.Service + logger logr.Logger + pkgsvc package_.Service + taskQueue string } -func NewMoveWorkflow(logger logr.Logger, pkgsvc package_.Service) *MoveWorkflow { +func NewMoveWorkflow(logger logr.Logger, pkgsvc package_.Service, taskQueue string) *MoveWorkflow { return &MoveWorkflow{ - logger: logger, - pkgsvc: pkgsvc, + logger: logger, + pkgsvc: pkgsvc, + taskQueue: taskQueue, } } diff --git a/internal/workflow/move_test.go b/internal/workflow/move_test.go index 37d0dca07..b6ed7ab23 100644 --- a/internal/workflow/move_test.go +++ b/internal/workflow/move_test.go @@ -15,6 +15,7 @@ import ( "github.com/artefactual-sdps/enduro/internal/package_" packagefake "github.com/artefactual-sdps/enduro/internal/package_/fake" + "github.com/artefactual-sdps/enduro/internal/temporal" "github.com/artefactual-sdps/enduro/internal/workflow/activities" ) @@ -45,7 +46,7 @@ func (s *MoveWorkflowTestSuite) SetupTest() { temporalsdk_activity.RegisterOptions{Name: activities.PollMoveToPermanentStorageActivityName}, ) - s.workflow = NewMoveWorkflow(logger, pkgsvc) + s.workflow = NewMoveWorkflow(logger, pkgsvc, "global") } func (s *MoveWorkflowTestSuite) AfterTest(suiteName, testName string) { @@ -103,6 +104,7 @@ func (s *MoveWorkflowTestSuite) TestSuccessfulMove() { ID: pkgID, AIPID: AIPID, LocationID: locationID, + TaskQueue: temporal.GlobalTaskQueue, }, ) diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index 6623f8359..514857858 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -19,7 +19,6 @@ import ( "github.com/artefactual-sdps/enduro/internal/fsutil" "github.com/artefactual-sdps/enduro/internal/package_" "github.com/artefactual-sdps/enduro/internal/ref" - "github.com/artefactual-sdps/enduro/internal/temporal" "github.com/artefactual-sdps/enduro/internal/watcher" "github.com/artefactual-sdps/enduro/internal/workflow/activities" ) @@ -74,6 +73,12 @@ type TransferInfo struct { // // It is populated by createPreservationActionLocalActivity . PreservationActionID uint + + // Identifier of the preservation system task queue name + // + // It is populated by the workflow request. + GlobalTaskQueue string + A3mTaskQueue string } func (t *TransferInfo) Name() string { @@ -92,7 +97,9 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack logger = temporalsdk_workflow.GetLogger(ctx) tinfo = &TransferInfo{ - req: *req, + req: *req, + GlobalTaskQueue: req.TaskQueue, + A3mTaskQueue: req.A3mTaskQueue, } // Package status. All packages start in queued status. @@ -166,7 +173,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack for attempt := 1; attempt <= maxAttempts; attempt++ { activityOpts := temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{ StartToCloseTimeout: time.Minute, - TaskQueue: temporal.A3mWorkerTaskQueue, + TaskQueue: tinfo.A3mTaskQueue, }) sessCtx, err := temporalsdk_workflow.CreateSession(activityOpts, &temporalsdk_workflow.SessionOptions{ CreationTimeout: forever, diff --git a/internal/workflow/processing_test.go b/internal/workflow/processing_test.go index 7539e31c1..07a8c89a3 100644 --- a/internal/workflow/processing_test.go +++ b/internal/workflow/processing_test.go @@ -162,6 +162,8 @@ func (s *ProcessingWorkflowTestSuite) TestAutoApprovedAIP() { RetentionPeriod: &retentionPeriod, AutoApproveAIP: true, DefaultPermanentLocationID: &locationID, + TaskQueue: "global", + A3mTaskQueue: "a3m", }, ) diff --git a/main.go b/main.go index 696e9e5ec..4f35d28d4 100644 --- a/main.go +++ b/main.go @@ -173,7 +173,7 @@ func main() { // Set up the package service. var pkgsvc package_.Service { - pkgsvc = package_.NewService(logger.WithName("package"), enduroDatabase, temporalClient, evsvc, tokenVerifier, ticketProvider) + pkgsvc = package_.NewService(logger.WithName("package"), enduroDatabase, temporalClient, evsvc, tokenVerifier, ticketProvider, cfg.Temporal.TaskQueue) } // Set up the ent db client. @@ -270,6 +270,8 @@ func main() { IsDir: event.IsDir, AutoApproveAIP: autoApproveAIP, DefaultPermanentLocationID: &defaultPermanentLocationID, + TaskQueue: cfg.Temporal.TaskQueue, + A3mTaskQueue: cfg.A3m.TaskQueue, } if err := package_.InitProcessingWorkflow(ctx, temporalClient, &req); err != nil { logger.Error(err, "Error initializing processing workflow.") @@ -291,7 +293,7 @@ func main() { { done := make(chan struct{}) workerOpts := temporalsdk_worker.Options{} - w := temporalsdk_worker.New(temporalClient, temporal.GlobalTaskQueue, workerOpts) + w := temporalsdk_worker.New(temporalClient, cfg.Temporal.TaskQueue, workerOpts) if err != nil { logger.Error(err, "Error creating Temporal worker.") os.Exit(1) @@ -306,7 +308,7 @@ func main() { w.RegisterActivityWithOptions(storage_activities.NewCopyToPermanentLocationActivity(storagesvc).Execute, temporalsdk_activity.RegisterOptions{Name: storage.CopyToPermanentLocationActivityName}) - w.RegisterWorkflowWithOptions(workflow.NewMoveWorkflow(logger, pkgsvc).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.MoveWorkflowName}) + w.RegisterWorkflowWithOptions(workflow.NewMoveWorkflow(logger, pkgsvc, cfg.Temporal.TaskQueue).Execute, temporalsdk_workflow.RegisterOptions{Name: package_.MoveWorkflowName}) httpClient := cleanhttp.DefaultPooledClient() storageHttpClient := goahttpstorage.NewClient("http", cfg.Storage.EnduroAddress, httpClient, goahttp.RequestEncoder, goahttp.ResponseDecoder, false)