diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index 8ed46a6818ab..1ccded24e24b 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -55,6 +55,7 @@ go_library( "//pkg/sql/pgwire/pgerror", "//pkg/storage/enginepb", "//pkg/util", + "//pkg/util/buildutil", "//pkg/util/contextutil", "//pkg/util/ctxgroup", "//pkg/util/envutil", diff --git a/pkg/kv/kvclient/kvcoord/batch.go b/pkg/kv/kvclient/kvcoord/batch.go index 75c03f592f9a..5fd44de4b6f4 100644 --- a/pkg/kv/kvclient/kvcoord/batch.go +++ b/pkg/kv/kvclient/kvcoord/batch.go @@ -11,8 +11,11 @@ package kvcoord import ( + "sort" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/errors" ) @@ -41,12 +44,112 @@ import ( // ri.Seek(scanDir, seekKey) // } // +// The helper utilizes two different strategies depending on whether the +// requests use local keys or not: +// +// - a "legacy" strategy is used when requests use local keys. This strategy +// utilizes "legacy" methods that operate on the original requests without +// keeping any additional bookkeeping. In particular, it leads to truncating +// already processed requests as well as to iterating over the fully processed +// requests when searching for the next seek key. +// +// - an "optimized" strategy is used when requests only use global keys. +// Although this strategy has the same worst-case complexity of O(N * R) as +// the "legacy" strategy (where N is the number of requests, R is the number +// of ranges that all requests fit int, the worst-case is achieved when all +// requests are range-spanning and each request spans all R ranges), in +// practice it is much faster. See the comments on truncateAsc() and +// truncateDesc() for the details. +// +// The gist of the optimized strategy is sorting all of the requests according +// to the keys upfront and then, on each truncation iteration, examining only a +// subset of requests that might overlap with the current range boundaries. The +// strategy is careful to update the internal state so that the ordering of +// unprocessed requests is maintained. +// +// The key insight is best shown by an example. Imagine that we have two +// requests Scan(a, c) and Scan(b, d) (which are ordered according to start keys +// with the Ascending scan direction), and on the first iteration we're +// truncating to range [a, b). Only the first request overlaps with the range, +// so we include Scan(a, b) into the truncation response, and, crucially, we can +// update the header of the first request to be [b, c) to track the remaining +// part of the first request. We can update the header in-place without breaking +// the ordering property. type BatchTruncationHelper struct { - scanDir ScanDirection + scanDir ScanDirection + // requests are the original requests this helper needs to process (possibly + // in non-original order). requests []roachpb.RequestUnion // mustPreserveOrder indicates whether the requests must be returned by // Truncate() in the original order. mustPreserveOrder bool + // foundLocalKey, if true, indicates whether some of the requests reference + // the local keys. When true, the helper falls back to the legacy methods. + foundLocalKey bool + + // Fields below are only used if the optimized strategy is used. + + // headers contains the parts of the corresponding requests that have not + // been processed yet. For range-spanning requests: + // - with the Ascending direction, we're advancing the start key of the + // header once a prefix of the request is returned by Truncate(); + // - with the Descending direction, we're moving the end key of the header + // backwards once a suffix of the request is returned by Truncate(). We also + // ensure that all requests have an end key (meaning that for point requests + // as well as range-spanning requests we populate the end key as + // startKey.Next()). + // + // All keys in the headers are global. + headers []roachpb.RequestHeader + // positions stores the corresponding indices of requests in the original + // requests slice. Once request is fully processed, it's position value + // becomes negative. + positions []int + // isRange indicates whether the corresponding request is a range-spanning + // one. + isRange []bool + // startIdx tracks the "smallest" request (according to the order of the + // original start keys) that might not have been fully processed. In other + // words, all requests in range [0, startIdx) have negative positions + // values. + startIdx int + // helper is only initialized and used if mustPreserveOrder is true. + helper orderRestorationHelper +} + +func (h *BatchTruncationHelper) Len() int { + return len(h.requests) +} + +func (h *BatchTruncationHelper) Swap(i, j int) { + h.requests[i], h.requests[j] = h.requests[j], h.requests[i] + h.headers[i], h.headers[j] = h.headers[j], h.headers[i] + h.positions[i], h.positions[j] = h.positions[j], h.positions[i] + h.isRange[i], h.isRange[j] = h.isRange[j], h.isRange[i] +} + +// ascBatchTruncationHelper is used for the Ascending scan direction in order to +// sort the requests in the ascending order of the start keys. +type ascBatchTruncationHelper struct { + *BatchTruncationHelper +} + +var _ sort.Interface = ascBatchTruncationHelper{} + +func (h ascBatchTruncationHelper) Less(i, j int) bool { + return h.headers[i].Key.Compare(h.headers[j].Key) < 0 +} + +// descBatchTruncationHelper is used for the Descending scan direction in order +// to sort the requests in the descending order of the end keys. +type descBatchTruncationHelper struct { + *BatchTruncationHelper +} + +var _ sort.Interface = descBatchTruncationHelper{} + +func (h descBatchTruncationHelper) Less(i, j int) bool { + return h.headers[i].EndKey.Compare(h.headers[j].EndKey) > 0 } // Init initializes the helper for the given requests. @@ -54,12 +157,71 @@ type BatchTruncationHelper struct { // mustPreserveOrder, if true, indicates that the caller requires that requests // are returned by Truncate() in the original order (i.e. with strictly // increasing positions values). +// +// If canReorderRequestsSlice is true, then the helper will hold on to the given +// slice and might reorder the requests within it (although each request will +// not be modified "deeply" - i.e. its header won't be updated or anything like +// that). Set it to false when the caller cares about the slice not being +// mutated in any way. func (h *BatchTruncationHelper) Init( - scanDir ScanDirection, requests []roachpb.RequestUnion, mustPreserveOrder bool, + scanDir ScanDirection, + requests []roachpb.RequestUnion, + mustPreserveOrder bool, + canReorderRequestsSlice bool, ) error { h.scanDir = scanDir h.requests = requests h.mustPreserveOrder = mustPreserveOrder + // Determine whether we can use the optimized strategy before making any + // allocations. + for i := range requests { + header := requests[i].GetInner().Header() + if keys.IsLocal(header.Key) { + h.foundLocalKey = true + return nil + } + } + // We can use the optimized strategy, so set up all of the internal state. + if !canReorderRequestsSlice { + // If we can't reorder the original requests slice, we must make a copy. + h.requests = make([]roachpb.RequestUnion, len(requests)) + copy(h.requests, requests) + } + h.headers = make([]roachpb.RequestHeader, len(requests)) + h.positions = make([]int, len(requests)) + h.isRange = make([]bool, len(requests)) + // Populate the internal state as well as perform some sanity checks on the + // requests. + for i := range requests { + req := requests[i].GetInner() + h.headers[i] = req.Header() + h.positions[i] = i + h.isRange[i] = roachpb.IsRange(req) + if h.isRange[i] { + // We're dealing with a range-spanning request. + if l, r := keys.IsLocal(h.headers[i].Key), keys.IsLocal(h.headers[i].EndKey); (l && !r) || (!l && r) { + return errors.Errorf("local key mixed with global key in range") + } + } else if len(h.headers[i].EndKey) > 0 { + return errors.Errorf("%T is not a range command, but EndKey is set", req) + } + } + if scanDir == Ascending { + sort.Sort(ascBatchTruncationHelper{BatchTruncationHelper: h}) + } else { + // With the Descending scan direction, we have to convert all point + // requests into range-spanning requests that include only a single + // point. + for i := range h.headers { + if len(h.headers[i].EndKey) == 0 { + h.headers[i].EndKey = h.headers[i].Key.Next() + } + } + sort.Sort(descBatchTruncationHelper{BatchTruncationHelper: h}) + } + if h.mustPreserveOrder { + h.helper.init(len(requests)) + } return nil } @@ -92,9 +254,26 @@ func (h *BatchTruncationHelper) Init( func (h *BatchTruncationHelper) Truncate( rs roachpb.RSpan, ) ([]roachpb.RequestUnion, []int, roachpb.RKey, error) { - truncReqs, positions, err := truncateLegacy(h.requests, rs) - if err != nil { - return nil, nil, nil, err + var truncReqs []roachpb.RequestUnion + var positions []int + var err error + if !h.foundLocalKey { + if h.scanDir == Ascending { + truncReqs, positions, err = h.truncateAsc(rs) + } else { + truncReqs, positions, err = h.truncateDesc(rs) + } + if err != nil { + return nil, nil, nil, err + } + if h.mustPreserveOrder { + truncReqs, positions = h.helper.restoreOrder(truncReqs, positions) + } + } else { + truncReqs, positions, err = truncateLegacy(h.requests, rs) + if err != nil { + return nil, nil, nil, err + } } var seekKey roachpb.RKey if h.scanDir == Ascending { @@ -105,17 +284,327 @@ func (h *BatchTruncationHelper) Truncate( // one, and unless both descriptors are stale, the next descriptor's // StartKey would move us to the beginning of the current range, // resulting in a duplicate scan. - seekKey, err = nextLegacy(h.requests, rs.EndKey) + seekKey, err = h.next(rs.EndKey) } else { // In next iteration, query previous range. // We use the StartKey of the current descriptor as opposed to the // EndKey of the previous one since that doesn't have bugs when // stale descriptors come into play. - seekKey, err = prevLegacy(h.requests, rs.Key) + seekKey, err = h.prev(rs.Key) } return truncReqs, positions, seekKey, err } +// truncateAsc is the optimized strategy for Truncate() with the Ascending scan +// direction when requests only use global keys. +// +// The first step of this strategy is to reorder all requests according to their +// start keys (this is done in Init()). Then, on every call to truncateAsc(), we +// only look at a subset of original requests that might overlap with rs and +// ignore already processed requests entirely. +// +// Let's go through an example. Say we have seven original requests: +// +// requests : Scan(i, l), Get(d), Scan(h, k), Scan(g, i), Get(i), Scan(d, f), Scan(b, h) +// positions: 0 1 2 3 4 5 6 +// +// as well three ranges to iterate over: +// +// ranges: range[a, e), range[e, i), range[i, m). +// +// In Init(), we have reordered the requests according to their start keys: +// +// requests : Scan(b, h), Get(d), Scan(d, f), Scan(g, i), Scan(h, k), Get(i), Scan(i, l) +// positions: 6 1 5 3 2 4 0 +// headers : [b, h) [d) [d, f) [g, i) [h, k) [i) [i, l) +// +// On the first call to Truncate(), we're using the range [a, e). We only need +// to look at the first four requests since the fourth request starts after the +// end of the current range, and, due to the ordering, all following requests +// too. We truncate first three requests to the range boundaries, update the +// headers to refer to the unprocessed parts of the corresponding requests, and +// mark the 2nd request Get(d) as fully processed. +// +// The first call prepares +// truncReqs = [Scan(b, e), Get(d), Scan(d, e)], positions = [6, 1, 5] +// and the internal state is now +// +// requests : Scan(b, h), Get(d), Scan(d, f), Scan(g, i), Scan(h, k), Get(i), Scan(i, l) +// positions: 6 -1 5 3 2 4 0 +// headers : [e, h) [e, f) [g, i) [h, k) [i) [i, l) +// +// Then the optimized next() function determines the seekKey as 'e' and keeps +// the startIdx at 0. +// +// On the second call to Truncate(), we're using the range [e, i). We only need +// to look at the first six requests since the sixth request starts after the +// end of the current range, and, due to the ordering, all following requests +// too. We truncate first five requests to the range boundaries (skipping the +// second that has been fully processed already), update the headers to refer to +// the unprocessed parts of the corresponding requests, and mark the 1st, the +// 3rd, and the 4th requests as fully processed. +// +// The second call prepares +// truncReqs = [Scan(e, h), Scan(e, f), Scan(g, i), Scan(h, i)], positions = [6, 5, 3, 2] +// and the internal state is now +// +// requests : Scan(b, h), Get(d), Scan(d, f), Scan(g, i), Scan(h, k), Get(i), Scan(i, l) +// positions: -1 -1 -1 -1 2 4 0 +// headers : [i, k) [i) [i, l) +// +// Then the optimized next() function determines the seekKey as 'i' and sets +// the startIdx at 4 (meaning that all first four requests have been fully +// processed). +// +// On the third call to Truncate(), we're using the range [i, m). We only look +// at the 5th, 6th, and 7th requests (because of the value of startIdx). All +// requests are contained within the range, so we include them into the return +// value and mark all of them as processed. +// +// The third call prepares +// truncReqs = [Scan(i, k), Get(i), Scan(i, l)], positions = [2, 4, 0] +// and the internal state is now +// +// requests : Scan(b, h), Get(d), Scan(d, f), Scan(g, i), Scan(h, k), Get(i), Scan(i, l) +// positions: -1 -1 -1 -1 -1 -1 -1 +// headers : +// +// Then the optimized next() function determines the seekKey as KeyMax and sets +// the startIdx at 7 (meaning that all requests have been fully processed), and +// we're done. +// +// NOTE: for all requests, headers always keep track of the unprocessed part of +// the request and is such that the ordering of the keys in the headers is +// preserved when the requests are truncated. +// +// Note that this function is very similar to truncateDesc(), and we could +// extract out the differences into an interface; however, this leads to +// non-trivial slowdown and increase in allocations, so we choose to duplicate +// the code for performance. +func (h *BatchTruncationHelper) truncateAsc( + rs roachpb.RSpan, +) ([]roachpb.RequestUnion, []int, error) { + var truncReqs []roachpb.RequestUnion + var positions []int + for i := h.startIdx; i < len(h.positions); i++ { + pos := h.positions[i] + if pos < 0 { + // This request has already been fully processed, so there is no + // need to look at it. + continue + } + header := h.headers[i] + // rs.EndKey can't be local because it contains range split points, + // which are never local. + ek := rs.EndKey.AsRawKey() + if ek.Compare(header.Key) <= 0 { + // All of the remaining requests start after this range, so we're + // done. + break + } + if !h.isRange[i] { + // This is a point request, and the key is contained within this + // range, so we include the request as is and mark it as "fully + // processed". + truncReqs = append(truncReqs, h.requests[i]) + positions = append(positions, pos) + h.headers[i] = roachpb.RequestHeader{} + h.positions[i] = -1 + continue + } + // We're dealing with a range-spanning request. + if buildutil.CrdbTestBuild { + // rs.Key can't be local because it contains range split points, + // which are never local. + if header.Key.Compare(rs.Key.AsRawKey()) < 0 { + return nil, nil, errors.AssertionFailedf( + "unexpectedly header.Key %s is less than rs %s", header.Key, rs, + ) + } + } + inner := h.requests[i].GetInner() + if header.EndKey.Compare(ek) <= 0 { + // This is the last part of this request since it is fully contained + // within this range, so we mark the request as "fully processed". + h.headers[i] = roachpb.RequestHeader{} + h.positions[i] = -1 + if origStartKey := inner.Header().Key; origStartKey.Equal(header.Key) { + // This range-spanning request fits within a single range, so we + // can just use the original request. + truncReqs = append(truncReqs, h.requests[i]) + positions = append(positions, pos) + continue + } + } else { + header.EndKey = ek + // Adjust the start key of the header so that it contained only the + // unprocessed suffix of the request. + h.headers[i].Key = header.EndKey + } + shallowCopy := inner.ShallowCopy() + shallowCopy.SetHeader(header) + truncReqs = append(truncReqs, roachpb.RequestUnion{}) + truncReqs[len(truncReqs)-1].MustSetInner(shallowCopy) + positions = append(positions, pos) + } + return truncReqs, positions, nil +} + +// truncateDesc is the optimized strategy for Truncate() with the Descending +// scan direction when requests only use global keys. +// +// The first step of this strategy is to reorder all requests according to their +// end keys with the descending direction (this is done in Init()). Then, on +// every call to truncateDesc(), we only look at a subset of original requests +// that might overlap with rs and ignore already processed requests entirely. +// +// Let's go through an example. Say we have seven original requests: +// +// requests : Scan(i, l), Get(d), Scan(h, k), Scan(g, i), Get(i), Scan(d, f), Scan(b, h) +// positions: 0 1 2 3 4 5 6 +// +// as well three ranges to iterate over: +// +// ranges: range[i, m), range[e, i), range[a, e). +// +// In Init(), we have reordered the requests according to their end keys with +// the descending direction (below, i' denotes Key("i").Next()): +// +// requests : Scan(i, l), Scan(h, k), Get(i), Scan(g, i), Scan(b, h), Scan(d, f), Get(d) +// positions: 0 2 4 3 6 5 1 +// headers : [i, l) [h, k) [i, i') [g, i) [b, h) [d, f) [d, d') +// +// On the first call to Truncate(), we're using the range [i, m). We only need +// to look at the first four requests since the fourth request ends before the +// start of the current range, and, due to the ordering, all following requests +// too. We truncate first three requests to the range boundaries, update the +// headers to refer to the unprocessed parts of the corresponding requests, and +// mark the 1st and the 3rd requests as fully processed. +// +// The first call prepares +// truncReqs = [Scan(i, l), Scan(i, k), Get(i)], positions = [0, 2, 4] +// and the internal state is now +// +// requests : Scan(i, l), Scan(h, k), Get(i), Scan(g, i), Scan(b, h), Scan(d, f), Get(d) +// positions: -1 2 -1 3 6 5 1 +// headers : [h, i) [g, i) [b, h) [d, f) [d, d') +// +// Then the optimized prev() function determines the seekKey as 'i' and moves +// the startIdx to 1. +// +// On the second call to Truncate(), we're using the range [e, i). We skip +// looking at the first request entirely (due to value of startIdx) and only +// need to look at all remaining requests (skipping the third one since it's fully +// processed). We truncate the requests to the range boundaries, update the +// headers to refer to the unprocessed parts of the corresponding requests, and +// mark the 2nd and the 4th requests as fully processed. +// +// The second call prepares +// truncReqs = [Scan(h, i), Scan(g, i), Scan(e, h), Scan(e, f)], positions = [2, 3, 6, 5] +// and the internal state is now +// +// requests : Scan(i, l), Scan(h, k), Get(i), Scan(g, i), Scan(b, h), Scan(d, f), Get(d) +// positions: -1 -1 -1 -1 6 5 1 +// headers : [b, e) [d, e) [d, d') +// +// Then the optimized prev() function determines the seekKey as 'e' and sets +// the startIdx at 4 (meaning that all first four requests have been fully +// processed). +// +// On the third call to Truncate(), we're using the range [a, e). We only look +// at the 5th, 6th, and 7th requests (because of the value of startIdx). All +// requests are contained within the range, so we include them into the return +// value and mark all of them as processed. +// +// The third call prepares +// truncReqs = [Scan(b, e), Scan(d, e), Get(d)], positions = [6, 5, 1] +// and the internal state is now +// +// requests : Scan(i, l), Scan(h, k), Get(i), Scan(g, i), Scan(b, h), Scan(d, f), Get(d) +// positions: -1 -1 -1 -1 -1 -1 -1 +// headers : +// +// Then the optimized prev() function determines the seekKey as KeyMin and sets +// the startIdx at 7 (meaning that all requests have been fully processed), and +// we're done. +// +// NOTE: for all requests, headers always keep track of the unprocessed part of +// the request and is such that the ordering of the end keys in the headers is +// preserved when the requests are truncated. +// +// Note that this function is very similar to truncateAsc(), and we could +// extract out the differences into an interface; however, this leads to +// non-trivial slowdown and increase in allocations, so we choose to duplicate +// the code for performance. +func (h *BatchTruncationHelper) truncateDesc( + rs roachpb.RSpan, +) ([]roachpb.RequestUnion, []int, error) { + var truncReqs []roachpb.RequestUnion + var positions []int + for i := h.startIdx; i < len(h.positions); i++ { + pos := h.positions[i] + if pos < 0 { + // This request has already been fully processed, so there is no + // need to look at it. + continue + } + header := h.headers[i] + // rs.Key can't be local because it contains range split points, which + // are never local. + sk := rs.Key.AsRawKey() + if sk.Compare(header.EndKey) >= 0 { + // All of the remaining requests end before this range, so we're + // done. + break + } + if !h.isRange[i] { + // This is a point request, and the key is contained within this + // range, so we include the request as is and mark it as "fully + // processed". + truncReqs = append(truncReqs, h.requests[i]) + positions = append(positions, pos) + h.headers[i] = roachpb.RequestHeader{} + h.positions[i] = -1 + continue + } + // We're dealing with a range-spanning request. + if buildutil.CrdbTestBuild { + // rs.EndKey can't be local because it contains range split points, + // which are never local. + if header.EndKey.Compare(rs.EndKey.AsRawKey()) > 0 { + return nil, nil, errors.AssertionFailedf( + "unexpectedly header.EndKey %s is greater than rs %s", header.Key, rs, + ) + } + } + inner := h.requests[i].GetInner() + if header.Key.Compare(sk) >= 0 { + // This is the last part of this request since it is fully contained + // within this range, so we mark the request as "fully processed". + h.headers[i] = roachpb.RequestHeader{} + h.positions[i] = -1 + if origEndKey := inner.Header().EndKey; len(origEndKey) == 0 || origEndKey.Equal(header.EndKey) { + // This range-spanning request fits within a single range, so we + // can just use the original request. + truncReqs = append(truncReqs, h.requests[i]) + positions = append(positions, pos) + continue + } + } else { + header.Key = sk + // Adjust the end key of the header so that it contained only the + // unprocessed prefix of the request. + h.headers[i].EndKey = header.Key + } + shallowCopy := inner.ShallowCopy() + shallowCopy.SetHeader(header) + truncReqs = append(truncReqs, roachpb.RequestUnion{}) + truncReqs[len(truncReqs)-1].MustSetInner(shallowCopy) + positions = append(positions, pos) + } + return truncReqs, positions, nil +} + var emptyHeader = roachpb.RequestHeader{} // truncateLegacy restricts all requests to the given key range and returns new, @@ -222,6 +711,37 @@ func truncateLegacy( return truncReqs, positions, nil } +// prev returns the next seek key for the range iterator with the Descending +// scan direction. +// +// Informally, a call `prev(k)` means: we've already executed the parts of +// `reqs` that intersect `[k, KeyMax)`; please tell me how far to the left the +// next relevant request begins. +func (h *BatchTruncationHelper) prev(k roachpb.RKey) (roachpb.RKey, error) { + if h.foundLocalKey { + return prevLegacy(h.requests, k) + } + // Skip over first startIdx-1 requests since they have been fully processed. + for i, pos := range h.positions[h.startIdx:] { + if pos < 0 { + continue + } + // This is the first request that hasn't been fully processed, so we can + // bump the startIdx to this request's index and use the end key of the + // unprocessed part for the next seek key. + // + // By construction, all requests after this one will have their end key + // greater or equal to this request's end key, thus, there is no need to + // iterate any further. See the comment on truncateDesc() for more + // details. + h.startIdx += i + return keys.Addr(h.headers[h.startIdx].EndKey) + } + // If we got to this point, then all requests have been fully processed. + h.startIdx = len(h.requests) + return roachpb.RKeyMin, nil +} + // prevLegacy gives the right boundary of the union of all requests which don't // affect keys larger than the given key. Note that a right boundary is // exclusive, that is, the returned RKey is to be used as the exclusive right @@ -290,6 +810,36 @@ func prevLegacy(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, erro return candidate, nil } +// next returns the next seek key for the range iterator. +// +// Informally, a call `next(k)` means: we've already executed the parts of +// `reqs` that intersect `[KeyMin, k)`; please tell me how far to the right the +// next relevant request begins. +func (h *BatchTruncationHelper) next(k roachpb.RKey) (roachpb.RKey, error) { + if h.foundLocalKey { + return nextLegacy(h.requests, k) + } + // Skip over first startIdx-1 requests since they have been fully processed. + for i, pos := range h.positions[h.startIdx:] { + if pos < 0 { + continue + } + // This is the first request that hasn't been fully processed, so we can + // bump the startIdx to this request's index and use the start key of + // the unprocessed part for the next seek key. + // + // By construction, all requests after this one will have their start + // key greater or equal to this request's start key, thus, there is no + // need to iterate any further. See the comment on truncateAsc() for + // more details. + h.startIdx += i + return keys.Addr(h.headers[h.startIdx].Key) + } + // If we got to this point, then all requests have been fully processed. + h.startIdx = len(h.requests) + return roachpb.RKeyMax, nil +} + // nextLegacy gives the left boundary of the union of all requests which don't // affect keys less than the given key. Note that the left boundary is // inclusive, that is, the returned RKey is the inclusive left endpoint of the @@ -330,3 +880,81 @@ func nextLegacy(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, erro } return candidate, nil } + +// orderRestorationHelper is a utility struct that allows to restore the order +// of requests according to the positions values in O(N) time where N is the +// number of the original requests (i.e. before truncation). Benchmarks have +// shown that it is faster that a solution with explicitly sorting the requests +// in O(T log T) time where T is the number of truncated requests returned by +// a single Truncate() call. +type orderRestorationHelper struct { + // scratch is reused on the next call to restoreOrder() if it has enough + // capacity. + scratch []roachpb.RequestUnion + // found is used as a map indicating whether a request for a particular + // positions value is included into the truncated requests and at what + // index, -1 if the corresponding request is not found in the truncated + // requests. + // + // found has the length of N where N is the number of the original requests. + // It is reused on all restoreOrder() calls. + found []int +} + +func (h *orderRestorationHelper) init(numOriginalRequests int) { + h.found = make([]int, numOriginalRequests) + for i := range h.found { + h.found[i] = -1 + } +} + +// restoreOrder reorders truncReqs in the ascending order of the corresponding +// positions values. +// +// Let's go through a quick example. Say we have five original requests and the +// following setup: +// +// truncReqs = [Scan(a, c), Get(b), Scan(c, d)], positions = [3, 0, 4] +// +// We first populate the found map: +// +// found = [1, -1, -1, 0, 2] +// +// meaning that requests at positions 0, 3, 4 are present in truncReqs. Then we +// iterate over the found map, and for all non-negative found values, we include +// the corresponding request: +// 1. found[0] = 1 -> toReturn = [Get(b)] positions = [0] +// 2. found[1] = -1 -> skip +// 3. found[2] = -1 -> skip +// 4. found[3] = 0 -> toReturn = [Get(b), Scan(a, c)] positions = [0, 3] +// 5. found[4] = 2 -> toReturn = [Get(b), Scan(a, c), Scan(c, d)] positions = [0, 3, 4] +func (h *orderRestorationHelper) restoreOrder( + truncReqs []roachpb.RequestUnion, positions []int, +) ([]roachpb.RequestUnion, []int) { + for i, pos := range positions { + h.found[pos] = i + } + var toReturn []roachpb.RequestUnion + if cap(h.scratch) >= len(positions) { + toReturn = h.scratch[:0] + } else { + toReturn = make([]roachpb.RequestUnion, 0, len(positions)) + } + positions = positions[:0] + for pos, found := range h.found { + if found < 0 { + // The request with positions value 'pos' is not included in + // truncReqs. + continue + } + toReturn = append(toReturn, truncReqs[found]) + positions = append(positions, pos) + // Lose the reference to the request so that we can keep truncReqs as + // the scratch space for the next call. + truncReqs[found] = roachpb.RequestUnion{} + // Make sure that the found map is set up for the next call. + h.found[pos] = -1 + } + h.scratch = truncReqs + return toReturn, positions +} diff --git a/pkg/kv/kvclient/kvcoord/batch_test.go b/pkg/kv/kvclient/kvcoord/batch_test.go index 99b31d0e8578..d95cc59d51ce 100644 --- a/pkg/kv/kvclient/kvcoord/batch_test.go +++ b/pkg/kv/kvclient/kvcoord/batch_test.go @@ -208,8 +208,13 @@ func TestBatchPrevNext(t *testing.T) { } var ascHelper, descHelper BatchTruncationHelper const mustPreserveOrder = false - require.NoError(t, ascHelper.Init(Ascending, ba.Requests, mustPreserveOrder)) - require.NoError(t, descHelper.Init(Descending, ba.Requests, mustPreserveOrder)) + const canReorderRequestsSlice = false + require.NoError(t, ascHelper.Init( + Ascending, ba.Requests, mustPreserveOrder, canReorderRequestsSlice, + )) + require.NoError(t, descHelper.Init( + Descending, ba.Requests, mustPreserveOrder, canReorderRequestsSlice, + )) if _, _, next, err := ascHelper.Truncate( roachpb.RSpan{ Key: roachpb.RKeyMin, @@ -364,102 +369,136 @@ func TestTruncate(t *testing.T) { // preserves the ordering. continue } - for i, test := range testCases { - goldenOriginal := roachpb.BatchRequest{} - for _, ks := range test.keys { - if len(ks[1]) > 0 { - goldenOriginal.Add(&roachpb.ResolveIntentRangeRequest{ - RequestHeader: roachpb.RequestHeader{ - Key: roachpb.Key(ks[0]), EndKey: roachpb.Key(ks[1]), - }, - IntentTxn: enginepb.TxnMeta{ID: uuid.MakeV4()}, - }) - } else { - goldenOriginal.Add(&roachpb.GetRequest{ - RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(ks[0])}, - }) - } + for _, canReorderRequestsSlice := range []bool{false, true} { + if isLegacy && canReorderRequestsSlice { + // This config is meaningless because truncateLegacy() + // doesn't reorder the original requests slice. + continue } + for i, test := range testCases { + goldenOriginal := roachpb.BatchRequest{} + for _, ks := range test.keys { + if len(ks[1]) > 0 { + goldenOriginal.Add(&roachpb.ResolveIntentRangeRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: roachpb.Key(ks[0]), EndKey: roachpb.Key(ks[1]), + }, + IntentTxn: enginepb.TxnMeta{ID: uuid.MakeV4()}, + }) + } else { + goldenOriginal.Add(&roachpb.GetRequest{ + RequestHeader: roachpb.RequestHeader{Key: roachpb.Key(ks[0])}, + }) + } + } - original := roachpb.BatchRequest{Requests: make([]roachpb.RequestUnion, len(goldenOriginal.Requests))} - for i, request := range goldenOriginal.Requests { - original.Requests[i].MustSetInner(request.GetInner().ShallowCopy()) - } + original := roachpb.BatchRequest{Requests: make([]roachpb.RequestUnion, len(goldenOriginal.Requests))} + for i, request := range goldenOriginal.Requests { + original.Requests[i].MustSetInner(request.GetInner().ShallowCopy()) + } - var truncationHelper BatchTruncationHelper - if !isLegacy { - if err := truncationHelper.Init(Ascending, original.Requests, mustPreserveOrder); err != nil { - t.Errorf("%d: Init failure: %v", i, err) - continue + var truncationHelper BatchTruncationHelper + if !isLegacy { + if err := truncationHelper.Init( + Ascending, original.Requests, mustPreserveOrder, canReorderRequestsSlice, + ); err != nil { + t.Errorf("%d: Init failure: %v", i, err) + continue + } + // We need to truncate all requests up to the start of + // the test range since this is assumed by Truncate(). + truncateKey := roachpb.RKey(test.from) + if truncateKey.Less(roachpb.RKey(test.desc[0])) { + truncateKey = roachpb.RKey(test.desc[0]) + } + _, _, _, err := truncationHelper.Truncate( + roachpb.RSpan{Key: roachpb.RKeyMin, EndKey: truncateKey}, + ) + if err != nil || test.err != "" { + if !testutils.IsError(err, test.err) { + t.Errorf("%d: %v (expected: %q)", i, err, test.err) + } + continue + } } - // We need to truncate all requests up to the start of the - // test range since this is assumed by Truncate(). - truncateKey := roachpb.RKey(test.from) - if truncateKey.Less(roachpb.RKey(test.desc[0])) { - truncateKey = roachpb.RKey(test.desc[0]) + desc := &roachpb.RangeDescriptor{ + StartKey: roachpb.RKey(test.desc[0]), EndKey: roachpb.RKey(test.desc[1]), } - _, _, _, err := truncationHelper.Truncate( - roachpb.RSpan{Key: roachpb.RKeyMin, EndKey: truncateKey}, - ) - if err != nil || test.err != "" { - if !testutils.IsError(err, test.err) { - t.Errorf("%d: %v (expected: %q)", i, err, test.err) - } + if len(desc.StartKey) == 0 { + desc.StartKey = roachpb.RKey(test.from) + } + if len(desc.EndKey) == 0 { + desc.EndKey = roachpb.RKey(test.to) + } + rs := roachpb.RSpan{Key: roachpb.RKey(test.from), EndKey: roachpb.RKey(test.to)} + rs, err := rs.Intersect(desc) + if err != nil { + t.Errorf("%d: intersection failure: %v", i, err) continue } - } - desc := &roachpb.RangeDescriptor{ - StartKey: roachpb.RKey(test.desc[0]), EndKey: roachpb.RKey(test.desc[1]), - } - if len(desc.StartKey) == 0 { - desc.StartKey = roachpb.RKey(test.from) - } - if len(desc.EndKey) == 0 { - desc.EndKey = roachpb.RKey(test.to) - } - rs := roachpb.RSpan{Key: roachpb.RKey(test.from), EndKey: roachpb.RKey(test.to)} - rs, err := rs.Intersect(desc) - if err != nil { - t.Errorf("%d: intersection failure: %v", i, err) - continue - } - reqs, pos, err := truncateLegacy(original.Requests, rs) - if isLegacy { - if err != nil || test.err != "" { - if !testutils.IsError(err, test.err) { - t.Errorf("%d: %v (expected: %q)", i, err, test.err) + reqs, pos, err := truncateLegacy(original.Requests, rs) + if isLegacy { + if err != nil || test.err != "" { + if !testutils.IsError(err, test.err) { + t.Errorf("%d: %v (expected: %q)", i, err, test.err) + } + continue } + } else { + reqs, pos, _, err = truncationHelper.Truncate(rs) + } + if err != nil { + t.Errorf("%d: truncation failure: %v", i, err) continue } - } else { - reqs, pos, _, err = truncationHelper.Truncate(rs) - } - if err != nil { - t.Errorf("%d: truncation failure: %v", i, err) - continue - } - if !isLegacy && !mustPreserveOrder { - // Truncate can return results in an arbitrary order, so we - // need to restore the order according to positions. - scratch := &requestsWithPositions{reqs: reqs, positions: pos} - sort.Sort(scratch) - } - var numReqs int - for j, arg := range reqs { - req := arg.GetInner() - if h := req.Header(); !bytes.Equal(h.Key, roachpb.Key(test.expKeys[j][0])) || !bytes.Equal(h.EndKey, roachpb.Key(test.expKeys[j][1])) { - t.Errorf("%d.%d: range mismatch: actual [%q,%q), wanted [%q,%q)", i, j, - h.Key, h.EndKey, roachpb.RKey(test.expKeys[j][0]), roachpb.RKey(test.expKeys[j][1])) - } else if len(h.Key) != 0 { - numReqs++ + if !isLegacy && !mustPreserveOrder { + // Truncate can return results in an arbitrary order, so + // we need to restore the order according to positions. + scratch := &requestsWithPositions{reqs: reqs, positions: pos} + sort.Sort(scratch) + } + var numReqs int + for j, arg := range reqs { + req := arg.GetInner() + if h := req.Header(); !bytes.Equal(h.Key, roachpb.Key(test.expKeys[j][0])) || !bytes.Equal(h.EndKey, roachpb.Key(test.expKeys[j][1])) { + t.Errorf("%d.%d: range mismatch: actual [%q,%q), wanted [%q,%q)", i, j, + h.Key, h.EndKey, roachpb.RKey(test.expKeys[j][0]), roachpb.RKey(test.expKeys[j][1])) + } else if len(h.Key) != 0 { + numReqs++ + } + } + if num := len(pos); numReqs != num { + t.Errorf("%d: counted %d requests, but truncation indicated %d", i, numReqs, num) + } + if isLegacy || !canReorderRequestsSlice { + if !reflect.DeepEqual(original, goldenOriginal) { + t.Errorf("%d: truncation mutated original:\nexpected: %s\nactual: %s", + i, goldenOriginal, original) + } + } else { + // Modifying the order of requests in a BatchRequest is + // ok, but we want to make sure that each request hasn't + // been modified "deeply", so we try different + // permutations of the original requests. + matched := make([]bool, len(original.Requests)) + var matchedCount int + for _, goldenReq := range goldenOriginal.Requests { + for j, origReq := range original.Requests { + if matched[j] { + continue + } + if reflect.DeepEqual(goldenReq, origReq) { + matched[j] = true + matchedCount++ + break + } + } + } + if matchedCount != len(matched) { + t.Errorf("%d: truncation mutated original:\nexpected: %s\nactual: %s", + i, goldenOriginal, original) + } } - } - if num := len(pos); numReqs != num { - t.Errorf("%d: counted %d requests, but truncation indicated %d", i, numReqs, num) - } - if !reflect.DeepEqual(original, goldenOriginal) { - t.Errorf("%d: truncation mutated original:\nexpected: %s\nactual: %s", - i, goldenOriginal, original) } } } @@ -597,7 +636,10 @@ func TestTruncateLoop(t *testing.T) { require.NoError(t, err) var helper BatchTruncationHelper const mustPreserveOrder = false - require.NoError(t, helper.Init(scanDir, requests, mustPreserveOrder)) + const canReorderRequestsSlice = false + require.NoError(t, helper.Init( + scanDir, requests, mustPreserveOrder, canReorderRequestsSlice, + )) for i := 0; i < len(testCase.iteration); i++ { tc := testCase.iteration[i] if scanDir == Descending { @@ -706,7 +748,10 @@ func BenchmarkTruncateLoop(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { var h BatchTruncationHelper - require.NoError(b, h.Init(scanDir, reqs, mustPreserveOrder)) + const canReorderRequestsSlice = false + require.NoError(b, h.Init( + scanDir, reqs, mustPreserveOrder, canReorderRequestsSlice, + )) for _, rs := range rangeSpans { _, _, _, err := h.Truncate(rs) require.NoError(b, err) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 8ef43a0df625..fa83aedd5350 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -1327,7 +1327,15 @@ func (ds *DistSender) divideAndSendBatchToRanges( // being in the original order, so the helper must preserve the order if the // batch is not a read-only. mustPreserveOrder := !ba.IsReadOnly() - if err := truncationHelper.Init(scanDir, ba.Requests, mustPreserveOrder); err != nil { + // The DistSender relies on the order of ba.Requests not being changed when + // it sets the ResumeSpans on the incomplete requests, so we ask the helper + // to not modify the ba.Requests slice. + // TODO(yuzefovich): refactor the DistSender so that the truncation helper + // could reorder requests as it pleases. + const canReorderRequestsSlice = false + if err := truncationHelper.Init( + scanDir, ba.Requests, mustPreserveOrder, canReorderRequestsSlice, + ); err != nil { return nil, roachpb.NewError(err) } // Iterate over the ranges that the batch touches. The iteration is done in diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index 3e1bc5c0f082..34e5f8cfb662 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -196,9 +196,9 @@ func checkResumeSpanScanResults( for i, res := range results { // Check that satisfied scans don't have resume spans. if _, satisfied := expSatisfied[i]; satisfied { - require.Nil(t, res.ResumeSpan, "satisfied scan %d (%s) has ResumeSpan: %v", + require.Nilf(t, res.ResumeSpan, "satisfied scan %d (%s) has ResumeSpan: %v", i, spans[i], res.ResumeSpan) - require.Zero(t, res.ResumeReason, "satisfied scan %d (%s) has ResumeReason: %v", + require.Zerof(t, res.ResumeReason, "satisfied scan %d (%s) has ResumeReason: %v", i, spans[i], res.ResumeReason) continue } @@ -207,36 +207,36 @@ func checkResumeSpanScanResults( // The resume span should be identical to the original request if no // results have been produced, or should continue after the last result // otherwise. - require.NotNil(t, res.ResumeSpan, "scan %d (%s): no resume span", i, spans[i]) - require.NotZero(t, res.ResumeReason, "scan %d (%s): no resume reason. resume span: %+v", + require.NotNilf(t, res.ResumeSpan, "scan %d (%s): no resume span", i, spans[i]) + require.NotZerof(t, res.ResumeReason, "scan %d (%s): no resume reason. resume span: %+v", i, spans[i], res.ResumeSpan) - require.Equal(t, expReason, res.ResumeReason, + require.Equalf(t, expReason, res.ResumeReason, "scan %d (%s): unexpected resume reason", i, spans[i]) if !reverse { if len(res.Rows) == 0 { - require.GreaterOrEqual(t, string(res.ResumeSpan.Key), spans[i][0], + require.GreaterOrEqualf(t, string(res.ResumeSpan.Key), spans[i][0], "scan %d (%s): expected resume span %s to be at or above scan start", i, spans[i], res.ResumeSpan) - require.Less(t, string(res.ResumeSpan.Key), spans[i][1], + require.Lessf(t, string(res.ResumeSpan.Key), spans[i][1], "scan %d (%s): expected resume span %s to be below scan end", i, spans[i], res.ResumeSpan) } else { - require.Greater(t, string(res.ResumeSpan.Key), expResults[i][len(res.Rows)-1], + require.Greaterf(t, string(res.ResumeSpan.Key), expResults[i][len(res.Rows)-1], "scan %d (%s): expected resume span %s to be above last result", i, spans[i], res.ResumeSpan) } - require.Equal(t, spans[i][1], string(res.ResumeSpan.EndKey), + require.Equalf(t, spans[i][1], string(res.ResumeSpan.EndKey), "scan %d (%s): expected resume span %s to have same end key", i, spans[i], res.ResumeSpan) } else { if len(res.Rows) == 0 { - require.Greater(t, string(res.ResumeSpan.EndKey), spans[i][0], + require.Greaterf(t, string(res.ResumeSpan.EndKey), spans[i][0], "scan %d (%s): expected resume span %s to be above scan start", i, spans[i], res.ResumeSpan) - require.LessOrEqual(t, string(res.ResumeSpan.EndKey), spans[i][1], + require.LessOrEqualf(t, string(res.ResumeSpan.EndKey), spans[i][1], "scan %d (%s): expected resume span %s to be at or below scan end", i, spans[i], res.ResumeSpan) } else { - require.Less(t, string(res.ResumeSpan.EndKey), expResults[i][len(res.Rows)-1], + require.Lessf(t, string(res.ResumeSpan.EndKey), expResults[i][len(res.Rows)-1], "scan %d (%s): expected resume span %s to be below last result", i, spans[i], res.ResumeSpan) } - require.Equal(t, spans[i][0], string(res.ResumeSpan.Key), + require.Equalf(t, spans[i][0], string(res.ResumeSpan.Key), "scan %d (%s): expected resume span %s to have same start key", i, spans[i], res.ResumeSpan) } } @@ -349,6 +349,11 @@ func TestMultiRangeBoundedBatchScan(t *testing.T) { {"f2"}, } var expResultsReverse [][]string + // reverseProcessOrder contains indices into expResultsReverse ordered with + // the descending direction on the first key in each slice (this is the + // order in which the DistSender will process the corresponding ReverseScan + // requests). + reverseProcessOrder := []int{2, 3, 1, 0} for _, res := range expResults { var rres []string for i := len(res) - 1; i >= 0; i-- { @@ -412,7 +417,8 @@ func TestMultiRangeBoundedBatchScan(t *testing.T) { // The split contains keys [lastK..firstK]. firstK := sort.SearchStrings(keys, splits[s]) - 1 lastK := sort.SearchStrings(keys, splits[s-1]) - for j, res := range expResultsReverse { + for _, j := range reverseProcessOrder { + res := expResultsReverse[j] for expIdx := len(res) - 1; expIdx >= 0; expIdx-- { expK := res[expIdx] for k := firstK; k >= lastK; k-- { @@ -559,22 +565,22 @@ func TestMultiRangeBoundedBatchScanPartialResponses(t *testing.T) { }{ { name: "unsorted, non-overlapping, neither satisfied", - bound: 6, + bound: 3, spans: [][]string{ {"b1", "d"}, {"a", "b1"}, }, expResults: [][]string{ - {"b1", "b2", "b3"}, {"a1", "a2", "a3"}, + {}, {"a1", "a2", "a3"}, }, }, { name: "unsorted, non-overlapping, first satisfied", - bound: 6, + bound: 9, spans: [][]string{ - {"b1", "c"}, {"a", "b1"}, + {"b1", "c"}, {"a", "d"}, }, expResults: [][]string{ - {"b1", "b2", "b3"}, {"a1", "a2", "a3"}, + {"b1", "b2", "b3"}, {"a1", "a2", "a3", "b1", "b2", "b3"}, }, expSatisfied: []int{0}, }, @@ -650,17 +656,17 @@ func TestMultiRangeBoundedBatchScanPartialResponses(t *testing.T) { {"b", "g"}, {"a", "d"}, }, expResults: [][]string{ - {"b1", "b2", "b3"}, {"a1", "a2", "a3", "b1"}, + {"b1"}, {"a1", "a2", "a3", "b1", "b2", "b3"}, }, }, { name: "unsorted, overlapping, first satisfied", - bound: 7, + bound: 9, spans: [][]string{ {"b", "c"}, {"a", "d"}, }, expResults: [][]string{ - {"b1", "b2", "b3"}, {"a1", "a2", "a3", "b1"}, + {"b1", "b2", "b3"}, {"a1", "a2", "a3", "b1", "b2", "b3"}, }, expSatisfied: []int{0}, }, @@ -693,7 +699,7 @@ func TestMultiRangeBoundedBatchScanPartialResponses(t *testing.T) { {"b", "g"}, {"c", "f"}, {"a", "d"}, }, expResults: [][]string{ - {"b1", "b2", "b3"}, {}, {"a1", "a2", "a3", "b1"}, + {"b1"}, {}, {"a1", "a2", "a3", "b1", "b2", "b3"}, }, }, { diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 1b6330e895ca..cd9067feab89 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -456,9 +456,13 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re // within a single range. var truncationHelper kvcoord.BatchTruncationHelper // The streamer can process the responses in an arbitrary order, so we don't - // require the helper to preserve the order of requests. + // require the helper to preserve the order of requests and allow it to + // reorder the reqs slice too. const mustPreserveOrder = false - if err = truncationHelper.Init(scanDir, reqs, mustPreserveOrder); err != nil { + const canReorderRequestsSlice = true + if err = truncationHelper.Init( + scanDir, reqs, mustPreserveOrder, canReorderRequestsSlice, + ); err != nil { return err } var reqsKeysScratch []roachpb.Key