Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add workflow update start and workflow update execute subcommands #642

Merged
merged 5 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 79 additions & 18 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2156,9 +2156,9 @@ func NewTemporalWorkflowExecuteCommand(cctx *CommandContext, parent *TemporalWor
s.Command.Use = "execute [flags]"
s.Command.Short = "Start new Workflow Execution"
if hasHighlighting {
s.Command.Long = "Establish a new Workflow Execution and direct its progress to stdout. The\ncommand blocks and returns when the Workflow Execution completes. If your\nWorkflow requires input, pass valid JSON:\n\n\x1b[1mtemporal workflow execute\n --workflow-id YourWorkflowId \\\n --type YourWorkflow \\\n --task-queue YourTaskQueue \\\n --input '{\"Input\": \"As-JSON\"}'\x1b[0m\n\nUse \x1b[1m--event-details\x1b[0m to relay updates to the command-line output in JSON\nformat. When using JSON output (\x1b[1m--output json\x1b[0m), this includes the entire\n\"history\" JSON key for the run."
s.Command.Long = "Establish a new Workflow Execution and direct its progress to stdout. The\ncommand blocks and returns when the Workflow Execution completes. If your\nWorkflow requires input, pass valid JSON:\n\n\x1b[1mtemporal workflow execute\n --workflow-id YourWorkflowId \\\n --type YourWorkflow \\\n --task-queue YourTaskQueue \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m\n\nUse \x1b[1m--event-details\x1b[0m to relay updates to the command-line output in JSON\nformat. When using JSON output (\x1b[1m--output json\x1b[0m), this includes the entire\n\"history\" JSON key for the run."
} else {
s.Command.Long = "Establish a new Workflow Execution and direct its progress to stdout. The\ncommand blocks and returns when the Workflow Execution completes. If your\nWorkflow requires input, pass valid JSON:\n\n```\ntemporal workflow execute\n --workflow-id YourWorkflowId \\\n --type YourWorkflow \\\n --task-queue YourTaskQueue \\\n --input '{\"Input\": \"As-JSON\"}'\n```\n\nUse `--event-details` to relay updates to the command-line output in JSON\nformat. When using JSON output (`--output json`), this includes the entire\n\"history\" JSON key for the run."
s.Command.Long = "Establish a new Workflow Execution and direct its progress to stdout. The\ncommand blocks and returns when the Workflow Execution completes. If your\nWorkflow requires input, pass valid JSON:\n\n```\ntemporal workflow execute\n --workflow-id YourWorkflowId \\\n --type YourWorkflow \\\n --task-queue YourTaskQueue \\\n --input '{\"some-key\": \"some-value\"}'\n```\n\nUse `--event-details` to relay updates to the command-line output in JSON\nformat. When using JSON output (`--output json`), this includes the entire\n\"history\" JSON key for the run."
}
s.Command.Args = cobra.NoArgs
s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags())
Expand Down Expand Up @@ -2506,9 +2506,9 @@ func NewTemporalWorkflowStartCommand(cctx *CommandContext, parent *TemporalWorkf
s.Command.Use = "start [flags]"
s.Command.Short = "Initiate a Workflow Execution"
if hasHighlighting {
s.Command.Long = "Start a new Workflow Execution. Returns the Workflow- and Run-IDs.\n\n\x1b[1mtemporal workflow start \\\n\t\t--workflow-id YourWorkflowId \\\n\t\t--type YourWorkflow \\\n\t\t--task-queue YourTaskQueue \\\n\t\t--input '{\"Input\": \"As-JSON\"}'\x1b[0m"
s.Command.Long = "Start a new Workflow Execution. Returns the Workflow- and Run-IDs.\n\n\x1b[1mtemporal workflow start \\\n\t\t--workflow-id YourWorkflowId \\\n\t\t--type YourWorkflow \\\n\t\t--task-queue YourTaskQueue \\\n\t\t--input '{\"some-key\": \"some-value\"}'\x1b[0m"
} else {
s.Command.Long = "Start a new Workflow Execution. Returns the Workflow- and Run-IDs.\n\n```\ntemporal workflow start \\\n\t\t--workflow-id YourWorkflowId \\\n\t\t--type YourWorkflow \\\n\t\t--task-queue YourTaskQueue \\\n\t\t--input '{\"Input\": \"As-JSON\"}'\n```"
s.Command.Long = "Start a new Workflow Execution. Returns the Workflow- and Run-IDs.\n\n```\ntemporal workflow start \\\n\t\t--workflow-id YourWorkflowId \\\n\t\t--type YourWorkflow \\\n\t\t--task-queue YourTaskQueue \\\n\t\t--input '{\"some-key\": \"some-value\"}'\n```"
}
s.Command.Args = cobra.NoArgs
s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags())
Expand Down Expand Up @@ -2598,34 +2598,95 @@ func NewTemporalWorkflowTraceCommand(cctx *CommandContext, parent *TemporalWorkf
type TemporalWorkflowUpdateCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
PayloadInputOptions
}

func NewTemporalWorkflowUpdateCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowUpdateCommand {
var s TemporalWorkflowUpdateCommand
s.Parent = parent
s.Command.Use = "update"
s.Command.Short = "Start and wait for Updates"
s.Command.Long = "An Update is a synchronous call to a Workflow Execution that can change its\nstate, control its flow, and return a result."
s.Command.Args = cobra.NoArgs
s.Command.AddCommand(&NewTemporalWorkflowUpdateExecuteCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowUpdateStartCommand(cctx, &s).Command)
return &s
}

type UpdateOptions struct {
Name string
WorkflowId string
UpdateId string
RunId string
FirstExecutionRunId string
}

func NewTemporalWorkflowUpdateCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowUpdateCommand {
var s TemporalWorkflowUpdateCommand
func (v *UpdateOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
f.StringVar(&v.Name, "name", "", "Handler method name. Required. Aliased as \"--type\".")
_ = cobra.MarkFlagRequired(f, "name")
f.StringVarP(&v.WorkflowId, "workflow-id", "w", "", "Workflow ID. Required.")
_ = cobra.MarkFlagRequired(f, "workflow-id")
f.StringVar(&v.UpdateId, "update-id", "", "Update ID. If unset, defaults to a UUID. Must be unique per Workflow Execution.")
f.StringVarP(&v.RunId, "run-id", "r", "", "Run ID. If unset, updates the currently-running Workflow Execution.")
f.StringVar(&v.FirstExecutionRunId, "first-execution-run-id", "", "Parent Run ID. The update is sent to the last Workflow Execution in the chain started with this Run ID.")
}

type TemporalWorkflowUpdateExecuteCommand struct {
Parent *TemporalWorkflowUpdateCommand
Command cobra.Command
UpdateOptions
PayloadInputOptions
}

func NewTemporalWorkflowUpdateExecuteCommand(cctx *CommandContext, parent *TemporalWorkflowUpdateCommand) *TemporalWorkflowUpdateExecuteCommand {
var s TemporalWorkflowUpdateExecuteCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "update [flags]"
s.Command.Short = "Synchronously run a Workflow update handler"
s.Command.Use = "execute [flags]"
s.Command.Short = "Send an Update and wait for it to complete"
if hasHighlighting {
s.Command.Long = "Send a message to a Workflow Execution to invoke an update handler. An update\ncan change the state of a Workflow Execution and return a response:\n\n\x1b[1mtemporal workflow update \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"Input\": \"As-JSON\"}'\x1b[0m"
s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to complete or fail. You can also use this to wait for an existing\nupdate to complete, by submitting an existing update ID.\n\n\x1b[1mtemporal workflow update execute \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m"
} else {
s.Command.Long = "Send a message to a Workflow Execution to invoke an update handler. An update\ncan change the state of a Workflow Execution and return a response:\n\n```\ntemporal workflow update \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"Input\": \"As-JSON\"}'\n```"
s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to complete or fail. You can also use this to wait for an existing\nupdate to complete, by submitting an existing update ID.\n\n```\ntemporal workflow update execute \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\n```"
}
s.Command.Args = cobra.NoArgs
s.UpdateOptions.buildFlags(cctx, s.Command.Flags())
s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().StringVar(&s.Name, "name", "", "Handler method name. Required. Aliased as \"--type\".")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "name")
s.Command.Flags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow ID. Required.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "workflow-id")
s.Command.Flags().StringVar(&s.UpdateId, "update-id", "", "Update ID. If unset, defaults to a UUID. Must be unique per Workflow Execution.")
s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. If unset, updates the currently-running Workflow Execution.")
s.Command.Flags().StringVar(&s.FirstExecutionRunId, "first-execution-run-id", "", "Parent Run ID. The update is sent to the last Workflow Execution in the chain started with this Run ID.")
s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{
"type": "name",
}))
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalWorkflowUpdateStartCommand struct {
Parent *TemporalWorkflowUpdateCommand
Command cobra.Command
UpdateOptions
PayloadInputOptions
WaitForStage StringEnum
}

