diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index 57974cb4d0d8..8ed46a6818ab 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -120,7 +120,6 @@ go_test( "send_test.go", "split_test.go", "transport_test.go", - "truncate_test.go", "txn_coord_sender_savepoints_test.go", "txn_coord_sender_server_test.go", "txn_coord_sender_test.go", diff --git a/pkg/kv/kvclient/kvcoord/batch.go b/pkg/kv/kvclient/kvcoord/batch.go index 880eb57bfa4a..75c03f592f9a 100644 --- a/pkg/kv/kvclient/kvcoord/batch.go +++ b/pkg/kv/kvclient/kvcoord/batch.go @@ -16,19 +16,119 @@ import ( "github.com/cockroachdb/errors" ) -var emptyHeader = roachpb.RequestHeader{} +// BatchTruncationHelper is a utility struct that helps with truncating requests +// to range boundaries as well as figuring out the next key to seek to for the +// range iterator. +// +// The caller should not use the helper if all requests fit within a single +// range since the helper has non-trivial setup cost. +// +// It is designed to be used roughly as follows: +// +// rs := keys.Range(requests) +// ri.Seek(scanDir, rs.Key) +// if !ri.NeedAnother(rs) { +// // All requests fit within a single range, don't use the helper. +// ... +// } +// helper.Init(scanDir, requests, mustPreserveOrder) +// for ri.Valid() { +// curRangeRS := rs.Intersect(ri.Token().Desc()) +// curRangeReqs, positions, seekKey := helper.Truncate(curRangeRS) +// // Process curRangeReqs that touch a single range and then use positions +// // to reassemble the result. +// ... +// ri.Seek(scanDir, seekKey) +// } +// +type BatchTruncationHelper struct { + scanDir ScanDirection + requests []roachpb.RequestUnion + // mustPreserveOrder indicates whether the requests must be returned by + // Truncate() in the original order. + mustPreserveOrder bool +} + +// Init initializes the helper for the given requests. 
+// +// mustPreserveOrder, if true, indicates that the caller requires that requests +// are returned by Truncate() in the original order (i.e. with strictly +// increasing positions values). +func (h *BatchTruncationHelper) Init( + scanDir ScanDirection, requests []roachpb.RequestUnion, mustPreserveOrder bool, +) error { + h.scanDir = scanDir + h.requests = requests + h.mustPreserveOrder = mustPreserveOrder + return nil +} // Truncate restricts all requests to the given key range and returns new, // truncated, requests. All returned requests are "truncated" to the given span, // and requests which are found to not overlap the given span at all are +// removed. A mapping of response index to request index is returned. It also +// returns the next seek key for the range iterator. With Ascending scan +// direction, the next seek key is such that requests in [RKeyMin, seekKey) +// range have been processed; with Descending scan direction, it is such that +// requests in [seekKey, RKeyMax) range have been processed. +// +// For example, if +// +// reqs = Put[a], Put[c], Put[b], +// rs = [a,bb], +// BatchTruncationHelper.Init(Ascending, reqs, mustPreserveOrder) +// +// then BatchTruncationHelper.Truncate(rs) returns (Put[a], Put[b]), positions +// [0,2] as well as seekKey 'c'. +// +// Truncate returns the requests in an arbitrary order (meaning that positions +// return values might not be ascending), unless mustPreserveOrder was true in +// Init(). +// +// NOTE: it is assumed that +// 1. Truncate has been called on the previous ranges that intersect with +// keys.Range(reqs); +// 2. rs is intersected with the current range boundaries. +func (h *BatchTruncationHelper) Truncate( + rs roachpb.RSpan, +) ([]roachpb.RequestUnion, []int, roachpb.RKey, error) { + truncReqs, positions, err := truncateLegacy(h.requests, rs) + if err != nil { + return nil, nil, nil, err + } + var seekKey roachpb.RKey + if h.scanDir == Ascending { + // In next iteration, query next range. 
+ // It's important that we use the EndKey of the current descriptor + // as opposed to the StartKey of the next one: if the former is stale, + // it's possible that the next range has since merged the subsequent + // one, and unless both descriptors are stale, the next descriptor's + // StartKey would move us to the beginning of the current range, + // resulting in a duplicate scan. + seekKey, err = nextLegacy(h.requests, rs.EndKey) + } else { + // In next iteration, query previous range. + // We use the StartKey of the current descriptor as opposed to the + // EndKey of the previous one since that doesn't have bugs when + // stale descriptors come into play. + seekKey, err = prevLegacy(h.requests, rs.Key) + } + return truncReqs, positions, seekKey, err +} + +var emptyHeader = roachpb.RequestHeader{} + +// truncateLegacy restricts all requests to the given key range and returns new, +// truncated, requests. All returned requests are "truncated" to the given span, +// and requests which are found to not overlap the given span at all are // removed. A mapping of response index to request index is returned. For // example, if // // reqs = Put[a], Put[c], Put[b], // rs = [a,bb], // -// then Truncate(reqs,rs) returns (Put[a], Put[b]) and positions [0,2]. -func Truncate( +// then truncateLegacy(reqs,rs) returns (Put[a], Put[b]) and positions [0,2]. +func truncateLegacy( reqs []roachpb.RequestUnion, rs roachpb.RSpan, ) ([]roachpb.RequestUnion, []int, error) { truncateOne := func(args roachpb.Request) (hasRequest bool, changed bool, _ roachpb.RequestHeader, _ error) { @@ -64,26 +164,26 @@ func Truncate( local = true } if keyAddr.Less(rs.Key) { - // rs.Key can't be local because it contains range split points, which - // are never local. + // rs.Key can't be local because it contains range split points, + // which are never local. 
changed = true if !local { header.Key = rs.Key.AsRawKey() } else { - // The local start key should be truncated to the boundary of local keys which - // address to rs.Key. + // The local start key should be truncated to the boundary of + // local keys which address to rs.Key. header.Key = keys.MakeRangeKeyPrefix(rs.Key) } } if !endKeyAddr.Less(rs.EndKey) { - // rs.EndKey can't be local because it contains range split points, which - // are never local. + // rs.EndKey can't be local because it contains range split points, + // which are never local. changed = true if !local { header.EndKey = rs.EndKey.AsRawKey() } else { - // The local end key should be truncated to the boundary of local keys which - // address to rs.EndKey. + // The local end key should be truncated to the boundary of + // local keys which address to rs.EndKey. header.EndKey = keys.MakeRangeKeyPrefix(rs.EndKey) } } @@ -122,18 +222,15 @@ func Truncate( return truncReqs, positions, nil } -// prev gives the right boundary of the union of all requests which don't +// prevLegacy gives the right boundary of the union of all requests which don't // affect keys larger than the given key. Note that a right boundary is -// exclusive, that is, the returned RKey is to be used as the exclusive -// right endpoint in finding the next range to query. +// exclusive, that is, the returned RKey is to be used as the exclusive right +// endpoint in finding the next range to query. // -// Informally, a call `prev(reqs, k)` means: we've already executed the parts -// of `reqs` that intersect `[k, KeyMax)`; please tell me how far to the +// Informally, a call `prevLegacy(reqs, k)` means: we've already executed the +// parts of `reqs` that intersect `[k, KeyMax)`; please tell me how far to the // left the next relevant request begins. -// -// TODO(tschottdorf): again, better on BatchRequest itself, but can't pull -// 'keys' into 'roachpb'. 
-func prev(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) { +func prevLegacy(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) { candidate := roachpb.RKeyMin for _, union := range reqs { inner := union.GetInner() @@ -144,16 +241,18 @@ func prev(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) { } endKey := h.EndKey if len(endKey) == 0 { - // If we have a point request for `x < k` then that request has not been - // satisfied (since the batch has only been executed for keys `>=k`). We - // treat `x` as `[x, x.Next())` which does the right thing below. This - // also works when `x > k` or `x=k` as the logic below will skip `x`. + // If we have a point request for `x < k` then that request has not + // been satisfied (since the batch has only been executed for keys + // `>=k`). We treat `x` as `[x, x.Next())` which does the right + // thing below. This also works when `x > k` or `x=k` as the logic + // below will skip `x`. // // Note that if the key is /Local/x/something, then instead of using - // /Local/x/something.Next() as the end key, we rely on AddrUpperBound to - // handle local keys. In particular, AddrUpperBound will turn it into - // `x\x00`, so we're looking at the key-range `[x, x.Next())`. This is - // exactly what we want as the local key is contained in that range. + // /Local/x/something.Next() as the end key, we rely on + // AddrUpperBound to handle local keys. In particular, + // AddrUpperBound will turn it into `x\x00`, so we're looking at the + // key-range `[x, x.Next())`. This is exactly what we want as the + // local key is contained in that range. // // See TestBatchPrevNext for test cases with commentary. endKey = h.Key.Next() @@ -191,18 +290,15 @@ func prev(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) { return candidate, nil } -// Next gives the left boundary of the union of all requests which don't affect -// keys less than the given key. 
Note that the left boundary is inclusive, that -// is, the returned RKey is the inclusive left endpoint of the keys the request -// should operate on next. -// -// Informally, a call `Next(reqs, k)` means: we've already executed the parts of -// `reqs` that intersect `[KeyMin, k)`; please tell me how far to the right the -// next relevant request begins. -// -// TODO(tschottdorf): again, better on BatchRequest itself, but can't pull -// 'keys' into 'proto'. -func Next(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) { +// nextLegacy gives the left boundary of the union of all requests which don't +// affect keys less than the given key. Note that the left boundary is +// inclusive, that is, the returned RKey is the inclusive left endpoint of the +// keys the request should operate on next. +// +// Informally, a call `nextLegacy(reqs, k)` means: we've already executed the +// parts of `reqs` that intersect `[KeyMin, k)`; please tell me how far to the +// right the next relevant request begins. 
+func nextLegacy(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) { candidate := roachpb.RKeyMax for _, union := range reqs { inner := union.GetInner() diff --git a/pkg/kv/kvclient/kvcoord/batch_test.go b/pkg/kv/kvclient/kvcoord/batch_test.go index 2fae3611dcaf..99b31d0e8578 100644 --- a/pkg/kv/kvclient/kvcoord/batch_test.go +++ b/pkg/kv/kvclient/kvcoord/batch_test.go @@ -12,15 +12,24 @@ package kvcoord import ( "bytes" + "fmt" + "reflect" + "sort" "testing" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" ) -// TestBatchPrevNext tests prev() and next() +// TestBatchPrevNext tests the seeking behavior of the +// BatchTruncationHelper.Truncate. func TestBatchPrevNext(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -147,23 +156,27 @@ func TestBatchPrevNext(t *testing.T) { expBW: "c\x00", }, // Explanations below are an exercise for the reader, but it's very similar to above. - {spans: span(loc("b"), ""), key: "a", + {spans: span(loc("b"), loc("c")), key: "a", expFW: "b", expBW: min, }, - {spans: span(loc("b"), ""), key: "b", + {spans: span(loc("b"), loc("c")), key: "b", expFW: "b", expBW: min, }, - {spans: span(loc("b"), ""), key: "b\x00", + // Note that such a span results in an invalid ScanRequest, but it's ok + // for the purposes of this test. + {spans: span(loc("b"), loc("b")), key: "b\x00", expFW: max, // Handled `key >= b\x00`, so next we'll have to chip away at `[KeyMin, b\x00)`. Note // how this doesn't return `b` which would be incorrect as `[KeyMin, b)` does not // contain `loc(b)`. 
expBW: "b\x00", }, + // Note that such a span results in an invalid ScanRequest, but it's ok + // for the purposes of this test. { - spans: span(loc("a"), "", loc("b"), ""), key: "b", + spans: span(loc("a"), loc("a"), loc("b"), loc("b")), key: "b", // We've dealt with any key that addresses to `< b`, and `loc(b)` is not // covered by it. `loc(b)` lives between `b` and `b\x00`, so we start at // `b`. @@ -193,12 +206,24 @@ func TestBatchPrevNext(t *testing.T) { args.Key, args.EndKey = span.Key, span.EndKey ba.Add(args) } - if next, err := Next(ba.Requests, roachpb.RKey(test.key)); err != nil { + var ascHelper, descHelper BatchTruncationHelper + const mustPreserveOrder = false + require.NoError(t, ascHelper.Init(Ascending, ba.Requests, mustPreserveOrder)) + require.NoError(t, descHelper.Init(Descending, ba.Requests, mustPreserveOrder)) + if _, _, next, err := ascHelper.Truncate( + roachpb.RSpan{ + Key: roachpb.RKeyMin, + EndKey: roachpb.RKey(test.key)}, + ); err != nil { t.Error(err) } else if !bytes.Equal(next, roachpb.Key(test.expFW)) { t.Errorf("next: expected %q, got %q", test.expFW, next) } - if prev, err := prev(ba.Requests, roachpb.RKey(test.key)); err != nil { + if _, _, prev, err := descHelper.Truncate( + roachpb.RSpan{ + Key: roachpb.RKey(test.key), + EndKey: roachpb.RKeyMax}, + ); err != nil { t.Error(err) } else if !bytes.Equal(prev, roachpb.Key(test.expBW)) { t.Errorf("prev: expected %q, got %q", test.expBW, prev) @@ -206,3 +231,538 @@ func TestBatchPrevNext(t *testing.T) { }) } } + +type requestsWithPositions struct { + reqs []roachpb.RequestUnion + positions []int +} + +var _ sort.Interface = &requestsWithPositions{} + +func (r *requestsWithPositions) Len() int { + return len(r.positions) +} + +func (r *requestsWithPositions) Less(i, j int) bool { + return r.positions[i] < r.positions[j] +} + +func (r *requestsWithPositions) Swap(i, j int) { + r.reqs[i], r.reqs[j] = r.reqs[j], r.reqs[i] + r.positions[i], r.positions[j] = r.positions[j], r.positions[i] 
+} + +// TestTruncate verifies the truncation logic of the BatchTruncationHelper over +// a single range as well as truncateLegacy() function directly. +func TestTruncate(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + loc := func(s string) string { + return string(keys.RangeDescriptorKey(roachpb.RKey(s))) + } + locPrefix := func(s string) string { + return string(keys.MakeRangeKeyPrefix(roachpb.RKey(s))) + } + testCases := []struct { + keys [][2]string + expKeys [][2]string + from, to string + desc [2]string // optional, defaults to {from,to} + err string + }{ + { + // Keys inside of active range. + keys: [][2]string{{"a", "q"}, {"c"}, {"b, e"}, {"q"}}, + expKeys: [][2]string{{"a", "q"}, {"c"}, {"b, e"}, {"q"}}, + from: "a", to: "q\x00", + }, + { + // Keys outside of active range. + keys: [][2]string{{"a"}, {"a", "b"}, {"q"}, {"q", "z"}}, + expKeys: [][2]string{{}, {}, {}, {}}, + from: "b", to: "q", + }, + { + // Range-local keys inside of active range. + keys: [][2]string{{loc("b")}, {loc("c")}}, + expKeys: [][2]string{{loc("b")}, {loc("c")}}, + from: "b", to: "e", + }, + { + // Range-local key outside of active range. + keys: [][2]string{{loc("a")}}, + expKeys: [][2]string{{}}, + from: "b", to: "e", + }, + { + // Range-local range contained in active range. + keys: [][2]string{{loc("b"), loc("e") + "\x00"}}, + expKeys: [][2]string{{loc("b"), loc("e") + "\x00"}}, + from: "b", to: "e\x00", + }, + { + // Range-local range not contained in active range. + keys: [][2]string{{loc("a"), loc("b")}}, + expKeys: [][2]string{{}}, + from: "c", to: "e", + }, + { + // Range-local range not contained in active range. + keys: [][2]string{{loc("a"), locPrefix("b")}, {loc("e"), loc("f")}}, + expKeys: [][2]string{{}, {}}, + from: "b", to: "e", + }, + { + // Range-local range partially contained in active range. 
+ keys: [][2]string{{loc("a"), loc("b")}}, + expKeys: [][2]string{{loc("a"), locPrefix("b")}}, + from: "a", to: "b", + }, + { + // Range-local range partially contained in active range. + keys: [][2]string{{loc("a"), loc("b")}}, + expKeys: [][2]string{{locPrefix("b"), loc("b")}}, + from: "b", to: "e", + }, + { + // Range-local range contained in active range. + keys: [][2]string{{locPrefix("b"), loc("b")}}, + expKeys: [][2]string{{locPrefix("b"), loc("b")}}, + from: "b", to: "c", + }, + { + // Mixed range-local vs global key range. + keys: [][2]string{{loc("c"), "d\x00"}}, + from: "b", to: "e", + err: "local key mixed with global key", + }, + { + // Key range touching and intersecting active range. + keys: [][2]string{{"a", "b"}, {"a", "c"}, {"p", "q"}, {"p", "r"}, {"a", "z"}}, + expKeys: [][2]string{{"b", "c"}, {"p", "q"}, {"p", "q"}, {"b", "q"}}, + from: "b", to: "q", + }, + // Active key range is intersection of descriptor and [from,to). + { + keys: [][2]string{{"c", "q"}}, + expKeys: [][2]string{{"d", "p"}}, + from: "a", to: "z", + desc: [2]string{"d", "p"}, + }, + { + keys: [][2]string{{"c", "q"}}, + expKeys: [][2]string{{"d", "p"}}, + from: "d", to: "p", + desc: [2]string{"a", "z"}, + }, + } + + for _, isLegacy := range []bool{false, true} { + for _, mustPreserveOrder := range []bool{false, true} { + if isLegacy && mustPreserveOrder { + // This config is meaningless because truncateLegacy() always + // preserves the ordering. 
+ continue + } + for i, test := range testCases { + goldenOriginal := roachpb.BatchRequest{} + for _, ks := range test.keys { + if len(ks[1]) > 0 { + goldenOriginal.Add(&roachpb.ResolveIntentRangeRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: roachpb.Key(ks[0]), EndKey: roachpb.Key(ks[1]), + }, + IntentTxn: enginepb.TxnMeta{ID: uuid.MakeV4()}, + }) + } else { + goldenOriginal.Add(&roachpb.GetRequest{ + RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(ks[0])}, + }) + } + } + + original := roachpb.BatchRequest{Requests: make([]roachpb.RequestUnion, len(goldenOriginal.Requests))} + for i, request := range goldenOriginal.Requests { + original.Requests[i].MustSetInner(request.GetInner().ShallowCopy()) + } + + var truncationHelper BatchTruncationHelper + if !isLegacy { + if err := truncationHelper.Init(Ascending, original.Requests, mustPreserveOrder); err != nil { + t.Errorf("%d: Init failure: %v", i, err) + continue + } + // We need to truncate all requests up to the start of the + // test range since this is assumed by Truncate(). 
+ truncateKey := roachpb.RKey(test.from) + if truncateKey.Less(roachpb.RKey(test.desc[0])) { + truncateKey = roachpb.RKey(test.desc[0]) + } + _, _, _, err := truncationHelper.Truncate( + roachpb.RSpan{Key: roachpb.RKeyMin, EndKey: truncateKey}, + ) + if err != nil || test.err != "" { + if !testutils.IsError(err, test.err) { + t.Errorf("%d: %v (expected: %q)", i, err, test.err) + } + continue + } + } + desc := &roachpb.RangeDescriptor{ + StartKey: roachpb.RKey(test.desc[0]), EndKey: roachpb.RKey(test.desc[1]), + } + if len(desc.StartKey) == 0 { + desc.StartKey = roachpb.RKey(test.from) + } + if len(desc.EndKey) == 0 { + desc.EndKey = roachpb.RKey(test.to) + } + rs := roachpb.RSpan{Key: roachpb.RKey(test.from), EndKey: roachpb.RKey(test.to)} + rs, err := rs.Intersect(desc) + if err != nil { + t.Errorf("%d: intersection failure: %v", i, err) + continue + } + reqs, pos, err := truncateLegacy(original.Requests, rs) + if isLegacy { + if err != nil || test.err != "" { + if !testutils.IsError(err, test.err) { + t.Errorf("%d: %v (expected: %q)", i, err, test.err) + } + continue + } + } else { + reqs, pos, _, err = truncationHelper.Truncate(rs) + } + if err != nil { + t.Errorf("%d: truncation failure: %v", i, err) + continue + } + if !isLegacy && !mustPreserveOrder { + // Truncate can return results in an arbitrary order, so we + // need to restore the order according to positions. 
+ scratch := &requestsWithPositions{reqs: reqs, positions: pos} + sort.Sort(scratch) + } + var numReqs int + for j, arg := range reqs { + req := arg.GetInner() + if h := req.Header(); !bytes.Equal(h.Key, roachpb.Key(test.expKeys[j][0])) || !bytes.Equal(h.EndKey, roachpb.Key(test.expKeys[j][1])) { + t.Errorf("%d.%d: range mismatch: actual [%q,%q), wanted [%q,%q)", i, j, + h.Key, h.EndKey, roachpb.RKey(test.expKeys[j][0]), roachpb.RKey(test.expKeys[j][1])) + } else if len(h.Key) != 0 { + numReqs++ + } + } + if num := len(pos); numReqs != num { + t.Errorf("%d: counted %d requests, but truncation indicated %d", i, numReqs, num) + } + if !reflect.DeepEqual(original, goldenOriginal) { + t.Errorf("%d: truncation mutated original:\nexpected: %s\nactual: %s", + i, goldenOriginal, original) + } + } + } + } +} + +// TestTruncateLoop verifies that using the BatchTruncationHelper in a loop over +// multiple ranges works as expected. +func TestTruncateLoop(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + makeRangeDesc := func(boundaries [2]string) roachpb.RangeDescriptor { + return roachpb.RangeDescriptor{ + StartKey: roachpb.RKey(boundaries[0]), EndKey: roachpb.RKey(boundaries[1]), + } + } + makeGetRequest := func(key string) roachpb.RequestUnion { + var req roachpb.RequestUnion + req.MustSetInner(&roachpb.GetRequest{ + RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(key)}, + }) + return req + } + makeScanRequest := func(start, end string) roachpb.RequestUnion { + var req roachpb.RequestUnion + req.MustSetInner(&roachpb.ScanRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: roachpb.Key(start), EndKey: roachpb.Key(end), + }, + }) + return req + } + + type iterationCase struct { + rangeBoundaries [2]string + // When both strings are non-empty, then it represents a ScanRequest, if + // the second string is empty, then it is a GetRequest. 
+ requests [][2]string + positions []int + ascSeekKey roachpb.RKey + descSeekKey roachpb.RKey + } + getRequests := func(reqs [][2]string) []roachpb.RequestUnion { + var requests []roachpb.RequestUnion + for _, req := range reqs { + if req[1] == "" { + requests = append(requests, makeGetRequest(req[0])) + } else { + requests = append(requests, makeScanRequest(req[0], req[1])) + } + } + return requests + } + assertExpected := func(tc iterationCase, reqs []roachpb.RequestUnion, positions []int) { + // Since requests can be truncated in an arbitrary order, we will sort + // them according to their positions. + scratch := requestsWithPositions{reqs: reqs, positions: positions} + sort.Sort(&scratch) + require.True(t, reflect.DeepEqual(getRequests(tc.requests), reqs)) + require.True(t, reflect.DeepEqual(tc.positions, positions)) + } + + for _, scanDir := range []ScanDirection{Ascending, Descending} { + for _, testCase := range []struct { + // When both strings are non-empty, then it represents a + // ScanRequest, if the second string is empty, then it is a + // GetRequest. + requests [][2]string + iteration []iterationCase + }{ + // A test case with three ranges and requests touching each of those + // ranges. 
+ { + requests: [][2]string{ + {"i", "l"}, {"d"}, {"h", "k"}, {"g", "i"}, {"i"}, {"d", "f"}, {"b", "j"}, + }, + iteration: []iterationCase{ + { + rangeBoundaries: [2]string{"a", "e"}, + requests: [][2]string{{"d"}, {"d", "e"}, {"b", "e"}}, + positions: []int{1, 5, 6}, + ascSeekKey: roachpb.RKey("e"), + descSeekKey: roachpb.RKeyMin, + }, + { + rangeBoundaries: [2]string{"e", "i"}, + requests: [][2]string{{"h", "i"}, {"g", "i"}, {"e", "f"}, {"e", "i"}}, + positions: []int{2, 3, 5, 6}, + ascSeekKey: roachpb.RKey("i"), + descSeekKey: roachpb.RKey("e"), + }, + { + rangeBoundaries: [2]string{"i", "m"}, + requests: [][2]string{{"i", "l"}, {"i", "k"}, {"i"}, {"i", "j"}}, + positions: []int{0, 2, 4, 6}, + ascSeekKey: roachpb.RKeyMax, + descSeekKey: roachpb.RKey("i"), + }, + }, + }, + // A test case where each request fits within a single range, and + // there is a "gap" range that doesn't touch any of the requests. + { + requests: [][2]string{ + {"k", "l"}, {"d"}, {"j", "k"}, {"c", "d"}, {"k"}, {"a", "b"}, + }, + iteration: []iterationCase{ + { + rangeBoundaries: [2]string{"a", "e"}, + requests: [][2]string{{"d"}, {"c", "d"}, {"a", "b"}}, + positions: []int{1, 3, 5}, + ascSeekKey: roachpb.RKey("j"), + descSeekKey: roachpb.RKeyMin, + }, + { + rangeBoundaries: [2]string{"e", "i"}, + requests: nil, + positions: nil, + ascSeekKey: roachpb.RKey("j"), + descSeekKey: roachpb.RKey("d").Next(), + }, + { + rangeBoundaries: [2]string{"i", "m"}, + requests: [][2]string{{"k", "l"}, {"j", "k"}, {"k"}}, + positions: []int{0, 2, 4}, + ascSeekKey: roachpb.RKeyMax, + descSeekKey: roachpb.RKey("d").Next(), + }, + }, + }, + } { + requests := getRequests(testCase.requests) + rs, err := keys.Range(requests) + require.NoError(t, err) + var helper BatchTruncationHelper + const mustPreserveOrder = false + require.NoError(t, helper.Init(scanDir, requests, mustPreserveOrder)) + for i := 0; i < len(testCase.iteration); i++ { + tc := testCase.iteration[i] + if scanDir == Descending { + // Test cases 
are written assuming the Ascending scan + // direction, so we have to iterate in reverse here. + tc = testCase.iteration[len(testCase.iteration)-1-i] + } + desc := makeRangeDesc(tc.rangeBoundaries) + curRangeRS, err := rs.Intersect(&desc) + require.NoError(t, err) + reqs, positions, seekKey, err := helper.Truncate(curRangeRS) + require.NoError(t, err) + assertExpected(tc, reqs, positions) + expectedSeekKey := tc.ascSeekKey + if scanDir == Descending { + expectedSeekKey = tc.descSeekKey + } + require.Equal(t, expectedSeekKey, seekKey) + } + } + } +} + +func BenchmarkTruncateLoop(b *testing.B) { + defer leaktest.AfterTest(b)() + defer log.Scope(b).Close(b) + + rng, _ := randutil.NewTestRand() + randomKey := func() []byte { + const keyLength = 8 + res := make([]byte, keyLength) + for i := range res { + // Plus two is needed to skip local key space. + res[i] = byte(2 + rng.Intn(253)) + } + return res + } + for _, scanDir := range []ScanDirection{Ascending, Descending} { + for _, mustPreserveOrder := range []bool{false, true} { + for _, numRequests := range []int{128, 16384} { + for _, numRanges := range []int{4, 64} { + // We'll split the whole key space into numRanges ranges + // using random numRanges-1 split points. + splitPoints := make([][]byte, numRanges-1) + for i := range splitPoints { + splitPoints[i] = randomKey() + } + // Sort all the split points so that we can treat them as + // boundaries of consecutive ranges. 
+ for i := range splitPoints { + for j := i + 1; j < len(splitPoints); j++ { + if bytes.Compare(splitPoints[i], splitPoints[j]) > 0 { + splitPoints[i], splitPoints[j] = splitPoints[j], splitPoints[i] + } + } + } + rangeSpans := make([]roachpb.RSpan, numRanges) + rangeSpans[0].Key = roachpb.RKeyMin + rangeSpans[numRanges-1].EndKey = roachpb.RKeyMax + for i := range splitPoints { + rangeSpans[i].EndKey = splitPoints[i] + rangeSpans[i+1].Key = splitPoints[i] + } + scanDirStr := "asc" + if scanDir == Descending { + scanDirStr = "desc" + // Reverse all the range spans for the Descending scan + // direction. + for i, j := 0, numRanges-1; i < j; i, j = i+1, j-1 { + rangeSpans[i], rangeSpans[j] = rangeSpans[j], rangeSpans[i] + } + } + var orderStr string + if mustPreserveOrder { + orderStr = "/preserveOrder" + } + for _, requestType := range []string{"get", "scan"} { + b.Run(fmt.Sprintf( + "%s%s/reqs=%d/ranges=%d/type=%s", + scanDirStr, orderStr, numRequests, numRanges, requestType, + ), func(b *testing.B) { + reqs := make([]roachpb.RequestUnion, numRequests) + switch requestType { + case "get": + for i := 0; i < numRequests; i++ { + var get roachpb.GetRequest + get.Key = randomKey() + reqs[i].MustSetInner(&get) + } + case "scan": + for i := 0; i < numRequests; i++ { + var scan roachpb.ScanRequest + startKey := randomKey() + endKey := randomKey() + for bytes.Equal(startKey, endKey) { + endKey = randomKey() + } + if bytes.Compare(startKey, endKey) > 0 { + startKey, endKey = endKey, startKey + } + scan.Key = startKey + scan.EndKey = endKey + reqs[i].MustSetInner(&scan) + } + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + var h BatchTruncationHelper + require.NoError(b, h.Init(scanDir, reqs, mustPreserveOrder)) + for _, rs := range rangeSpans { + _, _, _, err := h.Truncate(rs) + require.NoError(b, err) + } + } + }) + } + } + } + } + } +} + +func BenchmarkTruncateLegacy(b *testing.B) { + defer leaktest.AfterTest(b)() + defer log.Scope(b).Close(b) + + rng, _ := 
randutil.NewTestRand() + // For simplicity, we'll work with single-byte keys, so we divide up the + // single-byte key space into three parts, and we'll be truncating the + // requests according to the middle part. + rs := roachpb.RSpan{Key: roachpb.RKey([]byte{85}), EndKey: roachpb.RKey([]byte{170})} + randomKeyByte := func() byte { + // Plus two is needed to skip local key space. + return byte(2 + rng.Intn(253)) + } + for _, numRequests := range []int{1 << 5, 1 << 10, 1 << 15} { + for _, requestType := range []string{"get", "scan"} { + b.Run(fmt.Sprintf("reqs=%d/type=%s", numRequests, requestType), func(b *testing.B) { + reqs := make([]roachpb.RequestUnion, numRequests) + switch requestType { + case "get": + for i := 0; i < numRequests; i++ { + var get roachpb.GetRequest + get.Key = []byte{randomKeyByte()} + reqs[i].MustSetInner(&get) + } + case "scan": + for i := 0; i < numRequests; i++ { + var scan roachpb.ScanRequest + startKey := randomKeyByte() + endKey := randomKeyByte() + if endKey < startKey { + startKey, endKey = endKey, startKey + } + scan.Key = []byte{startKey} + scan.EndKey = []byte{endKey} + reqs[i].MustSetInner(&scan) + } + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _, err := truncateLegacy(reqs, rs) + require.NoError(b, err) + } + }) + } + } +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index e5f22213e6a0..8ef43a0df625 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -1321,6 +1321,15 @@ func (ds *DistSender) divideAndSendBatchToRanges( canParallelize = canParallelize && !isExpensive } + var truncationHelper BatchTruncationHelper + // In several places that handle writes (kvserver.maybeStripInFlightWrites, + // storage.replayTransactionalWrite, possibly others) we rely on requests + // being in the original order, so the helper must preserve the order if the + // batch is not read-only. 
+ mustPreserveOrder := !ba.IsReadOnly() + if err := truncationHelper.Init(scanDir, ba.Requests, mustPreserveOrder); err != nil { + return nil, roachpb.NewError(err) + } // Iterate over the ranges that the batch touches. The iteration is done in // key order - the order of requests in the batch is not relevant for the // iteration. Each iteration sends for evaluation one sub-batch to one range. @@ -1340,33 +1349,6 @@ func (ds *DistSender) divideAndSendBatchToRanges( responseCh := make(chan response, 1) responseChs = append(responseChs, responseCh) - // Determine next seek key, taking a potentially sparse batch into - // consideration. - var err error - nextRS := rs - if scanDir == Descending { - // In next iteration, query previous range. - // We use the StartKey of the current descriptor as opposed to the - // EndKey of the previous one since that doesn't have bugs when - // stale descriptors come into play. - seekKey, err = prev(ba.Requests, ri.Desc().StartKey) - nextRS.EndKey = seekKey - } else { - // In next iteration, query next range. - // It's important that we use the EndKey of the current descriptor - // as opposed to the StartKey of the next one: if the former is stale, - // it's possible that the next range has since merged the subsequent - // one, and unless both descriptors are stale, the next descriptor's - // StartKey would move us to the beginning of the current range, - // resulting in a duplicate scan. - seekKey, err = Next(ba.Requests, ri.Desc().EndKey) - nextRS.Key = seekKey - } - if err != nil { - responseCh <- response{pErr: roachpb.NewError(err)} - return - } - // Truncate the request to range descriptor. 
curRangeRS, err := rs.Intersect(ri.Token().Desc()) if err != nil { @@ -1375,7 +1357,7 @@ func (ds *DistSender) divideAndSendBatchToRanges( } curRangeBatch := ba var positions []int - curRangeBatch.Requests, positions, err = Truncate(ba.Requests, curRangeRS) + curRangeBatch.Requests, positions, seekKey, err = truncationHelper.Truncate(curRangeRS) if len(positions) == 0 && err == nil { // This shouldn't happen in the wild, but some tests exercise it. err = errors.Newf("truncation resulted in empty batch on %s: %s", rs, ba) @@ -1384,6 +1366,12 @@ func (ds *DistSender) divideAndSendBatchToRanges( responseCh <- response{pErr: roachpb.NewError(err)} return } + nextRS := rs + if scanDir == Ascending { + nextRS.Key = seekKey + } else { + nextRS.EndKey = seekKey + } lastRange := !ri.NeedAnother(rs) // Send the next partial batch to the first range in the "rs" span. @@ -1451,13 +1439,12 @@ func (ds *DistSender) divideAndSendBatchToRanges( } } - // The iteration is complete if the iterator's current range - // encompasses the remaining span, OR if the next span has - // inverted. This can happen if this method is invoked - // re-entrantly due to ranges being split or merged. In that case - // the batch request has all the original requests but the span is - // a sub-span of the original, causing next() and prev() methods - // to potentially return values which invert the span. + // The iteration is complete if the iterator's current range encompasses + // the remaining span, OR if the next span has inverted. This can happen + // if this method is invoked re-entrantly due to ranges being split or + // merged. In that case the batch request has all the original requests + // but the span is a sub-span of the original, causing Truncate() to + // potentially return the next seek key which inverts the span. 
if lastRange || !nextRS.Key.Less(nextRS.EndKey) { return } diff --git a/pkg/kv/kvclient/kvcoord/truncate_test.go b/pkg/kv/kvclient/kvcoord/truncate_test.go deleted file mode 100644 index 3be378cfda4e..000000000000 --- a/pkg/kv/kvclient/kvcoord/truncate_test.go +++ /dev/null @@ -1,242 +0,0 @@ -// Copyright 2015 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package kvcoord - -import ( - "bytes" - "fmt" - "reflect" - "testing" - - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/storage/enginepb" - "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/randutil" - "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/stretchr/testify/require" -) - -func TestTruncate(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - loc := func(s string) string { - return string(keys.RangeDescriptorKey(roachpb.RKey(s))) - } - locPrefix := func(s string) string { - return string(keys.MakeRangeKeyPrefix(roachpb.RKey(s))) - } - testCases := []struct { - keys [][2]string - expKeys [][2]string - from, to string - desc [2]string // optional, defaults to {from,to} - err string - }{ - { - // Keys inside of active range. - keys: [][2]string{{"a", "q"}, {"c"}, {"b, e"}, {"q"}}, - expKeys: [][2]string{{"a", "q"}, {"c"}, {"b, e"}, {"q"}}, - from: "a", to: "q\x00", - }, - { - // Keys outside of active range. 
- keys: [][2]string{{"a"}, {"a", "b"}, {"q"}, {"q", "z"}}, - expKeys: [][2]string{{}, {}, {}, {}}, - from: "b", to: "q", - }, - { - // Range-local keys inside of active range. - keys: [][2]string{{loc("b")}, {loc("c")}}, - expKeys: [][2]string{{loc("b")}, {loc("c")}}, - from: "b", to: "e", - }, - { - // Range-local key outside of active range. - keys: [][2]string{{loc("a")}}, - expKeys: [][2]string{{}}, - from: "b", to: "e", - }, - { - // Range-local range contained in active range. - keys: [][2]string{{loc("b"), loc("e") + "\x00"}}, - expKeys: [][2]string{{loc("b"), loc("e") + "\x00"}}, - from: "b", to: "e\x00", - }, - { - // Range-local range not contained in active range. - keys: [][2]string{{loc("a"), loc("b")}}, - expKeys: [][2]string{{}}, - from: "c", to: "e", - }, - { - // Range-local range not contained in active range. - keys: [][2]string{{loc("a"), locPrefix("b")}, {loc("e"), loc("f")}}, - expKeys: [][2]string{{}, {}}, - from: "b", to: "e", - }, - { - // Range-local range partially contained in active range. - keys: [][2]string{{loc("a"), loc("b")}}, - expKeys: [][2]string{{loc("a"), locPrefix("b")}}, - from: "a", to: "b", - }, - { - // Range-local range partially contained in active range. - keys: [][2]string{{loc("a"), loc("b")}}, - expKeys: [][2]string{{locPrefix("b"), loc("b")}}, - from: "b", to: "e", - }, - { - // Range-local range contained in active range. - keys: [][2]string{{locPrefix("b"), loc("b")}}, - expKeys: [][2]string{{locPrefix("b"), loc("b")}}, - from: "b", to: "c", - }, - { - // Mixed range-local vs global key range. - keys: [][2]string{{loc("c"), "d\x00"}}, - from: "b", to: "e", - err: "local key mixed with global key", - }, - { - // Key range touching and intersecting active range. - keys: [][2]string{{"a", "b"}, {"a", "c"}, {"p", "q"}, {"p", "r"}, {"a", "z"}}, - expKeys: [][2]string{{"b", "c"}, {"p", "q"}, {"p", "q"}, {"b", "q"}}, - from: "b", to: "q", - }, - // Active key range is intersection of descriptor and [from,to). 
- { - keys: [][2]string{{"c", "q"}}, - expKeys: [][2]string{{"d", "p"}}, - from: "a", to: "z", - desc: [2]string{"d", "p"}, - }, - { - keys: [][2]string{{"c", "q"}}, - expKeys: [][2]string{{"d", "p"}}, - from: "d", to: "p", - desc: [2]string{"a", "z"}, - }, - } - - for i, test := range testCases { - goldenOriginal := roachpb.BatchRequest{} - for _, ks := range test.keys { - if len(ks[1]) > 0 { - goldenOriginal.Add(&roachpb.ResolveIntentRangeRequest{ - RequestHeader: roachpb.RequestHeader{ - Key: roachpb.Key(ks[0]), EndKey: roachpb.Key(ks[1]), - }, - IntentTxn: enginepb.TxnMeta{ID: uuid.MakeV4()}, - }) - } else { - goldenOriginal.Add(&roachpb.GetRequest{ - RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(ks[0])}, - }) - } - } - - original := roachpb.BatchRequest{Requests: make([]roachpb.RequestUnion, len(goldenOriginal.Requests))} - for i, request := range goldenOriginal.Requests { - original.Requests[i].MustSetInner(request.GetInner().ShallowCopy()) - } - - desc := &roachpb.RangeDescriptor{ - StartKey: roachpb.RKey(test.desc[0]), EndKey: roachpb.RKey(test.desc[1]), - } - if len(desc.StartKey) == 0 { - desc.StartKey = roachpb.RKey(test.from) - } - if len(desc.EndKey) == 0 { - desc.EndKey = roachpb.RKey(test.to) - } - rs := roachpb.RSpan{Key: roachpb.RKey(test.from), EndKey: roachpb.RKey(test.to)} - rs, err := rs.Intersect(desc) - if err != nil { - t.Errorf("%d: intersection failure: %v", i, err) - continue - } - reqs, pos, err := Truncate(original.Requests, rs) - if err != nil || test.err != "" { - if !testutils.IsError(err, test.err) { - t.Errorf("%d: %v (expected: %q)", i, err, test.err) - } - continue - } - var numReqs int - for j, arg := range reqs { - req := arg.GetInner() - if h := req.Header(); !bytes.Equal(h.Key, roachpb.Key(test.expKeys[j][0])) || !bytes.Equal(h.EndKey, roachpb.Key(test.expKeys[j][1])) { - t.Errorf("%d.%d: range mismatch: actual [%q,%q), wanted [%q,%q)", i, j, - h.Key, h.EndKey, test.expKeys[j][0], test.expKeys[j][1]) - } else if 
len(h.Key) != 0 { - numReqs++ - } - } - if num := len(pos); numReqs != num { - t.Errorf("%d: counted %d requests, but truncation indicated %d", i, reqs, num) - } - if !reflect.DeepEqual(original, goldenOriginal) { - t.Errorf("%d: truncation mutated original:\nexpected: %s\nactual: %s", - i, goldenOriginal, original) - } - } -} - -func BenchmarkTruncate(b *testing.B) { - defer leaktest.AfterTest(b)() - defer log.Scope(b).Close(b) - - rng, _ := randutil.NewTestRand() - // For simplicity, we'll work with single-byte keys, so we divide up the - // single-byte key space into three parts, and we'll be truncating the - // requests according to the middle part. - rs := roachpb.RSpan{Key: roachpb.RKey([]byte{85}), EndKey: roachpb.RKey([]byte{170})} - randomKeyByte := func() byte { - // Plus two is needed to skip local key space. - return byte(2 + rng.Intn(253)) - } - for _, numRequests := range []int{1 << 5, 1 << 10, 1 << 15} { - for _, requestType := range []string{"get", "scan"} { - b.Run(fmt.Sprintf("reqs=%d/type=%s", numRequests, requestType), func(b *testing.B) { - reqs := make([]roachpb.RequestUnion, numRequests) - switch requestType { - case "get": - for i := 0; i < numRequests; i++ { - var get roachpb.GetRequest - get.Key = []byte{randomKeyByte()} - reqs[i].MustSetInner(&get) - } - case "scan": - for i := 0; i < numRequests; i++ { - var scan roachpb.ScanRequest - startKey := randomKeyByte() - endKey := randomKeyByte() - if endKey < startKey { - startKey, endKey = endKey, startKey - } - scan.Key = []byte{startKey} - scan.EndKey = []byte{endKey} - reqs[i].MustSetInner(&scan) - } - } - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, _, err := Truncate(reqs, rs) - require.NoError(b, err) - } - }) - } - } -} diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 6418c7e3964b..1b6330e895ca 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -450,6 +450,17 @@ func (s *Streamer) 
Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re s.mu.Unlock() } }() + // TODO(yuzefovich): reuse truncation helpers between different Enqueue() + // calls. + // TODO(yuzefovich): introduce a fast path when all requests are contained + // within a single range. + var truncationHelper kvcoord.BatchTruncationHelper + // The streamer can process the responses in an arbitrary order, so we don't + // require the helper to preserve the order of requests. + const mustPreserveOrder = false + if err = truncationHelper.Init(scanDir, reqs, mustPreserveOrder); err != nil { + return err + } var reqsKeysScratch []roachpb.Key for ; ri.Valid(); ri.Seek(ctx, seekKey, scanDir) { // Truncate the request span to the current range. @@ -458,10 +469,13 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re return err } // Find all requests that touch the current range. - singleRangeReqs, positions, err := kvcoord.Truncate(reqs, singleRangeSpan) + var singleRangeReqs []roachpb.RequestUnion + var positions []int + singleRangeReqs, positions, seekKey, err = truncationHelper.Truncate(singleRangeSpan) if err != nil { return err } + rs.Key = seekKey var subRequestIdx []int32 if !s.hints.SingleRowLookup { for i, pos := range positions { @@ -527,22 +541,6 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re } requestsToServe = append(requestsToServe, r) - - // Determine next seek key, taking potentially sparse requests into - // consideration. - // - // In next iteration, query next range. - // It's important that we use the EndKey of the current descriptor - // as opposed to the StartKey of the next one: if the former is stale, - // it's possible that the next range has since merged the subsequent - // one, and unless both descriptors are stale, the next descriptor's - // StartKey would move us to the beginning of the current range, - // resulting in a duplicate scan. 
- seekKey, err = kvcoord.Next(reqs, ri.Desc().EndKey) - rs.Key = seekKey - if err != nil { - return err - } } if streamerLocked {