Skip to content

Commit

Permalink
Make task queues configurable
Browse files Browse the repository at this point in the history
- Change fields in structs to taskQueue from Global
- Update internal/package_/workflow.go
- Remove empty string fields in structs
- Remove taskqueues from parameters
- Update service_test.go
- Move security comment to right place
  • Loading branch information
Diogenesoftoronto committed Oct 17, 2023
1 parent f91645f commit 9592b11
Show file tree
Hide file tree
Showing 21 changed files with 76 additions and 56 deletions.
4 changes: 2 additions & 2 deletions cmd/enduro-a3m-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func main() {
// Set up the package service.
var pkgsvc package_.Service
{
pkgsvc = package_.NewService(logger.WithName("package"), enduroDatabase, temporalClient, evsvc, &auth.NoopTokenVerifier{}, nil)
pkgsvc = package_.NewService(logger.WithName("package"), enduroDatabase, temporalClient, evsvc, &auth.NoopTokenVerifier{}, nil, cfg.Temporal.TaskQueue)
}

// Set up the watcher service.
Expand All @@ -126,7 +126,7 @@ func main() {
MaxConcurrentSessionExecutionSize: 1000,
MaxConcurrentActivityExecutionSize: 1,
}
w := temporalsdk_worker.New(temporalClient, temporal.A3mWorkerTaskQueue, workerOpts)
w := temporalsdk_worker.New(temporalClient, cfg.A3m.TaskQueue, workerOpts)
if err != nil {
logger.Error(err, "Error creating Temporal worker.")
os.Exit(1)
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,14 @@ require (
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.12.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.13.0 // indirect
golang.org/x/tools v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.143.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1467,8 +1467,8 @@ golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY=
golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -1581,8 +1581,8 @@ golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=
golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -1772,8 +1772,8 @@ golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc=
golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc=
golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
16 changes: 6 additions & 10 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ github.com/apparentlymart/go-dump v0.0.0-20180507223929-23540a00eaa3/go.mod h1:o
github.com/apparentlymart/go-textseg v1.0.0 h1:rRmlIsPEEhUTIKQb7T++Nz/A5Q6C9IuX2wFoYVvnCs0=
github.com/apparentlymart/go-textseg v1.0.0/go.mod h1:z96Txxhf3xSFMPmb5X/1W05FF/Nj9VFpLOpjS5yuumk=
github.com/armon/go-metrics v0.4.0/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4=
github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4=
github.com/atotto/clipboard v0.1.4 h1:EH0zSVneZPSuFR11BlR9YppQTVDbh5+16AmcJi4g1z4=
github.com/atotto/clipboard v0.1.4/go.mod h1:ZY9tmq7sm5xIbd9bOK4onWV4S6X0u6GY7Vn0Yu86PYI=
github.com/aws/aws-sdk-go-v2/service/kms v1.24.1/go.mod h1:yrlimpsAJc9fXj3jHC7Ig2Zb4iMAoSJ/VVzChf22dZk=
Expand Down Expand Up @@ -180,6 +181,7 @@ github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRC
github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU=
github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/microsoft/go-mssqldb v1.0.0/go.mod h1:+4wZTUnz/SV6nffv+RRRB/ss8jPng5Sho2SmM1l2ts4=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mmcloughlin/avo v0.5.0/go.mod h1:ChHFdoV7ql95Wi7vuq2YT1bwCJqiWdZrQ1im3VujLYM=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand All @@ -198,6 +200,7 @@ github.com/mutecomm/go-sqlcipher/v4 v4.4.0/go.mod h1:PyN04SaWalavxRGH9E8ZftG6Ju7
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nakagami/firebirdsql v0.0.0-20190310045651-3c02a58cfed8/go.mod h1:86wM1zFnC6/uDBfZGNwB65O+pR2OFi5q/YQaEUid1qA=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/neo4j/neo4j-go-driver v1.8.1-0.20200803113522-b626aa943eba/go.mod h1:ncO5VaFWh0Nrt+4KT4mOZboaczBZcLuHrG+/sUeP8gI=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
Expand Down Expand Up @@ -291,18 +294,12 @@ golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0
golang.org/x/crypto v0.0.0-20220826181053-bd7e27e6170d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4=
golang.org/x/crypto v0.3.1-0.20221117191849-2c476679df9a/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4=
golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI=
golang.org/x/net v0.0.0-20180811021610-c39426892332/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20200421231249-e086a090c8fd/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20220826154423-83b083e8dc8b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/oauth2 v0.10.0/go.mod h1:kTpgurOux7LqtuxjuyZa4Gj2gdezIt/jQtGnNFfypQI=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand All @@ -314,13 +311,12 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20220722155259-a9ba230a4035/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
google.golang.org/api v0.128.0/go.mod h1:Y611qgqaE92On/7g65MQgxYul3c0rEB894kniWLY750=
google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98/go.mod h1:S7mY02OqCJTD0E1OiQy1F72PWFB4bZJ87cAtLPYgDR0=
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98/go.mod h1:rsr7RhLuwsDKL7RmgDDCUc6yaGr1iqceVb5Wv6f6YvQ=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20230720185612-659f7aaaa771/go.mod h1:3QoBVwTHkXbY1oRGzlhwhOykfcATQN43LJ6iT8Wy8kE=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20230920204549-e6e6cdab5c13/go.mod h1:qDbnxtViX5J6CvFbxeNUSzKgVlDLJ/6L+caxye9+Flo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230720185612-659f7aaaa771/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M=
google.golang.org/grpc v1.56.1/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
7 changes: 4 additions & 3 deletions internal/a3m/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package a3m
import transferservice "buf.build/gen/go/artefactual/a3m/protocolbuffers/go/a3m/api/transferservice/v1beta1"

type Config struct {
Address string
Name string
Name string
ShareDir string
TaskQueue string
Address string
Processing
ShareDir string
}

// The `Processing` struct represents a configuration for processing various tasks in the transferservice.
Expand Down
2 changes: 2 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ func Read(config *Configuration, configFile string) (found bool, configFileUsed
v.AddConfigPath("/etc")
v.SetConfigName("enduro")
v.SetDefault("api.processing", a3m.ProcessingDefault)
v.SetDefault("a3m.taskqueue", temporal.A3mWorkerTaskQueue)
v.SetDefault("temporal.taskqueue", temporal.GlobalTaskQueue)
v.SetDefault("debugListen", "127.0.0.1:9001")
v.SetDefault("api.listen", "127.0.0.1:9000")
v.SetEnvPrefix("enduro")
Expand Down
1 change: 1 addition & 0 deletions internal/package_/goa.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ func (w *goaWrapper) Move(ctx context.Context, payload *goapackage.MovePayload)
ID: payload.ID,
AIPID: *goapkg.AipID,
LocationID: payload.LocationID,
TaskQueue: w.taskQueue,
})
if err != nil {
w.logger.Error(err, "error initializing move workflow")
Expand Down
4 changes: 3 additions & 1 deletion internal/package_/package_.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,20 @@ type packageImpl struct {
evsvc event.EventService
tokenVerifier auth.TokenVerifier
ticketProvider *auth.TicketProvider
taskQueue string
}

var _ Service = (*packageImpl)(nil)

func NewService(logger logr.Logger, db *sql.DB, tc temporalsdk_client.Client, evsvc event.EventService, tokenVerifier auth.TokenVerifier, ticketProvider *auth.TicketProvider) *packageImpl {
func NewService(logger logr.Logger, db *sql.DB, tc temporalsdk_client.Client, evsvc event.EventService, tokenVerifier auth.TokenVerifier, ticketProvider *auth.TicketProvider, taskqueue string) *packageImpl {
return &packageImpl{
logger: logger,
db: sqlx.NewDb(db, "mysql"),
tc: tc,
evsvc: evsvc,
tokenVerifier: tokenVerifier,
ticketProvider: ticketProvider,
taskQueue: taskqueue,
}
}

Expand Down
11 changes: 7 additions & 4 deletions internal/package_/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"github.com/google/uuid"
temporalsdk_api_enums "go.temporal.io/api/enums/v1"
temporalsdk_client "go.temporal.io/sdk/client"

"github.com/artefactual-sdps/enduro/internal/temporal"
)

const (
Expand Down Expand Up @@ -60,6 +58,10 @@ type ProcessingWorkflowRequest struct {

// Location identifier for storing auto approved AIPs.
DefaultPermanentLocationID *uuid.UUID

// Task queues used for starting new workflows.
TaskQueue string
A3mTaskQueue string
}

func InitProcessingWorkflow(ctx context.Context, tc temporalsdk_client.Client, req *ProcessingWorkflowRequest) error {
Expand All @@ -72,7 +74,7 @@ func InitProcessingWorkflow(ctx context.Context, tc temporalsdk_client.Client, r

opts := temporalsdk_client.StartWorkflowOptions{
ID: req.WorkflowID,
TaskQueue: temporal.GlobalTaskQueue,
TaskQueue: req.TaskQueue,
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
_, err := tc.ExecuteWorkflow(ctx, opts, ProcessingWorkflowName, req)
Expand All @@ -84,6 +86,7 @@ type MoveWorkflowRequest struct {
ID uint
AIPID string
LocationID uuid.UUID
TaskQueue string
}

func InitMoveWorkflow(ctx context.Context, tc temporalsdk_client.Client, req *MoveWorkflowRequest) (temporalsdk_client.WorkflowRun, error) {
Expand All @@ -92,7 +95,7 @@ func InitMoveWorkflow(ctx context.Context, tc temporalsdk_client.Client, req *Mo

opts := temporalsdk_client.StartWorkflowOptions{
ID: fmt.Sprintf("%s-%s", MoveWorkflowName, req.AIPID),
TaskQueue: temporal.GlobalTaskQueue,
TaskQueue: req.TaskQueue,
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
exec, err := tc.ExecuteWorkflow(ctx, opts, MoveWorkflowName, req)
Expand Down
1 change: 1 addition & 0 deletions internal/storage/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package storage

type Config struct {
TaskQueue string
EnduroAddress string
Internal LocationConfig
Database Database
Expand Down
1 change: 1 addition & 0 deletions internal/storage/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func (s *serviceImpl) Move(ctx context.Context, payload *goastorage.MovePayload)
_, err = InitStorageMoveWorkflow(ctx, s.tc, &StorageMoveWorkflowRequest{
AIPID: pkg.AipID,
LocationID: payload.LocationID,
TaskQueue: s.config.TaskQueue,
})
if err != nil {
s.logger.Error(err, "error initializing move workflow")
Expand Down
12 changes: 5 additions & 7 deletions internal/storage/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func setUpService(t *testing.T, attrs *setUpAttrs) storage.Service {
params := setUpAttrs{
logger: ref.New(logr.Discard()),
config: &storage.Config{
TaskQueue: "global",
Internal: storage.LocationConfig{
URL: "file://" + td.Path(),
},
Expand Down Expand Up @@ -164,7 +165,6 @@ func TestServiceSubmit(t *testing.T) {
mock.AnythingOfType("*context.timerCtx"),
temporalsdk_client.StartWorkflowOptions{
ID: "storage-upload-workflow-" + AIPID.String(),
TaskQueue: "global",
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-upload-workflow",
Expand All @@ -191,14 +191,12 @@ func TestServiceSubmit(t *testing.T) {
attrs := &setUpAttrs{}
svc := setUpService(t, attrs)
ctx := context.Background()

attrs.temporalClientMock.
On(
"ExecuteWorkflow",
mock.AnythingOfType("*context.timerCtx"),
temporalsdk_client.StartWorkflowOptions{
ID: "storage-upload-workflow-" + AIPID.String(),
TaskQueue: "global",
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-upload-workflow",
Expand Down Expand Up @@ -245,7 +243,6 @@ func TestServiceSubmit(t *testing.T) {
mock.AnythingOfType("*context.timerCtx"),
temporalsdk_client.StartWorkflowOptions{
ID: "storage-upload-workflow-" + AIPID.String(),
TaskQueue: "global",
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-upload-workflow",
Expand Down Expand Up @@ -303,7 +300,6 @@ func TestServiceSubmit(t *testing.T) {
mock.AnythingOfType("*context.timerCtx"),
temporalsdk_client.StartWorkflowOptions{
ID: "storage-upload-workflow-" + aipID.String(),
TaskQueue: "global",
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-upload-workflow",
Expand Down Expand Up @@ -1085,7 +1081,7 @@ func TestServiceMove(t *testing.T) {
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-move-workflow",
&storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID},
&storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID, TaskQueue: "global"},
).
Return(
nil,
Expand Down Expand Up @@ -1132,7 +1128,7 @@ func TestServiceMove(t *testing.T) {
WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
},
"storage-move-workflow",
&storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID},
&storage.StorageMoveWorkflowRequest{AIPID: AIPID, LocationID: LocationID, TaskQueue: "global"},
).
Return(
&temporalsdk_mocks.WorkflowRun{},
Expand Down Expand Up @@ -1212,6 +1208,7 @@ func TestServiceMoveStatus(t *testing.T) {
attrs := &setUpAttrs{}
svc := setUpService(t, attrs)
ctx := context.Background()
attrs.config.TaskQueue = "global"

attrs.temporalClientMock.
On(
Expand Down Expand Up @@ -1253,6 +1250,7 @@ func TestServiceMoveStatus(t *testing.T) {
attrs := &setUpAttrs{}
svc := setUpService(t, attrs)
ctx := context.Background()
attrs.config.TaskQueue = "global"

attrs.temporalClientMock.
On(
Expand Down
10 changes: 5 additions & 5 deletions internal/storage/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"github.com/google/uuid"
temporalsdk_api_enums "go.temporal.io/api/enums/v1"
temporalsdk_client "go.temporal.io/sdk/client"

"github.com/artefactual-sdps/enduro/internal/temporal"
)

const (
Expand All @@ -21,12 +19,14 @@ const (
)

type StorageUploadWorkflowRequest struct {
AIPID uuid.UUID
AIPID uuid.UUID
TaskQueue string
}

type StorageMoveWorkflowRequest struct {
AIPID uuid.UUID
LocationID uuid.UUID
TaskQueue string
}

type CopyToPermanentLocationActivityParams struct {
Expand All @@ -42,7 +42,7 @@ func InitStorageUploadWorkflow(ctx context.Context, tc temporalsdk_client.Client

opts := temporalsdk_client.StartWorkflowOptions{
ID: fmt.Sprintf("%s-%s", StorageUploadWorkflowName, req.AIPID),
TaskQueue: temporal.GlobalTaskQueue,
TaskQueue: req.TaskQueue,
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
return tc.ExecuteWorkflow(ctx, opts, StorageUploadWorkflowName, req)
Expand All @@ -54,7 +54,7 @@ func InitStorageMoveWorkflow(ctx context.Context, tc temporalsdk_client.Client,

opts := temporalsdk_client.StartWorkflowOptions{
ID: fmt.Sprintf("%s-%s", StorageMoveWorkflowName, req.AIPID),
TaskQueue: temporal.GlobalTaskQueue,
TaskQueue: req.TaskQueue,
WorkflowIDReusePolicy: temporalsdk_api_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
return tc.ExecuteWorkflow(ctx, opts, StorageMoveWorkflowName, req)
Expand Down
1 change: 1 addition & 0 deletions internal/storage/workflows/move_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestStorageMoveWorkflow(t *testing.T) {
storage.StorageMoveWorkflowRequest{
AIPID: aipID,
LocationID: locationID,
TaskQueue: "global",
},
)

Expand Down
2 changes: 0 additions & 2 deletions internal/temporal/temporal.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
)

const (
// There are task queues used by our workflow and activity workers. It may
// be convenient to make these configurable in the future .
GlobalTaskQueue = "global"
A3mWorkerTaskQueue = "a3m"
)
Expand Down
Loading

0 comments on commit 9592b11

Please sign in to comment.