Skip to content

Commit

Permalink
Merge pull request #7248 from planetscale/rn-wait-for-workflow-start
Browse files Browse the repository at this point in the history
V2 Workflow Start: wait for streams to start and report errors if any while starting a workflow
  • Loading branch information
rohit-nayak-ps authored Jan 5, 2021
2 parents 5b6dfb7 + b1024ef commit 4aac378
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 12 deletions.
9 changes: 6 additions & 3 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,9 @@ func testReshardV2Workflow(t *testing.T) {

createAdditionalCustomerShards(t, "-40,40-80,80-c0,c0-")
reshard2Start(t, "-80,80-", "-40,40-80,80-c0,c0-")

checkStates(t, wrangler.WorkflowStateNotStarted, wrangler.WorkflowStateNotSwitched)
if !strings.Contains(lastOutput, "Workflow started successfully") {
t.Fail()
}
validateReadsRouteToSource(t, "replica")
validateWritesRouteToSource(t)

Expand All @@ -258,7 +259,9 @@ func testMoveTablesV2Workflow(t *testing.T) {
// test basic forward and reverse flows
setupCustomerKeyspace(t)
moveTables2Start(t, "customer")
checkStates(t, wrangler.WorkflowStateNotStarted, wrangler.WorkflowStateNotSwitched)
if !strings.Contains(lastOutput, "Workflow started successfully") {
t.Fail()
}
validateReadsRouteToSource(t, "replica")
validateWritesRouteToSource(t)

Expand Down
73 changes: 67 additions & 6 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ import (
"context"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"

"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -2016,7 +2015,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
if err != nil {
return err
}
s += "Following vreplication streams are running in this workflow:\n\n"
s += fmt.Sprintf("Following vreplication streams are running for workflow %s.%s:\n\n", target, workflow)
for ksShard := range res.ShardStatuses {
statuses := res.ShardStatuses[ksShard].MasterReplicationStatuses
for _, st := range statuses {
Expand All @@ -2027,11 +2026,14 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
msg += " Vstream may not be running."
}
txLag := int64(now) - st.TransactionTimestamp
msg += fmt.Sprintf(" VStream Lag: %ds", txLag/1e9)
s += fmt.Sprintf("Stream %s (id=%d) :: Status: %s.%s\n", ksShard, st.ID, st.State, msg)
msg += fmt.Sprintf(" VStream Lag: %ds.", txLag/1e9)
if st.TransactionTimestamp > 0 { // if no events occur after copy phase, TransactionTimeStamp can be 0
msg += fmt.Sprintf(" Tx time: %s.", time.Unix(st.TransactionTimestamp, 0).Format(time.ANSIC))
}
s += fmt.Sprintf("id=%d on %s: Status: %s.%s\n", st.ID, ksShard, st.State, msg)
}
}
wr.Logger().Printf("\n%s\n\n", s)
wr.Logger().Printf("\n%s\n", s)
return nil
}

Expand Down Expand Up @@ -2062,6 +2064,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
vrwp.Tables = *tables
vrwp.AllTables = *allTables
vrwp.ExcludeTables = *excludes
vrwp.Timeout = *timeout
workflowType = wrangler.MoveTablesWorkflow
case wrangler.ReshardWorkflow:
if *sourceShards == "" || *targetShards == "" {
Expand Down Expand Up @@ -2142,7 +2145,65 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
return printCopyProgress()
case vReplicationWorkflowActionStart:
err = wf.Start()
//TODO: wait for streams to start or report error (pos != "", Message contains error, tx/update time recent)
if err != nil {
return err
}
wr.Logger().Printf("Waiting for workflow to start:\n")
type streamCount struct {
total, running int64
}
errCh := make(chan error)
wfErrCh := make(chan []*wrangler.WorkflowError)
progressCh := make(chan *streamCount)
timedCtx, cancelTimedCtx := context.WithTimeout(ctx, *timeout)
defer cancelTimedCtx()

go func(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
errCh <- fmt.Errorf("workflow did not start within %s", (*timeout).String())
return
case <-ticker.C:
totalStreams, runningStreams, workflowErrors, err := wf.GetStreamCount()
if err != nil {
errCh <- err
close(errCh)
return
}
if len(workflowErrors) > 0 {
wfErrCh <- workflowErrors
}
progressCh <- &streamCount{
total: totalStreams,
running: runningStreams,
}
}
}
}(timedCtx)

