Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more tests #472

Merged
merged 6 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/temporalio/cli

go 1.21

replace go.temporal.io/api => go.temporal.io/api v1.26.1-0.20240123194300-5b253c84a3cc

require (
github.com/dustin/go-humanize v1.0.1
github.com/fatih/color v1.16.0
Expand All @@ -13,8 +15,8 @@ require (
github.com/stretchr/testify v1.8.4
github.com/temporalio/ui-server/v2 v2.23.1-0.20240212143757-2e990617b977
go.temporal.io/api v1.26.2-0.20231129165614-630d88440548
go.temporal.io/sdk v1.25.2-0.20231204212658-5fdbecc56c8c
go.temporal.io/server v1.23.0-rc2.0.20231212000105-51ea367f9f37
go.temporal.io/sdk v1.25.2-0.20240108174654-c1744ee2c8cb
go.temporal.io/server v1.23.0-rc9
google.golang.org/grpc v1.61.0
google.golang.org/protobuf v1.32.0
gopkg.in/yaml.v3 v3.0.1
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -394,12 +394,12 @@ go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8
go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ=
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.temporal.io/api v1.26.2-0.20231129165614-630d88440548 h1:9CV7l/WG7Nd4Ai8mSIUlCvf0W+Mdra5ei38I860Esac=
go.temporal.io/api v1.26.2-0.20231129165614-630d88440548/go.mod h1:Y/rALXTprFO+bvAlAfLFoJj7KpQIcL4GDQVN6fhYIa4=
go.temporal.io/sdk v1.25.2-0.20231204212658-5fdbecc56c8c h1:264a53/ghHOP1Z31yBvHeuXQ6eouE9HyIAjI96BWebI=
go.temporal.io/sdk v1.25.2-0.20231204212658-5fdbecc56c8c/go.mod h1:Qyy1qrOoMcMk7C+AW/LrqiDm+C0CRGGbVIPUAMCGJtQ=
go.temporal.io/server v1.23.0-rc2.0.20231212000105-51ea367f9f37 h1:8Kanu4PKa4nHaPtC8uSARUn+RvFDt382myH0sl2M5dw=
go.temporal.io/server v1.23.0-rc2.0.20231212000105-51ea367f9f37/go.mod h1:mdtzSG++dTsWrJ8EmY7XTc4qhcCkrosHpo3ANySFyt8=
go.temporal.io/api v1.26.1-0.20240123194300-5b253c84a3cc h1:NOtmXBUd0heryLaBV7MJO67lfOHc/PD74zRwWAQ+AKE=
go.temporal.io/api v1.26.1-0.20240123194300-5b253c84a3cc/go.mod h1:ccaGRzYOx2iUq2e7zx0BEm0t2V3rmcliFalMCixn1/4=
go.temporal.io/sdk v1.25.2-0.20240108174654-c1744ee2c8cb h1:rceOI1pCc9DtY3ASPZbl9XTHxbvHbYvZuGwGdZtYBLw=
go.temporal.io/sdk v1.25.2-0.20240108174654-c1744ee2c8cb/go.mod h1:nRT6pheoo7UXrrgMh26r7t4IuJFZmu277SkaiVp/tZE=
go.temporal.io/server v1.23.0-rc9 h1:XmwWMW57BL/WVH1XhquRwvkNsCUQyue0WkU8PiB9tCI=
go.temporal.io/server v1.23.0-rc9/go.mod h1:orkm00raKKZt7B0u1Z37DldLp8K9WjiImGL20fzR4Js=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
go.temporal.io/version v0.3.0/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down
6 changes: 3 additions & 3 deletions temporalcli/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ func (c *CommandContext) MarshalFriendlyFailureBodyText(f *failure.Failure, inde
// Takes payload shorthand into account, can use
// MarshalProtoJSONNoPayloadShorthand if needed
func (c *CommandContext) MarshalProtoJSON(m proto.Message) ([]byte, error) {
return MarshalProtoJSONWithOptions(m, c.JSONShorthandPayloads)
return c.MarshalProtoJSONWithOptions(m, c.JSONShorthandPayloads)
}

func MarshalProtoJSONWithOptions(m proto.Message, jsonShorthandPayloads bool) ([]byte, error) {
opts := temporalproto.CustomJSONMarshalOptions{Indent: " "}
func (c *CommandContext) MarshalProtoJSONWithOptions(m proto.Message, jsonShorthandPayloads bool) ([]byte, error) {
opts := temporalproto.CustomJSONMarshalOptions{Indent: c.Printer.JSONIndent}
if jsonShorthandPayloads {
opts.Metadata = map[string]any{common.EnablePayloadShorthandMetadataKey: true}
}
Expand Down
4 changes: 2 additions & 2 deletions temporalcli/commands.workflow_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (c *TemporalWorkflowExecuteCommand) run(cctx *CommandContext, args []string
workflowID: run.GetID(),
runID: run.GetRunID(),
includeDetails: c.EventDetails,
follow: true,
follow: true,
}
if err := iter.print(cctx.Printer); err != nil && cctx.Err() == nil {
return fmt.Errorf("displaying history failed: %w", err)
Expand Down Expand Up @@ -152,7 +152,7 @@ func (c *TemporalWorkflowExecuteCommand) printJSONResult(
}
// Do proto serialization here that would never do shorthand (i.e.
// auto-lift JSON) payloads
if result.History, err = MarshalProtoJSONWithOptions(&histProto, false); err != nil {
if result.History, err = cctx.MarshalProtoJSONWithOptions(&histProto, false); err != nil {
return fmt.Errorf("failed marshaling history: %w", err)
}
}
Expand Down
214 changes: 205 additions & 9 deletions temporalcli/commands.workflow_exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"sync"
"time"

"google.golang.org/protobuf/encoding/protojson"

"github.com/google/uuid"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
Expand All @@ -23,17 +25,9 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
)

// TODO(cretz): To test:
// * Start failure
// * Execute failure on start
// * Execute workflow failure (including nested failures)
// * Execute workflow cancel
// * Execute workflow timeout
// * Execute workflow continue as new
// * Workflow with proto JSON input

func (s *SharedServerSuite) TestWorkflow_Start_SimpleSuccess() {
// Text
s.Worker.OnDevWorkflow(func(ctx workflow.Context, input any) (any, error) {
Expand Down Expand Up @@ -189,6 +183,208 @@ func (s *SharedServerSuite) TestWorkflow_Execute_SimpleFailure() {
jsonPath(jsonOut, "closeEvent", "workflowExecutionFailedEventAttributes", "failure", "message"))
}

func (s *SharedServerSuite) TestWorkflow_Execute_NestedFailure() {
// Text
s.Worker.OnDevWorkflow(func(ctx workflow.Context, input any) (any, error) {
err := workflow.ExecuteActivity(ctx, DevActivity).Get(ctx, nil)
return nil, err
})
s.Worker.OnDevActivity(func(ctx context.Context, input any) (any, error) {
return nil, fmt.Errorf("intentional activity failure")
})
res := s.Execute(
"workflow", "execute",
"--address", s.Address(),
"--task-queue", s.Worker.Options.TaskQueue,
"--type", "DevWorkflow",
"--workflow-id", "my-id1",
)
s.ErrorContains(res.Err, "workflow failed")
out := res.Stdout.String()
// Confirm failure
s.ContainsOnSameLine(out, "Status", "FAILED")
s.Contains(out, "Failure")
s.ContainsOnSameLine(out, "Message", "intentional activity failure")

// JSON
res = s.Execute(
"workflow", "execute",
"-o", "json",
"--address", s.Address(),
"--task-queue", s.Worker.Options.TaskQueue,
"--type", "DevWorkflow",
"--workflow-id", "my-id2",
)
s.ErrorContains(res.Err, "workflow failed")
var jsonOut map[string]any
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut))
s.Equal("FAILED", jsonOut["status"])
s.Equal("activity error",
jsonPath(jsonOut, "closeEvent", "workflowExecutionFailedEventAttributes", "failure", "message"))
s.Equal("intentional activity failure",
jsonPath(jsonOut, "closeEvent", "workflowExecutionFailedEventAttributes", "failure", "cause", "message"))
}

func (s *SharedServerSuite) TestWorkflow_Execute_Cancel() {
// Very bad™️ channel tricks
doCancelChan := make(chan struct{})
s.Worker.OnDevWorkflow(func(ctx workflow.Context, input any) (any, error) {
doCancelChan <- struct{}{}
err := workflow.Await(ctx, func() bool {
return false
})
return nil, err
})

// Text
go func() {
<-doCancelChan
_ = s.Client.CancelWorkflow(s.Context, "my-id1", "")
}()
res := s.Execute(
"workflow", "execute",
"--address", s.Address(),
"--task-queue", s.Worker.Options.TaskQueue,
"--type", "DevWorkflow",
"--workflow-id", "my-id1",
)
s.ErrorContains(res.Err, "workflow failed")
out := res.Stdout.String()
s.ContainsOnSameLine(out, "Status", "CANCELED")

// JSON
go func() {
<-doCancelChan
_ = s.Client.CancelWorkflow(s.Context, "my-id2", "")
}()
res = s.Execute(
"workflow", "execute",
"-o", "json",
"--address", s.Address(),
"--task-queue", s.Worker.Options.TaskQueue,
"--type", "DevWorkflow",
"--workflow-id", "my-id2",
)
s.ErrorContains(res.Err, "workflow failed")
var jsonOut map[string]any
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut))
s.Equal("CANCELED", jsonOut["status"])
}

func (s *SharedServerSuite) TestWorkflow_Execute_Timeout() {
s.Worker.OnDevWorkflow(func(ctx workflow.Context, input any) (any, error) {
err := workflow.Await(ctx, func() bool {
return false
})
return nil, err
})

// Text
res := s.Execute(
"workflow", "execute",
"--address", s.Address(),
"--task-queue", s.Worker.Options.TaskQueue,
"--type", "DevWorkflow",
"--execution-timeout", "1ms",
"--workflow-id", "my-id1",
)
s.ErrorContains(res.Err, "workflow failed")
out := res.Stdout.String()
s.ContainsOnSameLine(out, "Status", "TIMEOUT")

// JSON
res = s.Execute(
"workflow", "execute",
"-o", "json",
"--address", s.Address(),
"--task-queue", s.Worker.Options.TaskQueue,
"--type", "DevWorkflow",
"--execution-timeout", "1ms",
"--workflow-id", "my-id2",
)
s.ErrorContains(res.Err, "workflow failed")
var jsonOut map[string]any
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut))
s.Equal("TIMEOUT", jsonOut["status"])
}

func (s *SharedServerSuite) TestWorkflow_Execute_ContinueAsNew() {
s.Worker.OnDevWorkflow(func(ctx workflow.Context, input any) (any, error) {
if input.(float64) < 2 {
return nil, workflow.NewContinueAsNewError(ctx, "DevWorkflow", input.(float64)+1)
}
return nil, nil
})

// Text
res := s.Execute(
"workflow", "execute",
"--address", s.Address(),
"--task-queue", s.Worker.Options.TaskQueue,
"--type", "DevWorkflow",
"-i", "1",
"--workflow-id", "my-id1",
)
out := res.Stdout.String()
s.ContainsOnSameLine(out, "Status", "COMPLETED")
s.Contains(out, "WorkflowExecutionContinuedAsNew")
}

func (s *SharedServerSuite) TestWorkflow_Execute_ProtoJSON_Input() {
// Very meta, use a start workflow request proto as the input to the workflow.
startWorkflowReq := &workflowservice.StartWorkflowExecutionRequest{
// Just fill in a few different types of fields to make sure everything is [de]serialized
WorkflowId: "enchi-cat",
WorkflowRunTimeout: &durationpb.Duration{
Seconds: 1,
Nanos: 2,
},
Input: &common.Payloads{
Payloads: []*common.Payload{
{Data: []byte("meow")},
},
},
}
startWorkflowReqSerialized, err := protojson.Marshal(startWorkflowReq)
s.NoError(err)

s.Worker.Worker.RegisterWorkflowWithOptions(func(
ctx workflow.Context,
input *workflowservice.StartWorkflowExecutionRequest,
) (*workflowservice.StartWorkflowExecutionRequest, error) {
return input, nil
}, workflow.RegisterOptions{Name: "ProtoJSONWorkflow"})

// Text
res := s.Execute(
"workflow", "execute",
"--address", s.Address(),
"--task-queue", s.Worker.Options.TaskQueue,
"--type", "ProtoJSONWorkflow",
"--input-meta", "encoding=json/protobuf",
"-i", string(startWorkflowReqSerialized),
"--workflow-id", "my-id1",
)
out := res.Stdout.String()
s.ContainsOnSameLine(out, "Status", "COMPLETED")
s.Contains(out, "enchi")
}

func (s *SharedServerSuite) TestWorkflow_Failure_On_Start() {
// Use too-long of an ID to force a failure on start
veryLongID := string(bytes.Repeat([]byte("a"), 1024))
for _, cmd := range []string{"start", "execute"} {
res := s.Execute(
"workflow", cmd,
"--address", s.Address(),
"--task-queue", s.Worker.Options.TaskQueue,
"--type", "DevWorkflow",
"--workflow-id", veryLongID,
)
s.ErrorContains(res.Err, "failed starting workflow")
}
}

func (s *SharedServerSuite) TestWorkflow_Execute_ClientHeaders() {
// Capture headers
var lastHeadersClient metadata.MD
Expand Down
Loading
Loading