Merge #37403
37403: storage: fixes replica processing by the replica queue r=darinpp a=darinpp

Fixes #36623

This changes the way replicas are processed by the replica queue.
Prior to the fix, the replica queue processed each replica only once
before moving on to the next replica in the queue. So a replica that
required more than one action (most often because the underlying
consensus protocol lacks joint consensus, so multiple steps are needed
to reach the desired state) would wait a full scanner cycle between the
first and the second action. With this change, each replica is
processed multiple times (up to three) before the queue moves on to the
next replica.
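
A minimal sketch of the bounded re-processing pattern described above
(illustrative only: `processWithRetries`, `once`, and `requeueFn` are
hypothetical stand-ins for the real `processOneChange` / `MaybeAddAsync`
plumbing shown in the diff below):

```go
package main

import (
	"context"
	"fmt"
)

// processWithRetries sketches the new behavior: a replica is processed up to
// maxAttempts times in a row, and is only re-queued if it still needs work
// after the last attempt. once reports whether more changes remain (requeue),
// and requeueFn stands in for putting the replica back on the queue.
func processWithRetries(
	ctx context.Context,
	once func(ctx context.Context) (requeue bool, err error),
	requeueFn func(),
) error {
	const maxAttempts = 3
	for i := 1; i <= maxAttempts; i++ {
		requeue, err := once(ctx)
		if err != nil {
			return err
		}
		if !requeue {
			return nil // desired state reached
		}
		if i == maxAttempts {
			// Work still remains after the final in-line attempt, so fall
			// back to re-queueing the replica for a later pass.
			requeueFn()
			return nil
		}
	}
	return nil
}

func main() {
	// A toy replica that needs two actions (e.g. add a replica, then remove
	// another) before it reaches the desired state.
	remaining := 2
	err := processWithRetries(context.Background(),
		func(context.Context) (bool, error) {
			remaining--
			return remaining > 0, nil
		},
		func() { fmt.Println("re-queued") },
	)
	fmt.Println("done:", err)
}
```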

Release note: None

Co-authored-by: Darin <[email protected]>
craig[bot] and darinpp committed May 14, 2019
2 parents 5fe38b2 + 061ba05 commit 11d0901
Showing 1 changed file with 30 additions and 19 deletions.
49 changes: 30 additions & 19 deletions pkg/storage/replicate_queue.go
@@ -254,7 +254,9 @@ func (rq *replicateQueue) process(
// snapshot errors, usually signaling that a rebalancing
// reservation could not be made with the selected target.
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
if requeue, err := rq.processOneChange(ctx, repl, rq.canTransferLease, false /* dryRun */); err != nil {
const maxAttempts = 3
for i := 1; i <= maxAttempts; i++ {
requeue, err := rq.processOneChange(ctx, repl, rq.canTransferLease, false /* dryRun */)
if IsSnapshotError(err) {
// If ChangeReplicas failed because the preemptive snapshot failed, we
// log the error but then return success indicating we should retry the
@@ -263,20 +265,34 @@ func (rq *replicateQueue) process(
// case we don't want to wait another scanner cycle before reconsidering
// the range.
log.Info(ctx, err)
continue
break
}
return err
} else if requeue {
// Enqueue this replica again to see if there are more changes to be made.
rq.MaybeAddAsync(ctx, repl, rq.store.Clock().Now())
}
if testingAggressiveConsistencyChecks {
if err := rq.store.consistencyQueue.process(ctx, repl, sysCfg); err != nil {
log.Warning(ctx, err)

if err != nil {
return err
}

if testingAggressiveConsistencyChecks {
if err := rq.store.consistencyQueue.process(ctx, repl, sysCfg); err != nil {
log.Warning(ctx, err)
}
}

if !requeue {
return nil
}

if i == maxAttempts {
log.VEventf(ctx, 1, "exhausted re-processing attempts #%d", i)
// Enqueue this replica again to see if there are more changes to be made.
rq.MaybeAddAsync(ctx, repl, rq.store.Clock().Now())
return nil
}

log.VEventf(ctx, 1, "re-processing #%d", i)
}
return nil
}

return errors.Errorf("failed to replicate after %d retries", retryOpts.MaxRetries)
}

@@ -297,11 +313,12 @@ func (rq *replicateQueue) processOneChange(
}

rangeInfo := rangeInfoForRepl(repl, desc)
switch action, _ := rq.allocator.ComputeAction(ctx, zone, rangeInfo); action {
action, _ := rq.allocator.ComputeAction(ctx, zone, rangeInfo)
log.VEventf(ctx, 1, "next replica action: %s", action)
switch action {
case AllocatorNoop:
break
case AllocatorAdd:
log.VEventf(ctx, 1, "adding a new replica")
newStore, details, err := rq.allocator.AllocateTarget(
ctx,
zone,
@@ -366,8 +383,6 @@ func (rq *replicateQueue) processOneChange(
return false, err
}
case AllocatorRemove:
log.VEventf(ctx, 1, "removing a replica")

// This retry loop involves quick operations on local state, so a
// small MaxBackoff is good (but those local variables change on
// network time scales as raft receives responses).
@@ -472,7 +487,6 @@ func (rq *replicateQueue) processOneChange(
}
}
case AllocatorRemoveDecommissioning:
log.VEventf(ctx, 1, "removing a decommissioning replica")
decommissioningReplicas := rq.allocator.storePool.decommissioningReplicas(desc.RangeID, desc.Replicas)
if len(decommissioningReplicas) == 0 {
log.VEventf(ctx, 1, "range of replica %s was identified as having decommissioning replicas, "+
@@ -516,7 +530,6 @@ func (rq *replicateQueue) processOneChange(
}
}
case AllocatorRemoveDead:
log.VEventf(ctx, 1, "removing a dead replica")
if len(deadReplicas) == 0 {
log.VEventf(ctx, 1, "range of replica %s was identified as having dead replicas, but no dead replicas were found", repl)
break
@@ -536,8 +549,6 @@ func (rq *replicateQueue) processOneChange(
case AllocatorConsiderRebalance:
// The Noop case will result if this replica was queued in order to
// rebalance. Attempt to find a rebalancing target.
log.VEventf(ctx, 1, "allocator noop - considering a rebalance or lease transfer")

if !rq.store.TestingKnobs().DisableReplicaRebalancing {
rebalanceStore, details := rq.allocator.RebalanceTarget(
ctx, zone, repl.RaftStatus(), rangeInfo, storeFilterThrottled)
