Skip to content

Commit

Permalink
storage: Continue trying replica scatter when stores are throttled
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
a-robinson committed Jun 1, 2018
1 parent 10f3f00 commit 2d56c10
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
17 changes: 16 additions & 1 deletion pkg/storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,21 @@ func (*allocatorError) purgatoryErrorMarker() {}

var _ purgatoryError = &allocatorError{}

type throttledStoresError struct {
numThrottledStores int
}

func (t *throttledStoresError) Error() string {
return fmt.Sprintf("%d matching stores are currently throttled", t.numThrottledStores)
}

// IsThrottledStoresError returns true iff the error indicates the allocator
// couldn't take action because too many stores were throttled.
func IsThrottledStoresError(err error) bool {
_, ok := err.(*throttledStoresError)
return ok
}

// allocatorRand pairs a rand.Rand with a mutex.
// NOTE: Allocator is typically only accessed from a single thread (the
// replication queue), but this assumption is broken in tests which force
Expand Down Expand Up @@ -361,7 +376,7 @@ func (a *Allocator) AllocateTarget(
// When there are throttled stores that do match, we shouldn't send
// the replica to purgatory.
if throttledStoreCount > 0 {
return nil, "", errors.Errorf("%d matching stores are currently throttled", throttledStoreCount)
return nil, "", &throttledStoresError{numThrottledStores: throttledStoreCount}
}
return nil, "", &allocatorError{
constraints: zone.Constraints,
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2129,12 +2129,12 @@ func (r *Replica) adminScatter(
for re := retry.StartWithCtx(ctx, retryOpts); re.Next() && replicasAdded < replicasToAdd; {
requeue, err := rq.processOneChange(ctx, r, sysCfg, func() bool { return false } /* canTransferLease */, false /* dryRun */, true /* disableStatsBasedRebalancing */, true /* forceAddReplica */)
if err != nil {
if IsSnapshotError(err) {
if IsSnapshotError(err) || IsThrottledStoresError(err) {
// TODO: remove
log.Infof(ctx, "encountered snapshot error when adding replica for scatter: %s", err)
log.Infof(ctx, "encountered retriable error when adding replica for scatter: %s", err)
continue
}
log.Warningf(ctx, "encountered error when adding replica for scatter: %s", err)
log.Warningf(ctx, "encountered non-retriable error when adding replica for scatter: %s", err)
break
}
replicasAdded++
Expand All @@ -2160,7 +2160,7 @@ func (r *Replica) adminScatter(
log.Infof(ctx, "encountered snapshot error from replicate queue during scatter: %s", err)
continue
}
log.Warningf(ctx, "encountered error from replicate queue during scatter: %s", err)
log.Warningf(ctx, "encountered non-retriable error from replicate queue during scatter: %s", err)
break
}
if !requeue {
Expand Down

0 comments on commit 2d56c10

Please sign in to comment.