Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V2 Workflow Start: wait for streams to start and report errors if any while starting a workflow #7248

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,9 @@ func testReshardV2Workflow(t *testing.T) {

createAdditionalCustomerShards(t, "-40,40-80,80-c0,c0-")
reshard2Start(t, "-80,80-", "-40,40-80,80-c0,c0-")

checkStates(t, wrangler.WorkflowStateNotStarted, wrangler.WorkflowStateNotSwitched)
if !strings.Contains(lastOutput, "Workflow started successfully") {
t.Fail()
}
validateReadsRouteToSource(t, "replica")
validateWritesRouteToSource(t)

Expand All @@ -258,7 +259,9 @@ func testMoveTablesV2Workflow(t *testing.T) {
// test basic forward and reverse flows
setupCustomerKeyspace(t)
moveTables2Start(t, "customer")
checkStates(t, wrangler.WorkflowStateNotStarted, wrangler.WorkflowStateNotSwitched)
if !strings.Contains(lastOutput, "Workflow started successfully") {
t.Fail()
}
validateReadsRouteToSource(t, "replica")
validateWritesRouteToSource(t)

Expand Down
73 changes: 67 additions & 6 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ import (
"context"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"

"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -2016,7 +2015,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
if err != nil {
return err
}
s += "Following vreplication streams are running in this workflow:\n\n"
s += fmt.Sprintf("Following vreplication streams are running for workflow %s.%s:\n\n", target, workflow)
for ksShard := range res.ShardStatuses {
statuses := res.ShardStatuses[ksShard].MasterReplicationStatuses
for _, st := range statuses {
Expand All @@ -2027,11 +2026,14 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
msg += " Vstream may not be running."
}
txLag := int64(now) - st.TransactionTimestamp
msg += fmt.Sprintf(" VStream Lag: %ds", txLag/1e9)
s += fmt.Sprintf("Stream %s (id=%d) :: Status: %s.%s\n", ksShard, st.ID, st.State, msg)
msg += fmt.Sprintf(" VStream Lag: %ds.", txLag/1e9)
if st.TransactionTimestamp > 0 { // if no events occur after copy phase, TransactionTimeStamp can be 0
msg += fmt.Sprintf(" Tx time: %s.", time.Unix(st.TransactionTimestamp, 0).Format(time.ANSIC))
}
s += fmt.Sprintf("id=%d on %s: Status: %s.%s\n", st.ID, ksShard, st.State, msg)
}
}
wr.Logger().Printf("\n%s\n\n", s)
wr.Logger().Printf("\n%s\n", s)
return nil
}

Expand Down Expand Up @@ -2062,6 +2064,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
vrwp.Tables = *tables
vrwp.AllTables = *allTables
vrwp.ExcludeTables = *excludes
vrwp.Timeout = *timeout
workflowType = wrangler.MoveTablesWorkflow
case wrangler.ReshardWorkflow:
if *sourceShards == "" || *targetShards == "" {
Expand Down Expand Up @@ -2142,7 +2145,65 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
return printCopyProgress()
case vReplicationWorkflowActionStart:
err = wf.Start()
//TODO: wait for streams to start or report error (pos != "", Message contains error, tx/update time recent)
if err != nil {
return err
}
wr.Logger().Printf("Waiting for workflow to start:\n")
type streamCount struct {
total, running int64
}
errCh := make(chan error)
wfErrCh := make(chan []*wrangler.WorkflowError)
progressCh := make(chan *streamCount)
timedCtx, cancelTimedCtx := context.WithTimeout(ctx, *timeout)
defer cancelTimedCtx()

go func(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
errCh <- fmt.Errorf("workflow did not start within %s", (*timeout).String())
return
case <-ticker.C:
totalStreams, runningStreams, workflowErrors, err := wf.GetStreamCount()
if err != nil {
errCh <- err
close(errCh)
return
}
if len(workflowErrors) > 0 {
wfErrCh <- workflowErrors
}
progressCh <- &streamCount{
total: totalStreams,
running: runningStreams,
}
}
}
}(timedCtx)

