diff --git a/store/tikv/range_task.go b/store/tikv/range_task.go index 1fac00a3a2588..dcf3d72069219 100644 --- a/store/tikv/range_task.go +++ b/store/tikv/range_task.go @@ -129,8 +129,6 @@ func (s *RangeTaskRunner) RunOnRange(ctx context.Context, startKey []byte, endKe key := startKey for { select { - case <-ctx.Done(): - return errors.Trace(ctx.Err()) case <-statLogTicker.C: logutil.Logger(ctx).Info("range task in progress", zap.String("name", s.name), @@ -168,7 +166,12 @@ func (s *RangeTaskRunner) RunOnRange(ctx context.Context, startKey []byte, endKe } pushTaskStartTime := time.Now() - taskCh <- task + + select { + case taskCh <- task: + case <-ctx.Done(): + break + } metrics.TiKVRangeTaskPushDuration.WithLabelValues(s.name).Observe(time.Since(pushTaskStartTime).Seconds()) if isLast { @@ -247,8 +250,6 @@ func (w *rangeTaskWorker) run(ctx context.Context, cancel context.CancelFunc) { } completedRegions, err := w.handler(ctx, *r) - atomic.AddInt32(w.completedRegions, int32(completedRegions)) - if err != nil { logutil.Logger(ctx).Info("canceling range task because of error", zap.String("name", w.name), @@ -259,5 +260,6 @@ func (w *rangeTaskWorker) run(ctx context.Context, cancel context.CancelFunc) { cancel() break } + atomic.AddInt32(w.completedRegions, int32(completedRegions)) } }