storage: be tighter with allocations when TargetBytes is present
Previously, while `put`ting the key into the `repr`, we could make an
allocation that was too large given the remaining `TargetBytes` budget.
This could happen because we exponentially increase the capacities of
the buffers up to 128MiB while accounting only for the length of each
slice, even though the slice's whole capacity has a memory footprint.
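
To make the gap concrete, here is a minimal, self-contained Go sketch
(illustrative only, not the CockroachDB code) of how length-based
accounting misses the slack capacity of a partially filled buffer:

    package main

    import "fmt"

    func main() {
        // Length-based accounting only sees len(buf), but the Go runtime
        // keeps the entire backing array of cap(buf) bytes alive.
        buf := make([]byte, 0, 8<<20)             // allocate 8MiB of capacity
        buf = append(buf, make([]byte, 2<<20)...) // use only 2MiB of it
        fmt.Printf("len=%dMiB cap=%dMiB unaccounted=%dMiB\n",
            len(buf)>>20, cap(buf)>>20, (cap(buf)-len(buf))>>20)
        // Prints: len=2MiB cap=8MiB unaccounted=6MiB
    }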

For example, with 10MiB of `TargetBytes` (which SQL uses in many cases)
and a ScanResponse that exceeds that limit, we'd allocate capacities
that are powers of two, starting at, say, 256B, going all the way up to
8MiB; however, given the 10MiB limit, we'd only use 2MiB of that last
8MiB `repr` before hitting the target bytes limit and stopping. Such
behavior is tolerable when the response is marshalled by gRPC to be
sent to another node, but it is quite unfortunate in the local
fast-path cases (when SQL and KV are part of the same physical
machine). In the latter scenario SQL would only account for the lengths
of the slices while keeping the whole slices alive for a while, leading
to significant unaccounted-for memory. In the example above, on the
order of 6MiB would go unaccounted; multiply that by some concurrency,
and we have unaccounted memory on the order of hundreds of MiBs.
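
The arithmetic above can be reproduced with a toy simulation of the old
growth policy (a sketch only: it assumes a 256B initial buffer and pure
doubling, and it ignores the real allocator's minSize/maxSize handling
and per-pair bookkeeping):

    package main

    import "fmt"

    func main() {
        const target = 10 << 20 // 10MiB TargetBytes budget
        used, capacity := 0, 256
        for {
            if remaining := target - used; remaining < capacity {
                // Old behavior: the final buffer is allocated at the full
                // doubled capacity, but only `remaining` bytes of it are
                // used before the TargetBytes limit stops the scan.
                fmt.Printf("last buffer: cap=%dMiB used=%.2fMiB slack=%.2fMiB\n",
                    capacity>>20, float64(remaining)/(1<<20),
                    float64(capacity-remaining)/(1<<20))
                return
            }
            used += capacity // fill this buffer completely...
            capacity *= 2    // ...then double for the next one
        }
    }

This prints cap=8MiB used=2.00MiB slack=6.00MiB, matching the numbers
above.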

The current behavior is especially bad for the streamer use case, where
we issue many requests with `TargetBytes` set and use the
`ScanResponse.NumBytes` field (which tracks the lengths of the slices)
for memory accounting purposes.

To improve here, this commit teaches the `put` method about the maximum
capacity it can use for a new allocation. A key might now not be added
at all if the key-value pair would exceed the remaining capacity. In
the example above, the last slice would be on the order of 2MiB, so we
stay very close to the `TargetBytes` limit and don't have any wasted
space.
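
Under the same toy assumptions as the sketch above, the clamped growth
introduced by this commit behaves as follows; the clamp mirrors the
maxNewSize logic in the diff below:

    package main

    import "fmt"

    func main() {
        const target = 10 << 20 // 10MiB TargetBytes budget
        used, capacity, last := 0, 256, 0
        for used < target {
            if remaining := target - used; capacity > remaining {
                // New behavior: clamp the allocation to the remaining
                // budget instead of taking the full doubled capacity.
                capacity = remaining
            }
            last = capacity
            used += capacity
            capacity *= 2
        }
        fmt.Printf("last buffer=%.2fMiB total allocated=%.2fMiB (= budget)\n",
            float64(last)/(1<<20), float64(used)/(1<<20))
    }

This prints last buffer=2.00MiB total allocated=10.00MiB: every
allocated byte fits within the budget.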

Release note: None
yuzefovich committed Jul 18, 2022
1 parent a8568d2 commit 150b228
Showing 1 changed file with 73 additions and 23 deletions.
pkg/storage/pebble_mvcc_scanner.go
@@ -82,9 +82,14 @@ func (p *pebbleResults) clear() {
 // The repr that MVCCScan / MVCCGet expects to provide as output goes:
 // <valueLen:Uint32><keyLen:Uint32><Key><Value>
 // This function adds to repr in that format.
+// - maxNewSize, if non-zero, indicates the maximum capacity for a new repr that
+// can be allocated.
+//
+// It returns a boolean indicating whether the key was added into the repr (in
+// the non-error case, this can be false only if maxNewSize is non-zero).
 func (p *pebbleResults) put(
-    ctx context.Context, key []byte, value []byte, memAccount *mon.BoundAccount,
-) error {
+    ctx context.Context, key []byte, value []byte, memAccount *mon.BoundAccount, maxNewSize int,
+) (bool, error) {
     const minSize = 16
     const maxSize = 128 << 20 // 128 MB

@@ -97,25 +102,38 @@ func (p *pebbleResults) put(
     lenValue := len(value)
     lenToAdd := p.sizeOf(lenKey, lenValue)
     if len(p.repr)+lenToAdd > cap(p.repr) {
+        if maxNewSize > 0 && lenToAdd > maxNewSize {
+            // This key exceeds the given capacity, so we don't add it.
+            return false, nil
+        }
+        // If we don't have a limit or the given capacity is too much, clamp it
+        // at maxSize.
+        if maxNewSize <= 0 || maxNewSize > maxSize {
+            maxNewSize = maxSize
+        }
         newSize := 2 * cap(p.repr)
         if newSize == 0 || newSize > maxSize {
             // If the previous buffer exceeded maxSize, we don't double its capacity
             // for next allocation, and instead reset the exponential increase, in
             // case we had a stray huge key-value.
             newSize = minSize
+        } else if newSize > maxNewSize {
+            // If the exponentially-increasing size exceeds the given capacity,
+            // clamp the new size at the limit.
+            newSize = maxNewSize
         }
-        if lenToAdd >= maxSize {
+        if lenToAdd >= maxNewSize {
             newSize = lenToAdd
         } else {
-            for newSize < lenToAdd && newSize < maxSize {
+            for newSize < lenToAdd && newSize < maxNewSize {
                 newSize *= 2
             }
         }
         if len(p.repr) > 0 {
             p.bufs = append(p.bufs, p.repr)
         }
         if err := memAccount.Grow(ctx, int64(newSize)); err != nil {
-            return err
+            return false, err
         }
         p.repr = nonZeroingMakeByteSlice(newSize)[:0]
     }
@@ -140,7 +158,7 @@ func (p *pebbleResults) put(
         }
     }

-    return nil
+    return true, nil
 }

 func (p *pebbleResults) sizeOf(lenKey, lenValue int) int {
@@ -981,35 +999,67 @@ func (p *pebbleMVCCScanner) addAndAdvance(
         p.resumeReason = roachpb.RESUME_KEY_LIMIT
     }

+    maybeTrimResult := func() (resumeKey roachpb.Key, _ error) {
+        resumeKey = key
+        // If requested, remove any partial SQL rows from the end of the result.
+        if p.wholeRows {
+            trimmedKey, err := p.results.maybeTrimPartialLastRow(key)
+            if err != nil {
+                return nil, err
+            }
+            if trimmedKey != nil {
+                resumeKey = trimmedKey
+            }
+        }
+        return resumeKey, nil
+    }
+
+    // We need to decide how to handle the current key in relation to the
+    // limits and what new capacity we allow it to use in put(). We have
+    // several cases:
+    //
+    // 1. the limits were exceeded (p.resumeReason != 0):
+    //    a) we're not allowed to return an empty result and we currently
+    //       have an empty result - we must include the key into the result.
+    //       We do so by unsetting the relevant resume* fields and marking
+    //       mustPutKey as true.
+    //    b) we're not allowed to return an empty result, wholeRows is true,
+    //       and the current key continues the first row - same action as in
+    //       a) - we must include the key into the result.
+    //    c) if neither a) nor b) applies, we don't include the current key
+    //       and exit.
+    //
+    // 2. the limits were not exceeded. Here we attempt to include the
+    //    current key into the result, subject to the remaining maxNewSize
+    //    capacity (if targetBytes is non-zero).
+    //
+    // NB: if we're not allowed to return an empty result (or must continue
+    // the first row if wholeRows is true), put() below will never run into
+    // the capacity limit. This is the case because we have checked above
+    // that including the current key into the result doesn't exceed the
+    // targetBytes limit.
+    var mustPutKey bool
     if p.resumeReason != 0 {
         // If we exceeded a limit, but we're not allowed to return an empty result,
         // then make sure we include the first key in the result. If wholeRows is
         // enabled, then also make sure we complete the first SQL row.
         if !p.allowEmpty &&
             (p.results.count == 0 || (p.wholeRows && p.results.continuesFirstRow(key))) {
             p.resumeReason = 0
             p.resumeNextBytes = 0
+            mustPutKey = true
         } else {
-            p.resumeKey = key
-
-            // If requested, remove any partial SQL rows from the end of the result.
-            if p.wholeRows {
-                trimmedKey, err := p.results.maybeTrimPartialLastRow(key)
-                if err != nil {
-                    p.err = err
-                    return false
-                }
-                if trimmedKey != nil {
-                    p.resumeKey = trimmedKey
-                }
-            }
+            p.resumeKey, p.err = maybeTrimResult()
             return false
         }
     }

-    if err := p.results.put(ctx, rawKey, rawValue, p.memAccount); err != nil {
+    var maxNewSize int
+    if p.targetBytes > 0 && p.targetBytes > p.results.bytes && !mustPutKey {
+        maxNewSize = int(p.targetBytes - p.results.bytes)
+    }
+    if added, err := p.results.put(ctx, rawKey, rawValue, p.memAccount, maxNewSize); err != nil {
         p.err = errors.Wrapf(err, "scan with start key %s", p.start)
         return false
+    } else if !added {
+        p.resumeKey, p.err = maybeTrimResult()
+        return false
     }

     // Check if we hit the key limit just now to avoid scanning further before
