kvserver: retry failures to rebalance decommissioning replicas
With this commit, failures to rebalance replicas on decommissioning
nodes no longer drop the replica out of the replicateQueue, as they
previously did. Instead, such failures place the replica in the
replicateQueue's purgatory, which retries it every minute.

This is intended to speed up the tail end of decommissioning:
previously, a failed rebalance of one of these replicas was only
retried after about 10 minutes, on the next replica scanner pass.

Release note: None
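
For context, a minimal, self-contained sketch (not part of this diff; names such as rebalanceDecommissioningReplica are illustrative, not the actual kvserver API) of the error-wrapping pattern the change relies on: a rebalance failure for a replica on a decommissioning node is wrapped in an error carrying the purgatory marker method, so the queue parks the replica in purgatory for faster retries instead of dropping it until the next scanner pass.

package main

import (
	"errors"
	"fmt"
)

// purgatoryError marks failures whose replicas should be parked in purgatory
// and retried on a timer, rather than dropped from the queue.
type purgatoryError interface {
	error
	purgatoryErrorMarker()
}

// decommissionPurgatoryError mirrors the type added in this commit: it wraps a
// rebalance failure for a replica on a decommissioning node.
type decommissionPurgatoryError struct{ error }

func (decommissionPurgatoryError) purgatoryErrorMarker() {}

// rebalanceDecommissioningReplica is a hypothetical processing step that fails.
func rebalanceDecommissioningReplica() error {
	return errors.New("snapshot rejected: boom")
}

// processOneChange wraps the failure so the queue sends the replica to
// purgatory instead of forgetting about it until the next scanner cycle.
func processOneChange() error {
	if err := rebalanceDecommissioningReplica(); err != nil {
		return decommissionPurgatoryError{err}
	}
	return nil
}

func main() {
	err := processOneChange()
	var purgErr purgatoryError
	// Detection is errors.As-based, so wrapping preserves the marker.
	fmt.Println("send to purgatory:", errors.As(err, &purgErr)) // true
}
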
aayushshah15 committed Jun 12, 2022
1 parent 1c3437e commit a536b07
Showing 7 changed files with 114 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/allocator.go
@@ -321,7 +321,7 @@ func (ae *allocatorError) Error() string {

func (*allocatorError) purgatoryErrorMarker() {}

var _ purgatoryError = &allocatorError{}
var _ PurgatoryError = &allocatorError{}

// allocatorRand pairs a rand.Rand with a mutex.
// NOTE: Allocator is typically only accessed from a single thread (the
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/allocator_test.go
@@ -7012,7 +7012,7 @@ func TestAllocatorThrottled(t *testing.T) {

// First test to make sure we would send the replica to purgatory.
_, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil)
if !errors.HasInterface(err, (*purgatoryError)(nil)) {
if !errors.HasInterface(err, (*PurgatoryError)(nil)) {
t.Fatalf("expected a purgatory error, got: %+v", err)
}

@@ -7036,7 +7036,7 @@ func TestAllocatorThrottled(t *testing.T) {
storeDetail.throttledUntil = timeutil.Now().Add(24 * time.Hour)
a.storePool.detailsMu.Unlock()
_, _, err = a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil)
if errors.HasInterface(err, (*purgatoryError)(nil)) {
if errors.HasInterface(err, (*PurgatoryError)(nil)) {
t.Fatalf("expected a non purgatory error, got: %+v", err)
}
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/merge_queue.go
@@ -178,7 +178,7 @@ type rangeMergePurgatoryError struct{ error }

func (rangeMergePurgatoryError) purgatoryErrorMarker() {}

var _ purgatoryError = rangeMergePurgatoryError{}
var _ PurgatoryError = rangeMergePurgatoryError{}

func (mq *mergeQueue) requestRangeStats(
ctx context.Context, key roachpb.Key,
23 changes: 12 additions & 11 deletions pkg/kv/kvserver/queue.go
@@ -100,10 +100,10 @@ func makeRateLimitedTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProc
// the operations's timeout.
const permittedRangeScanSlowdown = 10

// a purgatoryError indicates a replica processing failure which indicates
// the replica can be placed into purgatory for faster retries when the
// failure condition changes.
type purgatoryError interface {
// PurgatoryError indicates a replica processing failure which indicates the
// replica can be placed into purgatory for faster retries than the replica
// scanner's interval.
type PurgatoryError interface {
error
purgatoryErrorMarker() // dummy method for unique interface
}
@@ -385,7 +385,7 @@ type queueConfig struct {
//
// A queueImpl can opt into a purgatory by returning a non-nil channel from the
// `purgatoryChan` method. A replica is put into purgatory when the `process`
// method returns an error with a `purgatoryError` as an entry somewhere in the
// method returns an error with a `PurgatoryError` as an entry somewhere in the
// `Cause` chain. A replica in purgatory is not processed again until the
// channel is signaled, at which point every replica in purgatory is immediately
// processed. This catchup is run without the `timer` rate limiting but shares
@@ -419,7 +419,7 @@ type baseQueue struct {
syncutil.Mutex // Protects all variables in the mu struct
replicas map[roachpb.RangeID]*replicaItem // Map from RangeID to replicaItem
priorityQ priorityQueue // The priority queue
purgatory map[roachpb.RangeID]purgatoryError // Map of replicas to processing errors
purgatory map[roachpb.RangeID]PurgatoryError // Map of replicas to processing errors
stopped bool
// Some tests in this package disable queues.
disabled bool
@@ -992,8 +992,9 @@ func isBenign(err error) bool {
return errors.HasType(err, (*benignError)(nil))
}

func isPurgatoryError(err error) (purgatoryError, bool) {
var purgErr purgatoryError
// IsPurgatoryError returns true iff the given error is a purgatory error.
func IsPurgatoryError(err error) (PurgatoryError, bool) {
var purgErr PurgatoryError
return purgErr, errors.As(err, &purgErr)
}
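
The queueConfig comment above describes how a replica enters purgatory and is retried when the purgatory channel fires. The following standalone sketch (not CockroachDB code, and not part of this diff; the real baseQueue is more involved) shows the shape of that mechanism: items whose processing fails with a purgatory-marked error are parked, and every parked item is retried each time the channel fires, the roughly once-a-minute cadence the commit message relies on.

package main

import (
	"errors"
	"fmt"
	"time"
)

// purgatoryError marks failures that should be retried from purgatory.
type purgatoryError interface {
	error
	purgatoryErrorMarker()
}

type rebalanceFailed struct{ error }

func (rebalanceFailed) purgatoryErrorMarker() {}

// queue is a toy stand-in for baseQueue: items whose processing fails with a
// purgatoryError are held in purgatory until the purgatory channel fires.
type queue struct {
	purgatory map[int]error // item ID -> last processing error
	process   func(id int) error
}

func (q *queue) processOrPark(id int) {
	if err := q.process(id); err != nil {
		var purgErr purgatoryError
		if errors.As(err, &purgErr) {
			q.purgatory[id] = purgErr
			return
		}
		fmt.Printf("item %d failed permanently: %v\n", id, err)
	}
}

// drainPurgatory retries every parked item each time tick fires, up to maxTicks.
func (q *queue) drainPurgatory(tick <-chan time.Time, maxTicks int) {
	for i := 0; i < maxTicks && len(q.purgatory) > 0; i++ {
		<-tick
		ids := make([]int, 0, len(q.purgatory))
		for id := range q.purgatory {
			ids = append(ids, id)
		}
		for _, id := range ids {
			delete(q.purgatory, id)
			q.processOrPark(id)
		}
	}
}

func main() {
	attempts := 0
	q := &queue{
		purgatory: map[int]error{},
		process: func(id int) error {
			attempts++
			if attempts < 3 {
				// Simulate a transient rebalance failure (e.g. no valid target yet).
				return rebalanceFailed{errors.New("no rebalance target")}
			}
			fmt.Printf("item %d processed after %d attempts\n", id, attempts)
			return nil
		},
	}

	q.processOrPark(1) // first attempt fails; the item is parked in purgatory

	ticker := time.NewTicker(10 * time.Millisecond) // stands in for the ~1 minute cadence
	defer ticker.Stop()
	q.drainPurgatory(ticker.C, 5)
}
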

@@ -1089,7 +1090,7 @@ func (bq *baseQueue) finishProcessingReplica(
// the failing replica to purgatory. Note that even if the item was
// scheduled to be requeued, we ignore this if we add the replica to
// purgatory.
if purgErr, ok := isPurgatoryError(err); ok {
if purgErr, ok := IsPurgatoryError(err); ok {
bq.mu.Lock()
bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr)
bq.mu.Unlock()
@@ -1111,7 +1112,7 @@
// addToPurgatoryLocked adds the specified replica to the purgatory queue, which
// holds replicas which have failed processing.
func (bq *baseQueue) addToPurgatoryLocked(
ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr purgatoryError,
ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr PurgatoryError,
) {
bq.mu.AssertHeld()

@@ -1149,7 +1150,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
}

// Otherwise, create purgatory and start processing.
bq.mu.purgatory = map[roachpb.RangeID]purgatoryError{
bq.mu.purgatory = map[roachpb.RangeID]PurgatoryError{
repl.GetRangeID(): purgErr,
}

37 changes: 31 additions & 6 deletions pkg/kv/kvserver/replicate_queue.go
@@ -527,6 +527,15 @@ func (rq *replicateQueue) process(
return false, errors.Errorf("failed to replicate after %d retries", retryOpts.MaxRetries)
}

// decommissionPurgatoryError wraps an error that occurs when attempting to
// rebalance a range that has a replica on a decommissioning node to indicate
// that the error should send the range to purgatory.
type decommissionPurgatoryError struct{ error }

func (decommissionPurgatoryError) purgatoryErrorMarker() {}

var _ PurgatoryError = decommissionPurgatoryError{}

func (rq *replicateQueue) processOneChange(
ctx context.Context,
repl *Replica,
@@ -634,10 +643,14 @@
"decommissioning voter %v unexpectedly not found in %v",
decommissioningVoterReplicas[0], voterReplicas)
}
return rq.addOrReplaceVoters(
requeue, err := rq.addOrReplaceVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, decommissioning, dryRun)
if err != nil {
return requeue, decommissionPurgatoryError{err}
}
return requeue, nil
case AllocatorReplaceDecommissioningNonVoter:
decommissioningNonVoterReplicas := rq.allocator.storePool.decommissioningReplicas(nonVoterReplicas)
decommissioningNonVoterReplicas := rq.store.cfg.StorePool.decommissioningReplicas(nonVoterReplicas)
if len(decommissioningNonVoterReplicas) == 0 {
return false, nil
}
@@ -647,18 +660,30 @@
"decommissioning non-voter %v unexpectedly not found in %v",
decommissioningNonVoterReplicas[0], nonVoterReplicas)
}
return rq.addOrReplaceNonVoters(
requeue, err := rq.addOrReplaceNonVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, decommissioning, dryRun)
if err != nil {
return requeue, decommissionPurgatoryError{err}
}
return requeue, nil

// Remove decommissioning replicas.
//
// NB: these two paths will only be hit when the range is over-replicated and
// has decommissioning replicas; in the common case we'll hit
// AllocatorReplaceDecommissioning{Non}Voter above.
case AllocatorRemoveDecommissioningVoter:
return rq.removeDecommissioning(ctx, repl, voterTarget, dryRun)
requeue, err := rq.removeDecommissioning(ctx, repl, voterTarget, dryRun)
if err != nil {
return requeue, decommissionPurgatoryError{err}
}
return requeue, nil
case AllocatorRemoveDecommissioningNonVoter:
return rq.removeDecommissioning(ctx, repl, nonVoterTarget, dryRun)
requeue, err := rq.removeDecommissioning(ctx, repl, nonVoterTarget, dryRun)
if err != nil {
return requeue, decommissionPurgatoryError{err}
}
return requeue, nil

// Remove dead replicas.
//
@@ -802,7 +827,7 @@ func (rq *replicateQueue) addOrReplaceVoters(
_, _, err := rq.allocator.AllocateVoter(ctx, conf, oldPlusNewReplicas, remainingLiveNonVoters)
if err != nil {
// It does not seem possible to go to the next odd replica state. Note
// that AllocateVoter returns an allocatorError (a purgatoryError)
// that AllocateVoter returns an allocatorError (a PurgatoryError)
// when purgatory is requested.
return false, errors.Wrap(err, "avoid up-replicating to fragile quorum")
}
66 changes: 66 additions & 0 deletions pkg/kv/kvserver/replicate_queue_test.go
@@ -32,7 +32,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -587,6 +589,70 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) {
})
}

// TestReplicateQueueDecommissionPurgatoryError tests that failure to move a
// decommissioning replica puts it in the replicate queue purgatory.
func TestReplicateQueueDecommissionPurgatoryError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// NB: This test injects a fake failure during replica rebalancing, and we use
// this `rejectSnapshots` variable as a flag to activate or deactivate that
// injected failure.
var rejectSnapshots int64
ctx := context.Background()
tc := testcluster.StartTestCluster(
t, 4, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error {
if atomic.LoadInt64(&rejectSnapshots) == 1 {
return errors.Newf("boom")
}
return nil
},
}}},
},
)
defer tc.Stopper().Stop(ctx)

// Add a replica to the second and third nodes, and then decommission the
// second node. Since there are only 4 nodes in the cluster, the
// decommissioning replica must be rebalanced to the fourth node.
const decomNodeIdx = 1
const decomNodeID = 2
scratchKey := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx))
tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx+1))
adminSrv := tc.Server(decomNodeIdx)
conn, err := adminSrv.RPCContext().GRPCDialNode(
adminSrv.RPCAddr(), adminSrv.NodeID(), rpc.DefaultClass).Connect(ctx)
require.NoError(t, err)
adminClient := serverpb.NewAdminClient(conn)
_, err = adminClient.Decommission(
ctx, &serverpb.DecommissionRequest{
NodeIDs: []roachpb.NodeID{decomNodeID},
TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING,
},
)
require.NoError(t, err)

// Activate the above testing knob to start rejecting future rebalances and
// then attempt to rebalance the decommissioning replica away. We expect a
// purgatory error to be returned here.
atomic.StoreInt64(&rejectSnapshots, 1)
store := tc.GetFirstStoreFromServer(t, 0)
repl, err := store.GetReplica(tc.LookupRangeOrFatal(t, scratchKey).RangeID)
require.NoError(t, err)
_, processErr, enqueueErr := tc.GetFirstStoreFromServer(t, 0).ManuallyEnqueue(
ctx, "replicate", repl, true, /* skipShouldQueue */
)
require.NoError(t, enqueueErr)
_, isPurgErr := kvserver.IsPurgatoryError(processErr)
if !isPurgErr {
t.Fatalf("expected to receive a purgatory error, got %v", processErr)
}
}

// getLeaseholderStore returns the leaseholder store for the given scratchRange.
func getLeaseholderStore(
tc *testcluster.TestCluster, scratchRange roachpb.RangeDescriptor,
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/split_queue.go
@@ -155,7 +155,7 @@ type unsplittableRangeError struct{}
func (unsplittableRangeError) Error() string { return "could not find valid split key" }
func (unsplittableRangeError) purgatoryErrorMarker() {}

var _ purgatoryError = unsplittableRangeError{}
var _ PurgatoryError = unsplittableRangeError{}

// process synchronously invokes admin split for each proposed split key.
func (sq *splitQueue) process(
