Skip to content
This repository has been archived by the owner on Oct 9, 2024. It is now read-only.

Commit

Permalink
Separate workers and queues, parallel activities
Browse files Browse the repository at this point in the history
  • Loading branch information
mikhailshilkov committed Jul 12, 2021
1 parent 40d2d1b commit 17ae311
Show file tree
Hide file tree
Showing 12 changed files with 311 additions and 26 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ You can tweak the parameters of the benchmark scenario by adjusting the JSON fil
"workflow": {
"name": "basic-workflow",
"args": {
"sequenceCount": 3
"sequenceCount": 3,
"parallelCount": 1
}
},
"report": {
Expand Down
2 changes: 2 additions & 0 deletions helm-chart/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ spec:
value: "{{ .Values.tests.numDecisionPollers }}"
- name: SKIP_NAMESPACE_CREATION
value: "{{ .Values.tests.skipNamespaceCreation }}"
- name: RUN_WORKERS
value: "{{ .Values.workers }}"

{{- with .Values.nodeSelector }}
nodeSelector:
Expand Down
3 changes: 3 additions & 0 deletions helm-chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ image:
tag: latest
pullPolicy: IfNotPresent

# Which application workers to run
workers: "bench,basic,basic-act"

tests:
namespaceName: benchtest
namespaceRetention: "1"
Expand Down
2 changes: 1 addition & 1 deletion worker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ COPY --from=builder /temporal-bench/bins/temporal-bench /usr/local/bin/

ENV NAMESPACE default
ENV FRONTEND_ADDRESS 127.0.0.1:7233
ENV NUM_DECISION_POLLERS 10
ENV NUM_DECISION_POLLERS 100
ENV TLS_CA_CERT_FILE ""
ENV TLS_CLIENT_CERT_FILE ""
ENV TLS_CLIENT_CERT_PRIVATE_KEY_FILE ""
Expand Down
7 changes: 5 additions & 2 deletions worker/bench/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ func NewActivities(temporalClient client.Client) *Activities {
return &Activities{temporalClient: temporalClient}
}

// taskQueue is the queue used by worker to pull workflow and activity tasks
const taskQueue = "temporal-bench"
// benchTaskQueue is the queue used by worker to pull workflow and activity tasks
const benchTaskQueue = "temporal-bench"

// targetTaskQueue is the queue used by worker to schedule target workflows
const targetTaskQueue = "temporal-basic"

// TestError represents an error that should abort / fail the whole bench test
type TestError struct {
Expand Down
4 changes: 2 additions & 2 deletions worker/bench/driver_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ func (d *benchDriver) execute(iterationID int) error {
workflowID := fmt.Sprintf("%s-%s-%d", d.request.WorkflowName, d.request.BaseID, iterationID)
startOptions := client.StartWorkflowOptions{
ID: workflowID,
TaskQueue: taskQueue,
WorkflowExecutionTimeout: 168 * time.Hour,
TaskQueue: targetTaskQueue,
WorkflowExecutionTimeout: 30 * time.Minute,
WorkflowTaskTimeout: defaultWorkflowTaskStartToCloseTimeoutDuration,
}
_, err := d.client.ExecuteWorkflow(d.ctx, startOptions, d.request.WorkflowName, buildPayload(d.request.Parameters))
Expand Down
2 changes: 1 addition & 1 deletion worker/bench/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (w *benchWorkflow) withActivityOptions() workflow.Context {
ao := workflow.ActivityOptions{
HeartbeatTimeout: 60 * time.Second,
StartToCloseTimeout: w.deadline.Sub(workflow.Now(w.ctx)),
TaskQueue: taskQueue,
TaskQueue: benchTaskQueue,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 50 * time.Millisecond,
BackoffCoefficient: 1.2,
Expand Down
39 changes: 31 additions & 8 deletions worker/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,22 +193,45 @@ func startNamespaceWorker(
if err != nil {
logger.Fatal("failed to build temporal client", zap.Error(err))
}
defaultWorker := constructWorker(context.Background(), serviceClient, logger, "temporal-bench")

err = defaultWorker.Start()
if err != nil {
logger.Fatal("Unable to start default worker", zap.Error(err))
workersString := getEnvOrDefaultString(logger, "RUN_WORKERS", "bench,basic,basic-act")
workers := strings.Split(workersString, ",")

for _, workerName := range workers {
var worker worker.Worker
switch workerName {
case "bench":
worker = constructBenchWorker(context.Background(), serviceClient, logger, "temporal-bench")
case "basic":
worker = constructBasicWorker(context.Background(), serviceClient, logger, "temporal-basic")
case "basic-act":
worker = constructBasicActWorker(context.Background(), serviceClient, logger, "temporal-basic-act")
default:
panic(fmt.Sprintf("unknown worker %q", worker))
}
err = worker.Start()
if err != nil {
logger.Fatal("Unable to start worker " + workerName, zap.Error(err))
}
}
}

func constructWorker(ctx context.Context, serviceClient client.Client, logger *zap.Logger, taskQueue string) worker.Worker {
func constructBenchWorker(ctx context.Context, serviceClient client.Client, logger *zap.Logger, taskQueue string) worker.Worker {
w := worker.New(serviceClient, taskQueue, buildWorkerOptions(ctx, logger))
w.RegisterWorkflowWithOptions(bench.Workflow, workflow.RegisterOptions{Name: "bench-workflow"})
w.RegisterActivityWithOptions(bench.NewActivities(serviceClient), activity.RegisterOptions{Name: "bench-"})
return w
}

func constructBasicWorker(ctx context.Context, serviceClient client.Client, logger *zap.Logger, taskQueue string) worker.Worker {
w := worker.New(serviceClient, taskQueue, buildWorkerOptions(ctx, logger))
w.RegisterWorkflowWithOptions(basic.Workflow, workflow.RegisterOptions{Name: "basic-workflow"})
w.RegisterActivityWithOptions(basic.Activity, activity.RegisterOptions{Name: "basic-activity"})
return w
}

w.RegisterWorkflowWithOptions(bench.Workflow, workflow.RegisterOptions{Name: "bench-workflow"})
w.RegisterActivityWithOptions(bench.NewActivities(serviceClient), activity.RegisterOptions{Name: "bench-"})
func constructBasicActWorker(ctx context.Context, serviceClient client.Client, logger *zap.Logger, taskQueue string) worker.Worker {
w := worker.New(serviceClient, taskQueue, buildWorkerOptions(ctx, logger))
w.RegisterActivityWithOptions(basic.Activity, activity.RegisterOptions{Name: "basic-activity"})
return w
}

Expand Down
4 changes: 2 additions & 2 deletions worker/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ require (
github.com/prometheus/client_golang v1.10.0
github.com/prometheus/common v0.18.0
github.com/stretchr/testify v1.7.0
go.temporal.io/api v1.4.0
go.temporal.io/sdk v1.5.0
go.temporal.io/api v1.4.1-0.20210420220407-6f00f7f98373
go.temporal.io/sdk v1.7.0
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.16.0
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
Expand Down
Loading

0 comments on commit 17ae311

Please sign in to comment.