Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backupccl: create readAsOfIterator for RESTORE #77281

Merged
merged 1 commit into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 5 additions & 21 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func inputReader(

type mergedSST struct {
entry execinfrapb.RestoreSpanEntry
iter storage.SimpleMVCCIterator
iter *storage.ReadAsOfIterator
cleanup func()
}

Expand Down Expand Up @@ -300,8 +300,10 @@ func (rd *restoreDataProcessor) openSSTs(
// channel.
sendIters := func(itersToSend []storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error {
multiIter := storage.MakeMultiIterator(itersToSend)
readAsOfIter := storage.NewReadAsOfIterator(multiIter, rd.spec.RestoreTime)

cleanup := func() {
readAsOfIter.Close()
multiIter.Close()
for _, iter := range itersToSend {
iter.Close()
Expand All @@ -316,7 +318,7 @@ func (rd *restoreDataProcessor) openSSTs(

mSST := mergedSST{
entry: entry,
iter: multiIter,
iter: readAsOfIter,
cleanup: cleanup,
}

Expand Down Expand Up @@ -449,39 +451,21 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
startKeyMVCC, endKeyMVCC := storage.MVCCKey{Key: entry.Span.Key},
storage.MVCCKey{Key: entry.Span.EndKey}

for iter.SeekGE(startKeyMVCC); ; {
for iter.SeekGE(startKeyMVCC); ; iter.NextKey() {
ok, err := iter.Valid()
if err != nil {
return summary, err
}
if !ok {
break
}

if !rd.spec.RestoreTime.IsEmpty() {
// TODO(dan): If we have to skip past a lot of versions to find the
// latest one before args.EndTime, then this could be slow.
if rd.spec.RestoreTime.Less(iter.UnsafeKey().Timestamp) {
iter.Next()
continue
}
}

if !ok || !iter.UnsafeKey().Less(endKeyMVCC) {
break
}
if len(iter.UnsafeValue()) == 0 {
// Value is deleted.
iter.NextKey()
continue
}

key := iter.UnsafeKey()
keyScratch = append(keyScratch[:0], key.Key...)
key.Key = keyScratch
valueScratch = append(valueScratch[:0], iter.UnsafeValue()...)
value := roachpb.Value{RawBytes: valueScratch}
iter.NextKey()

key.Key, ok, err = kr.RewriteKey(key.Key)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"pebble_iterator.go",
"pebble_merge.go",
"pebble_mvcc_scanner.go",
"read_as_of_iterator.go",
"replicas_storage.go",
"resource_limiter.go",
"row_counter.go",
Expand Down Expand Up @@ -116,6 +117,7 @@ go_test(
"pebble_file_registry_test.go",
"pebble_mvcc_scanner_test.go",
"pebble_test.go",
"read_as_of_iterator_test.go",
"resource_limiter_test.go",
"sst_iterator_test.go",
"sst_test.go",
Expand Down
127 changes: 66 additions & 61 deletions pkg/storage/multi_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func TestMultiIterator(t *testing.T) {
Expand All @@ -37,13 +38,6 @@ func TestMultiIterator(t *testing.T) {
// MultiIterator, which is fully iterated (using either NextKey or Next) and
// turned back into a string in the same format as `input`. This is compared
// to expectedNextKey or expectedNext.
//
// Input is a string containing key, timestamp, value tuples: first a single
// character key, then a single character timestamp walltime. If the
// character after the timestamp is an M, this entry is a "metadata" key
// (timestamp=0, sorts before any non-0 timestamp, and no value). If the
// character after the timestamp is an X, this entry is a deletion
// tombstone. Otherwise the value is the same as the timestamp.
tests := []struct {
inputs []string
expectedNextKey string
Expand Down Expand Up @@ -86,73 +80,84 @@ func TestMultiIterator(t *testing.T) {
for _, input := range test.inputs {
batch := pebble.NewBatch()
defer batch.Close()
for i := 0; ; {
if i == len(input) {
break
}
k := []byte{input[i]}
ts := hlc.Timestamp{WallTime: int64(input[i+1])}
var v []byte
if i+1 < len(input) && input[i+1] == 'M' {
ts = hlc.Timestamp{}
v = nil
} else if i+2 < len(input) && input[i+2] == 'X' {
v = nil
i++
} else {
v = []byte{input[i+1]}
}
i += 2
if ts.IsEmpty() {
if err := batch.PutUnversioned(k, v); err != nil {
t.Fatalf("%+v", err)
}
} else {
if err := batch.PutRawMVCC(MVCCKey{Key: k, Timestamp: ts}, v); err != nil {
t.Fatalf("%+v", err)
}
}
}
populateBatch(t, batch, input)
iter := batch.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax})
defer iter.Close()
iters = append(iters, iter)
}

subtests := []struct {
name string
expected string
fn func(SimpleMVCCIterator)
}{
subtests := []iterSubtest{
{"NextKey", test.expectedNextKey, (SimpleMVCCIterator).NextKey},
{"Next", test.expectedNext, (SimpleMVCCIterator).Next},
}
for _, subtest := range subtests {
t.Run(subtest.name, func(t *testing.T) {
var output bytes.Buffer
it := MakeMultiIterator(iters)
for it.SeekGE(MVCCKey{Key: keys.LocalMax}); ; subtest.fn(it) {
ok, err := it.Valid()
if err != nil {
t.Fatalf("unexpected error: %+v", err)
}
if !ok {
break
}
output.Write(it.UnsafeKey().Key)
if it.UnsafeKey().Timestamp.IsEmpty() {
output.WriteRune('M')
} else {
output.WriteByte(byte(it.UnsafeKey().Timestamp.WallTime))
if len(it.UnsafeValue()) == 0 {
output.WriteRune('X')
}
}
}
if actual := output.String(); actual != subtest.expected {
t.Errorf("got %q expected %q", actual, subtest.expected)
}
iterateSimpleMultiIter(t, it, subtest)
})
}
})
}
}

// populateBatch populates a pebble batch with a series of MVCC key values.
// input is a concatenation of entries. Each entry starts with a single
// character key followed by a single character marker:
//
//   - If the marker is an M, the entry is a "metadata" key: it is written
//     unversioned (timestamp=0, sorts before any non-0 timestamp) with no
//     value.
//   - Otherwise the marker's byte value is the timestamp walltime. If the
//     character after the marker is an X, the entry is a deletion tombstone
//     (no value) and the X is consumed as part of the entry. If not, the value
//     is the same single byte as the timestamp.
//
// A key that is not followed by a marker fails the test with a descriptive
// message instead of panicking with an index-out-of-range error.
func populateBatch(t *testing.T, batch Batch, input string) {
	t.Helper()
	for i := 0; i < len(input); {
		// Validate before indexing: every entry needs at least key + marker.
		if i+1 >= len(input) {
			t.Fatalf("malformed input %q: key %q at offset %d has no timestamp", input, input[i], i)
		}
		k := []byte{input[i]}
		marker := input[i+1]
		ts := hlc.Timestamp{WallTime: int64(marker)}
		var v []byte
		switch {
		case marker == 'M':
			// Metadata key: empty timestamp, nil value.
			ts = hlc.Timestamp{}
		case i+2 < len(input) && input[i+2] == 'X':
			// Tombstone: nil value; skip the extra X character.
			i++
		default:
			v = []byte{marker}
		}
		i += 2
		if ts.IsEmpty() {
			require.NoError(t, batch.PutUnversioned(k, v))
		} else {
			require.NoError(t, batch.PutRawMVCC(MVCCKey{Key: k, Timestamp: ts}, v))
		}
	}
}

// iterSubtest pairs a way of stepping an iterator with the output that
// stepping is expected to produce. Field order matters: callers build values
// with positional (unkeyed) composite literals.
type iterSubtest struct {
// name identifies the subtest; it is used as the t.Run subtest name.
name string
// expected is the rendered key sequence the full iteration must produce, in
// the same string format that populateBatch accepts.
expected string
// fn advances the iterator by one step; it is invoked after each visited key.
fn func(SimpleMVCCIterator)
}

// iterateSimpleMultiIter walks it from keys.LocalMax to exhaustion, stepping
// with subtest.fn, and renders every visited entry as text: the key bytes,
// then 'M' for an entry with an empty timestamp, or otherwise the walltime as
// a single byte followed by 'X' when the value is empty. The rendered string
// must equal subtest.expected. It assumes the iterator's keys were written by
// populateBatch.
func iterateSimpleMultiIter(t *testing.T, it SimpleMVCCIterator, subtest iterSubtest) {
	var rendered bytes.Buffer
	it.SeekGE(MVCCKey{Key: keys.LocalMax})
	for {
		valid, err := it.Valid()
		require.NoError(t, err)
		if !valid {
			break
		}

		cur := it.UnsafeKey()
		rendered.Write(cur.Key)
		switch {
		case cur.Timestamp.IsEmpty():
			rendered.WriteRune('M')
		default:
			rendered.WriteByte(byte(cur.Timestamp.WallTime))
			if len(it.UnsafeValue()) == 0 {
				rendered.WriteRune('X')
			}
		}

		// Advance using the strategy under test (e.g. Next or NextKey).
		subtest.fn(it)
	}
	require.Equal(t, subtest.expected, rendered.String())
}
118 changes: 118 additions & 0 deletions pkg/storage/read_as_of_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import "github.com/cockroachdb/cockroach/pkg/util/hlc"

// ReadAsOfIterator wraps a SimpleMVCCIterator and only surfaces the latest
// valid key of a given MVCC key that is also below the asOf timestamp, if set.
// Further, the iterator does not surface delete tombstones, nor any MVCC keys
// shadowed by delete tombstones below the asOf timestamp, if set. The iterator
// assumes that it will not encounter any write intents.
type ReadAsOfIterator struct {
msbutler marked this conversation as resolved.
Show resolved Hide resolved
iter SimpleMVCCIterator

// asOf is the latest timestamp of a key surfaced by the iterator.
asOf hlc.Timestamp
}

// Compile-time check that *ReadAsOfIterator implements SimpleMVCCIterator.
var _ SimpleMVCCIterator = (*ReadAsOfIterator)(nil)

// Close closes the underlying iterator. The ReadAsOfIterator holds no other
// resources of its own, so this is a pure delegation to the wrapped iterator.
func (f *ReadAsOfIterator) Close() {
f.iter.Close()
}

// SeekGE advances the iterator to the first key in the engine which is >= the
// provided key that obeys the ReadAsOfIterator key constraints.
func (f *ReadAsOfIterator) SeekGE(originalKey MVCCKey) {
// To ensure SeekGE seeks to a key that isn't shadowed by a tombstone that the
// ReadAsOfIterator would have skipped (i.e. a tombstone below asOf), seek to
// the key with the latest possible timestamp that the iterator could surface
// (i.e. asOf, if set) and iterate to the next valid key at or below the caller's
// key that also obeys the iterator's constraints.
synthetic := MVCCKey{Key: originalKey.Key, Timestamp: f.asOf}
f.iter.SeekGE(synthetic)

if ok := f.advance(); ok && f.UnsafeKey().Less(originalKey) {
erikgrinaker marked this conversation as resolved.
Show resolved Hide resolved
// The following is true:
// originalKey.Key == f.UnsafeKey &&
// f.asOf timestamp (if set) >= current timestamp > originalKey timestamp.
//
// This implies the caller is seeking to a key that is shadowed by a valid
// key that obeys the iterator 's constraints. The caller's key is NOT the
// latest key of the given MVCC key; therefore, skip to the next MVCC key.
f.NextKey()
}
}

// Valid implements SimpleMVCCIterator. It returns exactly what the wrapped
// iterator's Valid returns: whether the iterator is positioned on a key, and
// any error the wrapped iterator reports.
func (f *ReadAsOfIterator) Valid() (bool, error) {
return f.iter.Valid()
}

// Next advances the iterator to the next valid MVCC key obeying the iterator's
// constraints. Note that Next and NextKey have the same implementation because
// the iterator only surfaces the latest valid key of a given MVCC key below the
// asOf timestamp, so there is never a second version of the current key to
// step to.
func (f *ReadAsOfIterator) Next() {
f.NextKey()
}

// NextKey advances the iterator to the next valid MVCC key obeying the
// iterator's constraints. NextKey() is only guaranteed to surface a key that
// obeys the iterator's constraints if the iterator was already on a key that
// obeys the constraints. To ensure this, initialize the iterator with a SeekGE
// call before any calls to NextKey().
func (f *ReadAsOfIterator) NextKey() {
// Move the wrapped iterator off the current MVCC key, then skip forward past
// any versions above asOf and any keys whose visible version is a tombstone.
f.iter.NextKey()
// The boolean result is intentionally unused here; callers observe the
// outcome through Valid().
f.advance()
}

// UnsafeKey returns the current key of the wrapped iterator, but the memory is
// invalidated on the next call to {Next,NextKey,SeekGE}.
func (f *ReadAsOfIterator) UnsafeKey() MVCCKey {
return f.iter.UnsafeKey()
}

// UnsafeValue returns the current value of the wrapped iterator as a byte
// slice, but the memory is invalidated on the next call to
// {Next,NextKey,SeekGE}.
func (f *ReadAsOfIterator) UnsafeValue() []byte {
return f.iter.UnsafeValue()
}

// advance moves past keys with timestamps later than f.asOf and skips MVCC keys
// whose latest value (subject to f.asOF) has been deleted. Note that advance
// moves past keys above asOF before evaluating tombstones, implying the
// iterator will never call f.iter.NextKey() on a tombstone with a timestamp
// later than f.asOF.
func (f *ReadAsOfIterator) advance() bool {
msbutler marked this conversation as resolved.
Show resolved Hide resolved
for {
if ok, err := f.Valid(); err != nil || !ok {
// No valid keys found.
return false
} else if !f.asOf.IsEmpty() && f.asOf.Less(f.iter.UnsafeKey().Timestamp) {
// Skip keys above the asOf timestamp.
f.iter.Next()
} else if len(f.iter.UnsafeValue()) == 0 {
// Skip to the next MVCC key if we find a tombstone.
f.iter.NextKey()
} else {
// On a valid key.
return true
}
}
}

// NewReadAsOfIterator constructs a ReadAsOfIterator that wraps iter and
// filters with the given asOf timestamp. The returned iterator is not
// positioned; call SeekGE before Next or NextKey.
func NewReadAsOfIterator(iter SimpleMVCCIterator, asOf hlc.Timestamp) *ReadAsOfIterator {
return &ReadAsOfIterator{iter: iter, asOf: asOf}
}
Loading