diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan.go b/pkg/kv/kvserver/rangefeed/catchup_scan.go
index f55f13e6bedf..f0d70f310820 100644
--- a/pkg/kv/kvserver/rangefeed/catchup_scan.go
+++ b/pkg/kv/kvserver/rangefeed/catchup_scan.go
@@ -160,9 +160,26 @@ func (i *CatchUpIterator) CatchUpScan(
 			// If write is inline, it doesn't have a timestamp so we don't
 			// filter on the registration's starting timestamp. Instead, we
 			// return all inline writes.
+			//
+			// TODO(ssd): Do we want to continue to
+			// support inline values here at all? TBI may
+			// miss inline values completely and normal
+			// iterators may result in the rangefeed not
+			// seeing some intermediate values.
 			unsafeVal = meta.RawBytes
 		}
 
+		// Ignore the version if it's not inline and its timestamp is at
+		// or before the registration's (exclusive) starting timestamp.
+		ts := unsafeKey.Timestamp
+		ignore := !(ts.IsEmpty() || catchUpTimestamp.Less(ts))
+		if ignore && !withDiff {
+			// Skip all the way to the next key.
+			// NB: fast-path to avoid value copy when !r.withDiff.
+			i.NextKey()
+			continue
+		}
+
 		// Determine whether the iterator moved to a new key.
 		sameKey := bytes.Equal(unsafeKey.Key, lastKey)
 		if !sameKey {
@@ -173,23 +190,40 @@ func (i *CatchUpIterator) CatchUpScan(
 			a, lastKey = a.Copy(unsafeKey.Key, 0)
 		}
 		key := lastKey
-		ts := unsafeKey.Timestamp
 
-		// Ignore the version if it's not inline and its timestamp is at
-		// or before the registration's (exclusive) starting timestamp.
-		ignore := !(ts.IsEmpty() || catchUpTimestamp.Less(ts))
-		if ignore && !withDiff {
-			// Skip all the way to the next key.
-			// NB: fast-path to avoid value copy when !r.withDiff.
-			i.NextKey()
-			continue
-		}
+		// INVARIANT: !ignore || withDiff
+		//
+		// Cases:
+		//
+		// - !ignore: we need to copy the unsafeVal to add to
+		//   the reorderBuf to be output eventually,
+		//   regardless of the value of withDiff
+		//
+		// - withDiff && ignore: we need to copy the unsafeVal
+		//   only if there is already something in the
+		//   reorderBuf for which we need to set the previous
+		//   value.
+		if !ignore || (withDiff && len(reorderBuf) > 0) {
+			var val []byte
+			a, val = a.Copy(unsafeVal, 0)
+			if withDiff {
+				// Update the last version with its
+				// previous value (this version).
+				addPrevToLastEvent(val)
+			}
 
-		var val []byte
-		a, val = a.Copy(unsafeVal, 0)
-		if withDiff {
-			// Update the last version with its previous value (this version).
-			addPrevToLastEvent(val)
+			if !ignore {
+				// Add value to reorderBuf to be output.
+				var event roachpb.RangeFeedEvent
+				event.MustSetValue(&roachpb.RangeFeedValue{
+					Key: key,
+					Value: roachpb.Value{
+						RawBytes:  val,
+						Timestamp: ts,
+					},
+				})
+				reorderBuf = append(reorderBuf, event)
+			}
 		}
 
 		if ignore {
@@ -198,16 +232,6 @@ func (i *CatchUpIterator) CatchUpScan(
 		} else {
 			// Move to the next version of this key.
 			i.Next()
-
-			var event roachpb.RangeFeedEvent
-			event.MustSetValue(&roachpb.RangeFeedValue{
-				Key: key,
-				Value: roachpb.Value{
-					RawBytes:  val,
-					Timestamp: ts,
-				},
-			})
-			reorderBuf = append(reorderBuf, event)
 		}
 	}
 
diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
index 313e87c93786..3274025ddcaa 100644
--- a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
+++ b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
@@ -40,7 +40,7 @@ func runCatchUpBenchmark(b *testing.B, emk engineMaker, opts benchOptions) {
 		Key:    startKey,
 		EndKey: endKey,
 	}
-	// fmt.Println(eng.GetMetrics().String())
+
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		iter := rangefeed.NewCatchUpIterator(eng, &roachpb.RangeFeedRequest{
@@ -91,12 +91,19 @@ func BenchmarkCatchUpScan(b *testing.B) {
 			numKeys:    numKeys,
 			valueBytes: valueBytes,
 		},
-		// mixed-case is a middling case. We write keys in
-		// random order but with the timestamps that keep
-		// marching forward. But, we set LBaseMaxBytes rather
-		// low and also return a read only engine to prevent
-		// read-based compactions after the initial data
-		// generation.
+		// mixed-case is a middling case.
+		//
+		// This case is trying to simulate a larger store, but
+		// with fewer bytes. If we did not reduce
+		// LBaseMaxBytes, almost all data would be in Lbase or
+		// L6, and TBI would be ineffective. By reducing
+		// LBaseMaxBytes, the data should spread out over more
+		// levels, like in a real store. The LSM state
+		// depicted below shows that this was only partially
+		// successful.
+		//
+		// We return a read only engine to prevent read-based
+		// compactions after the initial data generation.
 		//
 		// As of 2021-08-18 data generated using these
 		// settings looked like:
@@ -238,11 +245,9 @@ func setupData(
 	}
 
 	if opts.randomKeyOrder {
-		// Randomize the order in which the keys are written.
-		for i, n := 0, len(order); i < n-1; i++ {
-			j := i + rng.Intn(n-i)
+		rng.Shuffle(len(order), func(i, j int) {
 			order[i], order[j] = order[j], order[i]
-		})
+		})
 	}
 
 	writeKey := func(batch storage.Batch, idx int, pos int) {
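
For readers following the catchup_scan.go hunks above: the sketch below restates the copy/emit decision that the "INVARIANT: !ignore || withDiff" comment describes, pulled out of the iterator loop. It is a standalone illustration, not CockroachDB code; decideCopy and the case table are hypothetical names introduced here.

```go
// copy_decision_sketch.go
//
// Standalone sketch of the value-copy decision in the restructured
// catch-up scan loop. "emit" corresponds to appending a RangeFeedValue
// to reorderBuf; "copy" corresponds to a.Copy(unsafeVal, 0).
package main

import "fmt"

// decideCopy reports whether the current version's value needs to be
// copied and whether it should be emitted, given whether the version is
// at or below the catch-up timestamp (ignore), whether previous values
// were requested (withDiff), and how many events are already buffered.
func decideCopy(ignore, withDiff bool, reorderBufLen int) (copyVal, emit bool) {
	// Copy when the value will be emitted, or when it is only needed as
	// the previous value of an already-buffered newer version.
	copyVal = !ignore || (withDiff && reorderBufLen > 0)
	emit = !ignore
	return copyVal, emit
}

func main() {
	type tc struct {
		ignore, withDiff bool
		bufLen           int
	}
	for _, c := range []tc{
		{ignore: false, withDiff: false, bufLen: 0}, // new enough: copy and emit
		{ignore: true, withDiff: false, bufLen: 3},  // old, no diff: the fast-path skips this version before the copy decision is reached
		{ignore: true, withDiff: true, bufLen: 0},   // old, diff wanted, nothing buffered: no copy needed
		{ignore: true, withDiff: true, bufLen: 2},   // old, diff wanted, buffered event: copy only as a previous value
	} {
		copyVal, emit := decideCopy(c.ignore, c.withDiff, c.bufLen)
		fmt.Printf("ignore=%v withDiff=%v bufLen=%d -> copy=%v emit=%v\n",
			c.ignore, c.withDiff, c.bufLen, copyVal, emit)
	}
}
```

The point of the restructuring is visible in the third case: before the change, every version reaching this point was copied even when the copy could never be observed.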
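The setupData change swaps a hand-rolled shuffle for the standard library's rand.Shuffle. A minimal standalone example of the replacement call, with an explicitly seeded rng as in the benchmark helper (the seed value here is illustrative):

```go
// shuffle_sketch.go
package main

import (
	"fmt"
	"math/rand"
)

func main() {
	rng := rand.New(rand.NewSource(1))

	// Randomize the order in which keys would be written, as in setupData.
	order := []int{0, 1, 2, 3, 4, 5, 6, 7}
	rng.Shuffle(len(order), func(i, j int) {
		order[i], order[j] = order[j], order[i]
	})
	fmt.Println(order)
}
```

rand.Shuffle implements the same Fisher-Yates algorithm as the removed loop, though for a given seed it does not necessarily produce the identical permutation, so data generated before and after the change may differ in write order.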
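The mixed-case comment reasons about spreading a small dataset across LSM levels by lowering LBaseMaxBytes. Below is a rough standalone sketch of that idea against plain Pebble with an in-memory filesystem; the 64 KiB limit, key count, and value size are illustrative assumptions, not the benchmark's settings.

```go
// lbase_sketch.go
//
// Sketch only: lowering LBaseMaxBytes so that a modest amount of data can
// end up spread over several levels, the way a much larger store would be.
package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	db, err := pebble.Open("demo", &pebble.Options{
		FS:            vfs.NewMem(),
		LBaseMaxBytes: 64 << 10, // keep Lbase small to push data toward lower levels
	})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Write enough keys that compactions have something to distribute.
	val := make([]byte, 256)
	for i := 0; i < 20000; i++ {
		key := []byte(fmt.Sprintf("key-%08d", i))
		if err := db.Set(key, val, pebble.NoSync); err != nil {
			log.Fatal(err)
		}
	}
	if err := db.Flush(); err != nil {
		log.Fatal(err)
	}

	// Metrics prints a per-level breakdown similar to the LSM state quoted
	// in the benchmark comment. Compactions run asynchronously, so the
	// exact shape depends on timing.
	fmt.Println(db.Metrics())
}
```

Inspecting the per-level metrics is the same check the benchmark comment performs when it notes that spreading the data was "only partially successful."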