From cc71281900e3079860cadd0cd67888841dc86e94 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Tue, 22 Feb 2022 16:41:20 -0500 Subject: [PATCH] backupccl: create and use readAsOfIterator in RESTORE Previously while ingesting SSTs in the restore processor, `processRestoreSpanEntry` would manually skip delete tombstones and keys with timestamps later than the AS OF SYSTEM TIME timestamp provided in the RESTORE command. This PR wraps the restore processor's iterator with a new readAsOfIterator which abstracts this logic away. The readAsOfIterator wraps a SimpleMVCCIterator and only surfaces the latest valid key of a given MVCC key that is also below the iterator's AOST timestamp, if set. Further, the iterator does not surface delete tombstones, nor any MVCC keys shadowed by delete tombstones below the AOST timestamp, if set. Fixes #77276 Release note: None --- pkg/ccl/backupccl/restore_data_processor.go | 26 +-- pkg/storage/BUILD.bazel | 2 + pkg/storage/multi_iterator_test.go | 127 ++++++------- pkg/storage/read_as_of_iterator.go | 118 +++++++++++++ pkg/storage/read_as_of_iterator_test.go | 186 ++++++++++++++++++++ pkg/storage/sst_iterator_test.go | 25 ++- 6 files changed, 401 insertions(+), 83 deletions(-) create mode 100644 pkg/storage/read_as_of_iterator.go create mode 100644 pkg/storage/read_as_of_iterator_test.go diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 6eaac8b9892c..f34f3cfc1527 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -268,7 +268,7 @@ func inputReader( type mergedSST struct { entry execinfrapb.RestoreSpanEntry - iter storage.SimpleMVCCIterator + iter *storage.ReadAsOfIterator cleanup func() } @@ -300,8 +300,10 @@ func (rd *restoreDataProcessor) openSSTs( // channel. 
sendIters := func(itersToSend []storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error { multiIter := storage.MakeMultiIterator(itersToSend) + readAsOfIter := storage.NewReadAsOfIterator(multiIter, rd.spec.RestoreTime) cleanup := func() { + readAsOfIter.Close() multiIter.Close() for _, iter := range itersToSend { iter.Close() @@ -316,7 +318,7 @@ func (rd *restoreDataProcessor) openSSTs( mSST := mergedSST{ entry: entry, - iter: multiIter, + iter: readAsOfIter, cleanup: cleanup, } @@ -449,39 +451,21 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( startKeyMVCC, endKeyMVCC := storage.MVCCKey{Key: entry.Span.Key}, storage.MVCCKey{Key: entry.Span.EndKey} - for iter.SeekGE(startKeyMVCC); ; { + for iter.SeekGE(startKeyMVCC); ; iter.NextKey() { ok, err := iter.Valid() if err != nil { return summary, err } - if !ok { - break - } - - if !rd.spec.RestoreTime.IsEmpty() { - // TODO(dan): If we have to skip past a lot of versions to find the - // latest one before args.EndTime, then this could be slow. - if rd.spec.RestoreTime.Less(iter.UnsafeKey().Timestamp) { - iter.Next() - continue - } - } if !ok || !iter.UnsafeKey().Less(endKeyMVCC) { break } - if len(iter.UnsafeValue()) == 0 { - // Value is deleted. - iter.NextKey() - continue - } key := iter.UnsafeKey() keyScratch = append(keyScratch[:0], key.Key...) key.Key = keyScratch valueScratch = append(valueScratch[:0], iter.UnsafeValue()...) 
value := roachpb.Value{RawBytes: valueScratch} - iter.NextKey() key.Key, ok, err = kr.RewriteKey(key.Key) if err != nil { diff --git a/pkg/storage/BUILD.bazel b/pkg/storage/BUILD.bazel index 8f54ee576591..2a8ec647e8e6 100644 --- a/pkg/storage/BUILD.bazel +++ b/pkg/storage/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "pebble_iterator.go", "pebble_merge.go", "pebble_mvcc_scanner.go", + "read_as_of_iterator.go", "replicas_storage.go", "resource_limiter.go", "row_counter.go", @@ -116,6 +117,7 @@ go_test( "pebble_file_registry_test.go", "pebble_mvcc_scanner_test.go", "pebble_test.go", + "read_as_of_iterator_test.go", "resource_limiter_test.go", "sst_iterator_test.go", "sst_test.go", diff --git a/pkg/storage/multi_iterator_test.go b/pkg/storage/multi_iterator_test.go index aa84e9acb46e..b6ed9456a2dc 100644 --- a/pkg/storage/multi_iterator_test.go +++ b/pkg/storage/multi_iterator_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" ) func TestMultiIterator(t *testing.T) { @@ -37,13 +38,6 @@ func TestMultiIterator(t *testing.T) { // MultiIterator, which is fully iterated (using either NextKey or Next) and // turned back into a string in the same format as `input`. This is compared // to expectedNextKey or expectedNext. - // - // Input is a string containing key, timestamp, value tuples: first a single - // character key, then a single character timestamp walltime. If the - // character after the timestamp is an M, this entry is a "metadata" key - // (timestamp=0, sorts before any non-0 timestamp, and no value). If the - // character after the timestamp is an X, this entry is a deletion - // tombstone. Otherwise the value is the same as the timestamp. 
tests := []struct { inputs []string expectedNextKey string @@ -86,73 +80,84 @@ func TestMultiIterator(t *testing.T) { for _, input := range test.inputs { batch := pebble.NewBatch() defer batch.Close() - for i := 0; ; { - if i == len(input) { - break - } - k := []byte{input[i]} - ts := hlc.Timestamp{WallTime: int64(input[i+1])} - var v []byte - if i+1 < len(input) && input[i+1] == 'M' { - ts = hlc.Timestamp{} - v = nil - } else if i+2 < len(input) && input[i+2] == 'X' { - v = nil - i++ - } else { - v = []byte{input[i+1]} - } - i += 2 - if ts.IsEmpty() { - if err := batch.PutUnversioned(k, v); err != nil { - t.Fatalf("%+v", err) - } - } else { - if err := batch.PutRawMVCC(MVCCKey{Key: k, Timestamp: ts}, v); err != nil { - t.Fatalf("%+v", err) - } - } - } + populateBatch(t, batch, input) iter := batch.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax}) defer iter.Close() iters = append(iters, iter) } - subtests := []struct { - name string - expected string - fn func(SimpleMVCCIterator) - }{ + subtests := []iterSubtest{ {"NextKey", test.expectedNextKey, (SimpleMVCCIterator).NextKey}, {"Next", test.expectedNext, (SimpleMVCCIterator).Next}, } for _, subtest := range subtests { t.Run(subtest.name, func(t *testing.T) { - var output bytes.Buffer it := MakeMultiIterator(iters) - for it.SeekGE(MVCCKey{Key: keys.LocalMax}); ; subtest.fn(it) { - ok, err := it.Valid() - if err != nil { - t.Fatalf("unexpected error: %+v", err) - } - if !ok { - break - } - output.Write(it.UnsafeKey().Key) - if it.UnsafeKey().Timestamp.IsEmpty() { - output.WriteRune('M') - } else { - output.WriteByte(byte(it.UnsafeKey().Timestamp.WallTime)) - if len(it.UnsafeValue()) == 0 { - output.WriteRune('X') - } - } - } - if actual := output.String(); actual != subtest.expected { - t.Errorf("got %q expected %q", actual, subtest.expected) - } + iterateSimpleMultiIter(t, it, subtest) }) } }) } } + +// populateBatch populates a pebble batch with a series of MVCC key values. 
+// input is a string containing key, timestamp, value tuples: first a single +// character key, then a single character timestamp walltime. If the +// character after the timestamp is an M, this entry is a "metadata" key +// (timestamp=0, sorts before any non-0 timestamp, and no value). If the +// character after the timestamp is an X, this entry is a deletion +// tombstone. Otherwise the value is the same as the timestamp. +func populateBatch(t *testing.T, batch Batch, input string) { + for i := 0; ; { + if i == len(input) { + break + } + k := []byte{input[i]} + ts := hlc.Timestamp{WallTime: int64(input[i+1])} + var v []byte + if i+1 < len(input) && input[i+1] == 'M' { + ts = hlc.Timestamp{} + v = nil + } else if i+2 < len(input) && input[i+2] == 'X' { + v = nil + i++ + } else { + v = []byte{input[i+1]} + } + i += 2 + if ts.IsEmpty() { + require.NoError(t, batch.PutUnversioned(k, v)) + } else { + require.NoError(t, batch.PutRawMVCC(MVCCKey{Key: k, Timestamp: ts}, v)) + } + } +} + +type iterSubtest struct { + name string + expected string + fn func(SimpleMVCCIterator) +} + +// iterateSimpleMultiIter iterates through a simpleMVCCIterator for expected values, +// and assumes that populateBatch populated the keys for the iterator. 
+func iterateSimpleMultiIter(t *testing.T, it SimpleMVCCIterator, subtest iterSubtest) { + var output bytes.Buffer + for it.SeekGE(MVCCKey{Key: keys.LocalMax}); ; subtest.fn(it) { + ok, err := it.Valid() + require.NoError(t, err) + if !ok { + break + } + output.Write(it.UnsafeKey().Key) + if it.UnsafeKey().Timestamp.IsEmpty() { + output.WriteRune('M') + } else { + output.WriteByte(byte(it.UnsafeKey().Timestamp.WallTime)) + if len(it.UnsafeValue()) == 0 { + output.WriteRune('X') + } + } + } + require.Equal(t, subtest.expected, output.String()) +} diff --git a/pkg/storage/read_as_of_iterator.go b/pkg/storage/read_as_of_iterator.go new file mode 100644 index 000000000000..55a2c948c4f8 --- /dev/null +++ b/pkg/storage/read_as_of_iterator.go @@ -0,0 +1,118 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import "github.com/cockroachdb/cockroach/pkg/util/hlc" + +// ReadAsOfIterator wraps a SimpleMVCCIterator and only surfaces the latest +// valid key of a given MVCC key that is also below the asOf timestamp, if set. +// Further, the iterator does not surface delete tombstones, nor any MVCC keys +// shadowed by delete tombstones below the asOf timestamp, if set. The iterator +// assumes that it will not encounter any write intents. +type ReadAsOfIterator struct { + iter SimpleMVCCIterator + + // asOf is the latest timestamp of a key surfaced by the iterator. + asOf hlc.Timestamp +} + +var _ SimpleMVCCIterator = &ReadAsOfIterator{} + +// Close closes the underlying iterator. 
+func (f *ReadAsOfIterator) Close() { + f.iter.Close() +} + +// SeekGE advances the iterator to the first key in the engine which is >= the +// provided key that obeys the ReadAsOfIterator key constraints. +func (f *ReadAsOfIterator) SeekGE(originalKey MVCCKey) { + // To ensure SeekGE seeks to a key that isn't shadowed by a tombstone that the + // ReadAsOfIterator would have skipped (i.e. a tombstone below asOf), seek to + // the key with the latest possible timestamp that the iterator could surface + // (i.e. asOf, if set) and iterate to the next valid key at or below the caller's + // key that also obeys the iterator's constraints. + synthetic := MVCCKey{Key: originalKey.Key, Timestamp: f.asOf} + f.iter.SeekGE(synthetic) + + if ok := f.advance(); ok && f.UnsafeKey().Less(originalKey) { + // The following is true: + // originalKey.Key == f.UnsafeKey && + // f.asOf timestamp (if set) >= current timestamp > originalKey timestamp. + // + // This implies the caller is seeking to a key that is shadowed by a valid + // key that obeys the iterator's constraints. The caller's key is NOT the + // latest key of the given MVCC key; therefore, skip to the next MVCC key. + f.NextKey() + } +} + +// Valid implements SimpleMVCCIterator. +func (f *ReadAsOfIterator) Valid() (bool, error) { + return f.iter.Valid() +} + +// Next advances the iterator to the next valid MVCC key obeying the iterator's +// constraints. Note that Next and NextKey have the same implementation because +// the iterator only surfaces the latest valid key of a given MVCC key below the +// asOf timestamp. +func (f *ReadAsOfIterator) Next() { + f.NextKey() +} + +// NextKey advances the iterator to the next valid MVCC key obeying the +// iterator's constraints. NextKey() is only guaranteed to surface a key that +// obeys the iterator's constraints if the iterator was already on a key that +// obeys the constraints. To ensure this, initialize the iterator with a SeekGE +// call before any calls to NextKey(). 
+func (f *ReadAsOfIterator) NextKey() { + f.iter.NextKey() + f.advance() +} + +// UnsafeKey returns the current key, but the memory is invalidated on the next +// call to {NextKey,Seek}. +func (f *ReadAsOfIterator) UnsafeKey() MVCCKey { + return f.iter.UnsafeKey() +} + +// UnsafeValue returns the current value as a byte slice, but the memory is +// invalidated on the next call to {NextKey,Seek}. +func (f *ReadAsOfIterator) UnsafeValue() []byte { + return f.iter.UnsafeValue() +} + +// advance moves past keys with timestamps later than f.asOf and skips MVCC keys +// whose latest value (subject to f.asOf) has been deleted. Note that advance +// moves past keys above asOf before evaluating tombstones, implying the +// iterator will never call f.iter.NextKey() on a tombstone with a timestamp +// later than f.asOf. +func (f *ReadAsOfIterator) advance() bool { + for { + if ok, err := f.Valid(); err != nil || !ok { + // No valid keys found. + return false + } else if !f.asOf.IsEmpty() && f.asOf.Less(f.iter.UnsafeKey().Timestamp) { + // Skip keys above the asOf timestamp. + f.iter.Next() + } else if len(f.iter.UnsafeValue()) == 0 { + // Skip to the next MVCC key if we find a tombstone. + f.iter.NextKey() + } else { + // On a valid key. + return true + } + } +} + +// NewReadAsOfIterator constructs a ReadAsOfIterator. +func NewReadAsOfIterator(iter SimpleMVCCIterator, asOf hlc.Timestamp) *ReadAsOfIterator { + return &ReadAsOfIterator{iter: iter, asOf: asOf} +} diff --git a/pkg/storage/read_as_of_iterator_test.go b/pkg/storage/read_as_of_iterator_test.go new file mode 100644 index 000000000000..71af34d4e0b7 --- /dev/null +++ b/pkg/storage/read_as_of_iterator_test.go @@ -0,0 +1,186 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import ( + "bytes" + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +type asOfTest struct { + input string + expectedNextKey string + asOf string +} + +func TestReadAsOfIterator(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + pebble, err := Open(context.Background(), InMemory(), CacheSize(1<<20 /* 1 MiB */)) + require.NoError(t, err) + defer pebble.Close() + + // The test turns each `input` into a batch for the readAsOfIterator, fully + // iterates the iterator, and puts the surfaced keys into a string in the same + // format as `input`. The test then compares the output to 'expectedNextKey'. + // The 'asOf' field represents the wall time of the hlc.Timestamp for the + // readAsOfIterator. + tests := []asOfTest{ + // Ensure nextkey works as expected. + {input: "b1c1", expectedNextKey: "b1c1", asOf: ""}, + {input: "b2b1", expectedNextKey: "b2", asOf: ""}, + + // Ensure AOST is an inclusive upper bound. + {input: "b1", expectedNextKey: "b1", asOf: "1"}, + {input: "b2b1", expectedNextKey: "b1", asOf: "1"}, + + // Double skip within keys. + {input: "b3b2b1", expectedNextKey: "b1", asOf: "1"}, + + // Double skip across keys. + {input: "b2c2c1", expectedNextKey: "c1", asOf: "1"}, + + // Ensure next key captures at most one mvcc key per key after an asOf skip. + {input: "b3c2c1", expectedNextKey: "c2", asOf: "2"}, + + // Ensure an AOST 'next' takes precedence over a tombstone 'nextkey'. 
+ {input: "b2Xb1c1", expectedNextKey: "c1", asOf: ""}, + {input: "b2Xb1c1", expectedNextKey: "b1c1", asOf: "1"}, + + // Ensure clean iteration over double tombstone. + {input: "a1Xb2Xb1c1", expectedNextKey: "c1", asOf: ""}, + {input: "a1Xb2Xb1c1", expectedNextKey: "b1c1", asOf: "1"}, + + // Ensure tombstone is skipped after an AOST skip. + {input: "b3c2Xc1d1", expectedNextKey: "d1", asOf: "2"}, + {input: "b3c2Xc1d1", expectedNextKey: "c1d1", asOf: "1"}, + + // Ensure key before delete tombstone gets read if under AOST. + {input: "b2b1Xc1", expectedNextKey: "b2c1", asOf: ""}, + {input: "b2b1Xc1", expectedNextKey: "c1", asOf: "1"}, + } + + for i, test := range tests { + name := fmt.Sprintf("Test %d: %s, AOST %s", i, test.input, test.asOf) + t.Run(name, func(t *testing.T) { + batch := pebble.NewBatch() + defer batch.Close() + populateBatch(t, batch, test.input) + iter := batch.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax}) + defer iter.Close() + + subtests := []iterSubtest{ + {"NextKey", test.expectedNextKey, (SimpleMVCCIterator).NextKey}, + } + for _, subtest := range subtests { + t.Run(subtest.name, func(t *testing.T) { + asOf := hlc.Timestamp{} + if test.asOf != "" { + asOf.WallTime = int64(test.asOf[0]) + } + it := NewReadAsOfIterator(iter, asOf) + iterateSimpleMultiIter(t, it, subtest) + }) + } + }) + } +} + +func TestReadAsOfIteratorSeek(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + pebble, err := Open(context.Background(), InMemory(), CacheSize(1<<20 /* 1 MiB */)) + require.NoError(t, err) + defer pebble.Close() + + tests := []struct { + input string + seekKey string + expected string + asOf string + }{ + // Ensure vanilla seek works. + {"a1b1", "a1", "a1", ""}, + + // Ensure seek always returns the latest key of an MVCC key. + {"a2a1b1", "a1", "b1", ""}, + {"a2a1b1", "a1", "b1", "2"}, + {"a2a1b1", "a1", "a1", "1"}, + + // Ensure out of bounds seek fails gracefully. 
+ {"a1", "b1", "notOK", ""}, + + // Ensure the asOf timestamp moves the iterator during a seek. + {"a2a1", "a2", "a1", "1"}, + {"a2b1", "a2", "b1", "1"}, + + // Ensure seek does not return on a tombstone. + {"a3Xa1b1", "a3", "b1", ""}, + + // Ensure seek does not return on a key shadowed by a tombstone. + {"a3Xa2a1b1", "a2", "b1", ""}, + {"a3Xa2a1b1", "a2", "b1", "3"}, + {"a3a2Xa1b1", "a1", "b1", ""}, + {"a3a2Xa1b2Xb1c1", "a1", "c1", ""}, + + // Ensure we can seek to a key right before a tombstone. + {"a2Xa1b2b1Xc1", "a1", "b2", ""}, + + // Ensure AOST 'next' takes precedence over tombstone 'nextkey'. + {"a4a3Xa1b1", "a3", "a1", "1"}, + {"a4a3Xa2a1b1", "a2", "a1", "1"}, + {"a4a3Xa2a1b1", "a2", "a2", "2"}, + } + for i, test := range tests { + name := fmt.Sprintf("Test %d: %s, AOST %s", i, test.input, test.asOf) + t.Run(name, func(t *testing.T) { + batch := pebble.NewBatch() + defer batch.Close() + populateBatch(t, batch, test.input) + iter := batch.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax}) + defer iter.Close() + + asOf := hlc.Timestamp{} + if test.asOf != "" { + asOf.WallTime = int64(test.asOf[0]) + } + it := NewReadAsOfIterator(iter, asOf) + var output bytes.Buffer + + seekKey := MVCCKey{ + Key: []byte{test.seekKey[0]}, + Timestamp: hlc.Timestamp{WallTime: int64(test.seekKey[1])}, + } + it.SeekGE(seekKey) + ok, err := it.Valid() + require.NoError(t, err) + if !ok { + if test.expected == "notOK" { + return + } + require.NoError(t, err, "seek not ok") + } + output.Write(it.UnsafeKey().Key) + output.WriteByte(byte(it.UnsafeKey().Timestamp.WallTime)) + require.Equal(t, test.expected, output.String()) + }) + } +} diff --git a/pkg/storage/sst_iterator_test.go b/pkg/storage/sst_iterator_test.go index 69870d293c10..06163f71e24d 100644 --- a/pkg/storage/sst_iterator_test.go +++ b/pkg/storage/sst_iterator_test.go @@ -83,7 +83,8 @@ func TestSSTIterator(t *testing.T) { sst := MakeIngestionSSTWriter(ctx, st, sstFile) defer sst.Close() 
var allKVs []MVCCKeyValue - for i := 0; i < 10; i++ { + maxWallTime := 10 + for i := 0; i < maxWallTime; i++ { kv := MVCCKeyValue{ Key: MVCCKey{ Key: []byte{'A' + byte(i)}, @@ -129,4 +130,26 @@ func TestSSTIterator(t *testing.T) { defer iter.Close() runTestSSTIterator(t, iter, allKVs) }) + t.Run("AsOf", func(t *testing.T) { + iter, err := NewMemSSTIterator(sstFile.Data(), false) + if err != nil { + t.Fatalf("%+v", err) + } + defer iter.Close() + asOfTimes := []hlc.Timestamp{ + {WallTime: int64(maxWallTime / 2)}, + {WallTime: int64(maxWallTime)}, + {}} + for _, asOf := range asOfTimes { + var asOfKVs []MVCCKeyValue + for _, kv := range allKVs { + if !asOf.IsEmpty() && asOf.Less(kv.Key.Timestamp) { + continue + } + asOfKVs = append(asOfKVs, kv) + } + asOfIter := NewReadAsOfIterator(iter, asOf) + runTestSSTIterator(t, asOfIter, asOfKVs) + } + }) }