Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make task queues configurable #655

Merged
merged 1 commit into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/enduro-a3m-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func main() {
// Set up the package service.
var pkgsvc package_.Service
{
pkgsvc = package_.NewService(logger.WithName("package"), enduroDatabase, temporalClient, evsvc, &auth.NoopTokenVerifier{}, nil)
pkgsvc = package_.NewService(logger.WithName("package"), enduroDatabase, temporalClient, evsvc, &auth.NoopTokenVerifier{}, nil, cfg.Temporal.TaskQueue)
}

// Set up the watcher service.
Expand All @@ -127,7 +127,7 @@ func main() {
MaxConcurrentSessionExecutionSize: 1000,
MaxConcurrentActivityExecutionSize: 1,
}
w := temporalsdk_worker.New(temporalClient, temporal.A3mWorkerTaskQueue, workerOpts)
w := temporalsdk_worker.New(temporalClient, cfg.A3m.TaskQueue, workerOpts)
if err != nil {
logger.Error(err, "Error creating Temporal worker.")
os.Exit(1)
Expand Down
7 changes: 4 additions & 3 deletions internal/a3m/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package a3m
import transferservice "buf.build/gen/go/artefactual/a3m/protocolbuffers/go/a3m/api/transferservice/v1beta1"

type Config struct {
Address string
Name string
Name string
ShareDir string
TaskQueue string
Address string
Processing
ShareDir string
}

// The `Processing` struct represents a configuration for processing various tasks in the transferservice.
Expand Down
2 changes: 2 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func Read(config *Configuration, configFile string) (found bool, configFileUsed
v.AddConfigPath("/etc")
v.SetConfigName("enduro")
v.SetDefault("api.processing", a3m.ProcessingDefault)
v.SetDefault("a3m.taskqueue", temporal.A3mWorkerTaskQueue)
v.SetDefault("temporal.taskqueue", temporal.GlobalTaskQueue)
v.SetDefault("debugListen", "127.0.0.1:9001")
v.SetDefault("api.listen", "127.0.0.1:9000")
v.SetEnvPrefix("enduro")
Expand Down
1 change: 1 addition & 0 deletions internal/package_/goa.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ func (w *goaWrapper) Move(ctx context.Context, payload *goapackage.MovePayload)
ID: payload.ID,
AIPID: *goapkg.AipID,
LocationID: payload.LocationID,
TaskQueue: w.taskQueue,
})
if err != nil {
w.logger.Error(err, "error initializing move workflow")
Expand Down
4 changes: 3 additions & 1 deletion internal/package_/package_.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,20 @@ type packageImpl struct {
evsvc event.EventService
tokenVerifier auth.TokenVerifier
ticketProvider *auth.TicketProvider
taskQueue string
}

var _ Service = (*packageImpl)(nil)

func NewService(logger logr.Logger, db *sql.DB, tc temporalsdk_client.Client, evsvc event.EventService, tokenVerifier auth.TokenVerifier, ticketProvider *auth.TicketProvider) *packageImpl {
func NewService(logger logr.Logger, db *sql.DB, tc temporalsdk_client.Client, evsvc event.EventService, tokenVerifier auth.TokenVerifier, ticketProvider *auth.TicketProvider, taskQueue string) *packageImpl {
return &packageImpl{
logger: logger,
db: sqlx.NewDb(db, "mysql"),
tc: tc,
evsvc: evsvc,
tokenVerifier: tokenVerifier,
ticketProvider: ticketProvider,
taskQueue: taskQueue,
}
}

Expand Down
11 changes: 7 additions & 4 deletions internal/package_/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"github.com/google/uuid"
temporalsdk_api_enums "go.temporal.io/api/enums/v1"
temporalsdk_client "go.temporal.io/sdk/client"

"github.com/artefactual-sdps/enduro/internal/temporal"
)

const (
Expand Down Expand Up @@ -60,6 +58,10 @@ type ProcessingWorkflowRequest struct {

// Location identifier for storing auto approved AIPs.
DefaultPermanentLocationID *uuid.UUID

// Task queues used for starting new workflows.
TaskQueue string
A3mTaskQueue string
}

func InitProcessingWorkflow(ctx context.Context, tc temporalsdk_client.Client, req *ProcessingWorkflowRequest) error {
Expand All @@ -72,7 +74,7 @@ func InitProcessingWorkflow(ctx context.Context, tc temporalsdk_client.Client, r

opts := temporalsdk_client.StartWorkflowOptions{
ID: req.WorkflowID,
TaskQueue: temporal.GlobalTaskQueue,
TaskQueue: req.TaskQueue,
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
_, err := tc.ExecuteWorkflow(ctx, opts, ProcessingWorkflowName, req)
Expand All @@ -84,6 +86,7 @@ type MoveWorkflowRequest struct {
ID uint
AIPID string
LocationID uuid.UUID
TaskQueue string
}

func InitMoveWorkflow(ctx context.Context, tc temporalsdk_client.Client, req *MoveWorkflowRequest) (temporalsdk_client.WorkflowRun, error) {
Expand All @@ -92,7 +95,7 @@ func InitMoveWorkflow(ctx context.Context, tc temporalsdk_client.Client, req *Mo

opts := temporalsdk_client.StartWorkflowOptions{
ID: fmt.Sprintf("%s-%s", MoveWorkflowName, req.AIPID),
TaskQueue: temporal.GlobalTaskQueue,
TaskQueue: req.TaskQueue,
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
exec, err := tc.ExecuteWorkflow(ctx, opts, MoveWorkflowName, req)
Expand Down
1 change: 1 addition & 0 deletions internal/storage/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package storage

type Config struct {
TaskQueue string
EnduroAddress string
Internal LocationConfig
Database Database
Expand Down
6 changes: 5 additions & 1 deletion internal/storage/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ func (s *serviceImpl) Submit(ctx context.Context, payload *goastorage.SubmitPayl
return nil, goastorage.MakeNotValid(errors.New("cannot perform operation"))
}

_, err = InitStorageUploadWorkflow(ctx, s.tc, &StorageUploadWorkflowRequest{AIPID: aipID})
_, err = InitStorageUploadWorkflow(ctx, s.tc, &StorageUploadWorkflowRequest{
AIPID: aipID,
TaskQueue: s.config.TaskQueue,
})
if err != nil {
return nil, goastorage.MakeNotAvailable(errors.New("cannot perform operation"))
}
Expand Down Expand Up @@ -198,6 +201,7 @@ func (s *serviceImpl) Move(ctx context.Context, payload *goastorage.MovePayload)
_, err = InitStorageMoveWorkflow(ctx, s.tc, &StorageMoveWorkflowRequest{
AIPID: pkg.AipID,
LocationID: payload.LocationID,
TaskQueue: s.config.TaskQueue,
})
if err != nil {
s.logger.Error(err, "error initializing move workflow")
Expand Down
29 changes: 22 additions & 7 deletions internal/storage/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func setUpService(t *testing.T, attrs *setUpAttrs) storage.Service {
params := setUpAttrs{
logger: ref.New(logr.Discard()),
config: &storage.Config{
TaskQueue: "global",
Internal: storage.LocationConfig{
URL: "file://" + td.Path(),
},
Expand Down Expand Up @@ -168,7 +169,10 @@ func TestServiceSubmit(t *testing.T) {
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-upload-workflow",
&storage.StorageUploadWorkflowRequest{AIPID: AIPID},
&storage.StorageUploadWorkflowRequest{
AIPID: AIPID,
TaskQueue: "global",
},
).
Return(
nil,
Expand All @@ -191,7 +195,6 @@ func TestServiceSubmit(t *testing.T) {
attrs := &setUpAttrs{}
svc := setUpService(t, attrs)
ctx := context.Background()

attrs.temporalClientMock.
On(
"ExecuteWorkflow",
Expand All @@ -202,7 +205,10 @@ func TestServiceSubmit(t *testing.T) {
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-upload-workflow",
&storage.StorageUploadWorkflowRequest{AIPID: AIPID},
&storage.StorageUploadWorkflowRequest{
AIPID: AIPID,
TaskQueue: "global",
},
).
Return(
&temporalsdk_mocks.WorkflowRun{},
Expand Down Expand Up @@ -249,7 +255,10 @@ func TestServiceSubmit(t *testing.T) {
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-upload-workflow",
&storage.StorageUploadWorkflowRequest{AIPID: AIPID},
&storage.StorageUploadWorkflowRequest{
AIPID: AIPID,
TaskQueue: "global",
},
).
Return(
&temporalsdk_mocks.WorkflowRun{},
Expand Down Expand Up @@ -285,6 +294,7 @@ func TestServiceSubmit(t *testing.T) {

attrs := setUpAttrs{
config: &storage.Config{
TaskQueue: "global",
Internal: storage.LocationConfig{
URL: fmt.Sprintf(
"file://%s?base_url=file://tmp/dir&secret_key_path=fake/signing.key",
Expand All @@ -307,7 +317,10 @@ func TestServiceSubmit(t *testing.T) {
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-upload-workflow",
&storage.StorageUploadWorkflowRequest{AIPID: aipID},
&storage.StorageUploadWorkflowRequest{
AIPID: aipID,
TaskQueue: "global",
},
).
Return(
&temporalsdk_mocks.WorkflowRun{},
Expand Down Expand Up @@ -1085,7 +1098,7 @@ func TestServiceMove(t *testing.T) {
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-move-workflow",
&storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID},
&storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID, TaskQueue: "global"},
).
Return(
nil,
Expand Down Expand Up @@ -1132,7 +1145,7 @@ func TestServiceMove(t *testing.T) {
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-move-workflow",
&storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID},
&storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID, TaskQueue: "global"},
).
Return(
&temporalsdk_mocks.WorkflowRun{},
Expand Down Expand Up @@ -1212,6 +1225,7 @@ func TestServiceMoveStatus(t *testing.T) {
attrs := &setUpAttrs{}
svc := setUpService(t, attrs)
ctx := context.Background()
attrs.config.TaskQueue = "global"

attrs.temporalClientMock.
On(
Expand Down Expand Up @@ -1253,6 +1267,7 @@ func TestServiceMoveStatus(t *testing.T) {
attrs := &setUpAttrs{}
svc := setUpService(t, attrs)
ctx := context.Background()
attrs.config.TaskQueue = "global"

attrs.temporalClientMock.
On(
Expand Down
10 changes: 5 additions & 5 deletions internal/storage/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"github.com/google/uuid"
temporalsdk_api_enums "go.temporal.io/api/enums/v1"
temporalsdk_client "go.temporal.io/sdk/client"

"github.com/artefactual-sdps/enduro/internal/temporal"
)

const (
Expand All @@ -21,12 +19,14 @@ const (
)

type StorageUploadWorkflowRequest struct {
AIPID uuid.UUID
AIPID uuid.UUID
TaskQueue string
}

type StorageMoveWorkflowRequest struct {
AIPID uuid.UUID
LocationID uuid.UUID
TaskQueue string
}

type CopyToPermanentLocationActivityParams struct {
Expand All @@ -42,7 +42,7 @@ func InitStorageUploadWorkflow(ctx context.Context, tc temporalsdk_client.Client

opts := temporalsdk_client.StartWorkflowOptions{
ID: fmt.Sprintf("%s-%s", StorageUploadWorkflowName, req.AIPID),
TaskQueue: temporal.GlobalTaskQueue,
TaskQueue: req.TaskQueue,
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
return tc.ExecuteWorkflow(ctx, opts, StorageUploadWorkflowName, req)
Expand All @@ -54,7 +54,7 @@ func InitStorageMoveWorkflow(ctx context.Context, tc temporalsdk_client.Client,

opts := temporalsdk_client.StartWorkflowOptions{
ID: fmt.Sprintf("%s-%s", StorageMoveWorkflowName, req.AIPID),
TaskQueue: temporal.GlobalTaskQueue,
TaskQueue: req.TaskQueue,
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
return tc.ExecuteWorkflow(ctx, opts, StorageMoveWorkflowName, req)
Expand Down
1 change: 1 addition & 0 deletions internal/storage/workflows/move_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestStorageMoveWorkflow(t *testing.T) {
storage.StorageMoveWorkflowRequest{
AIPID: aipID,
LocationID: locationID,
TaskQueue: "global",
},
)

Expand Down
2 changes: 0 additions & 2 deletions internal/temporal/temporal.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
)

const (
// There are task queues used by our workflow and activity workers. It may
// be convenient to make these configurable in the future .
GlobalTaskQueue = "global"
A3mWorkerTaskQueue = "a3m"
)
Expand Down
12 changes: 7 additions & 5 deletions internal/workflow/move.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ import (
)

type MoveWorkflow struct {
logger logr.Logger
pkgsvc package_.Service
logger logr.Logger
pkgsvc package_.Service
taskQueue string
}

func NewMoveWorkflow(logger logr.Logger, pkgsvc package_.Service) *MoveWorkflow {
func NewMoveWorkflow(logger logr.Logger, pkgsvc package_.Service, taskQueue string) *MoveWorkflow {
return &MoveWorkflow{
logger: logger,
pkgsvc: pkgsvc,
logger: logger,
pkgsvc: pkgsvc,
taskQueue: taskQueue,
}
}

Expand Down
4 changes: 3 additions & 1 deletion internal/workflow/move_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/artefactual-sdps/enduro/internal/package_"
packagefake "github.com/artefactual-sdps/enduro/internal/package_/fake"
"github.com/artefactual-sdps/enduro/internal/temporal"
"github.com/artefactual-sdps/enduro/internal/workflow/activities"
)

Expand Down Expand Up @@ -45,7 +46,7 @@ func (s *MoveWorkflowTestSuite) SetupTest() {
temporalsdk_activity.RegisterOptions{Name: activities.PollMoveToPermanentStorageActivityName},
)

s.workflow = NewMoveWorkflow(logger, pkgsvc)
s.workflow = NewMoveWorkflow(logger, pkgsvc, "global")
}

func (s *MoveWorkflowTestSuite) AfterTest(suiteName, testName string) {
Expand Down Expand Up @@ -103,6 +104,7 @@ func (s *MoveWorkflowTestSuite) TestSuccessfulMove() {
ID: pkgID,
AIPID: AIPID,
LocationID: locationID,
TaskQueue: temporal.GlobalTaskQueue,
},
)

Expand Down
13 changes: 10 additions & 3 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/artefactual-sdps/enduro/internal/fsutil"
"github.com/artefactual-sdps/enduro/internal/package_"
"github.com/artefactual-sdps/enduro/internal/ref"
"github.com/artefactual-sdps/enduro/internal/temporal"
"github.com/artefactual-sdps/enduro/internal/watcher"
"github.com/artefactual-sdps/enduro/internal/workflow/activities"
)
Expand Down Expand Up @@ -74,6 +73,12 @@ type TransferInfo struct {
//
// It is populated by createPreservationActionLocalActivity .
PreservationActionID uint

// Identifier of the preservation system task queue name
//
// It is populated by the workflow request.
GlobalTaskQueue string
A3mTaskQueue string
Diogenesoftoronto marked this conversation as resolved.
Show resolved Hide resolved
}

func (t *TransferInfo) Name() string {
Expand All @@ -92,7 +97,9 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack
logger = temporalsdk_workflow.GetLogger(ctx)

tinfo = &TransferInfo{
req: *req,
req: *req,
GlobalTaskQueue: req.TaskQueue,
A3mTaskQueue: req.A3mTaskQueue,
}

// Package status. All packages start in queued status.
Expand Down Expand Up @@ -166,7 +173,7 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *pack
for attempt := 1; attempt <= maxAttempts; attempt++ {
activityOpts := temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
TaskQueue: temporal.A3mWorkerTaskQueue,
TaskQueue: tinfo.A3mTaskQueue,
})
sessCtx, err := temporalsdk_workflow.CreateSession(activityOpts, &temporalsdk_workflow.SessionOptions{
CreationTimeout: forever,
Expand Down
Loading