kvcoord: refactor the truncation helper for reuse
This commit refactors the batch truncation helper so that it can be
reused for multiple batches of requests. In particular, that ability is
now utilized by the streamer. Additionally, since the streamer now holds
on to the same truncation helper for possibly a long time, this commit
adds memory accounting for the helper's internal state.

Release note: None
yuzefovich committed Jul 6, 2022
1 parent 35e502e commit 75a18a8
Showing 4 changed files with 162 additions and 55 deletions.
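
For illustration, here is a minimal sketch (not part of this commit) of the reuse pattern the commit message describes, based on the API visible in the diff below; the request slices, the account callback, and the package name are hypothetical:

package example

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// processBatches sketches how a long-lived caller (such as the streamer) can
// construct the truncation helper once and re-Init() it for each subsequent
// batch, re-measuring MemUsage() after every Init() to keep a memory account
// up to date. reqsA, reqsB, and account are hypothetical.
func processBatches(reqsA, reqsB []roachpb.RequestUnion, account func(delta int64) error) error {
	const mustPreserveOrder = false
	const canReorderRequestsSlice = true
	helper, err := kvcoord.NewBatchTruncationHelper(
		kvcoord.Ascending, reqsA, mustPreserveOrder, canReorderRequestsSlice,
	)
	if err != nil {
		return err
	}
	accountedFor := helper.MemUsage()
	if err := account(accountedFor); err != nil {
		return err
	}
	// ... iterate over the ranges touched by reqsA, calling helper.Truncate() ...

	// Reuse the same helper for the next batch instead of allocating a new one.
	if err := helper.Init(reqsB); err != nil {
		return err
	}
	// Charge (or release) only the change in the helper's footprint.
	if err := account(helper.MemUsage() - accountedFor); err != nil {
		return err
	}
	// ... iterate over the ranges touched by reqsB ...
	return nil
}
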
166 changes: 131 additions & 35 deletions pkg/kv/kvclient/kvcoord/batch.go
@@ -12,6 +12,7 @@ package kvcoord

import (
"sort"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -34,7 +35,7 @@ import (
// // All requests fit within a single range, don't use the helper.
// ...
// }
// helper := MakeBatchTruncationHelper(scanDir, requests)
// helper := NewBatchTruncationHelper(scanDir, requests)
// for ri.Valid() {
// curRangeRS := rs.Intersect(ri.Token().Desc())
// curRangeReqs, positions, seekKey := helper.Truncate(curRangeRS)
@@ -80,9 +81,21 @@ type BatchTruncationHelper struct {
// requests are the original requests this helper needs to process (possibly
// in non-original order).
requests []roachpb.RequestUnion
// ownRequestsSlice indicates whether a separate slice was allocated for
// requests. It is used for memory accounting purposes.
//
// It is the same as !canReorderRequestsSlice in most cases, except when
// local keys are present. In that scenario, even if
// canReorderRequestsSlice is false, ownRequestsSlice might remain false.
ownRequestsSlice bool
// mustPreserveOrder indicates whether the requests must be returned by
// Truncate() in the original order.
mustPreserveOrder bool
// canReorderRequestsSlice indicates whether the helper will hold on to the
// given slice of requests and might reorder the requests within it
// (although each request will not be modified "deeply" - i.e. its header
// won't be updated or anything like that).
canReorderRequestsSlice bool
// foundLocalKey, if true, indicates that some of the requests reference
// local keys. When true, the helper falls back to the legacy methods.
foundLocalKey bool
@@ -154,8 +167,9 @@ func (h descBatchTruncationHelper) Less(i, j int) bool {
return h.headers[i].EndKey.Compare(h.headers[j].EndKey) > 0
}

// MakeBatchTruncationHelper returns a new BatchTruncationHelper for the given
// requests.
// NewBatchTruncationHelper returns a new BatchTruncationHelper for the given
// requests. The helper can be reused later for a different set of requests via
// a separate Init() call.
//
// mustPreserveOrder, if true, indicates that the caller requires that requests
// are returned by Truncate() in the original order (i.e. with strictly
@@ -166,67 +180,141 @@ func (h descBatchTruncationHelper) Less(i, j int) bool {
// not be modified "deeply" - i.e. its header won't be updated or anything like
// that). Set it to false when the caller cares about the slice not being
// mutated in any way.
func MakeBatchTruncationHelper(
func NewBatchTruncationHelper(
scanDir ScanDirection,
requests []roachpb.RequestUnion,
mustPreserveOrder bool,
canReorderRequestsSlice bool,
) (BatchTruncationHelper, error) {
var ret BatchTruncationHelper
ret.scanDir = scanDir
ret.requests = requests
ret.mustPreserveOrder = mustPreserveOrder
) (*BatchTruncationHelper, error) {
ret := &BatchTruncationHelper{
scanDir: scanDir,
mustPreserveOrder: mustPreserveOrder,
canReorderRequestsSlice: canReorderRequestsSlice,
}
return ret, ret.Init(requests)
}

// Init sets up the helper for the provided requests. It can be called multiple
// times, and it will reuse the internal allocations as much as possible.
func (h *BatchTruncationHelper) Init(requests []roachpb.RequestUnion) error {
// Determine whether we can use the optimized strategy before making any
// allocations.
h.foundLocalKey = false
for i := range requests {
header := requests[i].GetInner().Header()
if keys.IsLocal(header.Key) {
ret.foundLocalKey = true
return ret, nil
h.requests = requests
h.foundLocalKey = true
return nil
}
}
// We can use the optimized strategy, so set up all of the internal state.
if !canReorderRequestsSlice {
h.startIdx = 0
if h.canReorderRequestsSlice {
h.requests = requests
} else {
// If we can't reorder the original requests slice, we must make a copy.
ret.requests = make([]roachpb.RequestUnion, len(requests))
copy(ret.requests, requests)
if cap(h.requests) < len(requests) {
h.requests = make([]roachpb.RequestUnion, len(requests))
h.ownRequestsSlice = true
} else {
if len(requests) < len(h.requests) {
// Ensure that we lose references to the old requests that will
// not be overwritten by copy.
//
// Note that we only need to go up to the number of old requests
// and not the capacity of the slice since we assume that
// everything past the length is already nil-ed out.
oldRequests := h.requests[len(requests):len(h.requests)]
for i := range oldRequests {
oldRequests[i] = roachpb.RequestUnion{}
}
}
h.requests = h.requests[:len(requests)]
}
copy(h.requests, requests)
}
if cap(h.headers) < len(requests) {
h.headers = make([]roachpb.RequestHeader, len(requests))
} else {
if len(requests) < len(h.headers) {
// Ensure that we lose references to the old headers that will
// not be overwritten in the loop below.
//
// Note that we only need to go up to the number of old headers and
// not the capacity of the slice since we assume that everything
// past the length is already nil-ed out.
oldHeaders := h.headers[len(requests):len(h.headers)]
for i := range oldHeaders {
oldHeaders[i] = roachpb.RequestHeader{}
}
}
h.headers = h.headers[:len(requests)]
}
if cap(h.positions) < len(requests) {
h.positions = make([]int, len(requests))
} else {
h.positions = h.positions[:len(requests)]
}
if cap(h.isRange) < len(requests) {
h.isRange = make([]bool, len(requests))
} else {
h.isRange = h.isRange[:len(requests)]
}
ret.headers = make([]roachpb.RequestHeader, len(requests))
ret.positions = make([]int, len(requests))
ret.isRange = make([]bool, len(requests))
// Populate the internal state as well as perform some sanity checks on the
// requests.
for i := range requests {
req := requests[i].GetInner()
ret.headers[i] = req.Header()
ret.positions[i] = i
ret.isRange[i] = roachpb.IsRange(req)
if ret.isRange[i] {
h.headers[i] = req.Header()
h.positions[i] = i
h.isRange[i] = roachpb.IsRange(req)
if h.isRange[i] {
// We're dealing with a range-spanning request.
if l, r := keys.IsLocal(ret.headers[i].Key), keys.IsLocal(ret.headers[i].EndKey); (l && !r) || (!l && r) {
return BatchTruncationHelper{}, errors.AssertionFailedf("local key mixed with global key in range")
if l, r := keys.IsLocal(h.headers[i].Key), keys.IsLocal(h.headers[i].EndKey); (l && !r) || (!l && r) {
return errors.AssertionFailedf("local key mixed with global key in range")
}
} else if len(ret.headers[i].EndKey) > 0 {
return BatchTruncationHelper{}, errors.AssertionFailedf("%T is not a range command, but EndKey is set", req)
} else if len(h.headers[i].EndKey) > 0 {
return errors.AssertionFailedf("%T is not a range command, but EndKey is set", req)
}
}
if scanDir == Ascending {
sort.Sort(ascBatchTruncationHelper{BatchTruncationHelper: &ret})
if h.scanDir == Ascending {
sort.Sort(ascBatchTruncationHelper{BatchTruncationHelper: h})
} else {
// With the Descending scan direction, we have to convert all point
// requests into range-spanning requests that include only a single
// point.
for i := range ret.headers {
if len(ret.headers[i].EndKey) == 0 {
ret.headers[i].EndKey = ret.headers[i].Key.Next()
for i := range h.headers {
if len(h.headers[i].EndKey) == 0 {
h.headers[i].EndKey = h.headers[i].Key.Next()
}
}
sort.Sort(descBatchTruncationHelper{BatchTruncationHelper: &ret})
sort.Sort(descBatchTruncationHelper{BatchTruncationHelper: h})
}
if ret.mustPreserveOrder {
ret.helper.init(len(requests))
if h.mustPreserveOrder {
h.helper.init(len(requests))
}
return ret, nil
return nil
}

const (
requestUnionOverhead = int64(unsafe.Sizeof(roachpb.RequestUnion{}))
requestHeaderOverhead = int64(unsafe.Sizeof(roachpb.RequestHeader{}))
intOverhead = int64(unsafe.Sizeof(int(0)))
boolOverhead = int64(unsafe.Sizeof(false))
)

// MemUsage returns the memory usage of the internal state of the helper.
func (h *BatchTruncationHelper) MemUsage() int64 {
var memUsage int64
if h.ownRequestsSlice {
// Only account for the requests slice if we own it.
memUsage += int64(cap(h.requests)) * requestUnionOverhead
}
memUsage += int64(cap(h.headers)) * requestHeaderOverhead
memUsage += int64(cap(h.positions)) * intOverhead
memUsage += int64(cap(h.isRange)) * boolOverhead
memUsage += h.helper.memUsage()
return memUsage
}

// Truncate restricts all requests to the given key range and returns new,
@@ -906,12 +994,20 @@ type orderRestorationHelper struct {
}

func (h *orderRestorationHelper) init(numOriginalRequests int) {
h.found = make([]int, numOriginalRequests)
if cap(h.found) < numOriginalRequests {
h.found = make([]int, numOriginalRequests)
} else {
h.found = h.found[:numOriginalRequests]
}
for i := range h.found {
h.found[i] = -1
}
}

func (h *orderRestorationHelper) memUsage() int64 {
return int64(cap(h.scratch))*requestUnionOverhead + int64(cap(h.found))*intOverhead
}

// restoreOrder reorders truncReqs in the ascending order of the corresponding
// positions values.
//
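
Aside (not from the diff): the Init() logic in batch.go above follows a common Go idiom for reusing slices across calls — keep the old backing array when its capacity suffices, but clear the tail entries that the new contents will not overwrite so stale references do not retain memory. A standalone sketch with a hypothetical element type:

package main

import "fmt"

type request struct{ key []byte }

// reuseSlice returns dst resized to len(src) with src copied in, reusing
// dst's backing array when possible.
func reuseSlice(dst, src []request) []request {
	if cap(dst) < len(src) {
		dst = make([]request, len(src))
	} else {
		// Clear only the entries between the new and the old length; anything
		// past the old length is assumed to have been cleared already.
		for i := len(src); i < len(dst); i++ {
			dst[i] = request{}
		}
		dst = dst[:len(src)]
	}
	copy(dst, src)
	return dst
}

func main() {
	buf := []request{{[]byte("a")}, {[]byte("b")}, {[]byte("c")}}
	buf = reuseSlice(buf, []request{{[]byte("x")}})
	fmt.Println(len(buf), cap(buf)) // 1 3: same backing array, stale entries cleared
}
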
12 changes: 6 additions & 6 deletions pkg/kv/kvclient/kvcoord/batch_test.go
@@ -210,9 +210,9 @@ func TestBatchPrevNext(t *testing.T) {
}
const mustPreserveOrder = false
const canReorderRequestsSlice = false
ascHelper, err := MakeBatchTruncationHelper(Ascending, ba.Requests, mustPreserveOrder, canReorderRequestsSlice)
ascHelper, err := NewBatchTruncationHelper(Ascending, ba.Requests, mustPreserveOrder, canReorderRequestsSlice)
require.NoError(t, err)
descHelper, err := MakeBatchTruncationHelper(Descending, ba.Requests, mustPreserveOrder, canReorderRequestsSlice)
descHelper, err := NewBatchTruncationHelper(Descending, ba.Requests, mustPreserveOrder, canReorderRequestsSlice)
require.NoError(t, err)
if _, _, next, err := ascHelper.Truncate(
roachpb.RSpan{
@@ -396,10 +396,10 @@ func TestTruncate(t *testing.T) {
original.Requests[i].MustSetInner(request.GetInner().ShallowCopy())
}

var truncationHelper BatchTruncationHelper
var truncationHelper *BatchTruncationHelper
if !isLegacy {
var err error
truncationHelper, err = MakeBatchTruncationHelper(
truncationHelper, err = NewBatchTruncationHelper(
Ascending, original.Requests, mustPreserveOrder, canReorderRequestsSlice,
)
if err != nil {
@@ -562,7 +562,7 @@ func TestTruncateLoop(t *testing.T) {
for _, mustPreserveOrder := range []bool{false, true} {
t.Run(fmt.Sprintf("run=%d/%s/order=%t", numRuns, scanDir, mustPreserveOrder), func(t *testing.T) {
const canReorderRequestsSlice = false
helper, err := MakeBatchTruncationHelper(
helper, err := NewBatchTruncationHelper(
scanDir, requests, mustPreserveOrder, canReorderRequestsSlice,
)
require.NoError(t, err)
@@ -698,7 +698,7 @@ func BenchmarkTruncateLoop(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
const canReorderRequestsSlice = false
h, err := MakeBatchTruncationHelper(
h, err := NewBatchTruncationHelper(
scanDir, reqs, mustPreserveOrder, canReorderRequestsSlice,
)
require.NoError(b, err)
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -1339,7 +1339,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// TODO(yuzefovich): refactor the DistSender so that the truncation helper
// could reorder requests as it pleases.
const canReorderRequestsSlice = false
truncationHelper, err := MakeBatchTruncationHelper(
truncationHelper, err := NewBatchTruncationHelper(
scanDir, ba.Requests, mustPreserveOrder, canReorderRequestsSlice,
)
if err != nil {
37 changes: 24 additions & 13 deletions pkg/kv/kvclient/kvstreamer/streamer.go
@@ -230,6 +230,12 @@ type Streamer struct {

waitGroup sync.WaitGroup

truncationHelper *kvcoord.BatchTruncationHelper
// truncationHelperAccountedFor tracks how much space has been consumed from
// the budget in order to account for the memory usage of the truncation
// helper.
truncationHelperAccountedFor int64

// requestsToServe contains all single-range sub-requests that have yet
// to be served.
requestsToServe requestsProvider
@@ -504,21 +510,21 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
}
}()
allRequestsAreWithinSingleRange := !ri.NeedAnother(rs)
var truncationHelper kvcoord.BatchTruncationHelper
if !allRequestsAreWithinSingleRange {
// We only need the truncation helper if the requests span multiple
// ranges.
//
// The streamer can process the responses in an arbitrary order, so we
// don't require the helper to preserve the order of requests and allow
// it to reorder the reqs slice too.
const mustPreserveOrder = false
const canReorderRequestsSlice = true
// TODO(yuzefovich): reuse truncation helpers between different
// Enqueue() calls.
truncationHelper, err = kvcoord.MakeBatchTruncationHelper(
scanDir, reqs, mustPreserveOrder, canReorderRequestsSlice,
)
if s.truncationHelper == nil {
// The streamer can process the responses in an arbitrary order, so
// we don't require the helper to preserve the order of requests and
// allow it to reorder the reqs slice too.
const mustPreserveOrder = false
const canReorderRequestsSlice = true
s.truncationHelper, err = kvcoord.NewBatchTruncationHelper(
scanDir, reqs, mustPreserveOrder, canReorderRequestsSlice,
)
} else {
err = s.truncationHelper.Init(reqs)
}
if err != nil {
return err
}
@@ -544,7 +550,7 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
if err != nil {
return err
}
singleRangeReqs, positions, seekKey, err = truncationHelper.Truncate(singleRangeSpan)
singleRangeReqs, positions, seekKey, err = s.truncationHelper.Truncate(singleRangeSpan)
if err != nil {
return err
}
@@ -634,6 +640,11 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
}

toConsume := totalReqsMemUsage
if !allRequestsAreWithinSingleRange {
accountedFor := s.truncationHelperAccountedFor
s.truncationHelperAccountedFor = s.truncationHelper.MemUsage()
toConsume += s.truncationHelperAccountedFor - accountedFor
}
if newNumRangesPerScanRequestMemoryUsage != 0 && newNumRangesPerScanRequestMemoryUsage != s.numRangesPerScanRequestAccountedFor {
toConsume += newNumRangesPerScanRequestMemoryUsage - s.numRangesPerScanRequestAccountedFor
s.numRangesPerScanRequestAccountedFor = newNumRangesPerScanRequestMemoryUsage
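
Aside (not from the diff): the hunk above charges the budget incrementally — the streamer remembers how much the helper has already been accounted for and consumes only the delta on each Enqueue(). A standalone sketch with hypothetical budget and streamer types:

package main

import "fmt"

type budget struct{ used, limit int64 }

func (b *budget) consume(delta int64) error {
	if b.used+delta > b.limit {
		return fmt.Errorf("budget exceeded: %d + %d > %d", b.used, delta, b.limit)
	}
	b.used += delta
	return nil
}

type streamer struct {
	budget                       *budget
	truncationHelperAccountedFor int64
}

// accountForHelper charges the budget only for the change in the helper's
// footprint since the last call.
func (s *streamer) accountForHelper(helperMemUsage int64) error {
	delta := helperMemUsage - s.truncationHelperAccountedFor
	if err := s.budget.consume(delta); err != nil {
		return err
	}
	s.truncationHelperAccountedFor = helperMemUsage
	return nil
}

func main() {
	s := &streamer{budget: &budget{limit: 1 << 20}}
	_ = s.accountForHelper(4096) // first Enqueue: charge the full footprint
	_ = s.accountForHelper(6144) // helper grew: charge only the 2048-byte delta
	fmt.Println(s.budget.used)   // 6144
}
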
