Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.1: rowexec: improve join reader memory usage when ordering is maintained #82957

Merged
merged 1 commit into from
Jun 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,11 +578,15 @@ func (jr *joinReader) initJoinReaderStrategy(
}
}

defer func() {
generator.setResizeMemoryAccountFunc(jr.strategy.resizeMemoryAccount)
}()

if readerType == indexJoinReaderType {
jr.strategy = &joinReaderIndexJoinStrategy{
joinerBase: &jr.joinerBase,
joinReaderSpanGenerator: generator,
memAcc: &strategyMemAcc,
strategyMemAcc: &strategyMemAcc,
}
return nil
}
Expand All @@ -593,7 +597,7 @@ func (jr *joinReader) initJoinReaderStrategy(
joinReaderSpanGenerator: generator,
isPartialJoin: jr.joinType == descpb.LeftSemiJoin || jr.joinType == descpb.LeftAntiJoin,
groupingState: jr.groupingState,
memAcc: &strategyMemAcc,
strategyMemAcc: &strategyMemAcc,
}
return nil
}
Expand All @@ -604,6 +608,10 @@ func (jr *joinReader) initJoinReaderStrategy(
limit := execinfra.GetWorkMemLimit(flowCtx)
// Initialize memory monitors and row container for looked up rows.
jr.limitedMemMonitor = execinfra.NewLimitedMonitor(ctx, jr.MemMonitor, flowCtx, "joinreader-limited")
// We want to make sure that if the disk-backed container is spilled to
// disk, it releases all of the memory reservations, so we make the
// corresponding memory monitor not hold on to any bytes.
jr.limitedMemMonitor.RelinquishAllOnReleaseBytes()
jr.diskMonitor = execinfra.NewMonitor(ctx, flowCtx.DiskMonitor, "joinreader-disk")
drc := rowcontainer.NewDiskBackedNumberedRowContainer(
false, /* deDup */
Expand All @@ -625,7 +633,7 @@ func (jr *joinReader) initJoinReaderStrategy(
lookedUpRows: drc,
groupingState: jr.groupingState,
outputGroupContinuationForLeftRow: jr.outputGroupContinuationForLeftRow,
memAcc: &strategyMemAcc,
strategyMemAcc: &strategyMemAcc,
}
return nil
}
Expand Down Expand Up @@ -720,8 +728,8 @@ func (jr *joinReader) readInput() (
// We've just discarded the old rows, so we have to update the memory
// accounting accordingly.
newSz := jr.accountedFor.scratchInputRows + jr.accountedFor.groupingState
if err := jr.memAcc.ResizeTo(jr.Ctx, newSz); err != nil {
jr.MoveToDraining(addWorkmemHint(err))
if err := jr.strategy.resizeMemoryAccount(&jr.memAcc, jr.memAcc.Used(), newSz); err != nil {
jr.MoveToDraining(err)
return jrStateUnknown, nil, jr.DrainHelper()
}
jr.scratchInputRows = jr.scratchInputRows[:0]
Expand Down Expand Up @@ -784,8 +792,8 @@ func (jr *joinReader) readInput() (
//
// We need to subtract the EncDatumRowOverhead because that is already
// tracked in jr.accountedFor.scratchInputRows.
if err := jr.memAcc.Grow(jr.Ctx, rowSize-int64(rowenc.EncDatumRowOverhead)); err != nil {
jr.MoveToDraining(addWorkmemHint(err))
if err := jr.strategy.growMemoryAccount(&jr.memAcc, rowSize-int64(rowenc.EncDatumRowOverhead)); err != nil {
jr.MoveToDraining(err)
return jrStateUnknown, nil, jr.DrainHelper()
}
jr.scratchInputRows = append(jr.scratchInputRows, jr.rowAlloc.CopyRow(encDatumRow))
Expand Down Expand Up @@ -1004,7 +1012,7 @@ func (jr *joinReader) performMemoryAccounting() error {
jr.accountedFor.scratchInputRows = int64(cap(jr.scratchInputRows)) * int64(rowenc.EncDatumRowOverhead)
jr.accountedFor.groupingState = jr.groupingState.memUsage()
newSz := jr.accountedFor.scratchInputRows + jr.accountedFor.groupingState
return addWorkmemHint(jr.memAcc.Resize(jr.Ctx, oldSz, newSz))
return jr.strategy.resizeMemoryAccount(&jr.memAcc, oldSz, newSz)
}

// Start is part of the RowSource interface.
Expand Down
34 changes: 29 additions & 5 deletions pkg/sql/rowexec/joinreader_span_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ import (
// joinReaderSpanGenerator is used by the joinReader to generate spans for
// looking up into the index.
type joinReaderSpanGenerator interface {
// setResizeMemoryAccountFunc provides the generator with a
// resizeMemoryAccountFunc. It must be called once before the generator is
// used.
setResizeMemoryAccountFunc(resizeMemoryAccountFunc)

// generateSpans generates spans for the given batch of input rows. The spans
// are returned in rows order, but there are no duplicates (i.e. if a 2nd row
// results in the same spans as a previous row, the results don't include them
Expand Down Expand Up @@ -61,6 +66,8 @@ var _ joinReaderSpanGenerator = &defaultSpanGenerator{}
var _ joinReaderSpanGenerator = &multiSpanGenerator{}
var _ joinReaderSpanGenerator = &localityOptimizedSpanGenerator{}

type resizeMemoryAccountFunc func(_ *mon.BoundAccount, oldSz, newSz int64) error

type defaultSpanGenerator struct {
spanBuilder span.Builder
spanSplitter span.Splitter
Expand All @@ -78,8 +85,10 @@ type defaultSpanGenerator struct {
scratchSpans roachpb.Spans

// memAcc is owned by this span generator and is closed when the generator
// is closed.
memAcc *mon.BoundAccount
// is closed. All memory reservations should be done via
// resizeMemoryAccount.
memAcc *mon.BoundAccount
resizeMemoryAccount resizeMemoryAccountFunc
}

func (g *defaultSpanGenerator) init(
Expand All @@ -106,6 +115,10 @@ func (g *defaultSpanGenerator) init(
return nil
}

func (g *defaultSpanGenerator) setResizeMemoryAccountFunc(f resizeMemoryAccountFunc) {
g.resizeMemoryAccount = f
}

// Generate spans for a given row.
// If lookup columns are specified will use those to collect the relevant
// columns. Otherwise the first rows are assumed to correspond with the index.
Expand Down Expand Up @@ -166,7 +179,7 @@ func (g *defaultSpanGenerator) generateSpans(
}

// Memory accounting.
if err := g.memAcc.ResizeTo(ctx, g.memUsage()); err != nil {
if err := g.resizeMemoryAccount(g.memAcc, g.memAcc.Used(), g.memUsage()); err != nil {
return nil, addWorkmemHint(err)
}

Expand Down Expand Up @@ -290,8 +303,10 @@ type multiSpanGenerator struct {
scratchSpans roachpb.Spans

// memAcc is owned by this span generator and is closed when the generator
// is closed.
memAcc *mon.BoundAccount
// is closed. All memory reservations should be done via
// resizeMemoryAccount.
memAcc *mon.BoundAccount
resizeMemoryAccount resizeMemoryAccountFunc
}

// multiSpanGeneratorColInfo contains info about the values that a specific
Expand Down Expand Up @@ -601,6 +616,10 @@ func (g *multiSpanGenerator) fillInIndexColInfos(expr tree.TypedExpr) error {
return nil
}

func (g *multiSpanGenerator) setResizeMemoryAccountFunc(f resizeMemoryAccountFunc) {
g.resizeMemoryAccount = f
}

// generateNonNullSpans generates spans for a given row. It does not include
// null values, since those values would not match the lookup condition anyway.
func (g *multiSpanGenerator) generateNonNullSpans(row rowenc.EncDatumRow) (roachpb.Spans, error) {
Expand Down Expand Up @@ -803,6 +822,11 @@ func (g *localityOptimizedSpanGenerator) init(
return nil
}

func (g *localityOptimizedSpanGenerator) setResizeMemoryAccountFunc(f resizeMemoryAccountFunc) {
g.localSpanGen.setResizeMemoryAccountFunc(f)
g.remoteSpanGen.setResizeMemoryAccountFunc(f)
}

// maxLookupCols is part of the joinReaderSpanGenerator interface.
func (g *localityOptimizedSpanGenerator) maxLookupCols() int {
// We already asserted in init that maxLookupCols is the same for both the
Expand Down
108 changes: 81 additions & 27 deletions pkg/sql/rowexec/joinreader_strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ type joinReaderStrategy interface {
nextRowToEmit(ctx context.Context) (rowenc.EncDatumRow, joinReaderState, error)
// spilled returns whether the strategy spilled to disk.
spilled() bool
// growMemoryAccount and resizeMemoryAccount should be used instead of
// memAcc.Grow and memAcc.Resize, respectively. The goal of these functions
// is to provide strategies a way to try to handle "memory budget exceeded"
// errors. These methods also wrap all errors with a workmem hint so that
// the caller doesn't have to.
growMemoryAccount(memAcc *mon.BoundAccount, delta int64) error
resizeMemoryAccount(memAcc *mon.BoundAccount, oldSz, newSz int64) error
// close releases any resources associated with the joinReaderStrategy.
close(ctx context.Context)
}
Expand Down Expand Up @@ -143,10 +150,10 @@ type joinReaderNoOrderingStrategy struct {

groupingState *inputBatchGroupingState

// memAcc is owned by this strategy and is closed when the strategy is
// closed. inputRows are owned by the joinReader, so they aren't accounted
// for with this memory account.
memAcc *mon.BoundAccount
// strategyMemAcc is owned by this strategy and is closed when the strategy
// is closed. inputRows are owned by the joinReader, so they aren't
// accounted for with this memory account.
strategyMemAcc *mon.BoundAccount
}

// getLookupRowsBatchSizeHint returns the batch size for the join reader no
Expand Down Expand Up @@ -201,8 +208,8 @@ func (s *joinReaderNoOrderingStrategy) processLookedUpRow(
matchingInputRowIndices = s.scratchMatchingInputRowIndices

// Perform memory accounting.
if err := s.memAcc.ResizeTo(s.Ctx, s.memUsage()); err != nil {
return jrStateUnknown, addWorkmemHint(err)
if err := s.resizeMemoryAccount(s.strategyMemAcc, s.strategyMemAcc.Used(), s.memUsage()); err != nil {
return jrStateUnknown, err
}
}
s.emitState.processingLookupRow = true
Expand Down Expand Up @@ -237,8 +244,8 @@ func (s *joinReaderNoOrderingStrategy) nextRowToEmit(
s.emitState.unmatchedInputRowIndicesCursor = 0

// Perform memory accounting.
if err := s.memAcc.ResizeTo(s.Ctx, s.memUsage()); err != nil {
return nil, jrStateUnknown, addWorkmemHint(err)
if err := s.resizeMemoryAccount(s.strategyMemAcc, s.strategyMemAcc.Used(), s.memUsage()); err != nil {
return nil, jrStateUnknown, err
}
}

Expand Down Expand Up @@ -298,8 +305,20 @@ func (s *joinReaderNoOrderingStrategy) nextRowToEmit(

func (s *joinReaderNoOrderingStrategy) spilled() bool { return false }

func (s *joinReaderNoOrderingStrategy) growMemoryAccount(
memAcc *mon.BoundAccount, delta int64,
) error {
return addWorkmemHint(memAcc.Grow(s.Ctx, delta))
}

func (s *joinReaderNoOrderingStrategy) resizeMemoryAccount(
memAcc *mon.BoundAccount, oldSz, newSz int64,
) error {
return addWorkmemHint(memAcc.Resize(s.Ctx, oldSz, newSz))
}

func (s *joinReaderNoOrderingStrategy) close(ctx context.Context) {
s.memAcc.Close(ctx)
s.strategyMemAcc.Close(ctx)
s.joinReaderSpanGenerator.close(ctx)
*s = joinReaderNoOrderingStrategy{}
}
Expand Down Expand Up @@ -347,13 +366,13 @@ type joinReaderIndexJoinStrategy struct {
lookedUpRow rowenc.EncDatumRow
}

// memAcc is owned by this strategy and is closed when the strategy is
// closed. inputRows are owned by the joinReader, so they aren't accounted
// for with this memory account.
// strategyMemAcc is owned by this strategy and is closed when the strategy
// is closed. inputRows are owned by the joinReader, so they aren't
// accounted for with this memory account.
//
// Note that joinReaderIndexJoinStrategy doesn't actually need a
// memory account, and it's only responsible for closing it.
memAcc *mon.BoundAccount
// Note that joinReaderIndexJoinStrategy doesn't actually need a memory
// account, and it's only responsible for closing it.
strategyMemAcc *mon.BoundAccount
}

// getLookupRowsBatchSizeHint returns the batch size for the join reader index
Expand Down Expand Up @@ -408,8 +427,20 @@ func (s *joinReaderIndexJoinStrategy) spilled() bool {
return false
}

func (s *joinReaderIndexJoinStrategy) growMemoryAccount(
memAcc *mon.BoundAccount, delta int64,
) error {
return addWorkmemHint(memAcc.Grow(s.Ctx, delta))
}

func (s *joinReaderIndexJoinStrategy) resizeMemoryAccount(
memAcc *mon.BoundAccount, oldSz, newSz int64,
) error {
return addWorkmemHint(memAcc.Resize(s.Ctx, oldSz, newSz))
}

func (s *joinReaderIndexJoinStrategy) close(ctx context.Context) {
s.memAcc.Close(ctx)
s.strategyMemAcc.Close(ctx)
s.joinReaderSpanGenerator.close(ctx)
*s = joinReaderIndexJoinStrategy{}
}
Expand Down Expand Up @@ -494,11 +525,11 @@ type joinReaderOrderingStrategy struct {
// the second join in paired-joins).
outputGroupContinuationForLeftRow bool

// memAcc is owned by this strategy and is closed when the strategy is
// closed. inputRows are owned by the joinReader, so they aren't accounted
// for with this memory account.
memAcc *mon.BoundAccount
accountedFor struct {
// strategyMemAcc is owned by this strategy and is closed when the strategy
// is closed. inputRows are owned by the joinReader, so they aren't
// accounted for with this memory account.
strategyMemAcc *mon.BoundAccount
accountedFor struct {
// sliceOverhead contains the memory usage of
// inputRowIdxToLookedUpRowIndices and
// accountedFor.inputRowIdxToLookedUpRowIndices that is currently
Expand Down Expand Up @@ -591,7 +622,7 @@ func (s *joinReaderOrderingStrategy) processLookupRows(
// Account for the new allocations, if any.
sliceOverhead := memsize.IntSliceOverhead*int64(cap(s.inputRowIdxToLookedUpRowIndices)) +
memsize.Int64*int64(cap(s.accountedFor.inputRowIdxToLookedUpRowIndices))
if err := s.growMemoryAccount(sliceOverhead - s.accountedFor.sliceOverhead); err != nil {
if err := s.growMemoryAccount(s.strategyMemAcc, sliceOverhead-s.accountedFor.sliceOverhead); err != nil {
return nil, err
}
s.accountedFor.sliceOverhead = sliceOverhead
Expand Down Expand Up @@ -655,7 +686,7 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow(
delta += newSize - s.accountedFor.inputRowIdxToLookedUpRowIndices[idx]
s.accountedFor.inputRowIdxToLookedUpRowIndices[idx] = newSize
}
if err := s.growMemoryAccount(delta); err != nil {
if err := s.growMemoryAccount(s.strategyMemAcc, delta); err != nil {
return jrStateUnknown, err
}

Expand Down Expand Up @@ -756,7 +787,7 @@ func (s *joinReaderOrderingStrategy) spilled() bool {
}

func (s *joinReaderOrderingStrategy) close(ctx context.Context) {
s.memAcc.Close(ctx)
s.strategyMemAcc.Close(ctx)
s.joinReaderSpanGenerator.close(ctx)
if s.lookedUpRows != nil {
s.lookedUpRows.Close(ctx)
Expand All @@ -770,8 +801,31 @@ func (s *joinReaderOrderingStrategy) close(ctx context.Context) {
// reservation is denied initially, then it'll attempt to spill lookedUpRows row
// container to disk, so the error is only returned when that wasn't
// successful.
func (s *joinReaderOrderingStrategy) growMemoryAccount(delta int64) error {
if err := s.memAcc.Grow(s.Ctx, delta); err != nil {
func (s *joinReaderOrderingStrategy) growMemoryAccount(
memAcc *mon.BoundAccount, delta int64,
) error {
if err := memAcc.Grow(s.Ctx, delta); err != nil {
// We don't have enough budget to account for the new size. Check
// whether we can spill the looked up rows to disk to free up the
// budget.
spilled, spillErr := s.lookedUpRows.SpillToDisk(s.Ctx)
if !spilled || spillErr != nil {
return addWorkmemHint(errors.CombineErrors(err, spillErr))
}
// We freed up some budget, so try to perform the accounting again.
return addWorkmemHint(memAcc.Grow(s.Ctx, delta))
}
return nil
}

// resizeMemoryAccount resizes the memory account according to oldSz and newSz.
// If the reservation is denied initially, then it'll attempt to spill
// lookedUpRows row container to disk, so the error is only returned when that
// wasn't successful.
func (s *joinReaderOrderingStrategy) resizeMemoryAccount(
memAcc *mon.BoundAccount, oldSz, newSz int64,
) error {
if err := memAcc.Resize(s.Ctx, oldSz, newSz); err != nil {
// We don't have enough budget to account for the new size. Check
// whether we can spill the looked up rows to disk to free up the
// budget.
Expand All @@ -780,7 +834,7 @@ func (s *joinReaderOrderingStrategy) growMemoryAccount(delta int64) error {
return addWorkmemHint(errors.CombineErrors(err, spillErr))
}
// We freed up some budget, so try to perform the accounting again.
return addWorkmemHint(s.memAcc.Grow(s.Ctx, delta))
return addWorkmemHint(memAcc.Resize(s.Ctx, oldSz, newSz))
}
return nil
}
Loading