diff --git a/pkg/sql/distsqlrun/hashjoiner.go b/pkg/sql/distsqlrun/hashjoiner.go
index a57c6eed5c26..47168deef5d6 100644
--- a/pkg/sql/distsqlrun/hashjoiner.go
+++ b/pkg/sql/distsqlrun/hashjoiner.go
@@ -181,9 +181,7 @@ func newHashJoiner(
 	if limit <= 0 {
 		limit = settingWorkMemBytes.Get(&st.SV)
 	}
-	limitedMon := mon.MakeMonitorInheritWithLimit("hashjoiner-limited", limit, flowCtx.EvalCtx.Mon)
-	limitedMon.Start(ctx, flowCtx.EvalCtx.Mon, mon.BoundAccount{})
-	h.MemMonitor = &limitedMon
+	h.MemMonitor = NewLimitedMonitor(ctx, flowCtx.EvalCtx.Mon, flowCtx.Cfg, "hashjoiner-limited")
 	h.diskMonitor = NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "hashjoiner-disk")
 	// Override initialBufferSize to be half of this processor's memory
 	// limit. We consume up to h.initialBufferSize bytes from each input
@@ -736,14 +734,14 @@ func (h *hashJoiner) shouldEmitUnmatched(
 // initStoredRows initializes a hashRowContainer and sets h.storedRows.
 func (h *hashJoiner) initStoredRows() error {
 	if h.useTempStorage {
-		hrc := rowcontainer.MakeHashDiskBackedRowContainer(
+		hrc := rowcontainer.NewHashDiskBackedRowContainer(
 			&h.rows[h.storedSide],
 			h.evalCtx,
 			h.MemMonitor,
 			h.diskMonitor,
 			h.flowCtx.Cfg.TempStorage,
 		)
-		h.storedRows = &hrc
+		h.storedRows = hrc
 	} else {
 		hrc := rowcontainer.MakeHashMemRowContainer(&h.rows[h.storedSide])
 		h.storedRows = &hrc
diff --git a/pkg/sql/distsqlrun/joinreader.go b/pkg/sql/distsqlrun/joinreader.go
index ba5c488d440d..5a0916e2db8b 100644
--- a/pkg/sql/distsqlrun/joinreader.go
+++ b/pkg/sql/distsqlrun/joinreader.go
@@ -239,11 +239,9 @@ func newJoinReader(
 	if limit <= 0 {
 		limit = settingWorkMemBytes.Get(&st.SV)
 	}
-	limitedMon := mon.MakeMonitorInheritWithLimit("joinreader-limited", limit, flowCtx.EvalCtx.Mon)
-	limitedMon.Start(ctx, flowCtx.EvalCtx.Mon, mon.BoundAccount{})
-	jr.MemMonitor = &limitedMon
+	jr.MemMonitor = NewLimitedMonitor(ctx, flowCtx.EvalCtx.Mon, flowCtx.Cfg, "joiner-limited")
 	jr.diskMonitor = NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "joinreader-disk")
-	drc := rowcontainer.MakeDiskBackedIndexedRowContainer(
+	drc := rowcontainer.NewDiskBackedIndexedRowContainer(
 		nil, /* ordering */
 		jr.desc.ColumnTypesWithMutations(returnMutations),
 		jr.evalCtx,
diff --git a/pkg/sql/distsqlrun/routers.go b/pkg/sql/distsqlrun/routers.go
index 8a3e2e2fed5a..aa458ee57307 100644
--- a/pkg/sql/distsqlrun/routers.go
+++ b/pkg/sql/distsqlrun/routers.go
@@ -101,8 +101,7 @@ type routerOutput struct {
 
 	stats RouterOutputStats
 
-	// memoryMonitor and diskMonitor are references to mu.rowContainer's monitors,
-	// used for stats extraction.
+	// memoryMonitor and diskMonitor are mu.rowContainer's monitors.
 	memoryMonitor, diskMonitor *mon.BytesMonitor
 }
 
@@ -241,27 +240,24 @@ func (rb *routerBase) init(ctx context.Context, flowCtx *FlowCtx, types []types.
 		// This method must be called before we start() so we don't need
 		// to take the mutex.
 		evalCtx := flowCtx.NewEvalCtx()
-		memoryMonitor := evalCtx.Mon
-		diskMonitor := flowCtx.Cfg.DiskMonitor
-		if rb.statsCollectionEnabled {
-			// Start private monitors for stats collection.
-			memoryMonitorName := fmt.Sprintf("router-stat-mem-%d", rb.outputs[i].streamID)
-			memoryMonitor = NewMonitor(ctx, memoryMonitor, memoryMonitorName)
-			diskMonitorName := fmt.Sprintf("router-stat-disk-%d", rb.outputs[i].streamID)
-			diskMonitor = NewMonitor(ctx, diskMonitor, diskMonitorName)
-		}
+		rb.outputs[i].memoryMonitor = NewLimitedMonitor(
+			ctx, evalCtx.Mon, flowCtx.Cfg,
+			fmt.Sprintf("router-limited-%d", rb.outputs[i].streamID),
+		)
+		rb.outputs[i].diskMonitor = NewMonitor(
+			ctx, flowCtx.Cfg.DiskMonitor,
+			fmt.Sprintf("router-disk-%d", rb.outputs[i].streamID),
+		)
 		rb.outputs[i].mu.rowContainer.Init(
 			nil, /* ordering */
 			types,
 			evalCtx,
 			flowCtx.Cfg.TempStorage,
-			memoryMonitor,
-			diskMonitor,
+			rb.outputs[i].memoryMonitor,
+			rb.outputs[i].diskMonitor,
 			0, /* rowCapacity */
 		)
-		rb.outputs[i].memoryMonitor = memoryMonitor
-		rb.outputs[i].diskMonitor = diskMonitor
 
 		// Initialize any outboxes.
 		if o, ok := rb.outputs[i].stream.(*outbox); ok {
@@ -357,12 +353,8 @@ func (rb *routerBase) start(ctx context.Context, wg *sync.WaitGroup, ctxCancel c
 			ro.mu.rowContainer.Close(ctx)
 			ro.mu.Unlock()
 
-			if rb.statsCollectionEnabled {
-				// Stats collection requires private BytesMonitors, so ensure that they
-				// are stopped.
-				ro.memoryMonitor.Stop(ctx)
-				ro.diskMonitor.Stop(ctx)
-			}
+			ro.memoryMonitor.Stop(ctx)
+			ro.diskMonitor.Stop(ctx)
 
 			wg.Done()
 		}(ctx, rb, &rb.outputs[i], wg)
diff --git a/pkg/sql/distsqlrun/routers_test.go b/pkg/sql/distsqlrun/routers_test.go
index aaa5be3c368b..f888fb466ea1 100644
--- a/pkg/sql/distsqlrun/routers_test.go
+++ b/pkg/sql/distsqlrun/routers_test.go
@@ -43,7 +43,9 @@ import (
 // that tracks the lifetime of the background router goroutines.
 func setupRouter(
 	t testing.TB,
+	st *cluster.Settings,
 	evalCtx *tree.EvalContext,
+	diskMonitor *mon.BytesMonitor,
 	spec distsqlpb.OutputRouterSpec,
 	inputTypes []types.T,
 	streams []RowReceiver,
@@ -55,7 +57,10 @@ func setupRouter(
 
 	ctx := context.TODO()
 	flowCtx := FlowCtx{
-		Cfg:     &ServerConfig{Settings: cluster.MakeTestingClusterSettings()},
+		Cfg: &ServerConfig{
+			Settings:    st,
+			DiskMonitor: diskMonitor,
+		},
 		EvalCtx: evalCtx,
 	}
 	r.init(ctx, &flowCtx, inputTypes)
@@ -72,8 +77,12 @@ func TestRouters(t *testing.T) {
 	rng, _ := randutil.NewPseudoRand()
 	alloc := &sqlbase.DatumAlloc{}
 
-	evalCtx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
+	ctx := context.TODO()
+	st := cluster.MakeTestingClusterSettings()
+	evalCtx := tree.NewTestingEvalContext(st)
 	defer evalCtx.Stop(context.Background())
+	diskMonitor := makeTestDiskMonitor(ctx, st)
+	defer diskMonitor.Stop(ctx)
 
 	// Generate tables of possible values for each column; we have fewer possible
 	// values than rows to guarantee many occurrences of each value.
@@ -136,7 +145,7 @@ func TestRouters(t *testing.T) {
 		tc.spec.Streams[i] = distsqlpb.StreamEndpointSpec{StreamID: distsqlpb.StreamID(i)}
 	}
 
-	r, wg := setupRouter(t, evalCtx, tc.spec, types, recvs)
+	r, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, types, recvs)
 
 	for i := 0; i < numRows; i++ {
 		row := make(sqlbase.EncDatumRow, numCols)
@@ -281,8 +290,12 @@ var (
 
 func TestConsumerStatus(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	evalCtx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
+	ctx := context.TODO()
+	st := cluster.MakeTestingClusterSettings()
+	evalCtx := tree.NewTestingEvalContext(st)
 	defer evalCtx.Stop(context.Background())
+	diskMonitor := makeTestDiskMonitor(ctx, st)
+	defer diskMonitor.Stop(ctx)
 
 	testCases := []struct {
 		name string
@@ -314,7 +327,7 @@ func TestConsumerStatus(t *testing.T) {
 			}
 			colTypes := []types.T{*types.Int}
 
-			router, wg := setupRouter(t, evalCtx, tc.spec, colTypes, recvs)
+			router, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, colTypes, recvs)
 
 			// row0 will be a row that the router sends to the first stream, row1 to
 			// the 2nd stream.
@@ -433,8 +446,12 @@ func preimageAttack(
 
 func TestMetadataIsForwarded(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	evalCtx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
+	ctx := context.TODO()
+	st := cluster.MakeTestingClusterSettings()
+	evalCtx := tree.NewTestingEvalContext(st)
 	defer evalCtx.Stop(context.Background())
+	diskMonitor := makeTestDiskMonitor(ctx, st)
+	defer diskMonitor.Stop(ctx)
 
 	testCases := []struct {
 		name string
@@ -464,7 +481,7 @@ func TestMetadataIsForwarded(t *testing.T) {
 		recvs[i] = &chans[i]
 		tc.spec.Streams[i] = distsqlpb.StreamEndpointSpec{StreamID: distsqlpb.StreamID(i)}
 	}
-	router, wg := setupRouter(t, evalCtx, tc.spec, nil /* no columns */, recvs)
+	router, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, nil /* no columns */, recvs)
 
 	err1 := errors.Errorf("test error 1")
 	err2 := errors.Errorf("test error 2")
@@ -590,8 +607,14 @@ func TestRouterBlocks(t *testing.T) {
 	st := cluster.MakeTestingClusterSettings()
 	ctx := context.TODO()
 	evalCtx := tree.MakeTestingEvalContext(st)
+	defer evalCtx.Stop(ctx)
+	diskMonitor := makeTestDiskMonitor(ctx, st)
+	defer diskMonitor.Stop(ctx)
 	flowCtx := FlowCtx{
-		Cfg:     &ServerConfig{Settings: st},
+		Cfg: &ServerConfig{
+			Settings:    st,
+			DiskMonitor: diskMonitor,
+		},
 		EvalCtx: &evalCtx,
 	}
 	router.init(ctx, &flowCtx, colTypes)
@@ -885,8 +908,11 @@ func BenchmarkRouter(b *testing.B) {
 	colTypes := sqlbase.MakeIntCols(numCols)
 
 	ctx := context.Background()
-	evalCtx := tree.NewTestingEvalContext(cluster.MakeTestingClusterSettings())
+	st := cluster.MakeTestingClusterSettings()
+	evalCtx := tree.NewTestingEvalContext(st)
 	defer evalCtx.Stop(context.Background())
+	diskMonitor := makeTestDiskMonitor(ctx, st)
+	defer diskMonitor.Stop(ctx)
 
 	input := NewRepeatableRowSource(sqlbase.OneIntCol, sqlbase.MakeIntRows(numRows, numCols))
 
@@ -920,7 +946,7 @@ func BenchmarkRouter(b *testing.B) {
 				recvs[i] = &chans[i]
 				spec.Streams[i] = distsqlpb.StreamEndpointSpec{StreamID: distsqlpb.StreamID(i)}
 			}
-			r, wg := setupRouter(b, evalCtx, spec, colTypes, recvs)
+			r, wg := setupRouter(b, st, evalCtx, diskMonitor, spec, colTypes, recvs)
 			for i := range chans {
 				go drainRowChannel(&chans[i])
 			}
diff --git a/pkg/sql/distsqlrun/windower.go b/pkg/sql/distsqlrun/windower.go
index a3c981a1bc71..633d462cde39 100644
--- a/pkg/sql/distsqlrun/windower.go
+++ b/pkg/sql/distsqlrun/windower.go
@@ -103,6 +103,10 @@ const (
 	windowerEmittingRows
 )
 
+// memRequiredByWindower indicates the minimum amount of RAM (in bytes) that
+// the windower needs.
+const memRequiredByWindower = 100 * 1024
+
 // windower is the processor that performs computation of window functions
 // that have the same PARTITION BY clause. It passes through all of its input
 // columns and puts the output of a window function windowFn at
@@ -160,34 +164,9 @@ func newWindower(
 	evalCtx := flowCtx.NewEvalCtx()
 	w.inputTypes = input.OutputTypes()
 	ctx := evalCtx.Ctx()
-	memMonitor := NewMonitor(ctx, evalCtx.Mon, "windower-mem")
-	w.acc = memMonitor.MakeBoundAccount()
-	w.diskMonitor = NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "windower-disk")
-	if sp := opentracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) {
-		w.input = NewInputStatCollector(w.input)
-		w.finishTrace = w.outputStatsToTrace
-	}
-	windowFns := spec.WindowFns
 	w.partitionBy = spec.PartitionBy
-	allRowsPartitioned := rowcontainer.MakeHashDiskBackedRowContainer(
-		nil, /* memRowContainer */
-		evalCtx,
-		memMonitor,
-		w.diskMonitor,
-		flowCtx.Cfg.TempStorage,
-	)
-	w.allRowsPartitioned = &allRowsPartitioned
-	if err := w.allRowsPartitioned.Init(
-		ctx,
-		false, /* shouldMark */
-		w.inputTypes,
-		w.partitionBy,
-		true, /* encodeNull */
-	); err != nil {
-		return nil, err
-	}
-
+	windowFns := spec.WindowFns
 	w.windowFns = make([]*windowFunc, 0, len(windowFns))
 	w.builtins = make([]tree.WindowFunc, 0, len(windowFns))
 	// windower passes through all of its input columns and appends an output
@@ -219,6 +198,28 @@ func newWindower(
 	}
 	w.outputRow = make(sqlbase.EncDatumRow, len(w.outputTypes))
 
+	st := flowCtx.Cfg.Settings
+	// Limit the memory use by creating a child monitor with a hard limit.
+	// windower will overflow to disk if this limit is not enough.
+	limit := flowCtx.Cfg.TestingKnobs.MemoryLimitBytes
+	if limit <= 0 {
+		limit = settingWorkMemBytes.Get(&st.SV)
+		if limit < memRequiredByWindower {
+			return nil, errors.Errorf(
+				"window functions require %d bytes of RAM but only %d are in the budget. "+
+					"Consider increasing sql.distsql.temp_storage.workmem setting",
+				memRequiredByWindower, limit)
+		}
+	} else {
+		if limit < memRequiredByWindower {
+			// The limit is set very low by the tests, but the windower requires
+			// some amount of RAM, so we override the limit.
+			limit = memRequiredByWindower
+		}
+	}
+	limitedMon := mon.MakeMonitorInheritWithLimit("windower-limited", limit, evalCtx.Mon)
+	limitedMon.Start(ctx, evalCtx.Mon, mon.BoundAccount{})
+
 	if err := w.InitWithEvalCtx(
 		w,
 		post,
@@ -227,7 +228,7 @@ func newWindower(
 		evalCtx,
 		processorID,
 		output,
-		memMonitor,
+		&limitedMon,
 		ProcStateOpts{InputsToDrain: []RowSource{w.input},
 			TrailingMetaCallback: func(context.Context) []distsqlpb.ProducerMetadata {
 				w.close()
@@ -237,6 +238,31 @@ func newWindower(
 		return nil, err
 	}
 
+	w.diskMonitor = NewMonitor(ctx, flowCtx.Cfg.DiskMonitor, "windower-disk")
+	w.allRowsPartitioned = rowcontainer.NewHashDiskBackedRowContainer(
+		nil, /* memRowContainer */
+		evalCtx,
+		w.MemMonitor,
+		w.diskMonitor,
+		flowCtx.Cfg.TempStorage,
+	)
+	if err := w.allRowsPartitioned.Init(
+		ctx,
+		false, /* shouldMark */
+		w.inputTypes,
+		w.partitionBy,
+		true, /* encodeNull */
+	); err != nil {
+		return nil, err
+	}
+
+	w.acc = w.MemMonitor.MakeBoundAccount()
+
+	if sp := opentracing.SpanFromContext(ctx); sp != nil && tracing.IsRecording(sp) {
+		w.input = NewInputStatCollector(w.input)
+		w.finishTrace = w.outputStatsToTrace
+	}
+
 	return w, nil
 }
 
@@ -666,7 +692,7 @@ func (w *windower) computeWindowFunctions(ctx context.Context, evalCtx *tree.Eva
 	// w.partition will have ordering as needed by the first window function to
 	// be processed.
 	ordering := distsqlpb.ConvertToColumnOrdering(w.windowFns[w.orderOfWindowFnsProcessing[0]].ordering)
-	w.partition = rowcontainer.MakeDiskBackedIndexedRowContainer(
+	w.partition = rowcontainer.NewDiskBackedIndexedRowContainer(
 		ordering,
 		w.inputTypes,
 		w.evalCtx,
diff --git a/pkg/sql/logictest/testdata/logic_test/window b/pkg/sql/logictest/testdata/logic_test/window
index 3af555520572..4625a5c29838 100644
--- a/pkg/sql/logictest/testdata/logic_test/window
+++ b/pkg/sql/logictest/testdata/logic_test/window
@@ -3545,3 +3545,19 @@ SELECT *, avg(w) OVER (PARTITION BY w, z ORDER BY y) FROM wxyz ORDER BY z, w, y
 ----
 2  10  2  0  2
 4  10  2  0  4
+
+# Test that windower respects the memory limit set via the cluster setting.
+statement ok
+SET CLUSTER SETTING sql.distsql.temp_storage.workmem='200KB'
+
+statement ok
+CREATE TABLE l (a INT PRIMARY KEY)
+
+statement ok
+INSERT INTO l SELECT g FROM generate_series(0,10000) g(g)
+
+statement error memory budget exceeded
+SELECT array_agg(a) OVER () FROM l LIMIT 1
+
+statement ok
+RESET CLUSTER SETTING sql.distsql.temp_storage.workmem
diff --git a/pkg/sql/rowcontainer/hash_row_container.go b/pkg/sql/rowcontainer/hash_row_container.go
index 2a077883dd1a..cf0af75e69fc 100644
--- a/pkg/sql/rowcontainer/hash_row_container.go
+++ b/pkg/sql/rowcontainer/hash_row_container.go
@@ -765,21 +765,21 @@ type HashDiskBackedRowContainer struct {
 
 var _ HashRowContainer = &HashDiskBackedRowContainer{}
 
-// MakeHashDiskBackedRowContainer makes a HashDiskBackedRowContainer.
+// NewHashDiskBackedRowContainer makes a HashDiskBackedRowContainer.
 // mrc (the first argument) can either be nil (in which case
 // HashMemRowContainer will be built upon an empty MemRowContainer) or non-nil
 // (in which case mrc is used as underlying MemRowContainer under
 // HashMemRowContainer). The latter case is used by the hashJoiner since when
 // initializing HashDiskBackedRowContainer it will have accumulated rows from
 // both sides of the join in MemRowContainers, and we can reuse one of them.
-func MakeHashDiskBackedRowContainer(
+func NewHashDiskBackedRowContainer(
 	mrc *MemRowContainer,
 	evalCtx *tree.EvalContext,
 	memoryMonitor *mon.BytesMonitor,
 	diskMonitor *mon.BytesMonitor,
 	engine diskmap.Factory,
-) HashDiskBackedRowContainer {
-	return HashDiskBackedRowContainer{
+) *HashDiskBackedRowContainer {
+	return &HashDiskBackedRowContainer{
 		mrc:           mrc,
 		evalCtx:       evalCtx,
 		memoryMonitor: memoryMonitor,
diff --git a/pkg/sql/rowcontainer/hash_row_container_test.go b/pkg/sql/rowcontainer/hash_row_container_test.go
index f53e47166ac7..168a4c5a62a4 100644
--- a/pkg/sql/rowcontainer/hash_row_container_test.go
+++ b/pkg/sql/rowcontainer/hash_row_container_test.go
@@ -68,7 +68,7 @@ func TestHashDiskBackedRowContainer(t *testing.T) {
 	types := sqlbase.OneIntCol
 	ordering := sqlbase.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}}
 
-	rc := MakeHashDiskBackedRowContainer(nil, &evalCtx, &memoryMonitor, &diskMonitor, tempEngine)
+	rc := NewHashDiskBackedRowContainer(nil, &evalCtx, &memoryMonitor, &diskMonitor, tempEngine)
 	err = rc.Init(
 		ctx,
 		false, /* shouldMark */
@@ -373,7 +373,7 @@ func TestHashDiskBackedRowContainerPreservesMatchesAndMarks(t *testing.T) {
 	types := []types.T{*types.Int, *types.Int}
 	ordering := sqlbase.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}}
 
-	rc := MakeHashDiskBackedRowContainer(nil, &evalCtx, &memoryMonitor, &diskMonitor, tempEngine)
+	rc := NewHashDiskBackedRowContainer(nil, &evalCtx, &memoryMonitor, &diskMonitor, tempEngine)
 	err = rc.Init(
 		ctx,
 		true, /* shouldMark */
diff --git a/pkg/sql/rowcontainer/row_container.go b/pkg/sql/rowcontainer/row_container.go
index ca005eb6ffc4..81f0593128ff 100644
--- a/pkg/sql/rowcontainer/row_container.go
+++ b/pkg/sql/rowcontainer/row_container.go
@@ -16,8 +16,6 @@ import (
 	"fmt"
 	"unsafe"
 
-	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
-	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
@@ -134,7 +132,7 @@ type MemRowContainer struct {
 }
 
 var _ heap.Interface = &MemRowContainer{}
-var _ SortableRowContainer = &MemRowContainer{}
+var _ IndexedRowContainer = &MemRowContainer{}
 
 // Init initializes the MemRowContainer. The MemRowContainer uses evalCtx.Mon
 // to track memory usage.
@@ -357,7 +355,7 @@ type DiskBackedRowContainer struct {
 	diskMonitor *mon.BytesMonitor
 }
 
-var _ SortableRowContainer = &DiskBackedRowContainer{}
+var _ ReorderableRowContainer = &DiskBackedRowContainer{}
 
 // Init initializes a DiskBackedRowContainer.
 // Arguments:
@@ -481,7 +479,7 @@ func (f *DiskBackedRowContainer) UsingDisk() bool {
 // memory error. Returns whether the DiskBackedRowContainer spilled to disk and
 // an error if one occurred while doing so.
 func (f *DiskBackedRowContainer) spillIfMemErr(ctx context.Context, err error) (bool, error) {
-	if code := pgerror.GetPGCode(err); code != pgcode.OutOfMemory {
+	if !sqlbase.IsOutOfMemoryError(err) {
 		return false, nil
 	}
 	if spillErr := f.SpillToDisk(ctx); spillErr != nil {
@@ -561,7 +559,9 @@ type DiskBackedIndexedRowContainer struct {
 	DisableCache bool
 }
 
-// MakeDiskBackedIndexedRowContainer creates a DiskBackedIndexedRowContainer
+var _ IndexedRowContainer = &DiskBackedIndexedRowContainer{}
+
+// NewDiskBackedIndexedRowContainer creates a DiskBackedIndexedRowContainer
 // with the given engine as the underlying store that rows are stored on when
 // it spills to disk.
 // Arguments:
@@ -575,7 +575,7 @@ type DiskBackedIndexedRowContainer struct {
 // - diskMonitor is used to monitor this container's disk usage.
 // - rowCapacity (if not 0) specifies the number of rows in-memory container
 //   should be preallocated for.
-func MakeDiskBackedIndexedRowContainer(
+func NewDiskBackedIndexedRowContainer(
 	ordering sqlbase.ColumnOrdering,
 	typs []types.T,
 	evalCtx *tree.EvalContext,
diff --git a/pkg/sql/rowcontainer/row_container_test.go b/pkg/sql/rowcontainer/row_container_test.go
index 19d40b68a824..77d79a3115f2 100644
--- a/pkg/sql/rowcontainer/row_container_test.go
+++ b/pkg/sql/rowcontainer/row_container_test.go
@@ -429,7 +429,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
 		}
 
 		func() {
-			rc := MakeDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
+			rc := NewDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
 			defer rc.Close(ctx)
 			mid := numRows / 2
 			for i := 0; i < mid; i++ {
@@ -493,7 +493,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
 		}
 
 		func() {
-			rc := MakeDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
+			rc := NewDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
 			defer rc.Close(ctx)
 			for _, row := range rows {
 				if err := rc.AddRow(ctx, row); err != nil {
@@ -588,7 +588,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
 		}
 
 		func() {
-			rc := MakeDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
+			rc := NewDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
 			defer rc.Close(ctx)
 			if err := rc.SpillToDisk(ctx); err != nil {
 				t.Fatal(err)
@@ -654,7 +654,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
 		storedTypes[len(typs)] = sqlbase.OneIntCol[0]
 
 		func() {
-			rc := MakeDiskBackedIndexedRowContainer(ordering, typs, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
+			rc := NewDiskBackedIndexedRowContainer(ordering, typs, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
 			defer rc.Close(ctx)
 			for i := 0; i < numRows; i++ {
 				if err := rc.AddRow(ctx, rows[i]); err != nil {
@@ -695,7 +695,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
 		storedTypes[len(typs)] = sqlbase.OneIntCol[0]
 
 		func() {
-			d := MakeDiskBackedIndexedRowContainer(ordering, typs, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
+			d := NewDiskBackedIndexedRowContainer(ordering, typs, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
 			defer d.Close(ctx)
 			if err := d.SpillToDisk(ctx); err != nil {
 				t.Fatal(err)
@@ -855,7 +855,7 @@ func BenchmarkDiskBackedIndexedRowContainer(b *testing.B) {
 	accessPattern := generateAccessPattern(numRows)
 
 	b.Run("InMemory", func(b *testing.B) {
-		rc := MakeDiskBackedIndexedRowContainer(nil, sqlbase.OneIntCol, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
+		rc := NewDiskBackedIndexedRowContainer(nil, sqlbase.OneIntCol, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
 		defer rc.Close(ctx)
 		for i := 0; i < len(rows); i++ {
 			if err := rc.AddRow(ctx, rows[i]); err != nil {
@@ -877,7 +877,7 @@ func BenchmarkDiskBackedIndexedRowContainer(b *testing.B) {
 	})
 
 	b.Run("OnDiskWithCache", func(b *testing.B) {
-		rc := MakeDiskBackedIndexedRowContainer(nil, sqlbase.OneIntCol, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
+		rc := NewDiskBackedIndexedRowContainer(nil, sqlbase.OneIntCol, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
 		defer rc.Close(ctx)
 		if err := rc.SpillToDisk(ctx); err != nil {
 			b.Fatal(err)
@@ -902,7 +902,7 @@ func BenchmarkDiskBackedIndexedRowContainer(b *testing.B) {
 	})
 
 	b.Run("OnDiskWithoutCache", func(b *testing.B) {
-		rc := MakeDiskBackedIndexedRowContainer(nil, sqlbase.OneIntCol, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
+		rc := NewDiskBackedIndexedRowContainer(nil, sqlbase.OneIntCol, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
 		defer rc.Close(ctx)
 		if err := rc.SpillToDisk(ctx); err != nil {
 			b.Fatal(err)