func NewTemporalWorkflowUpdateStartCommand(cctx *CommandContext, parent *TemporalWorkflowUpdateCommand) *TemporalWorkflowUpdateStartCommand {
var s TemporalWorkflowUpdateStartCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "start [flags]"
s.Command.Short = "Send an Update and wait for it to be accepted or rejected"
if hasHighlighting {
s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using \x1b[1mtemporal workflow update execute\x1b[0m.\n\n\x1b[1mtemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m"
} else {
s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using `temporal workflow update execute`.\n\n```\ntemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\n```"
}
s.Command.Args = cobra.NoArgs
s.UpdateOptions.buildFlags(cctx, s.Command.Flags())
s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags())
s.WaitForStage = NewStringEnum([]string{"accepted"}, "")
s.Command.Flags().Var(&s.WaitForStage, "wait-for-stage", "Update stage to wait for. The only option is `accepted`, but this option is required. This is to allow a future version of the CLI to choose a default value. Accepted values: accepted. Required.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "wait-for-stage")
s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{
"type": "name",
}))
Expand Down
52 changes: 39 additions & 13 deletions temporalcli/commands.workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func (c *TemporalWorkflowSignalCommand) run(cctx *CommandContext, args []string)
}
defer cl.Close()

// Get input payloads
input, err := c.buildRawInputPayloads()
if err != nil {
return err
Expand Down Expand Up @@ -187,33 +186,61 @@ func (c *TemporalWorkflowTerminateCommand) run(cctx *CommandContext, _ []string)
return nil
}

func (c *TemporalWorkflowUpdateCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
func (c *TemporalWorkflowUpdateStartCommand) run(cctx *CommandContext, args []string) error {
waitForStage := client.WorkflowUpdateStageUnspecified
switch c.WaitForStage.Value {
case "accepted":
waitForStage = client.WorkflowUpdateStageAccepted
}
if waitForStage != client.WorkflowUpdateStageAccepted {
return fmt.Errorf("invalid wait for stage: %v, valid values are: 'accepted'", c.WaitForStage)
}
return workflowUpdateHelper(cctx, c.Parent.Parent.ClientOptions, c.PayloadInputOptions, c.UpdateOptions, waitForStage)
}

func (c *TemporalWorkflowUpdateExecuteCommand) run(cctx *CommandContext, args []string) error {
return workflowUpdateHelper(cctx, c.Parent.Parent.ClientOptions, c.PayloadInputOptions, c.UpdateOptions, client.WorkflowUpdateStageCompleted)
}

func workflowUpdateHelper(cctx *CommandContext,
clientOpts ClientOptions,
inputOpts PayloadInputOptions,
updateOpts UpdateOptions,
waitForStage client.WorkflowUpdateStage,
dandavison marked this conversation as resolved.
Show resolved Hide resolved
) error {
cl, err := clientOpts.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

// Get raw input
input, err := c.buildRawInput()
input, err := inputOpts.buildRawInput()
if err != nil {
return err
}

request := client.UpdateWorkflowOptions{
WorkflowID: c.WorkflowId,
RunID: c.RunId,
UpdateName: c.Name,
UpdateID: c.UpdateId,
FirstExecutionRunID: c.FirstExecutionRunId,
WorkflowID: updateOpts.WorkflowId,
RunID: updateOpts.RunId,
UpdateName: updateOpts.Name,
UpdateID: updateOpts.UpdateId,
FirstExecutionRunID: updateOpts.FirstExecutionRunId,
Args: input,
WaitForStage: client.WorkflowUpdateStageCompleted,
WaitForStage: waitForStage,
}

updateHandle, err := cl.UpdateWorkflow(cctx, request)
if err != nil {
return fmt.Errorf("unable to update workflow: %w", err)
}
if waitForStage == client.WorkflowUpdateStageAccepted {
return cctx.Printer.PrintStructured(
struct {
Name string `json:"name"`
UpdateID string `json:"updateId"`
}{Name: updateOpts.Name, UpdateID: updateHandle.UpdateID()},
printer.StructuredOptions{})
}

var valuePtr interface{}
err = updateHandle.Get(cctx, &valuePtr)
dandavison marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -226,7 +253,7 @@ func (c *TemporalWorkflowUpdateCommand) run(cctx *CommandContext, args []string)
Name string `json:"name"`
UpdateID string `json:"updateId"`
Result interface{} `json:"result"`
}{Name: c.Name, UpdateID: updateHandle.UpdateID(), Result: valuePtr},
}{Name: updateOpts.Name, UpdateID: updateHandle.UpdateID(), Result: valuePtr},
printer.StructuredOptions{})
}

Expand Down Expand Up @@ -330,7 +357,6 @@ func queryHelper(cctx *CommandContext,
}
defer cl.Close()

// Get input payloads
input, err := inputOpts.buildRawInputPayloads()
if err != nil {
return err
Expand Down
Loading
Loading