From 30114964061680fbba86ad65e6e1f79418500775 Mon Sep 17 00:00:00 2001 From: Irven Aelbrecht Date: Wed, 1 Feb 2023 15:53:14 +0100 Subject: [PATCH] :bug: fix multiple args in workflow --- README.md | 8 ++++---- scenarios/basic-const12k.json | 4 ++-- scenarios/basic-payload.json | 4 ++-- scenarios/basic-spike.json | 4 ++-- scenarios/basic-test.json | 4 ++-- worker/bench/driver_activity.go | 7 ++++--- worker/bench/payload.go | 31 ++++++++++++++++++------------- worker/bench/payload_test.go | 22 ++++++++++++++-------- worker/bench/workflow.go | 2 +- 9 files changed, 49 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index 5e81419..36abe35 100644 --- a/README.md +++ b/README.md @@ -148,10 +148,10 @@ You can tweak the parameters of the benchmark scenario by adjusting the JSON fil "workflow": { "name": "basic-workflow", "taskQueue": "temporal-basic", - "args": { + "args": [{ "sequenceCount": 3, "parallelCount": 1 - } + }] }, "report": { "intervalInSeconds": 10 @@ -181,11 +181,11 @@ Here is the definition of the workflow in `./scenarios/basic-payload.json`: "workflow": { "name": "basic-workflow", "taskQueue": "temporal-basic", - "args": { + "args": [{ "sequenceCount": 3, "payload": "$RANDOM(100)", "resultPayload": "$RANDOM_NORM(80,10)" - } + }] } ``` diff --git a/scenarios/basic-const12k.json b/scenarios/basic-const12k.json index ae45b5f..c48c339 100644 --- a/scenarios/basic-const12k.json +++ b/scenarios/basic-const12k.json @@ -7,9 +7,9 @@ "workflow": { "name": "basic-workflow", "taskQueue": "temporal-basic", - "args": { + "args": [{ "sequenceCount": 3 - } + }] }, "report": { "intervalInSeconds": 10 diff --git a/scenarios/basic-payload.json b/scenarios/basic-payload.json index ad8bf03..8be105f 100644 --- a/scenarios/basic-payload.json +++ b/scenarios/basic-payload.json @@ -6,10 +6,10 @@ "workflow": { "name": "basic-workflow", "taskQueue": "temporal-basic", - "args": { + "args": [{ "sequenceCount": 3, "payload": "$RANDOM(100)", "resultPayload": "$RANDOM_NORM(80,10)" - } + }] } } diff --git a/scenarios/basic-spike.json b/scenarios/basic-spike.json index bde022a..b0558a3 100644 --- a/scenarios/basic-spike.json +++ b/scenarios/basic-spike.json @@ -13,9 +13,9 @@ "workflow": { "name": "basic-workflow", "taskQueue": "temporal-basic", - "args": { + "args": [{ "sequenceCount": 3 - } + }] }, "report": { "intervalInSeconds": 5 diff --git a/scenarios/basic-test.json b/scenarios/basic-test.json index f9fed33..0f8e9ba 100644 --- a/scenarios/basic-test.json +++ b/scenarios/basic-test.json @@ -6,8 +6,8 @@ "workflow": { "name": "basic-workflow", "taskQueue": "temporal-basic", - "args": { + "args": [{ "sequenceCount": 3 - } + }] } } diff --git a/worker/bench/driver_activity.go b/worker/bench/driver_activity.go index 81e682f..3cfb191 100644 --- a/worker/bench/driver_activity.go +++ b/worker/bench/driver_activity.go @@ -25,12 +25,13 @@ package bench import ( "context" "fmt" + "time" + "github.com/pkg/errors" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/client" "go.temporal.io/sdk/log" "golang.org/x/time/rate" - "time" ) func (a *Activities) DriverActivity(ctx context.Context, request benchDriverActivityRequest) error { @@ -51,7 +52,7 @@ type ( BaseID string BatchSize int Rate int - Parameters interface{} + Parameters []interface{} } benchDriver struct { ctx context.Context @@ -118,7 +119,7 @@ func (d *benchDriver) execute(iterationID int) error { WorkflowExecutionTimeout: 30 * time.Minute, WorkflowTaskTimeout: defaultWorkflowTaskStartToCloseTimeoutDuration, } - _, err := d.client.ExecuteWorkflow(d.ctx, startOptions, d.request.WorkflowName, buildPayload(d.request.Parameters)) + _, err := d.client.ExecuteWorkflow(d.ctx, startOptions, d.request.WorkflowName, buildPayload(d.request.Parameters)...) if err != nil { d.logger.Error("failed to start workflow", "Error", err, "ID", workflowID) } diff --git a/worker/bench/payload.go b/worker/bench/payload.go index d1fd387..719993b 100644 --- a/worker/bench/payload.go +++ b/worker/bench/payload.go @@ -28,22 +28,27 @@ import ( "strconv" ) -func buildPayload(params interface{}) interface{} { - olds, ok := params.(map[string]interface{}) - if !ok { - return params - } - news := map[string]interface{}{} - for k, v := range olds { - if str, ok := v.(string); ok { - if newValue, ok := eval(str); ok { - news[k] = newValue - continue +func buildPayload(paramsArr []interface{}) []interface{} { + output := []interface{}{} + for _, params := range paramsArr { + olds, ok := params.(map[string]interface{}) + if !ok { + output = append(output, params) + continue + } + news := map[string]interface{}{} + for k, v := range olds { + if str, ok := v.(string); ok { + if newValue, ok := eval(str); ok { + news[k] = newValue + continue + } } + news[k] = v } - news[k] = v + output = append(output, news) } - return news + return output } var randomRegex = regexp.MustCompile(`\$RANDOM\(([0-9]+)\)`) diff --git a/worker/bench/payload_test.go b/worker/bench/payload_test.go index dbc446c..1dec480 100644 --- a/worker/bench/payload_test.go +++ b/worker/bench/payload_test.go @@ -28,28 +28,34 @@ import ( ) func TestBuildParametersNoChangeInUnrelatedStruct(t *testing.T) { - x := map[string]interface{}{"Hello": 123} + input := map[string]interface{}{"Hello": 123} + x := []interface{}{input} actual := buildPayload(x) assert.Equal(t, x, actual) } func TestBuildParametersNoChangeInFixedPayload(t *testing.T) { - x := map[string]interface{}{"payload": "123"} + input := map[string]interface{}{"payload": "123"} + x := []interface{}{input} actual := buildPayload(x) assert.Equal(t, x, actual) } func TestBuildParametersRandomPayload(t *testing.T) { - x := map[string]interface{}{"payload": "$RANDOM(10)"} - y := buildPayload(x).(map[string]interface{}) - assert.NotEqual(t, x["payload"], y["payload"]) + input := map[string]interface{}{"payload": "$RANDOM(10)"} + x := []interface{}{input} + temp := buildPayload(x) + y := temp[0].(map[string]interface{}) + assert.NotEqual(t, input["payload"], y["payload"]) assert.Equal(t, 10, len(y["payload"].(string))) } func TestBuildParametersRandomNormalPayload(t *testing.T) { - x := map[string]interface{}{"payload": "$RANDOM_NORM(80,10)"} - y := buildPayload(x).(map[string]interface{}) - assert.NotEqual(t, x["payload"], y["payload"]) + input := map[string]interface{}{"payload": "$RANDOM_NORM(80,10)"} + x := []interface{}{input} + temp := buildPayload(x) + y := temp[0].(map[string]interface{}) + assert.NotEqual(t, input["payload"], y["payload"]) actual := len(y["payload"].(string)) assert.Greater(t, actual, 0) assert.Less(t, actual, 160) diff --git a/worker/bench/workflow.go b/worker/bench/workflow.go index a862cbe..dfe4370 100644 --- a/worker/bench/workflow.go +++ b/worker/bench/workflow.go @@ -70,7 +70,7 @@ type ( // TaskQueue is the name of the task queue to use when executing the workflow. TaskQueue string `json:"taskqueue"` // Args is the argument that should be the input of all executions of the workflow under test. - Args interface{} `json:"args"` + Args []interface{} `json:"args"` } benchWorkflowRequestReporting struct { // IntervalInSeconds defines the granularity of the result histogram.