Skip to content

Commit

Permalink
address Erik's comments
Browse files Browse the repository at this point in the history
Release note (<category, see below>): <what> <show> <why>
  • Loading branch information
msbutler committed May 2, 2022
1 parent 4a1b2f7 commit 672746e
Show file tree
Hide file tree
Showing 6 changed files with 359 additions and 118 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (rd *restoreDataProcessor) openSSTs(
// channel.
sendIters := func(itersToSend []storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error {
multiIter := storage.MakeMultiIterator(itersToSend)
readAsOfIter := storage.MakeReadAsOfIterator(multiIter, rd.spec.RestoreTime)
readAsOfIter := storage.NewReadAsOfIterator(multiIter, rd.spec.RestoreTime)

cleanup := func() {
readAsOfIter.Close()
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ go_test(
"pebble_file_registry_test.go",
"pebble_mvcc_scanner_test.go",
"pebble_test.go",
"read_as_of_iterator_test.go",
"resource_limiter_test.go",
"sst_iterator_test.go",
"sst_test.go",
Expand Down
134 changes: 73 additions & 61 deletions pkg/storage/multi_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,6 @@ func TestMultiIterator(t *testing.T) {
// MultiIterator, which is fully iterated (using either NextKey or Next) and
// turned back into a string in the same format as `input`. This is compared
// to expectedNextKey or expectedNext.
//
// Input is a string containing key, timestamp, value tuples: first a single
// character key, then a single character timestamp walltime. If the
// character after the timestamp is an M, this entry is a "metadata" key
// (timestamp=0, sorts before any non-0 timestamp, and no value). If the
// character after the timestamp is an X, this entry is a deletion
// tombstone. Otherwise the value is the same as the timestamp.
tests := []struct {
inputs []string
expectedNextKey string
Expand Down Expand Up @@ -86,73 +79,92 @@ func TestMultiIterator(t *testing.T) {
for _, input := range test.inputs {
batch := pebble.NewBatch()
defer batch.Close()
for i := 0; ; {
if i == len(input) {
break
}
k := []byte{input[i]}
ts := hlc.Timestamp{WallTime: int64(input[i+1])}
var v []byte
if i+1 < len(input) && input[i+1] == 'M' {
ts = hlc.Timestamp{}
v = nil
} else if i+2 < len(input) && input[i+2] == 'X' {
v = nil
i++
} else {
v = []byte{input[i+1]}
}
i += 2
if ts.IsEmpty() {
if err := batch.PutUnversioned(k, v); err != nil {
t.Fatalf("%+v", err)
}
} else {
if err := batch.PutMVCC(MVCCKey{Key: k, Timestamp: ts}, v); err != nil {
t.Fatalf("%+v", err)
}
}
}
populateBatch(t, batch, input)
iter := batch.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax})
defer iter.Close()
iters = append(iters, iter)
}

subtests := []struct {
name string
expected string
fn func(SimpleMVCCIterator)
}{
subtests := []iterSubtest{
{"NextKey", test.expectedNextKey, (SimpleMVCCIterator).NextKey},
{"Next", test.expectedNext, (SimpleMVCCIterator).Next},
}
for _, subtest := range subtests {
t.Run(subtest.name, func(t *testing.T) {
var output bytes.Buffer
it := MakeMultiIterator(iters)
for it.SeekGE(MVCCKey{Key: keys.LocalMax}); ; subtest.fn(it) {
ok, err := it.Valid()
if err != nil {
t.Fatalf("unexpected error: %+v", err)
}
if !ok {
break
}
output.Write(it.UnsafeKey().Key)
if it.UnsafeKey().Timestamp.IsEmpty() {
output.WriteRune('M')
} else {
output.WriteByte(byte(it.UnsafeKey().Timestamp.WallTime))
if len(it.UnsafeValue()) == 0 {
output.WriteRune('X')
}
}
}
if actual := output.String(); actual != subtest.expected {
t.Errorf("got %q expected %q", actual, subtest.expected)
}
iterateSimpleMultiIter(t, it, subtest)
})
}
})
}
}

// populateBatch writes the MVCC key/value pairs described by input into batch.
//
// input is a concatenation of tuples. Each tuple starts with a single
// character key followed by a single character timestamp whose byte value is
// used as the wall time:
//
//   - If the timestamp character is 'M', the entry is a "metadata" key: it is
//     written with an empty timestamp (which sorts before any non-empty
//     timestamp) and no value, via PutUnversioned.
//   - If the character following the timestamp is 'X', the entry is a deletion
//     tombstone: it is written at the given timestamp with no value, and the
//     'X' is consumed as part of the tuple.
//   - Otherwise the value is the timestamp character itself.
//
// A tuple that is missing its timestamp character fails the test with a
// descriptive message rather than panicking on an out-of-range index.
func populateBatch(t *testing.T, batch Batch, input string) {
	t.Helper()
	for i := 0; i < len(input); {
		if i+1 >= len(input) {
			t.Fatalf("malformed input %q: key %q at offset %d has no timestamp", input, input[i], i)
		}
		k := []byte{input[i]}
		tsChar := input[i+1]
		ts := hlc.Timestamp{WallTime: int64(tsChar)}
		var v []byte
		switch {
		case tsChar == 'M':
			// Metadata key: empty timestamp, nil value.
			ts = hlc.Timestamp{}
			i += 2
		case i+2 < len(input) && input[i+2] == 'X':
			// Tombstone: nil value; also consume the trailing 'X'.
			i += 3
		default:
			v = []byte{tsChar}
			i += 2
		}
		if ts.IsEmpty() {
			if err := batch.PutUnversioned(k, v); err != nil {
				t.Fatalf("%+v", err)
			}
			continue
		}
		if err := batch.PutMVCC(MVCCKey{Key: k, Timestamp: ts}, v); err != nil {
			t.Fatalf("%+v", err)
		}
	}
}

// iterSubtest describes one way of walking an iterator in a test and the
// output that walk is expected to produce.
type iterSubtest struct {
// name is the subtest name passed to t.Run.
name string
// expected is the string the iteration output is compared against.
expected string
// fn advances the iterator by one step (e.g. Next or NextKey).
fn func(SimpleMVCCIterator)
}

// iterateSimpleMultiIter walks it from keys.LocalMax to exhaustion, stepping
// with subtest.fn, and renders every visited entry back into the compact
// string format understood by populateBatch: the key bytes, then 'M' for an
// entry with an empty timestamp, or otherwise the wall time as a single byte
// followed by 'X' when the value is empty. The rendered string is compared
// against subtest.expected. An iterator error fails the test immediately.
func iterateSimpleMultiIter(t *testing.T, it SimpleMVCCIterator, subtest iterSubtest) {
	var rendered bytes.Buffer
	it.SeekGE(MVCCKey{Key: keys.LocalMax})
	for ; ; subtest.fn(it) {
		valid, err := it.Valid()
		if err != nil {
			t.Fatalf("unexpected error: %+v", err)
		}
		if !valid {
			break
		}
		cur := it.UnsafeKey()
		rendered.Write(cur.Key)
		if cur.Timestamp.IsEmpty() {
			rendered.WriteRune('M')
			continue
		}
		rendered.WriteByte(byte(cur.Timestamp.WallTime))
		if len(it.UnsafeValue()) == 0 {
			rendered.WriteRune('X')
		}
	}
	actual := rendered.String()
	if actual != subtest.expected {
		t.Errorf("got %q expected %q", actual, subtest.expected)
	}
}
146 changes: 91 additions & 55 deletions pkg/storage/read_as_of_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@ package storage
import "github.com/cockroachdb/cockroach/pkg/util/hlc"

// ReadAsOfIterator wraps an iterator that implements the SimpleMVCCIterator
// such that all MVCC keys returned are not delete tombstones and will have
// timestamps less than the endtime timestamp, unless that key is invalid.
// such that any valid MVCC key returned has a timestamp at or below the asOf
// timestamp, is not a delete tombstone, and is not shadowed by a delete
// tombstone at or below the asOf timestamp.
type ReadAsOfIterator struct {
iter SimpleMVCCIterator

// endtime is the latest timestamp an MVCC key will have that the
// asOf is the latest timestamp that an MVCC key returned by the
// ReadAsOfIterator may have.
endTime hlc.Timestamp

// valid is the latest boolean value returned by Valid()
valid bool
asOf hlc.Timestamp
}

var _ SimpleMVCCIterator = &ReadAsOfIterator{}
Expand All @@ -33,44 +31,44 @@ func (f *ReadAsOfIterator) Close() {
}

// SeekGE advances the iterator to the first key in the engine which is >= the
// provided key and less than f.endtime.
//
// If Valid() is false after SeekGE() returns, the iterator may be at an invalid
// key with a timestamp greater than or equal to f.endtime.
func (f *ReadAsOfIterator) SeekGE(key MVCCKey) {
f.iter.SeekGE(key)
f.advance()
// provided key that obeys the ReadAsOfIterator key constraints.
func (f *ReadAsOfIterator) SeekGE(originalKey MVCCKey) {
// To ensure SeekGE seeks to a key that isn't shadowed by a tombstone that the
// ReadAsOfIterator would have skipped (i.e. at a time earlier than the AOST),
// seek to the key with the latest possible timestamp, advance to the next
// valid key only if this valid key is less (at a newer timestamp, and same
// key) than the original key.
synthetic := MVCCKey{Key: originalKey.Key, Timestamp: hlc.MaxTimestamp}
f.iter.SeekGE(synthetic)
if ok := f.advance(); !ok {
return
}
for f.UnsafeKey().Less(originalKey) {
// The following is true:
// originalKey.Key == syntheticKey.key &&
// f.asOf timestamp >= current timestamp > originalKey timestamp
// move to the next key and advance to a key that obeys the asOf constraints.
f.iter.Next()
if ok := f.advance(); !ok {
return
}
}
}

// Valid must be called after any call to Seek(), Next(), or similar methods. It
// returns (true, nil) if the iterator points to a valid key (it is undefined to
// call UnsafeKey(), UnsafeValue(), or similar methods unless Valid() has
// returned (true, nil)). It returns (false, nil) if the iterator has moved past
// the end of the valid range, or (false, err) if an error has occurred. Valid()
// will never return true with a non-nil error.
// Valid implements the SimpleMVCCIterator interface.
func (f *ReadAsOfIterator) Valid() (bool, error) {
var err error
f.valid, err = f.iter.Valid()
return f.valid, err
return f.iter.Valid()
}

// Next advances the iterator to the next valid key/value in the iteration that
// has a timestamp less than f.endtime. If Valid() is false after NextKey()
// returns, the iterator may be at an invalid key with a timestamp greater than
// or equal to f.endtime.
// has a timestamp less than f.asOf.
func (f *ReadAsOfIterator) Next() {
// Step the wrapped iterator once, then let advance skip over any entries
// that are newer than f.asOf or that have been deleted. The boolean result
// of advance is not needed here; callers observe validity through Valid().
f.iter.Next()
f.advance()
}

// NextKey advances the iterator to the next valid MVCC key that has a timestamp
// less than f.endtime. If Valid() is false after NextKey() returns, the
// iterator may be at an invalid key with a timestamp greater than or equal to
// f.endtime.
//
// This operation is distinct from Next which advances to the next version of
// the current key or the next key if the iterator is currently located at the
// last version for a key.
// less than f.asOf.
func (f *ReadAsOfIterator) NextKey() {
f.iter.NextKey()
f.advance()
Expand All @@ -88,34 +86,72 @@ func (f *ReadAsOfIterator) UnsafeValue() []byte {
return f.iter.UnsafeValue()
}

// advance checks that the current key is valid, and if it is, advances past
// keys with timestamps later than the f.endTime and keys whose value has
// been deleted.
func (f *ReadAsOfIterator) advance() {
if _, _ = f.Valid(); !f.valid {
return
// advance moves past keys with timestamps later than the f.asOf and MVCC keys whose
// value has been deleted.
func (f *ReadAsOfIterator) advance() bool {
if ok, _ := f.Valid(); !ok {
return ok
}
if !f.endTime.IsEmpty() {
for f.endTime.Less(f.iter.UnsafeKey().Timestamp) {
f.iter.Next()
if _, _ = f.Valid(); !f.valid {
return
}

// If neither of the internal functions move the iterator, then the current
// key, if valid, is guaranteed to have a timestamp earlier than the asOf time and is
// not a deleted key. If either function moves the iterator, the timestamp and
// tombstone presence must be checked again.
var ok bool
moved := true
for {
ok, moved = f.advanceAsOf()
if !ok {
return ok
}
if moved {
continue
}
ok, moved = f.advanceDel()
if !ok {
return ok
}
if !moved {
break
}
}
for len(f.iter.UnsafeValue()) == 0 {
return true
}

// advanceDel moves the iterator to the next key if the current key is a delete
// tombstone.
func (f *ReadAsOfIterator) advanceDel() (bool, bool) {
var moved bool
if len(f.iter.UnsafeValue()) == 0 {
// Implies the latest version of the given key is a tombstone.
f.iter.NextKey()
if _, _ = f.Valid(); !f.valid {
return
moved = true
if ok, _ := f.Valid(); !ok {
return ok, moved
}
}
return true, moved
}

// advanceAsOf steps the underlying iterator forward by one entry when the
// current entry's timestamp is later than f.asOf. An empty f.asOf disables the
// check entirely.
//
// The first result reports whether the iterator is still valid after the call;
// the second reports whether the iterator was moved. When the iterator is not
// moved, the first result is always true.
func (f *ReadAsOfIterator) advanceAsOf() (bool, bool) {
	// Nothing to skip: either no asOf bound is set, or the current entry is
	// already at or below it.
	if f.asOf.IsEmpty() || !f.asOf.Less(f.iter.UnsafeKey().Timestamp) {
		return true, false
	}
	f.iter.Next()
	stillValid, _ := f.Valid()
	return stillValid, true
}

// MakeReadAsOfIterator constructs a ReadAsOfIterator which wraps a
// SimpleMVCCIterator such that all MVCC keys returned will not be tombstones
// and will have timestamps less than the endtime timestamp, unless that key is
// invalid. The caller is responsible for closing the underlying
// SimpleMVCCIterator.
func MakeReadAsOfIterator(iter SimpleMVCCIterator, endtime hlc.Timestamp) *ReadAsOfIterator {
return &ReadAsOfIterator{iter: iter, endTime: endtime}
// NewReadAsOfIterator returns a ReadAsOfIterator wrapping iter. Every valid
// MVCC key surfaced by the returned iterator has a timestamp at or below asOf,
// is not a delete tombstone, and is not shadowed by a delete tombstone at or
// below asOf. Passing an empty asOf disables the timestamp filter. The caller
// is responsible for closing the underlying SimpleMVCCIterator.
func NewReadAsOfIterator(iter SimpleMVCCIterator, asOf hlc.Timestamp) *ReadAsOfIterator {
	readAsOf := new(ReadAsOfIterator)
	readAsOf.iter = iter
	readAsOf.asOf = asOf
	return readAsOf
}
Loading

0 comments on commit 672746e

Please sign in to comment.