Skip to content

Commit

Permalink
Make task queues configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
Diogenesoftoronto authored and sevein committed Nov 2, 2023
1 parent 7f59244 commit 07f2497
Show file tree
Hide file tree
Showing 17 changed files with 80 additions and 37 deletions.
4 changes: 2 additions & 2 deletions cmd/enduro-a3m-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func main() {
// Set up the package service.
var pkgsvc package_.Service
{
pkgsvc = package_.NewService(logger.WithName("package"), enduroDatabase, temporalClient, evsvc, &auth.NoopTokenVerifier{}, nil)
pkgsvc = package_.NewService(logger.WithName("package"), enduroDatabase, temporalClient, evsvc, &auth.NoopTokenVerifier{}, nil, cfg.Temporal.TaskQueue)
}

// Set up the watcher service.
Expand All @@ -127,7 +127,7 @@ func main() {
MaxConcurrentSessionExecutionSize: 1000,
MaxConcurrentActivityExecutionSize: 1,
}
w := temporalsdk_worker.New(temporalClient, temporal.A3mWorkerTaskQueue, workerOpts)
w := temporalsdk_worker.New(temporalClient, cfg.A3m.TaskQueue, workerOpts)
if err != nil {
logger.Error(err, "Error creating Temporal worker.")
os.Exit(1)
Expand Down
7 changes: 4 additions & 3 deletions internal/a3m/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package a3m
import transferservice "buf.build/gen/go/artefactual/a3m/protocolbuffers/go/a3m/api/transferservice/v1beta1"

type Config struct {
Address string
Name string
Name string
ShareDir string
TaskQueue string
Address string
Processing
ShareDir string
}

// The `Processing` struct represents a configuration for processing various tasks in the transferservice.
Expand Down
2 changes: 2 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func Read(config *Configuration, configFile string) (found bool, configFileUsed
v.AddConfigPath("/etc")
v.SetConfigName("enduro")
v.SetDefault("api.processing", a3m.ProcessingDefault)
v.SetDefault("a3m.taskqueue", temporal.A3mWorkerTaskQueue)
v.SetDefault("temporal.taskqueue", temporal.GlobalTaskQueue)
v.SetDefault("debugListen", "127.0.0.1:9001")
v.SetDefault("api.listen", "127.0.0.1:9000")
v.SetEnvPrefix("enduro")
Expand Down
1 change: 1 addition & 0 deletions internal/package_/goa.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ func (w *goaWrapper) Move(ctx context.Context, payload *goapackage.MovePayload)
ID: payload.ID,
AIPID: *goapkg.AipID,
LocationID: payload.LocationID,
TaskQueue: w.taskQueue,
})
if err != nil {
w.logger.Error(err, "error initializing move workflow")
Expand Down
4 changes: 3 additions & 1 deletion internal/package_/package_.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,20 @@ type packageImpl struct {
evsvc event.EventService
tokenVerifier auth.TokenVerifier
ticketProvider *auth.TicketProvider
taskQueue string
}

var _ Service = (*packageImpl)(nil)

func NewService(logger logr.Logger, db *sql.DB, tc temporalsdk_client.Client, evsvc event.EventService, tokenVerifier auth.TokenVerifier, ticketProvider *auth.TicketProvider) *packageImpl {
func NewService(logger logr.Logger, db *sql.DB, tc temporalsdk_client.Client, evsvc event.EventService, tokenVerifier auth.TokenVerifier, ticketProvider *auth.TicketProvider, taskQueue string) *packageImpl {
return &packageImpl{
logger: logger,
db: sqlx.NewDb(db, "mysql"),
tc: tc,
evsvc: evsvc,
tokenVerifier: tokenVerifier,
ticketProvider: ticketProvider,
taskQueue: taskQueue,
}
}

Expand Down
11 changes: 7 additions & 4 deletions internal/package_/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"github.com/google/uuid"
temporalsdk_api_enums "go.temporal.io/api/enums/v1"
temporalsdk_client "go.temporal.io/sdk/client"

"github.com/artefactual-sdps/enduro/internal/temporal"
)

const (
Expand Down Expand Up @@ -60,6 +58,10 @@ type ProcessingWorkflowRequest struct {

// Location identifier for storing auto approved AIPs.
DefaultPermanentLocationID *uuid.UUID

// Task queues used for starting new workflows.
TaskQueue string
A3mTaskQueue string
}

func InitProcessingWorkflow(ctx context.Context, tc temporalsdk_client.Client, req *ProcessingWorkflowRequest) error {
Expand All @@ -72,7 +74,7 @@ func InitProcessingWorkflow(ctx context.Context, tc temporalsdk_client.Client, r

opts := temporalsdk_client.StartWorkflowOptions{
ID: req.WorkflowID,
TaskQueue: temporal.GlobalTaskQueue,
TaskQueue: req.TaskQueue,
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
_, err := tc.ExecuteWorkflow(ctx, opts, ProcessingWorkflowName, req)
Expand All @@ -84,6 +86,7 @@ type MoveWorkflowRequest struct {
ID uint
AIPID string
LocationID uuid.UUID
TaskQueue string
}

func InitMoveWorkflow(ctx context.Context, tc temporalsdk_client.Client, req *MoveWorkflowRequest) (temporalsdk_client.WorkflowRun, error) {
Expand All @@ -92,7 +95,7 @@ func InitMoveWorkflow(ctx context.Context, tc temporalsdk_client.Client, req *Mo

opts := temporalsdk_client.StartWorkflowOptions{
ID: fmt.Sprintf("%s-%s", MoveWorkflowName, req.AIPID),
TaskQueue: temporal.GlobalTaskQueue,
TaskQueue: req.TaskQueue,
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
exec, err := tc.ExecuteWorkflow(ctx, opts, MoveWorkflowName, req)
Expand Down
1 change: 1 addition & 0 deletions internal/storage/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package storage

type Config struct {
TaskQueue string
EnduroAddress string
Internal LocationConfig
Database Database
Expand Down
6 changes: 5 additions & 1 deletion internal/storage/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ func (s *serviceImpl) Submit(ctx context.Context, payload *goastorage.SubmitPayl
return nil, goastorage.MakeNotValid(errors.New("cannot perform operation"))
}

_, err = InitStorageUploadWorkflow(ctx, s.tc, &StorageUploadWorkflowRequest{AIPID: aipID})
_, err = InitStorageUploadWorkflow(ctx, s.tc, &StorageUploadWorkflowRequest{
AIPID: aipID,
TaskQueue: s.config.TaskQueue,
})
if err != nil {
return nil, goastorage.MakeNotAvailable(errors.New("cannot perform operation"))
}
Expand Down Expand Up @@ -198,6 +201,7 @@ func (s *serviceImpl) Move(ctx context.Context, payload *goastorage.MovePayload)
_, err = InitStorageMoveWorkflow(ctx, s.tc, &StorageMoveWorkflowRequest{
AIPID: pkg.AipID,
LocationID: payload.LocationID,
TaskQueue: s.config.TaskQueue,
})
if err != nil {
s.logger.Error(err, "error initializing move workflow")
Expand Down
29 changes: 22 additions & 7 deletions internal/storage/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func setUpService(t *testing.T, attrs *setUpAttrs) storage.Service {
params := setUpAttrs{
logger: ref.New(logr.Discard()),
config: &storage.Config{
TaskQueue: "global",
Internal: storage.LocationConfig{
URL: "file://" + td.Path(),
},
Expand Down Expand Up @@ -168,7 +169,10 @@ func TestServiceSubmit(t *testing.T) {
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-upload-workflow",
&storage.StorageUploadWorkflowRequest{AIPID: AIPID},
&storage.StorageUploadWorkflowRequest{
AIPID: AIPID,
TaskQueue: "global",
},
).
Return(
nil,
Expand All @@ -191,7 +195,6 @@ func TestServiceSubmit(t *testing.T) {
attrs := &setUpAttrs{}
svc := setUpService(t, attrs)
ctx := context.Background()

attrs.temporalClientMock.
On(
"ExecuteWorkflow",
Expand All @@ -202,7 +205,10 @@ func TestServiceSubmit(t *testing.T) {
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-upload-workflow",
&storage.StorageUploadWorkflowRequest{AIPID: AIPID},
&storage.StorageUploadWorkflowRequest{
AIPID: AIPID,
TaskQueue: "global",
},
).
Return(
&temporalsdk_mocks.WorkflowRun{},
Expand Down Expand Up @@ -249,7 +255,10 @@ func TestServiceSubmit(t *testing.T) {
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-upload-workflow",
&storage.StorageUploadWorkflowRequest{AIPID: AIPID},
&storage.StorageUploadWorkflowRequest{
AIPID: AIPID,
TaskQueue: "global",
},
).
Return(
&temporalsdk_mocks.WorkflowRun{},
Expand Down Expand Up @@ -285,6 +294,7 @@ func TestServiceSubmit(t *testing.T) {

attrs := setUpAttrs{
config: &storage.Config{
TaskQueue: "global",
Internal: storage.LocationConfig{
URL: fmt.Sprintf(
"file://%s?base_url=file://tmp/dir&secret_key_path=fake/signing.key",
Expand All @@ -307,7 +317,10 @@ func TestServiceSubmit(t *testing.T) {
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-upload-workflow",
&storage.StorageUploadWorkflowRequest{AIPID: aipID},
&storage.StorageUploadWorkflowRequest{
AIPID: aipID,
TaskQueue: "global",
},
).
Return(
&temporalsdk_mocks.WorkflowRun{},
Expand Down Expand Up @@ -1085,7 +1098,7 @@ func TestServiceMove(t *testing.T) {
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-move-workflow",
&storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID},
&storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID, TaskQueue: "global"},
).
Return(
nil,
Expand Down Expand Up @@ -1132,7 +1145,7 @@ func TestServiceMove(t *testing.T) {
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-move-workflow",
&storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID},
&storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID, TaskQueue: "global"},
).
Return(
&temporalsdk_mocks.WorkflowRun{},
Expand Down Expand Up @@ -1212,6 +1225,7 @@ func TestServiceMoveStatus(t *testing.T) {
attrs := &setUpAttrs{}
svc := setUpService(t, attrs)
ctx := context.Background()
attrs.config.TaskQueue = "global"

attrs.temporalClientMock.
On(
Expand Down Expand Up @@ -1253,6 +1267,7 @@ func TestServiceMoveStatus(t *testing.T) {
attrs := &setUpAttrs{}
svc := setUpService(t, attrs)
ctx := context.Background()
attrs.config.TaskQueue = "global"

attrs.temporalClientMock.
On(
Expand Down
10 changes: 5 additions & 5 deletions internal/storage/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"github.com/google/uuid"
temporalsdk_api_enums "go.temporal.io/api/enums/v1"
temporalsdk_client "go.temporal.io/sdk/client"

"github.com/artefactual-sdps/enduro/internal/temporal"
)

const (
Expand All @@ -21,12 +19,14 @@ const (
)

type StorageUploadWorkflowRequest struct {
AIPID uuid.UUID
AIPID uuid.UUID
TaskQueue string
}

type StorageMoveWorkflowRequest struct {
AIPID uuid.UUID
LocationID uuid.UUID
TaskQueue string
}

type CopyToPermanentLocationActivityParams struct {
Expand All @@ -42,7 +42,7 @@ func InitStorageUploadWorkflow(ctx context.Context, tc temporalsdk_client.Client

opts := temporalsdk_client.StartWorkflowOptions{
ID: fmt.Sprintf("%s-%s", StorageUploadWorkflowName, req.AIPID),
TaskQueue: temporal.GlobalTaskQueue,
TaskQueue: req.TaskQueue,
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
return tc.ExecuteWorkflow(ctx, opts, StorageUploadWorkflowName, req)
Expand All @@ -54,7 +54,7 @@ func InitStorageMoveWorkflow(ctx context.Context, tc temporalsdk_client.Client,

opts := temporalsdk_client.StartWorkflowOptions{
ID: fmt.Sprintf("%s-%s", StorageMoveWorkflowName, req.AIPID),
TaskQueue: temporal.GlobalTaskQueue,
TaskQueue: req.TaskQueue,
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
return tc.ExecuteWorkflow(ctx, opts, StorageMoveWorkflowName, req)
Expand Down
1 change: 1 addition & 0 deletions internal/storage/workflows/move_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestStorageMoveWorkflow(t *testing.T) {
storage.StorageMoveWorkflowRequest{
AIPID: aipID,
LocationID: locationID,
TaskQueue: "global",
},
)

Expand Down
2 changes: 0 additions & 2 deletions internal/temporal/temporal.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
)

const (
// There are task queues used by our workflow and activity workers. It may
// be convenient to make these configurable in the future .
GlobalTaskQueue = "global"
A3mWorkerTaskQueue = "a3m"
)
Expand Down
12 changes: 7 additions & 5 deletions internal/workflow/move.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ import (
)

type MoveWorkflow struct {
logger logr.Logger
pkgsvc package_.Service
logger logr.Logger
pkgsvc package_.Service
taskQueue string
}

func NewMoveWorkflow(logger logr.Logger, pkgsvc package_.Service) *MoveWorkflow {
func NewMoveWorkflow(logger logr.Logger, pkgsvc package_.Service, taskQueue string) *MoveWorkflow {
return &MoveWorkflow{
logger: logger,
pkgsvc: pkgsvc,
logger: logger,
pkgsvc: pkgsvc,
taskQueue: taskQueue,
}
}

Expand Down
4 changes: 3 additions & 1 deletion internal/workflow/move_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/artefactual-sdps/enduro/internal/package_"
packagefake "github.com/artefactual-sdps/enduro/internal/package_/fake"
"github.com/artefactual-sdps/enduro/internal/temporal"
"github.com/artefactual-sdps/enduro/internal/workflow/activities"
)

Expand Down Expand Up @@ -45,7 +46,7 @@ func (s *MoveWorkflowTestSuite) SetupTest() {
temporalsdk_activity.RegisterOptions{Name: activities.PollMoveToPermanentStorageActivityName},
)

s.workflow = NewMoveWorkflow(logger, pkgsvc)
s.workflow = NewMoveWorkflow(logger, pkgsvc, "global")
}

func (s *MoveWorkflowTestSuite) AfterTest(suiteName, testName string) {
Expand Down Expand Up @@ -103,6 +104,7 @@ func (s *MoveWorkflowTestSuite) TestSuccessfulMove() {
ID: pkgID,
AIPID: AIPID,
LocationID: locationID,
TaskQueue: temporal.GlobalTaskQueue,
},
)

Expand Down
13 changes: 10 additions & 3 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/artefactual-sdps/enduro/internal/fsutil"
"github.com/artefactual-sdps/enduro/internal/package_"
"github.com/artefactual-sdps/enduro/internal/ref"
"github.com/artefactual-sdps/enduro/internal/temporal"
"github.com/artefactual-sdps/enduro/internal/watcher"
"github.com/artefactual-sdps/enduro/internal/workflow/activities"
)
Expand Down Expand Up @@ -74,6 +73,12 @@ type TransferInfo struct {
//
// It is populated by createPreservationActionLocalActivity .
PreservationActionID uint

// Identifier of the preservation system task queue name
//
// It is populated by the workflow request.
GlobalTaskQueue string
A3mTaskQueue string
}

func (t *TransferInfo) Name() string {
Expand All @@ -92,7 +97,9 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack
logger = temporalsdk_workflow.GetLogger(ctx)

tinfo = &TransferInfo{
req: *req,
req: *req,
GlobalTaskQueue: req.TaskQueue,
A3mTaskQueue: req.A3mTaskQueue,
}

// Package status. All packages start in queued status.
Expand Down Expand Up @@ -166,7 +173,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack
for attempt := 1; attempt <= maxAttempts; attempt++ {
activityOpts := temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
TaskQueue: temporal.A3mWorkerTaskQueue,
TaskQueue: tinfo.A3mTaskQueue,
})
sessCtx, err := temporalsdk_workflow.CreateSession(activityOpts, &temporalsdk_workflow.SessionOptions{
CreationTimeout: forever,
Expand Down
Loading

0 comments on commit 07f2497

Please sign in to comment.