From eb275a1ed090702b6b2bbadd6838c95f2bae2496 Mon Sep 17 00:00:00 2001 From: grantfuhr Date: Mon, 6 Jun 2022 14:23:32 -0400 Subject: [PATCH 1/6] Added method to create new test worker with options Signed-off-by: grantfuhr --- temporaltest/server.go | 15 +++++++++++++++ temporaltest/server_test.go | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/temporaltest/server.go b/temporaltest/server.go index b7f67512..76dfd15f 100644 --- a/temporaltest/server.go +++ b/temporaltest/server.go @@ -51,6 +51,21 @@ func (ts *TestServer) Worker(taskQueue string, registerFunc func(registry worker return w } +// NewWorkerWithOptions returns a Temporal wroker on the specified task queue. +// WorkflowPanicPolicy is set to worker.FailWorkflow +func (ts *TestServer) NewWorkerWithOptions(taskQueue string, registerFunc func(registry worker.Registry), opts worker.Options) worker.Worker { + opts.WorkflowPanicPolicy = worker.FailWorkflow + w := worker.New(ts.Client(), taskQueue, opts) + registerFunc(w) + ts.workers = append(ts.workers, w) + + if err := w.Start(); err != nil { + ts.fatal(err) + } + + return w +} + // Client returns a Temporal client configured for making requests to the server. // It is configured to use a pre-registered test namespace and will // be closed on TestServer.Stop. diff --git a/temporaltest/server_test.go b/temporaltest/server_test.go index 7df05bd3..05bd750b 100644 --- a/temporaltest/server_test.go +++ b/temporaltest/server_test.go @@ -82,6 +82,43 @@ func TestNewServer(t *testing.T) { } } +func TestWorkerWithOptions(t *testing.T) { + ts := temporaltest.NewServer(temporaltest.WithT(t)) + + ts.NewWorkerWithOptions( + "hello_world", + func(registry worker.Registry) { + helloworld.RegisterWorkflowsAndActivities(registry) + }, + worker.Options{ + MaxConcurrentActivityExecutionSize: 1, + MaxConcurrentLocalActivityExecutionSize: 1, + }) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + wfr, err := ts.Client().ExecuteWorkflow( + ctx, + client.StartWorkflowOptions{TaskQueue: "hello_world"}, + helloworld.Greet, + "world", + ) + if err != nil { + t.Fatal(err) + } + + var result string + if err := wfr.Get(ctx, &result); err != nil { + t.Fatal(err) + } + + if result != "Hello world" { + t.Fatalf("unexpected result: %q", result) + } + +} + func BenchmarkRunWorkflow(b *testing.B) { ts := temporaltest.NewServer() defer ts.Stop() From c68d48f722a399090d6f934c7dd1918154217219 Mon Sep 17 00:00:00 2001 From: grantfuhr Date: Mon, 6 Jun 2022 14:29:09 -0400 Subject: [PATCH 2/6] Formatting and a comment Signed-off-by: grantfuhr --- temporaltest/server.go | 2 ++ temporaltest/server_test.go | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/temporaltest/server.go b/temporaltest/server.go index 76dfd15f..e00d7568 100644 --- a/temporaltest/server.go +++ b/temporaltest/server.go @@ -37,6 +37,7 @@ func (ts *TestServer) fatal(err error) { } // Worker registers and starts a Temporal worker on the specified task queue. +// WorkflowPanicPolicy is set to worker.FailWorkflow func (ts *TestServer) Worker(taskQueue string, registerFunc func(registry worker.Registry)) worker.Worker { w := worker.New(ts.Client(), taskQueue, worker.Options{ WorkflowPanicPolicy: worker.FailWorkflow, @@ -55,6 +56,7 @@ func (ts *TestServer) Worker(taskQueue string, registerFunc func(registry worker // WorkflowPanicPolicy is set to worker.FailWorkflow func (ts *TestServer) NewWorkerWithOptions(taskQueue string, registerFunc func(registry worker.Registry), opts worker.Options) worker.Worker { opts.WorkflowPanicPolicy = worker.FailWorkflow + w := worker.New(ts.Client(), taskQueue, opts) registerFunc(w) ts.workers = append(ts.workers, w) diff --git a/temporaltest/server_test.go b/temporaltest/server_test.go index 05bd750b..878188c4 100644 --- a/temporaltest/server_test.go +++ b/temporaltest/server_test.go @@ -93,7 +93,8 @@ func TestWorkerWithOptions(t *testing.T) { worker.Options{ MaxConcurrentActivityExecutionSize: 1, MaxConcurrentLocalActivityExecutionSize: 1, - }) + }, + ) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() From be0f44151d1eb234a03145d7609ebb4b7cf907b8 Mon Sep 17 00:00:00 2001 From: grantfuhr Date: Mon, 6 Jun 2022 15:50:56 -0400 Subject: [PATCH 3/6] Added interceptor, replicated local activity replay Signed-off-by: grantfuhr --- internal/examples/helloworld/helloworld.go | 6 + .../examples/helloworld/testinterceptor.go | 57 +++++++++ temporaltest/server.go | 18 ++- temporaltest/server_test.go | 108 +++++++++++++++++- 4 files changed, 187 insertions(+), 2 deletions(-) create mode 100644 internal/examples/helloworld/testinterceptor.go diff --git a/internal/examples/helloworld/helloworld.go b/internal/examples/helloworld/helloworld.go index a5aa55f2..51c94723 100644 --- a/internal/examples/helloworld/helloworld.go +++ b/internal/examples/helloworld/helloworld.go @@ -9,6 +9,7 @@ import ( "fmt" "time" + "go.temporal.io/sdk/activity" "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" ) @@ -31,7 +32,12 @@ func PickGreeting(ctx context.Context) (string, error) { return "Hello", nil } +func TestIntercept(ctx context.Context) (string, error) { + return "Ok", nil +} + func RegisterWorkflowsAndActivities(r worker.Registry) { r.RegisterWorkflow(Greet) r.RegisterActivity(PickGreeting) + r.RegisterActivityWithOptions(TestIntercept, activity.RegisterOptions{Name: "TestIntercept"}) } diff --git a/internal/examples/helloworld/testinterceptor.go b/internal/examples/helloworld/testinterceptor.go new file mode 100644 index 00000000..29147ca7 --- /dev/null +++ b/internal/examples/helloworld/testinterceptor.go @@ -0,0 +1,57 @@ +package helloworld + +import ( + "time" + + "go.temporal.io/sdk/interceptor" + "go.temporal.io/sdk/workflow" +) + +var _ interceptor.Interceptor = &Interceptor{} + +type Interceptor struct { + interceptor.InterceptorBase +} + +type WorkflowInterceptor struct { + interceptor.WorkflowInboundInterceptorBase +} + +func NewTestInterceptor() *Interceptor { + return &Interceptor{} +} + +func (i *Interceptor) InterceptClient(next interceptor.ClientOutboundInterceptor) interceptor.ClientOutboundInterceptor { + return i.InterceptorBase.InterceptClient(next) +} + +func (i *Interceptor) InterceptWorkflow(ctx workflow.Context, next interceptor.WorkflowInboundInterceptor) interceptor.WorkflowInboundInterceptor { + return &WorkflowInterceptor{ + WorkflowInboundInterceptorBase: interceptor.WorkflowInboundInterceptorBase{ + Next: next, + }, + } +} + +func (i *WorkflowInterceptor) Init(outbound interceptor.WorkflowOutboundInterceptor) error { + return i.Next.Init(outbound) +} + +func (i *WorkflowInterceptor) ExecuteWorkflow(ctx workflow.Context, in *interceptor.ExecuteWorkflowInput) (interface{}, error) { + version := workflow.GetVersion(ctx, "version", workflow.DefaultVersion, 1) + var err error + + if version != workflow.DefaultVersion { + var vpt string + err = workflow.ExecuteLocalActivity( + workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ScheduleToCloseTimeout: time.Second}), + "TestIntercept", + ).Get(ctx, &vpt) + + if err != nil { + return nil, err + } + } + + return i.Next.ExecuteWorkflow(ctx, in) +} diff --git a/temporaltest/server.go b/temporaltest/server.go index e00d7568..104f4668 100644 --- a/temporaltest/server.go +++ b/temporaltest/server.go @@ -52,7 +52,7 @@ func (ts *TestServer) Worker(taskQueue string, registerFunc func(registry worker return w } -// NewWorkerWithOptions returns a Temporal wroker on the specified task queue. +// NewWorkerWithOptions returns a Temporal worker on the specified task queue. // WorkflowPanicPolicy is set to worker.FailWorkflow func (ts *TestServer) NewWorkerWithOptions(taskQueue string, registerFunc func(registry worker.Registry), opts worker.Options) worker.Worker { opts.WorkflowPanicPolicy = worker.FailWorkflow @@ -68,6 +68,22 @@ func (ts *TestServer) NewWorkerWithOptions(taskQueue string, registerFunc func(r return w } +// NewWorkerWithClient returns a Temporal worker on the specified task queue. +// WorkflowPanicPolicy is set to worker.FailWorkflow +func (ts *TestServer) NewWorkerWithClient(client client.Client, taskQueue string, registerFunc func(registry worker.Registry), opts worker.Options) worker.Worker { + opts.WorkflowPanicPolicy = worker.FailWorkflow + + w := worker.New(client, taskQueue, opts) + registerFunc(w) + ts.workers = append(ts.workers, w) + + if err := w.Start(); err != nil { + ts.fatal(err) + } + + return w +} + // Client returns a Temporal client configured for making requests to the server. // It is configured to use a pre-registered test namespace and will // be closed on TestServer.Stop. diff --git a/temporaltest/server_test.go b/temporaltest/server_test.go index 878188c4..ecefb76f 100644 --- a/temporaltest/server_test.go +++ b/temporaltest/server_test.go @@ -10,8 +10,11 @@ import ( "testing" "time" + enumspb "go.temporal.io/api/enums/v1" + historypb "go.temporal.io/api/history/v1" "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" "github.com/DataDog/temporalite/internal/examples/helloworld" "github.com/DataDog/temporalite/temporaltest" @@ -82,7 +85,7 @@ func TestNewServer(t *testing.T) { } } -func TestWorkerWithOptions(t *testing.T) { +func TestNewWorkerWithOptions(t *testing.T) { ts := temporaltest.NewServer(temporaltest.WithT(t)) ts.NewWorkerWithOptions( @@ -120,6 +123,109 @@ func TestWorkerWithOptions(t *testing.T) { } +// Tests creating a worker with a custom client. Embeds an interceptor in the worker. +func TestNewWorkerWithClient(t *testing.T) { + ts := temporaltest.NewServer(temporaltest.WithT(t)) + var opts client.Options + opts.Interceptors = append(opts.Interceptors, helloworld.NewTestInterceptor()) + c := ts.NewClientWithOptions(opts) + + ts.NewWorkerWithClient( + c, + "hello_world", + func(registry worker.Registry) { + helloworld.RegisterWorkflowsAndActivities(registry) + }, + worker.Options{ + MaxConcurrentActivityExecutionSize: 1, + MaxConcurrentLocalActivityExecutionSize: 1, + }, + ) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + wfr, err := ts.Client().ExecuteWorkflow( + ctx, + client.StartWorkflowOptions{TaskQueue: "hello_world"}, + helloworld.Greet, + "world", + ) + if err != nil { + t.Fatal(err) + } + + var result string + if err := wfr.Get(ctx, &result); err != nil { + t.Fatal(err) + } + + if result != "Hello world" { + t.Fatalf("unexpected result: %q", result) + } +} + +func TestReplayInterceptor(t *testing.T) { + ts := temporaltest.NewServer(temporaltest.WithT(t)) + var opts client.Options + + opts.Interceptors = append(opts.Interceptors, helloworld.NewTestInterceptor()) + c := ts.NewClientWithOptions(opts) + + ts.NewWorkerWithClient( + c, + "hello_world", + func(registry worker.Registry) { + helloworld.RegisterWorkflowsAndActivities(registry) + }, + worker.Options{ + MaxConcurrentActivityExecutionSize: 1, + MaxConcurrentLocalActivityExecutionSize: 1, + }, + ) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + wfr, err := ts.Client().ExecuteWorkflow( + ctx, + client.StartWorkflowOptions{TaskQueue: "hello_world"}, + helloworld.Greet, + "world", + ) + if err != nil { + t.Fatal(err) + } + + var result string + if err := wfr.Get(ctx, &result); err != nil { + t.Fatal(err) + } + wid := wfr.GetID() + rid := wfr.GetRunID() + + // Replay workflow on the same task queue + var history historypb.History + iter := c.GetWorkflowHistory(ctx, wid, rid, false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + + for iter.HasNext() { + e, err := iter.Next() + if err != nil { + t.Fatal(err) + } + history.Events = append(history.Events, e) + fmt.Println(e) + } + + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflowWithOptions(helloworld.Greet, workflow.RegisterOptions{Name: "Greet"}) + err = replayer.ReplayWorkflowHistory(nil, &history) + fmt.Println(err) + if err != nil { + t.Fatal(err) + } +} + func BenchmarkRunWorkflow(b *testing.B) { ts := temporaltest.NewServer() defer ts.Stop() From 181e25a9f357f26e6d000e740181a500c8c5c1c9 Mon Sep 17 00:00:00 2001 From: grantfuhr Date: Mon, 6 Jun 2022 16:00:46 -0400 Subject: [PATCH 4/6] remove log Signed-off-by: grantfuhr --- temporaltest/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporaltest/server_test.go b/temporaltest/server_test.go index ecefb76f..1eaded27 100644 --- a/temporaltest/server_test.go +++ b/temporaltest/server_test.go @@ -165,6 +165,7 @@ func TestNewWorkerWithClient(t *testing.T) { } } +// Demonstrates potential issue with replay when executing local activities in an interceptor func TestReplayInterceptor(t *testing.T) { ts := temporaltest.NewServer(temporaltest.WithT(t)) var opts client.Options @@ -214,7 +215,6 @@ func TestReplayInterceptor(t *testing.T) { t.Fatal(err) } history.Events = append(history.Events, e) - fmt.Println(e) } replayer := worker.NewWorkflowReplayer() From 1e6cc2a27984b9857a68152b4aea65bbf8699fff Mon Sep 17 00:00:00 2001 From: grantfuhr Date: Mon, 6 Jun 2022 16:01:48 -0400 Subject: [PATCH 5/6] remove log Signed-off-by: grantfuhr --- temporaltest/server_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/temporaltest/server_test.go b/temporaltest/server_test.go index 1eaded27..7724fdb5 100644 --- a/temporaltest/server_test.go +++ b/temporaltest/server_test.go @@ -220,7 +220,6 @@ func TestReplayInterceptor(t *testing.T) { replayer := worker.NewWorkflowReplayer() replayer.RegisterWorkflowWithOptions(helloworld.Greet, workflow.RegisterOptions{Name: "Greet"}) err = replayer.ReplayWorkflowHistory(nil, &history) - fmt.Println(err) if err != nil { t.Fatal(err) } From 59193f0c5a06caeaa7ea0ffd89d8e6e5a080515a Mon Sep 17 00:00:00 2001 From: grantfuhr Date: Mon, 6 Jun 2022 16:06:38 -0400 Subject: [PATCH 6/6] removed broken test Signed-off-by: grantfuhr --- temporaltest/server_test.go | 63 ------------------------------------- 1 file changed, 63 deletions(-) diff --git a/temporaltest/server_test.go b/temporaltest/server_test.go index 7724fdb5..ea826b64 100644 --- a/temporaltest/server_test.go +++ b/temporaltest/server_test.go @@ -10,11 +10,8 @@ import ( "testing" "time" - enumspb "go.temporal.io/api/enums/v1" - historypb "go.temporal.io/api/history/v1" "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" - "go.temporal.io/sdk/workflow" "github.com/DataDog/temporalite/internal/examples/helloworld" "github.com/DataDog/temporalite/temporaltest" @@ -165,66 +162,6 @@ func TestNewWorkerWithClient(t *testing.T) { } } -// Demonstrates potential issue with replay when executing local activities in an interceptor -func TestReplayInterceptor(t *testing.T) { - ts := temporaltest.NewServer(temporaltest.WithT(t)) - var opts client.Options - - opts.Interceptors = append(opts.Interceptors, helloworld.NewTestInterceptor()) - c := ts.NewClientWithOptions(opts) - - ts.NewWorkerWithClient( - c, - "hello_world", - func(registry worker.Registry) { - helloworld.RegisterWorkflowsAndActivities(registry) - }, - worker.Options{ - MaxConcurrentActivityExecutionSize: 1, - MaxConcurrentLocalActivityExecutionSize: 1, - }, - ) - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - wfr, err := ts.Client().ExecuteWorkflow( - ctx, - client.StartWorkflowOptions{TaskQueue: "hello_world"}, - helloworld.Greet, - "world", - ) - if err != nil { - t.Fatal(err) - } - - var result string - if err := wfr.Get(ctx, &result); err != nil { - t.Fatal(err) - } - wid := wfr.GetID() - rid := wfr.GetRunID() - - // Replay workflow on the same task queue - var history historypb.History - iter := c.GetWorkflowHistory(ctx, wid, rid, false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) - - for iter.HasNext() { - e, err := iter.Next() - if err != nil { - t.Fatal(err) - } - history.Events = append(history.Events, e) - } - - replayer := worker.NewWorkflowReplayer() - replayer.RegisterWorkflowWithOptions(helloworld.Greet, workflow.RegisterOptions{Name: "Greet"}) - err = replayer.ReplayWorkflowHistory(nil, &history) - if err != nil { - t.Fatal(err) - } -} - func BenchmarkRunWorkflow(b *testing.B) { ts := temporaltest.NewServer() defer ts.Stop()