From d8e3cf8803ae66298ded50f5fff8d31efa61c31b Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 20 Feb 2024 16:29:01 -0800 Subject: [PATCH 1/6] Add workflow exec tests (json input failing) --- temporalcli/commands.workflow_exec_test.go | 213 ++++++++++++++++++++- 1 file changed, 204 insertions(+), 9 deletions(-) diff --git a/temporalcli/commands.workflow_exec_test.go b/temporalcli/commands.workflow_exec_test.go index 5fc40a7f..d5ae67c8 100644 --- a/temporalcli/commands.workflow_exec_test.go +++ b/temporalcli/commands.workflow_exec_test.go @@ -6,6 +6,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "google.golang.org/protobuf/encoding/protojson" "net/http/httptest" "os" "strconv" @@ -23,17 +24,9 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/durationpb" ) -// TODO(cretz): To test: -// * Start failure -// * Execute failure on start -// * Execute workflow failure (including nested failures) -// * Execute workflow cancel -// * Execute workflow timeout -// * Execute workflow continue as new -// * Workflow with proto JSON input - func (s *SharedServerSuite) TestWorkflow_Start_SimpleSuccess() { // Text s.Worker.OnDevWorkflow(func(ctx workflow.Context, input any) (any, error) { @@ -189,6 +182,208 @@ func (s *SharedServerSuite) TestWorkflow_Execute_SimpleFailure() { jsonPath(jsonOut, "closeEvent", "workflowExecutionFailedEventAttributes", "failure", "message")) } +func (s *SharedServerSuite) TestWorkflow_Execute_NestedFailure() { + // Text + s.Worker.OnDevWorkflow(func(ctx workflow.Context, input any) (any, error) { + err := workflow.ExecuteActivity(ctx, DevActivity).Get(ctx, nil) + return nil, err + }) + s.Worker.OnDevActivity(func(ctx context.Context, input any) (any, error) { + return nil, fmt.Errorf("intentional activity failure") + }) + res := s.Execute( + "workflow", "execute", + "--address", s.Address(), + "--task-queue", s.Worker.Options.TaskQueue, + "--type", "DevWorkflow", + "--workflow-id", "my-id1", + ) + s.ErrorContains(res.Err, "workflow failed") + out := res.Stdout.String() + // Confirm failure + s.ContainsOnSameLine(out, "Status", "FAILED") + s.Contains(out, "Failure") + s.ContainsOnSameLine(out, "Message", "intentional activity failure") + + // JSON + res = s.Execute( + "workflow", "execute", + "-o", "json", + "--address", s.Address(), + "--task-queue", s.Worker.Options.TaskQueue, + "--type", "DevWorkflow", + "--workflow-id", "my-id2", + ) + s.ErrorContains(res.Err, "workflow failed") + var jsonOut map[string]any + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + s.Equal("FAILED", jsonOut["status"]) + s.Equal("activity error", + jsonPath(jsonOut, "closeEvent", "workflowExecutionFailedEventAttributes", "failure", "message")) + s.Equal("intentional activity failure", + jsonPath(jsonOut, "closeEvent", "workflowExecutionFailedEventAttributes", "failure", "cause", "message")) +} + +func (s *SharedServerSuite) TestWorkflow_Execute_Cancel() { + // Very bad™️ channel tricks + doCancelChan := make(chan struct{}) + s.Worker.OnDevWorkflow(func(ctx workflow.Context, input any) (any, error) { + doCancelChan <- struct{}{} + err := workflow.Await(ctx, func() bool { + return false + }) + return nil, err + }) + + // Text + go func() { + <-doCancelChan + _ = s.Client.CancelWorkflow(s.Context, "my-id1", "") + }() + res := s.Execute( + "workflow", "execute", + "--address", s.Address(), + "--task-queue", s.Worker.Options.TaskQueue, + "--type", "DevWorkflow", + "--workflow-id", "my-id1", + ) + s.ErrorContains(res.Err, "workflow failed") + out := res.Stdout.String() + s.ContainsOnSameLine(out, "Status", "CANCELED") + + // JSON + go func() { + <-doCancelChan + _ = s.Client.CancelWorkflow(s.Context, "my-id2", "") + }() + res = s.Execute( + "workflow", "execute", + "-o", "json", + "--address", s.Address(), + "--task-queue", s.Worker.Options.TaskQueue, + "--type", "DevWorkflow", + "--workflow-id", "my-id2", + ) + s.ErrorContains(res.Err, "workflow failed") + var jsonOut map[string]any + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + s.Equal("CANCELED", jsonOut["status"]) +} + +func (s *SharedServerSuite) TestWorkflow_Execute_Timeout() { + s.Worker.OnDevWorkflow(func(ctx workflow.Context, input any) (any, error) { + err := workflow.Await(ctx, func() bool { + return false + }) + return nil, err + }) + + // Text + res := s.Execute( + "workflow", "execute", + "--address", s.Address(), + "--task-queue", s.Worker.Options.TaskQueue, + "--type", "DevWorkflow", + "--execution-timeout", "1ms", + "--workflow-id", "my-id1", + ) + s.ErrorContains(res.Err, "workflow failed") + out := res.Stdout.String() + s.ContainsOnSameLine(out, "Status", "TIMEOUT") + + // JSON + res = s.Execute( + "workflow", "execute", + "-o", "json", + "--address", s.Address(), + "--task-queue", s.Worker.Options.TaskQueue, + "--type", "DevWorkflow", + "--execution-timeout", "1ms", + "--workflow-id", "my-id2", + ) + s.ErrorContains(res.Err, "workflow failed") + var jsonOut map[string]any + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + s.Equal("TIMEOUT", jsonOut["status"]) +} + +func (s *SharedServerSuite) TestWorkflow_Execute_ContinueAsNew() { + s.Worker.OnDevWorkflow(func(ctx workflow.Context, input any) (any, error) { + if input.(float64) < 2 { + return nil, workflow.NewContinueAsNewError(ctx, "DevWorkflow", input.(float64)+1) + } + return nil, nil + }) + + // Text + res := s.Execute( + "workflow", "execute", + "--address", s.Address(), + "--task-queue", s.Worker.Options.TaskQueue, + "--type", "DevWorkflow", + "-i", "1", + "--workflow-id", "my-id1", + ) + out := res.Stdout.String() + s.ContainsOnSameLine(out, "Status", "COMPLETED") + s.Contains(out, "WorkflowExecutionContinuedAsNew") +} + +func (s *SharedServerSuite) TestWorkflow_Execute_ProtoJSON_Input() { + // Very meta, use a start workflow request proto as the input to the workflow. + startWorkflowReq := &workflowservice.StartWorkflowExecutionRequest{ + // Just fill in a few different types of fields to make sure everything is [de]serialized + WorkflowId: "enchi-cat", + WorkflowRunTimeout: &durationpb.Duration{ + Seconds: 1, + Nanos: 2, + }, + Input: &common.Payloads{ + Payloads: []*common.Payload{ + {Data: []byte("meow")}, + }, + }, + } + startWorkflowReqSerialized, err := protojson.Marshal(startWorkflowReq) + s.NoError(err) + + s.Worker.Worker.RegisterWorkflowWithOptions(func( + ctx workflow.Context, + input *workflowservice.StartWorkflowExecutionRequest, + ) (*workflowservice.StartWorkflowExecutionRequest, error) { + return input, nil + }, workflow.RegisterOptions{Name: "ProtoJSONWorkflow"}) + + // Text + res := s.Execute( + "workflow", "execute", + "--address", s.Address(), + "--task-queue", s.Worker.Options.TaskQueue, + "--type", "ProtoJSONWorkflow", + "--input-meta", "encoding=json/protobuf", + "-i", string(startWorkflowReqSerialized), + "--workflow-id", "my-id1", + ) + out := res.Stdout.String() + s.ContainsOnSameLine(out, "Status", "COMPLETED") + s.Contains(out, "enchi") +} + +func (s *SharedServerSuite) TestWorkflow_Failure_On_Start() { + // Use too-long of an ID to force a failure on start + veryLongID := string(bytes.Repeat([]byte("a"), 1024)) + for _, cmd := range []string{"start", "execute"} { + res := s.Execute( + "workflow", cmd, + "--address", s.Address(), + "--task-queue", s.Worker.Options.TaskQueue, + "--type", "DevWorkflow", + "--workflow-id", veryLongID, + ) + s.ErrorContains(res.Err, "failed starting workflow") + } +} + func (s *SharedServerSuite) TestWorkflow_Execute_ClientHeaders() { // Capture headers var lastHeadersClient metadata.MD From 73fa443966cf41acccbc13c31d3c44d0042f51b5 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 20 Feb 2024 16:57:10 -0800 Subject: [PATCH 2/6] Defer handling returned result correctly for now --- temporalcli/commands.workflow_exec_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/temporalcli/commands.workflow_exec_test.go b/temporalcli/commands.workflow_exec_test.go index d5ae67c8..1f9f8a6b 100644 --- a/temporalcli/commands.workflow_exec_test.go +++ b/temporalcli/commands.workflow_exec_test.go @@ -366,7 +366,9 @@ func (s *SharedServerSuite) TestWorkflow_Execute_ProtoJSON_Input() { ) out := res.Stdout.String() s.ContainsOnSameLine(out, "Status", "COMPLETED") - s.Contains(out, "enchi") + // TODO: Currently the workflow result fails to get stringified properly. Looks to be some issue + // in the protojson marshaller from the api-go repo not understanding `json/protobuf` encoding + //s.Contains(out, "enchi") } func (s *SharedServerSuite) TestWorkflow_Failure_On_Start() { From 7f9bc365d50f8ed0cc39ea67455d7059f52e437a Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 20 Feb 2024 17:09:33 -0800 Subject: [PATCH 3/6] Resolve remaining test TODOs --- temporalcli/commands.workflow_view_test.go | 84 ++++++++++++++++++++-- 1 file changed, 80 insertions(+), 4 deletions(-) diff --git a/temporalcli/commands.workflow_view_test.go b/temporalcli/commands.workflow_view_test.go index 108ee667..3bc6a5bf 100644 --- a/temporalcli/commands.workflow_view_test.go +++ b/temporalcli/commands.workflow_view_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strconv" "time" "github.com/temporalio/cli/temporalcli" @@ -12,10 +13,6 @@ import ( "go.temporal.io/sdk/workflow" ) -// TODO(cretz): To test: -// * Workflow list -// * Workflow describe with just auto-reset points - func (s *SharedServerSuite) TestWorkflow_Describe_ActivityFailing() { // Set activity to just continually error s.Worker.OnDevActivity(func(ctx context.Context, a any) (any, error) { @@ -107,6 +104,45 @@ func (s *SharedServerSuite) TestWorkflow_Describe_Completed() { s.Equal(map[string]any{"foo": "bar"}, jsonOut["result"]) } +func (s *SharedServerSuite) TestWorkflow_Describe_ResetPoints() { + // Start the workflow and wait until it has at least reached activity failure + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{TaskQueue: s.Worker.Options.TaskQueue}, + DevWorkflow, + map[string]string{"foo": "bar"}, + ) + s.NoError(err) + s.NoError(run.Get(s.Context, nil)) + + // Text + res := s.Execute( + "workflow", "describe", + "--address", s.Address(), + "-w", run.GetID(), + "--reset-points", + ) + s.NoError(res.Err) + out := res.Stdout.String() + s.NotContains(out, "Status") + s.NotContains(out, "Result") + s.Contains(out, "Auto Reset Points") + + // JSON + res = s.Execute( + "workflow", "describe", + "-o", "json", + "--address", s.Address(), + "-w", run.GetID(), + "--reset-points", + ) + s.NoError(res.Err) + var jsonOut []map[string]any + s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut)) + s.NotNil(jsonOut[0]) + s.NotNil(jsonOut[0]["EventId"]) +} + func (s *SharedServerSuite) TestWorkflow_Show_Follow() { s.Worker.OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { sigs := 0 @@ -252,3 +288,43 @@ func (s *SharedServerSuite) TestWorkflow_Show_JSON() { s.Contains(out, `"signalName": "my-signal"`) s.NotContains(out, "Results:") } + +func (s *SharedServerSuite) TestWorkflow_List() { + s.Worker.OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { + return a, nil + }) + + // Start the workflow + for i := 0; i < 3; i++ { + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{TaskQueue: s.Worker.Options.TaskQueue}, + DevWorkflow, + strconv.Itoa(i), + ) + s.NoError(err) + s.NoError(run.Get(s.Context, nil)) + } + + res := s.Execute( + "workflow", "list", + "--address", s.Address(), + "--query", fmt.Sprintf(`TaskQueue="%s"`, s.Worker.Options.TaskQueue), + ) + s.NoError(res.Err) + out := res.Stdout.String() + s.ContainsOnSameLine(out, "Completed", "DevWorkflow") + + // JSON + res = s.Execute( + "workflow", "list", + "--address", s.Address(), + "--query", fmt.Sprintf(`TaskQueue="%s"`, s.Worker.Options.TaskQueue), + "-o", "json", + ) + s.NoError(res.Err) + // Output is currently a series of JSON objects + out = res.Stdout.String() + s.ContainsOnSameLine(out, "name", "DevWorkflow") + s.ContainsOnSameLine(out, "status", "WORKFLOW_EXECUTION_STATUS_COMPLETED") +} From 465846be031b27ed128fc4fa18b48b1623bdc4bf Mon Sep 17 00:00:00 2001 From: Tim Deeb-Swihart Date: Wed, 21 Feb 2024 11:32:53 -0800 Subject: [PATCH 4/6] Pin api-go in order to resolve protojson.Marshal bug This was fixed by https://github.com/temporalio/temporal/pull/5396 but because there are incompatible changes between api-go, our sdk, and the temporal server itself we haven't pulled this change into the server. Those should be resolved soon but in the meantime we're going to pin the version of the API used. --- go.mod | 6 ++++-- go.sum | 12 ++++++------ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index 5eae9b9d..0287c5d2 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/temporalio/cli go 1.21 +replace go.temporal.io/api => go.temporal.io/api v1.26.1-0.20240123194300-5b253c84a3cc + require ( github.com/dustin/go-humanize v1.0.1 github.com/fatih/color v1.16.0 @@ -13,8 +15,8 @@ require ( github.com/stretchr/testify v1.8.4 github.com/temporalio/ui-server/v2 v2.23.1-0.20240212143757-2e990617b977 go.temporal.io/api v1.26.2-0.20231129165614-630d88440548 - go.temporal.io/sdk v1.25.2-0.20231204212658-5fdbecc56c8c - go.temporal.io/server v1.23.0-rc2.0.20231212000105-51ea367f9f37 + go.temporal.io/sdk v1.25.2-0.20240108174654-c1744ee2c8cb + go.temporal.io/server v1.23.0-rc9 google.golang.org/grpc v1.61.0 google.golang.org/protobuf v1.32.0 gopkg.in/yaml.v3 v3.0.1 diff --git a/go.sum b/go.sum index d482c3f8..6991117b 100644 --- a/go.sum +++ b/go.sum @@ -394,12 +394,12 @@ go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8 go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= -go.temporal.io/api v1.26.2-0.20231129165614-630d88440548 h1:9CV7l/WG7Nd4Ai8mSIUlCvf0W+Mdra5ei38I860Esac= -go.temporal.io/api v1.26.2-0.20231129165614-630d88440548/go.mod h1:Y/rALXTprFO+bvAlAfLFoJj7KpQIcL4GDQVN6fhYIa4= -go.temporal.io/sdk v1.25.2-0.20231204212658-5fdbecc56c8c h1:264a53/ghHOP1Z31yBvHeuXQ6eouE9HyIAjI96BWebI= -go.temporal.io/sdk v1.25.2-0.20231204212658-5fdbecc56c8c/go.mod h1:Qyy1qrOoMcMk7C+AW/LrqiDm+C0CRGGbVIPUAMCGJtQ= -go.temporal.io/server v1.23.0-rc2.0.20231212000105-51ea367f9f37 h1:8Kanu4PKa4nHaPtC8uSARUn+RvFDt382myH0sl2M5dw= -go.temporal.io/server v1.23.0-rc2.0.20231212000105-51ea367f9f37/go.mod h1:mdtzSG++dTsWrJ8EmY7XTc4qhcCkrosHpo3ANySFyt8= +go.temporal.io/api v1.26.1-0.20240123194300-5b253c84a3cc h1:NOtmXBUd0heryLaBV7MJO67lfOHc/PD74zRwWAQ+AKE= +go.temporal.io/api v1.26.1-0.20240123194300-5b253c84a3cc/go.mod h1:ccaGRzYOx2iUq2e7zx0BEm0t2V3rmcliFalMCixn1/4= +go.temporal.io/sdk v1.25.2-0.20240108174654-c1744ee2c8cb h1:rceOI1pCc9DtY3ASPZbl9XTHxbvHbYvZuGwGdZtYBLw= +go.temporal.io/sdk v1.25.2-0.20240108174654-c1744ee2c8cb/go.mod h1:nRT6pheoo7UXrrgMh26r7t4IuJFZmu277SkaiVp/tZE= +go.temporal.io/server v1.23.0-rc9 h1:XmwWMW57BL/WVH1XhquRwvkNsCUQyue0WkU8PiB9tCI= +go.temporal.io/server v1.23.0-rc9/go.mod h1:orkm00raKKZt7B0u1Z37DldLp8K9WjiImGL20fzR4Js= go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig= go.temporal.io/version v0.3.0/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= From 0c9963f0cb3d8a5a59997ceed04ed1929c674c4e Mon Sep 17 00:00:00 2001 From: Tim Deeb-Swihart Date: Wed, 21 Feb 2024 11:34:13 -0800 Subject: [PATCH 5/6] Pass configured json indent into protojson marshalling code --- temporalcli/commands.go | 6 +++--- temporalcli/commands.workflow_exec.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/temporalcli/commands.go b/temporalcli/commands.go index c5291d7b..6f85347d 100644 --- a/temporalcli/commands.go +++ b/temporalcli/commands.go @@ -206,11 +206,11 @@ func (c *CommandContext) MarshalFriendlyFailureBodyText(f *failure.Failure, inde // Takes payload shorthand into account, can use // MarshalProtoJSONNoPayloadShorthand if needed func (c *CommandContext) MarshalProtoJSON(m proto.Message) ([]byte, error) { - return MarshalProtoJSONWithOptions(m, c.JSONShorthandPayloads) + return c.MarshalProtoJSONWithOptions(m, c.JSONShorthandPayloads) } -func MarshalProtoJSONWithOptions(m proto.Message, jsonShorthandPayloads bool) ([]byte, error) { - opts := temporalproto.CustomJSONMarshalOptions{Indent: " "} +func (c *CommandContext) MarshalProtoJSONWithOptions(m proto.Message, jsonShorthandPayloads bool) ([]byte, error) { + opts := temporalproto.CustomJSONMarshalOptions{Indent: c.Printer.JSONIndent} if jsonShorthandPayloads { opts.Metadata = map[string]any{common.EnablePayloadShorthandMetadataKey: true} } diff --git a/temporalcli/commands.workflow_exec.go b/temporalcli/commands.workflow_exec.go index 0c97ba2a..9991bd11 100644 --- a/temporalcli/commands.workflow_exec.go +++ b/temporalcli/commands.workflow_exec.go @@ -52,7 +52,7 @@ func (c *TemporalWorkflowExecuteCommand) run(cctx *CommandContext, args []string workflowID: run.GetID(), runID: run.GetRunID(), includeDetails: c.EventDetails, - follow: true, + follow: true, } if err := iter.print(cctx.Printer); err != nil && cctx.Err() == nil { return fmt.Errorf("displaying history failed: %w", err) @@ -152,7 +152,7 @@ func (c *TemporalWorkflowExecuteCommand) printJSONResult( } // Do proto serialization here that would never do shorthand (i.e. // auto-lift JSON) payloads - if result.History, err = MarshalProtoJSONWithOptions(&histProto, false); err != nil { + if result.History, err = cctx.MarshalProtoJSONWithOptions(&histProto, false); err != nil { return fmt.Errorf("failed marshaling history: %w", err) } } From d4a775fef6d57e3b9f2efc095ddda942b98616b5 Mon Sep 17 00:00:00 2001 From: Tim Deeb-Swihart Date: Wed, 21 Feb 2024 11:34:50 -0800 Subject: [PATCH 6/6] Re-enable disabled test --- temporalcli/commands.workflow_exec_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/temporalcli/commands.workflow_exec_test.go b/temporalcli/commands.workflow_exec_test.go index 1f9f8a6b..f76301d1 100644 --- a/temporalcli/commands.workflow_exec_test.go +++ b/temporalcli/commands.workflow_exec_test.go @@ -6,13 +6,14 @@ import ( "encoding/base64" "encoding/json" "fmt" - "google.golang.org/protobuf/encoding/protojson" "net/http/httptest" "os" "strconv" "sync" "time" + "google.golang.org/protobuf/encoding/protojson" + "github.com/google/uuid" "go.temporal.io/api/common/v1" "go.temporal.io/api/enums/v1" @@ -366,9 +367,7 @@ func (s *SharedServerSuite) TestWorkflow_Execute_ProtoJSON_Input() { ) out := res.Stdout.String() s.ContainsOnSameLine(out, "Status", "COMPLETED") - // TODO: Currently the workflow result fails to get stringified properly. Looks to be some issue - // in the protojson marshaller from the api-go repo not understanding `json/protobuf` encoding - //s.Contains(out, "enchi") + s.Contains(out, "enchi") } func (s *SharedServerSuite) TestWorkflow_Failure_On_Start() {