Skip to content

Commit

Permalink
distsqlrun: make router output respect memory limit setting
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
yuzefovich committed Sep 10, 2019
1 parent a454a44 commit 9c77823
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 56 deletions.
4 changes: 1 addition & 3 deletions pkg/sql/distsqlrun/hashjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,7 @@ func newHashJoiner(
if limit <= 0 {
limit = settingWorkMemBytes.Get(&st.SV)
}
limitedMon := mon.MakeMonitorInheritWithLimit("hashjoiner-limited", limit, flowCtx.EvalCtx.Mon)
limitedMon.Start(ctx, flowCtx.EvalCtx.Mon, mon.BoundAccount{})
h.MemMonitor = &limitedMon
h.MemMonitor = NewLimitedMonitor(ctx, flowCtx.EvalCtx.Mon, flowCtx.Cfg, "hashjoiner-limited")
h.diskMonitor = NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "hashjoiner-disk")
// Override initialBufferSize to be half of this processor's memory
// limit. We consume up to h.initialBufferSize bytes from each input
Expand Down
4 changes: 1 addition & 3 deletions pkg/sql/distsqlrun/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,7 @@ func newJoinReader(
if limit <= 0 {
limit = settingWorkMemBytes.Get(&st.SV)
}
limitedMon := mon.MakeMonitorInheritWithLimit("joinreader-limited", limit, flowCtx.EvalCtx.Mon)
limitedMon.Start(ctx, flowCtx.EvalCtx.Mon, mon.BoundAccount{})
jr.MemMonitor = &limitedMon
jr.MemMonitor = NewLimitedMonitor(ctx, flowCtx.EvalCtx.Mon, flowCtx.Cfg, "joiner-limited")
jr.diskMonitor = NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "joinreader-disk")
drc := rowcontainer.NewDiskBackedIndexedRowContainer(
nil, /* ordering */
Expand Down
34 changes: 13 additions & 21 deletions pkg/sql/distsqlrun/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ type routerOutput struct {

stats RouterOutputStats

// memoryMonitor and diskMonitor are references to mu.rowContainer's monitors,
// used for stats extraction.
// memoryMonitor and diskMonitor are mu.rowContainer's monitors.
memoryMonitor, diskMonitor *mon.BytesMonitor
}

Expand Down Expand Up @@ -241,27 +240,24 @@ func (rb *routerBase) init(ctx context.Context, flowCtx *FlowCtx, types []types.
// This method must be called before we start() so we don't need
// to take the mutex.
evalCtx := flowCtx.NewEvalCtx()
memoryMonitor := evalCtx.Mon
diskMonitor := flowCtx.Cfg.DiskMonitor
if rb.statsCollectionEnabled {
// Start private monitors for stats collection.
memoryMonitorName := fmt.Sprintf("router-stat-mem-%d", rb.outputs[i].streamID)
memoryMonitor = NewMonitor(ctx, memoryMonitor, memoryMonitorName)
diskMonitorName := fmt.Sprintf("router-stat-disk-%d", rb.outputs[i].streamID)
diskMonitor = NewMonitor(ctx, diskMonitor, diskMonitorName)
}
rb.outputs[i].memoryMonitor = NewLimitedMonitor(
ctx, evalCtx.Mon, flowCtx.Cfg,
fmt.Sprintf("router-limited-%d", rb.outputs[i].streamID),
)
rb.outputs[i].diskMonitor = NewMonitor(
ctx, flowCtx.Cfg.DiskMonitor,
fmt.Sprintf("router-disk-%d", rb.outputs[i].streamID),
)

rb.outputs[i].mu.rowContainer.Init(
nil, /* ordering */
types,
evalCtx,
flowCtx.Cfg.TempStorage,
memoryMonitor,
diskMonitor,
rb.outputs[i].memoryMonitor,
rb.outputs[i].diskMonitor,
0, /* rowCapacity */
)
rb.outputs[i].memoryMonitor = memoryMonitor
rb.outputs[i].diskMonitor = diskMonitor

// Initialize any outboxes.
if o, ok := rb.outputs[i].stream.(*outbox); ok {
Expand Down Expand Up @@ -357,12 +353,8 @@ func (rb *routerBase) start(ctx context.Context, wg *sync.WaitGroup, ctxCancel c
ro.mu.rowContainer.Close(ctx)
ro.mu.Unlock()

if rb.statsCollectionEnabled {
// Stats collection requires private BytesMonitors, so ensure that they
// are stopped.
ro.memoryMonitor.Stop(ctx)
ro.diskMonitor.Stop(ctx)
}
ro.memoryMonitor.Stop(ctx)
ro.diskMonitor.Stop(ctx)

wg.Done()
}(ctx, rb, &rb.outputs[i], wg)
Expand Down
46 changes: 36 additions & 10 deletions pkg/sql/distsqlrun/routers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ import (
// that tracks the lifetime of the background router goroutines.
func setupRouter(
t testing.TB,
st *cluster.Settings,
evalCtx *tree.EvalContext,
diskMonitor *mon.BytesMonitor,
spec distsqlpb.OutputRouterSpec,
inputTypes []types.T,
streams []RowReceiver,
Expand All @@ -55,7 +57,10 @@ func setupRouter(

ctx := context.TODO()
flowCtx := FlowCtx{
Cfg: &ServerConfig{Settings: cluster.MakeTestingClusterSettings()},
Cfg: &ServerConfig{
Settings: st,
DiskMonitor: diskMonitor,
},
EvalCtx: evalCtx,
}
r.init(ctx, &flowCtx, inputTypes)
Expand All @@ -72,8 +77,12 @@ func TestRouters(t *testing.T) {

rng, _ := randutil.NewPseudoRand()
alloc := &sqlbase.DatumAlloc{}
evalCtx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
ctx := context.TODO()
st := cluster.MakeTestingClusterSettings()
evalCtx := tree.NewTestingEvalContext(st)
defer evalCtx.Stop(context.Background())
diskMonitor := makeTestDiskMonitor(ctx, st)
defer diskMonitor.Stop(ctx)

// Generate tables of possible values for each column; we have fewer possible
// values than rows to guarantee many occurrences of each value.
Expand Down Expand Up @@ -136,7 +145,7 @@ func TestRouters(t *testing.T) {
tc.spec.Streams[i] = distsqlpb.StreamEndpointSpec{StreamID: distsqlpb.StreamID(i)}
}

r, wg := setupRouter(t, evalCtx, tc.spec, types, recvs)
r, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, types, recvs)

for i := 0; i < numRows; i++ {
row := make(sqlbase.EncDatumRow, numCols)
Expand Down Expand Up @@ -281,8 +290,12 @@ var (
func TestConsumerStatus(t *testing.T) {
defer leaktest.AfterTest(t)()

evalCtx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
ctx := context.TODO()
st := cluster.MakeTestingClusterSettings()
evalCtx := tree.NewTestingEvalContext(st)
defer evalCtx.Stop(context.Background())
diskMonitor := makeTestDiskMonitor(ctx, st)
defer diskMonitor.Stop(ctx)

testCases := []struct {
name string
Expand Down Expand Up @@ -314,7 +327,7 @@ func TestConsumerStatus(t *testing.T) {
}

colTypes := []types.T{*types.Int}
router, wg := setupRouter(t, evalCtx, tc.spec, colTypes, recvs)
router, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, colTypes, recvs)

// row0 will be a row that the router sends to the first stream, row1 to
// the 2nd stream.
Expand Down Expand Up @@ -433,8 +446,12 @@ func preimageAttack(
func TestMetadataIsForwarded(t *testing.T) {
defer leaktest.AfterTest(t)()

evalCtx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
ctx := context.TODO()
st := cluster.MakeTestingClusterSettings()
evalCtx := tree.NewTestingEvalContext(st)
defer evalCtx.Stop(context.Background())
diskMonitor := makeTestDiskMonitor(ctx, st)
defer diskMonitor.Stop(ctx)

testCases := []struct {
name string
Expand Down Expand Up @@ -464,7 +481,7 @@ func TestMetadataIsForwarded(t *testing.T) {
recvs[i] = &chans[i]
tc.spec.Streams[i] = distsqlpb.StreamEndpointSpec{StreamID: distsqlpb.StreamID(i)}
}
router, wg := setupRouter(t, evalCtx, tc.spec, nil /* no columns */, recvs)
router, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, nil /* no columns */, recvs)

err1 := errors.Errorf("test error 1")
err2 := errors.Errorf("test error 2")
Expand Down Expand Up @@ -590,8 +607,14 @@ func TestRouterBlocks(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
ctx := context.TODO()
evalCtx := tree.MakeTestingEvalContext(st)
defer evalCtx.Stop(ctx)
diskMonitor := makeTestDiskMonitor(ctx, st)
defer diskMonitor.Stop(ctx)
flowCtx := FlowCtx{
Cfg: &ServerConfig{Settings: st},
Cfg: &ServerConfig{
Settings: st,
DiskMonitor: diskMonitor,
},
EvalCtx: &evalCtx,
}
router.init(ctx, &flowCtx, colTypes)
Expand Down Expand Up @@ -885,8 +908,11 @@ func BenchmarkRouter(b *testing.B) {
colTypes := sqlbase.MakeIntCols(numCols)

ctx := context.Background()
evalCtx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
st := cluster.MakeTestingClusterSettings()
evalCtx := tree.NewTestingEvalContext(st)
defer evalCtx.Stop(context.Background())
diskMonitor := makeTestDiskMonitor(ctx, st)
defer diskMonitor.Stop(ctx)

input := NewRepeatableRowSource(sqlbase.OneIntCol, sqlbase.MakeIntRows(numRows, numCols))

Expand Down Expand Up @@ -920,7 +946,7 @@ func BenchmarkRouter(b *testing.B) {
recvs[i] = &chans[i]
spec.Streams[i] = distsqlpb.StreamEndpointSpec{StreamID: distsqlpb.StreamID(i)}
}
r, wg := setupRouter(b, evalCtx, spec, colTypes, recvs)
r, wg := setupRouter(b, st, evalCtx, diskMonitor, spec, colTypes, recvs)
for i := range chans {
go drainRowChannel(&chans[i])
}
Expand Down
38 changes: 19 additions & 19 deletions pkg/sql/distsqlrun/windower.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,24 +198,6 @@ func newWindower(
}
w.outputRow = make(sqlbase.EncDatumRow, len(w.outputTypes))

if err := w.InitWithEvalCtx(
w,
post,
w.outputTypes,
flowCtx,
evalCtx,
processorID,
output,
nil, /* memMonitor */
ProcStateOpts{InputsToDrain: []RowSource{w.input},
TrailingMetaCallback: func(context.Context) []distsqlpb.ProducerMetadata {
w.close()
return nil
}},
); err != nil {
return nil, err
}

st := flowCtx.Cfg.Settings
// Limit the memory use by creating a child monitor with a hard limit.
// windower will overflow to disk if this limit is not enough.
Expand All @@ -237,7 +219,25 @@ func newWindower(
}
limitedMon := mon.MakeMonitorInheritWithLimit("windower-limited", limit, evalCtx.Mon)
limitedMon.Start(ctx, evalCtx.Mon, mon.BoundAccount{})
w.MemMonitor = &limitedMon

if err := w.InitWithEvalCtx(
w,
post,
w.outputTypes,
flowCtx,
evalCtx,
processorID,
output,
&limitedMon,
ProcStateOpts{InputsToDrain: []RowSource{w.input},
TrailingMetaCallback: func(context.Context) []distsqlpb.ProducerMetadata {
w.close()
return nil
}},
); err != nil {
return nil, err
}

w.diskMonitor = NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "windower-disk")
w.allRowsPartitioned = rowcontainer.NewHashDiskBackedRowContainer(
nil, /* memRowContainer */
Expand Down

0 comments on commit 9c77823

Please sign in to comment.