for {
select {
case progress := <-progressCh:
if progress.running == progress.total {
wr.Logger().Printf("\nWorkflow started successfully with %d stream(s)\n", progress.total)
return nil
}
wr.Logger().Printf("%d%% ... ", 100*progress.running/progress.total)
case err := <-errCh:
wr.Logger().Error(err)
cancelTimedCtx()
return err
case wfErrs := <-wfErrCh:
wr.Logger().Printf("Found problems with the streams created for this workflow:\n")
for _, wfErr := range wfErrs {
wr.Logger().Printf("\tTablet: %d, Id: %d :: %s\n", wfErr.Tablet, wfErr.ID, wfErr.Description)
}
return fmt.Errorf("errors starting workflow")
}
}
case vReplicationWorkflowActionSwitchTraffic:
err = wf.SwitchTraffic(wrangler.DirectionForward)
case vReplicationWorkflowActionReverseTraffic:
Expand Down
1 change: 0 additions & 1 deletion go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ func (wr *Wrangler) getCellsWithShardReadsSwitched(ctx context.Context, targetKe
return nil, nil, err
}
for _, cell := range cells {
wr.Logger().Infof("cell %s", cell)
srvKeyspace, err := wr.ts.GetSrvKeyspace(ctx, cell, targetKeyspace)
if err != nil {
return nil, nil, err
Expand Down
1 change: 0 additions & 1 deletion go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,6 @@ func TestTableMigrateMainflow(t *testing.T) {
// TestShardMigrate tests table mode migrations.
// This has to be kept in sync with TestTableMigrate.
func TestShardMigrateMainflow(t *testing.T) {
//t.Skip("To be fixed before release") //FIXME
ctx := context.Background()
tme := newTestShardMigrater(ctx, t, []string{"-40", "40-"}, []string{"-80", "80-"})
defer tme.stopTablets(t)
Expand Down
48 changes: 47 additions & 1 deletion go/vt/wrangler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (vrw *VReplicationWorkflow) stateAsString(ws *workflowState) string {
var stateInfo []string
s := ""
if !vrw.Exists() {
stateInfo = append(stateInfo, "Not Started")
stateInfo = append(stateInfo, WorkflowStateNotStarted)
} else {
if len(ws.RdonlyCellsNotSwitched) == 0 && len(ws.ReplicaCellsNotSwitched) == 0 && len(ws.ReplicaCellsSwitched) > 0 {
s = "All Reads Switched"
Expand Down Expand Up @@ -178,6 +178,52 @@ func (vrw *VReplicationWorkflow) Start() error {
return nil
}

// WorkflowError has per stream errors if present in a workflow
type WorkflowError struct {
Tablet string
ID int64
Description string
}

// NewWorkflowError returns a new WorkflowError object
func NewWorkflowError(tablet string, id int64, description string) *WorkflowError {
wfErr := &WorkflowError{
Tablet: tablet,
ID: id,
Description: description,
}
return wfErr
}

// GetStreamCount returns a count of total and running streams and any stream errors
func (vrw *VReplicationWorkflow) GetStreamCount() (int64, int64, []*WorkflowError, error) {
var err error
var workflowErrors []*WorkflowError
var totalStreams, runningStreams int64
res, err := vrw.wr.ShowWorkflow(vrw.ctx, vrw.params.Workflow, vrw.params.TargetKeyspace)
if err != nil {
return 0, 0, nil, err
}
for ksShard := range res.ShardStatuses {
statuses := res.ShardStatuses[ksShard].MasterReplicationStatuses
for _, st := range statuses {
totalStreams++
if strings.HasPrefix(st.Message, "Error:") {
workflowErrors = append(workflowErrors, NewWorkflowError(st.Tablet, st.ID, st.Message))
continue
}
if st.Pos == "" {
continue
}
if st.State == "Running" {
runningStreams++
}
}
}

return totalStreams, runningStreams, workflowErrors, nil
}

// SwitchTraffic switches traffic forward for tablet_types passed
func (vrw *VReplicationWorkflow) SwitchTraffic(direction TrafficSwitchDirection) error {
if !vrw.Exists() {
Expand Down

0 comments on commit 4aac378

Please sign in to comment.