diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go
index 3c39db58180e..26ce665202f9 100644
--- a/pkg/kv/kvserver/allocator.go
+++ b/pkg/kv/kvserver/allocator.go
@@ -321,7 +321,7 @@ func (ae *allocatorError) Error() string {
 
 func (*allocatorError) purgatoryErrorMarker() {}
 
-var _ purgatoryError = &allocatorError{}
+var _ PurgatoryError = &allocatorError{}
 
 // allocatorRand pairs a rand.Rand with a mutex.
 // NOTE: Allocator is typically only accessed from a single thread (the
diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go
index 485aa828d406..411f0377d5a2 100644
--- a/pkg/kv/kvserver/allocator_test.go
+++ b/pkg/kv/kvserver/allocator_test.go
@@ -7012,7 +7012,7 @@ func TestAllocatorThrottled(t *testing.T) {
 
 	// First test to make sure we would send the replica to purgatory.
 	_, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil)
-	if !errors.HasInterface(err, (*purgatoryError)(nil)) {
+	if !errors.HasInterface(err, (*PurgatoryError)(nil)) {
 		t.Fatalf("expected a purgatory error, got: %+v", err)
 	}
 
@@ -7036,7 +7036,7 @@ func TestAllocatorThrottled(t *testing.T) {
 	storeDetail.throttledUntil = timeutil.Now().Add(24 * time.Hour)
 	a.storePool.detailsMu.Unlock()
 	_, _, err = a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil)
-	if errors.HasInterface(err, (*purgatoryError)(nil)) {
+	if errors.HasInterface(err, (*PurgatoryError)(nil)) {
 		t.Fatalf("expected a non purgatory error, got: %+v", err)
 	}
 }
diff --git a/pkg/kv/kvserver/merge_queue.go b/pkg/kv/kvserver/merge_queue.go
index cd4e0293fe48..522b20ea3af1 100644
--- a/pkg/kv/kvserver/merge_queue.go
+++ b/pkg/kv/kvserver/merge_queue.go
@@ -178,7 +178,7 @@ type rangeMergePurgatoryError struct{ error }
 
 func (rangeMergePurgatoryError) purgatoryErrorMarker() {}
 
-var _ purgatoryError = rangeMergePurgatoryError{}
+var _ PurgatoryError = rangeMergePurgatoryError{}
 
 func (mq *mergeQueue) requestRangeStats(
 	ctx context.Context, key roachpb.Key,
diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go
index 98cf5dc8cb5e..a74c6d7ee953 100644
--- a/pkg/kv/kvserver/queue.go
+++ b/pkg/kv/kvserver/queue.go
@@ -100,10 +100,10 @@ func makeRateLimitedTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProc
 // the operations's timeout.
 const permittedRangeScanSlowdown = 10
 
-// a purgatoryError indicates a replica processing failure which indicates
-// the replica can be placed into purgatory for faster retries when the
-// failure condition changes.
-type purgatoryError interface {
+// PurgatoryError indicates a replica processing failure which indicates the
+// replica can be placed into purgatory for faster retries than the replica
+// scanner's interval.
+type PurgatoryError interface {
 	error
 	purgatoryErrorMarker() // dummy method for unique interface
 }
@@ -385,7 +385,7 @@ type queueConfig struct {
 //
 // A queueImpl can opt into a purgatory by returning a non-nil channel from the
 // `purgatoryChan` method. A replica is put into purgatory when the `process`
-// method returns an error with a `purgatoryError` as an entry somewhere in the
+// method returns an error with a `PurgatoryError` as an entry somewhere in the
 // `Cause` chain. A replica in purgatory is not processed again until the
 // channel is signaled, at which point every replica in purgatory is immediately
 // processed. This catchup is run without the `timer` rate limiting but shares
@@ -419,7 +419,7 @@ type baseQueue struct {
 		syncutil.Mutex                                // Protects all variables in the mu struct
 		replicas  map[roachpb.RangeID]*replicaItem    // Map from RangeID to replicaItem
 		priorityQ priorityQueue                       // The priority queue
-		purgatory map[roachpb.RangeID]purgatoryError  // Map of replicas to processing errors
+		purgatory map[roachpb.RangeID]PurgatoryError  // Map of replicas to processing errors
 		stopped   bool
 		// Some tests in this package disable queues.
 		disabled bool
@@ -992,8 +992,9 @@ func isBenign(err error) bool {
 	return errors.HasType(err, (*benignError)(nil))
 }
 
-func isPurgatoryError(err error) (purgatoryError, bool) {
-	var purgErr purgatoryError
+// IsPurgatoryError returns true iff the given error is a purgatory error.
+func IsPurgatoryError(err error) (PurgatoryError, bool) {
+	var purgErr PurgatoryError
 	return purgErr, errors.As(err, &purgErr)
 }
 
@@ -1089,7 +1090,7 @@ func (bq *baseQueue) finishProcessingReplica(
 	// the failing replica to purgatory. Note that even if the item was
 	// scheduled to be requeued, we ignore this if we add the replica to
 	// purgatory.
-	if purgErr, ok := isPurgatoryError(err); ok {
+	if purgErr, ok := IsPurgatoryError(err); ok {
 		bq.mu.Lock()
 		bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr)
 		bq.mu.Unlock()
@@ -1111,7 +1112,7 @@ func (bq *baseQueue) finishProcessingReplica(
 // addToPurgatoryLocked adds the specified replica to the purgatory queue, which
 // holds replicas which have failed processing.
 func (bq *baseQueue) addToPurgatoryLocked(
-	ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr purgatoryError,
+	ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr PurgatoryError,
 ) {
 	bq.mu.AssertHeld()
 
@@ -1149,7 +1150,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
 	}
 
 	// Otherwise, create purgatory and start processing.
-	bq.mu.purgatory = map[roachpb.RangeID]purgatoryError{
+	bq.mu.purgatory = map[roachpb.RangeID]PurgatoryError{
 		repl.GetRangeID(): purgErr,
 	}
 
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index 912252d9e22d..bd696e217212 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -527,6 +527,15 @@ func (rq *replicateQueue) process(
 	return false, errors.Errorf("failed to replicate after %d retries", retryOpts.MaxRetries)
 }
 
+// decommissionPurgatoryError wraps an error that occurs when attempting to
+// rebalance a range that has a replica on a decommissioning node to indicate
+// that the error should send the range to purgatory.
+type decommissionPurgatoryError struct{ error }
+
+func (decommissionPurgatoryError) purgatoryErrorMarker() {}
+
+var _ PurgatoryError = decommissionPurgatoryError{}
+
 func (rq *replicateQueue) processOneChange(
 	ctx context.Context,
 	repl *Replica,
@@ -634,10 +643,14 @@ func (rq *replicateQueue) processOneChange(
 				"decommissioning voter %v unexpectedly not found in %v",
 				decommissioningVoterReplicas[0], voterReplicas)
 		}
-		return rq.addOrReplaceVoters(
+		requeue, err := rq.addOrReplaceVoters(
 			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, decommissioning, dryRun)
+		if err != nil {
+			return requeue, decommissionPurgatoryError{err}
+		}
+		return requeue, nil
 	case AllocatorReplaceDecommissioningNonVoter:
-		decommissioningNonVoterReplicas := rq.allocator.storePool.decommissioningReplicas(nonVoterReplicas)
+		decommissioningNonVoterReplicas := rq.store.cfg.StorePool.decommissioningReplicas(nonVoterReplicas)
 		if len(decommissioningNonVoterReplicas) == 0 {
 			return false, nil
 		}
@@ -647,8 +660,12 @@ func (rq *replicateQueue) processOneChange(
 				"decommissioning non-voter %v unexpectedly not found in %v",
 				decommissioningNonVoterReplicas[0], nonVoterReplicas)
 		}
-		return rq.addOrReplaceNonVoters(
+		requeue, err := rq.addOrReplaceNonVoters(
 			ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, decommissioning, dryRun)
+		if err != nil {
+			return requeue, decommissionPurgatoryError{err}
+		}
+		return requeue, nil
 
 	// Remove decommissioning replicas.
 	//
 	// has decommissioning replicas; in the common case we'll hit
 	// AllocatorReplaceDecommissioning{Non}Voter above.
 	case AllocatorRemoveDecommissioningVoter:
-		return rq.removeDecommissioning(ctx, repl, voterTarget, dryRun)
+		requeue, err := rq.removeDecommissioning(ctx, repl, voterTarget, dryRun)
+		if err != nil {
+			return requeue, decommissionPurgatoryError{err}
+		}
+		return requeue, nil
 	case AllocatorRemoveDecommissioningNonVoter:
-		return rq.removeDecommissioning(ctx, repl, nonVoterTarget, dryRun)
+		requeue, err := rq.removeDecommissioning(ctx, repl, nonVoterTarget, dryRun)
+		if err != nil {
+			return requeue, decommissionPurgatoryError{err}
+		}
+		return requeue, nil
 
 	// Remove dead replicas.
 	//
@@ -802,7 +827,7 @@ func (rq *replicateQueue) addOrReplaceVoters(
 	_, _, err := rq.allocator.AllocateVoter(ctx, conf, oldPlusNewReplicas, remainingLiveNonVoters)
 	if err != nil {
 		// It does not seem possible to go to the next odd replica state. Note
-		// that AllocateVoter returns an allocatorError (a purgatoryError)
+		// that AllocateVoter returns an allocatorError (a PurgatoryError)
 		// when purgatory is requested.
 		return false, errors.Wrap(err, "avoid up-replicating to fragile quorum")
 	}
diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go
index 7b9f568e5120..8cb38a68d631 100644
--- a/pkg/kv/kvserver/replicate_queue_test.go
+++ b/pkg/kv/kvserver/replicate_queue_test.go
@@ -32,7 +32,9 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/rpc"
 	"github.com/cockroachdb/cockroach/pkg/server"
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
 	"github.com/cockroachdb/cockroach/pkg/spanconfig"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -587,6 +589,70 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) {
 	})
 }
 
+// TestReplicateQueueDecommissionPurgatoryError tests that failure to move a
+// decommissioning replica puts it in the replicate queue purgatory.
+func TestReplicateQueueDecommissionPurgatoryError(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// NB: This test injects a fake failure during replica rebalancing, and we use
+	// this `rejectSnapshots` variable as a flag to activate or deactivate that
+	// injected failure.
+	var rejectSnapshots int64
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(
+		t, 4, base.TestClusterArgs{
+			ReplicationMode: base.ReplicationManual,
+			ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
+				ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error {
+					if atomic.LoadInt64(&rejectSnapshots) == 1 {
+						return errors.Newf("boom")
+					}
+					return nil
+				},
+			}}},
+		},
+	)
+	defer tc.Stopper().Stop(ctx)
+
+	// Add a replica to the second and third nodes, and then decommission the
+	// second node. Since there are only 4 nodes in the cluster, the
+	// decommissioning replica must be rebalanced to the fourth node.
+	const decomNodeIdx = 1
+	const decomNodeID = 2
+	scratchKey := tc.ScratchRange(t)
+	tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx))
+	tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx+1))
+	adminSrv := tc.Server(decomNodeIdx)
+	conn, err := adminSrv.RPCContext().GRPCDialNode(
+		adminSrv.RPCAddr(), adminSrv.NodeID(), rpc.DefaultClass).Connect(ctx)
+	require.NoError(t, err)
+	adminClient := serverpb.NewAdminClient(conn)
+	_, err = adminClient.Decommission(
+		ctx, &serverpb.DecommissionRequest{
+			NodeIDs:          []roachpb.NodeID{decomNodeID},
+			TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING,
+		},
+	)
+	require.NoError(t, err)
+
+	// Activate the above testing knob to start rejecting future rebalances and
+	// then attempt to rebalance the decommissioning replica away. We expect a
+	// purgatory error to be returned here.
+	atomic.StoreInt64(&rejectSnapshots, 1)
+	store := tc.GetFirstStoreFromServer(t, 0)
+	repl, err := store.GetReplica(tc.LookupRangeOrFatal(t, scratchKey).RangeID)
+	require.NoError(t, err)
+	_, processErr, enqueueErr := tc.GetFirstStoreFromServer(t, 0).ManuallyEnqueue(
+		ctx, "replicate", repl, true, /* skipShouldQueue */
+	)
+	require.NoError(t, enqueueErr)
+	_, isPurgErr := kvserver.IsPurgatoryError(processErr)
+	if !isPurgErr {
+		t.Fatalf("expected to receive a purgatory error, got %v", processErr)
+	}
+}
+
 // getLeaseholderStore returns the leaseholder store for the given scratchRange.
 func getLeaseholderStore(
 	tc *testcluster.TestCluster, scratchRange roachpb.RangeDescriptor,
diff --git a/pkg/kv/kvserver/split_queue.go b/pkg/kv/kvserver/split_queue.go
index 0c5fec406746..493f332d1d1e 100644
--- a/pkg/kv/kvserver/split_queue.go
+++ b/pkg/kv/kvserver/split_queue.go
@@ -155,7 +155,7 @@ type unsplittableRangeError struct{}
 func (unsplittableRangeError) Error() string { return "could not find valid split key" }
 func (unsplittableRangeError) purgatoryErrorMarker() {}
 
-var _ purgatoryError = unsplittableRangeError{}
+var _ PurgatoryError = unsplittableRangeError{}
 
 // process synchronously invokes admin split for each proposed split key.
 func (sq *splitQueue) process(