Skip to content

Commit

Permalink
Merge pull request #1 from grantfuhr/new-worker-with-options
Browse files Browse the repository at this point in the history
New worker with options
  • Loading branch information
grantfuhr authored Jun 6, 2022
2 parents 0f25777 + 59193f0 commit 57ba03f
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 0 deletions.
6 changes: 6 additions & 0 deletions internal/examples/helloworld/helloworld.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"time"

"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)
Expand All @@ -31,7 +32,12 @@ func PickGreeting(ctx context.Context) (string, error) {
return "Hello", nil
}

func TestIntercept(ctx context.Context) (string, error) {
return "Ok", nil
}

func RegisterWorkflowsAndActivities(r worker.Registry) {
r.RegisterWorkflow(Greet)
r.RegisterActivity(PickGreeting)
r.RegisterActivityWithOptions(TestIntercept, activity.RegisterOptions{Name: "TestIntercept"})
}
57 changes: 57 additions & 0 deletions internal/examples/helloworld/testinterceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package helloworld

import (
"time"

"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/workflow"
)

var _ interceptor.Interceptor = &Interceptor{}

type Interceptor struct {
interceptor.InterceptorBase
}

type WorkflowInterceptor struct {
interceptor.WorkflowInboundInterceptorBase
}

func NewTestInterceptor() *Interceptor {
return &Interceptor{}
}

func (i *Interceptor) InterceptClient(next interceptor.ClientOutboundInterceptor) interceptor.ClientOutboundInterceptor {
return i.InterceptorBase.InterceptClient(next)
}

func (i *Interceptor) InterceptWorkflow(ctx workflow.Context, next interceptor.WorkflowInboundInterceptor) interceptor.WorkflowInboundInterceptor {
return &WorkflowInterceptor{
WorkflowInboundInterceptorBase: interceptor.WorkflowInboundInterceptorBase{
Next: next,
},
}
}

func (i *WorkflowInterceptor) Init(outbound interceptor.WorkflowOutboundInterceptor) error {
return i.Next.Init(outbound)
}

func (i *WorkflowInterceptor) ExecuteWorkflow(ctx workflow.Context, in *interceptor.ExecuteWorkflowInput) (interface{}, error) {
version := workflow.GetVersion(ctx, "version", workflow.DefaultVersion, 1)
var err error

if version != workflow.DefaultVersion {
var vpt string
err = workflow.ExecuteLocalActivity(
workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ScheduleToCloseTimeout: time.Second}),
"TestIntercept",
).Get(ctx, &vpt)

if err != nil {
return nil, err
}
}

return i.Next.ExecuteWorkflow(ctx, in)
}
33 changes: 33 additions & 0 deletions temporaltest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (ts *TestServer) fatal(err error) {
}

// Worker registers and starts a Temporal worker on the specified task queue.
// WorkflowPanicPolicy is set to worker.FailWorkflow
func (ts *TestServer) Worker(taskQueue string, registerFunc func(registry worker.Registry)) worker.Worker {
w := worker.New(ts.Client(), taskQueue, worker.Options{
WorkflowPanicPolicy: worker.FailWorkflow,
Expand All @@ -51,6 +52,38 @@ func (ts *TestServer) Worker(taskQueue string, registerFunc func(registry worker
return w
}

// NewWorkerWithOptions returns a Temporal worker on the specified task queue.
// WorkflowPanicPolicy is set to worker.FailWorkflow
func (ts *TestServer) NewWorkerWithOptions(taskQueue string, registerFunc func(registry worker.Registry), opts worker.Options) worker.Worker {
opts.WorkflowPanicPolicy = worker.FailWorkflow

w := worker.New(ts.Client(), taskQueue, opts)
registerFunc(w)
ts.workers = append(ts.workers, w)

if err := w.Start(); err != nil {
ts.fatal(err)
}

return w
}

// NewWorkerWithClient returns a Temporal worker on the specified task queue.
// WorkflowPanicPolicy is set to worker.FailWorkflow
func (ts *TestServer) NewWorkerWithClient(client client.Client, taskQueue string, registerFunc func(registry worker.Registry), opts worker.Options) worker.Worker {
opts.WorkflowPanicPolicy = worker.FailWorkflow

w := worker.New(client, taskQueue, opts)
registerFunc(w)
ts.workers = append(ts.workers, w)

if err := w.Start(); err != nil {
ts.fatal(err)
}

return w
}

// Client returns a Temporal client configured for making requests to the server.
// It is configured to use a pre-registered test namespace and will
// be closed on TestServer.Stop.
Expand Down
80 changes: 80 additions & 0 deletions temporaltest/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,86 @@ func TestNewServer(t *testing.T) {
}
}

func TestNewWorkerWithOptions(t *testing.T) {
ts := temporaltest.NewServer(temporaltest.WithT(t))

ts.NewWorkerWithOptions(
"hello_world",
func(registry worker.Registry) {
helloworld.RegisterWorkflowsAndActivities(registry)
},
worker.Options{
MaxConcurrentActivityExecutionSize: 1,
MaxConcurrentLocalActivityExecutionSize: 1,
},
)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

wfr, err := ts.Client().ExecuteWorkflow(
ctx,
client.StartWorkflowOptions{TaskQueue: "hello_world"},
helloworld.Greet,
"world",
)
if err != nil {
t.Fatal(err)
}

var result string
if err := wfr.Get(ctx, &result); err != nil {
t.Fatal(err)
}

if result != "Hello world" {
t.Fatalf("unexpected result: %q", result)
}

}

// Tests creating a worker with a custom client. Embeds an interceptor in the worker.
func TestNewWorkerWithClient(t *testing.T) {
ts := temporaltest.NewServer(temporaltest.WithT(t))
var opts client.Options
opts.Interceptors = append(opts.Interceptors, helloworld.NewTestInterceptor())
c := ts.NewClientWithOptions(opts)

ts.NewWorkerWithClient(
c,
"hello_world",
func(registry worker.Registry) {
helloworld.RegisterWorkflowsAndActivities(registry)
},
worker.Options{
MaxConcurrentActivityExecutionSize: 1,
MaxConcurrentLocalActivityExecutionSize: 1,
},
)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

wfr, err := ts.Client().ExecuteWorkflow(
ctx,
client.StartWorkflowOptions{TaskQueue: "hello_world"},
helloworld.Greet,
"world",
)
if err != nil {
t.Fatal(err)
}

var result string
if err := wfr.Get(ctx, &result); err != nil {
t.Fatal(err)
}

if result != "Hello world" {
t.Fatalf("unexpected result: %q", result)
}
}

func BenchmarkRunWorkflow(b *testing.B) {
ts := temporaltest.NewServer()
defer ts.Stop()
Expand Down

0 comments on commit 57ba03f

Please sign in to comment.