diff --git a/docs/generated/sql/functions.md b/docs/generated/sql/functions.md index 859ccc8e84de..6868029df8d3 100644 --- a/docs/generated/sql/functions.md +++ b/docs/generated/sql/functions.md @@ -3104,7 +3104,7 @@ table. Returns an error if validation fails.

### TUPLE{INT AS RANGE_ID, STRING AS ERROR, INT AS END_TO_END_LATENCY_MS, STRING AS VERBOSE_TRACE} functions

-Function → Returns | Description
+Function → Returns | Description | Volatility
crdb_internal.probe_ranges(timeout: interval, probe_type: unknown_enum) → tuple{int AS range_id, string AS error, int AS end_to_end_latency_ms, string AS verbose_trace}

Returns rows of range data based on the results received when using the prober. Parameters @@ -3117,7 +3117,7 @@ Notes If a probe should fail, the latency will be set to MaxInt64 in order to naturally sort above other latencies. Read probes are cheaper than write probes. If write probes have already ran, it’s not necessary to also run a read probe. A write probe will effectively probe reads as well.

-
+Volatile
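The entry above documents the builtin's parameters and latency semantics, but not a concrete invocation. Below is a minimal sketch of querying it from Go via database/sql; the connection string, the lib/pq driver choice, and the 'read' probe_type value are assumptions for illustration, not taken from this diff.

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // assumed Postgres-wire driver; pgx would work as well
)

func main() {
	// Illustrative connection string; point it at any node of the cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// 'read' is assumed to be an accepted probe_type; per the description,
	// write probes exercise the read path as well.
	rows, err := db.Query(
		`SELECT range_id, error, end_to_end_latency_ms
		   FROM crdb_internal.probe_ranges(INTERVAL '1s', 'read')
		  WHERE error != ''
		  ORDER BY end_to_end_latency_ms DESC
		  LIMIT 10`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var rangeID, latencyMS int64
		var probeErr string
		if err := rows.Scan(&rangeID, &probeErr, &latencyMS); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("r%d: %dms: %s\n", rangeID, latencyMS, probeErr)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}
```

Sorting by end_to_end_latency_ms surfaces failed probes first, since the description notes that failures are reported as MaxInt64.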
### UUID functions diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go index 07b6695b3a9d..780a4f3171b3 100644 --- a/pkg/kv/kvserver/allocator.go +++ b/pkg/kv/kvserver/allocator.go @@ -322,7 +322,7 @@ func (ae *allocatorError) Error() string { func (*allocatorError) purgatoryErrorMarker() {} -var _ PurgatoryError = &allocatorError{} +var _ purgatoryError = &allocatorError{} // allocatorRand pairs a rand.Rand with a mutex. // NOTE: Allocator is typically only accessed from a single thread (the diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index 411f0377d5a2..485aa828d406 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -7012,7 +7012,7 @@ func TestAllocatorThrottled(t *testing.T) { // First test to make sure we would send the replica to purgatory. _, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil) - if !errors.HasInterface(err, (*PurgatoryError)(nil)) { + if !errors.HasInterface(err, (*purgatoryError)(nil)) { t.Fatalf("expected a purgatory error, got: %+v", err) } @@ -7036,7 +7036,7 @@ func TestAllocatorThrottled(t *testing.T) { storeDetail.throttledUntil = timeutil.Now().Add(24 * time.Hour) a.storePool.detailsMu.Unlock() _, _, err = a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil) - if errors.HasInterface(err, (*PurgatoryError)(nil)) { + if errors.HasInterface(err, (*purgatoryError)(nil)) { t.Fatalf("expected a non purgatory error, got: %+v", err) } } diff --git a/pkg/kv/kvserver/consistency_queue.go b/pkg/kv/kvserver/consistency_queue.go index 771966df1a11..5d1a0120e6a0 100644 --- a/pkg/kv/kvserver/consistency_queue.go +++ b/pkg/kv/kvserver/consistency_queue.go @@ -238,7 +238,3 @@ func (q *consistencyQueue) timer(duration time.Duration) time.Duration { func (*consistencyQueue) purgatoryChan() <-chan time.Time { return nil } - -func (*consistencyQueue) updateChan() <-chan time.Time { - return nil -} diff --git a/pkg/kv/kvserver/merge_queue.go b/pkg/kv/kvserver/merge_queue.go index 522b20ea3af1..bd1f8a48f1d6 100644 --- a/pkg/kv/kvserver/merge_queue.go +++ b/pkg/kv/kvserver/merge_queue.go @@ -178,7 +178,7 @@ type rangeMergePurgatoryError struct{ error } func (rangeMergePurgatoryError) purgatoryErrorMarker() {} -var _ PurgatoryError = rangeMergePurgatoryError{} +var _ purgatoryError = rangeMergePurgatoryError{} func (mq *mergeQueue) requestRangeStats( ctx context.Context, key roachpb.Key, @@ -433,7 +433,3 @@ func (mq *mergeQueue) timer(time.Duration) time.Duration { func (mq *mergeQueue) purgatoryChan() <-chan time.Time { return mq.purgChan } - -func (mq *mergeQueue) updateChan() <-chan time.Time { - return nil -} diff --git a/pkg/kv/kvserver/mvcc_gc_queue.go b/pkg/kv/kvserver/mvcc_gc_queue.go index f90a5ea588a1..d218a1389a6a 100644 --- a/pkg/kv/kvserver/mvcc_gc_queue.go +++ b/pkg/kv/kvserver/mvcc_gc_queue.go @@ -644,7 +644,3 @@ func (*mvccGCQueue) timer(_ time.Duration) time.Duration { func (*mvccGCQueue) purgatoryChan() <-chan time.Time { return nil } - -func (*mvccGCQueue) updateChan() <-chan time.Time { - return nil -} diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go index a74c6d7ee953..e9adac48e1b9 100644 --- a/pkg/kv/kvserver/queue.go +++ b/pkg/kv/kvserver/queue.go @@ -100,10 +100,10 @@ func makeRateLimitedTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProc // the operations's timeout. 
const permittedRangeScanSlowdown = 10 -// PurgatoryError indicates a replica processing failure which indicates the -// replica can be placed into purgatory for faster retries than the replica -// scanner's interval. -type PurgatoryError interface { +// purgatoryError indicates a replica processing failure that allows the +// replica to be placed into purgatory for faster retries when the +// failure condition changes. +type purgatoryError interface { error purgatoryErrorMarker() // dummy method for unique interface } @@ -270,11 +270,6 @@ type queueImpl interface { // purgatory due to failures. If purgatoryChan returns nil, failing // replicas are not sent to purgatory. purgatoryChan() <-chan time.Time - - // updateChan returns a channel that is signalled whenever there is an update - // to the cluster state that might impact the replicas in the queue's - // purgatory. - updateChan() <-chan time.Time } // queueProcessTimeoutFunc controls the timeout for queue processing for a @@ -385,7 +380,7 @@ type queueConfig struct { // // A queueImpl can opt into a purgatory by returning a non-nil channel from the // `purgatoryChan` method. A replica is put into purgatory when the `process` -// method returns an error with a `PurgatoryError` as an entry somewhere in the +// method returns an error with a `purgatoryError` as an entry somewhere in the // `Cause` chain. A replica in purgatory is not processed again until the // channel is signaled, at which point every replica in purgatory is immediately // processed. This catchup is run without the `timer` rate limiting but shares @@ -419,7 +414,7 @@ type baseQueue struct { syncutil.Mutex // Protects all variables in the mu struct replicas map[roachpb.RangeID]*replicaItem // Map from RangeID to replicaItem priorityQ priorityQueue // The priority queue - purgatory map[roachpb.RangeID]PurgatoryError // Map of replicas to processing errors + purgatory map[roachpb.RangeID]purgatoryError // Map of replicas to processing errors stopped bool // Some tests in this package disable queues. disabled bool @@ -992,9 +987,8 @@ func isBenign(err error) bool { return errors.HasType(err, (*benignError)(nil)) } -// IsPurgatoryError returns true iff the given error is a purgatory error. -func IsPurgatoryError(err error) (PurgatoryError, bool) { - var purgErr PurgatoryError +func isPurgatoryError(err error) (purgatoryError, bool) { + var purgErr purgatoryError return purgErr, errors.As(err, &purgErr) } @@ -1090,7 +1084,7 @@ func (bq *baseQueue) finishProcessingReplica( // the failing replica to purgatory. Note that even if the item was // scheduled to be requeued, we ignore this if we add the replica to // purgatory. - if purgErr, ok := IsPurgatoryError(err); ok { + if purgErr, ok := isPurgatoryError(err); ok { bq.mu.Lock() bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr) bq.mu.Unlock() @@ -1112,7 +1106,7 @@ func (bq *baseQueue) finishProcessingReplica( // addToPurgatoryLocked adds the specified replica to the purgatory queue, which // holds replicas which have failed processing. func (bq *baseQueue) addToPurgatoryLocked( - ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr PurgatoryError, + ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr purgatoryError, ) { bq.mu.AssertHeld() @@ -1150,7 +1144,7 @@ func (bq *baseQueue) addToPurgatoryLocked( } // Otherwise, create purgatory and start processing.
- bq.mu.purgatory = map[roachpb.RangeID]PurgatoryError{ + bq.mu.purgatory = map[roachpb.RangeID]purgatoryError{ repl.GetRangeID(): purgErr, } @@ -1159,14 +1153,51 @@ func (bq *baseQueue) addToPurgatoryLocked( ticker := time.NewTicker(purgatoryReportInterval) for { select { - case <-bq.impl.updateChan(): - if bq.processReplicasInPurgatory(ctx, stopper) { - return - } case <-bq.impl.purgatoryChan(): - if bq.processReplicasInPurgatory(ctx, stopper) { + func() { + // Acquire from the process semaphore, release when done. + bq.processSem <- struct{}{} + defer func() { <-bq.processSem }() + + // Remove all items from purgatory into a copied slice. + bq.mu.Lock() + ranges := make([]*replicaItem, 0, len(bq.mu.purgatory)) + for rangeID := range bq.mu.purgatory { + item := bq.mu.replicas[rangeID] + if item == nil { + log.Fatalf(ctx, "r%d is in purgatory but not in replicas", rangeID) + } + item.setProcessing() + ranges = append(ranges, item) + bq.removeFromPurgatoryLocked(item) + } + bq.mu.Unlock() + + for _, item := range ranges { + repl, err := bq.getReplica(item.rangeID) + if err != nil || item.replicaID != repl.ReplicaID() { + continue + } + annotatedCtx := repl.AnnotateCtx(ctx) + if stopper.RunTask( + annotatedCtx, bq.processOpName(), func(ctx context.Context) { + err := bq.processReplica(ctx, repl) + bq.finishProcessingReplica(ctx, stopper, repl, err) + }) != nil { + return + } + } + }() + + // Clean up purgatory, if empty. + bq.mu.Lock() + if len(bq.mu.purgatory) == 0 { + log.Infof(ctx, "purgatory is now empty") + bq.mu.purgatory = nil + bq.mu.Unlock() return } + bq.mu.Unlock() case <-ticker.C: // Report purgatory status. bq.mu.Lock() @@ -1182,61 +1213,7 @@ func (bq *baseQueue) addToPurgatoryLocked( return } } - }, - ) -} - -// processReplicasInPurgatory processes replicas currently in the queue's -// purgatory. -func (bq *baseQueue) processReplicasInPurgatory( - ctx context.Context, stopper *stop.Stopper, -) (purgatoryCleared bool) { - func() { - // Acquire from the process semaphore, release when done. - bq.processSem <- struct{}{} - defer func() { <-bq.processSem }() - - // Remove all items from purgatory into a copied slice. - bq.mu.Lock() - ranges := make([]*replicaItem, 0, len(bq.mu.purgatory)) - for rangeID := range bq.mu.purgatory { - item := bq.mu.replicas[rangeID] - if item == nil { - log.Fatalf(ctx, "r%d is in purgatory but not in replicas", rangeID) - } - item.setProcessing() - ranges = append(ranges, item) - bq.removeFromPurgatoryLocked(item) - } - bq.mu.Unlock() - - for _, item := range ranges { - repl, err := bq.getReplica(item.rangeID) - if err != nil || item.replicaID != repl.ReplicaID() { - continue - } - annotatedCtx := repl.AnnotateCtx(ctx) - if stopper.RunTask( - annotatedCtx, bq.processOpName(), func(ctx context.Context) { - err := bq.processReplica(ctx, repl) - bq.finishProcessingReplica(ctx, stopper, repl, err) - }, - ) != nil { - return - } - } - }() - - // Clean up purgatory, if empty. - bq.mu.Lock() - if len(bq.mu.purgatory) == 0 { - log.Infof(ctx, "purgatory is now empty") - bq.mu.purgatory = nil - bq.mu.Unlock() - return true /* purgatoryCleared */ - } - bq.mu.Unlock() - return false /* purgatoryCleared */ + }) } // pop dequeues the highest priority replica, if any, in the queue. 
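The queueConfig comment in the hunk above spells out the purgatory contract: process must return an error whose Cause chain contains a purgatoryError, and the queueImpl must return a non-nil channel from purgatoryChan. The following is a minimal, self-contained sketch of the error-marking and detection halves of that contract; the rebalanceBlockedError type and the standalone package are hypothetical, and only the marker/errors.As pattern mirrors the code in this diff.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

// purgatoryError mirrors the unexported marker interface in queue.go: any
// error in a processing failure's cause chain that implements it sends the
// replica to purgatory instead of leaving it to the scanner's retry cadence.
type purgatoryError interface {
	error
	purgatoryErrorMarker() // dummy method for unique interface
}

// rebalanceBlockedError is a hypothetical queue-specific error type,
// analogous to rangeMergePurgatoryError or unsplittableRangeError above;
// implementing the marker method opts its errors into purgatory.
type rebalanceBlockedError struct{ error }

func (rebalanceBlockedError) purgatoryErrorMarker() {}

var _ purgatoryError = rebalanceBlockedError{}

// isPurgatoryError walks the wrapped cause chain, so the marker survives
// errors.Wrap, just like the helper in queue.go.
func isPurgatoryError(err error) (purgatoryError, bool) {
	var purgErr purgatoryError
	return purgErr, errors.As(err, &purgErr)
}

func main() {
	err := errors.Wrap(
		rebalanceBlockedError{errors.New("no valid rebalance target")},
		"processing r42",
	)
	if purgErr, ok := isPurgatoryError(err); ok {
		fmt.Printf("send replica to purgatory: %v\n", purgErr)
	}
}
```

Because detection goes through errors.As, the marker survives any amount of wrapping, while ordinary errors can never land a replica in purgatory by accident.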
The diff --git a/pkg/kv/kvserver/queue_concurrency_test.go b/pkg/kv/kvserver/queue_concurrency_test.go index e39345f0f7a0..10b6a350f8a0 100644 --- a/pkg/kv/kvserver/queue_concurrency_test.go +++ b/pkg/kv/kvserver/queue_concurrency_test.go @@ -151,10 +151,6 @@ func (fakeQueueImpl) purgatoryChan() <-chan time.Time { return time.After(time.Nanosecond) } -func (fakeQueueImpl) updateChan() <-chan time.Time { - return nil -} - type fakeReplica struct { rangeID roachpb.RangeID replicaID roachpb.ReplicaID diff --git a/pkg/kv/kvserver/queue_test.go b/pkg/kv/kvserver/queue_test.go index e273271774ff..c44bfa15bd4f 100644 --- a/pkg/kv/kvserver/queue_test.go +++ b/pkg/kv/kvserver/queue_test.go @@ -84,10 +84,6 @@ func (tq *testQueueImpl) purgatoryChan() <-chan time.Time { return tq.pChan } -func (tq *testQueueImpl) updateChan() <-chan time.Time { - return nil -} - func makeTestBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *baseQueue { if !cfg.acceptsUnsplitRanges { // Needed in order to pass the validation in newBaseQueue. diff --git a/pkg/kv/kvserver/raft_log_queue.go b/pkg/kv/kvserver/raft_log_queue.go index 4937839acfdb..c76f9679e063 100644 --- a/pkg/kv/kvserver/raft_log_queue.go +++ b/pkg/kv/kvserver/raft_log_queue.go @@ -754,10 +754,6 @@ func (*raftLogQueue) purgatoryChan() <-chan time.Time { return nil } -func (*raftLogQueue) updateChan() <-chan time.Time { - return nil -} - func isLooselyCoupledRaftLogTruncationEnabled( ctx context.Context, settings *cluster.Settings, ) bool { diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go index abfc62ad6c3f..c23cbe742495 100644 --- a/pkg/kv/kvserver/raft_snapshot_queue.go +++ b/pkg/kv/kvserver/raft_snapshot_queue.go @@ -175,7 +175,3 @@ func (*raftSnapshotQueue) timer(_ time.Duration) time.Duration { func (rq *raftSnapshotQueue) purgatoryChan() <-chan time.Time { return nil } - -func (rq *raftSnapshotQueue) updateChan() <-chan time.Time { - return nil -} diff --git a/pkg/kv/kvserver/replica_gc_queue.go b/pkg/kv/kvserver/replica_gc_queue.go index a62b538050f3..b64b6876f685 100644 --- a/pkg/kv/kvserver/replica_gc_queue.go +++ b/pkg/kv/kvserver/replica_gc_queue.go @@ -373,7 +373,3 @@ func (*replicaGCQueue) timer(_ time.Duration) time.Duration { func (*replicaGCQueue) purgatoryChan() <-chan time.Time { return nil } - -func (*replicaGCQueue) updateChan() <-chan time.Time { - return nil -} diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index bd696e217212..ca4d745db0a1 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -35,12 +35,6 @@ import ( ) const ( - // replicateQueuePurgatoryCheckInterval is the interval at which replicas in - // the replicate queue purgatory are re-attempted. Note that these replicas - // may be re-attempted more frequently by the replicateQueue in case there are - // gossip updates that might affect allocation decisions. - replicateQueuePurgatoryCheckInterval = 1 * time.Minute - // replicateQueueTimerDuration is the duration between replication of queued // replicas. replicateQueueTimerDuration = 0 // zero duration to process replication greedily @@ -345,23 +339,18 @@ func (metrics *ReplicateQueueMetrics) trackRebalanceReplicaCount(targetType targ // additional replica to their range. type replicateQueue struct { *baseQueue - metrics ReplicateQueueMetrics - allocator Allocator - // purgCh is signalled every replicateQueuePurgatoryCheckInterval. 
- purgCh <-chan time.Time - // updateCh is signalled every time there is an update to the cluster's store - // descriptors. - updateCh chan time.Time + metrics ReplicateQueueMetrics + allocator Allocator + updateChan chan time.Time lastLeaseTransfer atomic.Value // read and written by scanner & queue goroutines } // newReplicateQueue returns a new instance of replicateQueue. func newReplicateQueue(store *Store, allocator Allocator) *replicateQueue { rq := &replicateQueue{ - metrics: makeReplicateQueueMetrics(), - allocator: allocator, - purgCh: time.NewTicker(replicateQueuePurgatoryCheckInterval).C, - updateCh: make(chan time.Time, 1), + metrics: makeReplicateQueueMetrics(), + allocator: allocator, + updateChan: make(chan time.Time, 1), } store.metrics.registry.AddMetricStruct(&rq.metrics) rq.baseQueue = newBaseQueue( @@ -383,9 +372,10 @@ func newReplicateQueue(store *Store, allocator Allocator) *replicateQueue { purgatory: store.metrics.ReplicateQueuePurgatory, }, ) + updateFn := func() { select { - case rq.updateCh <- timeutil.Now(): + case rq.updateChan <- timeutil.Now(): default: } } @@ -527,15 +517,6 @@ func (rq *replicateQueue) process( return false, errors.Errorf("failed to replicate after %d retries", retryOpts.MaxRetries) } -// decommissionPurgatoryError wraps an error that occurs when attempting to -// rebalance a range that has a replica on a decommissioning node to indicate -// that the error should send the range to purgatory. -type decommissionPurgatoryError struct{ error } - -func (decommissionPurgatoryError) purgatoryErrorMarker() {} - -var _ PurgatoryError = decommissionPurgatoryError{} - func (rq *replicateQueue) processOneChange( ctx context.Context, repl *Replica, @@ -643,14 +624,10 @@ func (rq *replicateQueue) processOneChange( "decommissioning voter %v unexpectedly not found in %v", decommissioningVoterReplicas[0], voterReplicas) } - requeue, err := rq.addOrReplaceVoters( + return rq.addOrReplaceVoters( ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, decommissioning, dryRun) - if err != nil { - return requeue, decommissionPurgatoryError{err} - } - return requeue, nil case AllocatorReplaceDecommissioningNonVoter: - decommissioningNonVoterReplicas := rq.store.cfg.StorePool.decommissioningReplicas(nonVoterReplicas) + decommissioningNonVoterReplicas := rq.allocator.storePool.decommissioningReplicas(nonVoterReplicas) if len(decommissioningNonVoterReplicas) == 0 { return false, nil } @@ -660,12 +637,8 @@ func (rq *replicateQueue) processOneChange( "decommissioning non-voter %v unexpectedly not found in %v", decommissioningNonVoterReplicas[0], nonVoterReplicas) } - requeue, err := rq.addOrReplaceNonVoters( + return rq.addOrReplaceNonVoters( ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, decommissioning, dryRun) - if err != nil { - return requeue, decommissionPurgatoryError{err} - } - return requeue, nil // Remove decommissioning replicas. // @@ -673,17 +646,9 @@ func (rq *replicateQueue) processOneChange( // has decommissioning replicas; in the common case we'll hit // AllocatorReplaceDecommissioning{Non}Voter above. 
case AllocatorRemoveDecommissioningVoter: - requeue, err := rq.removeDecommissioning(ctx, repl, voterTarget, dryRun) - if err != nil { - return requeue, decommissionPurgatoryError{err} - } - return requeue, nil + return rq.removeDecommissioning(ctx, repl, voterTarget, dryRun) case AllocatorRemoveDecommissioningNonVoter: - requeue, err := rq.removeDecommissioning(ctx, repl, nonVoterTarget, dryRun) - if err != nil { - return requeue, decommissionPurgatoryError{err} - } - return requeue, nil + return rq.removeDecommissioning(ctx, repl, nonVoterTarget, dryRun) // Remove dead replicas. // @@ -827,7 +792,7 @@ func (rq *replicateQueue) addOrReplaceVoters( _, _, err := rq.allocator.AllocateVoter(ctx, conf, oldPlusNewReplicas, remainingLiveNonVoters) if err != nil { // It does not seem possible to go to the next odd replica state. Note - // that AllocateVoter returns an allocatorError (a PurgatoryError) + // that AllocateVoter returns an allocatorError (a purgatoryError) // when purgatory is requested. return false, errors.Wrap(err, "avoid up-replicating to fragile quorum") } @@ -1662,13 +1627,9 @@ func (*replicateQueue) timer(_ time.Duration) time.Duration { return replicateQueueTimerDuration } +// purgatoryChan returns the replicate queue's store update channel. func (rq *replicateQueue) purgatoryChan() <-chan time.Time { - return rq.purgCh -} - -// updateChan returns the replicate queue's store update channel. -func (rq *replicateQueue) updateChan() <-chan time.Time { - return rq.updateCh + return rq.updateChan } // rangeRaftStatus pretty-prints the Raft progress (i.e. Raft log position) of diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index c184e70bd48a..83f49c942fbe 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -32,9 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server" - "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -589,69 +587,6 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) { }) } -// TestReplicateQueueDecommissionPurgatoryError tests that failure to move a -// decommissioning replica puts it in the replicate queue purgatory. -func TestReplicateQueueDecommissionPurgatoryError(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - // NB: This test injects a fake failure during replica rebalancing, and we use - // this `rejectSnapshots` variable as a flag to activate or deactivate that - // injected failure. - var rejectSnapshots int64 - ctx := context.Background() - tc := testcluster.StartTestCluster( - t, 4, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{ - ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error { - if atomic.LoadInt64(&rejectSnapshots) == 1 { - return errors.Newf("boom") - } - return nil - }, - }}}, - }, - ) - defer tc.Stopper().Stop(ctx) - - // Add a replica to the second and third nodes, and then decommission the - // second node. 
Since there are only 4 nodes in the cluster, the - // decommissioning replica must be rebalanced to the fourth node. - const decomNodeIdx = 1 - const decomNodeID = 2 - scratchKey := tc.ScratchRange(t) - tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx)) - tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx+1)) - adminSrv := tc.Server(decomNodeIdx) - conn, err := adminSrv.RPCContext().GRPCDialNode( - adminSrv.RPCAddr(), adminSrv.NodeID(), rpc.DefaultClass).Connect(ctx) - require.NoError(t, err) - adminClient := serverpb.NewAdminClient(conn) - _, err = adminClient.Decommission( - ctx, &serverpb.DecommissionRequest{ - NodeIDs: []roachpb.NodeID{decomNodeID}, - TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING, - }, - ) - require.NoError(t, err) - - // Activate the above testing knob to start rejecting future rebalances and - // then attempt to rebalance the decommissioning replica away. We expect a - // purgatory error to be returned here. - atomic.StoreInt64(&rejectSnapshots, 1) - store := tc.GetFirstStoreFromServer(t, 0) - repl, err := store.GetReplica(tc.LookupRangeOrFatal(t, scratchKey).RangeID) - require.NoError(t, err) - _, processErr, enqueueErr := tc.GetFirstStoreFromServer(t, 0).Enqueue( - ctx, "replicate", repl, true /* skipShouldQueue */, false /* async */) - require.NoError(t, enqueueErr) - _, isPurgErr := kvserver.IsPurgatoryError(processErr) - if !isPurgErr { - t.Fatalf("expected to receive a purgatory error, got %v", processErr) - } -} - // getLeaseholderStore returns the leaseholder store for the given scratchRange. func getLeaseholderStore( tc *testcluster.TestCluster, scratchRange roachpb.RangeDescriptor, diff --git a/pkg/kv/kvserver/split_queue.go b/pkg/kv/kvserver/split_queue.go index 493f332d1d1e..a611a335ac73 100644 --- a/pkg/kv/kvserver/split_queue.go +++ b/pkg/kv/kvserver/split_queue.go @@ -155,7 +155,7 @@ type unsplittableRangeError struct{} func (unsplittableRangeError) Error() string { return "could not find valid split key" } func (unsplittableRangeError) purgatoryErrorMarker() {} -var _ PurgatoryError = unsplittableRangeError{} +var _ purgatoryError = unsplittableRangeError{} // process synchronously invokes admin split for each proposed split key. func (sq *splitQueue) process( @@ -274,7 +274,3 @@ func (*splitQueue) timer(_ time.Duration) time.Duration { func (sq *splitQueue) purgatoryChan() <-chan time.Time { return sq.purgChan } - -func (sq *splitQueue) updateChan() <-chan time.Time { - return nil -} diff --git a/pkg/kv/kvserver/ts_maintenance_queue.go b/pkg/kv/kvserver/ts_maintenance_queue.go index 4f9e07b75e62..f0476e1895f4 100644 --- a/pkg/kv/kvserver/ts_maintenance_queue.go +++ b/pkg/kv/kvserver/ts_maintenance_queue.go @@ -178,7 +178,3 @@ func (q *timeSeriesMaintenanceQueue) timer(duration time.Duration) time.Duration func (*timeSeriesMaintenanceQueue) purgatoryChan() <-chan time.Time { return nil } - -func (*timeSeriesMaintenanceQueue) updateChan() <-chan time.Time { - return nil -}
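For context on the replicate-queue side of this change: the fixed one-minute purgatory ticker (replicateQueuePurgatoryCheckInterval) is gone, and the gossip-driven update channel is returned from purgatoryChan instead, so purgatory replicas are retried when the cluster's store descriptors change rather than on a timer. Below is a minimal sketch of that coalescing notification pattern, assuming only the standard library; the notifier type and its names are illustrative, not part of the change.

```go
package main

import (
	"fmt"
	"time"
)

// notifier coalesces "cluster state changed" events into a buffered channel
// of capacity 1, mirroring how the replicate queue's updateChan is fed by
// the store-descriptor gossip callback and then handed to the base queue as
// its purgatoryChan.
type notifier struct {
	updateChan chan time.Time
}

func newNotifier() *notifier {
	return &notifier{updateChan: make(chan time.Time, 1)}
}

// notify is safe to call from any goroutine; if a signal is already pending,
// the new one is dropped rather than blocking the caller.
func (n *notifier) notify(now time.Time) {
	select {
	case n.updateChan <- now:
	default:
	}
}

// purgatoryChan exposes the same channel the purgatory loop selects on,
// which is what the replicate queue now does instead of returning a ticker.
func (n *notifier) purgatoryChan() <-chan time.Time {
	return n.updateChan
}

func main() {
	n := newNotifier()
	// Many rapid updates collapse into a single pending signal.
	for i := 0; i < 3; i++ {
		n.notify(time.Now())
	}
	select {
	case t := <-n.purgatoryChan():
		fmt.Println("purgatory retry triggered by update at", t.Format(time.RFC3339))
	default:
		fmt.Println("no pending update")
	}
}
```

The buffered, non-blocking send keeps the gossip callback cheap while guaranteeing that at least one purgatory pass runs after any burst of store updates.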