From afad5c9cb76032e4b24ef2076b8a0c5ac321548a Mon Sep 17 00:00:00 2001
From: Rohit Nayak <rohit@planetscale.com>
Date: Tue, 5 Jan 2021 00:32:26 +0100
Subject: [PATCH 1/2] Wait for workflow to start and show stream errors or if
 streams don't start within timeout, improve Show output

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
---
 go/vt/vtctl/vtctl.go                    | 73 +++++++++++++++++++++++--
 go/vt/wrangler/traffic_switcher.go      |  1 -
 go/vt/wrangler/traffic_switcher_test.go |  1 -
 go/vt/wrangler/workflow.go              | 48 +++++++++++++++-
 4 files changed, 114 insertions(+), 9 deletions(-)

diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go
index 11a01849da1..44dc5e721a5 100644
--- a/go/vt/vtctl/vtctl.go
+++ b/go/vt/vtctl/vtctl.go
@@ -97,7 +97,6 @@ import (
 	"context"
 	"github.com/golang/protobuf/jsonpb"
 	"github.com/golang/protobuf/proto"
-
 	"vitess.io/vitess/go/flagutil"
 	"vitess.io/vitess/go/json2"
 	"vitess.io/vitess/go/mysql"
@@ -2016,7 +2015,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
 		if err != nil {
 			return err
 		}
-		s += "Following vreplication streams are running in this workflow:\n\n"
+		s += fmt.Sprintf("Following vreplication streams are running for workflow %s.%s:\n\n", target, workflow)
 		for ksShard := range res.ShardStatuses {
 			statuses := res.ShardStatuses[ksShard].MasterReplicationStatuses
 			for _, st := range statuses {
@@ -2027,11 +2026,14 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
 					msg += " Vstream may not be running."
 				}
 				txLag := int64(now) - st.TransactionTimestamp
-				msg += fmt.Sprintf(" VStream Lag: %ds", txLag/1e9)
-				s += fmt.Sprintf("Stream %s (id=%d) :: Status: %s.%s\n", ksShard, st.ID, st.State, msg)
+				msg += fmt.Sprintf(" VStream Lag: %ds.", txLag/1e9)
+				if st.TransactionTimestamp > 0 { // if no events occur after copy phase, TransactionTimeStamp can be 0
+					msg += fmt.Sprintf(" Tx time: %s.", time.Unix(st.TransactionTimestamp, 0).Format(time.ANSIC))
+				}
+				s += fmt.Sprintf("id=%d on %s: Status: %s.%s\n", st.ID, ksShard, st.State, msg)
 			}
 		}
-		wr.Logger().Printf("\n%s\n\n", s)
+		wr.Logger().Printf("\n%s\n", s)
 		return nil
 	}
 
@@ -2062,6 +2064,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
 			vrwp.Tables = *tables
 			vrwp.AllTables = *allTables
 			vrwp.ExcludeTables = *excludes
