distsqlrun: make windower respect the memory limits #40340

Merged (2 commits, Sep 10, 2019)
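Summary of the diff below: the windower now allocates through a memory monitor hard-limited to the sql.distsql.temp_storage.workmem cluster setting (or the MemoryLimitBytes testing knob), refuses to plan window functions when the budget is below a 100 KiB floor, and spills its row containers to disk once the limit is reached. Along the way, the inline limited-monitor setup in the hash joiner and join reader moves into a shared NewLimitedMonitor helper, router outputs get the same limited monitors unconditionally (instead of only when stats collection is enabled), and several row-container constructors change from Make* value semantics to New* pointer semantics.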
8 changes: 3 additions & 5 deletions — pkg/sql/distsqlrun/hashjoiner.go

@@ -181,9 +181,7 @@ func newHashJoiner(
     if limit <= 0 {
         limit = settingWorkMemBytes.Get(&st.SV)
     }
-    limitedMon := mon.MakeMonitorInheritWithLimit("hashjoiner-limited", limit, flowCtx.EvalCtx.Mon)
-    limitedMon.Start(ctx, flowCtx.EvalCtx.Mon, mon.BoundAccount{})
-    h.MemMonitor = &limitedMon
+    h.MemMonitor = NewLimitedMonitor(ctx, flowCtx.EvalCtx.Mon, flowCtx.Cfg, "hashjoiner-limited")
     h.diskMonitor = NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "hashjoiner-disk")
     // Override initialBufferSize to be half of this processor's memory
     // limit. We consume up to h.initialBufferSize bytes from each input
@@ -736,14 +734,14 @@ func (h *hashJoiner) shouldEmitUnmatched(
 // initStoredRows initializes a hashRowContainer and sets h.storedRows.
 func (h *hashJoiner) initStoredRows() error {
     if h.useTempStorage {
-        hrc := rowcontainer.MakeHashDiskBackedRowContainer(
+        hrc := rowcontainer.NewHashDiskBackedRowContainer(
             &h.rows[h.storedSide],
             h.evalCtx,
             h.MemMonitor,
             h.diskMonitor,
             h.flowCtx.Cfg.TempStorage,
         )
-        h.storedRows = &hrc
+        h.storedRows = hrc
     } else {
         hrc := rowcontainer.MakeHashMemRowContainer(&h.rows[h.storedSide])
         h.storedRows = &hrc
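The NewLimitedMonitor helper that these call sites use is not part of the diff shown here. Judging from the inline code it replaces, it presumably folds the knob/setting lookup and monitor startup into one call. A minimal sketch, assuming the helper lives in the same package with this signature:

    // NewLimitedMonitor starts and returns a child of `parent` that is
    // hard-limited to the MemoryLimitBytes testing knob when set, and to the
    // sql.distsql.temp_storage.workmem cluster setting otherwise.
    func NewLimitedMonitor(
        ctx context.Context, parent *mon.BytesMonitor, config *ServerConfig, name string,
    ) *mon.BytesMonitor {
        limit := config.TestingKnobs.MemoryLimitBytes
        if limit <= 0 {
            limit = settingWorkMemBytes.Get(&config.Settings.SV)
        }
        limitedMon := mon.MakeMonitorInheritWithLimit(name, limit, parent)
        limitedMon.Start(ctx, parent, mon.BoundAccount{})
        return &limitedMon
    }

Note that newHashJoiner still computes its own local limit above the call: that value is kept for deriving initialBufferSize, while the monitor construction itself is delegated to the helper.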
6 changes: 2 additions & 4 deletions — pkg/sql/distsqlrun/joinreader.go

@@ -239,11 +239,9 @@ func newJoinReader(
     if limit <= 0 {
         limit = settingWorkMemBytes.Get(&st.SV)
     }
-    limitedMon := mon.MakeMonitorInheritWithLimit("joinreader-limited", limit, flowCtx.EvalCtx.Mon)
-    limitedMon.Start(ctx, flowCtx.EvalCtx.Mon, mon.BoundAccount{})
-    jr.MemMonitor = &limitedMon
+    jr.MemMonitor = NewLimitedMonitor(ctx, flowCtx.EvalCtx.Mon, flowCtx.Cfg, "joiner-limited")
     jr.diskMonitor = NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "joinreader-disk")
-    drc := rowcontainer.MakeDiskBackedIndexedRowContainer(
+    drc := rowcontainer.NewDiskBackedIndexedRowContainer(
         nil, /* ordering */
         jr.desc.ColumnTypesWithMutations(returnMutations),
         jr.evalCtx,
34 changes: 13 additions & 21 deletions — pkg/sql/distsqlrun/routers.go

@@ -101,8 +101,7 @@ type routerOutput struct {

     stats RouterOutputStats

-    // memoryMonitor and diskMonitor are references to mu.rowContainer's monitors,
-    // used for stats extraction.
+    // memoryMonitor and diskMonitor are mu.rowContainer's monitors.
     memoryMonitor, diskMonitor *mon.BytesMonitor
 }

@@ -241,27 +240,24 @@ func (rb *routerBase) init(ctx context.Context, flowCtx *FlowCtx, types []types.
         // This method must be called before we start() so we don't need
         // to take the mutex.
         evalCtx := flowCtx.NewEvalCtx()
-        memoryMonitor := evalCtx.Mon
-        diskMonitor := flowCtx.Cfg.DiskMonitor
-        if rb.statsCollectionEnabled {
-            // Start private monitors for stats collection.
-            memoryMonitorName := fmt.Sprintf("router-stat-mem-%d", rb.outputs[i].streamID)
-            memoryMonitor = NewMonitor(ctx, memoryMonitor, memoryMonitorName)
-            diskMonitorName := fmt.Sprintf("router-stat-disk-%d", rb.outputs[i].streamID)
-            diskMonitor = NewMonitor(ctx, diskMonitor, diskMonitorName)
-        }
+        rb.outputs[i].memoryMonitor = NewLimitedMonitor(
+            ctx, evalCtx.Mon, flowCtx.Cfg,
+            fmt.Sprintf("router-limited-%d", rb.outputs[i].streamID),
+        )
+        rb.outputs[i].diskMonitor = NewMonitor(
+            ctx, flowCtx.Cfg.DiskMonitor,
+            fmt.Sprintf("router-disk-%d", rb.outputs[i].streamID),
+        )

         rb.outputs[i].mu.rowContainer.Init(
             nil, /* ordering */
             types,
             evalCtx,
             flowCtx.Cfg.TempStorage,
-            memoryMonitor,
-            diskMonitor,
+            rb.outputs[i].memoryMonitor,
+            rb.outputs[i].diskMonitor,
             0, /* rowCapacity */
         )
-        rb.outputs[i].memoryMonitor = memoryMonitor
-        rb.outputs[i].diskMonitor = diskMonitor

         // Initialize any outboxes.
         if o, ok := rb.outputs[i].stream.(*outbox); ok {
@@ -357,12 +353,8 @@ func (rb *routerBase) start(ctx context.Context, wg *sync.WaitGroup, ctxCancel c
             ro.mu.rowContainer.Close(ctx)
             ro.mu.Unlock()

-            if rb.statsCollectionEnabled {
-                // Stats collection requires private BytesMonitors, so ensure that they
-                // are stopped.
-                ro.memoryMonitor.Stop(ctx)
-                ro.diskMonitor.Stop(ctx)
-            }
+            ro.memoryMonitor.Stop(ctx)
+            ro.diskMonitor.Stop(ctx)

             wg.Done()
         }(ctx, rb, &rb.outputs[i], wg)
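For readers unfamiliar with the mon package: a monitor created with MakeMonitorInheritWithLimit draws from its parent's pool but enforces its own ceiling, so each router output can now fail fast (or spill to disk) without exhausting the whole flow's budget. A self-contained sketch of the semantics (illustrative names, not code from this PR):

    func exampleLimitedMonitor(ctx context.Context, parent *mon.BytesMonitor) error {
        // A child limited to 64 KiB, regardless of the parent's remaining budget.
        limited := mon.MakeMonitorInheritWithLimit("router-limited-0", 64<<10, parent)
        limited.Start(ctx, parent, mon.BoundAccount{})
        defer limited.Stop(ctx)

        acc := limited.MakeBoundAccount()
        defer acc.Close(ctx)

        // Growing the account past 64 KiB fails with "memory budget exceeded",
        // even if the parent still has headroom.
        return acc.Grow(ctx, 128<<10)
    }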
46 changes: 36 additions & 10 deletions — pkg/sql/distsqlrun/routers_test.go

@@ -43,7 +43,9 @@ import (
 // that tracks the lifetime of the background router goroutines.
 func setupRouter(
     t testing.TB,
+    st *cluster.Settings,
     evalCtx *tree.EvalContext,
+    diskMonitor *mon.BytesMonitor,
     spec distsqlpb.OutputRouterSpec,
     inputTypes []types.T,
     streams []RowReceiver,
@@ -55,7 +57,10 @@

     ctx := context.TODO()
     flowCtx := FlowCtx{
-        Cfg: &ServerConfig{Settings: cluster.MakeTestingClusterSettings()},
+        Cfg: &ServerConfig{
+            Settings:    st,
+            DiskMonitor: diskMonitor,
+        },
         EvalCtx: evalCtx,
     }
     r.init(ctx, &flowCtx, inputTypes)
@@ -72,8 +77,12 @@ func TestRouters(t *testing.T) {

     rng, _ := randutil.NewPseudoRand()
     alloc := &sqlbase.DatumAlloc{}
-    evalCtx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
+    ctx := context.TODO()
+    st := cluster.MakeTestingClusterSettings()
+    evalCtx := tree.NewTestingEvalContext(st)
     defer evalCtx.Stop(context.Background())
+    diskMonitor := makeTestDiskMonitor(ctx, st)
+    defer diskMonitor.Stop(ctx)

     // Generate tables of possible values for each column; we have fewer possible
     // values than rows to guarantee many occurrences of each value.
@@ -136,7 +145,7 @@
             tc.spec.Streams[i] = distsqlpb.StreamEndpointSpec{StreamID: distsqlpb.StreamID(i)}
         }

-        r, wg := setupRouter(t, evalCtx, tc.spec, types, recvs)
+        r, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, types, recvs)

         for i := 0; i < numRows; i++ {
             row := make(sqlbase.EncDatumRow, numCols)
@@ -281,8 +290,12 @@ var (
 func TestConsumerStatus(t *testing.T) {
     defer leaktest.AfterTest(t)()

-    evalCtx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
+    ctx := context.TODO()
+    st := cluster.MakeTestingClusterSettings()
+    evalCtx := tree.NewTestingEvalContext(st)
     defer evalCtx.Stop(context.Background())
+    diskMonitor := makeTestDiskMonitor(ctx, st)
+    defer diskMonitor.Stop(ctx)

     testCases := []struct {
         name string
@@ -314,7 +327,7 @@ func TestConsumerStatus(t *testing.T) {
         }

         colTypes := []types.T{*types.Int}
-        router, wg := setupRouter(t, evalCtx, tc.spec, colTypes, recvs)
+        router, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, colTypes, recvs)

         // row0 will be a row that the router sends to the first stream, row1 to
         // the 2nd stream.
@@ -433,8 +446,12 @@ func preimageAttack(
 func TestMetadataIsForwarded(t *testing.T) {
     defer leaktest.AfterTest(t)()

-    evalCtx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
+    ctx := context.TODO()
+    st := cluster.MakeTestingClusterSettings()
+    evalCtx := tree.NewTestingEvalContext(st)
     defer evalCtx.Stop(context.Background())
+    diskMonitor := makeTestDiskMonitor(ctx, st)
+    defer diskMonitor.Stop(ctx)

     testCases := []struct {
         name string
@@ -464,7 +481,7 @@ func TestMetadataIsForwarded(t *testing.T) {
             recvs[i] = &chans[i]
             tc.spec.Streams[i] = distsqlpb.StreamEndpointSpec{StreamID: distsqlpb.StreamID(i)}
         }
-        router, wg := setupRouter(t, evalCtx, tc.spec, nil /* no columns */, recvs)
+        router, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, nil /* no columns */, recvs)

         err1 := errors.Errorf("test error 1")
         err2 := errors.Errorf("test error 2")
@@ -590,8 +607,14 @@ func TestRouterBlocks(t *testing.T) {
     st := cluster.MakeTestingClusterSettings()
     ctx := context.TODO()
     evalCtx := tree.MakeTestingEvalContext(st)
+    defer evalCtx.Stop(ctx)
+    diskMonitor := makeTestDiskMonitor(ctx, st)
+    defer diskMonitor.Stop(ctx)
     flowCtx := FlowCtx{
-        Cfg: &ServerConfig{Settings: st},
+        Cfg: &ServerConfig{
+            Settings:    st,
+            DiskMonitor: diskMonitor,
+        },
         EvalCtx: &evalCtx,
     }
     router.init(ctx, &flowCtx, colTypes)
@@ -885,8 +908,11 @@ func BenchmarkRouter(b *testing.B) {
     colTypes := sqlbase.MakeIntCols(numCols)

     ctx := context.Background()
-    evalCtx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
+    st := cluster.MakeTestingClusterSettings()
+    evalCtx := tree.NewTestingEvalContext(st)
     defer evalCtx.Stop(context.Background())
+    diskMonitor := makeTestDiskMonitor(ctx, st)
+    defer diskMonitor.Stop(ctx)

     input := NewRepeatableRowSource(sqlbase.OneIntCol, sqlbase.MakeIntRows(numRows, numCols))

@@ -920,7 +946,7 @@
         recvs[i] = &chans[i]
         spec.Streams[i] = distsqlpb.StreamEndpointSpec{StreamID: distsqlpb.StreamID(i)}
     }
-    r, wg := setupRouter(b, evalCtx, spec, colTypes, recvs)
+    r, wg := setupRouter(b, st, evalCtx, diskMonitor, spec, colTypes, recvs)
     for i := range chans {
         go drainRowChannel(&chans[i])
     }
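makeTestDiskMonitor, used throughout these tests, is also outside the shown diff. A plausible sketch, assuming it simply starts an unlimited disk-resource monitor (the body, metric arguments, and budget are assumptions; only the name and signature come from the call sites above, and the usual context/math/cluster/mon imports are assumed to be present in the test package):

    // makeTestDiskMonitor starts a disk monitor with an effectively unlimited
    // budget, suitable for tests that only need a non-nil DiskMonitor.
    func makeTestDiskMonitor(ctx context.Context, st *cluster.Settings) *mon.BytesMonitor {
        diskMonitor := mon.MakeMonitor(
            "test-disk",      /* name */
            mon.DiskResource, /* tracks disk usage, not RAM */
            nil,              /* curCount gauge: not needed in tests */
            nil,              /* maxHist histogram: not needed in tests */
            -1,               /* increment: use the default block size */
            math.MaxInt64,    /* noteworthy: effectively never log */
            st,
        )
        diskMonitor.Start(ctx, nil /* pool */, mon.MakeStandaloneBudget(math.MaxInt64))
        return &diskMonitor
    }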
82 changes: 54 additions & 28 deletions — pkg/sql/distsqlrun/windower.go

@@ -103,6 +103,10 @@
     windowerEmittingRows
 )

+// memRequiredByWindower indicates the minimum amount of RAM (in bytes) that
+// the windower needs.
+const memRequiredByWindower = 100 * 1024
+
 // windower is the processor that performs computation of window functions
 // that have the same PARTITION BY clause. It passes through all of its input
 // columns and puts the output of a window function windowFn at
@@ -160,34 +164,9 @@ func newWindower(
     evalCtx := flowCtx.NewEvalCtx()
     w.inputTypes = input.OutputTypes()
     ctx := evalCtx.Ctx()
-    memMonitor := NewMonitor(ctx, evalCtx.Mon, "windower-mem")
-    w.acc = memMonitor.MakeBoundAccount()
-    w.diskMonitor = NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "windower-disk")
-    if sp := opentracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) {
-        w.input = NewInputStatCollector(w.input)
-        w.finishTrace = w.outputStatsToTrace
-    }

-    windowFns := spec.WindowFns
     w.partitionBy = spec.PartitionBy
-    allRowsPartitioned := rowcontainer.MakeHashDiskBackedRowContainer(
-        nil, /* memRowContainer */
-        evalCtx,
-        memMonitor,
-        w.diskMonitor,
-        flowCtx.Cfg.TempStorage,
-    )
-    w.allRowsPartitioned = &allRowsPartitioned
-    if err := w.allRowsPartitioned.Init(
-        ctx,
-        false, /* shouldMark */
-        w.inputTypes,
-        w.partitionBy,
-        true, /* encodeNull */
-    ); err != nil {
-        return nil, err
-    }
-
+    windowFns := spec.WindowFns
     w.windowFns = make([]*windowFunc, 0, len(windowFns))
     w.builtins = make([]tree.WindowFunc, 0, len(windowFns))
     // windower passes through all of its input columns and appends an output
@@ -219,6 +198,28 @@
     }
     w.outputRow = make(sqlbase.EncDatumRow, len(w.outputTypes))

+    st := flowCtx.Cfg.Settings
+    // Limit the memory use by creating a child monitor with a hard limit.
+    // windower will overflow to disk if this limit is not enough.
+    limit := flowCtx.Cfg.TestingKnobs.MemoryLimitBytes
+    if limit <= 0 {
+        limit = settingWorkMemBytes.Get(&st.SV)
+        if limit < memRequiredByWindower {
+            return nil, errors.Errorf(
+                "window functions require %d bytes of RAM but only %d are in the budget. "+
+                    "Consider increasing sql.distsql.temp_storage.workmem setting",
+                memRequiredByWindower, limit)
+        }
+    } else {
+        if limit < memRequiredByWindower {
+            // The limit is set very low by the tests, but the windower requires
+            // some amount of RAM, so we override the limit.
+            limit = memRequiredByWindower
+        }
+    }
+    limitedMon := mon.MakeMonitorInheritWithLimit("windower-limited", limit, evalCtx.Mon)
+    limitedMon.Start(ctx, evalCtx.Mon, mon.BoundAccount{})
+
     if err := w.InitWithEvalCtx(
         w,
         post,
@@ -227,7 +228,7 @@
         evalCtx,
         processorID,
         output,
-        memMonitor,
+        &limitedMon,
         ProcStateOpts{InputsToDrain: []RowSource{w.input},
             TrailingMetaCallback: func(context.Context) []distsqlpb.ProducerMetadata {
                 w.close()
@@ -237,6 +238,31 @@
         return nil, err
     }

+    w.diskMonitor = NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "windower-disk")
+    w.allRowsPartitioned = rowcontainer.NewHashDiskBackedRowContainer(
+        nil, /* memRowContainer */
+        evalCtx,
+        w.MemMonitor,
+        w.diskMonitor,
+        flowCtx.Cfg.TempStorage,
+    )
+    if err := w.allRowsPartitioned.Init(
+        ctx,
+        false, /* shouldMark */
+        w.inputTypes,
+        w.partitionBy,
+        true, /* encodeNull */
+    ); err != nil {
+        return nil, err
+    }
+
+    w.acc = w.MemMonitor.MakeBoundAccount()
+
+    if sp := opentracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) {
+        w.input = NewInputStatCollector(w.input)
+        w.finishTrace = w.outputStatsToTrace
+    }
+
     return w, nil
 }

@@ -666,7 +692,7 @@ func (w *windower) computeWindowFunctions(ctx context.Context, evalCtx *tree.Eva
     // w.partition will have ordering as needed by the first window function to
     // be processed.
     ordering := distsqlpb.ConvertToColumnOrdering(w.windowFns[w.orderOfWindowFnsProcessing[0]].ordering)
-    w.partition = rowcontainer.MakeDiskBackedIndexedRowContainer(
+    w.partition = rowcontainer.NewDiskBackedIndexedRowContainer(
         ordering,
         w.inputTypes,
         w.evalCtx,
16 changes: 16 additions & 0 deletions — pkg/sql/logictest/testdata/logic_test/window

@@ -3545,3 +3545,19 @@ SELECT *, avg(w) OVER (PARTITION BY w, z ORDER BY y) FROM wxyz ORDER BY z, w, y
 ----
 2  10  2  0  2
 4  10  2  0  4
+
+# Test that windower respects the memory limit set via the cluster setting.
+statement ok
+SET CLUSTER SETTING sql.distsql.temp_storage.workmem='200KB'
+
+statement ok
+CREATE TABLE l (a INT PRIMARY KEY)
+
+statement ok
+INSERT INTO l SELECT g FROM generate_series(0,10000) g(g)
+
+statement error memory budget exceeded
+SELECT array_agg(a) OVER () FROM l LIMIT 1
+
+statement ok
+RESET CLUSTER SETTING sql.distsql.temp_storage.workmem
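For scale: generate_series(0, 10000) produces 10,001 rows, all of which land in the single partition of the empty OVER () window. The 200KB budget clears the windower's 100 KiB floor, so planning succeeds and the buffered rows can spill to disk; it is presumably the array_agg result, which must be materialized in memory against the limited monitor, that trips the "memory budget exceeded" error before the RESET restores the default.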