Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLI Refresh: workflow update command #462

Merged
merged 2 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,11 @@ func NewTemporalWorkflowTraceCommand(cctx *CommandContext, parent *TemporalWorkf
type TemporalWorkflowUpdateCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
PayloadInputOptions
Name string
WorkflowId string
RunId string
FirstExecutionRunId string
}

func NewTemporalWorkflowUpdateCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowUpdateCommand {
Expand All @@ -958,8 +963,19 @@ func NewTemporalWorkflowUpdateCommand(cctx *CommandContext, parent *TemporalWork
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "update [flags]"
s.Command.Short = "Updates a running workflow synchronously."
s.Command.Long = "TODO"
if hasHighlighting {
s.Command.Long = "The \x1b[1mtemporal workflow update\x1b[0m command is used to synchronously Update a \nWorkflowExecution by ID.\n\n\x1b[1mtemporal workflow update \\\n\t\t--workflow-id MyWorkflowId \\\n\t\t--name MyUpdate \\\n\t\t--input '{\"Input\": \"As-JSON\"}'\x1b[0m\n\nUse the options listed below to change the command's behavior."
} else {
s.Command.Long = "The `temporal workflow update` command is used to synchronously Update a \nWorkflowExecution by ID.\n\n```\ntemporal workflow update \\\n\t\t--workflow-id MyWorkflowId \\\n\t\t--name MyUpdate \\\n\t\t--input '{\"Input\": \"As-JSON\"}'\n```\n\nUse the options listed below to change the command's behavior."
}
s.Command.Args = cobra.NoArgs
s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().StringVar(&s.Name, "name", "", "Update Name.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "name")
s.Command.Flags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow Id.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "workflow-id")
s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run Id. If unset, the currently running Workflow Execution receives the Update.")
s.Command.Flags().StringVar(&s.FirstExecutionRunId, "first-execution-run-id", "", "Send the Update to the last Workflow Execution in the chain that started with this Run Id.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down
44 changes: 41 additions & 3 deletions temporalcli/commands.workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (

"github.com/fatih/color"
"github.com/google/uuid"
"github.com/temporalio/cli/temporalcli/internal/printer"
"go.temporal.io/api/batch/v1"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/query/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"

"github.com/temporalio/cli/temporalcli/internal/printer"
)

func (c *TemporalWorkflowCancelCommand) run(cctx *CommandContext, args []string) error {
Expand Down Expand Up @@ -223,8 +224,45 @@ func (*TemporalWorkflowTraceCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
}

func (*TemporalWorkflowUpdateCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
func (c *TemporalWorkflowUpdateCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

// Get raw input
input, err := c.buildRawInput()
if err != nil {
return err
}

request := &client.UpdateWorkflowWithOptionsRequest{
WorkflowID: c.WorkflowId,
RunID: c.RunId,
UpdateName: c.Name,
FirstExecutionRunID: c.FirstExecutionRunId,
Args: input,
}

updateHandle, err := cl.UpdateWorkflowWithOptions(cctx, request)
if err != nil {
return fmt.Errorf("unable to update workflow: %w", err)
}

var valuePtr interface{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think elsewhere I did:

Suggested change
var valuePtr interface{}
var valuePtr json.RawMessage

Though I think it's basically the same thing here (it'll just be a map[string]any for an object which is turned right back to JSON) so no worries.

err = updateHandle.Get(cctx, &valuePtr)
if err != nil {
return fmt.Errorf("unable to update workflow: %w", err)
}

return cctx.Printer.PrintStructured(
struct {
Name string `json:"name"`
UpdateID string `json:"updateId"`
Result interface{} `json:"result"`
}{Name: c.Name, UpdateID: updateHandle.UpdateID(), Result: valuePtr},
printer.StructuredOptions{})
}

func defaultReason() string {
Expand Down
78 changes: 78 additions & 0 deletions temporalcli/commands.workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package temporalcli_test

import (
"encoding/json"
"fmt"
"math/rand"
"strconv"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -304,6 +307,81 @@ func (s *SharedServerSuite) TestWorkflow_Cancel_SingleWorkflowSuccess() {
s.Error(workflow.ErrCanceled, run.Get(s.Context, nil))
}

func (s *SharedServerSuite) TestWorkflow_Update() {
updateName := "test-update"

s.Worker.OnDevWorkflow(func(ctx workflow.Context, val any) (any, error) {
// setup a simple workflow which receives non-negative floats in updates and adds them to a running counter
counter, ok := val.(float64)
if !ok {
return nil, fmt.Errorf("update workflow received non-float input")
}
err := workflow.SetUpdateHandlerWithOptions(
ctx,
updateName,
func(ctx workflow.Context, i float64) (float64, error) {
tmp := counter
counter += i
workflow.GetLogger(ctx).Info("counter updated", "added", i, "new-value", counter)
return tmp, nil
},
workflow.UpdateHandlerOptions{
Validator: func(ctx workflow.Context, i float64) error {
if i < 0 {
return fmt.Errorf("add value must be non-negative (%v)", i)
}
return nil
}},
)
if err != nil {
return 0, err
}

// wait on a signal to indicate the test is complete
if ok := workflow.GetSignalChannel(ctx, "updates-done").Receive(ctx, nil); !ok {
return 0, fmt.Errorf("signal channel was closed")
}
return counter, nil
})

// Start the workflow
input := rand.Intn(100)
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{TaskQueue: s.Worker.Options.TaskQueue},
DevWorkflow,
input,
)
s.NoError(err)

// Stop the workflow when the test is complete
defer func() {
err := s.Client.SignalWorkflow(s.Context, run.GetID(), run.GetRunID(), "updates-done", nil)
s.NoError(err)
}()

// successful update, should show the result
res := s.Execute("workflow", "update", "--address", s.Address(), "-w", run.GetID(), "--name", updateName, "-i", strconv.Itoa(input))
s.NoError(res.Err)
s.Contains(res.Stdout.String(), strconv.Itoa(input))

// successful update passing first-execution-run-id
res = s.Execute("workflow", "update", "--address", s.Address(), "-w", run.GetID(), "--name", updateName, "-i", strconv.Itoa(input), "--first-execution-run-id", run.GetRunID())
s.NoError(res.Err)

// update rejected, when name is not available
res = s.Execute("workflow", "update", "--address", s.Address(), "-w", run.GetID(), "-i", strconv.Itoa(input))
s.ErrorContains(res.Err, "required flag(s) \"name\" not set")

// update rejected, wrong workflowID
res = s.Execute("workflow", "update", "--address", s.Address(), "-w", "nonexistent-wf-id", "--name", updateName, "-i", strconv.Itoa(input))
s.ErrorContains(res.Err, "unable to update workflow")

// update rejected, wrong update name
res = s.Execute("workflow", "update", "--address", s.Address(), "-w", run.GetID(), "--name", "nonexistent-update-name", "-i", strconv.Itoa(input))
s.ErrorContains(res.Err, "unable to update workflow")
}

func (s *SharedServerSuite) TestWorkflow_Cancel_BatchWorkflowSuccess() {
res := s.testCancelBatchWorkflow(false)
s.Contains(res.Stdout.String(), "approximately 5 workflow(s)")
Expand Down
8 changes: 6 additions & 2 deletions temporalcli/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ import (
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/temporalio/cli/temporalcli"
"github.com/temporalio/cli/temporalcli/devserver"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"

"github.com/temporalio/cli/temporalcli"
"github.com/temporalio/cli/temporalcli/devserver"

"google.golang.org/grpc"
)

Expand Down Expand Up @@ -271,6 +273,8 @@ func StartDevServer(t *testing.T, options DevServerOptions) *DevServer {
d.Options.DynamicConfigValues = map[string]any{}
}
d.Options.DynamicConfigValues["system.forceSearchAttributesCacheRefreshOnRead"] = true
d.Options.DynamicConfigValues["frontend.enableUpdateWorkflowExecution"] = true

d.Options.GRPCInterceptors = append(
d.Options.GRPCInterceptors,
func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
Expand Down
22 changes: 21 additions & 1 deletion temporalcli/commandsmd/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -497,4 +497,24 @@ TODO

### temporal workflow update: Updates a running workflow synchronously.

TODO
The `temporal workflow update` command is used to synchronously [Update](/concepts/what-is-an-update) a
[WorkflowExecution](/concepts/what-is-a-workflow-execution) by [ID](/concepts/what-is-a-workflow-id).

```
temporal workflow update \
--workflow-id MyWorkflowId \
--name MyUpdate \
--input '{"Input": "As-JSON"}'
```

Use the options listed below to change the command's behavior.

#### Options

* `--name` (string) - Update Name. Required.
* `--workflow-id`, `-w` (string) - Workflow Id. Required.
* `--run-id`, `-r` (string) - Run Id. If unset, the currently running Workflow Execution receives the Update.
* `--first-execution-run-id` (string) - Send the Update to the last Workflow Execution in the chain that started
with this Run Id.

Includes options set for [payload input](#options-set-for-payload-input).
Loading