From 067db690a82e32450f961dd02be43765fa21b5fd Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 13 Jun 2022 19:46:00 -0700 Subject: [PATCH] kvcoord: optimize batch truncation loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit optimizes the batch truncation loop for the case when the requests only use global keys. The optimized approach sorts the requests according to their start keys (with the Ascending scan direction) or in the reverse order of their end keys (with the Descending scan direction). Then on each `Truncate` call it looks only at a subset of the requests (that haven't been fully processed yet and don't start after the current range), allowing us to avoid many unnecessary key comparisons. Please see a comment on the new `truncateAsc` and `truncateDesc` functions for more details and examples. The optimized approach actually has a worse time complexity in the absolute worst case (when each request is a range-spanning one and actually spans all of the ranges against which the requests are truncated) - because of the need to sort the requests upfront - but in practice, it is much faster, especially with point requests. ``` name old time/op new time/op delta TruncateLoop/asc/reqs=128/ranges=4/type=get-24 59.8µs ± 1% 33.9µs ± 3% -43.38% (p=0.000 n=10+10) TruncateLoop/asc/reqs=128/ranges=4/type=scan-24 83.0µs ± 4% 69.7µs ± 7% -15.98% (p=0.000 n=10+10) TruncateLoop/asc/reqs=128/ranges=64/type=get-24 865µs ± 1% 62µs ± 1% -92.84% (p=0.000 n=10+10) TruncateLoop/asc/reqs=128/ranges=64/type=scan-24 1.07ms ± 5% 0.46ms ± 8% -56.95% (p=0.000 n=10+10) TruncateLoop/asc/reqs=16384/ranges=4/type=get-24 7.09ms ± 0% 5.99ms ± 1% -15.56% (p=0.000 n=10+9) TruncateLoop/asc/reqs=16384/ranges=4/type=scan-24 9.22ms ± 0% 10.52ms ± 1% +14.08% (p=0.000 n=9+10) TruncateLoop/asc/reqs=16384/ranges=64/type=get-24 108ms ± 0% 6ms ± 2% -94.50% (p=0.000 n=8+10) TruncateLoop/asc/reqs=16384/ranges=64/type=scan-24 129ms ± 1% 71ms ± 1% -45.06% (p=0.000 n=10+10) TruncateLoop/asc/preserveOrder/reqs=128/ranges=4/type=get-24 60.2µs ± 1% 36.0µs ± 2% -40.14% (p=0.000 n=10+10) TruncateLoop/asc/preserveOrder/reqs=128/ranges=4/type=scan-24 82.4µs ± 8% 72.1µs ± 4% -12.51% (p=0.000 n=10+9) TruncateLoop/asc/preserveOrder/reqs=128/ranges=64/type=get-24 862µs ± 1% 72µs ± 1% -91.65% (p=0.000 n=10+9) TruncateLoop/asc/preserveOrder/reqs=128/ranges=64/type=scan-24 1.06ms ± 4% 0.49ms ± 8% -53.39% (p=0.000 n=10+10) TruncateLoop/asc/preserveOrder/reqs=16384/ranges=4/type=get-24 7.24ms ± 0% 6.46ms ± 1% -10.74% (p=0.000 n=10+9) TruncateLoop/asc/preserveOrder/reqs=16384/ranges=4/type=scan-24 10.1ms ± 2% 9.8ms ± 2% -2.36% (p=0.000 n=10+9) TruncateLoop/asc/preserveOrder/reqs=16384/ranges=64/type=get-24 107ms ± 0% 7ms ± 0% -93.57% (p=0.000 n=8+10) TruncateLoop/asc/preserveOrder/reqs=16384/ranges=64/type=scan-24 122ms ± 0% 80ms ± 1% -34.32% (p=0.000 n=9+9) TruncateLoop/desc/reqs=128/ranges=4/type=get-24 78.9µs ± 1% 36.4µs ± 3% -53.81% (p=0.000 n=10+10) TruncateLoop/desc/reqs=128/ranges=4/type=scan-24 79.4µs ± 4% 52.3µs ± 5% -34.14% (p=0.000 n=10+10) TruncateLoop/desc/reqs=128/ranges=64/type=get-24 1.16ms ± 1% 0.07ms ± 1% -94.39% (p=0.000 n=10+10) TruncateLoop/desc/reqs=128/ranges=64/type=scan-24 1.01ms ± 4% 0.46ms ± 5% -54.56% (p=0.000 n=10+10) TruncateLoop/desc/reqs=16384/ranges=4/type=get-24 9.42ms ± 0% 6.26ms ± 1% -33.52% (p=0.000 n=10+10) TruncateLoop/desc/reqs=16384/ranges=4/type=scan-24 8.41ms ± 1% 9.22ms ± 1% +9.54% (p=0.000 n=10+9) TruncateLoop/desc/reqs=16384/ranges=64/type=get-24 145ms ± 0% 6ms ± 1% -95.63% (p=0.000 n=9+9) TruncateLoop/desc/reqs=16384/ranges=64/type=scan-24 125ms ± 1% 67ms ± 1% -46.31% (p=0.000 n=9+9) TruncateLoop/desc/preserveOrder/reqs=128/ranges=4/type=get-24 77.8µs ± 1% 39.6µs ± 2% -49.10% (p=0.000 n=10+10) TruncateLoop/desc/preserveOrder/reqs=128/ranges=4/type=scan-24 74.0µs ± 3% 63.0µs ± 6% -14.92% (p=0.000 n=10+10) TruncateLoop/desc/preserveOrder/reqs=128/ranges=64/type=get-24 1.16ms ± 1% 0.08ms ± 1% -93.47% (p=0.000 n=10+10) TruncateLoop/desc/preserveOrder/reqs=128/ranges=64/type=scan-24 1.04ms ± 5% 0.47ms ± 7% -54.65% (p=0.000 n=10+10) TruncateLoop/desc/preserveOrder/reqs=16384/ranges=4/type=get-24 9.50ms ± 0% 6.73ms ± 1% -29.21% (p=0.000 n=9+10) TruncateLoop/desc/preserveOrder/reqs=16384/ranges=4/type=scan-24 8.88ms ± 1% 13.24ms ± 1% +49.04% (p=0.000 n=10+10) TruncateLoop/desc/preserveOrder/reqs=16384/ranges=64/type=get-24 146ms ± 0% 7ms ± 1% -94.98% (p=0.000 n=10+10) TruncateLoop/desc/preserveOrder/reqs=16384/ranges=64/type=scan-24 125ms ± 1% 74ms ± 1% -40.75% (p=0.000 n=10+9) name old alloc/op new alloc/op delta TruncateLoop/asc/reqs=128/ranges=4/type=get-24 7.58kB ± 0% 21.00kB ±11% +176.84% (p=0.000 n=7+10) TruncateLoop/asc/reqs=128/ranges=4/type=scan-24 39.8kB ± 6% 49.1kB ±15% +23.46% (p=0.000 n=9+10) TruncateLoop/asc/reqs=128/ranges=64/type=get-24 6.48kB ± 5% 18.25kB ± 2% +181.79% (p=0.000 n=10+9) TruncateLoop/asc/reqs=128/ranges=64/type=scan-24 428kB ±20% 368kB ±13% -13.85% (p=0.003 n=10+10) TruncateLoop/asc/reqs=16384/ranges=4/type=get-24 1.60MB ± 0% 2.91MB ± 0% +82.49% (p=0.000 n=8+10) TruncateLoop/asc/reqs=16384/ranges=4/type=scan-24 5.24MB ± 0% 5.89MB ± 0% +12.41% (p=0.000 n=10+8) TruncateLoop/asc/reqs=16384/ranges=64/type=get-24 1.15MB ± 4% 2.41MB ± 1% +110.09% (p=0.000 n=10+10) TruncateLoop/asc/reqs=16384/ranges=64/type=scan-24 69.8MB ± 1% 64.6MB ± 1% -7.55% (p=0.000 n=10+10) TruncateLoop/asc/preserveOrder/reqs=128/ranges=4/type=get-24 9.77kB ±22% 21.98kB ± 4% +125.07% (p=0.000 n=10+10) TruncateLoop/asc/preserveOrder/reqs=128/ranges=4/type=scan-24 38.9kB ±23% 49.9kB ± 2% +28.28% (p=0.000 n=10+8) TruncateLoop/asc/preserveOrder/reqs=128/ranges=64/type=get-24 6.56kB ± 4% 20.20kB ± 3% +208.11% (p=0.000 n=10+10) TruncateLoop/asc/preserveOrder/reqs=128/ranges=64/type=scan-24 407kB ±15% 372kB ±13% -8.68% (p=0.043 n=10+10) TruncateLoop/asc/preserveOrder/reqs=16384/ranges=4/type=get-24 1.65MB ± 0% 3.62MB ± 0% +118.55% (p=0.000 n=8+8) TruncateLoop/asc/preserveOrder/reqs=16384/ranges=4/type=scan-24 6.60MB ± 2% 5.65MB ± 1% -14.38% (p=0.000 n=10+10) TruncateLoop/asc/preserveOrder/reqs=16384/ranges=64/type=get-24 1.10MB ± 5% 2.77MB ± 1% +152.58% (p=0.000 n=10+10) TruncateLoop/asc/preserveOrder/reqs=16384/ranges=64/type=scan-24 60.5MB ± 1% 67.9MB ± 1% +12.19% (p=0.000 n=10+10) TruncateLoop/desc/reqs=128/ranges=4/type=get-24 17.2kB ±10% 21.5kB ± 1% +24.91% (p=0.000 n=10+10) TruncateLoop/desc/reqs=128/ranges=4/type=scan-24 35.5kB ±12% 29.1kB ± 4% -17.83% (p=0.000 n=10+9) TruncateLoop/desc/reqs=128/ranges=64/type=get-24 138kB ± 1% 20kB ± 3% -85.34% (p=0.000 n=10+10) TruncateLoop/desc/reqs=128/ranges=64/type=scan-24 344kB ±15% 363kB ±10% +5.50% (p=0.035 n=10+10) TruncateLoop/desc/reqs=16384/ranges=4/type=get-24 2.78MB ± 0% 3.35MB ± 0% +20.24% (p=0.000 n=8+10) TruncateLoop/desc/reqs=16384/ranges=4/type=scan-24 4.42MB ± 6% 5.29MB ± 0% +19.67% (p=0.000 n=10+8) TruncateLoop/desc/reqs=16384/ranges=64/type=get-24 17.9MB ± 0% 2.7MB ± 2% -85.21% (p=0.000 n=10+10) TruncateLoop/desc/reqs=16384/ranges=64/type=scan-24 65.3MB ± 0% 61.0MB ± 1% -6.65% (p=0.000 n=10+10) TruncateLoop/desc/preserveOrder/reqs=128/ranges=4/type=get-24 15.9kB ± 3% 26.7kB ± 1% +67.87% (p=0.000 n=10+9) TruncateLoop/desc/preserveOrder/reqs=128/ranges=4/type=scan-24 29.4kB ± 6% 41.6kB ± 5% +41.50% (p=0.000 n=10+10) TruncateLoop/desc/preserveOrder/reqs=128/ranges=64/type=get-24 138kB ± 0% 23kB ± 3% -83.61% (p=0.000 n=10+10) TruncateLoop/desc/preserveOrder/reqs=128/ranges=64/type=scan-24 390kB ±19% 350kB ±11% -10.16% (p=0.015 n=10+10) TruncateLoop/desc/preserveOrder/reqs=16384/ranges=4/type=get-24 2.69MB ± 4% 3.51MB ± 1% +30.22% (p=0.000 n=10+10) TruncateLoop/desc/preserveOrder/reqs=16384/ranges=4/type=scan-24 4.89MB ± 1% 8.19MB ± 0% +67.68% (p=0.000 n=10+9) TruncateLoop/desc/preserveOrder/reqs=16384/ranges=64/type=get-24 17.9MB ± 0% 3.0MB ± 2% -83.34% (p=0.000 n=10+10) TruncateLoop/desc/preserveOrder/reqs=16384/ranges=64/type=scan-24 65.4MB ± 1% 62.9MB ± 1% -3.81% (p=0.000 n=10+10) name old allocs/op new allocs/op delta TruncateLoop/asc/reqs=128/ranges=4/type=get-24 50.0 ± 0% 52.4 ±10% +4.80% (p=0.017 n=8+10) TruncateLoop/asc/reqs=128/ranges=4/type=scan-24 569 ±12% 557 ±15% ~ (p=0.617 n=10+10) TruncateLoop/asc/reqs=128/ranges=64/type=get-24 207 ± 4% 210 ± 5% ~ (p=0.380 n=10+10) TruncateLoop/asc/reqs=128/ranges=64/type=scan-24 6.97k ±13% 6.02k ± 9% -13.64% (p=0.000 n=10+10) TruncateLoop/asc/reqs=16384/ranges=4/type=get-24 126 ± 0% 122 ± 0% -3.17% (p=0.002 n=8+10) TruncateLoop/asc/reqs=16384/ranges=4/type=scan-24 51.9k ± 1% 42.8k ± 1% -17.59% (p=0.000 n=10+9) TruncateLoop/asc/reqs=16384/ranges=64/type=get-24 1.12k ± 1% 1.12k ± 1% +0.43% (p=0.027 n=10+10) TruncateLoop/asc/reqs=16384/ranges=64/type=scan-24 786k ± 1% 714k ± 1% -9.13% (p=0.000 n=10+10) TruncateLoop/asc/preserveOrder/reqs=128/ranges=4/type=get-24 41.2 ± 3% 58.0 ± 3% +40.78% (p=0.000 n=10+10) TruncateLoop/asc/preserveOrder/reqs=128/ranges=4/type=scan-24 574 ±18% 532 ±11% ~ (p=0.143 n=10+10) TruncateLoop/asc/preserveOrder/reqs=128/ranges=64/type=get-24 205 ± 2% 234 ± 3% +14.26% (p=0.000 n=9+9) TruncateLoop/asc/preserveOrder/reqs=128/ranges=64/type=scan-24 6.70k ± 9% 6.00k ± 9% -10.40% (p=0.000 n=10+10) TruncateLoop/asc/preserveOrder/reqs=16384/ranges=4/type=get-24 127 ± 0% 125 ± 0% -1.57% (p=0.001 n=8+9) TruncateLoop/asc/preserveOrder/reqs=16384/ranges=4/type=scan-24 63.7k ± 1% 27.8k ± 2% -56.34% (p=0.000 n=10+10) TruncateLoop/asc/preserveOrder/reqs=16384/ranges=64/type=get-24 1.14k ± 0% 1.14k ± 1% ~ (p=0.515 n=10+10) TruncateLoop/asc/preserveOrder/reqs=16384/ranges=64/type=scan-24 696k ± 1% 752k ± 1% +7.97% (p=0.000 n=10+10) TruncateLoop/desc/reqs=128/ranges=4/type=get-24 554 ± 1% 169 ± 2% -69.52% (p=0.000 n=10+10) TruncateLoop/desc/reqs=128/ranges=4/type=scan-24 519 ± 9% 268 ± 9% -48.32% (p=0.000 n=10+10) TruncateLoop/desc/reqs=128/ranges=64/type=get-24 8.38k ± 0% 0.33k ± 3% -96.06% (p=0.000 n=10+10) TruncateLoop/desc/reqs=128/ranges=64/type=scan-24 5.90k ±10% 5.89k ± 5% ~ (p=0.796 n=10+10) TruncateLoop/desc/reqs=16384/ranges=4/type=get-24 65.7k ± 0% 16.5k ± 0% -74.87% (p=0.002 n=8+10) TruncateLoop/desc/reqs=16384/ranges=4/type=scan-24 39.5k ± 1% 27.8k ± 2% -29.54% (p=0.000 n=10+10) TruncateLoop/desc/reqs=16384/ranges=64/type=get-24 1.05M ± 0% 0.02M ± 0% -98.33% (p=0.000 n=9+10) TruncateLoop/desc/reqs=16384/ranges=64/type=scan-24 741k ± 0% 679k ± 1% -8.32% (p=0.000 n=10+10) TruncateLoop/desc/preserveOrder/reqs=128/ranges=4/type=get-24 559 ± 0% 182 ± 2% -67.42% (p=0.000 n=9+10) TruncateLoop/desc/preserveOrder/reqs=128/ranges=4/type=scan-24 438 ± 8% 404 ±13% -7.76% (p=0.014 n=9+10) TruncateLoop/desc/preserveOrder/reqs=128/ranges=64/type=get-24 8.39k ± 0% 0.36k ± 5% -95.75% (p=0.000 n=10+10) TruncateLoop/desc/preserveOrder/reqs=128/ranges=64/type=scan-24 6.38k ±11% 5.56k ± 9% -12.85% (p=0.000 n=10+10) TruncateLoop/desc/preserveOrder/reqs=16384/ranges=4/type=get-24 65.7k ± 0% 16.5k ± 0% -74.84% (p=0.000 n=10+10) TruncateLoop/desc/preserveOrder/reqs=16384/ranges=4/type=scan-24 46.8k ± 1% 67.6k ± 1% +44.65% (p=0.000 n=10+10) TruncateLoop/desc/preserveOrder/reqs=16384/ranges=64/type=get-24 1.05M ± 0% 0.02M ± 0% -98.33% (p=0.000 n=10+10) TruncateLoop/desc/preserveOrder/reqs=16384/ranges=64/type=scan-24 739k ± 1% 694k ± 1% -6.08% (p=0.000 n=10+10) ``` The truncation loops for the optimized strategy are very similar in both directions, so I tried to extract the differences out into an interface. However, this showed non-trivial slow down and increase in allocations, so I chose to have some duplicated code to get the best performance. Here is a snippet of the comparison when the interface was prototyped: ``` name old time/op new time/op delta TruncateLoop/desc/reqs=128/ranges=4/type=get-24 36.9µs ± 3% 44.5µs ± 3% +20.55% (p=0.000 n=10+10) TruncateLoop/desc/reqs=128/ranges=4/type=scan-24 74.8µs ± 5% 88.8µs ± 3% +18.73% (p=0.000 n=9+10) TruncateLoop/desc/reqs=128/ranges=64/type=get-24 64.9µs ± 1% 78.3µs ± 1% +20.72% (p=0.000 n=10+9) TruncateLoop/desc/reqs=128/ranges=64/type=scan-24 471µs ± 8% 682µs ±13% +44.73% (p=0.000 n=10+10) TruncateLoop/desc/reqs=16384/ranges=4/type=get-24 6.34ms ± 1% 7.39ms ± 0% +16.47% (p=0.000 n=10+9) TruncateLoop/desc/reqs=16384/ranges=4/type=scan-24 11.2ms ± 1% 12.4ms ± 1% +10.36% (p=0.000 n=10+9) TruncateLoop/desc/reqs=16384/ranges=64/type=get-24 6.40ms ± 2% 7.39ms ± 1% +15.47% (p=0.000 n=10+10) TruncateLoop/desc/reqs=16384/ranges=64/type=scan-24 70.9ms ± 1% 102.0ms ± 2% +43.87% (p=0.000 n=9+9) name old alloc/op new alloc/op delta TruncateLoop/desc/reqs=128/ranges=4/type=get-24 22.2kB ± 9% 30.4kB ± 0% +36.55% (p=0.000 n=10+7) TruncateLoop/desc/reqs=128/ranges=4/type=scan-24 52.2kB ± 5% 67.6kB ± 4% +29.47% (p=0.000 n=8+10) TruncateLoop/desc/reqs=128/ranges=64/type=get-24 20.0kB ± 2% 32.2kB ± 1% +60.86% (p=0.000 n=10+10) TruncateLoop/desc/reqs=128/ranges=64/type=scan-24 372kB ±13% 600kB ±10% +61.29% (p=0.000 n=10+8) TruncateLoop/desc/reqs=16384/ranges=4/type=get-24 3.24MB ± 0% 4.45MB ± 0% +37.42% (p=0.000 n=8+7) TruncateLoop/desc/reqs=16384/ranges=4/type=scan-24 6.61MB ± 0% 7.86MB ± 0% +18.90% (p=0.000 n=10+9) TruncateLoop/desc/reqs=16384/ranges=64/type=get-24 2.75MB ± 2% 3.74MB ± 1% +36.03% (p=0.000 n=10+10) TruncateLoop/desc/reqs=16384/ranges=64/type=scan-24 65.7MB ± 1% 97.2MB ± 1% +47.95% (p=0.000 n=10+10) name old allocs/op new allocs/op delta TruncateLoop/desc/reqs=128/ranges=4/type=get-24 177 ± 2% 314 ± 0% +77.40% (p=0.000 n=10+8) TruncateLoop/desc/reqs=128/ranges=4/type=scan-24 597 ± 8% 847 ± 8% +41.89% (p=0.000 n=9+10) TruncateLoop/desc/reqs=128/ranges=64/type=get-24 329 ± 3% 531 ± 2% +61.40% (p=0.000 n=10+10) TruncateLoop/desc/reqs=128/ranges=64/type=scan-24 6.02k ± 9% 9.56k ± 6% +58.80% (p=0.000 n=10+8) TruncateLoop/desc/reqs=16384/ranges=4/type=get-24 16.5k ± 0% 32.9k ± 0% +99.17% (p=0.000 n=8+8) TruncateLoop/desc/reqs=16384/ranges=4/type=scan-24 53.5k ± 1% 73.1k ± 1% +36.69% (p=0.000 n=10+9) TruncateLoop/desc/reqs=16384/ranges=64/type=get-24 17.5k ± 0% 33.9k ± 0% +94.02% (p=0.000 n=6+10) TruncateLoop/desc/reqs=16384/ranges=64/type=scan-24 727k ± 1% 1194k ± 1% +64.31% (p=0.000 n=10+10) ``` An additional knob is introduced to the batch truncation helper to indicate whether the helper can take ownership of the passed-in requests slice and reorder it as it pleases. This is the case for the Streamer, but the DistSender relies on the order of requests not being modified, so the helper makes a copy of the slice. Some tests needed an adjustment since now we process requests not necessarily in the original order, so the population of ResumeSpans might be different. Release note: None --- pkg/kv/kvclient/kvcoord/BUILD.bazel | 1 + pkg/kv/kvclient/kvcoord/batch.go | 651 +++++++++++++++++- pkg/kv/kvclient/kvcoord/batch_test.go | 222 +++--- pkg/kv/kvclient/kvcoord/dist_sender.go | 10 +- .../kvcoord/dist_sender_server_test.go | 52 +- pkg/kv/kvclient/kvstreamer/streamer.go | 8 +- 6 files changed, 819 insertions(+), 125 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index 8ed46a6818ab..1ccded24e24b 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -55,6 +55,7 @@ go_library( "//pkg/sql/pgwire/pgerror", "//pkg/storage/enginepb", "//pkg/util", + "//pkg/util/buildutil", "//pkg/util/contextutil", "//pkg/util/ctxgroup", "//pkg/util/envutil", diff --git a/pkg/kv/kvclient/kvcoord/batch.go b/pkg/kv/kvclient/kvcoord/batch.go index afe5ce16d30b..a70006af3873 100644 --- a/pkg/kv/kvclient/kvcoord/batch.go +++ b/pkg/kv/kvclient/kvcoord/batch.go @@ -11,8 +11,11 @@ package kvcoord import ( + "sort" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/errors" ) @@ -41,12 +44,114 @@ import ( // ri.Seek(scanDir, seekKey) // } // +// The helper utilizes two different strategies depending on whether the +// requests use local keys or not: +// +// - a "legacy" strategy is used when requests use local keys. This strategy +// utilizes "legacy" methods that operate on the original requests without +// keeping any additional bookkeeping. In particular, it leads to truncating +// already processed requests as well as to iterating over the fully processed +// requests when searching for the next seek key. +// +// - an "optimized" strategy is used when requests only use global keys. +// Although this strategy has the same worst-case complexity of O(N * R) as +// the "legacy" strategy (where N is the number of requests, R is the number +// of ranges that all requests fit int, the worst-case is achieved when all +// requests are range-spanning and each request spans all R ranges), in +// practice it is much faster. See the comments on truncateAsc() and +// truncateDesc() for the details. +// +// The gist of the optimized strategy is sorting all of the requests according +// to the keys upfront and then, on each truncation iteration, examining only a +// subset of requests that might overlap with the current range boundaries. The +// strategy is careful to update the internal state so that the ordering of +// unprocessed requests is maintained. +// +// The key insight is best shown by an example. Imagine that we have two +// requests Scan(a, c) and Scan(b, d) (which are ordered according to start keys +// with the Ascending scan direction), and on the first iteration we're +// truncating to range [a, b). Only the first request overlaps with the range, +// so we include Scan(a, b) into the truncation response, and, crucially, we can +// update the header of the first request to be [b, c) to track the remaining +// part of the first request. We can update the header in-place without breaking +// the ordering property. type BatchTruncationHelper struct { - scanDir ScanDirection + scanDir ScanDirection + // requests are the original requests this helper needs to process (possibly + // in non-original order). requests []roachpb.RequestUnion // mustPreserveOrder indicates whether the requests must be returned by // Truncate() in the original order. mustPreserveOrder bool + // foundLocalKey, if true, indicates whether some of the requests reference + // the local keys. When true, the helper falls back to the legacy methods. + foundLocalKey bool + + // Fields below are only used if the optimized strategy is used. + + // headers contains the parts of the corresponding requests that have not + // been processed yet. For range-spanning requests: + // - with the Ascending direction, we're advancing the start key of the + // header once a prefix of the request is returned by Truncate(); + // - with the Descending direction, we're moving the end key of the header + // backwards once a suffix of the request is returned by Truncate(). We also + // ensure that all requests have an end key (meaning that for point requests + // as well as range-spanning requests we populate the end key as + // startKey.Next()). + // + // All keys in the headers are global. + headers []roachpb.RequestHeader + // positions stores the corresponding indices of requests in the original + // requests slice. Once request is fully processed, it's position value + // becomes negative. + positions []int + // isRange indicates whether the corresponding request is a range-spanning + // one. + isRange []bool + // startIdx tracks the "smallest" request (according to the order of the + // original start keys) that might not have been fully processed. In other + // words, all requests in range [0, startIdx) have negative positions + // values. + startIdx int + // helper is only initialized and used if mustPreserveOrder is true. + helper orderRestorationHelper +} + +// Len implements the sort.Interface interface. +func (h *BatchTruncationHelper) Len() int { + return len(h.requests) +} + +// Swap implements the sort.Interface interface. +func (h *BatchTruncationHelper) Swap(i, j int) { + h.requests[i], h.requests[j] = h.requests[j], h.requests[i] + h.headers[i], h.headers[j] = h.headers[j], h.headers[i] + h.positions[i], h.positions[j] = h.positions[j], h.positions[i] + h.isRange[i], h.isRange[j] = h.isRange[j], h.isRange[i] +} + +// ascBatchTruncationHelper is used for the Ascending scan direction in order to +// sort the requests in the ascending order of the start keys. +type ascBatchTruncationHelper struct { + *BatchTruncationHelper +} + +var _ sort.Interface = ascBatchTruncationHelper{} + +func (h ascBatchTruncationHelper) Less(i, j int) bool { + return h.headers[i].Key.Compare(h.headers[j].Key) < 0 +} + +// descBatchTruncationHelper is used for the Descending scan direction in order +// to sort the requests in the descending order of the end keys. +type descBatchTruncationHelper struct { + *BatchTruncationHelper +} + +var _ sort.Interface = descBatchTruncationHelper{} + +func (h descBatchTruncationHelper) Less(i, j int) bool { + return h.headers[i].EndKey.Compare(h.headers[j].EndKey) > 0 } // MakeBatchTruncationHelper returns a new BatchTruncationHelper for the given @@ -55,13 +160,72 @@ type BatchTruncationHelper struct { // mustPreserveOrder, if true, indicates that the caller requires that requests // are returned by Truncate() in the original order (i.e. with strictly // increasing positions values). +// +// If canReorderRequestsSlice is true, then the helper will hold on to the given +// slice and might reorder the requests within it (although each request will +// not be modified "deeply" - i.e. its header won't be updated or anything like +// that). Set it to false when the caller cares about the slice not being +// mutated in any way. func MakeBatchTruncationHelper( - scanDir ScanDirection, requests []roachpb.RequestUnion, mustPreserveOrder bool, + scanDir ScanDirection, + requests []roachpb.RequestUnion, + mustPreserveOrder bool, + canReorderRequestsSlice bool, ) (BatchTruncationHelper, error) { var ret BatchTruncationHelper ret.scanDir = scanDir ret.requests = requests ret.mustPreserveOrder = mustPreserveOrder + // Determine whether we can use the optimized strategy before making any + // allocations. + for i := range requests { + header := requests[i].GetInner().Header() + if keys.IsLocal(header.Key) { + ret.foundLocalKey = true + return ret, nil + } + } + // We can use the optimized strategy, so set up all of the internal state. + if !canReorderRequestsSlice { + // If we can't reorder the original requests slice, we must make a copy. + ret.requests = make([]roachpb.RequestUnion, len(requests)) + copy(ret.requests, requests) + } + ret.headers = make([]roachpb.RequestHeader, len(requests)) + ret.positions = make([]int, len(requests)) + ret.isRange = make([]bool, len(requests)) + // Populate the internal state as well as perform some sanity checks on the + // requests. + for i := range requests { + req := requests[i].GetInner() + ret.headers[i] = req.Header() + ret.positions[i] = i + ret.isRange[i] = roachpb.IsRange(req) + if ret.isRange[i] { + // We're dealing with a range-spanning request. + if l, r := keys.IsLocal(ret.headers[i].Key), keys.IsLocal(ret.headers[i].EndKey); (l && !r) || (!l && r) { + return BatchTruncationHelper{}, errors.AssertionFailedf("local key mixed with global key in range") + } + } else if len(ret.headers[i].EndKey) > 0 { + return BatchTruncationHelper{}, errors.AssertionFailedf("%T is not a range command, but EndKey is set", req) + } + } + if scanDir == Ascending { + sort.Sort(ascBatchTruncationHelper{BatchTruncationHelper: &ret}) + } else { + // With the Descending scan direction, we have to convert all point + // requests into range-spanning requests that include only a single + // point. + for i := range ret.headers { + if len(ret.headers[i].EndKey) == 0 { + ret.headers[i].EndKey = ret.headers[i].Key.Next() + } + } + sort.Sort(descBatchTruncationHelper{BatchTruncationHelper: &ret}) + } + if ret.mustPreserveOrder { + ret.helper.init(len(requests)) + } return ret, nil } @@ -94,9 +258,26 @@ func MakeBatchTruncationHelper( func (h *BatchTruncationHelper) Truncate( rs roachpb.RSpan, ) ([]roachpb.RequestUnion, []int, roachpb.RKey, error) { - truncReqs, positions, err := truncateLegacy(h.requests, rs) - if err != nil { - return nil, nil, nil, err + var truncReqs []roachpb.RequestUnion + var positions []int + var err error + if !h.foundLocalKey { + if h.scanDir == Ascending { + truncReqs, positions, err = h.truncateAsc(rs) + } else { + truncReqs, positions, err = h.truncateDesc(rs) + } + if err != nil { + return nil, nil, nil, err + } + if h.mustPreserveOrder { + truncReqs, positions = h.helper.restoreOrder(truncReqs, positions) + } + } else { + truncReqs, positions, err = truncateLegacy(h.requests, rs) + if err != nil { + return nil, nil, nil, err + } } var seekKey roachpb.RKey if h.scanDir == Ascending { @@ -107,17 +288,327 @@ func (h *BatchTruncationHelper) Truncate( // one, and unless both descriptors are stale, the next descriptor's // StartKey would move us to the beginning of the current range, // resulting in a duplicate scan. - seekKey, err = nextLegacy(h.requests, rs.EndKey) + seekKey, err = h.next(rs.EndKey) } else { // In next iteration, query previous range. // We use the StartKey of the current descriptor as opposed to the // EndKey of the previous one since that doesn't have bugs when // stale descriptors come into play. - seekKey, err = prevLegacy(h.requests, rs.Key) + seekKey, err = h.prev(rs.Key) } return truncReqs, positions, seekKey, err } +// truncateAsc is the optimized strategy for Truncate() with the Ascending scan +// direction when requests only use global keys. +// +// The first step of this strategy is to reorder all requests according to their +// start keys (this is done in Init()). Then, on every call to truncateAsc(), we +// only look at a subset of original requests that might overlap with rs and +// ignore already processed requests entirely. +// +// Let's go through an example. Say we have seven original requests: +// +// requests : Scan(i, l), Get(d), Scan(h, k), Scan(g, i), Get(i), Scan(d, f), Scan(b, h) +// positions: 0 1 2 3 4 5 6 +// +// as well three ranges to iterate over: +// +// ranges: range[a, e), range[e, i), range[i, m). +// +// In Init(), we have reordered the requests according to their start keys: +// +// requests : Scan(b, h), Get(d), Scan(d, f), Scan(g, i), Scan(h, k), Get(i), Scan(i, l) +// positions: 6 1 5 3 2 4 0 +// headers : [b, h) [d) [d, f) [g, i) [h, k) [i) [i, l) +// +// On the first call to Truncate(), we're using the range [a, e). We only need +// to look at the first four requests since the fourth request starts after the +// end of the current range, and, due to the ordering, all following requests +// too. We truncate first three requests to the range boundaries, update the +// headers to refer to the unprocessed parts of the corresponding requests, and +// mark the 2nd request Get(d) as fully processed. +// +// The first call prepares +// truncReqs = [Scan(b, e), Get(d), Scan(d, e)], positions = [6, 1, 5] +// and the internal state is now +// +// requests : Scan(b, h), Get(d), Scan(d, f), Scan(g, i), Scan(h, k), Get(i), Scan(i, l) +// positions: 6 -1 5 3 2 4 0 +// headers : [e, h) [e, f) [g, i) [h, k) [i) [i, l) +// +// Then the optimized next() function determines the seekKey as 'e' and keeps +// the startIdx at 0. +// +// On the second call to Truncate(), we're using the range [e, i). We only need +// to look at the first six requests since the sixth request starts after the +// end of the current range, and, due to the ordering, all following requests +// too. We truncate first five requests to the range boundaries (skipping the +// second that has been fully processed already), update the headers to refer to +// the unprocessed parts of the corresponding requests, and mark the 1st, the +// 3rd, and the 4th requests as fully processed. +// +// The second call prepares +// truncReqs = [Scan(e, h), Scan(e, f), Scan(g, i), Scan(h, i)], positions = [6, 5, 3, 2] +// and the internal state is now +// +// requests : Scan(b, h), Get(d), Scan(d, f), Scan(g, i), Scan(h, k), Get(i), Scan(i, l) +// positions: -1 -1 -1 -1 2 4 0 +// headers : [i, k) [i) [i, l) +// +// Then the optimized next() function determines the seekKey as 'i' and sets +// the startIdx at 4 (meaning that all first four requests have been fully +// processed). +// +// On the third call to Truncate(), we're using the range [i, m). We only look +// at the 5th, 6th, and 7th requests (because of the value of startIdx). All +// requests are contained within the range, so we include them into the return +// value and mark all of them as processed. +// +// The third call prepares +// truncReqs = [Scan(i, k), Get(i), Scan(i, l)], positions = [2, 4, 0] +// and the internal state is now +// +// requests : Scan(b, h), Get(d), Scan(d, f), Scan(g, i), Scan(h, k), Get(i), Scan(i, l) +// positions: -1 -1 -1 -1 -1 -1 -1 +// headers : +// +// Then the optimized next() function determines the seekKey as KeyMax and sets +// the startIdx at 7 (meaning that all requests have been fully processed), and +// we're done. +// +// NOTE: for all requests, headers always keep track of the unprocessed part of +// the request and is such that the ordering of the keys in the headers is +// preserved when the requests are truncated. +// +// Note that this function is very similar to truncateDesc(), and we could +// extract out the differences into an interface; however, this leads to +// non-trivial slowdown and increase in allocations, so we choose to duplicate +// the code for performance. +func (h *BatchTruncationHelper) truncateAsc( + rs roachpb.RSpan, +) ([]roachpb.RequestUnion, []int, error) { + var truncReqs []roachpb.RequestUnion + var positions []int + for i := h.startIdx; i < len(h.positions); i++ { + pos := h.positions[i] + if pos < 0 { + // This request has already been fully processed, so there is no + // need to look at it. + continue + } + header := h.headers[i] + // rs.EndKey can't be local because it contains range split points, + // which are never local. + ek := rs.EndKey.AsRawKey() + if ek.Compare(header.Key) <= 0 { + // All of the remaining requests start after this range, so we're + // done. + break + } + if !h.isRange[i] { + // This is a point request, and the key is contained within this + // range, so we include the request as is and mark it as "fully + // processed". + truncReqs = append(truncReqs, h.requests[i]) + positions = append(positions, pos) + h.headers[i] = roachpb.RequestHeader{} + h.positions[i] = -1 + continue + } + // We're dealing with a range-spanning request. + if buildutil.CrdbTestBuild { + // rs.Key can't be local because it contains range split points, + // which are never local. + if header.Key.Compare(rs.Key.AsRawKey()) < 0 { + return nil, nil, errors.AssertionFailedf( + "unexpectedly header.Key %s is less than rs %s", header.Key, rs, + ) + } + } + inner := h.requests[i].GetInner() + if header.EndKey.Compare(ek) <= 0 { + // This is the last part of this request since it is fully contained + // within this range, so we mark the request as "fully processed". + h.headers[i] = roachpb.RequestHeader{} + h.positions[i] = -1 + if origStartKey := inner.Header().Key; origStartKey.Equal(header.Key) { + // This range-spanning request fits within a single range, so we + // can just use the original request. + truncReqs = append(truncReqs, h.requests[i]) + positions = append(positions, pos) + continue + } + } else { + header.EndKey = ek + // Adjust the start key of the header so that it contained only the + // unprocessed suffix of the request. + h.headers[i].Key = header.EndKey + } + shallowCopy := inner.ShallowCopy() + shallowCopy.SetHeader(header) + truncReqs = append(truncReqs, roachpb.RequestUnion{}) + truncReqs[len(truncReqs)-1].MustSetInner(shallowCopy) + positions = append(positions, pos) + } + return truncReqs, positions, nil +} + +// truncateDesc is the optimized strategy for Truncate() with the Descending +// scan direction when requests only use global keys. +// +// The first step of this strategy is to reorder all requests according to their +// end keys with the descending direction (this is done in Init()). Then, on +// every call to truncateDesc(), we only look at a subset of original requests +// that might overlap with rs and ignore already processed requests entirely. +// +// Let's go through an example. Say we have seven original requests: +// +// requests : Scan(i, l), Get(d), Scan(h, k), Scan(g, i), Get(i), Scan(d, f), Scan(b, h) +// positions: 0 1 2 3 4 5 6 +// +// as well three ranges to iterate over: +// +// ranges: range[i, m), range[e, i), range[a, e). +// +// In Init(), we have reordered the requests according to their end keys with +// the descending direction (below, i' denotes Key("i").Next()): +// +// requests : Scan(i, l), Scan(h, k), Get(i), Scan(g, i), Scan(b, h), Scan(d, f), Get(d) +// positions: 0 2 4 3 6 5 1 +// headers : [i, l) [h, k) [i, i') [g, i) [b, h) [d, f) [d, d') +// +// On the first call to Truncate(), we're using the range [i, m). We only need +// to look at the first four requests since the fourth request ends before the +// start of the current range, and, due to the ordering, all following requests +// too. We truncate first three requests to the range boundaries, update the +// headers to refer to the unprocessed parts of the corresponding requests, and +// mark the 1st and the 3rd requests as fully processed. +// +// The first call prepares +// truncReqs = [Scan(i, l), Scan(i, k), Get(i)], positions = [0, 2, 4] +// and the internal state is now +// +// requests : Scan(i, l), Scan(h, k), Get(i), Scan(g, i), Scan(b, h), Scan(d, f), Get(d) +// positions: -1 2 -1 3 6 5 1 +// headers : [h, i) [g, i) [b, h) [d, f) [d, d') +// +// Then the optimized prev() function determines the seekKey as 'i' and moves +// the startIdx to 1. +// +// On the second call to Truncate(), we're using the range [e, i). We skip +// looking at the first request entirely (due to value of startIdx) and only +// need to look at all remaining requests (skipping the third one since it's fully +// processed). We truncate the requests to the range boundaries, update the +// headers to refer to the unprocessed parts of the corresponding requests, and +// mark the 2nd and the 4th requests as fully processed. +// +// The second call prepares +// truncReqs = [Scan(h, i), Scan(g, i), Scan(e, h), Scan(e, f)], positions = [2, 3, 6, 5] +// and the internal state is now +// +// requests : Scan(i, l), Scan(h, k), Get(i), Scan(g, i), Scan(b, h), Scan(d, f), Get(d) +// positions: -1 -1 -1 -1 6 5 1 +// headers : [b, e) [d, e) [d, d') +// +// Then the optimized prev() function determines the seekKey as 'e' and sets +// the startIdx at 4 (meaning that all first four requests have been fully +// processed). +// +// On the third call to Truncate(), we're using the range [a, e). We only look +// at the 5th, 6th, and 7th requests (because of the value of startIdx). All +// requests are contained within the range, so we include them into the return +// value and mark all of them as processed. +// +// The third call prepares +// truncReqs = [Scan(b, e), Scan(d, e), Get(d)], positions = [6, 5, 1] +// and the internal state is now +// +// requests : Scan(i, l), Scan(h, k), Get(i), Scan(g, i), Scan(b, h), Scan(d, f), Get(d) +// positions: -1 -1 -1 -1 -1 -1 -1 +// headers : +// +// Then the optimized prev() function determines the seekKey as KeyMin and sets +// the startIdx at 7 (meaning that all requests have been fully processed), and +// we're done. +// +// NOTE: for all requests, headers always keep track of the unprocessed part of +// the request and is such that the ordering of the end keys in the headers is +// preserved when the requests are truncated. +// +// Note that this function is very similar to truncateAsc(), and we could +// extract out the differences into an interface; however, this leads to +// non-trivial slowdown and increase in allocations, so we choose to duplicate +// the code for performance. +func (h *BatchTruncationHelper) truncateDesc( + rs roachpb.RSpan, +) ([]roachpb.RequestUnion, []int, error) { + var truncReqs []roachpb.RequestUnion + var positions []int + for i := h.startIdx; i < len(h.positions); i++ { + pos := h.positions[i] + if pos < 0 { + // This request has already been fully processed, so there is no + // need to look at it. + continue + } + header := h.headers[i] + // rs.Key can't be local because it contains range split points, which + // are never local. + sk := rs.Key.AsRawKey() + if sk.Compare(header.EndKey) >= 0 { + // All of the remaining requests end before this range, so we're + // done. + break + } + if !h.isRange[i] { + // This is a point request, and the key is contained within this + // range, so we include the request as is and mark it as "fully + // processed". + truncReqs = append(truncReqs, h.requests[i]) + positions = append(positions, pos) + h.headers[i] = roachpb.RequestHeader{} + h.positions[i] = -1 + continue + } + // We're dealing with a range-spanning request. + if buildutil.CrdbTestBuild { + // rs.EndKey can't be local because it contains range split points, + // which are never local. + if header.EndKey.Compare(rs.EndKey.AsRawKey()) > 0 { + return nil, nil, errors.AssertionFailedf( + "unexpectedly header.EndKey %s is greater than rs %s", header.Key, rs, + ) + } + } + inner := h.requests[i].GetInner() + if header.Key.Compare(sk) >= 0 { + // This is the last part of this request since it is fully contained + // within this range, so we mark the request as "fully processed". + h.headers[i] = roachpb.RequestHeader{} + h.positions[i] = -1 + if origEndKey := inner.Header().EndKey; len(origEndKey) == 0 || origEndKey.Equal(header.EndKey) { + // This range-spanning request fits within a single range, so we + // can just use the original request. + truncReqs = append(truncReqs, h.requests[i]) + positions = append(positions, pos) + continue + } + } else { + header.Key = sk + // Adjust the end key of the header so that it contained only the + // unprocessed prefix of the request. + h.headers[i].EndKey = header.Key + } + shallowCopy := inner.ShallowCopy() + shallowCopy.SetHeader(header) + truncReqs = append(truncReqs, roachpb.RequestUnion{}) + truncReqs[len(truncReqs)-1].MustSetInner(shallowCopy) + positions = append(positions, pos) + } + return truncReqs, positions, nil +} + var emptyHeader = roachpb.RequestHeader{} // truncateLegacy restricts all requests to the given key range and returns new, @@ -138,7 +629,7 @@ func truncateLegacy( if !roachpb.IsRange(args) { // This is a point request. if len(header.EndKey) > 0 { - return false, false, emptyHeader, errors.Errorf("%T is not a range command, but EndKey is set", args) + return false, false, emptyHeader, errors.AssertionFailedf("%T is not a range command, but EndKey is set", args) } keyAddr, err := keys.Addr(header.Key) if err != nil { @@ -161,7 +652,7 @@ func truncateLegacy( } if l, r := keys.IsLocal(header.Key), keys.IsLocal(header.EndKey); l || r { if !l || !r { - return false, false, emptyHeader, errors.Errorf("local key mixed with global key in range") + return false, false, emptyHeader, errors.AssertionFailedf("local key mixed with global key in range") } local = true } @@ -224,6 +715,37 @@ func truncateLegacy( return truncReqs, positions, nil } +// prev returns the next seek key for the range iterator with the Descending +// scan direction. +// +// Informally, a call `prev(k)` means: we've already executed the parts of +// `reqs` that intersect `[k, KeyMax)`; please tell me how far to the left the +// next relevant request begins. +func (h *BatchTruncationHelper) prev(k roachpb.RKey) (roachpb.RKey, error) { + if h.foundLocalKey { + return prevLegacy(h.requests, k) + } + // Skip over first startIdx-1 requests since they have been fully processed. + for i, pos := range h.positions[h.startIdx:] { + if pos < 0 { + continue + } + // This is the first request that hasn't been fully processed, so we can + // bump the startIdx to this request's index and use the end key of the + // unprocessed part for the next seek key. + // + // By construction, all requests after this one will have their end key + // greater or equal to this request's end key, thus, there is no need to + // iterate any further. See the comment on truncateDesc() for more + // details. + h.startIdx += i + return keys.Addr(h.headers[h.startIdx].EndKey) + } + // If we got to this point, then all requests have been fully processed. + h.startIdx = len(h.requests) + return roachpb.RKeyMin, nil +} + // prevLegacy gives the right boundary of the union of all requests which don't // affect keys larger than the given key. Note that a right boundary is // exclusive, that is, the returned RKey is to be used as the exclusive right @@ -292,6 +814,36 @@ func prevLegacy(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, erro return candidate, nil } +// next returns the next seek key for the range iterator. +// +// Informally, a call `next(k)` means: we've already executed the parts of +// `reqs` that intersect `[KeyMin, k)`; please tell me how far to the right the +// next relevant request begins. +func (h *BatchTruncationHelper) next(k roachpb.RKey) (roachpb.RKey, error) { + if h.foundLocalKey { + return nextLegacy(h.requests, k) + } + // Skip over first startIdx-1 requests since they have been fully processed. + for i, pos := range h.positions[h.startIdx:] { + if pos < 0 { + continue + } + // This is the first request that hasn't been fully processed, so we can + // bump the startIdx to this request's index and use the start key of + // the unprocessed part for the next seek key. + // + // By construction, all requests after this one will have their start + // key greater or equal to this request's start key, thus, there is no + // need to iterate any further. See the comment on truncateAsc() for + // more details. + h.startIdx += i + return keys.Addr(h.headers[h.startIdx].Key) + } + // If we got to this point, then all requests have been fully processed. + h.startIdx = len(h.requests) + return roachpb.RKeyMax, nil +} + // nextLegacy gives the left boundary of the union of all requests which don't // affect keys less than the given key. Note that the left boundary is // inclusive, that is, the returned RKey is the inclusive left endpoint of the @@ -332,3 +884,84 @@ func nextLegacy(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, erro } return candidate, nil } + +// orderRestorationHelper is a utility struct that allows to restore the order +// of requests according to the positions values in O(N) time where N is the +// number of the original requests (i.e. before truncation). Benchmarks have +// shown that it is faster that a solution with explicitly sorting the requests +// in O(T log T) time where T is the number of truncated requests returned by +// a single Truncate() call. +type orderRestorationHelper struct { + // scratch is reused on the next call to restoreOrder() if it has enough + // capacity. + scratch []roachpb.RequestUnion + // found is used as a map indicating whether a request for a particular + // positions value is included into the truncated requests and at what + // index, -1 if the corresponding request is not found in the truncated + // requests. + // + // found has the length of N where N is the number of the original requests. + // It is reused on all restoreOrder() calls. + found []int +} + +func (h *orderRestorationHelper) init(numOriginalRequests int) { + h.found = make([]int, numOriginalRequests) + for i := range h.found { + h.found[i] = -1 + } +} + +// restoreOrder reorders truncReqs in the ascending order of the corresponding +// positions values. +// +// Let's go through a quick example. Say we have five original requests and the +// following setup: +// +// truncReqs = [Scan(a, c), Get(b), Scan(c, d)], positions = [3, 0, 4] +// +// We first populate the found map: +// +// found = [1, -1, -1, 0, 2] +// +// meaning that requests at positions 0, 3, 4 are present in truncReqs. Then we +// iterate over the found map, and for all non-negative found values, we include +// the corresponding request: +// 1. found[0] = 1 -> toReturn = [Get(b)] positions = [0] +// 2. found[1] = -1 -> skip +// 3. found[2] = -1 -> skip +// 4. found[3] = 0 -> toReturn = [Get(b), Scan(a, c)] positions = [0, 3] +// 5. found[4] = 2 -> toReturn = [Get(b), Scan(a, c), Scan(c, d)] positions = [0, 3, 4] +func (h *orderRestorationHelper) restoreOrder( + truncReqs []roachpb.RequestUnion, positions []int, +) ([]roachpb.RequestUnion, []int) { + if len(truncReqs) == 0 { + return truncReqs, positions + } + for i, pos := range positions { + h.found[pos] = i + } + var toReturn []roachpb.RequestUnion + if cap(h.scratch) >= len(positions) { + toReturn = h.scratch[:0] + } else { + toReturn = make([]roachpb.RequestUnion, 0, len(positions)) + } + positions = positions[:0] + for pos, found := range h.found { + if found < 0 { + // The request with positions value 'pos' is not included in + // truncReqs. + continue + } + toReturn = append(toReturn, truncReqs[found]) + positions = append(positions, pos) + // Lose the reference to the request so that we can keep truncReqs as + // the scratch space for the next call. + truncReqs[found] = roachpb.RequestUnion{} + // Make sure that the found map is set up for the next call. + h.found[pos] = -1 + } + h.scratch = truncReqs + return toReturn, positions +} diff --git a/pkg/kv/kvclient/kvcoord/batch_test.go b/pkg/kv/kvclient/kvcoord/batch_test.go index a167567f8082..4227a8d0699b 100644 --- a/pkg/kv/kvclient/kvcoord/batch_test.go +++ b/pkg/kv/kvclient/kvcoord/batch_test.go @@ -209,9 +209,10 @@ func TestBatchPrevNext(t *testing.T) { ba.Add(args) } const mustPreserveOrder = false - ascHelper, err := MakeBatchTruncationHelper(Ascending, ba.Requests, mustPreserveOrder) + const canReorderRequestsSlice = false + ascHelper, err := MakeBatchTruncationHelper(Ascending, ba.Requests, mustPreserveOrder, canReorderRequestsSlice) require.NoError(t, err) - descHelper, err := MakeBatchTruncationHelper(Descending, ba.Requests, mustPreserveOrder) + descHelper, err := MakeBatchTruncationHelper(Descending, ba.Requests, mustPreserveOrder, canReorderRequestsSlice) require.NoError(t, err) if _, _, next, err := ascHelper.Truncate( roachpb.RSpan{ @@ -366,104 +367,139 @@ func TestTruncate(t *testing.T) { // preserves the ordering. continue } - for i, test := range testCases { - goldenOriginal := roachpb.BatchRequest{} - for _, ks := range test.keys { - if len(ks[1]) > 0 { - goldenOriginal.Add(&roachpb.ResolveIntentRangeRequest{ - RequestHeader: roachpb.RequestHeader{ - Key: roachpb.Key(ks[0]), EndKey: roachpb.Key(ks[1]), - }, - IntentTxn: enginepb.TxnMeta{ID: uuid.MakeV4()}, - }) - } else { - goldenOriginal.Add(&roachpb.GetRequest{ - RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(ks[0])}, - }) - } + for _, canReorderRequestsSlice := range []bool{false, true} { + if isLegacy && canReorderRequestsSlice { + // This config is meaningless because truncateLegacy() + // doesn't reorder the original requests slice. + continue } - original := roachpb.BatchRequest{Requests: make([]roachpb.RequestUnion, len(goldenOriginal.Requests))} - for i, request := range goldenOriginal.Requests { - original.Requests[i].MustSetInner(request.GetInner().ShallowCopy()) - } + for i, test := range testCases { + goldenOriginal := roachpb.BatchRequest{} + for _, ks := range test.keys { + if len(ks[1]) > 0 { + goldenOriginal.Add(&roachpb.ResolveIntentRangeRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: roachpb.Key(ks[0]), EndKey: roachpb.Key(ks[1]), + }, + IntentTxn: enginepb.TxnMeta{ID: uuid.MakeV4()}, + }) + } else { + goldenOriginal.Add(&roachpb.GetRequest{ + RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(ks[0])}, + }) + } + } - var truncationHelper BatchTruncationHelper - if !isLegacy { - var err error - truncationHelper, err = MakeBatchTruncationHelper(Ascending, original.Requests, mustPreserveOrder) + original := roachpb.BatchRequest{Requests: make([]roachpb.RequestUnion, len(goldenOriginal.Requests))} + for i, request := range goldenOriginal.Requests { + original.Requests[i].MustSetInner(request.GetInner().ShallowCopy()) + } + + var truncationHelper BatchTruncationHelper + if !isLegacy { + var err error + truncationHelper, err = MakeBatchTruncationHelper( + Ascending, original.Requests, mustPreserveOrder, canReorderRequestsSlice, + ) + if err != nil { + t.Errorf("%d: Init failure: %v", i, err) + continue + } + // We need to truncate all requests up to the start of + // the test range since this is assumed by Truncate(). + truncateKey := roachpb.RKey(test.from) + if truncateKey.Less(roachpb.RKey(test.desc[0])) { + truncateKey = roachpb.RKey(test.desc[0]) + } + _, _, _, err = truncationHelper.Truncate( + roachpb.RSpan{Key: roachpb.RKeyMin, EndKey: truncateKey}, + ) + if err != nil || test.err != "" { + if !testutils.IsError(err, test.err) { + t.Errorf("%d: %v (expected: %q)", i, err, test.err) + } + continue + } + } + desc := &roachpb.RangeDescriptor{ + StartKey: roachpb.RKey(test.desc[0]), EndKey: roachpb.RKey(test.desc[1]), + } + if len(desc.StartKey) == 0 { + desc.StartKey = roachpb.RKey(test.from) + } + if len(desc.EndKey) == 0 { + desc.EndKey = roachpb.RKey(test.to) + } + rs := roachpb.RSpan{Key: roachpb.RKey(test.from), EndKey: roachpb.RKey(test.to)} + rs, err := rs.Intersect(desc) if err != nil { - t.Errorf("%d: Init failure: %v", i, err) + t.Errorf("%d: intersection failure: %v", i, err) continue } - // We need to truncate all requests up to the start of the - // test range since this is assumed by Truncate(). - truncateKey := roachpb.RKey(test.from) - if truncateKey.Less(roachpb.RKey(test.desc[0])) { - truncateKey = roachpb.RKey(test.desc[0]) - } - _, _, _, err = truncationHelper.Truncate( - roachpb.RSpan{Key: roachpb.RKeyMin, EndKey: truncateKey}, - ) - if err != nil || test.err != "" { - if !testutils.IsError(err, test.err) { - t.Errorf("%d: %v (expected: %q)", i, err, test.err) + reqs, pos, err := truncateLegacy(original.Requests, rs) + if isLegacy { + if err != nil || test.err != "" { + if !testutils.IsError(err, test.err) { + t.Errorf("%d: %v (expected: %q)", i, err, test.err) + } + continue } + } else { + reqs, pos, _, err = truncationHelper.Truncate(rs) + } + if err != nil { + t.Errorf("%d: truncation failure: %v", i, err) continue } - } - desc := &roachpb.RangeDescriptor{ - StartKey: roachpb.RKey(test.desc[0]), EndKey: roachpb.RKey(test.desc[1]), - } - if len(desc.StartKey) == 0 { - desc.StartKey = roachpb.RKey(test.from) - } - if len(desc.EndKey) == 0 { - desc.EndKey = roachpb.RKey(test.to) - } - rs := roachpb.RSpan{Key: roachpb.RKey(test.from), EndKey: roachpb.RKey(test.to)} - rs, err := rs.Intersect(desc) - if err != nil { - t.Errorf("%d: intersection failure: %v", i, err) - continue - } - reqs, pos, err := truncateLegacy(original.Requests, rs) - if isLegacy { - if err != nil || test.err != "" { - if !testutils.IsError(err, test.err) { - t.Errorf("%d: %v (expected: %q)", i, err, test.err) + if !isLegacy && !mustPreserveOrder { + // Truncate can return results in an arbitrary order, so + // we need to restore the order according to positions. + scratch := &requestsWithPositions{reqs: reqs, positions: pos} + sort.Sort(scratch) + } + var numReqs int + for j, arg := range reqs { + req := arg.GetInner() + if h := req.Header(); !bytes.Equal(h.Key, roachpb.Key(test.expKeys[j][0])) || !bytes.Equal(h.EndKey, roachpb.Key(test.expKeys[j][1])) { + t.Errorf("%d.%d: range mismatch: actual [%q,%q), wanted [%q,%q)", i, j, + h.Key, h.EndKey, roachpb.RKey(test.expKeys[j][0]), roachpb.RKey(test.expKeys[j][1])) + } else if len(h.Key) != 0 { + numReqs++ } - continue } - } else { - reqs, pos, _, err = truncationHelper.Truncate(rs) - } - if err != nil { - t.Errorf("%d: truncation failure: %v", i, err) - continue - } - if !isLegacy && !mustPreserveOrder { - // Truncate can return results in an arbitrary order, so we - // need to restore the order according to positions. - scratch := &requestsWithPositions{reqs: reqs, positions: pos} - sort.Sort(scratch) - } - var numReqs int - for j, arg := range reqs { - req := arg.GetInner() - if h := req.Header(); !bytes.Equal(h.Key, roachpb.Key(test.expKeys[j][0])) || !bytes.Equal(h.EndKey, roachpb.Key(test.expKeys[j][1])) { - t.Errorf("%d.%d: range mismatch: actual [%q,%q), wanted [%q,%q)", i, j, - h.Key, h.EndKey, roachpb.RKey(test.expKeys[j][0]), roachpb.RKey(test.expKeys[j][1])) - } else if len(h.Key) != 0 { - numReqs++ + if num := len(pos); numReqs != num { + t.Errorf("%d: counted %d requests, but truncation indicated %d", i, numReqs, num) + } + if isLegacy || !canReorderRequestsSlice { + if !reflect.DeepEqual(original, goldenOriginal) { + t.Errorf("%d: truncation mutated original:\nexpected: %s\nactual: %s", + i, goldenOriginal, original) + } + } else { + // Modifying the order of requests in a BatchRequest is + // ok, but we want to make sure that each request hasn't + // been modified "deeply", so we try different + // permutations of the original requests. + matched := make([]bool, len(original.Requests)) + var matchedCount int + for _, goldenReq := range goldenOriginal.Requests { + for j, origReq := range original.Requests { + if matched[j] { + continue + } + if reflect.DeepEqual(goldenReq, origReq) { + matched[j] = true + matchedCount++ + break + } + } + } + if matchedCount != len(matched) { + t.Errorf("%d: truncation mutated original:\nexpected: %s\nactual: %s", + i, goldenOriginal, original) + } } - } - if num := len(pos); numReqs != num { - t.Errorf("%d: counted %d requests, but truncation indicated %d", i, numReqs, num) - } - if !reflect.DeepEqual(original, goldenOriginal) { - t.Errorf("%d: truncation mutated original:\nexpected: %s\nactual: %s", - i, goldenOriginal, original) } } } @@ -525,7 +561,10 @@ func TestTruncateLoop(t *testing.T) { for _, scanDir := range []ScanDirection{Ascending, Descending} { for _, mustPreserveOrder := range []bool{false, true} { t.Run(fmt.Sprintf("run=%d/%s/order=%t", numRuns, scanDir, mustPreserveOrder), func(t *testing.T) { - helper, err := MakeBatchTruncationHelper(scanDir, requests, mustPreserveOrder) + const canReorderRequestsSlice = false + helper, err := MakeBatchTruncationHelper( + scanDir, requests, mustPreserveOrder, canReorderRequestsSlice, + ) require.NoError(t, err) for i := 0; i < len(ranges); i++ { curRangeRS := ranges[i] @@ -658,7 +697,10 @@ func BenchmarkTruncateLoop(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - h, err := MakeBatchTruncationHelper(scanDir, reqs, mustPreserveOrder) + const canReorderRequestsSlice = false + h, err := MakeBatchTruncationHelper( + scanDir, reqs, mustPreserveOrder, canReorderRequestsSlice, + ) require.NoError(b, err) for _, rs := range rangeSpans { _, _, _, err := h.Truncate(rs) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index e34fa7cc7e8f..0e3d0d4e28f9 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -1333,7 +1333,15 @@ func (ds *DistSender) divideAndSendBatchToRanges( // being in the original order, so the helper must preserve the order if the // batch is not a read-only. mustPreserveOrder := !ba.IsReadOnly() - truncationHelper, err := MakeBatchTruncationHelper(scanDir, ba.Requests, mustPreserveOrder) + // The DistSender relies on the order of ba.Requests not being changed when + // it sets the ResumeSpans on the incomplete requests, so we ask the helper + // to not modify the ba.Requests slice. + // TODO(yuzefovich): refactor the DistSender so that the truncation helper + // could reorder requests as it pleases. + const canReorderRequestsSlice = false + truncationHelper, err := MakeBatchTruncationHelper( + scanDir, ba.Requests, mustPreserveOrder, canReorderRequestsSlice, + ) if err != nil { return nil, roachpb.NewError(err) } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index 3e1bc5c0f082..34e5f8cfb662 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -196,9 +196,9 @@ func checkResumeSpanScanResults( for i, res := range results { // Check that satisfied scans don't have resume spans. if _, satisfied := expSatisfied[i]; satisfied { - require.Nil(t, res.ResumeSpan, "satisfied scan %d (%s) has ResumeSpan: %v", + require.Nilf(t, res.ResumeSpan, "satisfied scan %d (%s) has ResumeSpan: %v", i, spans[i], res.ResumeSpan) - require.Zero(t, res.ResumeReason, "satisfied scan %d (%s) has ResumeReason: %v", + require.Zerof(t, res.ResumeReason, "satisfied scan %d (%s) has ResumeReason: %v", i, spans[i], res.ResumeReason) continue } @@ -207,36 +207,36 @@ func checkResumeSpanScanResults( // The resume span should be identical to the original request if no // results have been produced, or should continue after the last result // otherwise. - require.NotNil(t, res.ResumeSpan, "scan %d (%s): no resume span", i, spans[i]) - require.NotZero(t, res.ResumeReason, "scan %d (%s): no resume reason. resume span: %+v", + require.NotNilf(t, res.ResumeSpan, "scan %d (%s): no resume span", i, spans[i]) + require.NotZerof(t, res.ResumeReason, "scan %d (%s): no resume reason. resume span: %+v", i, spans[i], res.ResumeSpan) - require.Equal(t, expReason, res.ResumeReason, + require.Equalf(t, expReason, res.ResumeReason, "scan %d (%s): unexpected resume reason", i, spans[i]) if !reverse { if len(res.Rows) == 0 { - require.GreaterOrEqual(t, string(res.ResumeSpan.Key), spans[i][0], + require.GreaterOrEqualf(t, string(res.ResumeSpan.Key), spans[i][0], "scan %d (%s): expected resume span %s to be at or above scan start", i, spans[i], res.ResumeSpan) - require.Less(t, string(res.ResumeSpan.Key), spans[i][1], + require.Lessf(t, string(res.ResumeSpan.Key), spans[i][1], "scan %d (%s): expected resume span %s to be below scan end", i, spans[i], res.ResumeSpan) } else { - require.Greater(t, string(res.ResumeSpan.Key), expResults[i][len(res.Rows)-1], + require.Greaterf(t, string(res.ResumeSpan.Key), expResults[i][len(res.Rows)-1], "scan %d (%s): expected resume span %s to be above last result", i, spans[i], res.ResumeSpan) } - require.Equal(t, spans[i][1], string(res.ResumeSpan.EndKey), + require.Equalf(t, spans[i][1], string(res.ResumeSpan.EndKey), "scan %d (%s): expected resume span %s to have same end key", i, spans[i], res.ResumeSpan) } else { if len(res.Rows) == 0 { - require.Greater(t, string(res.ResumeSpan.EndKey), spans[i][0], + require.Greaterf(t, string(res.ResumeSpan.EndKey), spans[i][0], "scan %d (%s): expected resume span %s to be above scan start", i, spans[i], res.ResumeSpan) - require.LessOrEqual(t, string(res.ResumeSpan.EndKey), spans[i][1], + require.LessOrEqualf(t, string(res.ResumeSpan.EndKey), spans[i][1], "scan %d (%s): expected resume span %s to be at or below scan end", i, spans[i], res.ResumeSpan) } else { - require.Less(t, string(res.ResumeSpan.EndKey), expResults[i][len(res.Rows)-1], + require.Lessf(t, string(res.ResumeSpan.EndKey), expResults[i][len(res.Rows)-1], "scan %d (%s): expected resume span %s to be below last result", i, spans[i], res.ResumeSpan) } - require.Equal(t, spans[i][0], string(res.ResumeSpan.Key), + require.Equalf(t, spans[i][0], string(res.ResumeSpan.Key), "scan %d (%s): expected resume span %s to have same start key", i, spans[i], res.ResumeSpan) } } @@ -349,6 +349,11 @@ func TestMultiRangeBoundedBatchScan(t *testing.T) { {"f2"}, } var expResultsReverse [][]string + // reverseProcessOrder contains indices into expResultsReverse ordered with + // the descending direction on the first key in each slice (this is the + // order in which the DistSender will process the corresponding ReverseScan + // requests). + reverseProcessOrder := []int{2, 3, 1, 0} for _, res := range expResults { var rres []string for i := len(res) - 1; i >= 0; i-- { @@ -412,7 +417,8 @@ func TestMultiRangeBoundedBatchScan(t *testing.T) { // The split contains keys [lastK..firstK]. firstK := sort.SearchStrings(keys, splits[s]) - 1 lastK := sort.SearchStrings(keys, splits[s-1]) - for j, res := range expResultsReverse { + for _, j := range reverseProcessOrder { + res := expResultsReverse[j] for expIdx := len(res) - 1; expIdx >= 0; expIdx-- { expK := res[expIdx] for k := firstK; k >= lastK; k-- { @@ -559,22 +565,22 @@ func TestMultiRangeBoundedBatchScanPartialResponses(t *testing.T) { }{ { name: "unsorted, non-overlapping, neither satisfied", - bound: 6, + bound: 3, spans: [][]string{ {"b1", "d"}, {"a", "b1"}, }, expResults: [][]string{ - {"b1", "b2", "b3"}, {"a1", "a2", "a3"}, + {}, {"a1", "a2", "a3"}, }, }, { name: "unsorted, non-overlapping, first satisfied", - bound: 6, + bound: 9, spans: [][]string{ - {"b1", "c"}, {"a", "b1"}, + {"b1", "c"}, {"a", "d"}, }, expResults: [][]string{ - {"b1", "b2", "b3"}, {"a1", "a2", "a3"}, + {"b1", "b2", "b3"}, {"a1", "a2", "a3", "b1", "b2", "b3"}, }, expSatisfied: []int{0}, }, @@ -650,17 +656,17 @@ func TestMultiRangeBoundedBatchScanPartialResponses(t *testing.T) { {"b", "g"}, {"a", "d"}, }, expResults: [][]string{ - {"b1", "b2", "b3"}, {"a1", "a2", "a3", "b1"}, + {"b1"}, {"a1", "a2", "a3", "b1", "b2", "b3"}, }, }, { name: "unsorted, overlapping, first satisfied", - bound: 7, + bound: 9, spans: [][]string{ {"b", "c"}, {"a", "d"}, }, expResults: [][]string{ - {"b1", "b2", "b3"}, {"a1", "a2", "a3", "b1"}, + {"b1", "b2", "b3"}, {"a1", "a2", "a3", "b1", "b2", "b3"}, }, expSatisfied: []int{0}, }, @@ -693,7 +699,7 @@ func TestMultiRangeBoundedBatchScanPartialResponses(t *testing.T) { {"b", "g"}, {"c", "f"}, {"a", "d"}, }, expResults: [][]string{ - {"b1", "b2", "b3"}, {}, {"a1", "a2", "a3", "b1"}, + {"b1"}, {}, {"a1", "a2", "a3", "b1", "b2", "b3"}, }, }, { diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 9fc25d382b22..bd8f65be529b 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -499,9 +499,13 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re // TODO(yuzefovich): introduce a fast path when all requests are contained // within a single range. // The streamer can process the responses in an arbitrary order, so we don't - // require the helper to preserve the order of requests. + // require the helper to preserve the order of requests and allow it to + // reorder the reqs slice too. const mustPreserveOrder = false - truncationHelper, err := kvcoord.MakeBatchTruncationHelper(scanDir, reqs, mustPreserveOrder) + const canReorderRequestsSlice = true + truncationHelper, err := kvcoord.MakeBatchTruncationHelper( + scanDir, reqs, mustPreserveOrder, canReorderRequestsSlice, + ) if err != nil { return err }