diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go index 07b6695b3a9d..780a4f3171b3 100644 --- a/pkg/kv/kvserver/allocator.go +++ b/pkg/kv/kvserver/allocator.go @@ -322,7 +322,7 @@ func (ae *allocatorError) Error() string { func (*allocatorError) purgatoryErrorMarker() {} -var _ PurgatoryError = &allocatorError{} +var _ purgatoryError = &allocatorError{} // allocatorRand pairs a rand.Rand with a mutex. // NOTE: Allocator is typically only accessed from a single thread (the diff --git a/pkg/kv/kvserver/allocator_test.go b/pkg/kv/kvserver/allocator_test.go index 411f0377d5a2..485aa828d406 100644 --- a/pkg/kv/kvserver/allocator_test.go +++ b/pkg/kv/kvserver/allocator_test.go @@ -7012,7 +7012,7 @@ func TestAllocatorThrottled(t *testing.T) { // First test to make sure we would send the replica to purgatory. _, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil) - if !errors.HasInterface(err, (*PurgatoryError)(nil)) { + if !errors.HasInterface(err, (*purgatoryError)(nil)) { t.Fatalf("expected a purgatory error, got: %+v", err) } @@ -7036,7 +7036,7 @@ func TestAllocatorThrottled(t *testing.T) { storeDetail.throttledUntil = timeutil.Now().Add(24 * time.Hour) a.storePool.detailsMu.Unlock() _, _, err = a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil) - if errors.HasInterface(err, (*PurgatoryError)(nil)) { + if errors.HasInterface(err, (*purgatoryError)(nil)) { t.Fatalf("expected a non purgatory error, got: %+v", err) } } diff --git a/pkg/kv/kvserver/consistency_queue.go b/pkg/kv/kvserver/consistency_queue.go index 771966df1a11..5d1a0120e6a0 100644 --- a/pkg/kv/kvserver/consistency_queue.go +++ b/pkg/kv/kvserver/consistency_queue.go @@ -238,7 +238,3 @@ func (q *consistencyQueue) timer(duration time.Duration) time.Duration { func (*consistencyQueue) purgatoryChan() <-chan time.Time { return nil } - -func (*consistencyQueue) updateChan() <-chan time.Time { - return nil -} diff --git a/pkg/kv/kvserver/merge_queue.go b/pkg/kv/kvserver/merge_queue.go index 522b20ea3af1..bd1f8a48f1d6 100644 --- a/pkg/kv/kvserver/merge_queue.go +++ b/pkg/kv/kvserver/merge_queue.go @@ -178,7 +178,7 @@ type rangeMergePurgatoryError struct{ error } func (rangeMergePurgatoryError) purgatoryErrorMarker() {} -var _ PurgatoryError = rangeMergePurgatoryError{} +var _ purgatoryError = rangeMergePurgatoryError{} func (mq *mergeQueue) requestRangeStats( ctx context.Context, key roachpb.Key, @@ -433,7 +433,3 @@ func (mq *mergeQueue) timer(time.Duration) time.Duration { func (mq *mergeQueue) purgatoryChan() <-chan time.Time { return mq.purgChan } - -func (mq *mergeQueue) updateChan() <-chan time.Time { - return nil -} diff --git a/pkg/kv/kvserver/mvcc_gc_queue.go b/pkg/kv/kvserver/mvcc_gc_queue.go index f90a5ea588a1..d218a1389a6a 100644 --- a/pkg/kv/kvserver/mvcc_gc_queue.go +++ b/pkg/kv/kvserver/mvcc_gc_queue.go @@ -644,7 +644,3 @@ func (*mvccGCQueue) timer(_ time.Duration) time.Duration { func (*mvccGCQueue) purgatoryChan() <-chan time.Time { return nil } - -func (*mvccGCQueue) updateChan() <-chan time.Time { - return nil -} diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go index a74c6d7ee953..e9adac48e1b9 100644 --- a/pkg/kv/kvserver/queue.go +++ b/pkg/kv/kvserver/queue.go @@ -100,10 +100,10 @@ func makeRateLimitedTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProc // the operations's timeout. const permittedRangeScanSlowdown = 10 -// PurgatoryError indicates a replica processing failure which indicates the -// replica can be placed into purgatory for faster retries than the replica -// scanner's interval. -type PurgatoryError interface { +// a purgatoryError indicates a replica processing failure which indicates +// the replica can be placed into purgatory for faster retries when the +// failure condition changes. +type purgatoryError interface { error purgatoryErrorMarker() // dummy method for unique interface } @@ -270,11 +270,6 @@ type queueImpl interface { // purgatory due to failures. If purgatoryChan returns nil, failing // replicas are not sent to purgatory. purgatoryChan() <-chan time.Time - - // updateChan returns a channel that is signalled whenever there is an update - // to the cluster state that might impact the replicas in the queue's - // purgatory. - updateChan() <-chan time.Time } // queueProcessTimeoutFunc controls the timeout for queue processing for a @@ -385,7 +380,7 @@ type queueConfig struct { // // A queueImpl can opt into a purgatory by returning a non-nil channel from the // `purgatoryChan` method. A replica is put into purgatory when the `process` -// method returns an error with a `PurgatoryError` as an entry somewhere in the +// method returns an error with a `purgatoryError` as an entry somewhere in the // `Cause` chain. A replica in purgatory is not processed again until the // channel is signaled, at which point every replica in purgatory is immediately // processed. This catchup is run without the `timer` rate limiting but shares @@ -419,7 +414,7 @@ type baseQueue struct { syncutil.Mutex // Protects all variables in the mu struct replicas map[roachpb.RangeID]*replicaItem // Map from RangeID to replicaItem priorityQ priorityQueue // The priority queue - purgatory map[roachpb.RangeID]PurgatoryError // Map of replicas to processing errors + purgatory map[roachpb.RangeID]purgatoryError // Map of replicas to processing errors stopped bool // Some tests in this package disable queues. disabled bool @@ -992,9 +987,8 @@ func isBenign(err error) bool { return errors.HasType(err, (*benignError)(nil)) } -// IsPurgatoryError returns true iff the given error is a purgatory error. -func IsPurgatoryError(err error) (PurgatoryError, bool) { - var purgErr PurgatoryError +func isPurgatoryError(err error) (purgatoryError, bool) { + var purgErr purgatoryError return purgErr, errors.As(err, &purgErr) } @@ -1090,7 +1084,7 @@ func (bq *baseQueue) finishProcessingReplica( // the failing replica to purgatory. Note that even if the item was // scheduled to be requeued, we ignore this if we add the replica to // purgatory. - if purgErr, ok := IsPurgatoryError(err); ok { + if purgErr, ok := isPurgatoryError(err); ok { bq.mu.Lock() bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr) bq.mu.Unlock() @@ -1112,7 +1106,7 @@ func (bq *baseQueue) finishProcessingReplica( // addToPurgatoryLocked adds the specified replica to the purgatory queue, which // holds replicas which have failed processing. func (bq *baseQueue) addToPurgatoryLocked( - ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr PurgatoryError, + ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr purgatoryError, ) { bq.mu.AssertHeld() @@ -1150,7 +1144,7 @@ func (bq *baseQueue) addToPurgatoryLocked( } // Otherwise, create purgatory and start processing. - bq.mu.purgatory = map[roachpb.RangeID]PurgatoryError{ + bq.mu.purgatory = map[roachpb.RangeID]purgatoryError{ repl.GetRangeID(): purgErr, } @@ -1159,14 +1153,51 @@ func (bq *baseQueue) addToPurgatoryLocked( ticker := time.NewTicker(purgatoryReportInterval) for { select { - case <-bq.impl.updateChan(): - if bq.processReplicasInPurgatory(ctx, stopper) { - return - } case <-bq.impl.purgatoryChan(): - if bq.processReplicasInPurgatory(ctx, stopper) { + func() { + // Acquire from the process semaphore, release when done. + bq.processSem <- struct{}{} + defer func() { <-bq.processSem }() + + // Remove all items from purgatory into a copied slice. + bq.mu.Lock() + ranges := make([]*replicaItem, 0, len(bq.mu.purgatory)) + for rangeID := range bq.mu.purgatory { + item := bq.mu.replicas[rangeID] + if item == nil { + log.Fatalf(ctx, "r%d is in purgatory but not in replicas", rangeID) + } + item.setProcessing() + ranges = append(ranges, item) + bq.removeFromPurgatoryLocked(item) + } + bq.mu.Unlock() + + for _, item := range ranges { + repl, err := bq.getReplica(item.rangeID) + if err != nil || item.replicaID != repl.ReplicaID() { + continue + } + annotatedCtx := repl.AnnotateCtx(ctx) + if stopper.RunTask( + annotatedCtx, bq.processOpName(), func(ctx context.Context) { + err := bq.processReplica(ctx, repl) + bq.finishProcessingReplica(ctx, stopper, repl, err) + }) != nil { + return + } + } + }() + + // Clean up purgatory, if empty. + bq.mu.Lock() + if len(bq.mu.purgatory) == 0 { + log.Infof(ctx, "purgatory is now empty") + bq.mu.purgatory = nil + bq.mu.Unlock() return } + bq.mu.Unlock() case <-ticker.C: // Report purgatory status. bq.mu.Lock() @@ -1182,61 +1213,7 @@ func (bq *baseQueue) addToPurgatoryLocked( return } } - }, - ) -} - -// processReplicasInPurgatory processes replicas currently in the queue's -// purgatory. -func (bq *baseQueue) processReplicasInPurgatory( - ctx context.Context, stopper *stop.Stopper, -) (purgatoryCleared bool) { - func() { - // Acquire from the process semaphore, release when done. - bq.processSem <- struct{}{} - defer func() { <-bq.processSem }() - - // Remove all items from purgatory into a copied slice. - bq.mu.Lock() - ranges := make([]*replicaItem, 0, len(bq.mu.purgatory)) - for rangeID := range bq.mu.purgatory { - item := bq.mu.replicas[rangeID] - if item == nil { - log.Fatalf(ctx, "r%d is in purgatory but not in replicas", rangeID) - } - item.setProcessing() - ranges = append(ranges, item) - bq.removeFromPurgatoryLocked(item) - } - bq.mu.Unlock() - - for _, item := range ranges { - repl, err := bq.getReplica(item.rangeID) - if err != nil || item.replicaID != repl.ReplicaID() { - continue - } - annotatedCtx := repl.AnnotateCtx(ctx) - if stopper.RunTask( - annotatedCtx, bq.processOpName(), func(ctx context.Context) { - err := bq.processReplica(ctx, repl) - bq.finishProcessingReplica(ctx, stopper, repl, err) - }, - ) != nil { - return - } - } - }() - - // Clean up purgatory, if empty. - bq.mu.Lock() - if len(bq.mu.purgatory) == 0 { - log.Infof(ctx, "purgatory is now empty") - bq.mu.purgatory = nil - bq.mu.Unlock() - return true /* purgatoryCleared */ - } - bq.mu.Unlock() - return false /* purgatoryCleared */ + }) } // pop dequeues the highest priority replica, if any, in the queue. The diff --git a/pkg/kv/kvserver/queue_concurrency_test.go b/pkg/kv/kvserver/queue_concurrency_test.go index e39345f0f7a0..10b6a350f8a0 100644 --- a/pkg/kv/kvserver/queue_concurrency_test.go +++ b/pkg/kv/kvserver/queue_concurrency_test.go @@ -151,10 +151,6 @@ func (fakeQueueImpl) purgatoryChan() <-chan time.Time { return time.After(time.Nanosecond) } -func (fakeQueueImpl) updateChan() <-chan time.Time { - return nil -} - type fakeReplica struct { rangeID roachpb.RangeID replicaID roachpb.ReplicaID diff --git a/pkg/kv/kvserver/queue_test.go b/pkg/kv/kvserver/queue_test.go index e273271774ff..c44bfa15bd4f 100644 --- a/pkg/kv/kvserver/queue_test.go +++ b/pkg/kv/kvserver/queue_test.go @@ -84,10 +84,6 @@ func (tq *testQueueImpl) purgatoryChan() <-chan time.Time { return tq.pChan } -func (tq *testQueueImpl) updateChan() <-chan time.Time { - return nil -} - func makeTestBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *baseQueue { if !cfg.acceptsUnsplitRanges { // Needed in order to pass the validation in newBaseQueue. diff --git a/pkg/kv/kvserver/raft_log_queue.go b/pkg/kv/kvserver/raft_log_queue.go index 4937839acfdb..c76f9679e063 100644 --- a/pkg/kv/kvserver/raft_log_queue.go +++ b/pkg/kv/kvserver/raft_log_queue.go @@ -754,10 +754,6 @@ func (*raftLogQueue) purgatoryChan() <-chan time.Time { return nil } -func (*raftLogQueue) updateChan() <-chan time.Time { - return nil -} - func isLooselyCoupledRaftLogTruncationEnabled( ctx context.Context, settings *cluster.Settings, ) bool { diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go index abfc62ad6c3f..c23cbe742495 100644 --- a/pkg/kv/kvserver/raft_snapshot_queue.go +++ b/pkg/kv/kvserver/raft_snapshot_queue.go @@ -175,7 +175,3 @@ func (*raftSnapshotQueue) timer(_ time.Duration) time.Duration { func (rq *raftSnapshotQueue) purgatoryChan() <-chan time.Time { return nil } - -func (rq *raftSnapshotQueue) updateChan() <-chan time.Time { - return nil -} diff --git a/pkg/kv/kvserver/replica_gc_queue.go b/pkg/kv/kvserver/replica_gc_queue.go index a62b538050f3..b64b6876f685 100644 --- a/pkg/kv/kvserver/replica_gc_queue.go +++ b/pkg/kv/kvserver/replica_gc_queue.go @@ -373,7 +373,3 @@ func (*replicaGCQueue) timer(_ time.Duration) time.Duration { func (*replicaGCQueue) purgatoryChan() <-chan time.Time { return nil } - -func (*replicaGCQueue) updateChan() <-chan time.Time { - return nil -} diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index bd696e217212..ca4d745db0a1 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -35,12 +35,6 @@ import ( ) const ( - // replicateQueuePurgatoryCheckInterval is the interval at which replicas in - // the replicate queue purgatory are re-attempted. Note that these replicas - // may be re-attempted more frequently by the replicateQueue in case there are - // gossip updates that might affect allocation decisions. - replicateQueuePurgatoryCheckInterval = 1 * time.Minute - // replicateQueueTimerDuration is the duration between replication of queued // replicas. replicateQueueTimerDuration = 0 // zero duration to process replication greedily @@ -345,23 +339,18 @@ func (metrics *ReplicateQueueMetrics) trackRebalanceReplicaCount(targetType targ // additional replica to their range. type replicateQueue struct { *baseQueue - metrics ReplicateQueueMetrics - allocator Allocator - // purgCh is signalled every replicateQueuePurgatoryCheckInterval. - purgCh <-chan time.Time - // updateCh is signalled every time there is an update to the cluster's store - // descriptors. - updateCh chan time.Time + metrics ReplicateQueueMetrics + allocator Allocator + updateChan chan time.Time lastLeaseTransfer atomic.Value // read and written by scanner & queue goroutines } // newReplicateQueue returns a new instance of replicateQueue. func newReplicateQueue(store *Store, allocator Allocator) *replicateQueue { rq := &replicateQueue{ - metrics: makeReplicateQueueMetrics(), - allocator: allocator, - purgCh: time.NewTicker(replicateQueuePurgatoryCheckInterval).C, - updateCh: make(chan time.Time, 1), + metrics: makeReplicateQueueMetrics(), + allocator: allocator, + updateChan: make(chan time.Time, 1), } store.metrics.registry.AddMetricStruct(&rq.metrics) rq.baseQueue = newBaseQueue( @@ -383,9 +372,10 @@ func newReplicateQueue(store *Store, allocator Allocator) *replicateQueue { purgatory: store.metrics.ReplicateQueuePurgatory, }, ) + updateFn := func() { select { - case rq.updateCh <- timeutil.Now(): + case rq.updateChan <- timeutil.Now(): default: } } @@ -527,15 +517,6 @@ func (rq *replicateQueue) process( return false, errors.Errorf("failed to replicate after %d retries", retryOpts.MaxRetries) } -// decommissionPurgatoryError wraps an error that occurs when attempting to -// rebalance a range that has a replica on a decommissioning node to indicate -// that the error should send the range to purgatory. -type decommissionPurgatoryError struct{ error } - -func (decommissionPurgatoryError) purgatoryErrorMarker() {} - -var _ PurgatoryError = decommissionPurgatoryError{} - func (rq *replicateQueue) processOneChange( ctx context.Context, repl *Replica, @@ -643,14 +624,10 @@ func (rq *replicateQueue) processOneChange( "decommissioning voter %v unexpectedly not found in %v", decommissioningVoterReplicas[0], voterReplicas) } - requeue, err := rq.addOrReplaceVoters( + return rq.addOrReplaceVoters( ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, decommissioning, dryRun) - if err != nil { - return requeue, decommissionPurgatoryError{err} - } - return requeue, nil case AllocatorReplaceDecommissioningNonVoter: - decommissioningNonVoterReplicas := rq.store.cfg.StorePool.decommissioningReplicas(nonVoterReplicas) + decommissioningNonVoterReplicas := rq.allocator.storePool.decommissioningReplicas(nonVoterReplicas) if len(decommissioningNonVoterReplicas) == 0 { return false, nil } @@ -660,12 +637,8 @@ func (rq *replicateQueue) processOneChange( "decommissioning non-voter %v unexpectedly not found in %v", decommissioningNonVoterReplicas[0], nonVoterReplicas) } - requeue, err := rq.addOrReplaceNonVoters( + return rq.addOrReplaceNonVoters( ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, decommissioning, dryRun) - if err != nil { - return requeue, decommissionPurgatoryError{err} - } - return requeue, nil // Remove decommissioning replicas. // @@ -673,17 +646,9 @@ func (rq *replicateQueue) processOneChange( // has decommissioning replicas; in the common case we'll hit // AllocatorReplaceDecommissioning{Non}Voter above. case AllocatorRemoveDecommissioningVoter: - requeue, err := rq.removeDecommissioning(ctx, repl, voterTarget, dryRun) - if err != nil { - return requeue, decommissionPurgatoryError{err} - } - return requeue, nil + return rq.removeDecommissioning(ctx, repl, voterTarget, dryRun) case AllocatorRemoveDecommissioningNonVoter: - requeue, err := rq.removeDecommissioning(ctx, repl, nonVoterTarget, dryRun) - if err != nil { - return requeue, decommissionPurgatoryError{err} - } - return requeue, nil + return rq.removeDecommissioning(ctx, repl, nonVoterTarget, dryRun) // Remove dead replicas. // @@ -827,7 +792,7 @@ func (rq *replicateQueue) addOrReplaceVoters( _, _, err := rq.allocator.AllocateVoter(ctx, conf, oldPlusNewReplicas, remainingLiveNonVoters) if err != nil { // It does not seem possible to go to the next odd replica state. Note - // that AllocateVoter returns an allocatorError (a PurgatoryError) + // that AllocateVoter returns an allocatorError (a purgatoryError) // when purgatory is requested. return false, errors.Wrap(err, "avoid up-replicating to fragile quorum") } @@ -1662,13 +1627,9 @@ func (*replicateQueue) timer(_ time.Duration) time.Duration { return replicateQueueTimerDuration } +// purgatoryChan returns the replicate queue's store update channel. func (rq *replicateQueue) purgatoryChan() <-chan time.Time { - return rq.purgCh -} - -// updateChan returns the replicate queue's store update channel. -func (rq *replicateQueue) updateChan() <-chan time.Time { - return rq.updateCh + return rq.updateChan } // rangeRaftStatus pretty-prints the Raft progress (i.e. Raft log position) of diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index c184e70bd48a..83f49c942fbe 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -32,9 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server" - "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -589,69 +587,6 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) { }) } -// TestReplicateQueueDecommissionPurgatoryError tests that failure to move a -// decommissioning replica puts it in the replicate queue purgatory. -func TestReplicateQueueDecommissionPurgatoryError(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - // NB: This test injects a fake failure during replica rebalancing, and we use - // this `rejectSnapshots` variable as a flag to activate or deactivate that - // injected failure. - var rejectSnapshots int64 - ctx := context.Background() - tc := testcluster.StartTestCluster( - t, 4, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{ - ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error { - if atomic.LoadInt64(&rejectSnapshots) == 1 { - return errors.Newf("boom") - } - return nil - }, - }}}, - }, - ) - defer tc.Stopper().Stop(ctx) - - // Add a replica to the second and third nodes, and then decommission the - // second node. Since there are only 4 nodes in the cluster, the - // decommissioning replica must be rebalanced to the fourth node. - const decomNodeIdx = 1 - const decomNodeID = 2 - scratchKey := tc.ScratchRange(t) - tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx)) - tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx+1)) - adminSrv := tc.Server(decomNodeIdx) - conn, err := adminSrv.RPCContext().GRPCDialNode( - adminSrv.RPCAddr(), adminSrv.NodeID(), rpc.DefaultClass).Connect(ctx) - require.NoError(t, err) - adminClient := serverpb.NewAdminClient(conn) - _, err = adminClient.Decommission( - ctx, &serverpb.DecommissionRequest{ - NodeIDs: []roachpb.NodeID{decomNodeID}, - TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING, - }, - ) - require.NoError(t, err) - - // Activate the above testing knob to start rejecting future rebalances and - // then attempt to rebalance the decommissioning replica away. We expect a - // purgatory error to be returned here. - atomic.StoreInt64(&rejectSnapshots, 1) - store := tc.GetFirstStoreFromServer(t, 0) - repl, err := store.GetReplica(tc.LookupRangeOrFatal(t, scratchKey).RangeID) - require.NoError(t, err) - _, processErr, enqueueErr := tc.GetFirstStoreFromServer(t, 0).Enqueue( - ctx, "replicate", repl, true /* skipShouldQueue */, false /* async */) - require.NoError(t, enqueueErr) - _, isPurgErr := kvserver.IsPurgatoryError(processErr) - if !isPurgErr { - t.Fatalf("expected to receive a purgatory error, got %v", processErr) - } -} - // getLeaseholderStore returns the leaseholder store for the given scratchRange. func getLeaseholderStore( tc *testcluster.TestCluster, scratchRange roachpb.RangeDescriptor, diff --git a/pkg/kv/kvserver/split_queue.go b/pkg/kv/kvserver/split_queue.go index 493f332d1d1e..a611a335ac73 100644 --- a/pkg/kv/kvserver/split_queue.go +++ b/pkg/kv/kvserver/split_queue.go @@ -155,7 +155,7 @@ type unsplittableRangeError struct{} func (unsplittableRangeError) Error() string { return "could not find valid split key" } func (unsplittableRangeError) purgatoryErrorMarker() {} -var _ PurgatoryError = unsplittableRangeError{} +var _ purgatoryError = unsplittableRangeError{} // process synchronously invokes admin split for each proposed split key. func (sq *splitQueue) process( @@ -274,7 +274,3 @@ func (*splitQueue) timer(_ time.Duration) time.Duration { func (sq *splitQueue) purgatoryChan() <-chan time.Time { return sq.purgChan } - -func (sq *splitQueue) updateChan() <-chan time.Time { - return nil -} diff --git a/pkg/kv/kvserver/ts_maintenance_queue.go b/pkg/kv/kvserver/ts_maintenance_queue.go index 4f9e07b75e62..f0476e1895f4 100644 --- a/pkg/kv/kvserver/ts_maintenance_queue.go +++ b/pkg/kv/kvserver/ts_maintenance_queue.go @@ -178,7 +178,3 @@ func (q *timeSeriesMaintenanceQueue) timer(duration time.Duration) time.Duration func (*timeSeriesMaintenanceQueue) purgatoryChan() <-chan time.Time { return nil } - -func (*timeSeriesMaintenanceQueue) updateChan() <-chan time.Time { - return nil -}