diff --git a/go/cmd/vtbackup/cli/vtbackup.go b/go/cmd/vtbackup/cli/vtbackup.go index 84a26af156f..26c270233c1 100644 --- a/go/cmd/vtbackup/cli/vtbackup.go +++ b/go/cmd/vtbackup/cli/vtbackup.go @@ -340,10 +340,16 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac if err != nil { return fmt.Errorf("failed to initialize mysql config: %v", err) } + ctx, cancelCtx := context.WithCancel(ctx) backgroundCtx, cancelbackgroundCtx := context.WithCancel(backgroundCtx) - mysqld.OnFailure(func(err error) { - log.Warning("Cancelling the vtbackup context as MySQL has failed") + defer func() { + cancelCtx() + cancelbackgroundCtx() + }() + + mysqld.OnTerm(func() { + log.Warning("Cancelling the vtbackup context as MySQL has terminated") cancelCtx() cancelbackgroundCtx() }) @@ -533,12 +539,12 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac waitStartTime = time.Now() - continuousErrorCount int + // continuousErrorCount int ) for { - if continuousErrorCount == maximumErrorCountWhenWaitingForReplicationStatus { - return fmt.Errorf("timeout waiting for replication status after %d errors", maximumErrorCountWhenWaitingForReplicationStatus) - } + // if continuousErrorCount == maximumErrorCountWhenWaitingForReplicationStatus { + // return fmt.Errorf("timeout waiting for replication status after %d errors", maximumErrorCountWhenWaitingForReplicationStatus) + // } select { case <-ctx.Done(): @@ -550,7 +556,7 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac status, statusErr = mysqld.ReplicationStatus(ctx) if statusErr != nil { log.Warningf("Error getting replication status: %v", statusErr) - continuousErrorCount++ + // continuousErrorCount++ continue } if status.Position.AtLeast(primaryPos) { @@ -573,11 +579,11 @@ func takeBackup(ctx, backgroundCtx context.Context, topoServer *topo.Server, bac if err := startReplication(ctx, mysqld, topoServer); err != nil { log.Warningf("Failed to restart replication: %v", err) } - continuousErrorCount++ + // continuousErrorCount++ } else { // Since replication is working if we got here, let's reset the error count to zero. // This allows us to avoid failing if we only have transient errors from time to time. - continuousErrorCount = 0 + // continuousErrorCount = 0 phaseStatus.Set([]string{phaseNameCatchupReplication, phaseStatusCatchupReplicationStopped}, 0) } } diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index 43ec3d88575..d7435705a8a 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -118,10 +118,9 @@ type Mysqld struct { capabilities capabilitySet // mutex protects the fields below. - mutex sync.Mutex - onTermFuncs []func() - onFailureFuncs []func(error) - cancelWaitCmd chan struct{} + mutex sync.Mutex + onTermFuncs []func() + cancelWaitCmd chan struct{} semiSyncType mysql.SemiSyncType } @@ -446,11 +445,6 @@ func (mysqld *Mysqld) startNoWait(cnf *Mycnf, mysqldArgs ...string) error { for _, callback := range mysqld.onTermFuncs { go callback() } - if err != nil { - for _, failureFunc := range mysqld.onFailureFuncs { - go failureFunc(err) - } - } mysqld.mutex.Unlock() } }(mysqld.cancelWaitCmd) @@ -1253,12 +1247,6 @@ func (mysqld *Mysqld) OnTerm(f func()) { mysqld.onTermFuncs = append(mysqld.onTermFuncs, f) } -func (mysqld *Mysqld) OnFailure(f func(error)) { - mysqld.mutex.Lock() - defer mysqld.mutex.Unlock() - mysqld.onFailureFuncs = append(mysqld.onFailureFuncs, f) -} - func buildLdPaths() ([]string, error) { vtMysqlRoot, err := vtenv.VtMysqlRoot() if err != nil {