Skip to content
This repository has been archived by the owner on Oct 9, 2024. It is now read-only.

fix multiple args in workflow #60

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ You can tweak the parameters of the benchmark scenario by adjusting the JSON fil
"workflow": {
"name": "basic-workflow",
"taskQueue": "temporal-basic",
"args": {
"args": [{
"sequenceCount": 3,
"parallelCount": 1
}
}]
},
"report": {
"intervalInSeconds": 10
Expand Down Expand Up @@ -181,11 +181,11 @@ Here is the definition of the workflow in `./scenarios/basic-payload.json`:
"workflow": {
"name": "basic-workflow",
"taskQueue": "temporal-basic",
"args": {
"args": [{
"sequenceCount": 3,
"payload": "$RANDOM(100)",
"resultPayload": "$RANDOM_NORM(80,10)"
}
}]
}
```

Expand Down
4 changes: 2 additions & 2 deletions scenarios/basic-const12k.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
"workflow": {
"name": "basic-workflow",
"taskQueue": "temporal-basic",
"args": {
"args": [{
"sequenceCount": 3
}
}]
},
"report": {
"intervalInSeconds": 10
Expand Down
4 changes: 2 additions & 2 deletions scenarios/basic-payload.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
"workflow": {
"name": "basic-workflow",
"taskQueue": "temporal-basic",
"args": {
"args": [{
"sequenceCount": 3,
"payload": "$RANDOM(100)",
"resultPayload": "$RANDOM_NORM(80,10)"
}
}]
}
}
4 changes: 2 additions & 2 deletions scenarios/basic-spike.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
"workflow": {
"name": "basic-workflow",
"taskQueue": "temporal-basic",
"args": {
"args": [{
"sequenceCount": 3
}
}]
},
"report": {
"intervalInSeconds": 5
Expand Down
4 changes: 2 additions & 2 deletions scenarios/basic-test.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
"workflow": {
"name": "basic-workflow",
"taskQueue": "temporal-basic",
"args": {
"args": [{
"sequenceCount": 3
}
}]
}
}
7 changes: 4 additions & 3 deletions worker/bench/driver_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ package bench
import (
"context"
"fmt"
"time"

"github.com/pkg/errors"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/log"
"golang.org/x/time/rate"
"time"
)

func (a *Activities) DriverActivity(ctx context.Context, request benchDriverActivityRequest) error {
Expand All @@ -51,7 +52,7 @@ type (
BaseID string
BatchSize int
Rate int
Parameters interface{}
Parameters []interface{}
}
benchDriver struct {
ctx context.Context
Expand Down Expand Up @@ -118,7 +119,7 @@ func (d *benchDriver) execute(iterationID int) error {
WorkflowExecutionTimeout: 30 * time.Minute,
WorkflowTaskTimeout: defaultWorkflowTaskStartToCloseTimeoutDuration,
}
_, err := d.client.ExecuteWorkflow(d.ctx, startOptions, d.request.WorkflowName, buildPayload(d.request.Parameters))
_, err := d.client.ExecuteWorkflow(d.ctx, startOptions, d.request.WorkflowName, buildPayload(d.request.Parameters)...)
if err != nil {
d.logger.Error("failed to start workflow", "Error", err, "ID", workflowID)
}
Expand Down
31 changes: 18 additions & 13 deletions worker/bench/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,27 @@ import (
"strconv"
)

func buildPayload(params interface{}) interface{} {
olds, ok := params.(map[string]interface{})
if !ok {
return params
}
news := map[string]interface{}{}
for k, v := range olds {
if str, ok := v.(string); ok {
if newValue, ok := eval(str); ok {
news[k] = newValue
continue
func buildPayload(paramsArr []interface{}) []interface{} {
output := []interface{}{}
for _, params := range paramsArr {
olds, ok := params.(map[string]interface{})
if !ok {
output = append(output, params)
continue
}
news := map[string]interface{}{}
for k, v := range olds {
if str, ok := v.(string); ok {
if newValue, ok := eval(str); ok {
news[k] = newValue
continue
}
}
news[k] = v
}
news[k] = v
output = append(output, news)
}
return news
return output
}

var randomRegex = regexp.MustCompile(`\$RANDOM\(([0-9]+)\)`)
Expand Down
22 changes: 14 additions & 8 deletions worker/bench/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,34 @@ import (
)

func TestBuildParametersNoChangeInUnrelatedStruct(t *testing.T) {
x := map[string]interface{}{"Hello": 123}
input := map[string]interface{}{"Hello": 123}
x := []interface{}{input}
actual := buildPayload(x)
assert.Equal(t, x, actual)
}

func TestBuildParametersNoChangeInFixedPayload(t *testing.T) {
x := map[string]interface{}{"payload": "123"}
input := map[string]interface{}{"payload": "123"}
x := []interface{}{input}
actual := buildPayload(x)
assert.Equal(t, x, actual)
}

func TestBuildParametersRandomPayload(t *testing.T) {
x := map[string]interface{}{"payload": "$RANDOM(10)"}
y := buildPayload(x).(map[string]interface{})
assert.NotEqual(t, x["payload"], y["payload"])
input := map[string]interface{}{"payload": "$RANDOM(10)"}
x := []interface{}{input}
temp := buildPayload(x)
y := temp[0].(map[string]interface{})
assert.NotEqual(t, input["payload"], y["payload"])
assert.Equal(t, 10, len(y["payload"].(string)))
}

func TestBuildParametersRandomNormalPayload(t *testing.T) {
x := map[string]interface{}{"payload": "$RANDOM_NORM(80,10)"}
y := buildPayload(x).(map[string]interface{})
assert.NotEqual(t, x["payload"], y["payload"])
input := map[string]interface{}{"payload": "$RANDOM_NORM(80,10)"}
x := []interface{}{input}
temp := buildPayload(x)
y := temp[0].(map[string]interface{})
assert.NotEqual(t, input["payload"], y["payload"])
actual := len(y["payload"].(string))
assert.Greater(t, actual, 0)
assert.Less(t, actual, 160)
Expand Down
2 changes: 1 addition & 1 deletion worker/bench/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type (
// TaskQueue is the name of the task queue to use when executing the workflow.
TaskQueue string `json:"taskqueue"`
// Args is the argument that should be the input of all executions of the workflow under test.
Args interface{} `json:"args"`
Args []interface{} `json:"args"`
}
benchWorkflowRequestReporting struct {
// IntervalInSeconds defines the granularity of the result histogram.
Expand Down