From afad5c9cb76032e4b24ef2076b8a0c5ac321548a Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 5 Jan 2021 00:32:26 +0100 Subject: [PATCH 1/2] Wait for workflow to start and show stream errors or if streams don't start within timeout, improve Show output Signed-off-by: Rohit Nayak --- go/vt/vtctl/vtctl.go | 73 +++++++++++++++++++++++-- go/vt/wrangler/traffic_switcher.go | 1 - go/vt/wrangler/traffic_switcher_test.go | 1 - go/vt/wrangler/workflow.go | 48 +++++++++++++++- 4 files changed, 114 insertions(+), 9 deletions(-) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 11a01849da1..44dc5e721a5 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -97,7 +97,6 @@ import ( "context" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" - "vitess.io/vitess/go/flagutil" "vitess.io/vitess/go/json2" "vitess.io/vitess/go/mysql" @@ -2016,7 +2015,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla if err != nil { return err } - s += "Following vreplication streams are running in this workflow:\n\n" + s += fmt.Sprintf("Following vreplication streams are running for workflow %s.%s:\n\n", target, workflow) for ksShard := range res.ShardStatuses { statuses := res.ShardStatuses[ksShard].MasterReplicationStatuses for _, st := range statuses { @@ -2027,11 +2026,14 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla msg += " Vstream may not be running." } txLag := int64(now) - st.TransactionTimestamp - msg += fmt.Sprintf(" VStream Lag: %ds", txLag/1e9) - s += fmt.Sprintf("Stream %s (id=%d) :: Status: %s.%s\n", ksShard, st.ID, st.State, msg) + msg += fmt.Sprintf(" VStream Lag: %ds.", txLag/1e9) + if st.TransactionTimestamp > 0 { // if no events occur after copy phase, TransactionTimeStamp can be 0 + msg += fmt.Sprintf(" Tx time: %s.", time.Unix(st.TransactionTimestamp, 0).Format(time.ANSIC)) + } + s += fmt.Sprintf("id=%d on %s: Status: %s.%s\n", st.ID, ksShard, st.State, msg) } } - wr.Logger().Printf("\n%s\n\n", s) + wr.Logger().Printf("\n%s\n", s) return nil } @@ -2062,6 +2064,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla vrwp.Tables = *tables vrwp.AllTables = *allTables vrwp.ExcludeTables = *excludes + vrwp.Timeout = *timeout workflowType = wrangler.MoveTablesWorkflow case wrangler.ReshardWorkflow: if *sourceShards == "" || *targetShards == "" { @@ -2142,7 +2145,65 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla return printCopyProgress() case vReplicationWorkflowActionStart: err = wf.Start() - //TODO: wait for streams to start or report error (pos != "", Message contains error, tx/update time recent) + if err != nil { + return err + } + wr.Logger().Printf("Waiting for workflow to start:\n") + type streamCount struct { + total, running int64 + } + errCh := make(chan error) + wfErrCh := make(chan []*wrangler.WorkflowError) + progressCh := make(chan *streamCount) + timedCtx, cancelTimedCtx := context.WithTimeout(ctx, *timeout) + defer cancelTimedCtx() + + go func(ctx context.Context) { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + errCh <- fmt.Errorf("workflow did not start within %s", (*timeout).String()) + return + case <-ticker.C: + totalStreams, runningStreams, workflowErrors, err := wf.GetStreamCount() + if err != nil { + errCh <- err + close(errCh) + return + } + if len(workflowErrors) > 0 { + wfErrCh <- workflowErrors + } + progressCh <- &streamCount{ + total: totalStreams, + running: runningStreams, + } + } + } + }(timedCtx) + + for { + select { + case progress := <-progressCh: + if progress.running == progress.total { + wr.Logger().Printf("\nWorkflow started successfully with %d stream(s)\n", progress.total) + return nil + } + wr.Logger().Printf("%d%% ... ", 100*progress.running/progress.total) + case err := <-errCh: + wr.Logger().Error(err) + cancelTimedCtx() + return err + case wfErrs := <-wfErrCh: + wr.Logger().Printf("Found problems with the streams created for this workflow:\n") + for _, wfErr := range wfErrs { + wr.Logger().Printf("\tTablet: %d, Id: %d :: %s\n", wfErr.Tablet, wfErr.ID, wfErr.Description) + } + return fmt.Errorf("errors starting workflow") + } + } case vReplicationWorkflowActionSwitchTraffic: err = wf.SwitchTraffic(wrangler.DirectionForward) case vReplicationWorkflowActionReverseTraffic: diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 6cf5a3cbda1..c1a549f3ae8 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -157,7 +157,6 @@ func (wr *Wrangler) getCellsWithShardReadsSwitched(ctx context.Context, targetKe return nil, nil, err } for _, cell := range cells { - wr.Logger().Infof("cell %s", cell) srvKeyspace, err := wr.ts.GetSrvKeyspace(ctx, cell, targetKeyspace) if err != nil { return nil, nil, err diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index 130be1bfe96..477322d0b58 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -487,7 +487,6 @@ func TestTableMigrateMainflow(t *testing.T) { // TestShardMigrate tests table mode migrations. // This has to be kept in sync with TestTableMigrate. func TestShardMigrateMainflow(t *testing.T) { - //t.Skip("To be fixed before release") //FIXME ctx := context.Background() tme := newTestShardMigrater(ctx, t, []string{"-40", "40-"}, []string{"-80", "80-"}) defer tme.stopTablets(t) diff --git a/go/vt/wrangler/workflow.go b/go/vt/wrangler/workflow.go index 44ac91d6561..a15f0380edd 100644 --- a/go/vt/wrangler/workflow.go +++ b/go/vt/wrangler/workflow.go @@ -120,7 +120,7 @@ func (vrw *VReplicationWorkflow) stateAsString(ws *workflowState) string { var stateInfo []string s := "" if !vrw.Exists() { - stateInfo = append(stateInfo, "Not Started") + stateInfo = append(stateInfo, WorkflowStateNotStarted) } else { if len(ws.RdonlyCellsNotSwitched) == 0 && len(ws.ReplicaCellsNotSwitched) == 0 && len(ws.ReplicaCellsSwitched) > 0 { s = "All Reads Switched" @@ -178,6 +178,52 @@ func (vrw *VReplicationWorkflow) Start() error { return nil } +// WorkflowError has per stream errors if present in a workflow +type WorkflowError struct { + Tablet string + ID int64 + Description string +} + +// NewWorkflowError returns a new WorkflowError object +func NewWorkflowError(tablet string, id int64, description string) *WorkflowError { + wfErr := &WorkflowError{ + Tablet: tablet, + ID: id, + Description: description, + } + return wfErr +} + +// GetStreamCount returns a count of total and running streams and any stream errors +func (vrw *VReplicationWorkflow) GetStreamCount() (int64, int64, []*WorkflowError, error) { + var err error + var workflowErrors []*WorkflowError + var totalStreams, runningStreams int64 + res, err := vrw.wr.ShowWorkflow(vrw.ctx, vrw.params.Workflow, vrw.params.TargetKeyspace) + if err != nil { + return 0, 0, nil, err + } + for ksShard := range res.ShardStatuses { + statuses := res.ShardStatuses[ksShard].MasterReplicationStatuses + for _, st := range statuses { + totalStreams++ + if strings.HasPrefix(st.Message, "Error:") { + workflowErrors = append(workflowErrors, NewWorkflowError(st.Tablet, st.ID, st.Message)) + continue + } + if st.Pos == "" { + continue + } + if st.State == "Running" { + runningStreams++ + } + } + } + + return totalStreams, runningStreams, workflowErrors, nil +} + // SwitchTraffic switches traffic forward for tablet_types passed func (vrw *VReplicationWorkflow) SwitchTraffic(direction TrafficSwitchDirection) error { if !vrw.Exists() { From b1024ef170ff1cdecf27cad34eeb05a60614c438 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 5 Jan 2021 01:16:51 +0100 Subject: [PATCH 2/2] Fix test Signed-off-by: Rohit Nayak --- .../vreplication/resharding_workflows_v2_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 86e88fc08ff..0737076e496 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -244,8 +244,9 @@ func testReshardV2Workflow(t *testing.T) { createAdditionalCustomerShards(t, "-40,40-80,80-c0,c0-") reshard2Start(t, "-80,80-", "-40,40-80,80-c0,c0-") - - checkStates(t, wrangler.WorkflowStateNotStarted, wrangler.WorkflowStateNotSwitched) + if !strings.Contains(lastOutput, "Workflow started successfully") { + t.Fail() + } validateReadsRouteToSource(t, "replica") validateWritesRouteToSource(t) @@ -258,7 +259,9 @@ func testMoveTablesV2Workflow(t *testing.T) { // test basic forward and reverse flows setupCustomerKeyspace(t) moveTables2Start(t, "customer") - checkStates(t, wrangler.WorkflowStateNotStarted, wrangler.WorkflowStateNotSwitched) + if !strings.Contains(lastOutput, "Workflow started successfully") { + t.Fail() + } validateReadsRouteToSource(t, "replica") validateWritesRouteToSource(t)