diff --git a/pkg/storage/pebble_mvcc_scanner.go b/pkg/storage/pebble_mvcc_scanner.go index 577d52ec06fc..2b720b688c05 100644 --- a/pkg/storage/pebble_mvcc_scanner.go +++ b/pkg/storage/pebble_mvcc_scanner.go @@ -82,9 +82,14 @@ func (p *pebbleResults) clear() { // The repr that MVCCScan / MVCCGet expects to provide as output goes: // // This function adds to repr in that format. +// - maxNewSize, if non-zero, indicates the maximum capacity for a new repr that +// can be allocated. +// +// It returns a boolean indicating whether the key was added into the repr (in +// the non-error case, this can be false only if maxNewSize is non-zero). func (p *pebbleResults) put( - ctx context.Context, key []byte, value []byte, memAccount *mon.BoundAccount, -) error { + ctx context.Context, key []byte, value []byte, memAccount *mon.BoundAccount, maxNewSize int, +) (bool, error) { const minSize = 16 const maxSize = 128 << 20 // 128 MB @@ -97,17 +102,30 @@ func (p *pebbleResults) put( lenValue := len(value) lenToAdd := p.sizeOf(lenKey, lenValue) if len(p.repr)+lenToAdd > cap(p.repr) { + if maxNewSize > 0 && lenToAdd > maxNewSize { + // This key exceeds the given capacity, so we don't add it. + return false, nil + } + // If we don't have a limit or the given capacity is too much, clamp it + // at maxSize. + if maxNewSize <= 0 || maxNewSize > maxSize { + maxNewSize = maxSize + } newSize := 2 * cap(p.repr) if newSize == 0 || newSize > maxSize { // If the previous buffer exceeded maxSize, we don't double its capacity // for next allocation, and instead reset the exponential increase, in // case we had a stray huge key-value. newSize = minSize + } else if newSize > maxNewSize { + // If the exponentially-increasing size exceeds the given capacity, + // clamp the new size at the limit. + newSize = maxNewSize } - if lenToAdd >= maxSize { + if lenToAdd >= maxNewSize { newSize = lenToAdd } else { - for newSize < lenToAdd && newSize < maxSize { + for newSize < lenToAdd && newSize < maxNewSize { newSize *= 2 } } @@ -115,7 +133,7 @@ func (p *pebbleResults) put( p.bufs = append(p.bufs, p.repr) } if err := memAccount.Grow(ctx, int64(newSize)); err != nil { - return err + return false, err } p.repr = nonZeroingMakeByteSlice(newSize)[:0] } @@ -140,7 +158,7 @@ func (p *pebbleResults) put( } } - return nil + return true, nil } func (p *pebbleResults) sizeOf(lenKey, lenValue int) int { @@ -974,6 +992,22 @@ func (p *pebbleMVCCScanner) addAndAdvance( p.resumeReason = roachpb.RESUME_KEY_LIMIT } + keyNotAdded := func() error { + p.resumeKey = key + // If requested, remove any partial SQL rows from the end of the result. + if p.wholeRows { + trimmedKey, err := p.results.maybeTrimPartialLastRow(key) + if err != nil { + return err + } + if trimmedKey != nil { + p.resumeKey = trimmedKey + } + } + return nil + } + + var mustPutKey bool if p.resumeReason != 0 { // If we exceeded a limit, but we're not allowed to return an empty result, // then make sure we include the first key in the result. If wholeRows is @@ -982,27 +1016,23 @@ func (p *pebbleMVCCScanner) addAndAdvance( (p.results.count == 0 || (p.wholeRows && p.results.continuesFirstRow(key))) { p.resumeReason = 0 p.resumeNextBytes = 0 + mustPutKey = true } else { - p.resumeKey = key - - // If requested, remove any partial SQL rows from the end of the result. - if p.wholeRows { - trimmedKey, err := p.results.maybeTrimPartialLastRow(key) - if err != nil { - p.err = err - return false - } - if trimmedKey != nil { - p.resumeKey = trimmedKey - } - } + p.err = keyNotAdded() return false } } - if err := p.results.put(ctx, rawKey, rawValue, p.memAccount); err != nil { + var maxNewSize int + if p.targetBytes > 0 && p.targetBytes > p.results.bytes && !mustPutKey { + maxNewSize = int(p.targetBytes - p.results.bytes) + } + if added, err := p.results.put(ctx, rawKey, rawValue, p.memAccount, maxNewSize); err != nil { p.err = errors.Wrapf(err, "scan with start key %s", p.start) return false + } else if !added { + p.err = keyNotAdded() + return false } // Check if we hit the key limit just now to avoid scanning further before