diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index b237686e..4cb77e96 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -771,7 +771,6 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) * s.Command.AddCommand(&NewTemporalWorkflowListCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowQueryCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowResetCommand(cctx, &s).Command) - s.Command.AddCommand(&NewTemporalWorkflowResetBatchCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowShowCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowSignalCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowStackCommand(cctx, &s).Command) @@ -1000,6 +999,9 @@ type TemporalWorkflowResetCommand struct { Reason string ReapplyType StringEnum Type StringEnum + BuildId string + Query string + Yes bool } func NewTemporalWorkflowResetCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowResetCommand { @@ -1009,42 +1011,23 @@ func NewTemporalWorkflowResetCommand(cctx *CommandContext, parent *TemporalWorkf s.Command.Use = "reset [flags]" s.Command.Short = "Resets a Workflow Execution by Event ID or reset type." if hasHighlighting { - s.Command.Long = "The temporal workflow reset command resets a Workflow Execution.\nA reset allows the Workflow to resume from a certain point without losing its parameters or Event History.\n\nThe Workflow Execution can be set to a given Event Type:\n\x1b[1mtemporal workflow reset --workflow-id=meaningful-business-id --type=LastContinuedAsNew\x1b[0m\n\n...or a specific any Event after \x1b[1mWorkflowTaskStarted\x1b[0m.\n\x1b[1mtemporal workflow reset --workflow-id=meaningful-business-id --event-id=MyLastEvent\x1b[0m\n\nUse the options listed below to change reset behavior." + s.Command.Long = "The temporal workflow reset command resets a Workflow Execution.\nA reset allows the Workflow to resume from a certain point without losing its parameters or Event History.\n\nThe Workflow Execution can be set to a given Event Type:\n\x1b[1mtemporal workflow reset --workflow-id=meaningful-business-id --type=LastContinuedAsNew\x1b[0m\n\n...or a specific any Event after \x1b[1mWorkflowTaskStarted\x1b[0m.\n\x1b[1mtemporal workflow reset --workflow-id=meaningful-business-id --event-id=MyLastEvent\x1b[0m\nFor batch reset only FirstWorkflowTask, LastWorkflowTask or BuildId can be used. Workflow Id, run Id and event Id \nshould not be set.\nUse the options listed below to change reset behavior." } else { - s.Command.Long = "The temporal workflow reset command resets a Workflow Execution.\nA reset allows the Workflow to resume from a certain point without losing its parameters or Event History.\n\nThe Workflow Execution can be set to a given Event Type:\n```\ntemporal workflow reset --workflow-id=meaningful-business-id --type=LastContinuedAsNew\n```\n\n...or a specific any Event after `WorkflowTaskStarted`.\n```\ntemporal workflow reset --workflow-id=meaningful-business-id --event-id=MyLastEvent\n```\n\nUse the options listed below to change reset behavior." + s.Command.Long = "The temporal workflow reset command resets a Workflow Execution.\nA reset allows the Workflow to resume from a certain point without losing its parameters or Event History.\n\nThe Workflow Execution can be set to a given Event Type:\n```\ntemporal workflow reset --workflow-id=meaningful-business-id --type=LastContinuedAsNew\n```\n\n...or a specific any Event after `WorkflowTaskStarted`.\n```\ntemporal workflow reset --workflow-id=meaningful-business-id --event-id=MyLastEvent\n```\nFor batch reset only FirstWorkflowTask, LastWorkflowTask or BuildId can be used. Workflow Id, run Id and event Id \nshould not be set.\nUse the options listed below to change reset behavior." } s.Command.Args = cobra.NoArgs - s.Command.Flags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow Id.") - _ = cobra.MarkFlagRequired(s.Command.Flags(), "workflow-id") + s.Command.Flags().StringVarP(&s.WorkflowId, "workflow-id", "w", "", "Workflow Id. Required for non-batch reset operations.") s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run Id.") s.Command.Flags().IntVarP(&s.EventId, "event-id", "e", 0, "The Event Id for any Event after `WorkflowTaskStarted` you want to reset to (exclusive). It can be `WorkflowTaskCompleted`, `WorkflowTaskFailed` or others.") s.Command.Flags().StringVar(&s.Reason, "reason", "", "The reason why this workflow is being reset.") _ = cobra.MarkFlagRequired(s.Command.Flags(), "reason") s.ReapplyType = NewStringEnum([]string{"All", "Signal", "None"}, "All") s.Command.Flags().Var(&s.ReapplyType, "reapply-type", "Event types to reapply after the reset point. Accepted values: All, Signal, None.") - s.Type = NewStringEnum([]string{"FirstWorkflowTask", "LastWorkflowTask", "LastContinuedAsNew"}, "") - s.Command.Flags().VarP(&s.Type, "type", "t", "Event type to which you want to reset. Accepted values: FirstWorkflowTask, LastWorkflowTask, LastContinuedAsNew.") - s.Command.Run = func(c *cobra.Command, args []string) { - if err := s.run(cctx, args); err != nil { - cctx.Options.Fail(err) - } - } - return &s -} - -type TemporalWorkflowResetBatchCommand struct { - Parent *TemporalWorkflowCommand - Command cobra.Command -} - -func NewTemporalWorkflowResetBatchCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowResetBatchCommand { - var s TemporalWorkflowResetBatchCommand - s.Parent = parent - s.Command.DisableFlagsInUseLine = true - s.Command.Use = "reset-batch [flags]" - s.Command.Short = "Reset a batch of Workflow Executions by reset type." - s.Command.Long = "TODO" - s.Command.Args = cobra.NoArgs + s.Type = NewStringEnum([]string{"FirstWorkflowTask", "LastWorkflowTask", "LastContinuedAsNew", "BuildId"}, "") + s.Command.Flags().VarP(&s.Type, "type", "t", "Event type to which you want to reset. Accepted values: FirstWorkflowTask, LastWorkflowTask, LastContinuedAsNew, BuildId.") + s.Command.Flags().StringVar(&s.BuildId, "build-id", "", "Only used if type is BuildId. Reset the first workflow task processed by this build id. Note that by default, this reset is allowed to be to a prior run in a chain of continue-as-new.") + s.Command.Flags().StringVarP(&s.Query, "query", "q", "", "Start a batch reset to operate on Workflow Executions with given List Filter.") + s.Command.Flags().BoolVarP(&s.Yes, "yes", "y", false, "Confirm prompt to perform batch. Only allowed if query is present.") s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { cctx.Options.Fail(err) diff --git a/temporalcli/commands.workflow.go b/temporalcli/commands.workflow.go index 1c7b9bdc..40ad08b6 100644 --- a/temporalcli/commands.workflow.go +++ b/temporalcli/commands.workflow.go @@ -58,10 +58,6 @@ func (c *TemporalWorkflowQueryCommand) run(cctx *CommandContext, args []string) c.Type, c.RejectCondition, c.WorkflowReferenceOptions) } -func (*TemporalWorkflowResetBatchCommand) run(*CommandContext, []string) error { - return fmt.Errorf("TODO") -} - func (c *TemporalWorkflowSignalCommand) run(cctx *CommandContext, args []string) error { cl, err := c.Parent.ClientOptions.dialClient(cctx) if err != nil { diff --git a/temporalcli/commands.workflow_reset.go b/temporalcli/commands.workflow_reset.go index ea99b6e0..39dbe130 100644 --- a/temporalcli/commands.workflow_reset.go +++ b/temporalcli/commands.workflow_reset.go @@ -5,23 +5,68 @@ import ( "errors" "fmt" - "github.com/temporalio/cli/temporalcli/internal/printer" + "github.com/google/uuid" + "go.temporal.io/api/batch/v1" "go.temporal.io/api/common/v1" "go.temporal.io/api/enums/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" + + "github.com/temporalio/cli/temporalcli/internal/printer" ) func (c *TemporalWorkflowResetCommand) run(cctx *CommandContext, _ []string) error { - if c.Type.Value == "" && c.EventId <= 0 { - return errors.New("must specify either valid event id or reset type") + validateArguments, doReset := c.getResetOperations() + if err := validateArguments(); err != nil { + return err } cl, err := c.Parent.ClientOptions.dialClient(cctx) if err != nil { return err } defer cl.Close() + return doReset(cctx, cl) +} + +func (c *TemporalWorkflowResetCommand) getResetOperations() (validate func() error, doReset func(*CommandContext, client.Client) error) { + if c.WorkflowId != "" { + validate = c.validateWorkflowResetArguments + doReset = c.doWorkflowReset + } else { + validate = c.validateBatchResetArguments + doReset = c.runBatchReset + } + return validate, doReset +} + +func (c *TemporalWorkflowResetCommand) validateWorkflowResetArguments() error { + if c.Type.Value == "" && c.EventId <= 0 { + return errors.New("must specify either valid event id or reset type") + } + if c.WorkflowId == "" { + return errors.New("must specify workflow id") + } + return nil +} + +func (c *TemporalWorkflowResetCommand) validateBatchResetArguments() error { + if c.Type.Value == "" { + return errors.New("must specify reset type") + } + if c.RunId != "" { + return errors.New("must not specify run Id") + } + if c.EventId != 0 { + return errors.New("must not specify event Id") + } + if c.Type.Value == "BuildId" && c.BuildId == "" { + return errors.New("must specify build Id for BuildId based batch reset") + } + return nil +} +func (c *TemporalWorkflowResetCommand) doWorkflowReset(cctx *CommandContext, cl client.Client) error { + var err error resetBaseRunID := c.RunId eventID := int64(c.EventId) if c.Type.Value != "" { @@ -62,6 +107,56 @@ func (c *TemporalWorkflowResetCommand) run(cctx *CommandContext, _ []string) err return nil } +func (c *TemporalWorkflowResetCommand) runBatchReset(cctx *CommandContext, cl client.Client) error { + request := workflowservice.StartBatchOperationRequest{ + Namespace: c.Parent.Namespace, + JobId: uuid.NewString(), + VisibilityQuery: c.Query, + Reason: c.Reason, + } + request.Operation = &workflowservice.StartBatchOperationRequest_ResetOperation{ + ResetOperation: &batch.BatchOperationReset{ + Identity: clientIdentity(), + Options: c.batchResetOptions(c.Type.Value), + }, + } + count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: c.Query}) + if err != nil { + return fmt.Errorf("failed counting workflows from query: %w", err) + } + yes, err := cctx.promptYes( + fmt.Sprintf("Start batch against approximately %v workflow(s)? y/N", count.Count), c.Yes) + if err != nil { + return err + } + if !yes { + return fmt.Errorf("user denied confirmation") + } + + return startBatchJob(cctx, cl, &request) +} + +func (c *TemporalWorkflowResetCommand) batchResetOptions(resetType string) *common.ResetOptions { + switch resetType { + case "FirstWorkflowTask": + return &common.ResetOptions{ + Target: &common.ResetOptions_FirstWorkflowTask{}, + } + case "LastWorkflowTask": + return &common.ResetOptions{ + Target: &common.ResetOptions_LastWorkflowTask{}, + } + case "BuildId": + return &common.ResetOptions{ + Target: &common.ResetOptions_BuildId{ + BuildId: c.BuildId, + }, + } + default: + panic("unsupported operation type was filtered by cli framework") + } +} + func (c *TemporalWorkflowResetCommand) getResetEventIDByType(ctx context.Context, cl client.Client) (string, int64, error) { resetType, namespace, wid, rid := c.Type.Value, c.Parent.Namespace, c.WorkflowId, c.RunId wfsvc := cl.WorkflowService() diff --git a/temporalcli/commands.workflow_reset_test.go b/temporalcli/commands.workflow_reset_test.go index 359631dd..157929ba 100644 --- a/temporalcli/commands.workflow_reset_test.go +++ b/temporalcli/commands.workflow_reset_test.go @@ -2,18 +2,25 @@ package temporalcli_test import ( "context" - "errors" "fmt" + "sync/atomic" + "testing" "time" "github.com/google/uuid" "github.com/stretchr/testify/require" commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/enums/v1" + "go.temporal.io/api/history/v1" workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/activity" "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" + "go.temporal.io/server/common/rpc" + "go.temporal.io/server/common/searchattribute" + "go.temporal.io/server/common/worker_versioning" ) func (s *SharedServerSuite) awaitNextWorkflow(searchAttr string) { @@ -119,7 +126,7 @@ func (s *SharedServerSuite) TestWorkflow_Reset_ToEventID() { s.Worker.OnDevActivity(func(ctx context.Context, a any) (any, error) { n, ok := a.(float64) if !ok { - return nil, fmt.Errorf("expected int, not %T (%v)", a, a) + return nil, fmt.Errorf("expected float64, not %T (%v)", a, a) } switch n { case 1: @@ -127,7 +134,7 @@ func (s *SharedServerSuite) TestWorkflow_Reset_ToEventID() { case 2: twoExecutions++ default: - return 0, errors.New("you've broken the test!") + return 0, fmt.Errorf("activity expected input 1 or 2 got %v", n) } return n, nil }) @@ -207,3 +214,281 @@ func (s *SharedServerSuite) TestWorkflow_Reset_ToEventID() { s.Equal(1, oneExecutions, "Should not have re-executed the first activity") s.Equal(2, twoExecutions, "Should have re-executed the second activity") } + +func (s *SharedServerSuite) TestBatchResetByBuildId() { + sut := newSystemUnderTest(s) + + sut.startWorkerFor(originalWorkflow, workflowOptions{name: "wf", version: "v1"}) + sut.executeWorkflow("wf") + sut.waitWorkflowBlockedAfterFirstActivity() + sut.stopWorkerFor("v1") + + sut.startWorkerFor(extendedWorkflowWithBuggyActivity, workflowOptions{name: "wf", version: "v2"}) + sut.allowWorkflowToContinue() + sut.waitBadActivityExecuted() + sut.stopWorkerFor("v2") + + sut.startWorkerFor(extendedWorkflowWithNonDeterministicFix, workflowOptions{name: "wf", version: "v3"}) + sut.waitBlockOnNonDeterministicError() + + query := fmt.Sprintf("WorkflowId = \"%s\"", sut.run.GetID()) + s.CommandHarness.Stdin.WriteString("y\n") + res := s.Execute( + "workflow", "reset", + "--address", s.Address(), + "--reason", "test-reset-event-id", + "--type", "BuildId", + "--build-id", sut.buildPrefix+"v2", + "--query", query, + ) + require.NoError(s.T(), res.Err) + + sut.assertWorkflowComplete() + sut.assertOnlySecondActivityRetried() + sut.stopWorkerFor("v3") +} + +const ( + badActivity = iota + firstActivity + secondActivity + thirdActivity +) + +type assertions interface { + NoError(error, ...interface{}) + Error(error, ...interface{}) + True(bool, ...interface{}) + Eventually(func() bool, time.Duration, time.Duration, ...interface{}) +} + +type batchResetTestData struct { + t *testing.T + buildPrefix string + assert assertions + client client.Client + tq string + counters []*atomic.Int32 + run client.WorkflowRun + ctx context.Context + ctxCancelF context.CancelFunc + workers map[string]worker.Worker + namespace string +} + +func newSystemUnderTest(suite *SharedServerSuite) *batchResetTestData { + sut := batchResetTestData{ + t: suite.T(), + counters: []*atomic.Int32{&atomic.Int32{}, &atomic.Int32{}, &atomic.Int32{}, &atomic.Int32{}}, + assert: suite, + client: suite.Client, + namespace: suite.Namespace(), + buildPrefix: uuid.NewString()[:6] + "-", + tq: suite.T().Name(), + workers: make(map[string]worker.Worker), + } + + suite.T().Cleanup(func() { sut.stopAllWorkers() }) + + sut.ctx, sut.ctxCancelF = context.WithTimeout(context.Background(), 30*time.Second) + return &sut +} + +func (sut *batchResetTestData) internalVersionFor(v string) string { + return sut.buildPrefix + v +} + +func (sut *batchResetTestData) firstActivity() error { + sut.counters[firstActivity].Add(1) + return nil +} + +func (sut *batchResetTestData) secondActivity() error { + sut.counters[secondActivity].Add(1) + return nil +} + +func (sut *batchResetTestData) thirdActivity() error { + sut.counters[thirdActivity].Add(1) + return nil +} + +func (sut *batchResetTestData) badActivity() error { + sut.counters[badActivity].Add(1) + return nil +} + +type workflowOptions struct { + name string + version string +} + +func (sut *batchResetTestData) startWorkerFor(wf VersionedTestWorkflow, options workflowOptions) { + sut.t.Helper() + sut.workers[options.version] = worker.New(sut.client, sut.tq, worker.Options{BuildID: sut.internalVersionFor(options.version)}) + sut.workers[options.version].RegisterWorkflowWithOptions(wf, workflow.RegisterOptions{Name: options.name}) + sut.workers[options.version].RegisterActivityWithOptions(sut.firstActivity, activity.RegisterOptions{Name: "firstActivity"}) + sut.workers[options.version].RegisterActivityWithOptions(sut.secondActivity, activity.RegisterOptions{Name: "secondActivity"}) + sut.workers[options.version].RegisterActivityWithOptions(sut.thirdActivity, activity.RegisterOptions{Name: "thirdActivity"}) + sut.workers[options.version].RegisterActivityWithOptions(sut.badActivity, activity.RegisterOptions{Name: "badActivity"}) + err := sut.workers[options.version].Start() + sut.assert.NoError(err, "Could not start worker for %v, workflowVersion %s", wf, options.version) +} + +func (sut *batchResetTestData) stopWorkerFor(version string) { + if w, ok := sut.workers[version]; ok { + w.Stop() + delete(sut.workers, version) + } +} + +func (sut *batchResetTestData) stopAllWorkers() { + for version := range sut.workers { + sut.stopWorkerFor(version) + } +} + +func (sut *batchResetTestData) executeWorkflow(workflowName string) { + sut.t.Helper() + run, err := sut.client.ExecuteWorkflow(sut.ctx, client.StartWorkflowOptions{TaskQueue: sut.tq}, workflowName) + sut.assert.NoError(err, "Failed to execute workflow %s", workflowName) + sut.run = run +} + +type VersionedTestWorkflow = func(workflow.Context, any) (string, error) + +func originalWorkflow(ctx workflow.Context, _ any) (string, error) { + ao := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ScheduleToCloseTimeout: 5 * time.Second}) + if err := workflow.ExecuteActivity(ao, "firstActivity").Get(ctx, nil); err != nil { + return "original workflow, first activity", err + } + + ch := workflow.GetSignalChannel(ctx, "wait") + ch.Receive(ctx, nil) + + if err := workflow.ExecuteActivity(ao, "secondActivity").Get(ctx, nil); err != nil { + return "first workflow, second activity", fmt.Errorf("firstWorkflow:secondActivity:%v", err) + } + return "done 1!", nil +} + +func extendedWorkflowWithBuggyActivity(ctx workflow.Context, arg any) (string, error) { + if result, err := originalWorkflow(ctx, arg); err != nil { + return "buggy workflow: " + result, fmt.Errorf("buggyWorkflow:firtstActivity:%v", err) + } + + // (we run activity in a loop so that's visible in history, not just failing workflow tasks, + // otherwise we wouldn't need a reset to "fix" it, just a new build would be enough.) + ao := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ScheduleToCloseTimeout: 5 * time.Second}) + for { + if err := workflow.ExecuteActivity(ao, "badActivity").Get(ctx, nil); err != nil { + return "buggy workflow, bad activity", fmt.Errorf("buggyWorkflow:badActivity:%v", err) + } + workflow.Sleep(ctx, time.Second) + } +} + +func extendedWorkflowWithNonDeterministicFix(ctx workflow.Context, arg any) (string, error) { + if result, err := originalWorkflow(ctx, arg); err != nil { + return "fixed workflow: " + result, fmt.Errorf("fixedWorkflow:firtstActivity:%v", err) + } + // introduce non-determinism by replacing badActivity with Sleep. (Replacing one activity with another does not + // result in non-determinism) + workflow.Sleep(ctx, time.Second) + + ao := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ScheduleToCloseTimeout: 5 * time.Second}) + if err := workflow.ExecuteActivity(ao, "thirdActivity").Get(ctx, nil); err != nil { + return "fixed workflow, third activity", fmt.Errorf("fixedWorkflow:thirdActivity:%v", err) + } + + return "done 3!", nil +} + +func (sut *batchResetTestData) waitWorkflowBlockedAfterFirstActivity() { + sut.t.Helper() + sut.assert.Eventually( + func() bool { + workflowHistory, err := sut.getWorkflowHistory() + return err == nil && len(workflowHistory) >= 10 + }, + 5*time.Second, + 100*time.Millisecond, + ) +} + +func (sut *batchResetTestData) allowWorkflowToContinue() { + sut.t.Helper() + err := sut.client.SignalWorkflow(sut.ctx, sut.run.GetID(), sut.run.GetRunID(), "wait", nil) + sut.assert.NoError(err, "failed to signal workflow to allow it to continue") +} + +func (sut *batchResetTestData) waitBadActivityExecuted() { + sut.t.Helper() + sut.assert.Eventually(func() bool { return sut.counters[badActivity].Load() >= 3 }, 10*time.Second, 200*time.Millisecond) +} + +func (sut *batchResetTestData) waitBlockOnNonDeterministicError() { + sut.t.Helper() + // but v3 is not quite compatible, the workflow should be blocked on non-determinism errors for now. + waitCtx, cancel := context.WithTimeout(sut.ctx, 2*time.Second) + defer cancel() + sut.assert.Error(sut.run.Get(waitCtx, nil)) + + // wait for it to appear in visibility + query := fmt.Sprintf(`%s = "%s" and %s = "%s"`, + searchattribute.ExecutionStatus, "Running", + searchattribute.BuildIds, worker_versioning.UnversionedBuildIdSearchAttribute(sut.internalVersionFor("v2"))) + sut.assert.Eventually(func() bool { + resp, err := sut.client.ListWorkflow(sut.ctx, &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: sut.namespace, + Query: query, + }) + return err == nil && len(resp.Executions) == 1 + }, 10*time.Second, 500*time.Millisecond) + +} + +func (sut *batchResetTestData) assertWorkflowComplete() { + sut.t.Helper() + // need to loop since runid will be resolved early and we need to re-resolve to pick up + // the new run instead of the terminated one + sut.assert.Eventually(func() bool { + var out string + if sut.client.GetWorkflow(sut.ctx, sut.run.GetID(), "").Get(sut.ctx, &out) == nil { + if out == "done 3!" { + return true + } + } + return false + }, 10*time.Second, 200*time.Millisecond) +} + +func (sut *batchResetTestData) assertOnlySecondActivityRetried() { + sut.t.Helper() + firstActivityAttempts := sut.counters[firstActivity].Load() + secondActivityAttempts := sut.counters[secondActivity].Load() + thirdActivityAttempts := sut.counters[thirdActivity].Load() + sut.assert.True( + int32(1) == firstActivityAttempts && int32(2) == secondActivityAttempts && int32(1) == thirdActivityAttempts, + "expected only second activity restarted, got first attempts: %d, second attempts: %d, third attempts: %d", + firstActivityAttempts, + secondActivityAttempts, + thirdActivityAttempts, + ) +} + +func (sut *batchResetTestData) getWorkflowHistory() ([]*history.HistoryEvent, error) { + ctx, _ := rpc.NewContextWithTimeoutAndVersionHeaders(90 * time.Second) + iter := sut.client.GetWorkflowHistory(ctx, sut.run.GetID(), "", false, 1) + + events := make([]*history.HistoryEvent, 0, 20) + for iter.HasNext() { + event, err := iter.Next() + if err != nil { + return events, err + } + events = append(events, event) + } + + return events, nil +} diff --git a/temporalcli/commandsmd/commands.md b/temporalcli/commandsmd/commands.md index 61ad8570..3f952ecd 100644 --- a/temporalcli/commandsmd/commands.md +++ b/temporalcli/commandsmd/commands.md @@ -464,21 +464,23 @@ temporal workflow reset --workflow-id=meaningful-business-id --type=LastContinue ``` temporal workflow reset --workflow-id=meaningful-business-id --event-id=MyLastEvent ``` - +For batch reset only FirstWorkflowTask, LastWorkflowTask or BuildId can be used. Workflow Id, run Id and event Id +should not be set. Use the options listed below to change reset behavior. #### Options -* `--workflow-id`, `-w` (string) - Workflow Id. Required. +* `--workflow-id`, `-w` (string) - Workflow Id. Required for non-batch reset operations. * `--run-id`, `-r` (string) - Run Id. * `--event-id`, `-e` (int) - The Event Id for any Event after `WorkflowTaskStarted` you want to reset to (exclusive). It can be `WorkflowTaskCompleted`, `WorkflowTaskFailed` or others. * `--reason` (string) - The reason why this workflow is being reset. Required. * `--reapply-type` (string-enum) - Event types to reapply after the reset point. Options: All, Signal, None. Default: All. -* `--type`, `-t` (string-enum) - Event type to which you want to reset. Options: FirstWorkflowTask, LastWorkflowTask, LastContinuedAsNew. +* `--type`, `-t` (string-enum) - Event type to which you want to reset. Options: FirstWorkflowTask, LastWorkflowTask, LastContinuedAsNew, BuildId. +* `--build-id` (string) - Only used if type is BuildId. Reset the first workflow task processed by this build id. Note that by default, this reset is allowed to be to a prior run in a chain of continue-as-new. +* `--query`, `-q` (string) - Start a batch reset to operate on Workflow Executions with given List Filter. +* `--yes`, `-y` (bool) - Confirm prompt to perform batch. Only allowed if query is present. -### temporal workflow reset-batch: Reset a batch of Workflow Executions by reset type. -TODO ### temporal workflow show: Show Event History for a Workflow Execution.