Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for batch resets. #473

Merged
merged 5 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 10 additions & 27 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,6 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) *
s.Command.AddCommand(&NewTemporalWorkflowListCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowQueryCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowResetCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowResetBatchCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowShowCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowSignalCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowStackCommand(cctx, &s).Command)
Expand Down Expand Up @@ -1000,6 +999,9 @@ type TemporalWorkflowResetCommand struct {
Reason string
ReapplyType StringEnum
Type StringEnum
BuildId string
Query string
Yes bool
}

func NewTemporalWorkflowResetCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowResetCommand {
Expand All @@ -1009,42 +1011,23 @@ func NewTemporalWorkflowResetCommand(cctx *CommandContext, parent *TemporalWorkf
s.Command.Use = "reset [flags]"
s.Command.Short = "Resets a Workflow Execution by Event ID or reset type."
if hasHighlighting {
s.Command.Long = "The temporal workflow reset command resets a Workflow Execution.\nA reset allows the Workflow to resume from a certain point without losing its parameters or Event History.\n\nThe Workflow Execution can be set to a given Event Type:\n\x1b[1mtemporal workflow reset --workflow-id=meaningful-business-id --type=LastContinuedAsNew\x1b[0m\n\n...or a specific any Event after \x1b[1mWorkflowTaskStarted\x1b[0m.\n\x1b[1mtemporal workflow reset --workflow-id=meaningful-business-id --event-id=MyLastEvent\x1b[0m\n\nUse the options listed below to change reset behavior."
s.Command.Long = "The temporal workflow reset command resets a Workflow Execution.\nA reset allows the Workflow to resume from a certain point without losing its parameters or Event History.\n\nThe Workflow Execution can be set to a given Event Type:\n\x1b[1mtemporal workflow reset --workflow-id=meaningful-business-id --type=LastContinuedAsNew\x1b[0m\n\n...or a specific any Event after \x1b[1mWorkflowTaskStarted\x1b[0m.\n\x1b[1mtemporal workflow reset --workflow-id=meaningful-business-id --event-id=MyLastEvent\x1b[0m\nFor batch reset only FirstWorkflowTask, LastWorkflowTask or BuildId can be used. Workflow Id, run Id and event Id \nshould not be set.\nUse the options listed below to change reset behavior."
} else {
s.Command.Long = "The temporal workflow reset command resets a Workflow Execution.\nA reset allows the Workflow to resume from a certain point without losing its parameters or Event History.\n\nThe Workflow Execution can be set to a given Event Type:\n```\ntemporal workflow reset --workflow-id=meaningful-business-id --type=LastContinuedAsNew\n```\n\n...or a specific any Event after `WorkflowTaskStarted`.\n```\ntemporal workflow reset --workflow-id=meaningful-business-id --event-id=MyLastEvent\n```\n\nUse the options listed below to change reset behavior."
s.Command.Long = "The temporal workflow reset command resets a Workflow Execution.\nA reset allows the Workflow to resume from a certain point without losing its parameters or Event History.\n\nThe Workflow Execution can be set to a given Event Type:\n```\ntemporal workflow reset --workflow-id=meaningful-business-id --type=LastContinuedAsNew\n```\n\n...or a specific any Event after `WorkflowTaskStarted`.\n```\ntemporal workflow reset --workflow-id=meaningful-business-id --event-id=MyLastEvent\n```\nFor batch reset only FirstWorkflowTask, LastWorkflowTask or BuildId can be used. Workflow Id, run Id and event Id \nshould not be set.\nUse the options listed below to change reset behavior."
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow Id.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "workflow-id")
s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run Id.")
s.Command.Flags().IntVarP(&s.EventId, "event-id", "e", 0, "The Event Id for any Event after `WorkflowTaskStarted` you want to reset to (exclusive). It can be `WorkflowTaskCompleted`, `WorkflowTaskFailed` or others.")
s.Command.Flags().StringVar(&s.Reason, "reason", "", "The reason why this workflow is being reset.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "reason")
s.ReapplyType = NewStringEnum([]string{"All", "Signal", "None"}, "All")
s.Command.Flags().Var(&s.ReapplyType, "reapply-type", "Event types to reapply after the reset point. Accepted values: All, Signal, None.")
s.Type = NewStringEnum([]string{"FirstWorkflowTask", "LastWorkflowTask", "LastContinuedAsNew"}, "")
s.Command.Flags().VarP(&s.Type, "type", "t", "Event type to which you want to reset. Accepted values: FirstWorkflowTask, LastWorkflowTask, LastContinuedAsNew.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalWorkflowResetBatchCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
}

func NewTemporalWorkflowResetBatchCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowResetBatchCommand {
var s TemporalWorkflowResetBatchCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "reset-batch [flags]"
s.Command.Short = "Reset a batch of Workflow Executions by reset type."
s.Command.Long = "TODO"
s.Command.Args = cobra.NoArgs
s.Type = NewStringEnum([]string{"FirstWorkflowTask", "LastWorkflowTask", "LastContinuedAsNew", "BuildId"}, "")
s.Command.Flags().VarP(&s.Type, "type", "t", "Event type to which you want to reset. Accepted values: FirstWorkflowTask, LastWorkflowTask, LastContinuedAsNew, BuildId.")
s.Command.Flags().StringVar(&s.BuildId, "build-id", "", "Only used if type is BuildId. Reset the first workflow task processed by this build id. Note that by default, this reset is allowed to be to a prior run in a chain of continue-as-new.")
s.Command.Flags().StringVarP(&s.Query, "query", "q", "", "Start a batch reset to operate on Workflow Executions with given List Filter.")
s.Command.Flags().BoolVarP(&s.Yes, "yes", "y", false, "Confirm prompt to perform batch. Only allowed if query is present.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down
7 changes: 2 additions & 5 deletions temporalcli/commands.workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (

"github.com/fatih/color"
"github.com/google/uuid"
"github.com/temporalio/cli/temporalcli/internal/printer"
"go.temporal.io/api/batch/v1"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/query/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"

"github.com/temporalio/cli/temporalcli/internal/printer"
ast2023 marked this conversation as resolved.
Show resolved Hide resolved
)

func (c *TemporalWorkflowCancelCommand) run(cctx *CommandContext, args []string) error {
Expand Down Expand Up @@ -58,10 +59,6 @@ func (c *TemporalWorkflowQueryCommand) run(cctx *CommandContext, args []string)
c.Type, c.RejectCondition, c.WorkflowReferenceOptions)
}

func (*TemporalWorkflowResetBatchCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
}

func (c *TemporalWorkflowSignalCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
Expand Down
101 changes: 98 additions & 3 deletions temporalcli/commands.workflow_reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,68 @@ import (
"errors"
"fmt"

"github.com/temporalio/cli/temporalcli/internal/printer"
"github.com/google/uuid"
"go.temporal.io/api/batch/v1"
"go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"

"github.com/temporalio/cli/temporalcli/internal/printer"
)

func (c *TemporalWorkflowResetCommand) run(cctx *CommandContext, _ []string) error {
if c.Type.Value == "" && c.EventId <= 0 {
return errors.New("must specify either valid event id or reset type")
validateArguments, doReset := c.getResetOperations()
if err := validateArguments(); err != nil {
return err
}
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()
return doReset(cctx, cl)
}

func (c *TemporalWorkflowResetCommand) getResetOperations() (validate func() error, doReset func(*CommandContext, client.Client) error) {
ast2023 marked this conversation as resolved.
Show resolved Hide resolved
if c.WorkflowId != "" {
validate = c.validateWorkflowResetArguments
doReset = c.doWorkflowReset
} else {
validate = c.validateBatchResetArguments
doReset = c.runBatchReset
}
return validate, doReset
}

func (c *TemporalWorkflowResetCommand) validateWorkflowResetArguments() error {
if c.Type.Value == "" && c.EventId <= 0 {
return errors.New("must specify either valid event id or reset type")
}
if c.WorkflowId == "" {
return errors.New("must specify workflow id")
}
return nil
}

func (c *TemporalWorkflowResetCommand) validateBatchResetArguments() error {
if c.Type.Value == "" {
return errors.New("must specify reset type")
}
if c.RunId != "" {
return errors.New("must not specify run Id")
}
if c.EventId != 0 {
return errors.New("must not specify event Id")
}
if c.Type.Value == "BuildId" && c.BuildId == "" {
return errors.New("must specify build Id for BuildId based batch reset")
}
return nil
}
func (c *TemporalWorkflowResetCommand) doWorkflowReset(cctx *CommandContext, cl client.Client) error {

var err error
resetBaseRunID := c.RunId
eventID := int64(c.EventId)
if c.Type.Value != "" {
Expand Down Expand Up @@ -62,6 +107,56 @@ func (c *TemporalWorkflowResetCommand) run(cctx *CommandContext, _ []string) err
return nil
}

func (c *TemporalWorkflowResetCommand) runBatchReset(cctx *CommandContext, cl client.Client) error {
request := workflowservice.StartBatchOperationRequest{
Namespace: c.Parent.Namespace,
JobId: uuid.NewString(),
VisibilityQuery: c.Query,
Reason: c.Reason,
}
request.Operation = &workflowservice.StartBatchOperationRequest_ResetOperation{
ResetOperation: &batch.BatchOperationReset{
Identity: clientIdentity(),
Options: c.batchResetOptions(c.Type.Value),
},
}
count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: c.Query})
if err != nil {
return fmt.Errorf("failed counting workflows from query: %w", err)
}
yes, err := cctx.promptYes(
fmt.Sprintf("Start batch against approximately %v workflow(s)? y/N", count.Count), c.Yes)
if err != nil {
return err
}
if !yes {
return fmt.Errorf("user denied confirmation")
}

return startBatchJob(cctx, cl, &request)
}

func (c *TemporalWorkflowResetCommand) batchResetOptions(resetType string) *common.ResetOptions {
switch resetType {
case "FirstWorkflowTask":
return &common.ResetOptions{
Target: &common.ResetOptions_FirstWorkflowTask{},
}
case "LastWorkflowTask":
return &common.ResetOptions{
Target: &common.ResetOptions_LastWorkflowTask{},
}
case "BuildId":
return &common.ResetOptions{
Target: &common.ResetOptions_BuildId{
BuildId: c.BuildId,
},
}
default:
panic("unsupported operation type was filtered by cli framework")
}
}

func (c *TemporalWorkflowResetCommand) getResetEventIDByType(ctx context.Context, cl client.Client) (string, int64, error) {
resetType, namespace, wid, rid := c.Type.Value, c.Parent.Namespace, c.WorkflowId, c.RunId
wfsvc := cl.WorkflowService()
Expand Down
Loading
Loading