Skip to content

Commit

Permalink
kvstreamer: introduce a single range fast path for request truncation
Browse files Browse the repository at this point in the history
This commit introduces a fast path to avoid the usage of the batch
truncation helper when all requests are contained within a single range.
Some modifications needed to be made to the `txnKVStreamer` so that it
didn't nil out the requests slice - we now delay that until right before
the next call to `Enqueue`.

```
name                                  old time/op    new time/op    delta
IndexJoin/Cockroach-24                  6.21ms ± 1%    5.96ms ± 2%  -4.08%  (p=0.000 n=8+10)
IndexJoinColumnFamilies/Cockroach-24    8.97ms ± 4%    8.79ms ± 7%    ~     (p=0.190 n=10+10)

name                                  old alloc/op   new alloc/op   delta
IndexJoin/Cockroach-24                  1.39MB ± 1%    1.27MB ± 1%  -7.97%  (p=0.000 n=10+10)
IndexJoinColumnFamilies/Cockroach-24    1.46MB ± 1%    1.34MB ± 0%  -8.04%  (p=0.000 n=9+7)

name                                  old allocs/op  new allocs/op  delta
IndexJoin/Cockroach-24                   7.20k ± 1%     7.16k ± 1%  -0.61%  (p=0.022 n=10+10)
IndexJoinColumnFamilies/Cockroach-24     12.0k ± 1%     11.9k ± 0%  -0.83%  (p=0.000 n=9+8)
```

Release note: None
  • Loading branch information
yuzefovich committed Jul 6, 2022
1 parent 7cfa5d4 commit 35e502e
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 30 deletions.
60 changes: 38 additions & 22 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,35 +503,51 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
s.mu.Unlock()
}
}()
// TODO(yuzefovich): reuse truncation helpers between different Enqueue()
// calls.
// TODO(yuzefovich): introduce a fast path when all requests are contained
// within a single range.
// The streamer can process the responses in an arbitrary order, so we don't
// require the helper to preserve the order of requests and allow it to
// reorder the reqs slice too.
const mustPreserveOrder = false
const canReorderRequestsSlice = true
truncationHelper, err := kvcoord.MakeBatchTruncationHelper(
scanDir, reqs, mustPreserveOrder, canReorderRequestsSlice,
)
if err != nil {
return err
allRequestsAreWithinSingleRange := !ri.NeedAnother(rs)
var truncationHelper kvcoord.BatchTruncationHelper
if !allRequestsAreWithinSingleRange {
// We only need the truncation helper if the requests span multiple
// ranges.
//
// The streamer can process the responses in an arbitrary order, so we
// don't require the helper to preserve the order of requests and allow
// it to reorder the reqs slice too.
const mustPreserveOrder = false
const canReorderRequestsSlice = true
// TODO(yuzefovich): reuse truncation helpers between different
// Enqueue() calls.
truncationHelper, err = kvcoord.MakeBatchTruncationHelper(
scanDir, reqs, mustPreserveOrder, canReorderRequestsSlice,
)
if err != nil {
return err
}
}
var reqsKeysScratch []roachpb.Key
var newNumRangesPerScanRequestMemoryUsage int64
for ; ri.Valid(); ri.Seek(ctx, seekKey, scanDir) {
// Truncate the request span to the current range.
singleRangeSpan, err := rs.Intersect(ri.Token().Desc())
if err != nil {
return err
}
// Find all requests that touch the current range.
var singleRangeReqs []roachpb.RequestUnion
var positions []int
singleRangeReqs, positions, seekKey, err = truncationHelper.Truncate(singleRangeSpan)
if err != nil {
return err
if allRequestsAreWithinSingleRange {
// All requests are within this range, so we can just use the
// enqueued requests directly.
singleRangeReqs = reqs
positions = make([]int, len(reqs))
for i := range positions {
positions[i] = i
}
seekKey = roachpb.RKeyMax
} else {
// Truncate the request span to the current range.
singleRangeSpan, err := rs.Intersect(ri.Token().Desc())
if err != nil {
return err
}
singleRangeReqs, positions, seekKey, err = truncationHelper.Truncate(singleRangeSpan)
if err != nil {
return err
}
}
rs.Key = seekKey
var subRequestIdx []int32
Expand Down
26 changes: 18 additions & 8 deletions pkg/sql/row/kv_batch_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,30 @@ func (f *txnKVStreamer) SetupNextFetch(
if log.ExpensiveLogEnabled(ctx, 2) {
log.VEventf(ctx, 2, "Scan %s", spans)
}
reqs := spansToRequests(spans, false /* reverse */, f.keyLocking, f.reqsScratch)
// Make sure to nil out the requests past the length that will be used in
// spansToRequests so that we lose references to the underlying Get and Scan
// requests (which could keep large byte slices alive) from the previous
// iteration.
//
// Note that we could not do this nil-ing out after Enqueue() returned on
// the previous iteration because in some cases the streamer will hold on to
// the slice (which is the case when the requests are contained within a
// single range). At the same time we don't want to push the responsibility
// of nil-ing out the slice onto the streamer because we (i.e. the
// txnKVStreamer) are the ones that keep the slice for reuse, and the
// streamer doesn't know anything about the slice reuse.
reqsScratch := f.reqsScratch[:cap(f.reqsScratch)]
for i := len(spans); i < len(reqsScratch); i++ {
reqsScratch[i] = roachpb.RequestUnion{}
}
reqs := spansToRequests(spans, false /* reverse */, f.keyLocking, reqsScratch)
if err := f.streamer.Enqueue(ctx, reqs); err != nil {
return err
}
f.spans = spans
f.spanIDs = spanIDs
// Keep the reference to the requests slice in order to reuse in the future
// after making sure to nil out the requests in order to lose references to
// the underlying Get and Scan requests which could keep large byte slices
// alive.
// Keep the reference to the requests slice in order to reuse in the future.
f.reqsScratch = reqs
for i := range f.reqsScratch {
f.reqsScratch[i] = roachpb.RequestUnion{}
}
reqsScratchMemUsage := requestUnionOverhead * int64(cap(f.reqsScratch))
return f.acc.ResizeTo(ctx, reqsScratchMemUsage)
}
Expand Down

0 comments on commit 35e502e

Please sign in to comment.