From 150b228decd80daf8d992021ef75238d5a790621 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 12 Jul 2022 15:51:41 -0700 Subject: [PATCH] storage: be tighter with allocations when TargetBytes is present Previously, while `put`ting the key into the `repr`, we could make an allocation that was too large given the remaining `TargetBytes` budget. This is the case since we're exponentially increasing the capacities of the buffers until 128MiB and because we're only accounting for the length of the slice even though the whole capacity would have a memory footprint. For example, with 10MiB of `TargetBytes` (which is used by SQL in many cases) and a ScanResponse that exceeds that limit, we'd allocate capacities that are powers of two, starting at, say, 256B, and would go all the way up to 8MiB; however, given that 10MiB limit, we'd only use 2MiB of that last 8MiB `repr` when we encounter the target bytes limit and stop. Such behavior is kinda ok if the response is marshalled by the gRPC to be sent to the other node, but it is quite unfortunate in the local fast-path cases (meaning the SQL and the KV are part of the same physical machine). In the latter scenario SQL would only account for the lengths of slices while keeping the whole slices alive for a while, leading to significant unaccounted for memory. In the example above, on the order of 6MiB would be unaccounted for - multiply that by some concurrency, and we have unaccounted memory on the order of hundreds of MiBs. The current behavior seems especially bad for the streamer use case where we issue many requests with the `TargetBytes` set and use `ScanResponse.NumBytes` field (which tracks the lengths of the slices) for the memory accounting purposes. In order to improve here, this commit teaches `put` method about the maximum capacity it can use. The key now might be no longer added if the key and the value exceed the remaining capacity. In the example above, the last slice would be on the order of 2MiB making everything happy: we stay very close to TargetBytes limit and don't have any wasted space. Release note: None --- pkg/storage/pebble_mvcc_scanner.go | 96 +++++++++++++++++++++++------- 1 file changed, 73 insertions(+), 23 deletions(-) diff --git a/pkg/storage/pebble_mvcc_scanner.go b/pkg/storage/pebble_mvcc_scanner.go index feee2f6a45dc..733438d7fda7 100644 --- a/pkg/storage/pebble_mvcc_scanner.go +++ b/pkg/storage/pebble_mvcc_scanner.go @@ -82,9 +82,14 @@ func (p *pebbleResults) clear() { // The repr that MVCCScan / MVCCGet expects to provide as output goes: // // This function adds to repr in that format. +// - maxNewSize, if non-zero, indicates the maximum capacity for a new repr that +// can be allocated. +// +// It returns a boolean indicating whether the key was added into the repr (in +// the non-error case, this can be false only if maxNewSize is non-zero). func (p *pebbleResults) put( - ctx context.Context, key []byte, value []byte, memAccount *mon.BoundAccount, -) error { + ctx context.Context, key []byte, value []byte, memAccount *mon.BoundAccount, maxNewSize int, +) (bool, error) { const minSize = 16 const maxSize = 128 << 20 // 128 MB @@ -97,17 +102,30 @@ func (p *pebbleResults) put( lenValue := len(value) lenToAdd := p.sizeOf(lenKey, lenValue) if len(p.repr)+lenToAdd > cap(p.repr) { + if maxNewSize > 0 && lenToAdd > maxNewSize { + // This key exceeds the given capacity, so we don't add it. + return false, nil + } + // If we don't have a limit or the given capacity is too much, clamp it + // at maxSize. + if maxNewSize <= 0 || maxNewSize > maxSize { + maxNewSize = maxSize + } newSize := 2 * cap(p.repr) if newSize == 0 || newSize > maxSize { // If the previous buffer exceeded maxSize, we don't double its capacity // for next allocation, and instead reset the exponential increase, in // case we had a stray huge key-value. newSize = minSize + } else if newSize > maxNewSize { + // If the exponentially-increasing size exceeds the given capacity, + // clamp the new size at the limit. + newSize = maxNewSize } - if lenToAdd >= maxSize { + if lenToAdd >= maxNewSize { newSize = lenToAdd } else { - for newSize < lenToAdd && newSize < maxSize { + for newSize < lenToAdd && newSize < maxNewSize { newSize *= 2 } } @@ -115,7 +133,7 @@ func (p *pebbleResults) put( p.bufs = append(p.bufs, p.repr) } if err := memAccount.Grow(ctx, int64(newSize)); err != nil { - return err + return false, err } p.repr = nonZeroingMakeByteSlice(newSize)[:0] } @@ -140,7 +158,7 @@ func (p *pebbleResults) put( } } - return nil + return true, nil } func (p *pebbleResults) sizeOf(lenKey, lenValue int) int { @@ -981,35 +999,67 @@ func (p *pebbleMVCCScanner) addAndAdvance( p.resumeReason = roachpb.RESUME_KEY_LIMIT } + maybeTrimResult := func() (resumeKey roachpb.Key, _ error) { + resumeKey = key + // If requested, remove any partial SQL rows from the end of the result. + if p.wholeRows { + trimmedKey, err := p.results.maybeTrimPartialLastRow(key) + if err != nil { + return nil, err + } + if trimmedKey != nil { + resumeKey = trimmedKey + } + } + return resumeKey, nil + } + + // We need to decide how we need to handle the current key in relation to + // limits and what new capacity we allow it to use in put(). We have several + // cases: + // + // 1. the limits were exceeded (p.resumeReason != 0): + // a) we're not allowed to return an empty result and we currently have an + // empty result - we must include the key into the result. We do so by + // unsetting the relevant resume* fields and marking mustPutKey as true + // b) we're not allowed to return an empty result, wholeRows is true, and + // the current key continues the first row - same action as above - we + // must include the key into the result. + // c) if neither a) nor b) conditions are true, we don't include the + // current key and exit. + // + // 2. the limits were not exceeded. Here we attempt to include the current + // key into the result, subject to the remaining maxNewSize capacity (if + // targetBytes is non-zero). + // + // NB: if we're not allowed to return an empty result (or must continue + // the first row if wholeRows is true), put() below will never run into + // the capacity limit. This is the case because we have checked above + // that including the current key into the result doesn't exceed the + // targetBytes limit. + var mustPutKey bool if p.resumeReason != 0 { - // If we exceeded a limit, but we're not allowed to return an empty result, - // then make sure we include the first key in the result. If wholeRows is - // enabled, then also make sure we complete the first SQL row. if !p.allowEmpty && (p.results.count == 0 || (p.wholeRows && p.results.continuesFirstRow(key))) { p.resumeReason = 0 p.resumeNextBytes = 0 + mustPutKey = true } else { - p.resumeKey = key - - // If requested, remove any partial SQL rows from the end of the result. - if p.wholeRows { - trimmedKey, err := p.results.maybeTrimPartialLastRow(key) - if err != nil { - p.err = err - return false - } - if trimmedKey != nil { - p.resumeKey = trimmedKey - } - } + p.resumeKey, p.err = maybeTrimResult() return false } } - if err := p.results.put(ctx, rawKey, rawValue, p.memAccount); err != nil { + var maxNewSize int + if p.targetBytes > 0 && p.targetBytes > p.results.bytes && !mustPutKey { + maxNewSize = int(p.targetBytes - p.results.bytes) + } + if added, err := p.results.put(ctx, rawKey, rawValue, p.memAccount, maxNewSize); err != nil { p.err = errors.Wrapf(err, "scan with start key %s", p.start) return false + } else if !added { + p.resumeKey, p.err = maybeTrimResult() + return false } // Check if we hit the key limit just now to avoid scanning further before