storage: add unsplittable ranges to split queue purgatory #25245

Merged
98 changes: 60 additions & 38 deletions pkg/storage/client_split_test.go
@@ -2353,67 +2353,89 @@ func TestDistributedTxnCleanup(t *testing.T) {
})
}

// TestUnsplittableRange creates an unsplittable range and tests that
// it is handled correctly by the split queue's purgatory. The test:
// 1. creates an unsplittable range that needs to be split
// 2. makes sure that range enters purgatory
// 3. makes sure a purgatory run still fails
// 4. GCs part of the range so that it no longer needs to be split
// 5. makes sure a purgatory run succeeds and the range leaves purgatory
func TestUnsplittableRange(t *testing.T) {
defer leaktest.AfterTest(t)()

ttl := 1 * time.Hour
const maxBytes = 1 << 16
defer config.TestingSetDefaultZoneConfig(config.ZoneConfig{
RangeMaxBytes: maxBytes,
GC: config.GCPolicy{
TTLSeconds: int32(ttl.Seconds()),
},
})()

stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())

store := createTestStoreWithConfig(t, stopper, storage.TestStoreConfig(nil))
store.ForceSplitScanAndProcess()
manual := hlc.NewManualClock(123)
splitQueuePurgatoryChan := make(chan time.Time, 1)
cfg := storage.TestStoreConfig(hlc.NewClock(manual.UnixNano, time.Nanosecond))
cfg.TestingKnobs.SplitQueuePurgatoryChan = splitQueuePurgatoryChan
store := createTestStoreWithConfig(t, stopper, cfg)
if err := server.WaitForInitialSplits(store.DB()); err != nil {
t.Fatal(err)
}

// Add a single large row to /Table/14.
tableKey := keys.MakeTablePrefix(keys.UITableID)
row1Key := roachpb.Key(encoding.EncodeVarintAscending(append([]byte(nil), tableKey...), 1))
col1Key := keys.MakeFamilyKey(append([]byte(nil), row1Key...), 0)
value := bytes.Repeat([]byte("x"), 64<<10)
valueLen := 0.9 * maxBytes
value := bytes.Repeat([]byte("x"), int(valueLen))
if err := store.DB().Put(context.Background(), col1Key, value); err != nil {
t.Fatal(err)
}

store.ForceSplitScanAndProcess()
testutils.SucceedsSoon(t, func() error {
repl := store.LookupReplica(tableKey, nil)
if repl.Desc().StartKey.Equal(tableKey) {
return nil
}
return errors.Errorf("waiting for split: %s", repl)
})

repl := store.LookupReplica(tableKey, nil)
origMaxBytes := repl.GetMaxBytes()
repl.SetMaxBytes(int64(len(value)))
// Wait for half of the ttl and add another large value in the same row.
// Together, these two values bump the range over the max range size.
manual.Increment(ttl.Nanoseconds() / 2)
value2Len := 0.2 * maxBytes
value2 := bytes.Repeat([]byte("y"), int(value2Len))
if err := store.DB().Put(context.Background(), col1Key, value2); err != nil {
t.Fatal(err)
}

// Wait for an attempt to split the range which will fail because it contains
// a single large value. The max-bytes for the range will be changed, but it
// should not have been reset to its original value.
// Ensure that an attempt to split the range will hit an
// unsplittableRangeError and place the range in purgatory.
store.ForceSplitScanAndProcess()
testutils.SucceedsSoon(t, func() error {
maxBytes := repl.GetMaxBytes()
if maxBytes != int64(len(value)) && maxBytes < origMaxBytes {
return nil
}
return errors.Errorf("expected max-bytes to be changed: %d", repl.GetMaxBytes())
})
if purgLen := store.SplitQueuePurgatoryLength(); purgLen != 1 {
t.Fatalf("expected split queue purgatory to contain 1 replica, found %d", purgLen)
}

// Add two more rows to the range.
for i := int64(2); i < 4; i++ {
rowKey := roachpb.Key(encoding.EncodeVarintAscending(append([]byte(nil), tableKey...), i))
colKey := keys.MakeFamilyKey(append([]byte(nil), rowKey...), 0)
if err := store.DB().Put(context.Background(), colKey, value); err != nil {
t.Fatal(err)
}
// Signal the split queue's purgatory channel and ensure that the purgatory
// remains occupied because the range still needs to split but can't.
splitQueuePurgatoryChan <- timeutil.Now()
if purgLen := store.SplitQueuePurgatoryLength(); purgLen != 1 {
t.Fatalf("expected split queue purgatory to contain 1 replica, found %d", purgLen)
}

// Wait for the range to be split and verify that max-bytes was reset to the
// value in the zone config.
store.ForceSplitScanAndProcess()
// Wait for much longer than the ttl to accumulate GCByteAge.
manual.Increment(10 * ttl.Nanoseconds())
// Trigger the GC queue, which should clean up the earlier version of the
// row. Once the first version of the row is cleaned up, the range should
// exit the split queue purgatory.
repl := store.LookupReplica(tableKey, nil)
if err := store.ManualGC(repl); err != nil {
t.Fatal(err)
}

// Signal the split queue's purgatory channel and ensure that the purgatory
// removes its now well-sized replica.
splitQueuePurgatoryChan <- timeutil.Now()
testutils.SucceedsSoon(t, func() error {
if origMaxBytes == repl.GetMaxBytes() {
purgLen := store.SplitQueuePurgatoryLength()
if purgLen == 0 {
return nil
}
return errors.Errorf("expected max-bytes=%d, but got max-bytes=%d",
origMaxBytes, repl.GetMaxBytes())
return errors.Errorf("expected split queue purgatory to be empty, found %d", purgLen)
})
}
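Note: the SplitQueuePurgatoryChan testing knob and the split queue changes that consume it are not among the files shown below. As a rough sketch (field and method shapes are assumed, not taken from this PR), the wiring the test relies on presumably looks something like this:

// In the store's testing knobs (assumed shape):
type StoreTestingKnobs struct {
    // SplitQueuePurgatoryChan, if set, replaces the split queue's default
    // purgatory timer so tests can trigger purgatory scans deterministically.
    SplitQueuePurgatoryChan <-chan time.Time
}

// In the split queue (assumed shape): the constructor would pick the knob's
// channel when it is set and fall back to a periodic ticker otherwise, and
// purgatoryChan simply returns whatever was chosen.
func (sq *splitQueue) purgatoryChan() <-chan time.Time {
    return sq.purgChan
}

Either way, each receive on the channel asks the base queue to retry the replicas currently held in the split queue's purgatory.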

2 changes: 1 addition & 1 deletion pkg/storage/consistency_queue.go
@@ -133,6 +133,6 @@ func (q *consistencyQueue) timer(duration time.Duration) time.Duration {
}

// purgatoryChan returns nil.
func (*consistencyQueue) purgatoryChan() <-chan struct{} {
func (*consistencyQueue) purgatoryChan() <-chan time.Time {
return nil
}
2 changes: 1 addition & 1 deletion pkg/storage/gc_queue.go
@@ -906,6 +906,6 @@ func (*gcQueue) timer(_ time.Duration) time.Duration {
}

// purgatoryChan returns nil.
func (*gcQueue) purgatoryChan() <-chan struct{} {
func (*gcQueue) purgatoryChan() <-chan time.Time {
return nil
}
6 changes: 6 additions & 0 deletions pkg/storage/helpers_test.go
@@ -152,6 +152,12 @@ func (s *Store) ReplicateQueuePurgatoryLength() int {
return s.replicateQueue.PurgatoryLength()
}

// SplitQueuePurgatoryLength returns the number of replicas in split
// queue purgatory.
func (s *Store) SplitQueuePurgatoryLength() int {
return s.splitQueue.PurgatoryLength()
}

// SetRaftLogQueueActive enables or disables the raft log queue.
func (s *Store) SetRaftLogQueueActive(active bool) {
s.setRaftLogQueueActive(active)
5 changes: 5 additions & 0 deletions pkg/storage/metrics.go
@@ -400,6 +400,9 @@ var (
metaSplitQueueProcessingNanos = metric.Metadata{
Name: "queue.split.processingnanos",
Help: "Nanoseconds spent processing replicas in the split queue"}
metaSplitQueuePurgatory = metric.Metadata{
Name: "queue.split.purgatory",
Help: "Number of replicas in the split queue's purgatory, waiting to become splittable"}
metaTimeSeriesMaintenanceQueueSuccesses = metric.Metadata{
Name: "queue.tsmaintenance.process.success",
Help: "Number of replicas successfully processed by the time series maintenance queue"}
@@ -637,6 +640,7 @@ type StoreMetrics struct {
SplitQueueFailures *metric.Counter
SplitQueuePending *metric.Gauge
SplitQueueProcessingNanos *metric.Counter
SplitQueuePurgatory *metric.Gauge
TimeSeriesMaintenanceQueueSuccesses *metric.Counter
TimeSeriesMaintenanceQueueFailures *metric.Counter
TimeSeriesMaintenanceQueuePending *metric.Gauge
@@ -824,6 +828,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
SplitQueueFailures: metric.NewCounter(metaSplitQueueFailures),
SplitQueuePending: metric.NewGauge(metaSplitQueuePending),
SplitQueueProcessingNanos: metric.NewCounter(metaSplitQueueProcessingNanos),
SplitQueuePurgatory: metric.NewGauge(metaSplitQueuePurgatory),
TimeSeriesMaintenanceQueueSuccesses: metric.NewCounter(metaTimeSeriesMaintenanceQueueFailures),
TimeSeriesMaintenanceQueueFailures: metric.NewCounter(metaTimeSeriesMaintenanceQueueSuccesses),
TimeSeriesMaintenanceQueuePending: metric.NewGauge(metaTimeSeriesMaintenanceQueuePending),
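How the new SplitQueuePurgatory gauge stays up to date is not visible in this excerpt. A plausible arrangement (the gauge field name below is an assumption, not shown in this diff) is for the split queue to hand the gauge to its base queue, which then updates it whenever purgatory membership changes:

// Sketch only: keep the metric in sync with the size of the purgatory map.
func (bq *baseQueue) updatePurgatoryGauge() {
    if bq.purgatoryGauge == nil {
        return // this queue was configured without a purgatory gauge
    }
    bq.purgatoryGauge.Update(int64(len(bq.mu.purgatory)))
}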
79 changes: 45 additions & 34 deletions pkg/storage/queue.go
@@ -177,11 +177,11 @@ type queueImpl interface {
// yet, this can be 0.
timer(time.Duration) time.Duration

// purgatoryChan returns a channel that is signaled when it's time
// to retry replicas which have been relegated to purgatory due to
// failures. If purgatoryChan returns nil, failing replicas are not
// sent to purgatory.
purgatoryChan() <-chan struct{}
// purgatoryChan returns a channel that is signaled with the current
// time when it's time to retry replicas which have been relegated to
// purgatory due to failures. If purgatoryChan returns nil, failing
// replicas are not sent to purgatory.
purgatoryChan() <-chan time.Time
}
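For a queue that opts in, the channel is typically just a ticker (or, in tests, a hand-fed channel). A minimal illustrative implementation of the new signature, with exampleQueue and its one-minute interval invented for this sketch, could look like:

type exampleQueue struct {
    purgChan <-chan time.Time // signals retries of purgatoried replicas
}

func newExampleQueue() *exampleQueue {
    // A plain ticker: each tick asks the base queue to re-process every
    // replica currently sitting in this queue's purgatory.
    return &exampleQueue{purgChan: time.NewTicker(time.Minute).C}
}

func (eq *exampleQueue) purgatoryChan() <-chan time.Time {
    return eq.purgChan
}

A test can instead hand in its own channel and send on it explicitly, as TestUnsplittableRange does above.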

type queueConfig struct {
@@ -253,10 +253,10 @@ type baseQueue struct {
processSem chan struct{}
processDur int64 // accessed atomically
mu struct {
syncutil.Mutex // Protects all variables in the mu struct
replicas map[roachpb.RangeID]*replicaItem // Map from RangeID to replicaItem
priorityQ priorityQueue // The priority queue
purgatory map[roachpb.RangeID]error // Map of replicas to processing errors
syncutil.Mutex // Protects all variables in the mu struct
replicas map[roachpb.RangeID]*replicaItem // Map from RangeID to replicaItem
priorityQ priorityQueue // The priority queue
purgatory map[roachpb.RangeID]purgatoryError // Map of replicas to processing errors
stopped bool
// Some tests in this package disable queues.
disabled bool
@@ -312,6 +312,11 @@ func (bq *baseQueue) Length() int {

// PurgatoryLength returns the current size of purgatory.
func (bq *baseQueue) PurgatoryLength() int {
// Lock processing while measuring the purgatory length. This ensures that
// no purgatory replicas are concurrently being processed, during which time
// they are removed from bq.mu.purgatory even though they may be re-added.
defer bq.lockProcessing()()

bq.mu.Lock()
defer bq.mu.Unlock()
return len(bq.mu.purgatory)
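lockProcessing is not among the lines shown here. Given the processSem field above, a plausible sketch (an assumption, not necessarily the PR's actual helper) is to temporarily claim every slot of the processing semaphore and return a closure that releases them:

func (bq *baseQueue) lockProcessing() func() {
    semCount := cap(bq.processSem)

    // Claim every processing slot; this blocks until in-flight processing
    // finishes and prevents new processing from starting.
    for i := 0; i < semCount; i++ {
        bq.processSem <- struct{}{}
    }

    // The returned closure hands the slots back.
    return func() {
        for i := 0; i < semCount; i++ {
            <-bq.processSem
        }
    }
}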
@@ -736,12 +741,6 @@ func (bq *baseQueue) processReplica(queueCtx context.Context, repl *Replica) err
func (bq *baseQueue) finishProcessingReplica(
ctx context.Context, stopper *stop.Stopper, repl *Replica, err error,
) {
if err != nil {
// Increment failures metric here to capture all error returns from
// process().
bq.failures.Inc(1)
}

bq.mu.Lock()
defer bq.mu.Unlock()

@@ -755,32 +754,44 @@
cb(err)
}

// Handle failures.
if err != nil {
// Increment failures metric to capture all errors.
bq.failures.Inc(1)

// Determine whether a failure is a purgatory error. If it is, add
// the failing replica to purgatory. Note that even if the item was
// scheduled to be requeued, we ignore this if we add the replica to
// purgatory.
if purgErr, ok := errors.Cause(err).(purgatoryError); ok {
bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr)
return
}

// If not a purgatory error, log.
log.Error(ctx, err)
}

// Maybe add replica back into queue, if requested.
if item.requeue {
// Maybe add replica back into queue.
bq.maybeAddLocked(ctx, repl, bq.store.Clock().Now())
} else if err != nil {
// Maybe add failing replica to purgatory if the queue supports it.
bq.maybeAddToPurgatoryLocked(ctx, stopper, repl, err)
}
}
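The purgatoryError type asserted on above, and used as the purgatory map's value type, is defined elsewhere in queue.go as what is essentially a marker interface, roughly (exact wording assumed):

type purgatoryError interface {
    error
    purgatoryErrorMarker() // dummy method to mark purgatory-eligible errors
}

The split queue's unsplittableRangeError, referenced in the test above, would then opt into purgatory simply by carrying the marker method (shape and message assumed):

type unsplittableRangeError struct{}

func (unsplittableRangeError) Error() string         { return "could not find valid split key" }
func (unsplittableRangeError) purgatoryErrorMarker() {}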

// maybeAddToPurgatoryLocked possibly adds the specified replica
// to the purgatory queue, which holds replicas which have failed
// processing. To be added, the failing error must implement
// purgatoryError and the queue implementation must have its own
// mechanism for signaling re-processing of replicas held in
// purgatory.
func (bq *baseQueue) maybeAddToPurgatoryLocked(
ctx context.Context, stopper *stop.Stopper, repl *Replica, triggeringErr error,
// addToPurgatoryLocked adds the specified replica to the purgatory queue, which
// holds replicas which have failed processing.
func (bq *baseQueue) addToPurgatoryLocked(
ctx context.Context, stopper *stop.Stopper, repl *Replica, purgErr purgatoryError,
) {
// Check whether the failure is a purgatory error and whether the queue supports it.
if _, ok := errors.Cause(triggeringErr).(purgatoryError); !ok || bq.impl.purgatoryChan() == nil {
log.Error(ctx, triggeringErr)
// Check whether the queue supports purgatory errors. If not then something
// went wrong because a purgatory error should not have ended up here.
if bq.impl.purgatoryChan() == nil {
log.Errorf(ctx, "queue does not support purgatory errors, but saw %v", purgErr)
return
}

if log.V(1) {
log.Info(ctx, errors.Wrap(triggeringErr, "purgatory"))
log.Info(ctx, errors.Wrap(purgErr, "purgatory"))
}

item := &replicaItem{value: repl.RangeID}
Expand All @@ -792,13 +803,13 @@ func (bq *baseQueue) maybeAddToPurgatoryLocked(

// If purgatory already exists, just add to the map and we're done.
if bq.mu.purgatory != nil {
bq.mu.purgatory[repl.RangeID] = triggeringErr
bq.mu.purgatory[repl.RangeID] = purgErr
return
}

// Otherwise, create purgatory and start processing.
bq.mu.purgatory = map[roachpb.RangeID]error{
repl.RangeID: triggeringErr,
bq.mu.purgatory = map[roachpb.RangeID]purgatoryError{
repl.RangeID: purgErr,
}

workerCtx := bq.AnnotateCtx(context.Background())
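The worker body falls outside the lines shown for this hunk. As a condensed sketch rather than the literal implementation, the worker started with workerCtx waits on the queue's purgatory channel and retries everything held in purgatory; the real worker also pulls entries out of bq.mu.purgatory while retrying them, which is why PurgatoryLength above locks out processing before measuring:

stopper.RunWorker(workerCtx, func(ctx context.Context) {
    for {
        select {
        case <-bq.impl.purgatoryChan():
            // Snapshot the purgatory membership under the lock, then retry
            // each replica outside of it. A failure that is still a
            // purgatory error is re-added by finishProcessingReplica.
            bq.mu.Lock()
            rangeIDs := make([]roachpb.RangeID, 0, len(bq.mu.purgatory))
            for rangeID := range bq.mu.purgatory {
                rangeIDs = append(rangeIDs, rangeID)
            }
            bq.mu.Unlock()

            for _, rangeID := range rangeIDs {
                repl, err := bq.store.GetReplica(rangeID)
                if err != nil {
                    continue
                }
                err = bq.processReplica(ctx, repl)
                bq.finishProcessingReplica(ctx, stopper, repl, err)
            }
        case <-stopper.ShouldStop():
            return
        }
    }
})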
11 changes: 6 additions & 5 deletions pkg/storage/queue_test.go
@@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// testQueueImpl implements queueImpl with a closure for shouldQueue.
@@ -44,7 +45,7 @@ type testQueueImpl struct {
processed int32 // accessed atomically
duration time.Duration
blocker chan struct{} // timer() blocks on this if not nil
pChan chan struct{}
pChan chan time.Time
err error // always returns this error on process
}

@@ -73,7 +74,7 @@ func (tq *testQueueImpl) timer(_ time.Duration) time.Duration {
return 0
}

func (tq *testQueueImpl) purgatoryChan() <-chan struct{} {
func (tq *testQueueImpl) purgatoryChan() <-chan time.Time {
return tq.pChan
}

@@ -658,7 +659,7 @@ func TestBaseQueuePurgatory(t *testing.T) {
priority = float64(r.RangeID)
return
},
pChan: make(chan struct{}, 1),
pChan: make(chan time.Time, 1),
err: &testError{},
}

@@ -704,7 +705,7 @@ func TestBaseQueuePurgatory(t *testing.T) {
})

// Now, signal that purgatoried replicas should retry.
testQueue.pChan <- struct{}{}
testQueue.pChan <- timeutil.Now()

testutils.SucceedsSoon(t, func() error {
if pc := testQueue.getProcessed(); pc != replicaCount*2 {
@@ -739,7 +740,7 @@ func TestBaseQueuePurgatory(t *testing.T) {

// Remove error and reprocess.
testQueue.err = nil
testQueue.pChan <- struct{}{}
testQueue.pChan <- timeutil.Now()

testutils.SucceedsSoon(t, func() error {
if pc := testQueue.getProcessed(); pc != replicaCount*3 {
2 changes: 1 addition & 1 deletion pkg/storage/raft_log_queue.go
@@ -279,7 +279,7 @@ func (*raftLogQueue) timer(_ time.Duration) time.Duration {
}

// purgatoryChan returns nil.
func (*raftLogQueue) purgatoryChan() <-chan struct{} {
func (*raftLogQueue) purgatoryChan() <-chan time.Time {
return nil
}

2 changes: 1 addition & 1 deletion pkg/storage/raft_snapshot_queue.go
@@ -121,6 +121,6 @@ func (*raftSnapshotQueue) timer(_ time.Duration) time.Duration {
return raftSnapshotQueueTimerDuration
}

func (rq *raftSnapshotQueue) purgatoryChan() <-chan struct{} {
func (rq *raftSnapshotQueue) purgatoryChan() <-chan time.Time {
return nil
}