diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_pool.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_pool.go index beb0cc3b467..4ba5f1068e6 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_pool.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_pool.go @@ -18,6 +18,7 @@ package vreplication import ( "context" + "errors" "io" "sync" "sync/atomic" @@ -116,13 +117,6 @@ func (p *parallelWorkersPool) drain(ctx context.Context) (err error) { return vterrors.Wrapf(err, "drain aborted") } } - // p.mu.Lock() - // defer p.mu.Unlock() - // - // p.head = 0 - // for i := range p.workers { - // p.recycleWorker(p.workers[i]) - // } return p.workersError() } @@ -155,7 +149,7 @@ func (p *parallelWorkersPool) availableWorker(ctx context.Context, lastCommitted go func() { if err := w.applyQueuedEvents(ctx); err != nil { - if err == io.EOF { + if errors.Is(vterrors.UnwrapAll(err), io.EOF) { w.pool.posReached.Store(true) } p.workerErrors <- err