Skip to content

Commit

Permalink
rangefeed,storage: use time-bound iterator for with-diff catchup scan
Browse files Browse the repository at this point in the history
The MVCCIncrementalIterator already has support for selectively
seeing versions beyond the time bounds, via the NextIgnoringTime
method. This method is sufficient for with-diff catchup scans -- the
changes to MVCCIncrementalIterator here are limited to comment
improvements.

To utilize NextIgnoringTime in the CatchUpIterator we need to
distinguish between the reasons that a catchup scan wants to
iterate to an older version of a key. These are now handled by
different methods via the simpleCatchupIter interface. There
is a simpleCatchupIterAdapter to provide a trivial implementation
when the underlying iterator is a SimpleMVCCIterator.

There is more testing of MVCCIncrementalIterator's NextIgnoringTime,
and there is now a TestCatchupScan that tests with and without diff.
BenchmarkCatchUpScan now also benchmarks with-diff. Results for
linear-keys, where time-bound iteration shows an improvement:

BenchmarkCatchUpScan/linear-keys/useTBI=true/withDiff=true/perc=0.00-16         	       3	 397238811 ns/op	253609173 B/op	 3006262 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=true/withDiff=true/perc=50.00-16        	       6	 192314659 ns/op	126847284 B/op	 1503167 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=true/withDiff=true/perc=75.00-16        	      12	  93243340 ns/op	63433929 B/op	  751606 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=true/withDiff=true/perc=95.00-16        	      66	  17636059 ns/op	12691501 B/op	  150356 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=true/withDiff=true/perc=99.00-16        	     296	   3566355 ns/op	 2560979 B/op	   30126 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=true/withDiff=false/perc=0.00-16        	       2	 587807376 ns/op	253689672 B/op	 3006770 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=true/withDiff=false/perc=50.00-16       	       4	 286110763 ns/op	126893106 B/op	 1503413 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=true/withDiff=false/perc=75.00-16       	       8	 140012384 ns/op	63458643 B/op	  751740 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=true/withDiff=false/perc=95.00-16       	      43	  26210520 ns/op	12695466 B/op	  150380 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=true/withDiff=false/perc=99.00-16       	     200	   5381206 ns/op	 2561444 B/op	   30133 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=false/withDiff=true/perc=0.00-16        	       3	 379387200 ns/op	253577896 B/op	 3006228 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=false/withDiff=true/perc=50.00-16       	       4	 294533788 ns/op	130799366 B/op	 1503677 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=false/withDiff=true/perc=75.00-16       	       4	 254943713 ns/op	69418826 B/op	  752487 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=false/withDiff=true/perc=95.00-16       	       5	 220269848 ns/op	20291392 B/op	  151411 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=false/withDiff=true/perc=99.00-16       	       5	 211813333 ns/op	10473760 B/op	   31231 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=false/withDiff=false/perc=0.00-16       	       3	 379575618 ns/op	253591349 B/op	 3006275 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=false/withDiff=false/perc=50.00-16      	       4	 282490494 ns/op	126852270 B/op	 1503388 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=false/withDiff=false/perc=75.00-16      	       5	 241938049 ns/op	63463176 B/op	  751985 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=false/withDiff=false/perc=95.00-16      	       5	 210848534 ns/op	12741640 B/op	  150944 allocs/op
BenchmarkCatchUpScan/linear-keys/useTBI=false/withDiff=false/perc=99.00-16      	       6	 196175587 ns/op	 2602912 B/op	   30652 allocs/op

Fixes cockroachdb#78974

Release note: None
  • Loading branch information
