Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "kvserver: retry failures to rebalance decommissioning replicas" #84439

Merged
merged 1 commit into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -3104,7 +3104,7 @@ table. Returns an error if validation fails.</p>
### TUPLE{INT AS RANGE_ID, STRING AS ERROR, INT AS END_TO_END_LATENCY_MS, STRING AS VERBOSE_TRACE} functions

<table>
<thead><tr><th>Function &rarr; Returns</th><th>Description</th></tr></thead>
<thead><tr><th>Function &rarr; Returns</th><th>Description</th><th>Volatility</th></tr></thead>
<tbody>
<tr><td><a name="crdb_internal.probe_ranges"></a><code>crdb_internal.probe_ranges(timeout: <a href="interval.html">interval</a>, probe_type: unknown_enum) &rarr; tuple{int AS range_id, string AS error, int AS end_to_end_latency_ms, string AS verbose_trace}</code></td><td><span class="funcdesc"><p>Returns rows of range data based on the results received when using the prober.
Parameters
Expand All @@ -3117,7 +3117,7 @@ Notes
If a probe should fail, the latency will be set to MaxInt64 in order to naturally sort above other latencies.
Read probes are cheaper than write probes. If write probes have already ran, it’s not necessary to also run a read probe.
A write probe will effectively probe reads as well.</p>
</span></td></tr></tbody>
</span></td><td>Volatile</td></tr></tbody>
</table>

