kvstreamer: perform more memory accounting #83683

Merged · 2 commits · Jul 6, 2022
Changes from all commits
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvstreamer/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
"budget.go",
"requests_provider.go",
"results_buffer.go",
"size.go",
"streamer.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer",
12 changes: 7 additions & 5 deletions pkg/kv/kvclient/kvstreamer/requests_provider.go
@@ -24,8 +24,6 @@ import (
// singleRangeBatch contains parts of the originally enqueued requests that have
// been truncated to be within a single range. All requests within the
// singleRangeBatch will be issued as a single BatchRequest.
// TODO(yuzefovich): perform memory accounting for slices other than reqs in
// singleRangeBatch.
type singleRangeBatch struct {
reqs []roachpb.RequestUnion
// reqsKeys stores the start key of the corresponding request in reqs. It is
@@ -81,9 +79,10 @@ type singleRangeBatch struct {
// the memory usage of reqs, excluding the overhead.
reqsReservedBytes int64
// overheadAccountedFor tracks the memory reservation against the budget for
// the overhead of the reqs slice (i.e. of roachpb.RequestUnion objects).
// Since we reuse the same reqs slice for resume requests, this can be
// released only when the BatchResponse doesn't have any resume spans.
// the overhead of the reqs slice (i.e. of roachpb.RequestUnion objects) as
// well as the positions and the subRequestIdx slices. Since we reuse these
// slices for the resume requests, this can be released only when the
// BatchResponse doesn't have any resume spans.
//
// RequestUnion.Size() ignores the overhead of RequestUnion object, so we
// need to account for it separately.
@@ -212,6 +211,9 @@ type requestsProviderBase struct {
hasWork *sync.Cond
// requests contains all single-range sub-requests that have yet to be
// served.
// TODO(yuzefovich): this memory is not accounted for. However, the number
// of singleRangeBatch objects in flight is limited by the number of ranges
// of a single table, so it doesn't seem urgent to fix the accounting here.
requests []singleRangeBatch
// done is set to true once the Streamer is Close()'d.
done bool
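For context, the overhead described above can be derived purely from slice capacities. The sketch below is not part of this diff, and the element types of positions ([]int) and subRequestIdx ([]int32) are inferred from the int/int32 size constants in size.go further down, so treat them as assumptions:

// Hypothetical helper mirroring what overheadAccountedFor covers for a
// singleRangeBatch: the slice headers plus the per-element overhead of the
// reqs, positions, and subRequestIdx slices (capacities, not lengths, since
// the full backing arrays stay reserved while the slices are reused).
func (r *singleRangeBatch) slicesOverhead() int64 {
	return requestUnionSliceOverhead + requestUnionOverhead*int64(cap(r.reqs)) +
		intSliceOverhead + intSize*int64(cap(r.positions)) +
		int32SliceOverhead + int32Size*int64(cap(r.subRequestIdx))
}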
45 changes: 39 additions & 6 deletions pkg/kv/kvclient/kvstreamer/results_buffer.go
@@ -13,6 +13,7 @@ package kvstreamer
import (
"context"
"fmt"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/diskmap"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
@@ -86,7 +87,9 @@ type resultsBuffer interface {
// added all Results it could, and the resultsBuffer checks whether any
// Results are available to be returned to the client. If there is a
// goroutine blocked in wait(), the goroutine is woken up.
doneAddingLocked()
//
// It is assumed that the budget's mutex is already being held.
doneAddingLocked(context.Context)

///////////////////////////////////////////////////////////////////////////
// //
@@ -149,7 +152,11 @@ type resultsBufferBase struct {
// hasResults is used in wait() to block until there are some results to be
// picked up.
hasResults chan struct{}
err error
// overheadAccountedFor tracks how much overhead space for the Results in
// this results buffer has been consumed from the budget. Note that this
// does not include the footprint of Get and Scan responses.
overheadAccountedFor int64
err error
}

func newResultsBufferBase(budget *budget) *resultsBufferBase {
@@ -185,6 +192,23 @@ func (b *resultsBufferBase) checkIfCompleteLocked(r Result) {
}
}

func (b *resultsBufferBase) accountForOverheadLocked(ctx context.Context, overheadMemUsage int64) {
b.budget.mu.AssertHeld()
b.Mutex.AssertHeld()
if overheadMemUsage > b.overheadAccountedFor {
// We're allowing the budget to go into debt here since the results
// buffer doesn't have a way to push back on the Results. It would also
// be unfortunate to discard these Results - instead, we rely on the
// worker coordinator to make sure the budget gets out of debt.
if err := b.budget.consumeLocked(ctx, overheadMemUsage-b.overheadAccountedFor, true /* allowDebt */); err != nil {
b.setErrorLocked(err)
}
} else {
b.budget.releaseLocked(ctx, b.overheadAccountedFor-overheadMemUsage)
}
b.overheadAccountedFor = overheadMemUsage
}

// signal non-blockingly sends on hasResults channel.
func (b *resultsBufferBase) signal() {
select {
@@ -273,15 +297,20 @@ func (b *outOfOrderResultsBuffer) addLocked(r Result) {
b.numUnreleasedResults++
}

func (b *outOfOrderResultsBuffer) doneAddingLocked() {
b.Mutex.AssertHeld()
const resultSize = int64(unsafe.Sizeof(Result{}))

func (b *outOfOrderResultsBuffer) doneAddingLocked(ctx context.Context) {
b.accountForOverheadLocked(ctx, int64(cap(b.results))*resultSize)
b.signal()
}

func (b *outOfOrderResultsBuffer) get(context.Context) ([]Result, bool, error) {
b.Lock()
defer b.Unlock()
results := b.results
// Note that although we're losing the reference to the Results slice, we
// still keep the overhead of the slice accounted for with the budget. This
// is done as a way of "amortizing" the reservation.
b.results = nil
allComplete := b.numCompleteResponses == b.numExpectedResponses
return results, allComplete, b.err
@@ -483,8 +512,12 @@ func (b *inOrderResultsBuffer) addLocked(r Result) {
b.addCounter++
}

func (b *inOrderResultsBuffer) doneAddingLocked() {
b.Mutex.AssertHeld()
const inOrderBufferedResultSize = int64(unsafe.Sizeof(inOrderBufferedResult{}))

func (b *inOrderResultsBuffer) doneAddingLocked(ctx context.Context) {
overhead := int64(cap(b.buffered))*inOrderBufferedResultSize + // b.buffered
int64(cap(b.resultScratch))*resultSize // b.resultScratch
b.accountForOverheadLocked(ctx, overhead)
if len(b.buffered) > 0 && b.buffered[0].Position == b.headOfLinePosition && b.buffered[0].subRequestIdx == b.headOfLineSubRequestIdx {
if debug {
fmt.Println("found head-of-the-line")
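To make the delta-based behavior of accountForOverheadLocked concrete, here is a small self-contained sketch (toy types only, not the real budget or results buffer) of the same pattern: consume from the budget only when the new overhead exceeds what is already reserved, release the difference otherwise, and allow debt on growth since the buffer cannot push back on incoming Results:

package main

import "fmt"

// toyBudget mimics the consume/release interface the results buffer relies on.
type toyBudget struct {
	used, limit int64
}

func (b *toyBudget) consume(n int64, allowDebt bool) error {
	if !allowDebt && b.used+n > b.limit {
		return fmt.Errorf("budget of %d bytes exceeded", b.limit)
	}
	b.used += n
	return nil
}

func (b *toyBudget) release(n int64) { b.used -= n }

// overheadTracker mirrors resultsBufferBase.accountForOverheadLocked.
type overheadTracker struct {
	budget               *toyBudget
	overheadAccountedFor int64
}

func (t *overheadTracker) accountForOverhead(overhead int64) {
	if overhead > t.overheadAccountedFor {
		// Growing: consume only the delta, allowing the budget to go into debt.
		_ = t.budget.consume(overhead-t.overheadAccountedFor, true /* allowDebt */)
	} else {
		// Shrinking: release the delta back to the budget.
		t.budget.release(t.overheadAccountedFor - overhead)
	}
	t.overheadAccountedFor = overhead
}

func main() {
	b := &toyBudget{limit: 1 << 10}
	t := &overheadTracker{budget: b}
	t.accountForOverhead(512) // reserves 512 bytes
	t.accountForOverhead(768) // consumes only the extra 256 bytes
	t.accountForOverhead(256) // releases 512 bytes
	fmt.Println(b.used)       // 256
}

Note that keeping the reservation when the results slice reference is dropped in get() (the "amortizing" comment above) simply means the next doneAddingLocked call starts from a non-zero overheadAccountedFor, so a regrown slice of similar capacity consumes little or nothing extra.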
4 changes: 3 additions & 1 deletion pkg/kv/kvclient/kvstreamer/results_buffer_test.go
@@ -113,14 +113,16 @@ func TestInOrderResultsBuffer(t *testing.T) {
break
}

budget.mu.Lock()
b.Lock()
numToAdd := rng.Intn(len(addOrder)) + 1
for i := 0; i < numToAdd; i++ {
b.addLocked(results[addOrder[0]])
addOrder = addOrder[1:]
}
b.doneAddingLocked()
b.doneAddingLocked(ctx)
b.Unlock()
budget.mu.Unlock()

// With 50% probability, try spilling some of the buffered results
// to disk.
76 changes: 76 additions & 0 deletions pkg/kv/kvclient/kvstreamer/size.go
@@ -0,0 +1,76 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvstreamer

import (
"unsafe"

"github.com/cockroachdb/cockroach/pkg/roachpb"
)

const (
intSliceOverhead = int64(unsafe.Sizeof([]int{}))
intSize = int64(unsafe.Sizeof(int(0)))
int32SliceOverhead = int64(unsafe.Sizeof([]int32{}))
int32Size = int64(unsafe.Sizeof(int32(0)))
requestUnionSliceOverhead = int64(unsafe.Sizeof([]roachpb.RequestUnion{}))
requestUnionOverhead = int64(unsafe.Sizeof(roachpb.RequestUnion{}))
requestOverhead = int64(unsafe.Sizeof(roachpb.RequestUnion_Get{}) +
unsafe.Sizeof(roachpb.GetRequest{}))
)

var zeroInt32Slice []int32

func init() {
scanRequestOverhead := int64(unsafe.Sizeof(roachpb.RequestUnion_Scan{}) +
unsafe.Sizeof(roachpb.ScanRequest{}))
if requestOverhead != scanRequestOverhead {
panic("GetRequest and ScanRequest have different overheads")
}
zeroInt32Slice = make([]int32, 1<<10)
}

// Note that we cannot use Size() methods that are automatically generated by
// the protobuf library because
// - they calculate the size of the serialized message whereas we're interested
// in the deserialized in-memory footprint.
// - they account for things differently from how the memory usage is accounted
// for by the KV layer for the purposes of tracking TargetBytes limit.

// requestSize calculates the footprint of a request including the overhead. key
// and endKey are the keys from the span of the request header (we choose to
// avoid taking in a roachpb.Span in order to reduce allocations).
func requestSize(key, endKey roachpb.Key) int64 {
return requestOverhead + int64(cap(key)) + int64(cap(endKey))
}

func requestsMemUsage(reqs []roachpb.RequestUnion) (memUsage int64) {
for _, r := range reqs {
h := r.GetInner().Header()
memUsage += requestSize(h.Key, h.EndKey)
}
return memUsage
}

// getResponseSize calculates the size of the GetResponse similar to how it is
// accounted against the TargetBytes parameter by the KV layer.
func getResponseSize(get *roachpb.GetResponse) int64 {
if get.Value == nil {
return 0
}
return int64(len(get.Value.RawBytes))
}

// scanResponseSize calculates the size of the ScanResponse similar to how it is
// accounted against the TargetBytes parameter by the KV layer.
func scanResponseSize(scan *roachpb.ScanResponse) int64 {
return scan.NumBytes
}
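As a rough usage illustration (not part of this change; MustSetInner and the RequestHeader/Value field names are assumed to match the roachpb API), this is how the helpers above compose when estimating the footprint of a small batch and one of its responses:

// Hypothetical example, for illustration only.
func exampleFootprints() (requestBytes, responseBytes int64) {
	var reqs [2]roachpb.RequestUnion
	reqs[0].MustSetInner(&roachpb.GetRequest{
		RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("a")},
	})
	reqs[1].MustSetInner(&roachpb.ScanRequest{
		RequestHeader: roachpb.RequestHeader{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")},
	})
	// Sum of requestOverhead plus key capacities for each request. The slice
	// header and per-element RequestUnion overhead are tracked separately via
	// overheadAccountedFor in singleRangeBatch.
	requestBytes = requestsMemUsage(reqs[:])

	// Response sizes follow the KV layer's TargetBytes accounting: the raw
	// value bytes for a Get, NumBytes for a Scan.
	get := &roachpb.GetResponse{Value: &roachpb.Value{RawBytes: make([]byte, 10)}}
	responseBytes = getResponseSize(get)
	return requestBytes, responseBytes
}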