diff --git a/pkg/storage/queue.go b/pkg/storage/queue.go index a3d4b3cce1c1..3c6dad99546a 100644 --- a/pkg/storage/queue.go +++ b/pkg/storage/queue.go @@ -146,14 +146,13 @@ func (pq *priorityQueue) update(item *replicaItem, priority float64) { } var ( - errQueueDisabled = errors.New("queue disabled") - errQueueStopped = errors.New("queue stopped") - errReplicaNotAddable = errors.New("replica shouldn't be added to queue") + errQueueDisabled = errors.New("queue disabled") + errQueueStopped = errors.New("queue stopped") ) func isExpectedQueueError(err error) bool { cause := errors.Cause(err) - return err == nil || cause == errQueueDisabled || cause == errReplicaNotAddable + return err == nil || cause == errQueueDisabled } // shouldQueueAgain is a helper function to determine whether the @@ -440,7 +439,7 @@ func (h baseQueueHelper) MaybeAdd(ctx context.Context, repl replicaInQueue, now } func (h baseQueueHelper) Add(ctx context.Context, repl replicaInQueue, prio float64) { - _, err := h.bq.addInternal(ctx, repl.Desc(), true /* should */, prio) + _, err := h.bq.addInternal(ctx, repl.Desc(), prio) if err != nil && log.V(1) { log.Infof(ctx, "during Add: %s", err) } @@ -544,7 +543,10 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc. // know what they're getting into so that's fine. realRepl, _ := repl.(*Replica) should, priority := bq.impl.shouldQueue(ctx, now, realRepl, cfg) - if _, err := bq.addInternal(ctx, repl.Desc(), should, priority); !isExpectedQueueError(err) { + if !should { + return + } + if _, err := bq.addInternal(ctx, repl.Desc(), priority); !isExpectedQueueError(err) { log.Errorf(ctx, "unable to add: %s", err) } } @@ -561,7 +563,7 @@ func (bq *baseQueue) requiresSplit(cfg *config.SystemConfig, repl replicaInQueue // the replica is already queued at a lower priority, updates the existing // priority. Expects the queue lock to be held by caller. func (bq *baseQueue) addInternal( - ctx context.Context, desc *roachpb.RangeDescriptor, should bool, priority float64, + ctx context.Context, desc *roachpb.RangeDescriptor, priority float64, ) (bool, error) { // NB: this is intentionally outside of bq.mu to avoid having to consider // lock ordering constraints. @@ -590,13 +592,6 @@ func (bq *baseQueue) addInternal( return false, nil } - // Note that even though the caller said not to queue the replica, we don't - // want to remove it if it's already been queued. It may have been added by - // a queuer that knows more than this one. - if !should { - return false, errReplicaNotAddable - } - item, ok := bq.mu.replicas[desc.RangeID] if ok { // Replica is already processing. Mark to be requeued. diff --git a/pkg/storage/queue_helpers_testutil.go b/pkg/storage/queue_helpers_testutil.go index 315c6e93ca33..4968bbf8d97f 100644 --- a/pkg/storage/queue_helpers_testutil.go +++ b/pkg/storage/queue_helpers_testutil.go @@ -27,7 +27,7 @@ import ( func (bq *baseQueue) testingAdd( ctx context.Context, repl replicaInQueue, priority float64, ) (bool, error) { - return bq.addInternal(ctx, repl.Desc(), true, priority) + return bq.addInternal(ctx, repl.Desc(), priority) } func forceScanAndProcess(s *Store, q *baseQueue) error { diff --git a/pkg/storage/scanner.go b/pkg/storage/scanner.go index 0db68d5996d7..6210e3ef5b0d 100644 --- a/pkg/storage/scanner.go +++ b/pkg/storage/scanner.go @@ -288,24 +288,25 @@ func (rs *replicaScanner) scanLoop(stopper *stop.Stopper) { shouldStop = rs.waitAndProcess(ctx, stopper, start, nil) } - shouldStop = shouldStop || nil != stopper.RunTask( - ctx, "storage.replicaScanner: scan loop", - func(ctx context.Context) { - // Increment iteration count. - rs.mu.Lock() - defer rs.mu.Unlock() - rs.mu.scanCount++ - rs.mu.total += timeutil.Since(start) - if log.V(6) { - log.Infof(ctx, "reset replica scan iteration") - } - - // Reset iteration and start time. - start = timeutil.Now() - }) + // waitAndProcess returns true when the system is stopping. Note that this + // means we don't have to check the stopper as well. if shouldStop { return } + + // Increment iteration count. + func() { + rs.mu.Lock() + defer rs.mu.Unlock() + rs.mu.scanCount++ + rs.mu.total += timeutil.Since(start) + }() + if log.V(6) { + log.Infof(ctx, "reset replica scan iteration") + } + + // Reset iteration and start time. + start = timeutil.Now() } }) }