### UUID functions
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (ae *allocatorError) Error() string {

func (*allocatorError) purgatoryErrorMarker() {}

var _ PurgatoryError = &allocatorError{}
var _ purgatoryError = &allocatorError{}

// allocatorRand pairs a rand.Rand with a mutex.
// NOTE: Allocator is typically only accessed from a single thread (the
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7012,7 +7012,7 @@ func TestAllocatorThrottled(t *testing.T) {

// First test to make sure we would send the replica to purgatory.
_, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil)
if !errors.HasInterface(err, (*PurgatoryError)(nil)) {
if !errors.HasInterface(err, (*purgatoryError)(nil)) {
t.Fatalf("expected a purgatory error, got: %+v", err)
}

Expand All @@ -7036,7 +7036,7 @@ func TestAllocatorThrottled(t *testing.T) {
storeDetail.throttledUntil = timeutil.Now().Add(24 * time.Hour)
a.storePool.detailsMu.Unlock()
_, _, err = a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil)
if errors.HasInterface(err, (*PurgatoryError)(nil)) {
if errors.HasInterface(err, (*purgatoryError)(nil)) {
t.Fatalf("expected a non purgatory error, got: %+v", err)
}
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/consistency_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,3 @@ func (q *consistencyQueue) timer(duration time.Duration) time.Duration {
func (*consistencyQueue) purgatoryChan() <-chan time.Time {
return nil
}

func (*consistencyQueue) updateChan() <-chan time.Time {
return nil
}
6 changes: 1 addition & 5 deletions pkg/kv/kvserver/merge_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ type rangeMergePurgatoryError struct{ error }

func (rangeMergePurgatoryError) purgatoryErrorMarker() {}

var _ PurgatoryError = rangeMergePurgatoryError{}
var _ purgatoryError = rangeMergePurgatoryError{}

func (mq *mergeQueue) requestRangeStats(
ctx context.Context, key roachpb.Key,
Expand Down Expand Up @@ -433,7 +433,3 @@ func (mq *mergeQueue) timer(time.Duration) time.Duration {
func (mq *mergeQueue) purgatoryChan() <-chan time.Time {
return mq.purgChan
}

func (mq *mergeQueue) updateChan() <-chan time.Time {
return nil
}
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/mvcc_gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,3 @@ func (*mvccGCQueue) timer(_ time.Duration) time.Duration {
func (*mvccGCQueue) purgatoryChan() <-chan time.Time {
return nil
}

func (*mvccGCQueue) updateChan() <-chan time.Time {
return nil
}
131 changes: 54 additions & 77 deletions pkg/kv/kvserver/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ func makeRateLimitedTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProc
// the operations's timeout.
const permittedRangeScanSlowdown = 10

// PurgatoryError indicates a replica processing failure which indicates the
// replica can be placed into purgatory for faster retries than the replica
// scanner's interval.
type PurgatoryError interface {
// a purgatoryError indicates a replica processing failure which indicates
// the replica can be placed into purgatory for faster retries when the
// failure condition changes.
type purgatoryError interface {
error
purgatoryErrorMarker() // dummy method for unique interface
}
Expand Down Expand Up @@ -270,11 +270,6 @@ type queueImpl interface {
// purgatory due to failures. If purgatoryChan returns nil, failing
// replicas are not sent to purgatory.
purgatoryChan() <-chan time.Time

// updateChan returns a channel that is signalled whenever there is an update
// to the cluster state that might impact the replicas in the queue's
// purgatory.
updateChan() <-chan time.Time
}

// queueProcessTimeoutFunc controls the timeout for queue processing for a
Expand Down Expand Up @@ -385,7 +380,7 @@ type queueConfig struct {
//
// A queueImpl can opt into a purgatory by returning a non-nil channel from the
// `purgatoryChan` method. A replica is put into purgatory when the `process`
// method returns an error with a `PurgatoryError` as an entry somewhere in the
// method returns an error with a `purgatoryError` as an entry somewhere in the
// `Cause` chain. A replica in purgatory is not processed again until the
// channel is signaled, at which point every replica in purgatory is immediately
// processed. This catchup is run without the `timer` rate limiting but shares
Expand Down Expand Up @@ -419,7 +414,7 @@ type baseQueue struct {
syncutil.Mutex // Protects all variables in the mu struct
replicas map[roachpb.RangeID]*replicaItem // Map from RangeID to replicaItem
priorityQ priorityQueue // The priority queue
purgatory map[roachpb.RangeID]PurgatoryError // Map of replicas to processing errors
purgatory map[roachpb.RangeID]purgatoryError // Map of replicas to processing errors
stopped bool
// Some tests in this package disable queues.
disabled bool
Expand Down Expand Up @@ -992,9 +987,8 @@ func isBenign(err error) bool {
return errors.HasType(err, (*benignError)(nil))
}

// IsPurgatoryError returns true iff the given error is a purgatory error.
func IsPurgatoryError(err error) (PurgatoryError, bool) {
var purgErr PurgatoryError
func isPurgatoryError(err error) (purgatoryError, bool) {
var purgErr purgatoryError
return purgErr, errors.As(err, &purgErr)
}

Expand Down Expand Up @@ -1090,7 +1084,7 @@ func (bq *baseQueue) finishProcessingReplica(
// the failing replica to purgatory. Note that even if the item was
// scheduled to be requeued, we ignore this if we add the replica to
// purgatory.
if purgErr, ok := IsPurgatoryError(err); ok {
if purgErr, ok := isPurgatoryError(err); ok {
bq.mu.Lock()
bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr)
bq.mu.Unlock()
Expand All @@ -1112,7 +1106,7 @@ func (bq *baseQueue) finishProcessingReplica(
// addToPurgatoryLocked adds the specified replica to the purgatory queue, which
// holds replicas which have failed processing.
func (bq *baseQueue) addToPurgatoryLocked(
ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr PurgatoryError,
ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr purgatoryError,
) {
bq.mu.AssertHeld()

Expand Down Expand Up @@ -1150,7 +1144,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
}

// Otherwise, create purgatory and start processing.
bq.mu.purgatory = map[roachpb.RangeID]PurgatoryError{
bq.mu.purgatory = map[roachpb.RangeID]purgatoryError{
repl.GetRangeID(): purgErr,
}

Expand All @@ -1159,14 +1153,51 @@ func (bq *baseQueue) addToPurgatoryLocked(
ticker := time.NewTicker(purgatoryReportInterval)
for {
select {
case <-bq.impl.updateChan():
if bq.processReplicasInPurgatory(ctx, stopper) {
return
}
case <-bq.impl.purgatoryChan():
if bq.processReplicasInPurgatory(ctx, stopper) {
func() {
// Acquire from the process semaphore, release when done.
bq.processSem <- struct{}{}
defer func() { <-bq.processSem }()

// Remove all items from purgatory into a copied slice.
bq.mu.Lock()
ranges := make([]*replicaItem, 0, len(bq.mu.purgatory))
for rangeID := range bq.mu.purgatory {
item := bq.mu.replicas[rangeID]
if item == nil {
log.Fatalf(ctx, "r%d is in purgatory but not in replicas", rangeID)
}
item.setProcessing()
ranges = append(ranges, item)
bq.removeFromPurgatoryLocked(item)
}
bq.mu.Unlock()

for _, item := range ranges {
repl, err := bq.getReplica(item.rangeID)
if err != nil || item.replicaID != repl.ReplicaID() {
continue
}
annotatedCtx := repl.AnnotateCtx(ctx)
if stopper.RunTask(
annotatedCtx, bq.processOpName(), func(ctx context.Context) {
err := bq.processReplica(ctx, repl)
bq.finishProcessingReplica(ctx, stopper, repl, err)
}) != nil {
return
}
}
}()

// Clean up purgatory, if empty.
bq.mu.Lock()
if len(bq.mu.purgatory) == 0 {
log.Infof(ctx, "purgatory is now empty")
bq.mu.purgatory = nil
bq.mu.Unlock()
return
}
bq.mu.Unlock()
case <-ticker.C:
// Report purgatory status.
bq.mu.Lock()
Expand All @@ -1182,61 +1213,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
return
}
}
},
)
}

// processReplicasInPurgatory processes replicas currently in the queue's
// purgatory.
func (bq *baseQueue) processReplicasInPurgatory(
ctx context.Context, stopper *stop.Stopper,
) (purgatoryCleared bool) {
func() {
// Acquire from the process semaphore, release when done.
bq.processSem <- struct{}{}
defer func() { <-bq.processSem }()

// Remove all items from purgatory into a copied slice.
bq.mu.Lock()
ranges := make([]*replicaItem, 0, len(bq.mu.purgatory))
for rangeID := range bq.mu.purgatory {
item := bq.mu.replicas[rangeID]
if item == nil {
log.Fatalf(ctx, "r%d is in purgatory but not in replicas", rangeID)
}
item.setProcessing()
ranges = append(ranges, item)
bq.removeFromPurgatoryLocked(item)
}
bq.mu.Unlock()

for _, item := range ranges {
repl, err := bq.getReplica(item.rangeID)
if err != nil || item.replicaID != repl.ReplicaID() {
continue
}
annotatedCtx := repl.AnnotateCtx(ctx)
if stopper.RunTask(
annotatedCtx, bq.processOpName(), func(ctx context.Context) {
err := bq.processReplica(ctx, repl)
bq.finishProcessingReplica(ctx, stopper, repl, err)
},
) != nil {
return
}
}
}()

// Clean up purgatory, if empty.
bq.mu.Lock()
if len(bq.mu.purgatory) == 0 {
log.Infof(ctx, "purgatory is now empty")
bq.mu.purgatory = nil
bq.mu.Unlock()
return true /* purgatoryCleared */
}
bq.mu.Unlock()
return false /* purgatoryCleared */
})
}

// pop dequeues the highest priority replica, if any, in the queue. The
Expand Down
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/queue_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,6 @@ func (fakeQueueImpl) purgatoryChan() <-chan time.Time {
return time.After(time.Nanosecond)
}

func (fakeQueueImpl) updateChan() <-chan time.Time {
return nil
}

type fakeReplica struct {
rangeID roachpb.RangeID
replicaID roachpb.ReplicaID
Expand Down
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,6 @@ func (tq *testQueueImpl) purgatoryChan() <-chan time.Time {
return tq.pChan
}

func (tq *testQueueImpl) updateChan() <-chan time.Time {
return nil
}

func makeTestBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *baseQueue {
if !cfg.acceptsUnsplitRanges {
// Needed in order to pass the validation in newBaseQueue.
Expand Down
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,10 +754,6 @@ func (*raftLogQueue) purgatoryChan() <-chan time.Time {
return nil
}

func (*raftLogQueue) updateChan() <-chan time.Time {
return nil
}

func isLooselyCoupledRaftLogTruncationEnabled(
ctx context.Context, settings *cluster.Settings,
) bool {
Expand Down
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/raft_snapshot_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,3 @@ func (*raftSnapshotQueue) timer(_ time.Duration) time.Duration {
func (rq *raftSnapshotQueue) purgatoryChan() <-chan time.Time {
return nil
}

func (rq *raftSnapshotQueue) updateChan() <-chan time.Time {
return nil
}
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/replica_gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,3 @@ func (*replicaGCQueue) timer(_ time.Duration) time.Duration {
func (*replicaGCQueue) purgatoryChan() <-chan time.Time {
return nil
}

func (*replicaGCQueue) updateChan() <-chan time.Time {
return nil
}
Loading