Skip to content

Commit

Permalink
Make task queues configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
Diogenesoftoronto committed Sep 28, 2023
1 parent 7c969a9 commit f3b1e36
Show file tree
Hide file tree
Showing 15 changed files with 105 additions and 62 deletions.
12 changes: 10 additions & 2 deletions cmd/enduro-a3m-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http/pprof"
"os"
"os/signal"
"reflect"
"syscall"
"time"

Expand Down Expand Up @@ -56,6 +57,13 @@ func main() {
fmt.Printf("Failed to read configuration: %v\n", err)
os.Exit(1)
}
// Set Defaults for the a3m and global task queues
if reflect.ValueOf(cfg.A3m.TaskQueue).IsZero() {
cfg.A3m.TaskQueue = temporal.A3mWorkerTaskQueue
}
if reflect.ValueOf(cfg.Temporal.TaskQueue).IsZero() {
cfg.A3m.TaskQueue = temporal.GlobalTaskQueue
}

logger := log.New(os.Stderr,
log.WithName(appName),
Expand Down Expand Up @@ -102,7 +110,7 @@ func main() {
// Set up the package service.
var pkgsvc package_.Service
{
pkgsvc = package_.NewService(logger.WithName("package"), enduroDatabase, temporalClient, evsvc, &auth.NoopTokenVerifier{}, nil)
pkgsvc = package_.NewService(logger.WithName("package"), enduroDatabase, temporalClient, evsvc, &auth.NoopTokenVerifier{}, nil, cfg.Temporal.TaskQueue)
}

// Set up the watcher service.
Expand All @@ -126,7 +134,7 @@ func main() {
MaxConcurrentSessionExecutionSize: 1000,
MaxConcurrentActivityExecutionSize: 1,
}
w := temporalsdk_worker.New(temporalClient, temporal.A3mWorkerTaskQueue, workerOpts)
w := temporalsdk_worker.New(temporalClient, cfg.A3m.TaskQueue, workerOpts)
if err != nil {
logger.Error(err, "Error creating Temporal worker.")
os.Exit(1)
Expand Down
7 changes: 4 additions & 3 deletions internal/a3m/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package a3m
import transferservice "buf.build/gen/go/artefactual/a3m/protocolbuffers/go/a3m/api/transferservice/v1beta1"

type Config struct {
Address string
Name string
Name string
ShareDir string
TaskQueue string
Address string
Processing
ShareDir string
}

// The `Processing` struct represents a configuration for processing various tasks in the transferservice.
Expand Down
7 changes: 4 additions & 3 deletions internal/package_/goa.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,10 @@ func (w *goaWrapper) Move(ctx context.Context, payload *goapackage.MovePayload)
}

_, err = InitMoveWorkflow(ctx, w.tc, &MoveWorkflowRequest{
ID: payload.ID,
AIPID: *goapkg.AipID,
LocationID: payload.LocationID,
ID: payload.ID,
AIPID: *goapkg.AipID,
LocationID: payload.LocationID,
GlobalTaskQueue: w.TaskQueue,
})
if err != nil {
w.logger.Error(err, "error initializing move workflow")
Expand Down
4 changes: 3 additions & 1 deletion internal/package_/package_.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,20 @@ type packageImpl struct {
evsvc event.EventService
tokenVerifier auth.TokenVerifier
ticketProvider *auth.TicketProvider
TaskQueue string
}

var _ Service = (*packageImpl)(nil)

func NewService(logger logr.Logger, db *sql.DB, tc temporalsdk_client.Client, evsvc event.EventService, tokenVerifier auth.TokenVerifier, ticketProvider *auth.TicketProvider) *packageImpl {
func NewService(logger logr.Logger, db *sql.DB, tc temporalsdk_client.Client, evsvc event.EventService, tokenVerifier auth.TokenVerifier, ticketProvider *auth.TicketProvider, taskqueue string) *packageImpl {
return &packageImpl{
logger: logger,
db: sqlx.NewDb(db, "mysql"),
tc: tc,
evsvc: evsvc,
tokenVerifier: tokenVerifier,
ticketProvider: ticketProvider,
TaskQueue: taskqueue,
}
}

Expand Down
17 changes: 10 additions & 7 deletions internal/package_/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"github.com/google/uuid"
temporalsdk_api_enums "go.temporal.io/api/enums/v1"
temporalsdk_client "go.temporal.io/sdk/client"

"github.com/artefactual-sdps/enduro/internal/temporal"
)

const (
Expand Down Expand Up @@ -60,6 +58,10 @@ type ProcessingWorkflowRequest struct {

// Location identifier for storing auto approved AIPs.
DefaultPermanentLocationID *uuid.UUID

// The global task queue used for starting the workflow
GlobalTaskQueue string
A3mTaskQueue string
}

func InitProcessingWorkflow(ctx context.Context, tc temporalsdk_client.Client, req *ProcessingWorkflowRequest) error {
Expand All @@ -72,7 +74,7 @@ func InitProcessingWorkflow(ctx context.Context, tc temporalsdk_client.Client, r

opts := temporalsdk_client.StartWorkflowOptions{
ID: req.WorkflowID,
TaskQueue: temporal.GlobalTaskQueue,
TaskQueue: req.GlobalTaskQueue,
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
_, err := tc.ExecuteWorkflow(ctx, opts, ProcessingWorkflowName, req)
Expand All @@ -81,9 +83,10 @@ func InitProcessingWorkflow(ctx context.Context, tc temporalsdk_client.Client, r
}

type MoveWorkflowRequest struct {
ID uint
AIPID string
LocationID uuid.UUID
ID uint
AIPID string
LocationID uuid.UUID
GlobalTaskQueue string
}

func InitMoveWorkflow(ctx context.Context, tc temporalsdk_client.Client, req *MoveWorkflowRequest) (temporalsdk_client.WorkflowRun, error) {
Expand All @@ -92,7 +95,7 @@ func InitMoveWorkflow(ctx context.Context, tc temporalsdk_client.Client, req *Mo

opts := temporalsdk_client.StartWorkflowOptions{
ID: fmt.Sprintf("%s-%s", MoveWorkflowName, req.AIPID),
TaskQueue: temporal.GlobalTaskQueue,
TaskQueue: req.GlobalTaskQueue,
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
exec, err := tc.ExecuteWorkflow(ctx, opts, MoveWorkflowName, req)
Expand Down
1 change: 1 addition & 0 deletions internal/storage/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package storage

type Config struct {
TaskQueue string
EnduroAddress string
Internal LocationConfig
Database Database
Expand Down
9 changes: 5 additions & 4 deletions internal/storage/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (s *serviceImpl) Update(ctx context.Context, payload *goastorage.UpdatePayl

signal := UploadDoneSignal{}
workflowID := fmt.Sprintf("%s-%s", StorageUploadWorkflowName, aipID)
err = s.tc.SignalWorkflow(ctx, workflowID, "", UploadDoneSignalName, signal)
err = s.tc.SignalWorkflow(ctx, workflowID, s.config.TaskQueue, UploadDoneSignalName, signal)
if err != nil {
return goastorage.MakeNotAvailable(errors.New("cannot perform operation"))
}
Expand Down Expand Up @@ -196,8 +196,9 @@ func (s *serviceImpl) Move(ctx context.Context, payload *goastorage.MovePayload)
}

_, err = InitStorageMoveWorkflow(ctx, s.tc, &StorageMoveWorkflowRequest{
AIPID: pkg.AipID,
LocationID: payload.LocationID,
AIPID: pkg.AipID,
LocationID: payload.LocationID,
GlobalTaskQueue: s.config.TaskQueue,
})
if err != nil {
s.logger.Error(err, "error initializing move workflow")
Expand All @@ -218,7 +219,7 @@ func (s *serviceImpl) MoveStatus(ctx context.Context, payload *goastorage.MoveSt
return nil, err
}

resp, err := s.tc.DescribeWorkflowExecution(ctx, fmt.Sprintf("%s-%s", StorageMoveWorkflowName, p.AipID), "")
resp, err := s.tc.DescribeWorkflowExecution(ctx, fmt.Sprintf("%s-%s", StorageMoveWorkflowName, p.AipID), s.config.TaskQueue)
if err != nil {
return nil, goastorage.MakeFailedDependency(errors.New("cannot perform operation"))
}
Expand Down
39 changes: 21 additions & 18 deletions internal/storage/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func setUpService(t *testing.T, attrs *setUpAttrs) storage.Service {
params := setUpAttrs{
logger: ref.New(logr.Discard()),
config: &storage.Config{
TaskQueue: "global",
Internal: storage.LocationConfig{
URL: "file://" + td.Path(),
},
Expand Down Expand Up @@ -164,11 +165,11 @@ func TestServiceSubmit(t *testing.T) {
mock.AnythingOfType("*context.timerCtx"),
temporalsdk_client.StartWorkflowOptions{
ID: "storage-upload-workflow-" + AIPID.String(),
TaskQueue: "global",
TaskQueue: "",
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-upload-workflow",
&storage.StorageUploadWorkflowRequest{AIPID: AIPID},
&storage.StorageUploadWorkflowRequest{AIPID: AIPID, GlobalTaskQueue: ""},
).
Return(
nil,
Expand All @@ -191,18 +192,18 @@ func TestServiceSubmit(t *testing.T) {
attrs := &setUpAttrs{}
svc := setUpService(t, attrs)
ctx := context.Background()

// attrs.config.TaskQueue = "global"
attrs.temporalClientMock.
On(
"ExecuteWorkflow",
mock.AnythingOfType("*context.timerCtx"),
temporalsdk_client.StartWorkflowOptions{
ID: "storage-upload-workflow-" + AIPID.String(),
TaskQueue: "global",
TaskQueue: "",
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-upload-workflow",
&storage.StorageUploadWorkflowRequest{AIPID: AIPID},
&storage.StorageUploadWorkflowRequest{AIPID: AIPID, GlobalTaskQueue: ""},
).
Return(
&temporalsdk_mocks.WorkflowRun{},
Expand Down Expand Up @@ -245,11 +246,11 @@ func TestServiceSubmit(t *testing.T) {
mock.AnythingOfType("*context.timerCtx"),
temporalsdk_client.StartWorkflowOptions{
ID: "storage-upload-workflow-" + AIPID.String(),
TaskQueue: "global",
TaskQueue: "",
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-upload-workflow",
&storage.StorageUploadWorkflowRequest{AIPID: AIPID},
&storage.StorageUploadWorkflowRequest{AIPID: AIPID, GlobalTaskQueue: ""},
).
Return(
&temporalsdk_mocks.WorkflowRun{},
Expand Down Expand Up @@ -303,11 +304,11 @@ func TestServiceSubmit(t *testing.T) {
mock.AnythingOfType("*context.timerCtx"),
temporalsdk_client.StartWorkflowOptions{
ID: "storage-upload-workflow-" + aipID.String(),
TaskQueue: "global",
TaskQueue: "",
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-upload-workflow",
&storage.StorageUploadWorkflowRequest{AIPID: aipID},
&storage.StorageUploadWorkflowRequest{AIPID: aipID, GlobalTaskQueue: ""},
).
Return(
&temporalsdk_mocks.WorkflowRun{},
Expand Down Expand Up @@ -923,7 +924,7 @@ func TestServiceUpdate(t *testing.T) {
"SignalWorkflow",
ctx,
"storage-upload-workflow-"+AIPID.String(),
"",
"global",
"upload-done-signal",
storage.UploadDoneSignal{},
).
Expand Down Expand Up @@ -952,7 +953,7 @@ func TestServiceUpdate(t *testing.T) {
"SignalWorkflow",
ctx,
"storage-upload-workflow-"+AIPID.String(),
"",
"global",
"upload-done-signal",
storage.UploadDoneSignal{},
).
Expand Down Expand Up @@ -993,7 +994,7 @@ func TestServiceUpdate(t *testing.T) {
"SignalWorkflow",
ctx,
"storage-upload-workflow-"+AIPID.String(),
"",
"global",
"upload-done-signal",
storage.UploadDoneSignal{},
).
Expand Down Expand Up @@ -1085,7 +1086,7 @@ func TestServiceMove(t *testing.T) {
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-move-workflow",
&storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID},
&storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID, GlobalTaskQueue: "global"},
).
Return(
nil,
Expand Down Expand Up @@ -1132,7 +1133,7 @@ func TestServiceMove(t *testing.T) {
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-move-workflow",
&storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID},
&storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID, GlobalTaskQueue: "global"},
).
Return(
&temporalsdk_mocks.WorkflowRun{},
Expand Down Expand Up @@ -1212,13 +1213,14 @@ func TestServiceMoveStatus(t *testing.T) {
attrs := &setUpAttrs{}
svc := setUpService(t, attrs)
ctx := context.Background()
attrs.config.TaskQueue = "global"

attrs.temporalClientMock.
On(
"DescribeWorkflowExecution",
ctx,
"storage-move-workflow-"+AIPID.String(),
"",
"global",
).
Return(
nil,
Expand Down Expand Up @@ -1253,13 +1255,14 @@ func TestServiceMoveStatus(t *testing.T) {
attrs := &setUpAttrs{}
svc := setUpService(t, attrs)
ctx := context.Background()
attrs.config.TaskQueue = "global"

attrs.temporalClientMock.
On(
"DescribeWorkflowExecution",
ctx,
"storage-move-workflow-"+AIPID.String(),
"",
"global",
).
Return(
&temporalapi_workflowservice.DescribeWorkflowExecutionResponse{
Expand Down Expand Up @@ -1304,7 +1307,7 @@ func TestServiceMoveStatus(t *testing.T) {
"DescribeWorkflowExecution",
ctx,
"storage-move-workflow-"+AIPID.String(),
"",
"global",
).
Return(
&temporalapi_workflowservice.DescribeWorkflowExecutionResponse{
Expand Down Expand Up @@ -1348,7 +1351,7 @@ func TestServiceMoveStatus(t *testing.T) {
"DescribeWorkflowExecution",
ctx,
"storage-move-workflow-"+AIPID.String(),
"",
"global",
).
Return(
&temporalapi_workflowservice.DescribeWorkflowExecutionResponse{
Expand Down
14 changes: 7 additions & 7 deletions internal/storage/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"github.com/google/uuid"
temporalsdk_api_enums "go.temporal.io/api/enums/v1"
temporalsdk_client "go.temporal.io/sdk/client"

"github.com/artefactual-sdps/enduro/internal/temporal"
)

const (
Expand All @@ -21,12 +19,14 @@ const (
)

type StorageUploadWorkflowRequest struct {
AIPID uuid.UUID
AIPID uuid.UUID
GlobalTaskQueue string
}

type StorageMoveWorkflowRequest struct {
AIPID uuid.UUID
LocationID uuid.UUID
AIPID uuid.UUID
LocationID uuid.UUID
GlobalTaskQueue string
}

type CopyToPermanentLocationActivityParams struct {
Expand All @@ -42,7 +42,7 @@ func InitStorageUploadWorkflow(ctx context.Context, tc temporalsdk_client.Client

opts := temporalsdk_client.StartWorkflowOptions{
ID: fmt.Sprintf("%s-%s", StorageUploadWorkflowName, req.AIPID),
TaskQueue: temporal.GlobalTaskQueue,
TaskQueue: req.GlobalTaskQueue,
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
return tc.ExecuteWorkflow(ctx, opts, StorageUploadWorkflowName, req)
Expand All @@ -54,7 +54,7 @@ func InitStorageMoveWorkflow(ctx context.Context, tc temporalsdk_client.Client,

opts := temporalsdk_client.StartWorkflowOptions{
ID: fmt.Sprintf("%s-%s", StorageMoveWorkflowName, req.AIPID),
TaskQueue: temporal.GlobalTaskQueue,
TaskQueue: req.GlobalTaskQueue,
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
return tc.ExecuteWorkflow(ctx, opts, StorageMoveWorkflowName, req)
Expand Down
Loading

0 comments on commit f3b1e36

Please sign in to comment.