+			vrwp.Timeout = *timeout
 			workflowType = wrangler.MoveTablesWorkflow
 		case wrangler.ReshardWorkflow:
 			if *sourceShards == "" || *targetShards == "" {
@@ -2142,7 +2145,65 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
 		return printCopyProgress()
 	case vReplicationWorkflowActionStart:
 		err = wf.Start()
-		//TODO: wait for streams to start or report error (pos != "", Message contains error, tx/update time recent)
+		if err != nil {
+			return err
+		}
+		wr.Logger().Printf("Waiting for workflow to start:\n")
+		type streamCount struct {
+			total, running int64
+		}
+		errCh := make(chan error)
+		wfErrCh := make(chan []*wrangler.WorkflowError)
+		progressCh := make(chan *streamCount)
+		timedCtx, cancelTimedCtx := context.WithTimeout(ctx, *timeout)
+		defer cancelTimedCtx()
+
+		go func(ctx context.Context) {
+			ticker := time.NewTicker(1 * time.Second)
+			defer ticker.Stop()
+			for {
+				select {
+				case <-ctx.Done():
+					errCh <- fmt.Errorf("workflow did not start within %s", (*timeout).String())
+					return
+				case <-ticker.C:
+					totalStreams, runningStreams, workflowErrors, err := wf.GetStreamCount()
+					if err != nil {
+						errCh <- err
+						close(errCh)
+						return
+					}
+					if len(workflowErrors) > 0 {
+						wfErrCh <- workflowErrors
+					}
+					progressCh <- &streamCount{
+						total:   totalStreams,
+						running: runningStreams,
+					}
+				}
+			}
+		}(timedCtx)
+
+		for {
+			select {
+			case progress := <-progressCh:
+				if progress.running == progress.total {
+					wr.Logger().Printf("\nWorkflow started successfully with %d stream(s)\n", progress.total)
+					return nil
+				}
+				wr.Logger().Printf("%d%% ... ", 100*progress.running/progress.total)
+			case err := <-errCh:
+				wr.Logger().Error(err)
+				cancelTimedCtx()
+				return err
+			case wfErrs := <-wfErrCh:
+				wr.Logger().Printf("Found problems with the streams created for this workflow:\n")
+				for _, wfErr := range wfErrs {
+					wr.Logger().Printf("\tTablet: %d, Id: %d :: %s\n", wfErr.Tablet, wfErr.ID, wfErr.Description)
+				}
+				return fmt.Errorf("errors starting workflow")
+			}
+		}
 	case vReplicationWorkflowActionSwitchTraffic:
 		err = wf.SwitchTraffic(wrangler.DirectionForward)
 	case vReplicationWorkflowActionReverseTraffic:
diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go
index 6cf5a3cbda1..c1a549f3ae8 100644
--- a/go/vt/wrangler/traffic_switcher.go
+++ b/go/vt/wrangler/traffic_switcher.go
@@ -157,7 +157,6 @@ func (wr *Wrangler) getCellsWithShardReadsSwitched(ctx context.Context, targetKe
 		return nil, nil, err
 	}
 	for _, cell := range cells {
-		wr.Logger().Infof("cell %s", cell)
 		srvKeyspace, err := wr.ts.GetSrvKeyspace(ctx, cell, targetKeyspace)
 		if err != nil {
 			return nil, nil, err
diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go
index 130be1bfe96..477322d0b58 100644
--- a/go/vt/wrangler/traffic_switcher_test.go
+++ b/go/vt/wrangler/traffic_switcher_test.go
@@ -487,7 +487,6 @@ func TestTableMigrateMainflow(t *testing.T) {
 // TestShardMigrate tests table mode migrations.
 // This has to be kept in sync with TestTableMigrate.
 func TestShardMigrateMainflow(t *testing.T) {
-	//t.Skip("To be fixed before release") //FIXME
 	ctx := context.Background()
 	tme := newTestShardMigrater(ctx, t, []string{"-40", "40-"}, []string{"-80", "80-"})
 	defer tme.stopTablets(t)
diff --git a/go/vt/wrangler/workflow.go b/go/vt/wrangler/workflow.go
index 44ac91d6561..a15f0380edd 100644
--- a/go/vt/wrangler/workflow.go
+++ b/go/vt/wrangler/workflow.go
@@ -120,7 +120,7 @@ func (vrw *VReplicationWorkflow) stateAsString(ws *workflowState) string {
 	var stateInfo []string
 	s := ""
 	if !vrw.Exists() {
-		stateInfo = append(stateInfo, "Not Started")
+		stateInfo = append(stateInfo, WorkflowStateNotStarted)
 	} else {
 		if len(ws.RdonlyCellsNotSwitched) == 0 && len(ws.ReplicaCellsNotSwitched) == 0 && len(ws.ReplicaCellsSwitched) > 0 {
 			s = "All Reads Switched"
@@ -178,6 +178,52 @@ func (vrw *VReplicationWorkflow) Start() error {
 	return nil
 }
 
+// WorkflowError has per stream errors if present in a workflow
+type WorkflowError struct {
+	Tablet      string
+	ID          int64
+	Description string
+}
+
+// NewWorkflowError returns a new WorkflowError object
+func NewWorkflowError(tablet string, id int64, description string) *WorkflowError {
+	wfErr := &WorkflowError{
+		Tablet:      tablet,
+		ID:          id,
+		Description: description,
+	}
+	return wfErr
+}
+
+// GetStreamCount returns a count of total and running streams and any stream errors
+func (vrw *VReplicationWorkflow) GetStreamCount() (int64, int64, []*WorkflowError, error) {
+	var err error
+	var workflowErrors []*WorkflowError
+	var totalStreams, runningStreams int64
+	res, err := vrw.wr.ShowWorkflow(vrw.ctx, vrw.params.Workflow, vrw.params.TargetKeyspace)
+	if err != nil {
+		return 0, 0, nil, err
+	}
+	for ksShard := range res.ShardStatuses {
+		statuses := res.ShardStatuses[ksShard].MasterReplicationStatuses
+		for _, st := range statuses {
+			totalStreams++
+			if strings.HasPrefix(st.Message, "Error:") {
+				workflowErrors = append(workflowErrors, NewWorkflowError(st.Tablet, st.ID, st.Message))
+				continue
+			}
+			if st.Pos == "" {
+				continue
+			}
+			if st.State == "Running" {
+				runningStreams++
+			}
+		}
+	}
+
+	return totalStreams, runningStreams, workflowErrors, nil
+}
+
 // SwitchTraffic switches traffic forward for tablet_types passed
 func (vrw *VReplicationWorkflow) SwitchTraffic(direction TrafficSwitchDirection) error {
 	if !vrw.Exists() {

From b1024ef170ff1cdecf27cad34eeb05a60614c438 Mon Sep 17 00:00:00 2001
From: Rohit Nayak <rohit@planetscale.com>
Date: Tue, 5 Jan 2021 01:16:51 +0100
Subject: [PATCH 2/2] Fix test

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
---
 .../vreplication/resharding_workflows_v2_test.go         | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go
index 86e88fc08ff..0737076e496 100644
--- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go
+++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go
@@ -244,8 +244,9 @@ func testReshardV2Workflow(t *testing.T) {
 
 	createAdditionalCustomerShards(t, "-40,40-80,80-c0,c0-")
 	reshard2Start(t, "-80,80-", "-40,40-80,80-c0,c0-")
-
-	checkStates(t, wrangler.WorkflowStateNotStarted, wrangler.WorkflowStateNotSwitched)
+	if !strings.Contains(lastOutput, "Workflow started successfully") {
+		t.Fail()
+	}
 	validateReadsRouteToSource(t, "replica")
 	validateWritesRouteToSource(t)
 
@@ -258,7 +259,9 @@ func testMoveTablesV2Workflow(t *testing.T) {
 	// test basic forward and reverse flows
 	setupCustomerKeyspace(t)
 	moveTables2Start(t, "customer")
-	checkStates(t, wrangler.WorkflowStateNotStarted, wrangler.WorkflowStateNotSwitched)
+	if !strings.Contains(lastOutput, "Workflow started successfully") {
+		t.Fail()
+	}
 	validateReadsRouteToSource(t, "replica")
 	validateWritesRouteToSource(t)