From a540ae22c6a18c0abad70a6628ea8eb9d04320f6 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 1 May 2018 16:01:51 -0400 Subject: [PATCH 1/3] storage: queueImpl.purgatoryChan now returns chan time.Time This makes the interface easier to use with any time-based channel mechanism, like Tickers and Timers. Release note: None --- pkg/storage/consistency_queue.go | 2 +- pkg/storage/gc_queue.go | 2 +- pkg/storage/queue.go | 10 +++++----- pkg/storage/queue_test.go | 11 ++++++----- pkg/storage/raft_log_queue.go | 2 +- pkg/storage/raft_snapshot_queue.go | 2 +- pkg/storage/replica_gc_queue.go | 2 +- pkg/storage/replicate_queue.go | 8 ++++---- pkg/storage/split_queue.go | 2 +- pkg/storage/ts_maintenance_queue.go | 2 +- 10 files changed, 22 insertions(+), 21 deletions(-) diff --git a/pkg/storage/consistency_queue.go b/pkg/storage/consistency_queue.go index 927ca4e7dd32..8e2f6942686a 100644 --- a/pkg/storage/consistency_queue.go +++ b/pkg/storage/consistency_queue.go @@ -133,6 +133,6 @@ func (q *consistencyQueue) timer(duration time.Duration) time.Duration { } // purgatoryChan returns nil. -func (*consistencyQueue) purgatoryChan() <-chan struct{} { +func (*consistencyQueue) purgatoryChan() <-chan time.Time { return nil } diff --git a/pkg/storage/gc_queue.go b/pkg/storage/gc_queue.go index 9130fd246de2..f26c439c30df 100644 --- a/pkg/storage/gc_queue.go +++ b/pkg/storage/gc_queue.go @@ -906,6 +906,6 @@ func (*gcQueue) timer(_ time.Duration) time.Duration { } // purgatoryChan returns nil. -func (*gcQueue) purgatoryChan() <-chan struct{} { +func (*gcQueue) purgatoryChan() <-chan time.Time { return nil } diff --git a/pkg/storage/queue.go b/pkg/storage/queue.go index e02bd66505ba..9a534fd3838c 100644 --- a/pkg/storage/queue.go +++ b/pkg/storage/queue.go @@ -177,11 +177,11 @@ type queueImpl interface { // yet, this can be 0. timer(time.Duration) time.Duration - // purgatoryChan returns a channel that is signaled when it's time - // to retry replicas which have been relegated to purgatory due to - // failures. If purgatoryChan returns nil, failing replicas are not - // sent to purgatory. - purgatoryChan() <-chan struct{} + // purgatoryChan returns a channel that is signaled with the current + // time when it's time to retry replicas which have been relegated to + // purgatory due to failures. If purgatoryChan returns nil, failing + // replicas are not sent to purgatory. + purgatoryChan() <-chan time.Time } type queueConfig struct { diff --git a/pkg/storage/queue_test.go b/pkg/storage/queue_test.go index 37ae91e89de6..4cab8031feab 100644 --- a/pkg/storage/queue_test.go +++ b/pkg/storage/queue_test.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) // testQueueImpl implements queueImpl with a closure for shouldQueue. 
@@ -44,7 +45,7 @@ type testQueueImpl struct { processed int32 // accessed atomically duration time.Duration blocker chan struct{} // timer() blocks on this if not nil - pChan chan struct{} + pChan chan time.Time err error // always returns this error on process } @@ -73,7 +74,7 @@ func (tq *testQueueImpl) timer(_ time.Duration) time.Duration { return 0 } -func (tq *testQueueImpl) purgatoryChan() <-chan struct{} { +func (tq *testQueueImpl) purgatoryChan() <-chan time.Time { return tq.pChan } @@ -658,7 +659,7 @@ func TestBaseQueuePurgatory(t *testing.T) { priority = float64(r.RangeID) return }, - pChan: make(chan struct{}, 1), + pChan: make(chan time.Time, 1), err: &testError{}, } @@ -704,7 +705,7 @@ func TestBaseQueuePurgatory(t *testing.T) { }) // Now, signal that purgatoried replicas should retry. - testQueue.pChan <- struct{}{} + testQueue.pChan <- timeutil.Now() testutils.SucceedsSoon(t, func() error { if pc := testQueue.getProcessed(); pc != replicaCount*2 { @@ -739,7 +740,7 @@ func TestBaseQueuePurgatory(t *testing.T) { // Remove error and reprocess. testQueue.err = nil - testQueue.pChan <- struct{}{} + testQueue.pChan <- timeutil.Now() testutils.SucceedsSoon(t, func() error { if pc := testQueue.getProcessed(); pc != replicaCount*3 { diff --git a/pkg/storage/raft_log_queue.go b/pkg/storage/raft_log_queue.go index dea506ea3ad8..40d1c9df8c7a 100644 --- a/pkg/storage/raft_log_queue.go +++ b/pkg/storage/raft_log_queue.go @@ -279,7 +279,7 @@ func (*raftLogQueue) timer(_ time.Duration) time.Duration { } // purgatoryChan returns nil. -func (*raftLogQueue) purgatoryChan() <-chan struct{} { +func (*raftLogQueue) purgatoryChan() <-chan time.Time { return nil } diff --git a/pkg/storage/raft_snapshot_queue.go b/pkg/storage/raft_snapshot_queue.go index cfe149220cf4..d3a1a7257154 100644 --- a/pkg/storage/raft_snapshot_queue.go +++ b/pkg/storage/raft_snapshot_queue.go @@ -121,6 +121,6 @@ func (*raftSnapshotQueue) timer(_ time.Duration) time.Duration { return raftSnapshotQueueTimerDuration } -func (rq *raftSnapshotQueue) purgatoryChan() <-chan struct{} { +func (rq *raftSnapshotQueue) purgatoryChan() <-chan time.Time { return nil } diff --git a/pkg/storage/replica_gc_queue.go b/pkg/storage/replica_gc_queue.go index b1e62232480d..142c378c03e8 100644 --- a/pkg/storage/replica_gc_queue.go +++ b/pkg/storage/replica_gc_queue.go @@ -254,6 +254,6 @@ func (*replicaGCQueue) timer(_ time.Duration) time.Duration { } // purgatoryChan returns nil. 
-func (*replicaGCQueue) purgatoryChan() <-chan struct{} { +func (*replicaGCQueue) purgatoryChan() <-chan time.Time { return nil } diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go index 61eb52357fd0..0557037d346e 100644 --- a/pkg/storage/replicate_queue.go +++ b/pkg/storage/replicate_queue.go @@ -96,7 +96,7 @@ type replicateQueue struct { *baseQueue metrics ReplicateQueueMetrics allocator Allocator - updateChan chan struct{} + updateChan chan time.Time lastLeaseTransfer atomic.Value // read and written by scanner & queue goroutines } @@ -105,7 +105,7 @@ func newReplicateQueue(store *Store, g *gossip.Gossip, allocator Allocator) *rep rq := &replicateQueue{ metrics: makeReplicateQueueMetrics(), allocator: allocator, - updateChan: make(chan struct{}, 1), + updateChan: make(chan time.Time, 1), } store.metrics.registry.AddMetricStruct(&rq.metrics) rq.baseQueue = newBaseQueue( @@ -125,7 +125,7 @@ func newReplicateQueue(store *Store, g *gossip.Gossip, allocator Allocator) *rep updateFn := func() { select { - case rq.updateChan <- struct{}{}: + case rq.updateChan <- timeutil.Now(): default: } } @@ -608,7 +608,7 @@ func (*replicateQueue) timer(_ time.Duration) time.Duration { } // purgatoryChan returns the replicate queue's store update channel. -func (rq *replicateQueue) purgatoryChan() <-chan struct{} { +func (rq *replicateQueue) purgatoryChan() <-chan time.Time { return rq.updateChan } diff --git a/pkg/storage/split_queue.go b/pkg/storage/split_queue.go index fe213c73776a..76b3ad3f16d8 100644 --- a/pkg/storage/split_queue.go +++ b/pkg/storage/split_queue.go @@ -165,6 +165,6 @@ func (*splitQueue) timer(_ time.Duration) time.Duration { } // purgatoryChan returns nil. -func (*splitQueue) purgatoryChan() <-chan struct{} { +func (*splitQueue) purgatoryChan() <-chan time.Time { return nil } diff --git a/pkg/storage/ts_maintenance_queue.go b/pkg/storage/ts_maintenance_queue.go index 9ce919bbbac0..aa34a83b4032 100644 --- a/pkg/storage/ts_maintenance_queue.go +++ b/pkg/storage/ts_maintenance_queue.go @@ -154,6 +154,6 @@ func (q *timeSeriesMaintenanceQueue) timer(duration time.Duration) time.Duration return replInterval - duration } -func (*timeSeriesMaintenanceQueue) purgatoryChan() <-chan struct{} { +func (*timeSeriesMaintenanceQueue) purgatoryChan() <-chan time.Time { return nil } From 3d13d94a62b98b7df3ac94d290c8553286e408b2 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 1 May 2018 16:54:57 -0400 Subject: [PATCH 2/3] storage: improve logging of queue errors Fixes #25191. 
Release note: None --- pkg/storage/queue.go | 64 ++++++++++++++++++++++++-------------------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/pkg/storage/queue.go b/pkg/storage/queue.go index 9a534fd3838c..a953939978ec 100644 --- a/pkg/storage/queue.go +++ b/pkg/storage/queue.go @@ -253,10 +253,10 @@ type baseQueue struct { processSem chan struct{} processDur int64 // accessed atomically mu struct { - syncutil.Mutex // Protects all variables in the mu struct - replicas map[roachpb.RangeID]*replicaItem // Map from RangeID to replicaItem - priorityQ priorityQueue // The priority queue - purgatory map[roachpb.RangeID]error // Map of replicas to processing errors + syncutil.Mutex // Protects all variables in the mu struct + replicas map[roachpb.RangeID]*replicaItem // Map from RangeID to replicaItem + priorityQ priorityQueue // The priority queue + purgatory map[roachpb.RangeID]purgatoryError // Map of replicas to processing errors stopped bool // Some tests in this package disable queues. disabled bool @@ -736,12 +736,6 @@ func (bq *baseQueue) processReplica(queueCtx context.Context, repl *Replica) err func (bq *baseQueue) finishProcessingReplica( ctx context.Context, stopper *stop.Stopper, repl *Replica, err error, ) { - if err != nil { - // Increment failures metric here to capture all error returns from - // process(). - bq.failures.Inc(1) - } - bq.mu.Lock() defer bq.mu.Unlock() @@ -755,32 +749,44 @@ func (bq *baseQueue) finishProcessingReplica( cb(err) } + // Handle failures. + if err != nil { + // Increment failures metric to capture all errors. + bq.failures.Inc(1) + + // Determine whether a failure is a purgatory error. If it is, add + // the failing replica to purgatory. Note that even if the item was + // scheduled to be requeued, we ignore this if we add the replica to + // purgatory. + if purgErr, ok := errors.Cause(err).(purgatoryError); ok { + bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr) + return + } + + // If not a purgatory error, log. + log.Error(ctx, err) + } + + // Maybe add replica back into queue, if requested. if item.requeue { - // Maybe add replica back into queue. bq.maybeAddLocked(ctx, repl, bq.store.Clock().Now()) - } else if err != nil { - // Maybe add failing replica to purgatory if the queue supports it. - bq.maybeAddToPurgatoryLocked(ctx, stopper, repl, err) } } -// maybeAddToPurgatoryLocked possibly adds the specified replica -// to the purgatory queue, which holds replicas which have failed -// processing. To be added, the failing error must implement -// purgatoryError and the queue implementation must have its own -// mechanism for signaling re-processing of replicas held in -// purgatory. -func (bq *baseQueue) maybeAddToPurgatoryLocked( - ctx context.Context, stopper *stop.Stopper, repl *Replica, triggeringErr error, +// addToPurgatoryLocked adds the specified replica to the purgatory queue, which +// holds replicas that have failed processing. +func (bq *baseQueue) addToPurgatoryLocked( + ctx context.Context, stopper *stop.Stopper, repl *Replica, purgErr purgatoryError, ) { - // Check whether the failure is a purgatory error and whether the queue supports it. - if _, ok := errors.Cause(triggeringErr).(purgatoryError); !ok || bq.impl.purgatoryChan() == nil { - log.Error(ctx, triggeringErr) + // Check whether the queue supports purgatory errors. If not, then something + // went wrong because a purgatory error should not have ended up here.
+ if bq.impl.purgatoryChan() == nil { + log.Errorf(ctx, "queue does not support purgatory errors, but saw %v", purgErr) return } if log.V(1) { - log.Info(ctx, errors.Wrap(triggeringErr, "purgatory")) + log.Info(ctx, errors.Wrap(purgErr, "purgatory")) } item := &replicaItem{value: repl.RangeID} @@ -792,13 +798,13 @@ func (bq *baseQueue) maybeAddToPurgatoryLocked( // If purgatory already exists, just add to the map and we're done. if bq.mu.purgatory != nil { - bq.mu.purgatory[repl.RangeID] = triggeringErr + bq.mu.purgatory[repl.RangeID] = purgErr return } // Otherwise, create purgatory and start processing. - bq.mu.purgatory = map[roachpb.RangeID]error{ - repl.RangeID: triggeringErr, + bq.mu.purgatory = map[roachpb.RangeID]purgatoryError{ + repl.RangeID: purgErr, } workerCtx := bq.AnnotateCtx(context.Background()) From e49546720184fddd1e0f1180a3c1a33804d59415 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 2 May 2018 13:14:50 -0400 Subject: [PATCH 3/3] storage: add unsplittable ranges to split queue purgatory In #14654 we added a mechanism to double the max range size whenever a split attempt found a range that was unsplittable. This prevented a tight loop of split attempts. However, it didn't actually do anything to help us find a split point to reduce the size of the range in the future. This size doubling worked in practice, but it was a blunt instrument that had strange effects (see #24215). This change rips out this range size doubling and replaces it with a split queue purgatory. This purgatory is used to house replicas that are unsplittable, preventing them from getting into a tight loop. Release note: None --- pkg/storage/client_split_test.go | 98 +++++++++++++++++++------------- pkg/storage/helpers_test.go | 6 ++ pkg/storage/metrics.go | 5 ++ pkg/storage/queue.go | 5 ++ pkg/storage/replica_command.go | 48 ++++++++-------- pkg/storage/split_queue.go | 64 ++++++++++++--------- pkg/storage/store.go | 3 + 7 files changed, 140 insertions(+), 89 deletions(-) diff --git a/pkg/storage/client_split_test.go b/pkg/storage/client_split_test.go index 7ffa8fad8df3..b5a2e2e9053c 100644 --- a/pkg/storage/client_split_test.go +++ b/pkg/storage/client_split_test.go @@ -2353,67 +2353,89 @@ func TestDistributedTxnCleanup(t *testing.T) { }) } +// TestUnsplittableRange creates an unsplittable range and tests that +// it is handled correctly by the split queue's purgatory. The test: +// 1. creates an unsplittable range that needs to be split +// 2. makes sure that range enters purgatory +// 3. makes sure a purgatory run still fails +// 4. GCs part of the range so that it no longer needs to be split +// 5. 
makes sure a purgatory run succeeds and the range leaves purgatory func TestUnsplittableRange(t *testing.T) { defer leaktest.AfterTest(t)() + ttl := 1 * time.Hour + const maxBytes = 1 << 16 + defer config.TestingSetDefaultZoneConfig(config.ZoneConfig{ + RangeMaxBytes: maxBytes, + GC: config.GCPolicy{ + TTLSeconds: int32(ttl.Seconds()), + }, + })() + stopper := stop.NewStopper() defer stopper.Stop(context.TODO()) - store := createTestStoreWithConfig(t, stopper, storage.TestStoreConfig(nil)) - store.ForceSplitScanAndProcess() + manual := hlc.NewManualClock(123) + splitQueuePurgatoryChan := make(chan time.Time, 1) + cfg := storage.TestStoreConfig(hlc.NewClock(manual.UnixNano, time.Nanosecond)) + cfg.TestingKnobs.SplitQueuePurgatoryChan = splitQueuePurgatoryChan + store := createTestStoreWithConfig(t, stopper, cfg) + if err := server.WaitForInitialSplits(store.DB()); err != nil { + t.Fatal(err) + } // Add a single large row to /Table/14. tableKey := keys.MakeTablePrefix(keys.UITableID) row1Key := roachpb.Key(encoding.EncodeVarintAscending(append([]byte(nil), tableKey...), 1)) col1Key := keys.MakeFamilyKey(append([]byte(nil), row1Key...), 0) - value := bytes.Repeat([]byte("x"), 64<<10) + valueLen := 0.9 * maxBytes + value := bytes.Repeat([]byte("x"), int(valueLen)) if err := store.DB().Put(context.Background(), col1Key, value); err != nil { t.Fatal(err) } - store.ForceSplitScanAndProcess() - testutils.SucceedsSoon(t, func() error { - repl := store.LookupReplica(tableKey, nil) - if repl.Desc().StartKey.Equal(tableKey) { - return nil - } - return errors.Errorf("waiting for split: %s", repl) - }) - - repl := store.LookupReplica(tableKey, nil) - origMaxBytes := repl.GetMaxBytes() - repl.SetMaxBytes(int64(len(value))) + // Wait for half of the ttl and add another large value in the same row. + // Together, these two values bump the range over the max range size. + manual.Increment(ttl.Nanoseconds() / 2) + value2Len := 0.2 * maxBytes + value2 := bytes.Repeat([]byte("y"), int(value2Len)) + if err := store.DB().Put(context.Background(), col1Key, value2); err != nil { + t.Fatal(err) + } - // Wait for an attempt to split the range which will fail because it contains - // a single large value. The max-bytes for the range will be changed, but it - // should not have been reset to its original value. + // Ensure that an attempt to split the range will hit an + // unsplittableRangeError and place the range in purgatory. store.ForceSplitScanAndProcess() - testutils.SucceedsSoon(t, func() error { - maxBytes := repl.GetMaxBytes() - if maxBytes != int64(len(value)) && maxBytes < origMaxBytes { - return nil - } - return errors.Errorf("expected max-bytes to be changed: %d", repl.GetMaxBytes()) - }) + if purgLen := store.SplitQueuePurgatoryLength(); purgLen != 1 { + t.Fatalf("expected split queue purgatory to contain 1 replica, found %d", purgLen) + } - // Add two more rows to the range. - for i := int64(2); i < 4; i++ { - rowKey := roachpb.Key(encoding.EncodeVarintAscending(append([]byte(nil), tableKey...), i)) - colKey := keys.MakeFamilyKey(append([]byte(nil), rowKey...), 0) - if err := store.DB().Put(context.Background(), colKey, value); err != nil { - t.Fatal(err) - } + // Signal the split queue's purgatory channel and ensure that the purgatory + // remains occupied because the range still needs to split but can't. 
+ splitQueuePurgatoryChan <- timeutil.Now() + if purgLen := store.SplitQueuePurgatoryLength(); purgLen != 1 { + t.Fatalf("expected split queue purgatory to contain 1 replica, found %d", purgLen) } - // Wait for the range to be split and verify that max-bytes was reset to the - // value in the zone config. - store.ForceSplitScanAndProcess() + // Wait for much longer than the ttl to accumulate GCByteAge. + manual.Increment(10 * ttl.Nanoseconds()) + // Trigger the GC queue, which should clean up the earlier version of the + // row. Once the first version of the row is cleaned up, the range should + // exit the split queue purgatory. + repl := store.LookupReplica(tableKey, nil) + if err := store.ManualGC(repl); err != nil { + t.Fatal(err) + } + + // Signal the split queue's purgatory channel and ensure that the purgatory + // removes its now well-sized replica. + splitQueuePurgatoryChan <- timeutil.Now() testutils.SucceedsSoon(t, func() error { - if origMaxBytes == repl.GetMaxBytes() { + purgLen := store.SplitQueuePurgatoryLength() + if purgLen == 0 { return nil } - return errors.Errorf("expected max-bytes=%d, but got max-bytes=%d", - origMaxBytes, repl.GetMaxBytes()) + return errors.Errorf("expected split queue purgatory to be empty, found %d", purgLen) }) } diff --git a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go index 622fabf72900..54d3e35b3657 100644 --- a/pkg/storage/helpers_test.go +++ b/pkg/storage/helpers_test.go @@ -152,6 +152,12 @@ func (s *Store) ReplicateQueuePurgatoryLength() int { return s.replicateQueue.PurgatoryLength() } +// SplitQueuePurgatoryLength returns the number of replicas in split +// queue purgatory. +func (s *Store) SplitQueuePurgatoryLength() int { + return s.splitQueue.PurgatoryLength() +} + // SetRaftLogQueueActive enables or disables the raft log queue. 
func (s *Store) SetRaftLogQueueActive(active bool) { s.setRaftLogQueueActive(active) diff --git a/pkg/storage/metrics.go b/pkg/storage/metrics.go index 0f6f2648da2d..2778cc501094 100644 --- a/pkg/storage/metrics.go +++ b/pkg/storage/metrics.go @@ -400,6 +400,9 @@ var ( metaSplitQueueProcessingNanos = metric.Metadata{ Name: "queue.split.processingnanos", Help: "Nanoseconds spent processing replicas in the split queue"} + metaSplitQueuePurgatory = metric.Metadata{ + Name: "queue.split.purgatory", + Help: "Number of replicas in the split queue's purgatory, waiting to become splittable"} metaTimeSeriesMaintenanceQueueSuccesses = metric.Metadata{ Name: "queue.tsmaintenance.process.success", Help: "Number of replicas successfully processed by the time series maintenance queue"} @@ -637,6 +640,7 @@ type StoreMetrics struct { SplitQueueFailures *metric.Counter SplitQueuePending *metric.Gauge SplitQueueProcessingNanos *metric.Counter + SplitQueuePurgatory *metric.Gauge TimeSeriesMaintenanceQueueSuccesses *metric.Counter TimeSeriesMaintenanceQueueFailures *metric.Counter TimeSeriesMaintenanceQueuePending *metric.Gauge @@ -824,6 +828,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { SplitQueueFailures: metric.NewCounter(metaSplitQueueFailures), SplitQueuePending: metric.NewGauge(metaSplitQueuePending), SplitQueueProcessingNanos: metric.NewCounter(metaSplitQueueProcessingNanos), + SplitQueuePurgatory: metric.NewGauge(metaSplitQueuePurgatory), TimeSeriesMaintenanceQueueSuccesses: metric.NewCounter(metaTimeSeriesMaintenanceQueueFailures), TimeSeriesMaintenanceQueueFailures: metric.NewCounter(metaTimeSeriesMaintenanceQueueSuccesses), TimeSeriesMaintenanceQueuePending: metric.NewGauge(metaTimeSeriesMaintenanceQueuePending), diff --git a/pkg/storage/queue.go b/pkg/storage/queue.go index a953939978ec..a6189f2fc3ae 100644 --- a/pkg/storage/queue.go +++ b/pkg/storage/queue.go @@ -312,6 +312,11 @@ func (bq *baseQueue) Length() int { // PurgatoryLength returns the current size of purgatory. func (bq *baseQueue) PurgatoryLength() int { + // Lock processing while measuring the purgatory length. This ensures that + // no purgatory replicas are concurrently being processed, during which time + // they are removed from bq.mu.purgatory even though they may be re-added. + defer bq.lockProcessing()() + bq.mu.Lock() defer bq.mu.Unlock() return len(bq.mu.purgatory) diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 921f0d93d90e..8fcb898e4c5f 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -716,11 +716,12 @@ func runCommitTrigger( // AdminSplit divides the range into into two ranges using args.SplitKey. func (r *Replica) AdminSplit( ctx context.Context, args roachpb.AdminSplitRequest, -) (reply roachpb.AdminSplitResponse, pErr *roachpb.Error) { +) (reply roachpb.AdminSplitResponse, _ *roachpb.Error) { if len(args.SplitKey) == 0 { return roachpb.AdminSplitResponse{}, roachpb.NewErrorf("cannot split range with no key provided") } + var lastErr error retryOpts := base.DefaultRetryOptions() retryOpts.MaxRetries = 10 for retryable := retry.StartWithCtx(ctx, retryOpts); retryable.Next(); { @@ -729,22 +730,22 @@ func (r *Replica) AdminSplit( // Without the lease, a replica's local descriptor can be arbitrarily // stale, which will result in a ConditionFailedError. To avoid this, // we make sure that we still have the lease before each attempt. 
- if _, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil { + if _, pErr := r.redirectOnOrAcquireLease(ctx); pErr != nil { return roachpb.AdminSplitResponse{}, pErr } - reply, _, pErr = r.adminSplitWithDescriptor(ctx, args, r.Desc()) + reply, lastErr = r.adminSplitWithDescriptor(ctx, args, r.Desc()) // On seeing a ConditionFailedError or an AmbiguousResultError, retry the // command with the updated descriptor. - switch pErr.GetDetail().(type) { + switch errors.Cause(lastErr).(type) { case *roachpb.ConditionFailedError: case *roachpb.AmbiguousResultError: default: - return reply, pErr + return reply, roachpb.NewError(lastErr) } } // If we broke out of the loop after MaxRetries, return the last error. - return roachpb.AdminSplitResponse{}, pErr + return roachpb.AdminSplitResponse{}, roachpb.NewError(lastErr) } func maybeDescriptorChangedError(desc *roachpb.RangeDescriptor, err error) (string, bool) { @@ -782,7 +783,7 @@ func maybeDescriptorChangedError(desc *roachpb.RangeDescriptor, err error) (stri // See the comment on splitTrigger for details on the complexities. func (r *Replica) adminSplitWithDescriptor( ctx context.Context, args roachpb.AdminSplitRequest, desc *roachpb.RangeDescriptor, -) (_ roachpb.AdminSplitResponse, validSplitKey bool, _ *roachpb.Error) { +) (roachpb.AdminSplitResponse, error) { var reply roachpb.AdminSplitResponse // Determine split key if not provided with args. This scan is @@ -805,54 +806,54 @@ func (r *Replica) adminSplitWithDescriptor( foundSplitKey, err = engine.MVCCFindSplitKey( ctx, r.store.engine, desc.StartKey, desc.EndKey, targetSize, allowMeta2Splits) if err != nil { - return reply, false, roachpb.NewErrorf("unable to determine split key: %s", err) + return reply, errors.Errorf("unable to determine split key: %s", err) } if foundSplitKey == nil { // No suitable split key could be found. - return reply, false, nil + return reply, unsplittableRangeError{} } } else { // If the key that routed this request to this range is now out of this // range's bounds, return an error for the client to try again on the // correct range. if !containsKey(*desc, args.Span.Key) { - return reply, false, - roachpb.NewError(roachpb.NewRangeKeyMismatchError(args.Span.Key, args.Span.Key, desc)) + return reply, roachpb.NewRangeKeyMismatchError(args.Span.Key, args.Span.Key, desc) } foundSplitKey = args.SplitKey } if !containsKey(*desc, foundSplitKey) { - return reply, false, - roachpb.NewErrorf("requested split key %s out of bounds of %s", args.SplitKey, r) + return reply, errors.Errorf("requested split key %s out of bounds of %s", args.SplitKey, r) } var err error splitKey, err = keys.Addr(foundSplitKey) if err != nil { - return reply, false, roachpb.NewError(err) + return reply, err } if !splitKey.Equal(foundSplitKey) { - return reply, false, roachpb.NewErrorf("cannot split range at range-local key %s", splitKey) + return reply, errors.Errorf("cannot split range at range-local key %s", splitKey) } if !engine.IsValidSplitKey(foundSplitKey, allowMeta2Splits) { - return reply, false, roachpb.NewErrorf("cannot split range at key %s", splitKey) + return reply, errors.Errorf("cannot split range at key %s", splitKey) } } // If the range starts at the splitKey, we treat the AdminSplit // as a no-op and return success instead of throwing an error. 
if desc.StartKey.Equal(splitKey) { + if len(args.SplitKey) == 0 { + log.Fatal(ctx, "MVCCFindSplitKey returned start key of range") + } log.Event(ctx, "range already split") - return reply, false, nil + return reply, nil } log.Event(ctx, "found split key") // Create right hand side range descriptor with the newly-allocated Range ID. rightDesc, err := r.store.NewRangeDescriptor(ctx, splitKey, desc.EndKey, desc.Replicas) if err != nil { - return reply, true, - roachpb.NewErrorf("unable to allocate right hand side range descriptor: %s", err) + return reply, errors.Errorf("unable to allocate right hand side range descriptor: %s", err) } // Init updated version of existing range descriptor. @@ -937,15 +938,12 @@ func (r *Replica) adminSplitWithDescriptor( // range descriptors are picked outside the transaction. Return // ConditionFailedError in the error detail so that the command can be // retried. - pErr := roachpb.NewError(err) if msg, ok := maybeDescriptorChangedError(desc, err); ok { - pErr.Message = fmt.Sprintf("split at key %s failed: %s", splitKey, msg) - } else { - pErr.Message = fmt.Sprintf("split at key %s failed: %s", splitKey, err) + err = errors.Wrap(err, msg) } - return reply, true, pErr + return reply, errors.Wrapf(err, "split at key %s failed", splitKey) } - return reply, true, nil + return reply, nil } // splitTrigger is called on a successful commit of a transaction diff --git a/pkg/storage/split_queue.go b/pkg/storage/split_queue.go index 76b3ad3f16d8..d040d1d0c6b6 100644 --- a/pkg/storage/split_queue.go +++ b/pkg/storage/split_queue.go @@ -32,6 +32,13 @@ const ( // splitQueueTimerDuration is the duration between splits of queued ranges. splitQueueTimerDuration = 0 // zero duration to process splits greedily. + // splitQueuePurgatoryCheckInterval is the interval at which replicas in + // purgatory make split attempts. Purgatory is used by the splitQueue to + // store ranges that are large enough to require a split but are + // unsplittable because they do not contain a suitable split key. Purgatory + // prevents them from repeatedly attempting to split at an unbounded rate. + splitQueuePurgatoryCheckInterval = 1 * time.Minute + // splits should be relatively isolated, other than requiring expensive // RocksDB scans over part of the splitting range to recompute stats. We // allow a limitted number of splits to be processed at once. @@ -42,13 +49,23 @@ const ( // or along intersecting zone config boundaries. type splitQueue struct { *baseQueue - db *client.DB + db *client.DB + purgChan <-chan time.Time } // newSplitQueue returns a new instance of splitQueue. func newSplitQueue(store *Store, db *client.DB, gossip *gossip.Gossip) *splitQueue { + var purgChan <-chan time.Time + if c := store.TestingKnobs().SplitQueuePurgatoryChan; c != nil { + purgChan = c + } else { + purgTicker := time.NewTicker(splitQueuePurgatoryCheckInterval) + purgChan = purgTicker.C + } + sq := &splitQueue{ - db: db, + db: db, + purgChan: purgChan, } sq.baseQueue = newBaseQueue( "split", sq, store, gossip, @@ -62,6 +79,7 @@ func newSplitQueue(store *Store, db *client.DB, gossip *gossip.Gossip) *splitQue failures: store.metrics.SplitQueueFailures, pending: store.metrics.SplitQueuePending, processingNanos: store.metrics.SplitQueueProcessingNanos, + purgatory: store.metrics.SplitQueuePurgatory, }, ) return sq @@ -89,6 +107,15 @@ func (sq *splitQueue) shouldQueue( return } +// unsplittableRangeError indicates that a split attempt failed because no +// suitable split key could be found.
+type unsplittableRangeError struct{} + +func (unsplittableRangeError) Error() string { return "could not find valid split key" } +func (unsplittableRangeError) purgatoryErrorMarker() {} + +var _ purgatoryError = unsplittableRangeError{} + // process synchronously invokes admin split for each proposed split key. func (sq *splitQueue) process(ctx context.Context, r *Replica, sysCfg config.SystemConfig) error { err := sq.processAttempt(ctx, r, sysCfg) @@ -113,7 +140,7 @@ func (sq *splitQueue) processAttempt( // First handle case of splitting due to zone config maps. desc := r.Desc() if splitKey := sysCfg.ComputeSplitKey(desc.StartKey, desc.EndKey); splitKey != nil { - if _, _, pErr := r.adminSplitWithDescriptor( + if _, err := r.adminSplitWithDescriptor( ctx, roachpb.AdminSplitRequest{ Span: roachpb.Span{ @@ -122,8 +149,8 @@ func (sq *splitQueue) processAttempt( SplitKey: splitKey.AsRawKey(), }, desc, - ); pErr != nil { - return errors.Wrapf(pErr.GoError(), "unable to split %s at key %q", r, splitKey) + ); err != nil { + return errors.Wrapf(err, "unable to split %s at key %q", r, splitKey) } return nil } @@ -134,27 +161,12 @@ func (sq *splitQueue) processAttempt( size := r.GetMVCCStats().Total() maxBytes := r.GetMaxBytes() if maxBytes > 0 && float64(size)/float64(maxBytes) > 1 { - if _, validSplitKey, pErr := r.adminSplitWithDescriptor( + _, err := r.adminSplitWithDescriptor( ctx, roachpb.AdminSplitRequest{}, desc, - ); pErr != nil { - return pErr.GoError() - } else if !validSplitKey { - // If we couldn't find a split key, set the max-bytes for the range to - // double its current size to prevent future attempts to split the range - // until it grows again. - newMaxBytes := size * 2 - r.SetMaxBytes(newMaxBytes) - log.VEventf(ctx, 2, "couldn't find valid split key, growing max bytes to %d", newMaxBytes) - } else { - // We successfully split the range, reset max-bytes to the zone setting. - zone, err := sysCfg.GetZoneConfigForKey(desc.StartKey) - if err != nil { - return err - } - r.SetMaxBytes(zone.RangeMaxBytes) - } + ) + return err } return nil } @@ -164,7 +176,7 @@ func (*splitQueue) timer(_ time.Duration) time.Duration { return splitQueueTimerDuration } -// purgatoryChan returns nil. -func (*splitQueue) purgatoryChan() <-chan time.Time { - return nil +// purgatoryChan returns the split queue's purgatory channel. +func (sq *splitQueue) purgatoryChan() <-chan time.Time { + return sq.purgChan } diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 928813de17ac..4fd73edfd0dd 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -744,6 +744,9 @@ type StoreTestingKnobs struct { // process ranges that need to be split, for use in tests that use // the replication queue but disable the split queue. ReplicateQueueAcceptsUnsplit bool + // SplitQueuePurgatoryChan allows a test to control the channel used to + // trigger split queue purgatory processing. + SplitQueuePurgatoryChan <-chan time.Time // SkipMinSizeCheck, if set, makes the store creation process skip the check // for a minimum size. SkipMinSizeCheck bool
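Taken together, these three patches establish the purgatory pattern: a process() failure that implements the purgatoryError marker interface parks the replica in purgatory, and the queue's purgatoryChan (now a <-chan time.Time, backed by a time.Ticker in the split queue) drives periodic retries until processing succeeds. Below is a minimal, self-contained Go sketch of that pattern for illustration only; it is not CockroachDB code. The toyQueue harness, integer range IDs, and attempt counter are invented stand-ins for the real baseQueue, Replica, and the GC activity that eventually makes a range splittable.

package main

import (
	"fmt"
	"time"
)

// purgatoryError mirrors the marker interface used in the patches above:
// only errors implementing it send a replica to purgatory.
type purgatoryError interface {
	error
	purgatoryErrorMarker()
}

// unsplittableRangeError mirrors the split queue's new purgatory error.
type unsplittableRangeError struct{}

func (unsplittableRangeError) Error() string         { return "could not find valid split key" }
func (unsplittableRangeError) purgatoryErrorMarker() {}

// toyQueue is an invented stand-in for a queue implementation. Its
// purgatoryChan is a Ticker channel, the pattern the split queue adopts;
// a queue that returns nil instead opts out of purgatory entirely.
type toyQueue struct {
	purgTicker *time.Ticker
	purgatory  map[int]purgatoryError // rangeID -> last purgatory error
}

func (q *toyQueue) purgatoryChan() <-chan time.Time { return q.purgTicker.C }

// process fails with a purgatory error until the hypothetical range becomes
// splittable; the attempt counter stands in for the GC that shrinks the
// real range in TestUnsplittableRange.
func (q *toyQueue) process(rangeID, attempt int) error {
	if attempt < 2 {
		return unsplittableRangeError{}
	}
	return nil
}

func main() {
	q := &toyQueue{
		purgTicker: time.NewTicker(50 * time.Millisecond), // one minute in the real split queue
		purgatory:  map[int]purgatoryError{},
	}
	defer q.purgTicker.Stop()

	// The first attempt fails with a purgatory error, so the range is parked
	// in purgatory instead of being retried in a tight loop.
	if err := q.process(1, 0); err != nil {
		if purgErr, ok := err.(purgatoryError); ok {
			q.purgatory[1] = purgErr
		}
	}

	// Each signal on purgatoryChan retries everything in purgatory; successes
	// leave purgatory, failures stay parked until the next tick.
	for attempt := 1; len(q.purgatory) > 0; attempt++ {
		<-q.purgatoryChan()
		for id := range q.purgatory {
			if err := q.process(id, attempt); err == nil {
				delete(q.purgatory, id)
				fmt.Printf("range %d left purgatory on attempt %d\n", id, attempt)
			}
		}
	}
}

In the actual split queue the tick interval is splitQueuePurgatoryCheckInterval (one minute), and tests can substitute their own channel through the SplitQueuePurgatoryChan testing knob, which is exactly how TestUnsplittableRange above drives purgatory retries deterministically.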