Skip to content

Commit

Permalink
Merge pull request cockroachdb#84439 from aayushshah15/release-22.1
Browse files Browse the repository at this point in the history
  • Loading branch information
aayushshah15 authored Jul 15, 2022
2 parents 7c33e5e + 9f47a21 commit 3c6c893
Show file tree
Hide file tree
Showing 15 changed files with 75 additions and 242 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (ae *allocatorError) Error() string {

func (*allocatorError) purgatoryErrorMarker() {}

var _ PurgatoryError = &allocatorError{}
var _ purgatoryError = &allocatorError{}

// allocatorRand pairs a rand.Rand with a mutex.
// NOTE: Allocator is typically only accessed from a single thread (the
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7012,7 +7012,7 @@ func TestAllocatorThrottled(t *testing.T) {

// First test to make sure we would send the replica to purgatory.
_, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil)
if !errors.HasInterface(err, (*PurgatoryError)(nil)) {
if !errors.HasInterface(err, (*purgatoryError)(nil)) {
t.Fatalf("expected a purgatory error, got: %+v", err)
}

Expand All @@ -7036,7 +7036,7 @@ func TestAllocatorThrottled(t *testing.T) {
storeDetail.throttledUntil = timeutil.Now().Add(24 * time.Hour)
a.storePool.detailsMu.Unlock()
_, _, err = a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil)
if errors.HasInterface(err, (*PurgatoryError)(nil)) {
if errors.HasInterface(err, (*purgatoryError)(nil)) {
t.Fatalf("expected a non purgatory error, got: %+v", err)
}
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/consistency_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,3 @@ func (q *consistencyQueue) timer(duration time.Duration) time.Duration {
func (*consistencyQueue) purgatoryChan() <-chan time.Time {
return nil
}

func (*consistencyQueue) updateChan() <-chan time.Time {
return nil
}
6 changes: 1 addition & 5 deletions pkg/kv/kvserver/merge_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ type rangeMergePurgatoryError struct{ error }

func (rangeMergePurgatoryError) purgatoryErrorMarker() {}

var _ PurgatoryError = rangeMergePurgatoryError{}
var _ purgatoryError = rangeMergePurgatoryError{}

func (mq *mergeQueue) requestRangeStats(
ctx context.Context, key roachpb.Key,
Expand Down Expand Up @@ -433,7 +433,3 @@ func (mq *mergeQueue) timer(time.Duration) time.Duration {
func (mq *mergeQueue) purgatoryChan() <-chan time.Time {
return mq.purgChan
}

func (mq *mergeQueue) updateChan() <-chan time.Time {
return nil
}
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/mvcc_gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,3 @@ func (*mvccGCQueue) timer(_ time.Duration) time.Duration {
func (*mvccGCQueue) purgatoryChan() <-chan time.Time {
return nil
}

func (*mvccGCQueue) updateChan() <-chan time.Time {
return nil
}
131 changes: 54 additions & 77 deletions pkg/kv/kvserver/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ func makeRateLimitedTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProc
// the operations's timeout.
const permittedRangeScanSlowdown = 10

// PurgatoryError indicates a replica processing failure which indicates the
// replica can be placed into purgatory for faster retries than the replica
// scanner's interval.
type PurgatoryError interface {
// a purgatoryError indicates a replica processing failure which indicates
// the replica can be placed into purgatory for faster retries when the
// failure condition changes.
type purgatoryError interface {
error
purgatoryErrorMarker() // dummy method for unique interface
}
Expand Down Expand Up @@ -270,11 +270,6 @@ type queueImpl interface {
// purgatory due to failures. If purgatoryChan returns nil, failing
// replicas are not sent to purgatory.
purgatoryChan() <-chan time.Time

// updateChan returns a channel that is signalled whenever there is an update
// to the cluster state that might impact the replicas in the queue's
// purgatory.
updateChan() <-chan time.Time
}

// queueProcessTimeoutFunc controls the timeout for queue processing for a
Expand Down Expand Up @@ -385,7 +380,7 @@ type queueConfig struct {
//
// A queueImpl can opt into a purgatory by returning a non-nil channel from the
// `purgatoryChan` method. A replica is put into purgatory when the `process`
// method returns an error with a `PurgatoryError` as an entry somewhere in the
// method returns an error with a `purgatoryError` as an entry somewhere in the
// `Cause` chain. A replica in purgatory is not processed again until the
// channel is signaled, at which point every replica in purgatory is immediately
// processed. This catchup is run without the `timer` rate limiting but shares
Expand Down Expand Up @@ -419,7 +414,7 @@ type baseQueue struct {
syncutil.Mutex // Protects all variables in the mu struct
replicas map[roachpb.RangeID]*replicaItem // Map from RangeID to replicaItem
priorityQ priorityQueue // The priority queue
purgatory map[roachpb.RangeID]PurgatoryError // Map of replicas to processing errors
purgatory map[roachpb.RangeID]purgatoryError // Map of replicas to processing errors
stopped bool
// Some tests in this package disable queues.
disabled bool
Expand Down Expand Up @@ -992,9 +987,8 @@ func isBenign(err error) bool {
return errors.HasType(err, (*benignError)(nil))
}

// IsPurgatoryError returns true iff the given error is a purgatory error.
func IsPurgatoryError(err error) (PurgatoryError, bool) {
var purgErr PurgatoryError
func isPurgatoryError(err error) (purgatoryError, bool) {
var purgErr purgatoryError
return purgErr, errors.As(err, &purgErr)
}

Expand Down Expand Up @@ -1090,7 +1084,7 @@ func (bq *baseQueue) finishProcessingReplica(
// the failing replica to purgatory. Note that even if the item was
// scheduled to be requeued, we ignore this if we add the replica to
// purgatory.
if purgErr, ok := IsPurgatoryError(err); ok {
if purgErr, ok := isPurgatoryError(err); ok {
bq.mu.Lock()
bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr)
bq.mu.Unlock()
Expand All @@ -1112,7 +1106,7 @@ func (bq *baseQueue) finishProcessingReplica(
// addToPurgatoryLocked adds the specified replica to the purgatory queue, which
// holds replicas which have failed processing.
func (bq *baseQueue) addToPurgatoryLocked(
ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr PurgatoryError,
ctx context.Context, stopper *stop.Stopper, repl replicaInQueue, purgErr purgatoryError,
) {
bq.mu.AssertHeld()

Expand Down Expand Up @@ -1150,7 +1144,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
}

// Otherwise, create purgatory and start processing.
bq.mu.purgatory = map[roachpb.RangeID]PurgatoryError{
bq.mu.purgatory = map[roachpb.RangeID]purgatoryError{
repl.GetRangeID(): purgErr,
}

Expand All @@ -1159,14 +1153,51 @@ func (bq *baseQueue) addToPurgatoryLocked(
ticker := time.NewTicker(purgatoryReportInterval)
for {
select {
case <-bq.impl.updateChan():
if bq.processReplicasInPurgatory(ctx, stopper) {
return
}
case <-bq.impl.purgatoryChan():
if bq.processReplicasInPurgatory(ctx, stopper) {
func() {
// Acquire from the process semaphore, release when done.
bq.processSem <- struct{}{}
defer func() { <-bq.processSem }()

// Remove all items from purgatory into a copied slice.
bq.mu.Lock()
ranges := make([]*replicaItem, 0, len(bq.mu.purgatory))
for rangeID := range bq.mu.purgatory {
item := bq.mu.replicas[rangeID]
if item == nil {
log.Fatalf(ctx, "r%d is in purgatory but not in replicas", rangeID)
}
item.setProcessing()
ranges = append(ranges, item)
bq.removeFromPurgatoryLocked(item)
}
bq.mu.Unlock()

for _, item := range ranges {
repl, err := bq.getReplica(item.rangeID)
if err != nil || item.replicaID != repl.ReplicaID() {
continue
}
annotatedCtx := repl.AnnotateCtx(ctx)
if stopper.RunTask(
annotatedCtx, bq.processOpName(), func(ctx context.Context) {
err := bq.processReplica(ctx, repl)
bq.finishProcessingReplica(ctx, stopper, repl, err)
}) != nil {
return
}
}
}()

// Clean up purgatory, if empty.
bq.mu.Lock()
if len(bq.mu.purgatory) == 0 {
log.Infof(ctx, "purgatory is now empty")
bq.mu.purgatory = nil
bq.mu.Unlock()
return
}
bq.mu.Unlock()
case <-ticker.C:
// Report purgatory status.
bq.mu.Lock()
Expand All @@ -1182,61 +1213,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
return
}
}
},
)
}

// processReplicasInPurgatory processes replicas currently in the queue's
// purgatory.
func (bq *baseQueue) processReplicasInPurgatory(
ctx context.Context, stopper *stop.Stopper,
) (purgatoryCleared bool) {
func() {
// Acquire from the process semaphore, release when done.
bq.processSem <- struct{}{}
defer func() { <-bq.processSem }()

// Remove all items from purgatory into a copied slice.
bq.mu.Lock()
ranges := make([]*replicaItem, 0, len(bq.mu.purgatory))
for rangeID := range bq.mu.purgatory {
item := bq.mu.replicas[rangeID]
if item == nil {
log.Fatalf(ctx, "r%d is in purgatory but not in replicas", rangeID)
}
item.setProcessing()
ranges = append(ranges, item)
bq.removeFromPurgatoryLocked(item)
}
bq.mu.Unlock()

for _, item := range ranges {
repl, err := bq.getReplica(item.rangeID)
if err != nil || item.replicaID != repl.ReplicaID() {
continue
}
annotatedCtx := repl.AnnotateCtx(ctx)
if stopper.RunTask(
annotatedCtx, bq.processOpName(), func(ctx context.Context) {
err := bq.processReplica(ctx, repl)
bq.finishProcessingReplica(ctx, stopper, repl, err)
},
) != nil {
return
}
}
}()

// Clean up purgatory, if empty.
bq.mu.Lock()
if len(bq.mu.purgatory) == 0 {
log.Infof(ctx, "purgatory is now empty")
bq.mu.purgatory = nil
bq.mu.Unlock()
return true /* purgatoryCleared */
}
bq.mu.Unlock()
return false /* purgatoryCleared */
})
}

// pop dequeues the highest priority replica, if any, in the queue. The
Expand Down
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/queue_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,6 @@ func (fakeQueueImpl) purgatoryChan() <-chan time.Time {
return time.After(time.Nanosecond)
}

func (fakeQueueImpl) updateChan() <-chan time.Time {
return nil
}

type fakeReplica struct {
rangeID roachpb.RangeID
replicaID roachpb.ReplicaID
Expand Down
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,6 @@ func (tq *testQueueImpl) purgatoryChan() <-chan time.Time {
return tq.pChan
}

func (tq *testQueueImpl) updateChan() <-chan time.Time {
return nil
}

func makeTestBaseQueue(name string, impl queueImpl, store *Store, cfg queueConfig) *baseQueue {
if !cfg.acceptsUnsplitRanges {
// Needed in order to pass the validation in newBaseQueue.
Expand Down
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,10 +754,6 @@ func (*raftLogQueue) purgatoryChan() <-chan time.Time {
return nil
}

func (*raftLogQueue) updateChan() <-chan time.Time {
return nil
}

func isLooselyCoupledRaftLogTruncationEnabled(
ctx context.Context, settings *cluster.Settings,
) bool {
Expand Down
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/raft_snapshot_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,3 @@ func (*raftSnapshotQueue) timer(_ time.Duration) time.Duration {
func (rq *raftSnapshotQueue) purgatoryChan() <-chan time.Time {
return nil
}

func (rq *raftSnapshotQueue) updateChan() <-chan time.Time {
return nil
}
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/replica_gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,3 @@ func (*replicaGCQueue) timer(_ time.Duration) time.Duration {
func (*replicaGCQueue) purgatoryChan() <-chan time.Time {
return nil
}

func (*replicaGCQueue) updateChan() <-chan time.Time {
return nil
}
Loading

0 comments on commit 3c6c893

Please sign in to comment.