kvserver: avoid unnecessary copies in rangefeed catchup-scan
This avoids two copies in the catchup-scan code. In the best case,
this provides a nice win even without TBI (time-bound iterators) enabled:

```
name                                                               old time/op    new time/op    delta
CatchupScan/linear-keys/useTBI=false/withDiff=true/perc=99.00-16      316ms ± 1%     261ms ± 2%  -17.17%  (p=0.000 n=23+24)
CatchupScan/linear-keys/useTBI=false/withDiff=false/perc=99.00-16     257ms ± 2%     226ms ± 3%  -12.05%  (p=0.000 n=24+23)

name                                                               old alloc/op   new alloc/op   delta
CatchupScan/linear-keys/useTBI=false/withDiff=true/perc=99.00-16     79.0MB ± 0%    10.4MB ± 0%  -86.89%  (p=0.000 n=24+25)
CatchupScan/linear-keys/useTBI=false/withDiff=false/perc=99.00-16    10.4MB ± 0%     2.5MB ± 0%  -76.03%  (p=0.000 n=20+25)

name                                                               old allocs/op  new allocs/op  delta
CatchupScan/linear-keys/useTBI=false/withDiff=true/perc=99.00-16      35.8k ± 0%     31.4k ± 0%  -12.20%  (p=0.000 n=22+25)
CatchupScan/linear-keys/useTBI=false/withDiff=false/perc=99.00-16     31.4k ± 0%     30.8k ± 0%   -2.04%  (p=0.000 n=25+25)
```

Release justification: Performance improvement for catchup scans helps
address the cause of recent high-priority customer incidents.

Release note: None
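
The table above is `benchstat` output comparing runs of `BenchmarkCatchUpScan` before and after the change. As a reminder of where the time/op, alloc/op, and allocs/op columns originate, here is a minimal, hypothetical benchmark (not from this repo) that reports the same metrics via Go's benchmark harness:

```go
package bench

import "testing"

// BenchmarkValueCopy is a hypothetical stand-in with the same shape as
// the benchmarks compared above. The alloc/op and allocs/op columns
// come from b.ReportAllocs() (or `go test -benchmem`); benchstat then
// diffs two sets of runs.
func BenchmarkValueCopy(b *testing.B) {
	src := make([]byte, 1024)
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		dst := make([]byte, len(src)) // the allocation being measured
		copy(dst, src)
	}
}
```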
stevendanna committed Aug 31, 2021
1 parent 87c7f11 commit c6c9290
Showing 2 changed files with 65 additions and 36 deletions.
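
The first diff below leans on an allocator that is threaded through the scan loop (the `a, val = a.Copy(...)` calls). A simplified, self-contained sketch of that pattern, modeled loosely on the repo's `bufalloc.ByteAllocator` with invented names and sizing, shows why handing out sub-slices of a shared chunk is cheaper than one `make` per key and value:

```go
package main

import "fmt"

const chunkSize = 1 << 12 // 4 KiB chunks; the real allocator sizes chunks adaptively

// byteAlloc is a simplified stand-in for the allocator the catch-up
// scan threads through its loop. Names are invented for illustration;
// only the calling convention matches the diff below.
type byteAlloc []byte

// copyBytes copies payload into the current chunk and returns the
// updated allocator plus a stable sub-slice. Amortizing many small
// copies into a few large allocations is what drives the alloc/op
// wins in the benchmark table above.
func (a byteAlloc) copyBytes(payload []byte) (byteAlloc, []byte) {
	if cap(a)-len(a) < len(payload) {
		// Start a fresh chunk. Slices handed out earlier keep
		// aliasing the old chunk, so they stay valid.
		n := chunkSize
		if len(payload) > n {
			n = len(payload)
		}
		a = make(byteAlloc, 0, n)
	}
	start := len(a)
	a = append(a, payload...)
	return a, a[start:len(a):len(a)]
}

func main() {
	var a byteAlloc
	var k1, v1 []byte
	a, k1 = a.copyBytes([]byte("key-1"))
	a, v1 = a.copyBytes([]byte("value-1"))
	fmt.Printf("%s => %s\n", k1, v1) // key-1 => value-1
}
```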
74 changes: 49 additions & 25 deletions pkg/kv/kvserver/rangefeed/catchup_scan.go
```diff
@@ -160,9 +160,26 @@ func (i *CatchUpIterator) CatchUpScan(
 				// If write is inline, it doesn't have a timestamp so we don't
 				// filter on the registration's starting timestamp. Instead, we
 				// return all inline writes.
+				//
+				// TODO(ssd): Do we want to continue to
+				// support inline values here at all? TBI may
+				// miss inline values completely and normal
+				// iterators may result in the rangefeed not
+				// seeing some intermediate values.
 				unsafeVal = meta.RawBytes
 			}
 
+			// Ignore the version if it's not inline and its timestamp is at
+			// or before the registration's (exclusive) starting timestamp.
+			ts := unsafeKey.Timestamp
+			ignore := !(ts.IsEmpty() || catchUpTimestamp.Less(ts))
+			if ignore && !withDiff {
+				// Skip all the way to the next key.
+				// NB: fast-path to avoid value copy when !r.withDiff.
+				i.NextKey()
+				continue
+			}
+
 			// Determine whether the iterator moved to a new key.
 			sameKey := bytes.Equal(unsafeKey.Key, lastKey)
 			if !sameKey {
@@ -173,23 +190,40 @@ func (i *CatchUpIterator) CatchUpScan(
 				a, lastKey = a.Copy(unsafeKey.Key, 0)
 			}
 			key := lastKey
-			ts := unsafeKey.Timestamp
 
-			// Ignore the version if it's not inline and its timestamp is at
-			// or before the registration's (exclusive) starting timestamp.
-			ignore := !(ts.IsEmpty() || catchUpTimestamp.Less(ts))
-			if ignore && !withDiff {
-				// Skip all the way to the next key.
-				// NB: fast-path to avoid value copy when !r.withDiff.
-				i.NextKey()
-				continue
-			}
+			// INVARIANT: !ignore || withDiff
+			//
+			// Cases:
+			//
+			// - !ignore: we need to copy the unsafeVal to add to
+			//   the reorderBuf to be output eventually,
+			//   regardless of the value of withDiff
+			//
+			// - withDiff && ignore: we need to copy the unsafeVal
+			//   only if there is already something in the
+			//   reorderBuf for which we need to set the previous
+			//   value.
+			if !ignore || (withDiff && len(reorderBuf) > 0) {
+				var val []byte
+				a, val = a.Copy(unsafeVal, 0)
+				if withDiff {
+					// Update the last version with its
+					// previous value (this version).
+					addPrevToLastEvent(val)
+				}
 
-			var val []byte
-			a, val = a.Copy(unsafeVal, 0)
-			if withDiff {
-				// Update the last version with its previous value (this version).
-				addPrevToLastEvent(val)
+				if !ignore {
+					// Add value to reorderBuf to be output.
+					var event roachpb.RangeFeedEvent
+					event.MustSetValue(&roachpb.RangeFeedValue{
+						Key: key,
+						Value: roachpb.Value{
+							RawBytes:  val,
+							Timestamp: ts,
+						},
+					})
+					reorderBuf = append(reorderBuf, event)
+				}
 			}
 
 			if ignore {
@@ -198,16 +232,6 @@ func (i *CatchUpIterator) CatchUpScan(
 			} else {
 				// Move to the next version of this key.
 				i.Next()
-
-				var event roachpb.RangeFeedEvent
-				event.MustSetValue(&roachpb.RangeFeedValue{
-					Key: key,
-					Value: roachpb.Value{
-						RawBytes:  val,
-						Timestamp: ts,
-					},
-				})
-				reorderBuf = append(reorderBuf, event)
 			}
 		}
 
```
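
The restructured loop's copy decision can be read straight off the INVARIANT comment above: the value is copied out of the iterator's unsafe buffer only when it will be emitted, or when an already-buffered event needs it as a previous value. A tiny, hypothetical distillation of that gate (names invented; not the repo's API):

```go
package main

import "fmt"

// needsValueCopy mirrors the post-change gating in CatchUpScan: copy
// only if the version is emitted (!ignore), or if withDiff is set and
// something already in reorderBuf needs this version as its prev value.
func needsValueCopy(ignore, withDiff bool, reorderBufLen int) bool {
	return !ignore || (withDiff && reorderBufLen > 0)
}

func main() {
	for _, c := range []struct {
		ignore, withDiff bool
		bufLen           int
	}{
		{false, false, 0}, // emitted: copy
		{false, true, 1},  // emitted, and may become a prev value: copy
		{true, false, 1},  // unreachable here: the fast path NextKey'd earlier
		{true, true, 0},   // nothing buffered needs a prev value: no copy
		{true, true, 2},   // buffered event needs its prev value: copy
	} {
		fmt.Printf("ignore=%-5v withDiff=%-5v len=%d -> copy=%v\n",
			c.ignore, c.withDiff, c.bufLen,
			needsValueCopy(c.ignore, c.withDiff, c.bufLen))
	}
}
```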
27 changes: 16 additions & 11 deletions pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
```diff
@@ -40,7 +40,7 @@ func runCatchUpBenchmark(b *testing.B, emk engineMaker, opts benchOptions) {
 		Key:    startKey,
 		EndKey: endKey,
 	}
-	// fmt.Println(eng.GetMetrics().String())
+
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		iter := rangefeed.NewCatchUpIterator(eng, &roachpb.RangeFeedRequest{
@@ -91,12 +91,19 @@ func BenchmarkCatchUpScan(b *testing.B) {
 				numKeys:    numKeys,
 				valueBytes: valueBytes,
 			},
-			// mixed-case is a middling case. We write keys in
-			// random order but with the timestamps that keep
-			// marching forward. But, we set LBaseMaxBytes rather
-			// low and also return a read only engine to prevent
-			// read-based compactions after the initial data
-			// generation.
+			// mixed-case is a middling case.
+			//
+			// This case is trying to simulate a larger store, but
+			// with fewer bytes. If we did not reduce
+			// LBaseMaxBytes, almost all data would be in Lbase or
+			// L6, and TBI would be ineffective. By reducing
+			// LBaseMaxBytes, the data should spread out over more
+			// levels, like in a real store. The LSM state
+			// depicted below shows that this was only partially
+			// successful.
+			//
+			// We return a read only engine to prevent read-based
+			// compactions after the initial data generation.
 			//
 			// As of 2021-08-18 data generated using these
 			// settings looked like:
@@ -238,11 +245,9 @@ func setupData(
 	}
 
 	if opts.randomKeyOrder {
-		// Randomize the order in which the keys are written.
-		for i, n := 0, len(order); i < n-1; i++ {
-			j := i + rng.Intn(n-i)
+		rng.Shuffle(len(order), func(i, j int) {
 			order[i], order[j] = order[j], order[i]
-		}
+		})
 	}
 
 	writeKey := func(batch storage.Batch, idx int, pos int) {
```
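
The last hunk swaps a hand-rolled Fisher-Yates loop for the equivalent `rand.Shuffle` from the standard library (available since Go 1.10): the stdlib owns the index bookkeeping and the caller supplies only the swap. A standalone before/after sketch, with the old form kept as a comment:

```go
package main

import (
	"fmt"
	"math/rand"
)

func main() {
	rng := rand.New(rand.NewSource(1))
	order := []int{0, 1, 2, 3, 4, 5, 6, 7}

	// Old form: a hand-rolled Fisher-Yates shuffle.
	// for i, n := 0, len(order); i < n-1; i++ {
	// 	j := i + rng.Intn(n-i)
	// 	order[i], order[j] = order[j], order[i]
	// }

	// New form: rand.Shuffle performs the same permutation walk,
	// calling swap for each chosen pair.
	rng.Shuffle(len(order), func(i, j int) {
		order[i], order[j] = order[j], order[i]
	})
	fmt.Println(order)
}
```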
