diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 6eaac8b9892c..f34f3cfc1527 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -268,7 +268,7 @@ func inputReader( type mergedSST struct { entry execinfrapb.RestoreSpanEntry - iter storage.SimpleMVCCIterator + iter *storage.ReadAsOfIterator cleanup func() } @@ -300,8 +300,10 @@ func (rd *restoreDataProcessor) openSSTs( // channel. sendIters := func(itersToSend []storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error { multiIter := storage.MakeMultiIterator(itersToSend) + readAsOfIter := storage.NewReadAsOfIterator(multiIter, rd.spec.RestoreTime) cleanup := func() { + readAsOfIter.Close() multiIter.Close() for _, iter := range itersToSend { iter.Close() @@ -316,7 +318,7 @@ func (rd *restoreDataProcessor) openSSTs( mSST := mergedSST{ entry: entry, - iter: multiIter, + iter: readAsOfIter, cleanup: cleanup, } @@ -449,39 +451,21 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( startKeyMVCC, endKeyMVCC := storage.MVCCKey{Key: entry.Span.Key}, storage.MVCCKey{Key: entry.Span.EndKey} - for iter.SeekGE(startKeyMVCC); ; { + for iter.SeekGE(startKeyMVCC); ; iter.NextKey() { ok, err := iter.Valid() if err != nil { return summary, err } - if !ok { - break - } - - if !rd.spec.RestoreTime.IsEmpty() { - // TODO(dan): If we have to skip past a lot of versions to find the - // latest one before args.EndTime, then this could be slow. - if rd.spec.RestoreTime.Less(iter.UnsafeKey().Timestamp) { - iter.Next() - continue - } - } if !ok || !iter.UnsafeKey().Less(endKeyMVCC) { break } - if len(iter.UnsafeValue()) == 0 { - // Value is deleted. - iter.NextKey() - continue - } key := iter.UnsafeKey() keyScratch = append(keyScratch[:0], key.Key...) key.Key = keyScratch valueScratch = append(valueScratch[:0], iter.UnsafeValue()...) 
value := roachpb.Value{RawBytes: valueScratch} - iter.NextKey() key.Key, ok, err = kr.RewriteKey(key.Key) if err != nil { diff --git a/pkg/storage/BUILD.bazel b/pkg/storage/BUILD.bazel index 8f54ee576591..2a8ec647e8e6 100644 --- a/pkg/storage/BUILD.bazel +++ b/pkg/storage/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "pebble_iterator.go", "pebble_merge.go", "pebble_mvcc_scanner.go", + "read_as_of_iterator.go", "replicas_storage.go", "resource_limiter.go", "row_counter.go", @@ -116,6 +117,7 @@ go_test( "pebble_file_registry_test.go", "pebble_mvcc_scanner_test.go", "pebble_test.go", + "read_as_of_iterator_test.go", "resource_limiter_test.go", "sst_iterator_test.go", "sst_test.go", diff --git a/pkg/storage/multi_iterator_test.go b/pkg/storage/multi_iterator_test.go index aa84e9acb46e..b6ed9456a2dc 100644 --- a/pkg/storage/multi_iterator_test.go +++ b/pkg/storage/multi_iterator_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" ) func TestMultiIterator(t *testing.T) { @@ -37,13 +38,6 @@ func TestMultiIterator(t *testing.T) { // MultiIterator, which is fully iterated (using either NextKey or Next) and // turned back into a string in the same format as `input`. This is compared // to expectedNextKey or expectedNext. - // - // Input is a string containing key, timestamp, value tuples: first a single - // character key, then a single character timestamp walltime. If the - // character after the timestamp is an M, this entry is a "metadata" key - // (timestamp=0, sorts before any non-0 timestamp, and no value). If the - // character after the timestamp is an X, this entry is a deletion - // tombstone. Otherwise the value is the same as the timestamp. 
tests := []struct { inputs []string expectedNextKey string @@ -86,73 +80,84 @@ func TestMultiIterator(t *testing.T) { for _, input := range test.inputs { batch := pebble.NewBatch() defer batch.Close() - for i := 0; ; { - if i == len(input) { - break - } - k := []byte{input[i]} - ts := hlc.Timestamp{WallTime: int64(input[i+1])} - var v []byte - if i+1 < len(input) && input[i+1] == 'M' { - ts = hlc.Timestamp{} - v = nil - } else if i+2 < len(input) && input[i+2] == 'X' { - v = nil - i++ - } else { - v = []byte{input[i+1]} - } - i += 2 - if ts.IsEmpty() { - if err := batch.PutUnversioned(k, v); err != nil { - t.Fatalf("%+v", err) - } - } else { - if err := batch.PutRawMVCC(MVCCKey{Key: k, Timestamp: ts}, v); err != nil { - t.Fatalf("%+v", err) - } - } - } + populateBatch(t, batch, input) iter := batch.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax}) defer iter.Close() iters = append(iters, iter) } - subtests := []struct { - name string - expected string - fn func(SimpleMVCCIterator) - }{ + subtests := []iterSubtest{ {"NextKey", test.expectedNextKey, (SimpleMVCCIterator).NextKey}, {"Next", test.expectedNext, (SimpleMVCCIterator).Next}, } for _, subtest := range subtests { t.Run(subtest.name, func(t *testing.T) { - var output bytes.Buffer it := MakeMultiIterator(iters) - for it.SeekGE(MVCCKey{Key: keys.LocalMax}); ; subtest.fn(it) { - ok, err := it.Valid() - if err != nil { - t.Fatalf("unexpected error: %+v", err) - } - if !ok { - break - } - output.Write(it.UnsafeKey().Key) - if it.UnsafeKey().Timestamp.IsEmpty() { - output.WriteRune('M') - } else { - output.WriteByte(byte(it.UnsafeKey().Timestamp.WallTime)) - if len(it.UnsafeValue()) == 0 { - output.WriteRune('X') - } - } - } - if actual := output.String(); actual != subtest.expected { - t.Errorf("got %q expected %q", actual, subtest.expected) - } + iterateSimpleMultiIter(t, it, subtest) }) } }) } } + +// populateBatch populates a pebble batch with a series of MVCC key values. 
+// input is a string containing key, timestamp, value tuples: first a single +// character key, then a single character timestamp walltime. If the +// timestamp character itself is an M, this entry is a "metadata" key +// (timestamp=0, sorts before any non-0 timestamp, and no value). If the +// character after the timestamp is an X, this entry is a deletion +// tombstone. Otherwise the value is the same as the timestamp. +func populateBatch(t *testing.T, batch Batch, input string) { + for i := 0; ; { + if i == len(input) { + break + } + k := []byte{input[i]} + ts := hlc.Timestamp{WallTime: int64(input[i+1])} + var v []byte + if i+1 < len(input) && input[i+1] == 'M' { + ts = hlc.Timestamp{} + v = nil + } else if i+2 < len(input) && input[i+2] == 'X' { + v = nil + i++ + } else { + v = []byte{input[i+1]} + } + i += 2 + if ts.IsEmpty() { + require.NoError(t, batch.PutUnversioned(k, v)) + } else { + require.NoError(t, batch.PutRawMVCC(MVCCKey{Key: k, Timestamp: ts}, v)) + } + } +} + +type iterSubtest struct { + name string + expected string + fn func(SimpleMVCCIterator) +} + +// iterateSimpleMultiIter iterates through a SimpleMVCCIterator and compares the +// surfaced keys, re-encoded in populateBatch's input format, to subtest.expected.
+func iterateSimpleMultiIter(t *testing.T, it SimpleMVCCIterator, subtest iterSubtest) { + var output bytes.Buffer + for it.SeekGE(MVCCKey{Key: keys.LocalMax}); ; subtest.fn(it) { + ok, err := it.Valid() + require.NoError(t, err) + if !ok { + break + } + output.Write(it.UnsafeKey().Key) + if it.UnsafeKey().Timestamp.IsEmpty() { + output.WriteRune('M') + } else { + output.WriteByte(byte(it.UnsafeKey().Timestamp.WallTime)) + if len(it.UnsafeValue()) == 0 { + output.WriteRune('X') + } + } + } + require.Equal(t, subtest.expected, output.String()) +} diff --git a/pkg/storage/read_as_of_iterator.go b/pkg/storage/read_as_of_iterator.go new file mode 100644 index 000000000000..55a2c948c4f8 --- /dev/null +++ b/pkg/storage/read_as_of_iterator.go @@ -0,0 +1,118 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import "github.com/cockroachdb/cockroach/pkg/util/hlc" + +// ReadAsOfIterator wraps a SimpleMVCCIterator and only surfaces the latest +// version of each MVCC key that is at or below the asOf timestamp, if set. +// Further, the iterator does not surface delete tombstones, nor any MVCC keys +// shadowed by delete tombstones at or below the asOf timestamp, if set. The +// iterator assumes that it will not encounter any write intents. +type ReadAsOfIterator struct { + iter SimpleMVCCIterator + + // asOf is the latest timestamp (inclusive) of a key surfaced by the iterator. + asOf hlc.Timestamp +} + +var _ SimpleMVCCIterator = &ReadAsOfIterator{} + +// Close closes the underlying iterator.
+func (f *ReadAsOfIterator) Close() { + f.iter.Close() +} + +// SeekGE advances the iterator to the first key in the engine which is >= the +// provided key that obeys the ReadAsOfIterator key constraints. +func (f *ReadAsOfIterator) SeekGE(originalKey MVCCKey) { + // To ensure SeekGE seeks to a key that isn't shadowed by a tombstone that the + // ReadAsOfIterator would have skipped (i.e. a tombstone at or below asOf), seek + // to the key with the latest possible timestamp that the iterator could surface + // (i.e. asOf, if set) and iterate to the next valid key at or below the caller's + // key that also obeys the iterator's constraints. + synthetic := MVCCKey{Key: originalKey.Key, Timestamp: f.asOf} + f.iter.SeekGE(synthetic) + + if ok := f.advance(); ok && f.UnsafeKey().Less(originalKey) { + // The following is true: + // originalKey.Key == f.UnsafeKey().Key && + // f.asOf timestamp (if set) >= current timestamp > originalKey timestamp. + // + // This implies the caller is seeking to a key that is shadowed by a valid + // key that obeys the iterator's constraints. The caller's key is NOT the + // latest key of the given MVCC key; therefore, skip to the next MVCC key. + f.NextKey() + } +} + +// Valid implements the SimpleMVCCIterator interface. +func (f *ReadAsOfIterator) Valid() (bool, error) { + return f.iter.Valid() +} + +// Next advances the iterator to the next valid MVCC key obeying the iterator's +// constraints. Note that Next and NextKey have the same implementation because +// the iterator only surfaces the latest valid key of a given MVCC key at or +// below the asOf timestamp. +func (f *ReadAsOfIterator) Next() { + f.NextKey() +} + +// NextKey advances the iterator to the next valid MVCC key obeying the +// iterator's constraints. NextKey() is only guaranteed to surface a key that +// obeys the iterator's constraints if the iterator was already on a key that +// obeys the constraints. To ensure this, initialize the iterator with a SeekGE +// call before any calls to NextKey().
+func (f *ReadAsOfIterator) NextKey() { + f.iter.NextKey() + f.advance() +} + +// UnsafeKey returns the current key, but the memory is invalidated on the next +// call to {Next,NextKey,SeekGE}. +func (f *ReadAsOfIterator) UnsafeKey() MVCCKey { + return f.iter.UnsafeKey() +} + +// UnsafeValue returns the current value as a byte slice, but the memory is +// invalidated on the next call to {Next,NextKey,SeekGE}. +func (f *ReadAsOfIterator) UnsafeValue() []byte { + return f.iter.UnsafeValue() +} + +// advance moves past keys with timestamps later than f.asOf and skips MVCC keys +// whose latest value (subject to f.asOf) has been deleted. Note that advance +// moves past keys above asOf before evaluating tombstones, implying the +// iterator will never call f.iter.NextKey() on a tombstone with a timestamp +// later than f.asOf. It returns true iff it stops on a surfaceable key. +func (f *ReadAsOfIterator) advance() bool { + for { + if ok, err := f.Valid(); err != nil || !ok { + // No valid keys found. + return false + } else if !f.asOf.IsEmpty() && f.asOf.Less(f.iter.UnsafeKey().Timestamp) { + // Skip keys above the asOf timestamp. + f.iter.Next() + } else if len(f.iter.UnsafeValue()) == 0 { + // Skip to the next MVCC key if we find a tombstone. + f.iter.NextKey() + } else { + // On a valid key. + return true + } + } +} + +// NewReadAsOfIterator constructs a ReadAsOfIterator; an empty asOf is unset. +func NewReadAsOfIterator(iter SimpleMVCCIterator, asOf hlc.Timestamp) *ReadAsOfIterator { + return &ReadAsOfIterator{iter: iter, asOf: asOf} +} diff --git a/pkg/storage/read_as_of_iterator_test.go b/pkg/storage/read_as_of_iterator_test.go new file mode 100644 index 000000000000..71af34d4e0b7 --- /dev/null +++ b/pkg/storage/read_as_of_iterator_test.go @@ -0,0 +1,186 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt.
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import ( + "bytes" + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +type asOfTest struct { + input string + expectedNextKey string + asOf string +} + +func TestReadAsOfIterator(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + pebble, err := Open(context.Background(), InMemory(), CacheSize(1<<20 /* 1 MiB */)) + require.NoError(t, err) + defer pebble.Close() + + // The test turns each `input` into a batch for the ReadAsOfIterator, fully + // iterates the iterator, and puts the surfaced keys into a string in the same + // format as `input`. The test then compares the output to 'expectedNextKey'. + // The 'asOf' field represents the wall time of the hlc.Timestamp for the + // ReadAsOfIterator; an empty string leaves asOf unset. + tests := []asOfTest{ + // Ensure NextKey works as expected. + {input: "b1c1", expectedNextKey: "b1c1", asOf: ""}, + {input: "b2b1", expectedNextKey: "b2", asOf: ""}, + + // Ensure AOST is an inclusive upper bound. + {input: "b1", expectedNextKey: "b1", asOf: "1"}, + {input: "b2b1", expectedNextKey: "b1", asOf: "1"}, + + // Double skip within keys. + {input: "b3b2b1", expectedNextKey: "b1", asOf: "1"}, + + // Double skip across keys. + {input: "b2c2c1", expectedNextKey: "c1", asOf: "1"}, + + // Ensure next key captures at most one mvcc key per key after an asOf skip. + {input: "b3c2c1", expectedNextKey: "c2", asOf: "2"}, + + // Ensure an AOST 'next' takes precedence over a tombstone 'nextkey'.
+ {input: "b2Xb1c1", expectedNextKey: "c1", asOf: ""}, + {input: "b2Xb1c1", expectedNextKey: "b1c1", asOf: "1"}, + + // Ensure clean iteration over double tombstone. + {input: "a1Xb2Xb1c1", expectedNextKey: "c1", asOf: ""}, + {input: "a1Xb2Xb1c1", expectedNextKey: "b1c1", asOf: "1"}, + + // Ensure tombstone is skipped after an AOST skip. + {input: "b3c2Xc1d1", expectedNextKey: "d1", asOf: "2"}, + {input: "b3c2Xc1d1", expectedNextKey: "c1d1", asOf: "1"}, + + // Ensure key before delete tombstone gets read if under AOST. + {input: "b2b1Xc1", expectedNextKey: "b2c1", asOf: ""}, + {input: "b2b1Xc1", expectedNextKey: "c1", asOf: "1"}, + } + + for i, test := range tests { + name := fmt.Sprintf("Test %d: %s, AOST %s", i, test.input, test.asOf) + t.Run(name, func(t *testing.T) { + batch := pebble.NewBatch() + defer batch.Close() + populateBatch(t, batch, test.input) + iter := batch.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax}) + defer iter.Close() + + subtests := []iterSubtest{ + {"NextKey", test.expectedNextKey, (SimpleMVCCIterator).NextKey}, + } + for _, subtest := range subtests { + t.Run(subtest.name, func(t *testing.T) { + asOf := hlc.Timestamp{} + if test.asOf != "" { + asOf.WallTime = int64(test.asOf[0]) + } + it := NewReadAsOfIterator(iter, asOf) + iterateSimpleMultiIter(t, it, subtest) + }) + } + }) + } +} + +func TestReadAsOfIteratorSeek(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + pebble, err := Open(context.Background(), InMemory(), CacheSize(1<<20 /* 1 MiB */)) + require.NoError(t, err) + defer pebble.Close() + + tests := []struct { + input string + seekKey string + expected string + asOf string + }{ + // Ensure vanilla seek works. + {"a1b1", "a1", "a1", ""}, + + // Ensure seek always returns the latest key of an MVCC key. + {"a2a1b1", "a1", "b1", ""}, + {"a2a1b1", "a1", "b1", "2"}, + {"a2a1b1", "a1", "a1", "1"}, + + // Ensure an out of bounds seek leaves the iterator invalid without error.
+ {"a1", "b1", "notOK", ""}, + + // Ensure the asOf timestamp moves the iterator during a seek. + {"a2a1", "a2", "a1", "1"}, + {"a2b1", "a2", "b1", "1"}, + + // Ensure seek does not return on a tombstone. + {"a3Xa1b1", "a3", "b1", ""}, + + // Ensure seek does not return on a key shadowed by a tombstone. + {"a3Xa2a1b1", "a2", "b1", ""}, + {"a3Xa2a1b1", "a2", "b1", "3"}, + {"a3a2Xa1b1", "a1", "b1", ""}, + {"a3a2Xa1b2Xb1c1", "a1", "c1", ""}, + + // Ensure we can seek to a key right before a tombstone. + {"a2Xa1b2b1Xc1", "a1", "b2", ""}, + + // Ensure AOST 'next' takes precedence over tombstone 'nextkey'. + {"a4a3Xa1b1", "a3", "a1", "1"}, + {"a4a3Xa2a1b1", "a2", "a1", "1"}, + {"a4a3Xa2a1b1", "a2", "a2", "2"}, + } + for i, test := range tests { + name := fmt.Sprintf("Test %d: %s, AOST %s", i, test.input, test.asOf) + t.Run(name, func(t *testing.T) { + batch := pebble.NewBatch() + defer batch.Close() + populateBatch(t, batch, test.input) + iter := batch.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax}) + defer iter.Close() + + asOf := hlc.Timestamp{} + if test.asOf != "" { + asOf.WallTime = int64(test.asOf[0]) + } + it := NewReadAsOfIterator(iter, asOf) + var output bytes.Buffer + + seekKey := MVCCKey{ + Key: []byte{test.seekKey[0]}, + Timestamp: hlc.Timestamp{WallTime: int64(test.seekKey[1])}, + } + it.SeekGE(seekKey) + ok, err := it.Valid() + require.NoError(t, err) + if !ok { + if test.expected == "notOK" { + return + } + require.True(t, ok, "seek not ok") + } + output.Write(it.UnsafeKey().Key) + output.WriteByte(byte(it.UnsafeKey().Timestamp.WallTime)) + require.Equal(t, test.expected, output.String()) + }) + } +} diff --git a/pkg/storage/sst_iterator_test.go b/pkg/storage/sst_iterator_test.go index 69870d293c10..06163f71e24d 100644 --- a/pkg/storage/sst_iterator_test.go +++ b/pkg/storage/sst_iterator_test.go @@ -83,7 +83,8 @@ func TestSSTIterator(t *testing.T) { sst := MakeIngestionSSTWriter(ctx, st, sstFile) defer sst.Close()
var allKVs []MVCCKeyValue - for i := 0; i < 10; i++ { + maxWallTime := 10 + for i := 0; i < maxWallTime; i++ { kv := MVCCKeyValue{ Key: MVCCKey{ Key: []byte{'A' + byte(i)}, @@ -129,4 +130,26 @@ func TestSSTIterator(t *testing.T) { defer iter.Close() runTestSSTIterator(t, iter, allKVs) }) + t.Run("AsOf", func(t *testing.T) { + iter, err := NewMemSSTIterator(sstFile.Data(), false) + if err != nil { + t.Fatalf("%+v", err) + } + defer iter.Close() + asOfTimes := []hlc.Timestamp{ + {WallTime: int64(maxWallTime / 2)}, + {WallTime: int64(maxWallTime)}, + {}} + for _, asOf := range asOfTimes { + var asOfKVs []MVCCKeyValue + for _, kv := range allKVs { + if !asOf.IsEmpty() && asOf.Less(kv.Key.Timestamp) { + continue + } + asOfKVs = append(asOfKVs, kv) + } + asOfIter := NewReadAsOfIterator(iter, asOf) + runTestSSTIterator(t, asOfIter, asOfKVs) + } + }) }