Skip to content

Commit

Permalink
kvcoord: introduce batch truncation helper
Browse files Browse the repository at this point in the history
This commit introduces a batch truncation helper that encompasses logic
of truncating requests to the boundaries of a single range as well as
returning the next key to seek the range iterator to. The helper is now
used both in the DistSender as well as in the Streamer. No modification
to the actual logic of `Truncate` nor `Next`/`prev` functions has been
done other than incorporating the return of the next seek key into
`Truncate` function itself. This is needed since the following commit
will tightly couple the truncation process with the next seek key
determination in order to optimize it.

The helper can be configured with a knob indicating whether `Truncate`
needs to return requests in the original order. This behavior is
necessary by the BatchRequests that contain writes since in several
spots we rely on the ordering assumptions (e.g. of increasing values of
`Sequence`).

The following adjustments were made to the tests:
- `BenchmarkTruncate` has been renamed to `BenchmarkTruncateLegacy`
- `TestTruncate` has been refactored to exercise the new and the old
code-paths
- `TestBatchPrevNext` has been refactored to run through the new code
path, also a few test cases have been adjusted slightly.

This commit also introduces some unit tests for the new code path when
it runs in a loop over multiple ranges as well as a corresponding
benchmark.

Release note: None
  • Loading branch information
yuzefovich committed Jun 15, 2022
1 parent 25d7ef7 commit d90e204
Show file tree
Hide file tree
Showing 6 changed files with 740 additions and 342 deletions.
1 change: 0 additions & 1 deletion pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ go_test(
"send_test.go",
"split_test.go",
"transport_test.go",
"truncate_test.go",
"txn_coord_sender_savepoints_test.go",
"txn_coord_sender_server_test.go",
"txn_coord_sender_test.go",
Expand Down
176 changes: 136 additions & 40 deletions pkg/kv/kvclient/kvcoord/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,119 @@ import (
"github.com/cockroachdb/errors"
)

var emptyHeader = roachpb.RequestHeader{}
// BatchTruncationHelper is a utility struct that helps with truncating requests
// to range boundaries as well as figuring out the next key to seek to for the
// range iterator.
//
// The caller should not use the helper if all requests fit within a single
// range since the helper has non-trivial setup cost.
//
// It is designed to be used roughly as follows:
//
// rs := keys.Range(requests)
// ri.Seek(scanDir, rs.Key)
// if !ri.NeedAnother(rs) {
// // All requests fit within a single range, don't use the helper.
// ...
// }
// helper.Init(scanDir, requests)
// for ri.Valid() {
// curRangeRS := rs.Intersect(ri.Token().Desc())
// curRangeReqs, positions, seekKey := helper.Truncate(curRangeRS)
// // Process curRangeReqs that touch a single range and then use positions
// // to reassemble the result.
// ...
// ri.Seek(scanDir, seekKey)
// }
//
type BatchTruncationHelper struct {
scanDir ScanDirection
requests []roachpb.RequestUnion
// mustPreserveOrder indicates whether the requests must be returned by
// Truncate() in the original order.
mustPreserveOrder bool
}

// Init initializes the helper for the given requests.
//
// mustPreserveOrder, if true, indicates that the caller requires that requests
// are returned by Truncate() in the original order (i.e. with strictly
// increasing positions values).
func (h *BatchTruncationHelper) Init(
scanDir ScanDirection, requests []roachpb.RequestUnion, mustPreserveOrder bool,
) error {
h.scanDir = scanDir
h.requests = requests
h.mustPreserveOrder = mustPreserveOrder
return nil
}

// Truncate restricts all requests to the given key range and returns new,
// truncated, requests. All returned requests are "truncated" to the given span,
// and requests which are found to not overlap the given span at all are
// removed. A mapping of response index to request index is returned. It also
// returns the next seek key for the range iterator. With Ascending scan
// direction, the next seek key is such that requests in [RKeyMin, seekKey)
// range have been processed, with Descending scan direction, it is such that
// requests in [seekKey, RKeyMax) range have been processed.
//
// For example, if
//
// reqs = Put[a], Put[c], Put[b],
// rs = [a,bb],
// BatchTruncationHelper.Init(Ascending, reqs)
//
// then BatchTruncationHelper.Truncate(rs) returns (Put[a], Put[b]), positions
// [0,2] as well as seekKey 'c'.
//
// Truncate returns the requests in an arbitrary order (meaning that positions
// return values might not be ascending), unless mustPreserveOrder was true in
// Init().
//
// NOTE: it is assumed that
// 1. Truncate has been called on the previous ranges that intersect with
// keys.Range(reqs);
// 2. rs is intersected with the current range boundaries.
func (h *BatchTruncationHelper) Truncate(
rs roachpb.RSpan,
) ([]roachpb.RequestUnion, []int, roachpb.RKey, error) {
truncReqs, positions, err := truncateLegacy(h.requests, rs)
if err != nil {
return nil, nil, nil, err
}
var seekKey roachpb.RKey
if h.scanDir == Ascending {
// In next iteration, query next range.
// It's important that we use the EndKey of the current descriptor
// as opposed to the StartKey of the next one: if the former is stale,
// it's possible that the next range has since merged the subsequent
// one, and unless both descriptors are stale, the next descriptor's
// StartKey would move us to the beginning of the current range,
// resulting in a duplicate scan.
seekKey, err = nextLegacy(h.requests, rs.EndKey)
} else {
// In next iteration, query previous range.
// We use the StartKey of the current descriptor as opposed to the
// EndKey of the previous one since that doesn't have bugs when
// stale descriptors come into play.
seekKey, err = prevLegacy(h.requests, rs.Key)
}
return truncReqs, positions, seekKey, err
}

var emptyHeader = roachpb.RequestHeader{}

// truncateLegacy restricts all requests to the given key range and returns new,
// truncated, requests. All returned requests are "truncated" to the given span,
// and requests which are found to not overlap the given span at all are
// removed. A mapping of response index to request index is returned. For
// example, if
//
// reqs = Put[a], Put[c], Put[b],
// rs = [a,bb],
//
// then Truncate(reqs,rs) returns (Put[a], Put[b]) and positions [0,2].
func Truncate(
// then truncateLegacy(reqs,rs) returns (Put[a], Put[b]) and positions [0,2].
func truncateLegacy(
reqs []roachpb.RequestUnion, rs roachpb.RSpan,
) ([]roachpb.RequestUnion, []int, error) {
truncateOne := func(args roachpb.Request) (hasRequest bool, changed bool, _ roachpb.RequestHeader, _ error) {
Expand Down Expand Up @@ -64,26 +164,26 @@ func Truncate(
local = true
}
if keyAddr.Less(rs.Key) {
// rs.Key can't be local because it contains range split points, which
// are never local.
// rs.Key can't be local because it contains range split points,
// which are never local.
changed = true
if !local {
header.Key = rs.Key.AsRawKey()
} else {
// The local start key should be truncated to the boundary of local keys which
// address to rs.Key.
// The local start key should be truncated to the boundary of
// local keys which address to rs.Key.
header.Key = keys.MakeRangeKeyPrefix(rs.Key)
}
}
if !endKeyAddr.Less(rs.EndKey) {
// rs.EndKey can't be local because it contains range split points, which
// are never local.
// rs.EndKey can't be local because it contains range split points,
// which are never local.
changed = true
if !local {
header.EndKey = rs.EndKey.AsRawKey()
} else {
// The local end key should be truncated to the boundary of local keys which
// address to rs.EndKey.
// The local end key should be truncated to the boundary of
// local keys which address to rs.EndKey.
header.EndKey = keys.MakeRangeKeyPrefix(rs.EndKey)
}
}
Expand Down Expand Up @@ -122,18 +222,15 @@ func Truncate(
return truncReqs, positions, nil
}

// prev gives the right boundary of the union of all requests which don't
// prevLegacy gives the right boundary of the union of all requests which don't
// affect keys larger than the given key. Note that a right boundary is
// exclusive, that is, the returned RKey is to be used as the exclusive
// right endpoint in finding the next range to query.
// exclusive, that is, the returned RKey is to be used as the exclusive right
// endpoint in finding the next range to query.
//
// Informally, a call `prev(reqs, k)` means: we've already executed the parts
// of `reqs` that intersect `[k, KeyMax)`; please tell me how far to the
// Informally, a call `prevLegacy(reqs, k)` means: we've already executed the
// parts of `reqs` that intersect `[k, KeyMax)`; please tell me how far to the
// left the next relevant request begins.
//
// TODO(tschottdorf): again, better on BatchRequest itself, but can't pull
// 'keys' into 'roachpb'.
func prev(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) {
func prevLegacy(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) {
candidate := roachpb.RKeyMin
for _, union := range reqs {
inner := union.GetInner()
Expand All @@ -144,16 +241,18 @@ func prev(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) {
}
endKey := h.EndKey
if len(endKey) == 0 {
// If we have a point request for `x < k` then that request has not been
// satisfied (since the batch has only been executed for keys `>=k`). We
// treat `x` as `[x, x.Next())` which does the right thing below. This
// also works when `x > k` or `x=k` as the logic below will skip `x`.
// If we have a point request for `x < k` then that request has not
// been satisfied (since the batch has only been executed for keys
// `>=k`). We treat `x` as `[x, x.Next())` which does the right
// thing below. This also works when `x > k` or `x=k` as the logic
// below will skip `x`.
//
// Note that if the key is /Local/x/something, then instead of using
// /Local/x/something.Next() as the end key, we rely on AddrUpperBound to
// handle local keys. In particular, AddrUpperBound will turn it into
// `x\x00`, so we're looking at the key-range `[x, x.Next())`. This is
// exactly what we want as the local key is contained in that range.
// /Local/x/something.Next() as the end key, we rely on
// AddrUpperBound to handle local keys. In particular,
// AddrUpperBound will turn it into `x\x00`, so we're looking at the
// key-range `[x, x.Next())`. This is exactly what we want as the
// local key is contained in that range.
//
// See TestBatchPrevNext for test cases with commentary.
endKey = h.Key.Next()
Expand Down Expand Up @@ -191,18 +290,15 @@ func prev(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) {
return candidate, nil
}

// Next gives the left boundary of the union of all requests which don't affect
// keys less than the given key. Note that the left boundary is inclusive, that
// is, the returned RKey is the inclusive left endpoint of the keys the request
// should operate on next.
//
// Informally, a call `Next(reqs, k)` means: we've already executed the parts of
// `reqs` that intersect `[KeyMin, k)`; please tell me how far to the right the
// next relevant request begins.
//
// TODO(tschottdorf): again, better on BatchRequest itself, but can't pull
// 'keys' into 'proto'.
func Next(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) {
// nextLegacy gives the left boundary of the union of all requests which don't
// affect keys less than the given key. Note that the left boundary is
// inclusive, that is, the returned RKey is the inclusive left endpoint of the
// keys the request should operate on next.
//
// Informally, a call `nextLegacy(reqs, k)` means: we've already executed the
// parts of `reqs` that intersect `[KeyMin, k)`; please tell me how far to the
// right the next relevant request begins.
func nextLegacy(reqs []roachpb.RequestUnion, k roachpb.RKey) (roachpb.RKey, error) {
candidate := roachpb.RKeyMax
for _, union := range reqs {
inner := union.GetInner()
Expand Down
Loading

0 comments on commit d90e204

Please sign in to comment.