From d90e2041099865f8df7a4fd24276a5ce91a27364 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Mon, 13 Jun 2022 19:19:11 -0700
Subject: [PATCH] kvcoord: introduce batch truncation helper

This commit introduces a batch truncation helper that encompasses the
logic of truncating requests to the boundaries of a single range as
well as returning the next key to seek the range iterator to. The
helper is now used both in the DistSender and in the Streamer.

No modifications to the actual logic of the `Truncate`, `Next`, or
`prev` functions have been made other than incorporating the return of
the next seek key into the `Truncate` function itself. This is needed
since the following commit will tightly couple the truncation process
with the determination of the next seek key in order to optimize it.

The helper can be configured with a knob indicating whether `Truncate`
needs to return requests in the original order. This behavior is
required by BatchRequests that contain writes since in several spots we
rely on ordering assumptions (e.g. increasing values of `Sequence`).

The following adjustments were made to the tests:
- `BenchmarkTruncate` has been renamed to `BenchmarkTruncateLegacy`
- `TestTruncate` has been refactored to exercise both the new and the
  old code paths
- `TestBatchPrevNext` has been refactored to run through the new code
  path; a few test cases have also been adjusted slightly.

This commit also introduces some unit tests for the new code path when
it runs in a loop over multiple ranges, as well as a corresponding
benchmark.

Release note: None
---
 pkg/kv/kvclient/kvcoord/BUILD.bazel      |   1 -
 pkg/kv/kvclient/kvcoord/batch.go         | 176 +++++--
 pkg/kv/kvclient/kvcoord/batch_test.go    | 574 ++++++++++++++++++++++-
 pkg/kv/kvclient/kvcoord/dist_sender.go   |  57 +--
 pkg/kv/kvclient/kvcoord/truncate_test.go | 242 ----------
 pkg/kv/kvclient/kvstreamer/streamer.go   |  32 +-
 6 files changed, 740 insertions(+), 342 deletions(-)
 delete mode 100644 pkg/kv/kvclient/kvcoord/truncate_test.go

diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel
index 57974cb4d0d8..8ed46a6818ab 100644
--- a/pkg/kv/kvclient/kvcoord/BUILD.bazel
+++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel
@@ -120,7 +120,6 @@ go_test(
         "send_test.go",
         "split_test.go",
         "transport_test.go",
-        "truncate_test.go",
         "txn_coord_sender_savepoints_test.go",
         "txn_coord_sender_server_test.go",
         "txn_coord_sender_test.go",
diff --git a/pkg/kv/kvclient/kvcoord/batch.go b/pkg/kv/kvclient/kvcoord/batch.go
index 880eb57bfa4a..75c03f592f9a 100644
--- a/pkg/kv/kvclient/kvcoord/batch.go
+++ b/pkg/kv/kvclient/kvcoord/batch.go
@@ -16,19 +16,119 @@ import (
 	"github.com/cockroachdb/errors"
 )
 
-var emptyHeader = roachpb.RequestHeader{}
+// BatchTruncationHelper is a utility struct that helps with truncating requests
+// to range boundaries as well as figuring out the next key to seek to for the
+// range iterator.
+//
+// The caller should not use the helper if all requests fit within a single
+// range since the helper has non-trivial setup cost.
+//
+// It is designed to be used roughly as follows:
+//
+//  rs := keys.Range(requests)
+//  ri.Seek(scanDir, rs.Key)
+//  if !ri.NeedAnother(rs) {
+//    // All requests fit within a single range, don't use the helper.
+//    ...
+// } +// helper.Init(scanDir, requests) +// for ri.Valid() { +// curRangeRS := rs.Intersect(ri.Token().Desc()) +// curRangeReqs, positions, seekKey := helper.Truncate(curRangeRS) +// // Process curRangeReqs that touch a single range and then use positions +// // to reassemble the result. +// ... +// ri.Seek(scanDir, seekKey) +// } +// +type BatchTruncationHelper struct { + scanDir ScanDirection + requests []roachpb.RequestUnion + // mustPreserveOrder indicates whether the requests must be returned by + // Truncate() in the original order. + mustPreserveOrder bool +} + +// Init initializes the helper for the given requests. +// +// mustPreserveOrder, if true, indicates that the caller requires that requests +// are returned by Truncate() in the original order (i.e. with strictly +// increasing positions values). +func (h *BatchTruncationHelper) Init( + scanDir ScanDirection, requests []roachpb.RequestUnion, mustPreserveOrder bool, +) error { + h.scanDir = scanDir + h.requests = requests + h.mustPreserveOrder = mustPreserveOrder + return nil +} // Truncate restricts all requests to the given key range and returns new, // truncated, requests. All returned requests are "truncated" to the given span, // and requests which are found to not overlap the given span at all are +// removed. A mapping of response index to request index is returned. It also +// returns the next seek key for the range iterator. With Ascending scan +// direction, the next seek key is such that requests in [RKeyMin, seekKey) +// range have been processed, with Descending scan direction, it is such that +// requests in [seekKey, RKeyMax) range have been processed. +// +// For example, if +// +// reqs = Put[a], Put[c], Put[b], +// rs = [a,bb], +// BatchTruncationHelper.Init(Ascending, reqs) +// +// then BatchTruncationHelper.Truncate(rs) returns (Put[a], Put[b]), positions +// [0,2] as well as seekKey 'c'. +// +// Truncate returns the requests in an arbitrary order (meaning that positions +// return values might not be ascending), unless mustPreserveOrder was true in +// Init(). +// +// NOTE: it is assumed that +// 1. Truncate has been called on the previous ranges that intersect with +// keys.Range(reqs); +// 2. rs is intersected with the current range boundaries. +func (h *BatchTruncationHelper) Truncate( + rs roachpb.RSpan, +) ([]roachpb.RequestUnion, []int, roachpb.RKey, error) { + truncReqs, positions, err := truncateLegacy(h.requests, rs) + if err != nil { + return nil, nil, nil, err + } + var seekKey roachpb.RKey + if h.scanDir == Ascending { + // In next iteration, query next range. + // It's important that we use the EndKey of the current descriptor + // as opposed to the StartKey of the next one: if the former is stale, + // it's possible that the next range has since merged the subsequent + // one, and unless both descriptors are stale, the next descriptor's + // StartKey would move us to the beginning of the current range, + // resulting in a duplicate scan. + seekKey, err = nextLegacy(h.requests, rs.EndKey) + } else { + // In next iteration, query previous range. + // We use the StartKey of the current descriptor as opposed to the + // EndKey of the previous one since that doesn't have bugs when + // stale descriptors come into play. + seekKey, err = prevLegacy(h.requests, rs.Key) + } + return truncReqs, positions, seekKey, err +} + +var emptyHeader = roachpb.RequestHeader{} + +// truncateLegacy restricts all requests to the given key range and returns new, +// truncated, requests. 
All returned requests are "truncated" to the given span, +// and requests which are found to not overlap the given span at all are // removed. A mapping of response index to request index is returned. For // example, if // // reqs = Put[a], Put[c], Put[b], // rs = [a,bb], // -// then Truncate(reqs,rs) returns (Put[a], Put[b]) and positions [0,2]. -func Truncate( +// then truncateLegacy(reqs,rs) returns (Put[a], Put[b]) and positions [0,2]. +func truncateLegacy( reqs []roachpb.RequestUnion, rs roachpb.RSpan, ) ([]roachpb.RequestUnion, []int, error) { truncateOne := func(args roachpb.Request) (hasRequest bool, changed bool, _ roachpb.RequestHeader, _ error) { @@ -64,26 +164,26 @@ func Truncate( local = true } if keyAddr.Less(rs.Key) { - // rs.Key can't be local because it contains range split points, which - // are never local. + // rs.Key can't be local because it contains range split points, + // which are never local. changed = true if !local { header.Key = rs.Key.AsRawKey() } else { - // The local start key should be truncated to the boundary of local keys which - // address to rs.Key. + // The local start key should be truncated to the boundary of + // local keys which address to rs.Key. header.Key = keys.MakeRangeKeyPrefix(rs.Key) } } if !endKeyAddr.Less(rs.EndKey) { - // rs.EndKey can't be local because it contains range split points, which - // are never local. + // rs.EndKey can't be local because it contains range split points, + // which are never local. changed = true if !local { header.EndKey = rs.EndKey.AsRawKey() } else { - // The local end key should be truncated to the boundary of local keys which - // address to rs.EndKey. + // The local end key should be truncated to the boundary of + // local keys which address to rs.EndKey. header.EndKey = keys.MakeRangeKeyPrefix(rs.EndKey) } } @@ -122,18 +222,15 @@ func Truncate( return truncReqs, positions, nil } -// prev gives the right boundary of the union of all requests which don't +// prevLegacy gives the right boundary of the union of all requests which don't // affect keys larger than the given key. Note that a right boundary is -// exclusive, that is, the returned RKey is to be used as the exclusive -// right endpoint in finding the next range to query. +// exclusive, that is, the returned RKey is to be used as the exclusive right +// endpoint in finding the next range to query. // -// Informally, a call `prev(reqs, k)` means: we've already executed the parts -// of `reqs` that intersect `[k, KeyMax)`; please tell me how far to the +// Informally, a call `prevLegacy(reqs, k)` means: we've already executed the +// parts of `reqs` that intersect `[k, KeyMax)`; please tell me how far to the // left the next relevant request begins. -// -// TODO(tschottdorf): again, better on BatchRequest itself, but can't pull -// 'keys' into 'roachpb'. -func prev(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) { +func prevLegacy(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) { candidate := roachpb.RKeyMin for _, union := range reqs { inner := union.GetInner() @@ -144,16 +241,18 @@ func prev(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) { } endKey := h.EndKey if len(endKey) == 0 { - // If we have a point request for `x < k` then that request has not been - // satisfied (since the batch has only been executed for keys `>=k`). We - // treat `x` as `[x, x.Next())` which does the right thing below. This - // also works when `x > k` or `x=k` as the logic below will skip `x`. 
+ // If we have a point request for `x < k` then that request has not + // been satisfied (since the batch has only been executed for keys + // `>=k`). We treat `x` as `[x, x.Next())` which does the right + // thing below. This also works when `x > k` or `x=k` as the logic + // below will skip `x`. // // Note that if the key is /Local/x/something, then instead of using - // /Local/x/something.Next() as the end key, we rely on AddrUpperBound to - // handle local keys. In particular, AddrUpperBound will turn it into - // `x\x00`, so we're looking at the key-range `[x, x.Next())`. This is - // exactly what we want as the local key is contained in that range. + // /Local/x/something.Next() as the end key, we rely on + // AddrUpperBound to handle local keys. In particular, + // AddrUpperBound will turn it into `x\x00`, so we're looking at the + // key-range `[x, x.Next())`. This is exactly what we want as the + // local key is contained in that range. // // See TestBatchPrevNext for test cases with commentary. endKey = h.Key.Next() @@ -191,18 +290,15 @@ func prev(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) { return candidate, nil } -// Next gives the left boundary of the union of all requests which don't affect -// keys less than the given key. Note that the left boundary is inclusive, that -// is, the returned RKey is the inclusive left endpoint of the keys the request -// should operate on next. -// -// Informally, a call `Next(reqs, k)` means: we've already executed the parts of -// `reqs` that intersect `[KeyMin, k)`; please tell me how far to the right the -// next relevant request begins. -// -// TODO(tschottdorf): again, better on BatchRequest itself, but can't pull -// 'keys' into 'proto'. -func Next(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) { +// nextLegacy gives the left boundary of the union of all requests which don't +// affect keys less than the given key. Note that the left boundary is +// inclusive, that is, the returned RKey is the inclusive left endpoint of the +// keys the request should operate on next. +// +// Informally, a call `nextLegacy(reqs, k)` means: we've already executed the +// parts of `reqs` that intersect `[KeyMin, k)`; please tell me how far to the +// right the next relevant request begins. +func nextLegacy(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) { candidate := roachpb.RKeyMax for _, union := range reqs { inner := union.GetInner() diff --git a/pkg/kv/kvclient/kvcoord/batch_test.go b/pkg/kv/kvclient/kvcoord/batch_test.go index 2fae3611dcaf..99b31d0e8578 100644 --- a/pkg/kv/kvclient/kvcoord/batch_test.go +++ b/pkg/kv/kvclient/kvcoord/batch_test.go @@ -12,15 +12,24 @@ package kvcoord import ( "bytes" + "fmt" + "reflect" + "sort" "testing" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" ) -// TestBatchPrevNext tests prev() and next() +// TestBatchPrevNext tests the seeking behavior of the +// BatchTruncationHelper.Truncate. 
func TestBatchPrevNext(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -147,23 +156,27 @@ func TestBatchPrevNext(t *testing.T) { expBW: "c\x00", }, // Explanations below are an exercise for the reader, but it's very similar to above. - {spans: span(loc("b"), ""), key: "a", + {spans: span(loc("b"), loc("c")), key: "a", expFW: "b", expBW: min, }, - {spans: span(loc("b"), ""), key: "b", + {spans: span(loc("b"), loc("c")), key: "b", expFW: "b", expBW: min, }, - {spans: span(loc("b"), ""), key: "b\x00", + // Note that such a span results in an invalid ScanRequest, but it's ok + // for the purposes of this test. + {spans: span(loc("b"), loc("b")), key: "b\x00", expFW: max, // Handled `key >= b\x00`, so next we'll have to chip away at `[KeyMin, b\x00)`. Note // how this doesn't return `b` which would be incorrect as `[KeyMin, b)` does not // contain `loc(b)`. expBW: "b\x00", }, + // Note that such a span results in an invalid ScanRequest, but it's ok + // for the purposes of this test. { - spans: span(loc("a"), "", loc("b"), ""), key: "b", + spans: span(loc("a"), loc("a"), loc("b"), loc("b")), key: "b", // We've dealt with any key that addresses to `< b`, and `loc(b)` is not // covered by it. `loc(b)` lives between `b` and `b\x00`, so we start at // `b`. @@ -193,12 +206,24 @@ func TestBatchPrevNext(t *testing.T) { args.Key, args.EndKey = span.Key, span.EndKey ba.Add(args) } - if next, err := Next(ba.Requests, roachpb.RKey(test.key)); err != nil { + var ascHelper, descHelper BatchTruncationHelper + const mustPreserveOrder = false + require.NoError(t, ascHelper.Init(Ascending, ba.Requests, mustPreserveOrder)) + require.NoError(t, descHelper.Init(Descending, ba.Requests, mustPreserveOrder)) + if _, _, next, err := ascHelper.Truncate( + roachpb.RSpan{ + Key: roachpb.RKeyMin, + EndKey: roachpb.RKey(test.key)}, + ); err != nil { t.Error(err) } else if !bytes.Equal(next, roachpb.Key(test.expFW)) { t.Errorf("next: expected %q, got %q", test.expFW, next) } - if prev, err := prev(ba.Requests, roachpb.RKey(test.key)); err != nil { + if _, _, prev, err := descHelper.Truncate( + roachpb.RSpan{ + Key: roachpb.RKey(test.key), + EndKey: roachpb.RKeyMax}, + ); err != nil { t.Error(err) } else if !bytes.Equal(prev, roachpb.Key(test.expBW)) { t.Errorf("prev: expected %q, got %q", test.expBW, prev) @@ -206,3 +231,538 @@ func TestBatchPrevNext(t *testing.T) { }) } } + +type requestsWithPositions struct { + reqs []roachpb.RequestUnion + positions []int +} + +var _ sort.Interface = &requestsWithPositions{} + +func (r *requestsWithPositions) Len() int { + return len(r.positions) +} + +func (r *requestsWithPositions) Less(i, j int) bool { + return r.positions[i] < r.positions[j] +} + +func (r *requestsWithPositions) Swap(i, j int) { + r.reqs[i], r.reqs[j] = r.reqs[j], r.reqs[i] + r.positions[i], r.positions[j] = r.positions[j], r.positions[i] +} + +// TestTruncate verifies the truncation logic of the BatchTruncationHelper over +// a single range as well as truncateLegacy() function directly. +func TestTruncate(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + loc := func(s string) string { + return string(keys.RangeDescriptorKey(roachpb.RKey(s))) + } + locPrefix := func(s string) string { + return string(keys.MakeRangeKeyPrefix(roachpb.RKey(s))) + } + testCases := []struct { + keys [][2]string + expKeys [][2]string + from, to string + desc [2]string // optional, defaults to {from,to} + err string + }{ + { + // Keys inside of active range. 
+ keys: [][2]string{{"a", "q"}, {"c"}, {"b, e"}, {"q"}}, + expKeys: [][2]string{{"a", "q"}, {"c"}, {"b, e"}, {"q"}}, + from: "a", to: "q\x00", + }, + { + // Keys outside of active range. + keys: [][2]string{{"a"}, {"a", "b"}, {"q"}, {"q", "z"}}, + expKeys: [][2]string{{}, {}, {}, {}}, + from: "b", to: "q", + }, + { + // Range-local keys inside of active range. + keys: [][2]string{{loc("b")}, {loc("c")}}, + expKeys: [][2]string{{loc("b")}, {loc("c")}}, + from: "b", to: "e", + }, + { + // Range-local key outside of active range. + keys: [][2]string{{loc("a")}}, + expKeys: [][2]string{{}}, + from: "b", to: "e", + }, + { + // Range-local range contained in active range. + keys: [][2]string{{loc("b"), loc("e") + "\x00"}}, + expKeys: [][2]string{{loc("b"), loc("e") + "\x00"}}, + from: "b", to: "e\x00", + }, + { + // Range-local range not contained in active range. + keys: [][2]string{{loc("a"), loc("b")}}, + expKeys: [][2]string{{}}, + from: "c", to: "e", + }, + { + // Range-local range not contained in active range. + keys: [][2]string{{loc("a"), locPrefix("b")}, {loc("e"), loc("f")}}, + expKeys: [][2]string{{}, {}}, + from: "b", to: "e", + }, + { + // Range-local range partially contained in active range. + keys: [][2]string{{loc("a"), loc("b")}}, + expKeys: [][2]string{{loc("a"), locPrefix("b")}}, + from: "a", to: "b", + }, + { + // Range-local range partially contained in active range. + keys: [][2]string{{loc("a"), loc("b")}}, + expKeys: [][2]string{{locPrefix("b"), loc("b")}}, + from: "b", to: "e", + }, + { + // Range-local range contained in active range. + keys: [][2]string{{locPrefix("b"), loc("b")}}, + expKeys: [][2]string{{locPrefix("b"), loc("b")}}, + from: "b", to: "c", + }, + { + // Mixed range-local vs global key range. + keys: [][2]string{{loc("c"), "d\x00"}}, + from: "b", to: "e", + err: "local key mixed with global key", + }, + { + // Key range touching and intersecting active range. + keys: [][2]string{{"a", "b"}, {"a", "c"}, {"p", "q"}, {"p", "r"}, {"a", "z"}}, + expKeys: [][2]string{{"b", "c"}, {"p", "q"}, {"p", "q"}, {"b", "q"}}, + from: "b", to: "q", + }, + // Active key range is intersection of descriptor and [from,to). + { + keys: [][2]string{{"c", "q"}}, + expKeys: [][2]string{{"d", "p"}}, + from: "a", to: "z", + desc: [2]string{"d", "p"}, + }, + { + keys: [][2]string{{"c", "q"}}, + expKeys: [][2]string{{"d", "p"}}, + from: "d", to: "p", + desc: [2]string{"a", "z"}, + }, + } + + for _, isLegacy := range []bool{false, true} { + for _, mustPreserveOrder := range []bool{false, true} { + if isLegacy && mustPreserveOrder { + // This config is meaningless because truncateLegacy() always + // preserves the ordering. 
+ continue + } + for i, test := range testCases { + goldenOriginal := roachpb.BatchRequest{} + for _, ks := range test.keys { + if len(ks[1]) > 0 { + goldenOriginal.Add(&roachpb.ResolveIntentRangeRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: roachpb.Key(ks[0]), EndKey: roachpb.Key(ks[1]), + }, + IntentTxn: enginepb.TxnMeta{ID: uuid.MakeV4()}, + }) + } else { + goldenOriginal.Add(&roachpb.GetRequest{ + RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(ks[0])}, + }) + } + } + + original := roachpb.BatchRequest{Requests: make([]roachpb.RequestUnion, len(goldenOriginal.Requests))} + for i, request := range goldenOriginal.Requests { + original.Requests[i].MustSetInner(request.GetInner().ShallowCopy()) + } + + var truncationHelper BatchTruncationHelper + if !isLegacy { + if err := truncationHelper.Init(Ascending, original.Requests, mustPreserveOrder); err != nil { + t.Errorf("%d: Init failure: %v", i, err) + continue + } + // We need to truncate all requests up to the start of the + // test range since this is assumed by Truncate(). + truncateKey := roachpb.RKey(test.from) + if truncateKey.Less(roachpb.RKey(test.desc[0])) { + truncateKey = roachpb.RKey(test.desc[0]) + } + _, _, _, err := truncationHelper.Truncate( + roachpb.RSpan{Key: roachpb.RKeyMin, EndKey: truncateKey}, + ) + if err != nil || test.err != "" { + if !testutils.IsError(err, test.err) { + t.Errorf("%d: %v (expected: %q)", i, err, test.err) + } + continue + } + } + desc := &roachpb.RangeDescriptor{ + StartKey: roachpb.RKey(test.desc[0]), EndKey: roachpb.RKey(test.desc[1]), + } + if len(desc.StartKey) == 0 { + desc.StartKey = roachpb.RKey(test.from) + } + if len(desc.EndKey) == 0 { + desc.EndKey = roachpb.RKey(test.to) + } + rs := roachpb.RSpan{Key: roachpb.RKey(test.from), EndKey: roachpb.RKey(test.to)} + rs, err := rs.Intersect(desc) + if err != nil { + t.Errorf("%d: intersection failure: %v", i, err) + continue + } + reqs, pos, err := truncateLegacy(original.Requests, rs) + if isLegacy { + if err != nil || test.err != "" { + if !testutils.IsError(err, test.err) { + t.Errorf("%d: %v (expected: %q)", i, err, test.err) + } + continue + } + } else { + reqs, pos, _, err = truncationHelper.Truncate(rs) + } + if err != nil { + t.Errorf("%d: truncation failure: %v", i, err) + continue + } + if !isLegacy && !mustPreserveOrder { + // Truncate can return results in an arbitrary order, so we + // need to restore the order according to positions. + scratch := &requestsWithPositions{reqs: reqs, positions: pos} + sort.Sort(scratch) + } + var numReqs int + for j, arg := range reqs { + req := arg.GetInner() + if h := req.Header(); !bytes.Equal(h.Key, roachpb.Key(test.expKeys[j][0])) || !bytes.Equal(h.EndKey, roachpb.Key(test.expKeys[j][1])) { + t.Errorf("%d.%d: range mismatch: actual [%q,%q), wanted [%q,%q)", i, j, + h.Key, h.EndKey, roachpb.RKey(test.expKeys[j][0]), roachpb.RKey(test.expKeys[j][1])) + } else if len(h.Key) != 0 { + numReqs++ + } + } + if num := len(pos); numReqs != num { + t.Errorf("%d: counted %d requests, but truncation indicated %d", i, numReqs, num) + } + if !reflect.DeepEqual(original, goldenOriginal) { + t.Errorf("%d: truncation mutated original:\nexpected: %s\nactual: %s", + i, goldenOriginal, original) + } + } + } + } +} + +// TestTruncateLoop verifies that using the BatchTruncationHelper in a loop over +// multiple ranges works as expected. 
+func TestTruncateLoop(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + makeRangeDesc := func(boundaries [2]string) roachpb.RangeDescriptor { + return roachpb.RangeDescriptor{ + StartKey: roachpb.RKey(boundaries[0]), EndKey: roachpb.RKey(boundaries[1]), + } + } + makeGetRequest := func(key string) roachpb.RequestUnion { + var req roachpb.RequestUnion + req.MustSetInner(&roachpb.GetRequest{ + RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(key)}, + }) + return req + } + makeScanRequest := func(start, end string) roachpb.RequestUnion { + var req roachpb.RequestUnion + req.MustSetInner(&roachpb.ScanRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: roachpb.Key(start), EndKey: roachpb.Key(end), + }, + }) + return req + } + + type iterationCase struct { + rangeBoundaries [2]string + // When both strings are non-empty, then it represents a ScanRequest, if + // the second string is empty, then it is a GetRequest. + requests [][2]string + positions []int + ascSeekKey roachpb.RKey + descSeekKey roachpb.RKey + } + getRequests := func(reqs [][2]string) []roachpb.RequestUnion { + var requests []roachpb.RequestUnion + for _, req := range reqs { + if req[1] == "" { + requests = append(requests, makeGetRequest(req[0])) + } else { + requests = append(requests, makeScanRequest(req[0], req[1])) + } + } + return requests + } + assertExpected := func(tc iterationCase, reqs []roachpb.RequestUnion, positions []int) { + // Since requests can be truncated in an arbitrary order, we will sort + // them according to their positions. + scratch := requestsWithPositions{reqs: reqs, positions: positions} + sort.Sort(&scratch) + require.True(t, reflect.DeepEqual(getRequests(tc.requests), reqs)) + require.True(t, reflect.DeepEqual(tc.positions, positions)) + } + + for _, scanDir := range []ScanDirection{Ascending, Descending} { + for _, testCase := range []struct { + // When both strings are non-empty, then it represents a + // ScanRequest, if the second string is empty, then it is a + // GetRequest. + requests [][2]string + iteration []iterationCase + }{ + // A test case with three ranges and requests touching each of those + // ranges. + { + requests: [][2]string{ + {"i", "l"}, {"d"}, {"h", "k"}, {"g", "i"}, {"i"}, {"d", "f"}, {"b", "j"}, + }, + iteration: []iterationCase{ + { + rangeBoundaries: [2]string{"a", "e"}, + requests: [][2]string{{"d"}, {"d", "e"}, {"b", "e"}}, + positions: []int{1, 5, 6}, + ascSeekKey: roachpb.RKey("e"), + descSeekKey: roachpb.RKeyMin, + }, + { + rangeBoundaries: [2]string{"e", "i"}, + requests: [][2]string{{"h", "i"}, {"g", "i"}, {"e", "f"}, {"e", "i"}}, + positions: []int{2, 3, 5, 6}, + ascSeekKey: roachpb.RKey("i"), + descSeekKey: roachpb.RKey("e"), + }, + { + rangeBoundaries: [2]string{"i", "m"}, + requests: [][2]string{{"i", "l"}, {"i", "k"}, {"i"}, {"i", "j"}}, + positions: []int{0, 2, 4, 6}, + ascSeekKey: roachpb.RKeyMax, + descSeekKey: roachpb.RKey("i"), + }, + }, + }, + // A test case where each request fits within a single range, and + // there is a "gap" range that doesn't touch any of the requests. 
+ { + requests: [][2]string{ + {"k", "l"}, {"d"}, {"j", "k"}, {"c", "d"}, {"k"}, {"a", "b"}, + }, + iteration: []iterationCase{ + { + rangeBoundaries: [2]string{"a", "e"}, + requests: [][2]string{{"d"}, {"c", "d"}, {"a", "b"}}, + positions: []int{1, 3, 5}, + ascSeekKey: roachpb.RKey("j"), + descSeekKey: roachpb.RKeyMin, + }, + { + rangeBoundaries: [2]string{"e", "i"}, + requests: nil, + positions: nil, + ascSeekKey: roachpb.RKey("j"), + descSeekKey: roachpb.RKey("d").Next(), + }, + { + rangeBoundaries: [2]string{"i", "m"}, + requests: [][2]string{{"k", "l"}, {"j", "k"}, {"k"}}, + positions: []int{0, 2, 4}, + ascSeekKey: roachpb.RKeyMax, + descSeekKey: roachpb.RKey("d").Next(), + }, + }, + }, + } { + requests := getRequests(testCase.requests) + rs, err := keys.Range(requests) + require.NoError(t, err) + var helper BatchTruncationHelper + const mustPreserveOrder = false + require.NoError(t, helper.Init(scanDir, requests, mustPreserveOrder)) + for i := 0; i < len(testCase.iteration); i++ { + tc := testCase.iteration[i] + if scanDir == Descending { + // Test cases are written assuming the Ascending scan + // direction, so we have to iterate in reverse here. + tc = testCase.iteration[len(testCase.iteration)-1-i] + } + desc := makeRangeDesc(tc.rangeBoundaries) + curRangeRS, err := rs.Intersect(&desc) + require.NoError(t, err) + reqs, positions, seekKey, err := helper.Truncate(curRangeRS) + require.NoError(t, err) + assertExpected(tc, reqs, positions) + expectedSeekKey := tc.ascSeekKey + if scanDir == Descending { + expectedSeekKey = tc.descSeekKey + } + require.Equal(t, expectedSeekKey, seekKey) + } + } + } +} + +func BenchmarkTruncateLoop(b *testing.B) { + defer leaktest.AfterTest(b)() + defer log.Scope(b).Close(b) + + rng, _ := randutil.NewTestRand() + randomKey := func() []byte { + const keyLength = 8 + res := make([]byte, keyLength) + for i := range res { + // Plus two is needed to skip local key space. + res[i] = byte(2 + rng.Intn(253)) + } + return res + } + for _, scanDir := range []ScanDirection{Ascending, Descending} { + for _, mustPreserveOrder := range []bool{false, true} { + for _, numRequests := range []int{128, 16384} { + for _, numRanges := range []int{4, 64} { + // We'll split the whole key space into numRanges ranges + // using random numRanges-1 split points. + splitPoints := make([][]byte, numRanges-1) + for i := range splitPoints { + splitPoints[i] = randomKey() + } + // Sort all the split points so that we can treat them as + // boundaries of consecutive ranges. + for i := range splitPoints { + for j := i + 1; j < len(splitPoints); j++ { + if bytes.Compare(splitPoints[i], splitPoints[j]) > 0 { + splitPoints[i], splitPoints[j] = splitPoints[j], splitPoints[i] + } + } + } + rangeSpans := make([]roachpb.RSpan, numRanges) + rangeSpans[0].Key = roachpb.RKeyMin + rangeSpans[numRanges-1].EndKey = roachpb.RKeyMax + for i := range splitPoints { + rangeSpans[i].EndKey = splitPoints[i] + rangeSpans[i+1].Key = splitPoints[i] + } + scanDirStr := "asc" + if scanDir == Descending { + scanDirStr = "desc" + // Reverse all the range spans for the Descending scan + // direction. 
+ for i, j := 0, numRanges-1; i < j; i, j = i+1, j-1 { + rangeSpans[i], rangeSpans[j] = rangeSpans[j], rangeSpans[i] + } + } + var orderStr string + if mustPreserveOrder { + orderStr = "/preserveOrder" + } + for _, requestType := range []string{"get", "scan"} { + b.Run(fmt.Sprintf( + "%s%s/reqs=%d/ranges=%d/type=%s", + scanDirStr, orderStr, numRequests, numRanges, requestType, + ), func(b *testing.B) { + reqs := make([]roachpb.RequestUnion, numRequests) + switch requestType { + case "get": + for i := 0; i < numRequests; i++ { + var get roachpb.GetRequest + get.Key = randomKey() + reqs[i].MustSetInner(&get) + } + case "scan": + for i := 0; i < numRequests; i++ { + var scan roachpb.ScanRequest + startKey := randomKey() + endKey := randomKey() + for bytes.Equal(startKey, endKey) { + endKey = randomKey() + } + if bytes.Compare(startKey, endKey) > 0 { + startKey, endKey = endKey, startKey + } + scan.Key = startKey + scan.EndKey = endKey + reqs[i].MustSetInner(&scan) + } + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + var h BatchTruncationHelper + require.NoError(b, h.Init(scanDir, reqs, mustPreserveOrder)) + for _, rs := range rangeSpans { + _, _, _, err := h.Truncate(rs) + require.NoError(b, err) + } + } + }) + } + } + } + } + } +} + +func BenchmarkTruncateLegacy(b *testing.B) { + defer leaktest.AfterTest(b)() + defer log.Scope(b).Close(b) + + rng, _ := randutil.NewTestRand() + // For simplicity, we'll work with single-byte keys, so we divide up the + // single-byte key space into three parts, and we'll be truncating the + // requests according to the middle part. + rs := roachpb.RSpan{Key: roachpb.RKey([]byte{85}), EndKey: roachpb.RKey([]byte{170})} + randomKeyByte := func() byte { + // Plus two is needed to skip local key space. + return byte(2 + rng.Intn(253)) + } + for _, numRequests := range []int{1 << 5, 1 << 10, 1 << 15} { + for _, requestType := range []string{"get", "scan"} { + b.Run(fmt.Sprintf("reqs=%d/type=%s", numRequests, requestType), func(b *testing.B) { + reqs := make([]roachpb.RequestUnion, numRequests) + switch requestType { + case "get": + for i := 0; i < numRequests; i++ { + var get roachpb.GetRequest + get.Key = []byte{randomKeyByte()} + reqs[i].MustSetInner(&get) + } + case "scan": + for i := 0; i < numRequests; i++ { + var scan roachpb.ScanRequest + startKey := randomKeyByte() + endKey := randomKeyByte() + if endKey < startKey { + startKey, endKey = endKey, startKey + } + scan.Key = []byte{startKey} + scan.EndKey = []byte{endKey} + reqs[i].MustSetInner(&scan) + } + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _, err := truncateLegacy(reqs, rs) + require.NoError(b, err) + } + }) + } + } +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index e5f22213e6a0..8ef43a0df625 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -1321,6 +1321,15 @@ func (ds *DistSender) divideAndSendBatchToRanges( canParallelize = canParallelize && !isExpensive } + var truncationHelper BatchTruncationHelper + // In several places that handle writes (kvserver.maybeStripInFlightWrites, + // storage.replayTransactionalWrite, possibly others) we rely on requests + // being in the original order, so the helper must preserve the order if the + // batch is not a read-only. + mustPreserveOrder := !ba.IsReadOnly() + if err := truncationHelper.Init(scanDir, ba.Requests, mustPreserveOrder); err != nil { + return nil, roachpb.NewError(err) + } // Iterate over the ranges that the batch touches. 
The iteration is done in // key order - the order of requests in the batch is not relevant for the // iteration. Each iteration sends for evaluation one sub-batch to one range. @@ -1340,33 +1349,6 @@ func (ds *DistSender) divideAndSendBatchToRanges( responseCh := make(chan response, 1) responseChs = append(responseChs, responseCh) - // Determine next seek key, taking a potentially sparse batch into - // consideration. - var err error - nextRS := rs - if scanDir == Descending { - // In next iteration, query previous range. - // We use the StartKey of the current descriptor as opposed to the - // EndKey of the previous one since that doesn't have bugs when - // stale descriptors come into play. - seekKey, err = prev(ba.Requests, ri.Desc().StartKey) - nextRS.EndKey = seekKey - } else { - // In next iteration, query next range. - // It's important that we use the EndKey of the current descriptor - // as opposed to the StartKey of the next one: if the former is stale, - // it's possible that the next range has since merged the subsequent - // one, and unless both descriptors are stale, the next descriptor's - // StartKey would move us to the beginning of the current range, - // resulting in a duplicate scan. - seekKey, err = Next(ba.Requests, ri.Desc().EndKey) - nextRS.Key = seekKey - } - if err != nil { - responseCh <- response{pErr: roachpb.NewError(err)} - return - } - // Truncate the request to range descriptor. curRangeRS, err := rs.Intersect(ri.Token().Desc()) if err != nil { @@ -1375,7 +1357,7 @@ func (ds *DistSender) divideAndSendBatchToRanges( } curRangeBatch := ba var positions []int - curRangeBatch.Requests, positions, err = Truncate(ba.Requests, curRangeRS) + curRangeBatch.Requests, positions, seekKey, err = truncationHelper.Truncate(curRangeRS) if len(positions) == 0 && err == nil { // This shouldn't happen in the wild, but some tests exercise it. err = errors.Newf("truncation resulted in empty batch on %s: %s", rs, ba) @@ -1384,6 +1366,12 @@ func (ds *DistSender) divideAndSendBatchToRanges( responseCh <- response{pErr: roachpb.NewError(err)} return } + nextRS := rs + if scanDir == Ascending { + nextRS.Key = seekKey + } else { + nextRS.EndKey = seekKey + } lastRange := !ri.NeedAnother(rs) // Send the next partial batch to the first range in the "rs" span. @@ -1451,13 +1439,12 @@ func (ds *DistSender) divideAndSendBatchToRanges( } } - // The iteration is complete if the iterator's current range - // encompasses the remaining span, OR if the next span has - // inverted. This can happen if this method is invoked - // re-entrantly due to ranges being split or merged. In that case - // the batch request has all the original requests but the span is - // a sub-span of the original, causing next() and prev() methods - // to potentially return values which invert the span. + // The iteration is complete if the iterator's current range encompasses + // the remaining span, OR if the next span has inverted. This can happen + // if this method is invoked re-entrantly due to ranges being split or + // merged. In that case the batch request has all the original requests + // but the span is a sub-span of the original, causing Truncate() to + // potentially return the next seek key which inverts the span. 
if lastRange || !nextRS.Key.Less(nextRS.EndKey) { return } diff --git a/pkg/kv/kvclient/kvcoord/truncate_test.go b/pkg/kv/kvclient/kvcoord/truncate_test.go deleted file mode 100644 index 3be378cfda4e..000000000000 --- a/pkg/kv/kvclient/kvcoord/truncate_test.go +++ /dev/null @@ -1,242 +0,0 @@ -// Copyright 2015 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package kvcoord - -import ( - "bytes" - "fmt" - "reflect" - "testing" - - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/storage/enginepb" - "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/randutil" - "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/stretchr/testify/require" -) - -func TestTruncate(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - loc := func(s string) string { - return string(keys.RangeDescriptorKey(roachpb.RKey(s))) - } - locPrefix := func(s string) string { - return string(keys.MakeRangeKeyPrefix(roachpb.RKey(s))) - } - testCases := []struct { - keys [][2]string - expKeys [][2]string - from, to string - desc [2]string // optional, defaults to {from,to} - err string - }{ - { - // Keys inside of active range. - keys: [][2]string{{"a", "q"}, {"c"}, {"b, e"}, {"q"}}, - expKeys: [][2]string{{"a", "q"}, {"c"}, {"b, e"}, {"q"}}, - from: "a", to: "q\x00", - }, - { - // Keys outside of active range. - keys: [][2]string{{"a"}, {"a", "b"}, {"q"}, {"q", "z"}}, - expKeys: [][2]string{{}, {}, {}, {}}, - from: "b", to: "q", - }, - { - // Range-local keys inside of active range. - keys: [][2]string{{loc("b")}, {loc("c")}}, - expKeys: [][2]string{{loc("b")}, {loc("c")}}, - from: "b", to: "e", - }, - { - // Range-local key outside of active range. - keys: [][2]string{{loc("a")}}, - expKeys: [][2]string{{}}, - from: "b", to: "e", - }, - { - // Range-local range contained in active range. - keys: [][2]string{{loc("b"), loc("e") + "\x00"}}, - expKeys: [][2]string{{loc("b"), loc("e") + "\x00"}}, - from: "b", to: "e\x00", - }, - { - // Range-local range not contained in active range. - keys: [][2]string{{loc("a"), loc("b")}}, - expKeys: [][2]string{{}}, - from: "c", to: "e", - }, - { - // Range-local range not contained in active range. - keys: [][2]string{{loc("a"), locPrefix("b")}, {loc("e"), loc("f")}}, - expKeys: [][2]string{{}, {}}, - from: "b", to: "e", - }, - { - // Range-local range partially contained in active range. - keys: [][2]string{{loc("a"), loc("b")}}, - expKeys: [][2]string{{loc("a"), locPrefix("b")}}, - from: "a", to: "b", - }, - { - // Range-local range partially contained in active range. - keys: [][2]string{{loc("a"), loc("b")}}, - expKeys: [][2]string{{locPrefix("b"), loc("b")}}, - from: "b", to: "e", - }, - { - // Range-local range contained in active range. - keys: [][2]string{{locPrefix("b"), loc("b")}}, - expKeys: [][2]string{{locPrefix("b"), loc("b")}}, - from: "b", to: "c", - }, - { - // Mixed range-local vs global key range. 
- keys: [][2]string{{loc("c"), "d\x00"}}, - from: "b", to: "e", - err: "local key mixed with global key", - }, - { - // Key range touching and intersecting active range. - keys: [][2]string{{"a", "b"}, {"a", "c"}, {"p", "q"}, {"p", "r"}, {"a", "z"}}, - expKeys: [][2]string{{"b", "c"}, {"p", "q"}, {"p", "q"}, {"b", "q"}}, - from: "b", to: "q", - }, - // Active key range is intersection of descriptor and [from,to). - { - keys: [][2]string{{"c", "q"}}, - expKeys: [][2]string{{"d", "p"}}, - from: "a", to: "z", - desc: [2]string{"d", "p"}, - }, - { - keys: [][2]string{{"c", "q"}}, - expKeys: [][2]string{{"d", "p"}}, - from: "d", to: "p", - desc: [2]string{"a", "z"}, - }, - } - - for i, test := range testCases { - goldenOriginal := roachpb.BatchRequest{} - for _, ks := range test.keys { - if len(ks[1]) > 0 { - goldenOriginal.Add(&roachpb.ResolveIntentRangeRequest{ - RequestHeader: roachpb.RequestHeader{ - Key: roachpb.Key(ks[0]), EndKey: roachpb.Key(ks[1]), - }, - IntentTxn: enginepb.TxnMeta{ID: uuid.MakeV4()}, - }) - } else { - goldenOriginal.Add(&roachpb.GetRequest{ - RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(ks[0])}, - }) - } - } - - original := roachpb.BatchRequest{Requests: make([]roachpb.RequestUnion, len(goldenOriginal.Requests))} - for i, request := range goldenOriginal.Requests { - original.Requests[i].MustSetInner(request.GetInner().ShallowCopy()) - } - - desc := &roachpb.RangeDescriptor{ - StartKey: roachpb.RKey(test.desc[0]), EndKey: roachpb.RKey(test.desc[1]), - } - if len(desc.StartKey) == 0 { - desc.StartKey = roachpb.RKey(test.from) - } - if len(desc.EndKey) == 0 { - desc.EndKey = roachpb.RKey(test.to) - } - rs := roachpb.RSpan{Key: roachpb.RKey(test.from), EndKey: roachpb.RKey(test.to)} - rs, err := rs.Intersect(desc) - if err != nil { - t.Errorf("%d: intersection failure: %v", i, err) - continue - } - reqs, pos, err := Truncate(original.Requests, rs) - if err != nil || test.err != "" { - if !testutils.IsError(err, test.err) { - t.Errorf("%d: %v (expected: %q)", i, err, test.err) - } - continue - } - var numReqs int - for j, arg := range reqs { - req := arg.GetInner() - if h := req.Header(); !bytes.Equal(h.Key, roachpb.Key(test.expKeys[j][0])) || !bytes.Equal(h.EndKey, roachpb.Key(test.expKeys[j][1])) { - t.Errorf("%d.%d: range mismatch: actual [%q,%q), wanted [%q,%q)", i, j, - h.Key, h.EndKey, test.expKeys[j][0], test.expKeys[j][1]) - } else if len(h.Key) != 0 { - numReqs++ - } - } - if num := len(pos); numReqs != num { - t.Errorf("%d: counted %d requests, but truncation indicated %d", i, reqs, num) - } - if !reflect.DeepEqual(original, goldenOriginal) { - t.Errorf("%d: truncation mutated original:\nexpected: %s\nactual: %s", - i, goldenOriginal, original) - } - } -} - -func BenchmarkTruncate(b *testing.B) { - defer leaktest.AfterTest(b)() - defer log.Scope(b).Close(b) - - rng, _ := randutil.NewTestRand() - // For simplicity, we'll work with single-byte keys, so we divide up the - // single-byte key space into three parts, and we'll be truncating the - // requests according to the middle part. - rs := roachpb.RSpan{Key: roachpb.RKey([]byte{85}), EndKey: roachpb.RKey([]byte{170})} - randomKeyByte := func() byte { - // Plus two is needed to skip local key space. 
- return byte(2 + rng.Intn(253)) - } - for _, numRequests := range []int{1 << 5, 1 << 10, 1 << 15} { - for _, requestType := range []string{"get", "scan"} { - b.Run(fmt.Sprintf("reqs=%d/type=%s", numRequests, requestType), func(b *testing.B) { - reqs := make([]roachpb.RequestUnion, numRequests) - switch requestType { - case "get": - for i := 0; i < numRequests; i++ { - var get roachpb.GetRequest - get.Key = []byte{randomKeyByte()} - reqs[i].MustSetInner(&get) - } - case "scan": - for i := 0; i < numRequests; i++ { - var scan roachpb.ScanRequest - startKey := randomKeyByte() - endKey := randomKeyByte() - if endKey < startKey { - startKey, endKey = endKey, startKey - } - scan.Key = []byte{startKey} - scan.EndKey = []byte{endKey} - reqs[i].MustSetInner(&scan) - } - } - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, _, err := Truncate(reqs, rs) - require.NoError(b, err) - } - }) - } - } -} diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 6418c7e3964b..1b6330e895ca 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -450,6 +450,17 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re s.mu.Unlock() } }() + // TODO(yuzefovich): reuse truncation helpers between different Enqueue() + // calls. + // TODO(yuzefovich): introduce a fast path when all requests are contained + // within a single range. + var truncationHelper kvcoord.BatchTruncationHelper + // The streamer can process the responses in an arbitrary order, so we don't + // require the helper to preserve the order of requests. + const mustPreserveOrder = false + if err = truncationHelper.Init(scanDir, reqs, mustPreserveOrder); err != nil { + return err + } var reqsKeysScratch []roachpb.Key for ; ri.Valid(); ri.Seek(ctx, seekKey, scanDir) { // Truncate the request span to the current range. @@ -458,10 +469,13 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re return err } // Find all requests that touch the current range. - singleRangeReqs, positions, err := kvcoord.Truncate(reqs, singleRangeSpan) + var singleRangeReqs []roachpb.RequestUnion + var positions []int + singleRangeReqs, positions, seekKey, err = truncationHelper.Truncate(singleRangeSpan) if err != nil { return err } + rs.Key = seekKey var subRequestIdx []int32 if !s.hints.SingleRowLookup { for i, pos := range positions { @@ -527,22 +541,6 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re } requestsToServe = append(requestsToServe, r) - - // Determine next seek key, taking potentially sparse requests into - // consideration. - // - // In next iteration, query next range. - // It's important that we use the EndKey of the current descriptor - // as opposed to the StartKey of the next one: if the former is stale, - // it's possible that the next range has since merged the subsequent - // one, and unless both descriptors are stale, the next descriptor's - // StartKey would move us to the beginning of the current range, - // resulting in a duplicate scan. - seekKey, err = kvcoord.Next(reqs, ri.Desc().EndKey) - rs.Key = seekKey - if err != nil { - return err - } } if streamerLocked {