Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
37943: storage: remove a couple papercuts in baseQueue/scanner r=tbg a=danhhz

See individual commits for details. Found while trying to grok the queue stuff. Both of these tripped me up for a minute -- "why is this necessary?" -- and in both cases I had to convince myself it wasn't.

Co-authored-by: Daniel Harrison <[email protected]>
  • Loading branch information
craig[bot] and danhhz committed Jun 3, 2019
2 parents 4d815f8 + 6d0dee9 commit bfdf293
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 30 deletions.
23 changes: 9 additions & 14 deletions pkg/storage/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,13 @@ func (pq *priorityQueue) update(item *replicaItem, priority float64) {
}

var (
errQueueDisabled = errors.New("queue disabled")
errQueueStopped = errors.New("queue stopped")
errReplicaNotAddable = errors.New("replica shouldn't be added to queue")
errQueueDisabled = errors.New("queue disabled")
errQueueStopped = errors.New("queue stopped")
)

func isExpectedQueueError(err error) bool {
cause := errors.Cause(err)
return err == nil || cause == errQueueDisabled || cause == errReplicaNotAddable
return err == nil || cause == errQueueDisabled
}

// shouldQueueAgain is a helper function to determine whether the
Expand Down Expand Up @@ -440,7 +439,7 @@ func (h baseQueueHelper) MaybeAdd(ctx context.Context, repl replicaInQueue, now
}

func (h baseQueueHelper) Add(ctx context.Context, repl replicaInQueue, prio float64) {
_, err := h.bq.addInternal(ctx, repl.Desc(), true /* should */, prio)
_, err := h.bq.addInternal(ctx, repl.Desc(), prio)
if err != nil && log.V(1) {
log.Infof(ctx, "during Add: %s", err)
}
Expand Down Expand Up @@ -544,7 +543,10 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
// know what they're getting into so that's fine.
realRepl, _ := repl.(*Replica)
should, priority := bq.impl.shouldQueue(ctx, now, realRepl, cfg)
if _, err := bq.addInternal(ctx, repl.Desc(), should, priority); !isExpectedQueueError(err) {
if !should {
return
}
if _, err := bq.addInternal(ctx, repl.Desc(), priority); !isExpectedQueueError(err) {
log.Errorf(ctx, "unable to add: %s", err)
}
}
Expand All @@ -561,7 +563,7 @@ func (bq *baseQueue) requiresSplit(cfg *config.SystemConfig, repl replicaInQueue
// the replica is already queued at a lower priority, updates the existing
// priority. Expects the queue lock to be held by caller.
func (bq *baseQueue) addInternal(
ctx context.Context, desc *roachpb.RangeDescriptor, should bool, priority float64,
ctx context.Context, desc *roachpb.RangeDescriptor, priority float64,
) (bool, error) {
// NB: this is intentionally outside of bq.mu to avoid having to consider
// lock ordering constraints.
Expand Down Expand Up @@ -590,13 +592,6 @@ func (bq *baseQueue) addInternal(
return false, nil
}

// Note that even though the caller said not to queue the replica, we don't
// want to remove it if it's already been queued. It may have been added by
// a queuer that knows more than this one.
if !should {
return false, errReplicaNotAddable
}

item, ok := bq.mu.replicas[desc.RangeID]
if ok {
// Replica is already processing. Mark to be requeued.
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/queue_helpers_testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
func (bq *baseQueue) testingAdd(
ctx context.Context, repl replicaInQueue, priority float64,
) (bool, error) {
return bq.addInternal(ctx, repl.Desc(), true, priority)
return bq.addInternal(ctx, repl.Desc(), priority)
}

func forceScanAndProcess(s *Store, q *baseQueue) error {
Expand Down
31 changes: 16 additions & 15 deletions pkg/storage/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,24 +288,25 @@ func (rs *replicaScanner) scanLoop(stopper *stop.Stopper) {
shouldStop = rs.waitAndProcess(ctx, stopper, start, nil)
}

shouldStop = shouldStop || nil != stopper.RunTask(
ctx, "storage.replicaScanner: scan loop",
func(ctx context.Context) {
// Increment iteration count.
rs.mu.Lock()
defer rs.mu.Unlock()
rs.mu.scanCount++
rs.mu.total += timeutil.Since(start)
if log.V(6) {
log.Infof(ctx, "reset replica scan iteration")
}

// Reset iteration and start time.
start = timeutil.Now()
})
// waitAndProcess returns true when the system is stopping. Note that this
// means we don't have to check the stopper as well.
if shouldStop {
return
}

// Increment iteration count.
func() {
rs.mu.Lock()
defer rs.mu.Unlock()
rs.mu.scanCount++
rs.mu.total += timeutil.Since(start)
}()
if log.V(6) {
log.Infof(ctx, "reset replica scan iteration")
}

// Reset iteration and start time.
start = timeutil.Now()
}
})
}
Expand Down

0 comments on commit bfdf293

Please sign in to comment.