Skip to content

Commit

Permalink
backupccl: create readAsOfMVCCIterator
Browse files Browse the repository at this point in the history
Previously while ingesting SSTs in the restore processor,
`processRestoreSpanEntry` would manually skip delete tombstones and keys with
timestamps later than the AS OF SYSTEM TIME timestamp provided in the RESTORE
command.

This PR introduces the readAsOfMVCCIterator which wraps a SimpleMVCCIterator
such that all keys returned are not tombstones and will have timestamps less
than the AOST timestamp. This PR also hooks this iterator into the restore
processor. Some methods in this iterator may get updated as #71155 gets
addressed.

Fixes #77276

Release justification: low risk, high benefit changes to existing functionality

Release note: None
  • Loading branch information
msbutler committed Mar 2, 2022
1 parent bb54797 commit ea04166
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 22 deletions.
25 changes: 4 additions & 21 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func inputReader(

type mergedSST struct {
entry execinfrapb.RestoreSpanEntry
iter storage.SimpleMVCCIterator
iter *storage.ReadAsOfIterator
cleanup func()
}

Expand Down Expand Up @@ -313,7 +313,7 @@ func (rd *restoreDataProcessor) openSSTs(

mSST := mergedSST{
entry: entry,
iter: multiIter,
iter: storage.MakeReadAsOfIterator(multiIter, rd.spec.RestoreTime),
cleanup: cleanup,
}

Expand Down Expand Up @@ -399,6 +399,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(

entry := sst.entry
iter := sst.iter

defer sst.cleanup()

writeAtBatchTS := restoreAtNow.Get(&evalCtx.Settings.SV)
Expand Down Expand Up @@ -443,38 +444,20 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
startKeyMVCC, endKeyMVCC := storage.MVCCKey{Key: entry.Span.Key},
storage.MVCCKey{Key: entry.Span.EndKey}

for iter.SeekGE(startKeyMVCC); ; {
for iter.SeekGE(startKeyMVCC); ; iter.NextKey() {
ok, err := iter.Valid()
if err != nil {
return summary, err
}
if !ok {
break
}

if !rd.spec.RestoreTime.IsEmpty() {
// TODO(dan): If we have to skip past a lot of versions to find the
// latest one before args.EndTime, then this could be slow.
if rd.spec.RestoreTime.Less(iter.UnsafeKey().Timestamp) {
iter.Next()
continue
}
}

if !ok || !iter.UnsafeKey().Less(endKeyMVCC) {
break
}
if len(iter.UnsafeValue()) == 0 {
// Value is deleted.
iter.NextKey()
continue
}

keyScratch = append(keyScratch[:0], iter.UnsafeKey().Key...)
valueScratch = append(valueScratch[:0], iter.UnsafeValue()...)
key := storage.MVCCKey{Key: keyScratch, Timestamp: iter.UnsafeKey().Timestamp}
value := roachpb.Value{RawBytes: valueScratch}
iter.NextKey()

key.Key, ok, err = kr.RewriteKey(key.Key)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(
"pebble_iterator.go",
"pebble_merge.go",
"pebble_mvcc_scanner.go",
"read_as_of_iterator.go",
"replicas_storage.go",
"resource_limiter.go",
"row_counter.go",
Expand Down
121 changes: 121 additions & 0 deletions pkg/storage/read_as_of_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import "github.com/cockroachdb/cockroach/pkg/util/hlc"

// ReadAsOfIterator wraps an iterator that implements the SimpleMVCCIterator
// such that all MVCC keys returned are not delete tombstones and will have
// timestamps less than the endtime timestamp, unless that key is invalid.
type ReadAsOfIterator struct {
iter SimpleMVCCIterator

// endtime is the latest timestamp an MVCC key will have that the
// ReadAsOfIterator will return
endTime hlc.Timestamp

// valid is the latest boolean value returned by Valid()
valid bool
}

var _ SimpleMVCCIterator = &ReadAsOfIterator{}

// Close no-ops. The caller is responsible for closing the underlying iterator.
func (f *ReadAsOfIterator) Close() {
}

// SeekGE advances the iterator to the first key in the engine which is >= the
// provided key and less than f.endtime.
//
// If Valid() is false after SeekGE() returns, the iterator may be at an invalid
// key with a timestamp greater than or equal to f.endtime.
func (f *ReadAsOfIterator) SeekGE(key MVCCKey) {
f.iter.SeekGE(key)
f.advance()
}

// Valid must be called after any call to Seek(), Next(), or similar methods. It
// returns (true, nil) if the iterator points to a valid key (it is undefined to
// call UnsafeKey(), UnsafeValue(), or similar methods unless Valid() has
// returned (true, nil)). It returns (false, nil) if the iterator has moved past
// the end of the valid range, or (false, err) if an error has occurred. Valid()
// will never return true with a non-nil error.
func (f *ReadAsOfIterator) Valid() (bool, error) {
var err error
f.valid, err = f.iter.Valid()
return f.valid, err
}

// Next advances the iterator to the next valid key/value in the iteration that
// has a timestamp less than f.endtime. If Valid() is false after NextKey()
// returns, the iterator may be at an invalid key with a timestamp greater than
// or equal to f.endtime.
func (f *ReadAsOfIterator) Next() {
f.iter.Next()
f.advance()
}

// NextKey advances the iterator to the next valid MVCC key that has a timestamp
// less than f.endtime. If Valid() is false after NextKey() returns, the
// iterator may be at an invalid key with a timestamp greater than or equal to
// f.endtime.
//
// This operation is distinct from Next which advances to the next version of
// the current key or the next key if the iterator is currently located at the
// last version for a key.
func (f *ReadAsOfIterator) NextKey() {
f.iter.NextKey()
f.advance()
}

// UnsafeKey returns the current key, but the memory is invalidated on the next
// call to {NextKey,Seek}.
func (f *ReadAsOfIterator) UnsafeKey() MVCCKey {
return f.iter.UnsafeKey()
}

// UnsafeValue returns the current value as a byte slice, but the memory is
// invalidated on the next call to {NextKey,Seek}.
func (f *ReadAsOfIterator) UnsafeValue() []byte {
return f.iter.UnsafeValue()
}

// advance checks that the current key is valid, and if it is, advances past
// keys with timestamps later than the f.endTime and keys whose value has
// been deleted.
func (f *ReadAsOfIterator) advance() {
if _, _ = f.Valid(); !f.valid {
return
}
if !f.endTime.IsEmpty() {
for f.endTime.Less(f.iter.UnsafeKey().Timestamp) {
f.iter.Next()
if _, _ = f.Valid(); !f.valid {
return
}
}
}
for len(f.iter.UnsafeValue()) == 0 {
f.iter.NextKey()
if _, _ = f.Valid(); !f.valid {
return
}
}
}

// MakeReadAsOfIterator constructs a ReadAsOfIterator which wraps a
// SimpleMVCCIterator such that all MVCC keys returned will not be tombstones
// and will have timestamps less than the endtime timestamp, unless that key is
// invalid. The caller is responsible for closing the underlying
// SimpleMVCCIterator.
func MakeReadAsOfIterator(iter SimpleMVCCIterator, endtime hlc.Timestamp) *ReadAsOfIterator {
return &ReadAsOfIterator{iter: iter, endTime: endtime}
}
25 changes: 24 additions & 1 deletion pkg/storage/sst_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ func TestSSTIterator(t *testing.T) {
sst := MakeIngestionSSTWriter(ctx, st, sstFile)
defer sst.Close()
var allKVs []MVCCKeyValue
for i := 0; i < 10; i++ {
maxWallTime := 10
for i := 0; i < maxWallTime; i++ {
kv := MVCCKeyValue{
Key: MVCCKey{
Key: []byte{'A' + byte(i)},
Expand Down Expand Up @@ -129,4 +130,26 @@ func TestSSTIterator(t *testing.T) {
defer iter.Close()
runTestSSTIterator(t, iter, allKVs)
})
t.Run("AsOf", func(t *testing.T) {
iter, err := NewMemSSTIterator(sstFile.Data(), false)
if err != nil {
t.Fatalf("%+v", err)
}
defer iter.Close()
asOfTimes := []hlc.Timestamp{
{WallTime: int64(maxWallTime / 2)},
{WallTime: int64(maxWallTime)},
{}}
for _, asOf := range asOfTimes {
var asOfKVs []MVCCKeyValue
for _, kv := range allKVs {
if !asOf.IsEmpty() && asOf.Less(kv.Key.Timestamp) {
continue
}
asOfKVs = append(asOfKVs, kv)
}
asOfIter := MakeReadAsOfIterator(iter, asOf)
runTestSSTIterator(t, asOfIter, asOfKVs)
}
})
}

0 comments on commit ea04166

Please sign in to comment.