sumeerbhola authored and jayshrivastava committed Oct 24, 2022
1 parent a2541a0 commit 4128b60
Show file tree
Hide file tree
Showing 7 changed files with 294 additions and 74 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ go_test(
size = "small",
srcs = [
"catchup_scan_bench_test.go",
"catchup_scan_test.go",
"processor_test.go",
"registry_test.go",
"resolved_timestamp_test.go",
Expand Down
86 changes: 60 additions & 26 deletions pkg/kv/kvserver/rangefeed/catchup_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,34 @@ import (

// A CatchUpIterator is an iterator for catchUp-scans.
type CatchUpIterator struct {
storage.SimpleMVCCIterator
simpleCatchupIter
// close is an optional callback supplied by whoever instantiates the
// iterator. When it is non-nil, Close invokes it after closing the
// underlying iterator.
close func()
}

// simpleCatchupIter is an extension of SimpleMVCCIterator that allows for the
// primary iterator to be implemented using a regular MVCCIterator or a
// (often) more efficient MVCCIncrementalIterator. When the caller wants to
// iterate to see older versions of a key, it must state its intent by
// choosing one of two methods:
//   - Next: when it wants to omit any versions that are not within the time
//     bounds.
//   - NextIgnoringTime: when it wants to see the next older version even if
//     it is not within the time bounds.
type simpleCatchupIter interface {
storage.SimpleMVCCIterator
// NextIgnoringTime advances to the next older version of the current key
// even if that version is not within the time bounds.
NextIgnoringTime()
}

// simpleCatchupIterAdapter lets a plain SimpleMVCCIterator be used wherever a
// simpleCatchupIter is required. It is the trivial implementation used when
// the catch-up scan runs over a regular MVCC iterator rather than an
// MVCCIncrementalIterator.
type simpleCatchupIterAdapter struct {
	storage.SimpleMVCCIterator
}

// Compile-time check that the adapter satisfies simpleCatchupIter.
var _ simpleCatchupIter = simpleCatchupIterAdapter{}

// NextIgnoringTime implements simpleCatchupIter by delegating to the wrapped
// iterator's Next.
func (a simpleCatchupIterAdapter) NextIgnoringTime() {
	a.Next()
}

// NewCatchUpIterator returns a CatchUpIterator for the given Reader.
// If useTBI is true, a time-bound iterator will be used if possible,
// configured with a start time taken from the RangeFeedRequest.
Expand All @@ -37,13 +61,8 @@ func NewCatchUpIterator(
ret := &CatchUpIterator{
close: closer,
}
// TODO(ssd): The withDiff option requires us to iterate over
// values arbitrarily in the past so that we can populate the
// previous value of a key. This is possible since the
// IncrementalIterator has a non-timebound iterator
// internally, but it is not yet implemented.
if useTBI && !args.WithDiff {
ret.SimpleMVCCIterator = storage.NewMVCCIncrementalIterator(reader, storage.MVCCIncrementalIterOptions{
if useTBI {
ret.simpleCatchupIter = storage.NewMVCCIncrementalIterator(reader, storage.MVCCIncrementalIterOptions{
EnableTimeBoundIteratorOptimization: true,
EndKey: args.Span.EndKey,
// StartTime is exclusive but args.Timestamp
Expand All @@ -63,9 +82,10 @@ func NewCatchUpIterator(
InlinePolicy: storage.MVCCIncrementalIterInlinePolicyEmit,
})
} else {
ret.SimpleMVCCIterator = reader.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
iter := reader.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
UpperBound: args.Span.EndKey,
})
ret.simpleCatchupIter = simpleCatchupIterAdapter{SimpleMVCCIterator: iter}
}

return ret
Expand All @@ -74,7 +94,7 @@ func NewCatchUpIterator(
// Close closes the iterator and calls the instantiator-supplied close
// callback.
func (i *CatchUpIterator) Close() {
i.SimpleMVCCIterator.Close()
i.simpleCatchupIter.Close()
if i.close != nil {
i.close()
}
Expand All @@ -86,8 +106,8 @@ func (i *CatchUpIterator) Close() {
type outputEventFn func(e *roachpb.RangeFeedEvent) error

// CatchUpScan iterates over all changes for the given span of keys,
// starting at catchUpTimestamp. Keys and Values are emitted as
// RangeFeedEvents passed to the given outputFn.
// starting at catchUpTimestamp. Keys and Values are emitted as
// RangeFeedEvents passed to the given outputFn. catchUpTimestamp is exclusive.
func (i *CatchUpIterator) CatchUpScan(
startKey, endKey storage.MVCCKey,
catchUpTimestamp hlc.Timestamp,
Expand All @@ -107,6 +127,8 @@ func (i *CatchUpIterator) CatchUpScan(
if reorderBuf[l-1].Val.PrevValue.IsPresent() {
panic("RangeFeedValue.PrevVal unexpectedly set")
}
// TODO(sumeer): find out if it is deliberate that we are not populating
// PrevValue.Timestamp.
reorderBuf[l-1].Val.PrevValue.RawBytes = val
}
}
Expand Down Expand Up @@ -143,18 +165,23 @@ func (i *CatchUpIterator) CatchUpScan(
}
if !meta.IsInline() {
// This is an MVCCMetadata key for an intent. The catchUp scan
// only cares about committed values, so ignore this and skip
// past the corresponding provisional key-value. To do this,
// scan to the timestamp immediately before (i.e. the key
// immediately after) the provisional key.
//
// Make a copy: an unsafe key must not be passed back to the iterator
// that produced it when asking that iterator to seek.
a, unsafeKey.Key = a.Copy(unsafeKey.Key, 0)
i.SeekGE(storage.MVCCKey{
Key: unsafeKey.Key,
Timestamp: meta.Timestamp.ToTimestamp().Prev(),
})
// only cares about committed values, so ignore this and skip past
// the corresponding provisional key-value. To do this, iterate to
// the provisional key-value, validate its timestamp, then iterate
// again. When using MVCCIncrementalIterator we know that the
// provisional value will also be within the time bounds so we use
// Next.
i.Next()
if ok, err := i.Valid(); err != nil {
return errors.Wrap(err, "iterating to provisional value for intent")
} else if !ok {
return errors.Errorf("expected provisional value for intent")
}
if !meta.Timestamp.ToTimestamp().EqOrdering(i.UnsafeKey().Timestamp) {
return errors.Errorf("expected provisional value for intent with ts %s, found %s",
meta.Timestamp, i.UnsafeKey().Timestamp)
}
i.Next()
continue
}

Expand Down Expand Up @@ -231,8 +258,15 @@ func (i *CatchUpIterator) CatchUpScan(
// Skip all the way to the next key.
i.NextKey()
} else {
// Move to the next version of this key.
i.Next()
// Move to the next version of this key (there may not be one, in which
// case it will move to the next key).
if withDiff {
// Need to see the next version even if it is older than the time
// bounds.
i.NextIgnoringTime()
} else {
i.Next()
}
}
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ func BenchmarkCatchUpScan(b *testing.B) {
b.Run(name, func(b *testing.B) {
for _, useTBI := range []bool{true, false} {
b.Run(fmt.Sprintf("useTBI=%v", useTBI), func(b *testing.B) {
// TODO(ssd): withDiff isn't currently supported by the TBI optimization.
for _, withDiff := range []bool{false} {
for _, withDiff := range []bool{true, false} {
b.Run(fmt.Sprintf("withDiff=%v", withDiff), func(b *testing.B) {
for _, tsExcludePercent := range []float64{0.0, 0.50, 0.75, 0.95, 0.99} {
wallTime := int64((5 * (float64(numKeys)*tsExcludePercent + 1)))
Expand Down
136 changes: 136 additions & 0 deletions pkg/kv/kvserver/rangefeed/catchup_scan_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeed

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

// TestCatchupScan checks CatchUpScan against a small engine containing two
// keys with several committed versions each, plus one intent on the first
// key. It runs every combination of useTBI and withDiff and expects:
//   - exactly the committed versions newer than the exclusive catch-up
//     timestamp to be emitted, with the intent skipped;
//   - with withDiff, each event to carry the raw bytes of the preceding
//     version, even when that version is at the catch-up timestamp;
//   - without withDiff, PrevValue to be left empty.
func TestCatchupScan(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	ctx := context.Background()

	var (
		testKey1 = roachpb.Key("/db1")
		testKey2 = roachpb.Key("/db2")

		testValue1 = []byte("val1")
		testValue2 = []byte("val2")
		testValue3 = []byte("val3")
		testValue4 = []byte("val4")

		ts1 = hlc.Timestamp{WallTime: 1, Logical: 0}
		ts2 = hlc.Timestamp{WallTime: 2, Logical: 0}
		ts3 = hlc.Timestamp{WallTime: 3, Logical: 0}
		ts4 = hlc.Timestamp{WallTime: 4, Logical: 0}
		// ts5 must be distinct from ts4 so that kv2_5_3 really lives at a
		// fifth timestamp, as its name says.
		ts5 = hlc.Timestamp{WallTime: 5, Logical: 0}
	)

	// makeTxn builds a transaction anchored at key with read and write
	// timestamps both set to ts, together with the value it will write.
	makeTxn := func(key roachpb.Key, val []byte, ts hlc.Timestamp,
	) (roachpb.Transaction, roachpb.Value) {
		txnID := uuid.MakeV4()
		txnMeta := enginepb.TxnMeta{
			Key:            key,
			ID:             txnID,
			Epoch:          1,
			WriteTimestamp: ts,
		}
		return roachpb.Transaction{
				TxnMeta:       txnMeta,
				ReadTimestamp: ts,
			}, roachpb.Value{
				RawBytes: val,
			}
	}

	makeKTV := func(key roachpb.Key, ts hlc.Timestamp, value []byte) storage.MVCCKeyValue {
		return storage.MVCCKeyValue{Key: storage.MVCCKey{Key: key, Timestamp: ts}, Value: value}
	}
	// testKey1 has an intent and provisional value that will be skipped. Both
	// testKey1 and testKey2 have a value that is older than what we need with
	// the catchup scan, but will be read if a diff is desired.
	kv1_1_1 := makeKTV(testKey1, ts1, testValue1)
	kv1_2_2 := makeKTV(testKey1, ts2, testValue2)
	kv1_3_3 := makeKTV(testKey1, ts3, testValue3)
	kv1_4_4 := makeKTV(testKey1, ts4, testValue4)
	txn, val := makeTxn(testKey1, testValue4, ts4)
	kv2_1_1 := makeKTV(testKey2, ts1, testValue1)
	kv2_2_2 := makeKTV(testKey2, ts2, testValue2)
	kv2_5_3 := makeKTV(testKey2, ts5, testValue3)

	eng := storage.NewDefaultInMemForTesting()
	defer eng.Close()
	// Put with no intent.
	for _, kv := range []storage.MVCCKeyValue{kv1_1_1, kv1_2_2, kv1_3_3, kv2_1_1, kv2_2_2, kv2_5_3} {
		v := roachpb.Value{RawBytes: kv.Value}
		require.NoError(t, storage.MVCCPut(ctx, eng, nil, kv.Key.Key, kv.Key.Timestamp, v, nil))
	}
	// Put with an intent.
	require.NoError(t, storage.MVCCPut(ctx, eng, nil, kv1_4_4.Key.Key, txn.ReadTimestamp, val, &txn))

	testutils.RunTrueAndFalse(t, "useTBI", func(t *testing.T, useTBI bool) {
		testutils.RunTrueAndFalse(t, "withDiff", func(t *testing.T, withDiff bool) {
			iter := NewCatchUpIterator(eng, &roachpb.RangeFeedRequest{
				Header: roachpb.Header{
					// Inclusive, so want everything >= ts2
					Timestamp: ts2,
				},
				Span: roachpb.Span{
					EndKey: roachpb.KeyMax,
				},
				WithDiff: withDiff,
			}, useTBI, nil)
			defer iter.Close()
			var events []roachpb.RangeFeedValue
			// ts1 here is exclusive, so we do not want the versions at ts1.
			require.NoError(t, iter.CatchUpScan(storage.MakeMVCCMetadataKey(testKey1),
				storage.MakeMVCCMetadataKey(roachpb.KeyMax), ts1, withDiff,
				func(e *roachpb.RangeFeedEvent) error {
					events = append(events, *e.Val)
					return nil
				}))
			require.Len(t, events, 4)
			// checkEquality asserts that event describes kv and, when a diff
			// was requested, that its previous value matches prevKV.
			checkEquality := func(
				kv storage.MVCCKeyValue, prevKV storage.MVCCKeyValue, event roachpb.RangeFeedValue) {
				require.Equal(t, string(kv.Key.Key), string(event.Key))
				require.Equal(t, kv.Key.Timestamp, event.Value.Timestamp)
				require.Equal(t, string(kv.Value), string(event.Value.RawBytes))
				if withDiff {
					// TODO(sumeer): uncomment after clarifying CatchUpScan behavior.
					// require.Equal(t, prevKV.Key.Timestamp, event.PrevValue.Timestamp)
					require.Equal(t, string(prevKV.Value), string(event.PrevValue.RawBytes))
				} else {
					require.Equal(t, hlc.Timestamp{}, event.PrevValue.Timestamp)
					require.Equal(t, 0, len(event.PrevValue.RawBytes))
				}
			}
			// Events are expected per key, oldest version first.
			checkEquality(kv1_2_2, kv1_1_1, events[0])
			checkEquality(kv1_3_3, kv1_2_2, events[1])
			checkEquality(kv2_2_2, kv2_1_1, events[2])
			checkEquality(kv2_5_3, kv2_2_2, events[3])
		})
	})
}
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ func makeCatchUpIteratorConstructor(iter storage.SimpleMVCCIterator) CatchUpIter
if iter == nil {
return nil
}
return func() *CatchUpIterator { return &CatchUpIterator{SimpleMVCCIterator: iter} }
return func() *CatchUpIterator {
return &CatchUpIterator{simpleCatchupIter: simpleCatchupIterAdapter{iter}}
}
}

func newTestRegistration(
Expand Down
Loading

0 comments on commit 4128b60

Please sign in to comment.