for {
select {
case progress := <-progressCh:
if progress.running == progress.total {
wr.Logger().Printf("\nWorkflow started successfully with %d stream(s)\n", progress.total)
return nil
}
wr.Logger().Printf("%d%% ... ", 100*progress.running/progress.total)
case err := <-errCh:
wr.Logger().Error(err)
cancelTimedCtx()
return err
case wfErrs := <-wfErrCh:
wr.Logger().Printf("Found problems with the streams created for this workflow:\n")
for _, wfErr := range wfErrs {
wr.Logger().Printf("\tTablet: %d, Id: %d :: %s\n", wfErr.Tablet, wfErr.ID, wfErr.Description)
}
return fmt.Errorf("errors starting workflow")
}
}
case vReplicationWorkflowActionSwitchTraffic:
err = wf.SwitchTraffic(wrangler.DirectionForward)
case vReplicationWorkflowActionReverseTraffic:
Expand Down
1 change: 0 additions & 1 deletion go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ func (wr *Wrangler) getCellsWithShardReadsSwitched(ctx context.Context, targetKe
return nil, nil, err
}
for _, cell := range cells {
wr.Logger().Infof("cell %s", cell)
srvKeyspace, err := wr.ts.GetSrvKeyspace(ctx, cell, targetKeyspace)
if err != nil {
return nil, nil, err
Expand Down
1 change: 0 additions & 1 deletion go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,6 @@ func TestTableMigrateMainflow(t *testing.T) {
// TestShardMigrate tests table mode migrations.
// This has to be kept in sync with TestTableMigrate.
func TestShardMigrateMainflow(t *testing.T) {
//t.Skip("To be fixed before release") //FIXME
ctx := context.Background()
tme := newTestShardMigrater(ctx, t, []string{"-40", "40-"}, []string{"-80", "80-"})
defer tme.stopTablets(t)
Expand Down
48 changes: 47 additions & 1 deletion go/vt/wrangler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (vrw *VReplicationWorkflow) stateAsString(ws *workflowState) string {
var stateInfo []string
s := ""
if !vrw.Exists() {
stateInfo = append(stateInfo, "Not Started")
stateInfo = append(stateInfo, WorkflowStateNotStarted)
} else {
if len(ws.RdonlyCellsNotSwitched) == 0 && len(ws.ReplicaCellsNotSwitched) == 0 && len(ws.ReplicaCellsSwitched) > 0 {
s = "All Reads Switched"
Expand Down Expand Up @@ -178,6 +178,52 @@ func (vrw *VReplicationWorkflow) Start() error {
return nil
}

// WorkflowError describes a problem reported by a single vreplication stream
// of a workflow. GetStreamCount builds one of these for every stream whose
// status message starts with "Error:".
type WorkflowError struct {
// Tablet is copied from the Tablet field of the stream's replication status.
Tablet string
// ID is the id of the stream that reported the error.
ID int64
// Description carries the stream's status message text.
Description string
}

// NewWorkflowError builds a WorkflowError for the stream with the given id,
// recording the tablet it belongs to and a description of the problem.
func NewWorkflowError(tablet string, id int64, description string) *WorkflowError {
	return &WorkflowError{
		Tablet:      tablet,
		ID:          id,
		Description: description,
	}
}

// GetStreamCount inspects the workflow's vreplication streams and reports, in
// order: the total number of streams, the number of streams that are running,
// the per-stream errors found, and any error returned while fetching the
// workflow status.
//
// Every stream counts towards the total. A stream whose message starts with
// "Error:" is recorded as a WorkflowError and is never counted as running. Any
// other stream counts as running only when it has a non-empty position and its
// state is "Running".
func (vrw *VReplicationWorkflow) GetStreamCount() (int64, int64, []*WorkflowError, error) {
	res, err := vrw.wr.ShowWorkflow(vrw.ctx, vrw.params.Workflow, vrw.params.TargetKeyspace)
	if err != nil {
		return 0, 0, nil, err
	}

	var (
		total, running int64
		wfErrs         []*WorkflowError
	)
	for _, shard := range res.ShardStatuses {
		for _, st := range shard.MasterReplicationStatuses {
			total++
			switch {
			case strings.HasPrefix(st.Message, "Error:"):
				// An errored stream is reported, not counted as running.
				wfErrs = append(wfErrs, NewWorkflowError(st.Tablet, st.ID, st.Message))
			case st.Pos != "" && st.State == "Running":
				running++
			}
		}
	}

	return total, running, wfErrs, nil
}

// SwitchTraffic switches traffic forward for tablet_types passed
func (vrw *VReplicationWorkflow) SwitchTraffic(direction TrafficSwitchDirection) error {
if !vrw.Exists() {
Expand Down