Skip to content

Commit

Permalink
storage: change sticky bit to represent expiration time instead of sp…
Browse files Browse the repository at this point in the history
…lit time

This change will allow more granular control over how long splits last
for a particular range.

AdminSplit now takes an hlc timestamp that represents when the split
should expire. After expiration, the split is eligible for automatic
merging.

If a range should immediately be eligible (I.E. split by split queue),
its expiration time is set to the zero timestamp, and if a range should
never be split (I.E. manually split with no expiration specified), its
expiration time is set to timestamp.MaxTimestamp.

Release note: None
  • Loading branch information
jeffrey-xiao committed Jun 6, 2019
1 parent 9f38ee4 commit 90e8985
Show file tree
Hide file tree
Showing 40 changed files with 1,464 additions and 1,436 deletions.
3 changes: 0 additions & 3 deletions c-deps/libroach/protos/roachpb/data.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions c-deps/libroach/protos/roachpb/data.pb.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions c-deps/libroach/protos/roachpb/metadata.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions c-deps/libroach/protos/roachpb/metadata.pb.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ func splitAndScatter(
// TODO(dan): Really, this should be splitting the Key of the first
// entry in the _next_ chunk.
log.VEventf(restoreCtx, 1, "presplitting chunk %d of %d", idx, len(importSpanChunks))
if err := db.AdminSplit(ctx, chunkKey, chunkKey, false /* manual */); err != nil {
if err := db.AdminSplit(ctx, chunkKey, chunkKey, hlc.Timestamp{} /* expirationTime */); err != nil {
return err
}

Expand Down Expand Up @@ -849,7 +849,7 @@ func splitAndScatter(
// TODO(dan): Really, this should be splitting the Key of
// the _next_ entry.
log.VEventf(restoreCtx, 1, "presplitting %d of %d", idx, len(importSpans))
if err := db.AdminSplit(ctx, newSpanKey, newSpanKey, false /* manual */); err != nil {
if err := db.AdminSplit(ctx, newSpanKey, newSpanKey, hlc.Timestamp{} /* expirationTime */); err != nil {
return err
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/cockroach/pkg/workload/tpcc"
Expand Down Expand Up @@ -125,7 +126,7 @@ func benchmarkAddSSTable(b *testing.B, dir string, tables []tableSSTable) {
}}
s, _, kvDB := serverutils.StartServer(b, args)
for _, t := range tables {
if err := kvDB.AdminSplit(ctx, t.span.Key, t.span.Key, false /* manual */); err != nil {
if err := kvDB.AdminSplit(ctx, t.span.Key, t.span.Key, hlc.Timestamp{} /* expirationTime */); err != nil {
b.Fatal(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func (cp *readImportDataProcessor) doRun(ctx context.Context) error {
if cp.spec.IngestDirectly {
for _, tbl := range cp.spec.Tables {
for _, span := range tbl.AllIndexSpans() {
if err := cp.flowCtx.ClientDB.AdminSplit(ctx, span.Key, span.Key, false /* manual */); err != nil {
if err := cp.flowCtx.ClientDB.AdminSplit(ctx, span.Key, span.Key, hlc.Timestamp{} /* expirationTime */); err != nil {
return err
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/sst_writer_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/diskmap"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -176,7 +177,7 @@ func (sp *sstWriter) Run(ctx context.Context) {
end = sst.span.EndKey
}

if err := sp.db.AdminSplit(ctx, end, end, false /* manual */); err != nil {
if err := sp.db.AdminSplit(ctx, end, end, hlc.Timestamp{} /* expirationTime */); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/storageccl/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,10 @@ func runTestImport(t *testing.T, init func(*cluster.Settings)) {
t.Fatalf("failed to rewrite key: %s", reqMidKey2)
}

if err := kvDB.AdminSplit(ctx, reqMidKey1, reqMidKey1, true /* manual */); err != nil {
if err := kvDB.AdminSplit(ctx, reqMidKey1, reqMidKey1, hlc.MaxTimestamp /* expirationTime */); err != nil {
t.Fatal(err)
}
if err := kvDB.AdminSplit(ctx, reqMidKey2, reqMidKey2, true /* manual */); err != nil {
if err := kvDB.AdminSplit(ctx, reqMidKey2, reqMidKey2, hlc.MaxTimestamp /* expirationTime */); err != nil {
t.Fatal(err)
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/internal/client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -586,7 +587,7 @@ func (b *Batch) adminMerge(key interface{}) {

// adminSplit is only exported on DB. It is here for symmetry with the
// other operations.
func (b *Batch) adminSplit(spanKeyIn, splitKeyIn interface{}, manual bool) {
func (b *Batch) adminSplit(spanKeyIn, splitKeyIn interface{}, expirationTime hlc.Timestamp) {
spanKey, err := marshalKey(spanKeyIn)
if err != nil {
b.initResult(0, 0, notRaw, err)
Expand All @@ -601,8 +602,8 @@ func (b *Batch) adminSplit(spanKeyIn, splitKeyIn interface{}, manual bool) {
RequestHeader: roachpb.RequestHeader{
Key: spanKey,
},
SplitKey: splitKey,
Manual: manual,
SplitKey: splitKey,
ExpirationTime: expirationTime,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
Expand Down
14 changes: 9 additions & 5 deletions pkg/internal/client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,14 +475,18 @@ func (db *DB) AdminMerge(ctx context.Context, key interface{}) error {
// #16008 for details, and #16344 for the tracking issue to clean this mess up
// properly.
//
// When manual is true, the sticky bit associated with the split range is set.
// Any ranges with the sticky bit set will be skipped by the merge queue when
// scanning for potential ranges to merge.
// expirationTime is the timestamp when the split expires and is eligible for
// automatic merging by the merge queue. To specify that a split should
// immediately be eligible for automatic merging, set expirationTime to
// hlc.Timestamp{} (I.E. the zero timestamp). To specify that a split should
// never be eligible, set expirationTime to hlc.MaxTimestamp.
//
// The keys can be either byte slices or a strings.
func (db *DB) AdminSplit(ctx context.Context, spanKey, splitKey interface{}, manual bool) error {
func (db *DB) AdminSplit(
ctx context.Context, spanKey, splitKey interface{}, expirationTime hlc.Timestamp,
) error {
b := &Batch{}
b.adminSplit(spanKey, splitKey, manual)
b.adminSplit(spanKey, splitKey, expirationTime)
return getOneErr(db.Run(ctx, b), b)
}

Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestRangeLookupWithOpenTransaction(t *testing.T) {
func setupMultipleRanges(ctx context.Context, db *client.DB, splitAt ...string) error {
// Split the keyspace at the given keys.
for _, key := range splitAt {
if err := db.AdminSplit(ctx, key /* spanKey */, key /* splitKey */, true /* manual */); err != nil {
if err := db.AdminSplit(ctx, key /* spanKey */, key /* splitKey */, hlc.MaxTimestamp /* expirationTime */); err != nil {
return err
}
}
Expand Down Expand Up @@ -1046,7 +1046,7 @@ func TestParallelSender(t *testing.T) {
// Split into multiple ranges.
splitKeys := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
for _, key := range splitKeys {
if err := db.AdminSplit(context.TODO(), key, key, true /* manual */); err != nil {
if err := db.AdminSplit(context.TODO(), key, key, hlc.MaxTimestamp /* expirationTime */); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -1091,7 +1091,7 @@ func initReverseScanTestEnv(s serverutils.TestServerInterface, t *testing.T) *cl
// ["", "b"),["b", "e") ,["e", "g") and ["g", "\xff\xff").
for _, key := range []string{"b", "e", "g"} {
// Split the keyspace at the given key.
if err := db.AdminSplit(context.TODO(), key, key, true /* manual */); err != nil {
if err := db.AdminSplit(context.TODO(), key, key, hlc.MaxTimestamp /* expirationTime */); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -1530,7 +1530,7 @@ func TestBatchPutWithConcurrentSplit(t *testing.T) {
// Split first using the default client and scan to make sure that
// the range descriptor cache reflects the split.
for _, key := range []string{"b", "f"} {
if err := db.AdminSplit(context.TODO(), key, key, true /* manual */); err != nil {
if err := db.AdminSplit(context.TODO(), key, key, hlc.MaxTimestamp /* expirationTime */); err != nil {
t.Fatal(err)
}
}
Expand All @@ -1556,6 +1556,7 @@ func TestBatchPutWithConcurrentSplit(t *testing.T) {
Key: roachpb.Key(key),
},
SplitKey: roachpb.Key(key),
ExpirationTime: hlc.MaxTimestamp,
}
if _, err := client.SendWrapped(context.Background(), ds, req); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1584,7 +1585,7 @@ func TestReverseScanWithSplitAndMerge(t *testing.T) {

// Case 1: An encounter with a range split.
// Split the range ["b", "e") at "c".
if err := db.AdminSplit(context.TODO(), "c", "c", true /* manual */); err != nil {
if err := db.AdminSplit(context.TODO(), "c", "c", hlc.MaxTimestamp /* expirationTime */); err != nil {
t.Fatal(err)
}

Expand Down
20 changes: 10 additions & 10 deletions pkg/kv/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestRangeSplitMeta(t *testing.T) {
for _, splitRKey := range splitKeys {
splitKey := roachpb.Key(splitRKey)
log.Infof(ctx, "starting split at key %q...", splitKey)
if err := s.DB.AdminSplit(ctx, splitKey, splitKey, true /* manual */); err != nil {
if err := s.DB.AdminSplit(ctx, splitKey, splitKey, hlc.MaxTimestamp /* expirationTime */); err != nil {
t.Fatal(err)
}
log.Infof(ctx, "split at key %q complete", splitKey)
Expand Down Expand Up @@ -156,7 +156,7 @@ func TestRangeSplitsWithConcurrentTxns(t *testing.T) {
<-txnChannel
}
log.Infof(ctx, "starting split at key %q...", splitKey)
if pErr := s.DB.AdminSplit(context.TODO(), splitKey, splitKey, true /* manual */); pErr != nil {
if pErr := s.DB.AdminSplit(context.TODO(), splitKey, splitKey, hlc.MaxTimestamp /* expirationTime */); pErr != nil {
t.Error(pErr)
}
log.Infof(ctx, "split at key %q complete", splitKey)
Expand Down Expand Up @@ -249,19 +249,19 @@ func TestRangeSplitsWithSameKeyTwice(t *testing.T) {

splitKey := roachpb.Key("aa")
log.Infof(ctx, "starting split at key %q...", splitKey)
if err := s.DB.AdminSplit(ctx, splitKey, splitKey, true /* manual */); err != nil {
if err := s.DB.AdminSplit(ctx, splitKey, splitKey, hlc.MaxTimestamp /* expirationTime */); err != nil {
t.Fatal(err)
}
log.Infof(ctx, "split at key %q first time complete", splitKey)
if err := s.DB.AdminSplit(ctx, splitKey, splitKey, true /* manual */); err != nil {
if err := s.DB.AdminSplit(ctx, splitKey, splitKey, hlc.MaxTimestamp /* expirationTime */); err != nil {
t.Fatal(err)
}
}

// TestSplitStickyBit checks that the sticky bit is set when performing a manual
// split. There are two cases to consider:
// 1. Range is split so sticky bit is set on RHS.
// 2. Range is already split and split key is the start key of a range, so set
// 1. Range is split so sticky bit is updated on RHS.
// 2. Range is already split and split key is the start key of a range, so update
// the sticky bit of that range, but no range is split.
func TestRangeSplitsStickyBit(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand All @@ -277,7 +277,7 @@ func TestRangeSplitsStickyBit(t *testing.T) {
descKey := keys.RangeDescriptorKey(splitKey)

// Splitting range.
if err := s.DB.AdminSplit(ctx, splitKey.AsRawKey(), splitKey.AsRawKey(), true /* manual */); err != nil {
if err := s.DB.AdminSplit(ctx, splitKey.AsRawKey(), splitKey.AsRawKey(), hlc.MaxTimestamp /* expirationTime */); err != nil {
t.Fatal(err)
}

Expand All @@ -287,7 +287,7 @@ func TestRangeSplitsStickyBit(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if desc.StickyBit == nil {
if (desc.StickyBit == hlc.Timestamp{}) {
t.Fatal("Sticky bit not set after splitting")
}

Expand All @@ -297,7 +297,7 @@ func TestRangeSplitsStickyBit(t *testing.T) {
}

// Splitting range.
if err := s.DB.AdminSplit(ctx, splitKey.AsRawKey(), splitKey.AsRawKey(), true /* manual */); err != nil {
if err := s.DB.AdminSplit(ctx, splitKey.AsRawKey(), splitKey.AsRawKey(), hlc.MaxTimestamp /* expirationTime */); err != nil {
t.Fatal(err)
}

Expand All @@ -306,7 +306,7 @@ func TestRangeSplitsStickyBit(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if desc.StickyBit == nil {
if (desc.StickyBit == hlc.Timestamp{}) {
t.Fatal("Sticky bit not set after splitting")
}
}
2 changes: 1 addition & 1 deletion pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func TestTxnCoordSenderHeartbeat(t *testing.T) {
keyA := roachpb.Key("a")
keyC := roachpb.Key("c")
splitKey := roachpb.Key("b")
if err := s.DB.AdminSplit(ctx, splitKey /* spanKey */, splitKey /* splitKey */, true /* manual */); err != nil {
if err := s.DB.AdminSplit(ctx, splitKey /* spanKey */, splitKey /* splitKey */, hlc.MaxTimestamp /* expirationTimestamp */); err != nil {
t.Fatal(err)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/tscache"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/localtestcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -406,7 +407,7 @@ func TestTxnRepeatGetWithRangeSplit(t *testing.T) {
}
s.Manual.Increment(time.Second.Nanoseconds())
// Split range by keyB.
if err := s.DB.AdminSplit(context.TODO(), splitKey, splitKey, true /* manual */); err != nil {
if err := s.DB.AdminSplit(context.TODO(), splitKey, splitKey, hlc.MaxTimestamp /* expirationTime */); err != nil {
t.Fatal(err)
}
// Wait till split complete.
Expand Down
Loading

0 comments on commit 90e8985

Please sign in to comment.