From 4b2ba3c3e3ca49e92c816b973a9e13095ff726df Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Tue, 22 Feb 2022 16:41:20 -0500 Subject: [PATCH] backupccl: create readAsOfMVCCIterator Previously while ingesting SSTs in the restore processor, `processRestoreSpanEntry` would manually skip delete tombstones and keys with timestamps later than the AS OF SYSTEM TIME timestamp provided in the RESTORE command. This PR introduces the readAsOfMVCCIterator which wraps a SimpleMVCCIterator such that all keys returned are not tombstones and will have timestamps less than the AOST timestamp. This PR also hooks this iterator into the restore processor. Some methods in this iterator may get updated as #71155 gets addressed. Fixes #77276 Release justification: low risk, high benefit changes to existing functionality Release note: None --- pkg/ccl/backupccl/restore_data_processor.go | 25 +--- pkg/storage/read_as_of_iterator.go | 121 ++++++++++++++++++++ pkg/storage/sst_iterator_test.go | 25 +++- 3 files changed, 149 insertions(+), 22 deletions(-) create mode 100644 pkg/storage/read_as_of_iterator.go diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 504532bf911c..6c070abb5b3c 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -265,7 +265,7 @@ func inputReader( type mergedSST struct { entry execinfrapb.RestoreSpanEntry - iter storage.SimpleMVCCIterator + iter *storage.ReadAsOfIterator cleanup func() } @@ -313,7 +313,7 @@ func (rd *restoreDataProcessor) openSSTs( mSST := mergedSST{ entry: entry, - iter: multiIter, + iter: storage.MakeReadAsOfIterator(multiIter, rd.spec.RestoreTime), cleanup: cleanup, } @@ -399,6 +399,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( entry := sst.entry iter := sst.iter + defer sst.cleanup() writeAtBatchTS := restoreAtNow.Get(&evalCtx.Settings.SV) @@ -443,38 +444,20 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( startKeyMVCC, endKeyMVCC := storage.MVCCKey{Key: entry.Span.Key}, storage.MVCCKey{Key: entry.Span.EndKey} - for iter.SeekGE(startKeyMVCC); ; { + for iter.SeekGE(startKeyMVCC); ; iter.NextKey() { ok, err := iter.Valid() if err != nil { return summary, err } - if !ok { - break - } - - if !rd.spec.RestoreTime.IsEmpty() { - // TODO(dan): If we have to skip past a lot of versions to find the - // latest one before args.EndTime, then this could be slow. - if rd.spec.RestoreTime.Less(iter.UnsafeKey().Timestamp) { - iter.Next() - continue - } - } if !ok || !iter.UnsafeKey().Less(endKeyMVCC) { break } - if len(iter.UnsafeValue()) == 0 { - // Value is deleted. - iter.NextKey() - continue - } keyScratch = append(keyScratch[:0], iter.UnsafeKey().Key...) valueScratch = append(valueScratch[:0], iter.UnsafeValue()...) key := storage.MVCCKey{Key: keyScratch, Timestamp: iter.UnsafeKey().Timestamp} value := roachpb.Value{RawBytes: valueScratch} - iter.NextKey() key.Key, ok, err = kr.RewriteKey(key.Key) if err != nil { diff --git a/pkg/storage/read_as_of_iterator.go b/pkg/storage/read_as_of_iterator.go new file mode 100644 index 000000000000..223892a5107c --- /dev/null +++ b/pkg/storage/read_as_of_iterator.go @@ -0,0 +1,121 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import "github.com/cockroachdb/cockroach/pkg/util/hlc" + +// ReadAsOfIterator wraps an iterator that implements the SimpleMVCCIterator +// such that all MVCC keys returned are not delete tombstones and will have +// timestamps less than the endtime timestamp, unless that key is invalid. +type ReadAsOfIterator struct { + iter SimpleMVCCIterator + + // endtime is the latest timestamp an MVCC key will have that the + // ReadAsOfIterator will return + endTime hlc.Timestamp + + // valid is the latest boolean value returned by Valid() + valid bool +} + +var _ SimpleMVCCIterator = &ReadAsOfIterator{} + +// Close no-ops. The caller is responsible for closing the underlying iterator. +func (f *ReadAsOfIterator) Close() { +} + +// SeekGE advances the iterator to the first key in the engine which is >= the +// provided key and less than f.endtime. +// +// If Valid() is false after SeekGE() returns, the iterator may be at an invalid +// key with a timestamp greater than or equal to f.endtime. +func (f *ReadAsOfIterator) SeekGE(key MVCCKey) { + f.iter.SeekGE(key) + f.advance() +} + +// Valid must be called after any call to Seek(), Next(), or similar methods. It +// returns (true, nil) if the iterator points to a valid key (it is undefined to +// call UnsafeKey(), UnsafeValue(), or similar methods unless Valid() has +// returned (true, nil)). It returns (false, nil) if the iterator has moved past +// the end of the valid range, or (false, err) if an error has occurred. Valid() +// will never return true with a non-nil error. +func (f *ReadAsOfIterator) Valid() (bool, error) { + var err error + f.valid, err = f.iter.Valid() + return f.valid, err +} + +// Next advances the iterator to the next valid key/value in the iteration that +// has a timestamp less than f.endtime. If Valid() is false after NextKey() +// returns, the iterator may be at an invalid key with a timestamp greater than +// or equal to f.endtime. +func (f *ReadAsOfIterator) Next() { + f.iter.Next() + f.advance() +} + +// NextKey advances the iterator to the next valid MVCC key that has a timestamp +// less than f.endtime. If Valid() is false after NextKey() returns, the +// iterator may be at an invalid key with a timestamp greater than or equal to +// f.endtime. +// +// This operation is distinct from Next which advances to the next version of +// the current key or the next key if the iterator is currently located at the +// last version for a key. +func (f *ReadAsOfIterator) NextKey() { + f.iter.NextKey() + f.advance() +} + +// UnsafeKey returns the current key, but the memory is invalidated on the next +// call to {NextKey,Seek}. +func (f *ReadAsOfIterator) UnsafeKey() MVCCKey { + return f.iter.UnsafeKey() +} + +// UnsafeValue returns the current value as a byte slice, but the memory is +// invalidated on the next call to {NextKey,Seek}. +func (f *ReadAsOfIterator) UnsafeValue() []byte { + return f.iter.UnsafeValue() +} + +// advance checks that the current key is valid, and if it is, advances past +// keys with timestamps later than the f.endTime and keys whose value has +// been deleted. +func (f *ReadAsOfIterator) advance() { + if _, _ = f.Valid(); !f.valid { + return + } + if !f.endTime.IsEmpty() { + for f.endTime.Less(f.iter.UnsafeKey().Timestamp) { + f.iter.Next() + if _, _ = f.Valid(); !f.valid { + return + } + } + } + if len(f.iter.UnsafeValue()) == 0 { + f.iter.NextKey() + if _, _ = f.Valid(); !f.valid { + return + } + } +} + +// MakeReadAsOfIterator constructs a ReadAsOfIterator which wraps a +// SimpleMVCCIterator such that all MVCC keys returned will not be tombstones +// and will have timestamps less than the endtime timestamp, unless that key is +// invalid. The caller is responsible for closing the underlying +// SimpleMVCCIterator. +func MakeReadAsOfIterator(iter SimpleMVCCIterator, endtime hlc.Timestamp) *ReadAsOfIterator { + return &ReadAsOfIterator{iter: iter, endTime: endtime} +} diff --git a/pkg/storage/sst_iterator_test.go b/pkg/storage/sst_iterator_test.go index 69870d293c10..83b17fa20127 100644 --- a/pkg/storage/sst_iterator_test.go +++ b/pkg/storage/sst_iterator_test.go @@ -83,7 +83,8 @@ func TestSSTIterator(t *testing.T) { sst := MakeIngestionSSTWriter(ctx, st, sstFile) defer sst.Close() var allKVs []MVCCKeyValue - for i := 0; i < 10; i++ { + maxWallTime := 10 + for i := 0; i < maxWallTime; i++ { kv := MVCCKeyValue{ Key: MVCCKey{ Key: []byte{'A' + byte(i)}, @@ -129,4 +130,26 @@ func TestSSTIterator(t *testing.T) { defer iter.Close() runTestSSTIterator(t, iter, allKVs) }) + t.Run("AsOf", func(t *testing.T) { + iter, err := NewMemSSTIterator(sstFile.Data(), false) + if err != nil { + t.Fatalf("%+v", err) + } + defer iter.Close() + asOfTimes := []hlc.Timestamp{ + {WallTime: int64(maxWallTime / 2)}, + {WallTime: int64(maxWallTime)}, + {}} + for _, asOf := range asOfTimes { + var asOfKVs []MVCCKeyValue + for _, kv := range allKVs { + if !asOf.IsEmpty() && asOf.Less(kv.Key.Timestamp) { + continue + } + asOfKVs = append(asOfKVs, kv) + } + asOfIter := MakeReadAsOfIterator(iter, asOf) + runTestSSTIterator(t, asOfIter, asOfKVs) + } + }) }