storage: add unsplittable ranges to split queue purgatory
In cockroachdb#14654 we added a mechanism to double the max range size whenever a
split attempt found a range that was unsplittable. This prevented a
tight loop of split attempts. However, it didn't actually do anything to
help us find a split point to reduce the size of the range in the
future. This size doubling worked in practice, but it was a blunt
instrument that had strange effects (see cockroachdb#24215).

This change rips out this range size doubling and replaces it with a
split queue purgatory. This purgatory is used to house replicas that
are unsplittable, preventing them from getting into a tight loop.

Release note: None
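The purgatory idea is simple enough to sketch in isolation. The following is an illustrative, self-contained Go program, not CockroachDB's implementation: items that fail with a condition that may clear later wait in a map and are retried on a timer tick, rather than being re-queued in a tight loop.

package main

import (
    "fmt"
    "time"
)

func main() {
    // Replica 1 is "unsplittable" for the first two attempts; then the
    // condition clears (think: GC removes old versions) and it succeeds.
    attempts := 0
    process := func(id int) error {
        attempts++
        if attempts <= 2 {
            return fmt.Errorf("r%d: cannot find a valid split key yet", id)
        }
        return nil
    }

    purgatory := map[int]struct{}{1: {}}
    tick := time.NewTicker(10 * time.Millisecond)
    defer tick.Stop()

    for range tick.C {
        // Retry everything in purgatory on each tick; successes leave the
        // map, failures simply wait for the next tick.
        for id := range purgatory {
            if err := process(id); err != nil {
                fmt.Println("still in purgatory:", err)
            } else {
                delete(purgatory, id)
            }
        }
        if len(purgatory) == 0 {
            fmt.Println("purgatory empty")
            return
        }
    }
}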
nvanbenschoten committed May 8, 2018
1 parent 3d13d94 commit e495467
Showing 7 changed files with 140 additions and 89 deletions.
98 changes: 60 additions & 38 deletions pkg/storage/client_split_test.go
@@ -2353,67 +2353,89 @@ func TestDistributedTxnCleanup(t *testing.T) {
    })
}

// TestUnsplittableRange creates an unsplittable range and tests that
// it is handled correctly by the split queue's purgatory. The test:
// 1. creates an unsplittable range that needs to be split
// 2. makes sure that range enters purgatory
// 3. makes sure a purgatory run still fails
// 4. GCs part of the range so that it no longer needs to be split
// 5. makes sure a purgatory run succeeds and the range leaves purgatory
func TestUnsplittableRange(t *testing.T) {
    defer leaktest.AfterTest(t)()

+   ttl := 1 * time.Hour
+   const maxBytes = 1 << 16
+   defer config.TestingSetDefaultZoneConfig(config.ZoneConfig{
+       RangeMaxBytes: maxBytes,
+       GC: config.GCPolicy{
+           TTLSeconds: int32(ttl.Seconds()),
+       },
+   })()
+
    stopper := stop.NewStopper()
    defer stopper.Stop(context.TODO())

-   store := createTestStoreWithConfig(t, stopper, storage.TestStoreConfig(nil))
-   store.ForceSplitScanAndProcess()
+   manual := hlc.NewManualClock(123)
+   splitQueuePurgatoryChan := make(chan time.Time, 1)
+   cfg := storage.TestStoreConfig(hlc.NewClock(manual.UnixNano, time.Nanosecond))
+   cfg.TestingKnobs.SplitQueuePurgatoryChan = splitQueuePurgatoryChan
+   store := createTestStoreWithConfig(t, stopper, cfg)
+   if err := server.WaitForInitialSplits(store.DB()); err != nil {
+       t.Fatal(err)
+   }

    // Add a single large row to /Table/14.
    tableKey := keys.MakeTablePrefix(keys.UITableID)
    row1Key := roachpb.Key(encoding.EncodeVarintAscending(append([]byte(nil), tableKey...), 1))
    col1Key := keys.MakeFamilyKey(append([]byte(nil), row1Key...), 0)
-   value := bytes.Repeat([]byte("x"), 64<<10)
+   valueLen := 0.9 * maxBytes
+   value := bytes.Repeat([]byte("x"), int(valueLen))
    if err := store.DB().Put(context.Background(), col1Key, value); err != nil {
        t.Fatal(err)
    }

    store.ForceSplitScanAndProcess()
    testutils.SucceedsSoon(t, func() error {
        repl := store.LookupReplica(tableKey, nil)
        if repl.Desc().StartKey.Equal(tableKey) {
            return nil
        }
        return errors.Errorf("waiting for split: %s", repl)
    })

-   repl := store.LookupReplica(tableKey, nil)
-   origMaxBytes := repl.GetMaxBytes()
-   repl.SetMaxBytes(int64(len(value)))
+   // Wait for half of the ttl and add another large value in the same row.
+   // Together, these two values bump the range over the max range size.
+   manual.Increment(ttl.Nanoseconds() / 2)
+   value2Len := 0.2 * maxBytes
+   value2 := bytes.Repeat([]byte("y"), int(value2Len))
+   if err := store.DB().Put(context.Background(), col1Key, value2); err != nil {
+       t.Fatal(err)
+   }

-   // Wait for an attempt to split the range which will fail because it contains
-   // a single large value. The max-bytes for the range will be changed, but it
-   // should not have been reset to its original value.
+   // Ensure that an attempt to split the range will hit an
+   // unsplittableRangeError and place the range in purgatory.
    store.ForceSplitScanAndProcess()
-   testutils.SucceedsSoon(t, func() error {
-       maxBytes := repl.GetMaxBytes()
-       if maxBytes != int64(len(value)) && maxBytes < origMaxBytes {
-           return nil
-       }
-       return errors.Errorf("expected max-bytes to be changed: %d", repl.GetMaxBytes())
-   })
+   if purgLen := store.SplitQueuePurgatoryLength(); purgLen != 1 {
+       t.Fatalf("expected split queue purgatory to contain 1 replica, found %d", purgLen)
+   }

-   // Add two more rows to the range.
-   for i := int64(2); i < 4; i++ {
-       rowKey := roachpb.Key(encoding.EncodeVarintAscending(append([]byte(nil), tableKey...), i))
-       colKey := keys.MakeFamilyKey(append([]byte(nil), rowKey...), 0)
-       if err := store.DB().Put(context.Background(), colKey, value); err != nil {
-           t.Fatal(err)
-       }
-   }
+   // Signal the split queue's purgatory channel and ensure that the purgatory
+   // remains occupied because the range still needs to split but can't.
+   splitQueuePurgatoryChan <- timeutil.Now()
+   if purgLen := store.SplitQueuePurgatoryLength(); purgLen != 1 {
+       t.Fatalf("expected split queue purgatory to contain 1 replica, found %d", purgLen)
+   }

-   // Wait for the range to be split and verify that max-bytes was reset to the
-   // value in the zone config.
-   store.ForceSplitScanAndProcess()
+   // Wait for much longer than the ttl to accumulate GCByteAge.
+   manual.Increment(10 * ttl.Nanoseconds())
+   // Trigger the GC queue, which should clean up the earlier version of the
+   // row. Once the first version of the row is cleaned up, the range should
+   // exit the split queue purgatory.
+   repl := store.LookupReplica(tableKey, nil)
+   if err := store.ManualGC(repl); err != nil {
+       t.Fatal(err)
+   }

+   // Signal the split queue's purgatory channel and ensure that the purgatory
+   // removes its now well-sized replica.
+   splitQueuePurgatoryChan <- timeutil.Now()
    testutils.SucceedsSoon(t, func() error {
-       if origMaxBytes == repl.GetMaxBytes() {
+       purgLen := store.SplitQueuePurgatoryLength()
+       if purgLen == 0 {
            return nil
        }
-       return errors.Errorf("expected max-bytes=%d, but got max-bytes=%d",
-           origMaxBytes, repl.GetMaxBytes())
+       return errors.Errorf("expected split queue purgatory to be empty, found %d", purgLen)
    })
}
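The SplitQueuePurgatoryChan knob used above lives in the store's testing knobs, in a file this view doesn't render. Its assumed shape is a plain channel that replaces the split queue's internal purgatory retry timer (the field comment here is illustrative):

// Excerpt (assumed) from StoreTestingKnobs: a test pulses this channel to
// trigger a purgatory scan at a deterministic point.
type StoreTestingKnobs struct {
    // SplitQueuePurgatoryChan stands in for the split queue's purgatory
    // retry timer, giving tests control over when purgatory reprocesses.
    SplitQueuePurgatoryChan <-chan time.Time
}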

6 changes: 6 additions & 0 deletions pkg/storage/helpers_test.go
@@ -152,6 +152,12 @@ func (s *Store) ReplicateQueuePurgatoryLength() int {
    return s.replicateQueue.PurgatoryLength()
}

+// SplitQueuePurgatoryLength returns the number of replicas in split
+// queue purgatory.
+func (s *Store) SplitQueuePurgatoryLength() int {
+   return s.splitQueue.PurgatoryLength()
+}
+
// SetRaftLogQueueActive enables or disables the raft log queue.
func (s *Store) SetRaftLogQueueActive(active bool) {
    s.setRaftLogQueueActive(active)
5 changes: 5 additions & 0 deletions pkg/storage/metrics.go
@@ -400,6 +400,9 @@ var (
    metaSplitQueueProcessingNanos = metric.Metadata{
        Name: "queue.split.processingnanos",
        Help: "Nanoseconds spent processing replicas in the split queue"}
+   metaSplitQueuePurgatory = metric.Metadata{
+       Name: "queue.split.purgatory",
+       Help: "Number of replicas in the split queue's purgatory, waiting to become splittable"}
    metaTimeSeriesMaintenanceQueueSuccesses = metric.Metadata{
        Name: "queue.tsmaintenance.process.success",
        Help: "Number of replicas successfully processed by the time series maintenance queue"}
@@ -637,6 +640,7 @@ type StoreMetrics struct {
    SplitQueueFailures *metric.Counter
    SplitQueuePending *metric.Gauge
    SplitQueueProcessingNanos *metric.Counter
+   SplitQueuePurgatory *metric.Gauge
    TimeSeriesMaintenanceQueueSuccesses *metric.Counter
    TimeSeriesMaintenanceQueueFailures *metric.Counter
    TimeSeriesMaintenanceQueuePending *metric.Gauge
@@ -824,6 +828,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
    SplitQueueFailures: metric.NewCounter(metaSplitQueueFailures),
    SplitQueuePending: metric.NewGauge(metaSplitQueuePending),
    SplitQueueProcessingNanos: metric.NewCounter(metaSplitQueueProcessingNanos),
+   SplitQueuePurgatory: metric.NewGauge(metaSplitQueuePurgatory),
    TimeSeriesMaintenanceQueueSuccesses: metric.NewCounter(metaTimeSeriesMaintenanceQueueSuccesses),
    TimeSeriesMaintenanceQueueFailures: metric.NewCounter(metaTimeSeriesMaintenanceQueueFailures),
    TimeSeriesMaintenanceQueuePending: metric.NewGauge(metaTimeSeriesMaintenanceQueuePending),
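Neither hunk shows how the new gauge tracks purgatory; that wiring lives in the collapsed queue code. A plausible sketch, with assumed field names, is that the base queue owns the gauge and refreshes it whenever its purgatory map changes size:

// Assumed wiring (illustrative names, not the actual baseQueue fields).
type baseQueue struct {
    mu struct {
        sync.Mutex
        purgatory map[int64]error // failed replicas, keyed by range ID
    }
    purgatoryGauge *metric.Gauge // e.g. StoreMetrics.SplitQueuePurgatory
}

func (bq *baseQueue) addToPurgatory(rangeID int64, err error) {
    bq.mu.Lock()
    defer bq.mu.Unlock()
    bq.mu.purgatory[rangeID] = err
    // Keep the exported metric in sync with the purgatory's size.
    bq.purgatoryGauge.Update(int64(len(bq.mu.purgatory)))
}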
5 changes: 5 additions & 0 deletions pkg/storage/queue.go
@@ -312,6 +312,11 @@ func (bq *baseQueue) Length() int {

// PurgatoryLength returns the current size of purgatory.
func (bq *baseQueue) PurgatoryLength() int {
+   // Lock processing while measuring the purgatory length. This ensures that
+   // no purgatory replicas are concurrently being processed, during which time
+   // they are removed from bq.mu.purgatory even though they may be re-added.
+   defer bq.lockProcessing()()
+
    bq.mu.Lock()
    defer bq.mu.Unlock()
    return len(bq.mu.purgatory)
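The double call in defer bq.lockProcessing()() is the Go idiom of a lock helper that returns its own unlock function: the first call acquires the lock immediately, and the deferred second call releases it on return. A standalone illustration (lockProcessing's real body isn't shown in this view):

package main

import (
    "fmt"
    "sync"
)

type queue struct{ processMu sync.Mutex }

// lockProcessing locks now and returns the unlocker, enabling the
// `defer q.lockProcessing()()` pattern at call sites.
func (q *queue) lockProcessing() func() {
    q.processMu.Lock()
    return q.processMu.Unlock
}

func main() {
    var q queue
    func() {
        defer q.lockProcessing()() // locks here, unlocks when this func returns
        fmt.Println("no replica processing can overlap this block")
    }()
}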
48 changes: 23 additions & 25 deletions pkg/storage/replica_command.go
@@ -716,11 +716,12 @@ func runCommitTrigger(
// AdminSplit divides the range into two ranges using args.SplitKey.
func (r *Replica) AdminSplit(
    ctx context.Context, args roachpb.AdminSplitRequest,
-) (reply roachpb.AdminSplitResponse, pErr *roachpb.Error) {
+) (reply roachpb.AdminSplitResponse, _ *roachpb.Error) {
    if len(args.SplitKey) == 0 {
        return roachpb.AdminSplitResponse{}, roachpb.NewErrorf("cannot split range with no key provided")
    }

+   var lastErr error
    retryOpts := base.DefaultRetryOptions()
    retryOpts.MaxRetries = 10
    for retryable := retry.StartWithCtx(ctx, retryOpts); retryable.Next(); {
@@ -729,22 +730,22 @@ func (r *Replica) AdminSplit(
        // Without the lease, a replica's local descriptor can be arbitrarily
        // stale, which will result in a ConditionFailedError. To avoid this,
        // we make sure that we still have the lease before each attempt.
-       if _, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil {
+       if _, pErr := r.redirectOnOrAcquireLease(ctx); pErr != nil {
            return roachpb.AdminSplitResponse{}, pErr
        }

-       reply, _, pErr = r.adminSplitWithDescriptor(ctx, args, r.Desc())
+       reply, lastErr = r.adminSplitWithDescriptor(ctx, args, r.Desc())
        // On seeing a ConditionFailedError or an AmbiguousResultError, retry the
        // command with the updated descriptor.
-       switch pErr.GetDetail().(type) {
+       switch errors.Cause(lastErr).(type) {
        case *roachpb.ConditionFailedError:
        case *roachpb.AmbiguousResultError:
        default:
-           return reply, pErr
+           return reply, roachpb.NewError(lastErr)
        }
    }
    // If we broke out of the loop after MaxRetries, return the last error.
-   return roachpb.AdminSplitResponse{}, pErr
+   return roachpb.AdminSplitResponse{}, roachpb.NewError(lastErr)
}
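The switch to plain error returns leans on github.com/pkg/errors: wrapping preserves the cause, and errors.Cause unwraps it for the type switch above. A minimal, runnable illustration of the same retry pattern (the error type here is a stand-in, not roachpb's):

package main

import (
    "fmt"

    "github.com/pkg/errors"
)

type conditionFailedError struct{}

func (conditionFailedError) Error() string { return "condition failed" }

func attempt(i int) error {
    if i < 2 {
        // Wrapping adds context but keeps the cause for type dispatch.
        return errors.Wrap(conditionFailedError{}, "split failed")
    }
    return nil
}

func main() {
    for i := 0; ; i++ {
        err := attempt(i)
        switch errors.Cause(err).(type) {
        case conditionFailedError:
            continue // retry with a refreshed descriptor
        default:
            fmt.Println("done, final err:", err)
            return
        }
    }
}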

@@ -782,7 +783,7 @@ func maybeDescriptorChangedError(desc *roachpb.RangeDescriptor, err error) (string, bool) {
// See the comment on splitTrigger for details on the complexities.
func (r *Replica) adminSplitWithDescriptor(
    ctx context.Context, args roachpb.AdminSplitRequest, desc *roachpb.RangeDescriptor,
-) (_ roachpb.AdminSplitResponse, validSplitKey bool, _ *roachpb.Error) {
+) (roachpb.AdminSplitResponse, error) {
    var reply roachpb.AdminSplitResponse

    // Determine split key if not provided with args. This scan is
@@ -805,54 +806,54 @@ func (r *Replica) adminSplitWithDescriptor(
            foundSplitKey, err = engine.MVCCFindSplitKey(
                ctx, r.store.engine, desc.StartKey, desc.EndKey, targetSize, allowMeta2Splits)
            if err != nil {
-               return reply, false, roachpb.NewErrorf("unable to determine split key: %s", err)
+               return reply, errors.Errorf("unable to determine split key: %s", err)
            }
            if foundSplitKey == nil {
                // No suitable split key could be found.
-               return reply, false, nil
+               return reply, unsplittableRangeError{}
            }
        } else {
            // If the key that routed this request to this range is now out of this
            // range's bounds, return an error for the client to try again on the
            // correct range.
            if !containsKey(*desc, args.Span.Key) {
-               return reply, false,
-                   roachpb.NewError(roachpb.NewRangeKeyMismatchError(args.Span.Key, args.Span.Key, desc))
+               return reply, roachpb.NewRangeKeyMismatchError(args.Span.Key, args.Span.Key, desc)
            }
            foundSplitKey = args.SplitKey
        }

        if !containsKey(*desc, foundSplitKey) {
-           return reply, false,
-               roachpb.NewErrorf("requested split key %s out of bounds of %s", args.SplitKey, r)
+           return reply, errors.Errorf("requested split key %s out of bounds of %s", args.SplitKey, r)
        }

        var err error
        splitKey, err = keys.Addr(foundSplitKey)
        if err != nil {
-           return reply, false, roachpb.NewError(err)
+           return reply, err
        }
        if !splitKey.Equal(foundSplitKey) {
-           return reply, false, roachpb.NewErrorf("cannot split range at range-local key %s", splitKey)
+           return reply, errors.Errorf("cannot split range at range-local key %s", splitKey)
        }
        if !engine.IsValidSplitKey(foundSplitKey, allowMeta2Splits) {
-           return reply, false, roachpb.NewErrorf("cannot split range at key %s", splitKey)
+           return reply, errors.Errorf("cannot split range at key %s", splitKey)
        }
    }

    // If the range starts at the splitKey, we treat the AdminSplit
    // as a no-op and return success instead of throwing an error.
    if desc.StartKey.Equal(splitKey) {
+       if len(args.SplitKey) == 0 {
+           log.Fatal(ctx, "MVCCFindSplitKey returned start key of range")
+       }
        log.Event(ctx, "range already split")
-       return reply, false, nil
+       return reply, nil
    }
    log.Event(ctx, "found split key")

    // Create right hand side range descriptor with the newly-allocated Range ID.
    rightDesc, err := r.store.NewRangeDescriptor(ctx, splitKey, desc.EndKey, desc.Replicas)
    if err != nil {
-       return reply, true,
-           roachpb.NewErrorf("unable to allocate right hand side range descriptor: %s", err)
+       return reply, errors.Errorf("unable to allocate right hand side range descriptor: %s", err)
    }

    // Init updated version of existing range descriptor.
@@ -937,15 +938,12 @@ func (r *Replica) adminSplitWithDescriptor(
        // range descriptors are picked outside the transaction. Return
        // ConditionFailedError in the error detail so that the command can be
        // retried.
-       pErr := roachpb.NewError(err)
        if msg, ok := maybeDescriptorChangedError(desc, err); ok {
-           pErr.Message = fmt.Sprintf("split at key %s failed: %s", splitKey, msg)
-       } else {
-           pErr.Message = fmt.Sprintf("split at key %s failed: %s", splitKey, err)
+           err = errors.Wrap(err, msg)
        }
-       return reply, true, pErr
+       return reply, errors.Wrapf(err, "split at key %s failed", splitKey)
    }
-   return reply, true, nil
+   return reply, nil
}

// splitTrigger is called on a successful commit of a transaction
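The unsplittableRangeError returned by adminSplitWithDescriptor is defined in split_queue.go, one of the changed files this view doesn't render. To be routed into purgatory it must implement the base queue's purgatory-error interface; a sketch of the assumed shape:

// purgatoryError is the base queue's marker interface (assumed shape): a
// processing error that implements it sends the replica to purgatory.
type purgatoryError interface {
    error
    purgatoryErrorMarker() // dummy method for unique interface
}

// unsplittableRangeError reports that no valid split key could be found.
type unsplittableRangeError struct{}

func (unsplittableRangeError) Error() string         { return "could not find valid split key" }
func (unsplittableRangeError) purgatoryErrorMarker() {}

var _ purgatoryError = unsplittableRangeError{}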