From a132513613975c4b0bd74fa292bd8e8fc29e11f5 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Sun, 10 Feb 2019 10:58:02 -0800 Subject: [PATCH] sql: reuse already allocated memory for the cache in a row container Previously, we would always allocate new memory for every row that we put in the cache of DiskBackedIndexedRowContainer and simply discard the memory underlying the row that we remove from the cache. Now, we're reusing that memory. Release note: None --- pkg/sql/rowcontainer/row_container.go | 104 ++++++++++++++++----- pkg/sql/rowcontainer/row_container_test.go | 104 ++++++++++++++++++--- 2 files changed, 173 insertions(+), 35 deletions(-) diff --git a/pkg/sql/rowcontainer/row_container.go b/pkg/sql/rowcontainer/row_container.go index 427bdc29d514..d85ab61ec7c9 100644 --- a/pkg/sql/rowcontainer/row_container.go +++ b/pkg/sql/rowcontainer/row_container.go @@ -534,8 +534,13 @@ type DiskBackedIndexedRowContainer struct { // [firstCachedRowPos, nextPosToCache). firstCachedRowPos int nextPosToCache int - indexedRowsCache ring.Buffer // the cache of up to maxIndexedRowsCacheSize contiguous rows - cacheMemAcc mon.BoundAccount + // indexedRowsCache is the cache of up to maxCacheSize contiguous rows. + indexedRowsCache ring.Buffer + // maxCacheSize indicates the maximum number of rows to be cached. It is + // initialized to maxIndexedRowsCacheSize and dynamically adjusted if OOM + // error is encountered. + maxCacheSize int + cacheMemAcc mon.BoundAccount } // MakeDiskBackedIndexedRowContainer creates a DiskBackedIndexedRowContainer @@ -570,6 +575,7 @@ func MakeDiskBackedIndexedRowContainer( d.scratchEncRow = make(sqlbase.EncDatumRow, len(d.storedTypes)) d.DiskBackedRowContainer = &DiskBackedRowContainer{} d.DiskBackedRowContainer.Init(ordering, d.storedTypes, evalCtx, engine, memoryMonitor, diskMonitor, rowCapacity) + d.maxCacheSize = maxIndexedRowsCacheSize d.cacheMemAcc = memoryMonitor.MakeBoundAccount() return &d } @@ -692,28 +698,38 @@ func (f *DiskBackedIndexedRowContainer) GetRow( } row, rowIdx := rowWithIdx[:len(rowWithIdx)-1], rowWithIdx[len(rowWithIdx)-1].Datum if idx, ok := rowIdx.(*tree.DInt); ok { - if f.indexedRowsCache.Len() == maxIndexedRowsCacheSize { - // The cache size is capped at maxIndexedRowsCacheSize, so we first - // remove the row with the smallest pos and advance + if f.indexedRowsCache.Len() == f.maxCacheSize { + // The cache size is capped at f.maxCacheSize, so we reuse the row + // with the smallest pos, put it as the last row, and advance // f.firstCachedRowPos. - usage := sizeOfInt + int64(f.indexedRowsCache.GetFirst().(IndexedRow).Row.Size()) - // TODO(yuzefovich): extend ring.Buffer to allow for reusing the - // allocated memory instead of allocating new one for every row. - f.indexedRowsCache.RemoveFirst() - // TODO(yuzefovich): investigate whether the pattern of growing and - // shrinking the memory account can be optimized. - f.cacheMemAcc.Shrink(ctx, usage) - f.firstCachedRowPos++ + if err := f.reuseFirstRowInCache(ctx, int(*idx), row); err != nil { + return nil, err + } + } else { + // We choose to ignore minor details like IndexedRow overhead and + // the cache overhead. + usage := sizeOfInt + int64(row.Size()) + if err := f.cacheMemAcc.Grow(ctx, usage); err != nil { + if sqlbase.IsOutOfMemoryError(err) { + // We hit the memory limit, so we need to cap the cache size + // and reuse the memory underlying first row in the cache. + if f.indexedRowsCache.Len() == 0 { + // The cache is empty, so there is no memory to be reused. + return nil, err + } + f.maxCacheSize = f.indexedRowsCache.Len() + if err := f.reuseFirstRowInCache(ctx, int(*idx), row); err != nil { + return nil, err + } + } else { + return nil, err + } + } else { + // We actually need to copy the row into memory. + ir := IndexedRow{int(*idx), f.rowAlloc.CopyRow(row)} + f.indexedRowsCache.AddLast(ir) + } } - // We choose to ignore minor details like IndexedRow overhead and - // the cache overhead. - usage := sizeOfInt + int64(row.Size()) - if err := f.cacheMemAcc.Grow(ctx, usage); err != nil { - return nil, err - } - // We actually need to copy the row into memory. - ir := IndexedRow{int(*idx), f.rowAlloc.CopyRow(row)} - f.indexedRowsCache.AddLast(ir) f.nextPosToCache++ } else { return nil, errors.Errorf("unexpected last column type: should be DInt but found %T", idx) @@ -733,6 +749,50 @@ func (f *DiskBackedIndexedRowContainer) GetRow( return nil, errors.Errorf("unexpected last column type: should be DInt but found %T", rowIdx) } +// reuseFirstRowInCache reuses the underlying memory of the first row in the +// cache to store 'row' and puts it as the last one in the cache. It adjusts +// the memory account accordingly and, if necessary, removes some first rows. +func (f *DiskBackedIndexedRowContainer) reuseFirstRowInCache( + ctx context.Context, idx int, row sqlbase.EncDatumRow, +) error { + newRowSize := row.Size() + for { + if f.indexedRowsCache.Len() == 0 { + return errors.Errorf("unexpectedly the cache of DiskBackedIndexedRowContainer contains zero rows") + } + indexedRowToReuse := f.indexedRowsCache.GetFirst().(IndexedRow) + oldRowSize := indexedRowToReuse.Row.Size() + delta := int64(newRowSize - oldRowSize) + if delta > 0 { + // New row takes up more memory than the old one. + if err := f.cacheMemAcc.Grow(ctx, delta); err != nil { + if sqlbase.IsOutOfMemoryError(err) { + // We need to actually reduce the cache size, so we remove the first + // row and adjust the memory account, maxCacheSize, and + // f.firstCachedRowPos accordingly. + f.indexedRowsCache.RemoveFirst() + f.cacheMemAcc.Shrink(ctx, int64(oldRowSize)) + f.maxCacheSize-- + f.firstCachedRowPos++ + if f.indexedRowsCache.Len() == 0 { + return err + } + continue + } + return err + } + } else if delta < 0 { + f.cacheMemAcc.Shrink(ctx, -delta) + } + indexedRowToReuse.Idx = idx + copy(indexedRowToReuse.Row, row) + f.indexedRowsCache.RemoveFirst() + f.indexedRowsCache.AddLast(indexedRowToReuse) + f.firstCachedRowPos++ + return nil + } +} + // getRowWithoutCache returns the row at requested position without using the // cache. It utilizes the same disk row iterator along multiple consequent // calls and rewinds the iterator only when it has been advanced further than diff --git a/pkg/sql/rowcontainer/row_container_test.go b/pkg/sql/rowcontainer/row_container_test.go index 8db929b6a8ef..840a82712a30 100644 --- a/pkg/sql/rowcontainer/row_container_test.go +++ b/pkg/sql/rowcontainer/row_container_test.go @@ -316,6 +316,7 @@ func TestDiskBackedRowContainer(t *testing.T) { memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(1)) defer memoryMonitor.Stop(ctx) diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(1)) + defer diskMonitor.Stop(ctx) defer func() { if err := rc.UnsafeReset(ctx); err != nil { @@ -411,7 +412,6 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) { const numTestRuns = 10 const numRows = 10 const numCols = 2 - rows := make([]sqlbase.EncDatumRow, numRows) ordering := sqlbase.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}} newOrdering := sqlbase.ColumnOrdering{{ColIdx: 1, Direction: encoding.Ascending}} @@ -427,6 +427,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) { // index). t.Run("SpillingHalfway", func(t *testing.T) { for i := 0; i < numTestRuns; i++ { + rows := make([]sqlbase.EncDatumRow, numRows) types := sqlbase.RandSortingColumnTypes(rng, numCols) for i := 0; i < numRows; i++ { rows[i] = sqlbase.RandEncDatumRowOfTypes(rng, types) @@ -482,6 +483,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) { // to be returned. Then, it spills to disk and does the same check again. t.Run("TestGetRow", func(t *testing.T) { for i := 0; i < numTestRuns; i++ { + rows := make([]sqlbase.EncDatumRow, numRows) sortedRows := indexedRows{rows: make([]IndexedRow, numRows)} types := sqlbase.RandSortingColumnTypes(rng, numCols) for i := 0; i < numRows; i++ { @@ -489,7 +491,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) { sortedRows.rows[i] = IndexedRow{Idx: i, Row: rows[i]} } - sorter := sorter{evalCtx: &evalCtx, rows: sortedRows, ordering: ordering} + sorter := rowsSorter{evalCtx: &evalCtx, rows: sortedRows, ordering: ordering} sort.Sort(&sorter) if sorter.err != nil { t.Fatal(sorter.err) @@ -559,6 +561,86 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) { } }) + // TestGetRowFromDiskWithLimitedMemory forces the container to spill to disk, + // adds all rows to it, sorts them, and checks that both the index and the + // row are what we expect by GetRow() to be returned. The goal is to test the + // behavior of capping the cache size and reusing the memory of the first + // rows in the cache, so we use the memory budget that accommodates only + // about half of all rows in the cache. + t.Run("TestGetRowWithLimitedMemory", func(t *testing.T) { + for i := 0; i < numTestRuns; i++ { + budget := int64(10240) + memoryUsage := int64(0) + rows := make([]sqlbase.EncDatumRow, 0, numRows) + sortedRows := indexedRows{rows: make([]IndexedRow, 0, numRows)} + types := sqlbase.RandSortingColumnTypes(rng, numCols) + for memoryUsage < 2*budget { + row := sqlbase.RandEncDatumRowOfTypes(rng, types) + memoryUsage += int64(row.Size()) + rows = append(rows, row) + sortedRows.rows = append(sortedRows.rows, IndexedRow{Idx: len(sortedRows.rows), Row: row}) + } + + memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(budget)) + defer memoryMonitor.Stop(ctx) + diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + defer diskMonitor.Stop(ctx) + + sorter := rowsSorter{evalCtx: &evalCtx, rows: sortedRows, ordering: ordering} + sort.Sort(&sorter) + if sorter.err != nil { + t.Fatal(sorter.err) + } + + func() { + rc := MakeDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */) + defer rc.Close(ctx) + if err := rc.spillToDisk(ctx); err != nil { + t.Fatal(err) + } + for _, row := range rows { + if err := rc.AddRow(ctx, row); err != nil { + t.Fatal(err) + } + } + if !rc.Spilled() { + t.Fatal("unexpectedly using memory") + } + rc.Sort(ctx) + + // Check that GetRow returns the row we expect at each position. + for i := 0; i < len(rows); i++ { + readRow, err := rc.GetRow(ctx, i) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + expectedRow := sortedRows.rows[i] + readOrderingDatum, err := readRow.GetDatum(ordering[0].ColIdx) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if readOrderingDatum.Compare(&evalCtx, expectedRow.Row[ordering[0].ColIdx].Datum) != 0 { + // We're skipping comparison if both rows are equal on the ordering + // column since in this case the order of indexed rows after + // sorting is nondeterministic. + if readRow.GetIdx() != expectedRow.GetIdx() { + t.Fatalf("read row has different idx that what we expect") + } + for col, expectedDatum := range expectedRow.Row { + readDatum, err := readRow.GetDatum(col) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cmp := readDatum.Compare(&evalCtx, expectedDatum.Datum); cmp != 0 { + t.Fatalf("read row is not equal to expected one") + } + } + } + } + }() + } + }) + // ReorderingInMemory initializes a DiskBackedIndexedRowContainer with one // ordering, adds all rows to it, sorts it and makes sure that the rows are // sorted as expected. Then, it reorders the container to a different @@ -566,6 +648,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) { // Only in-memory containers should be used. t.Run("ReorderingInMemory", func(t *testing.T) { for i := 0; i < numTestRuns; i++ { + rows := make([]sqlbase.EncDatumRow, numRows) types := sqlbase.RandSortingColumnTypes(rng, numCols) for i := 0; i < numRows; i++ { rows[i] = sqlbase.RandEncDatumRowOfTypes(rng, types) @@ -606,6 +689,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) { // container is forced to spill to disk right after initialization. t.Run("ReorderingOnDisk", func(t *testing.T) { for i := 0; i < numTestRuns; i++ { + rows := make([]sqlbase.EncDatumRow, numRows) types := sqlbase.RandSortingColumnTypes(rng, numCols) for i := 0; i < numRows; i++ { rows[i] = sqlbase.RandEncDatumRowOfTypes(rng, types) @@ -659,19 +743,19 @@ func (ir indexedRows) Len() int { // TODO(yuzefovich): this is a duplicate of partitionSorter from windower.go. // There are possibly couple of other duplicates as well in other files, so we // should refactor it and probably extract the code into a new package. -type sorter struct { +type rowsSorter struct { evalCtx *tree.EvalContext rows indexedRows ordering sqlbase.ColumnOrdering err error } -func (n *sorter) Len() int { return n.rows.Len() } +func (n *rowsSorter) Len() int { return n.rows.Len() } -func (n *sorter) Swap(i, j int) { +func (n *rowsSorter) Swap(i, j int) { n.rows.rows[i], n.rows.rows[j] = n.rows.rows[j], n.rows.rows[i] } -func (n *sorter) Less(i, j int) bool { +func (n *rowsSorter) Less(i, j int) bool { if n.err != nil { // An error occurred in previous calls to Less(). We want to be done with // sorting and to propagate that error to the caller of Sort(). @@ -685,13 +769,7 @@ func (n *sorter) Less(i, j int) bool { return cmp < 0 } -// sorter implements the tree.PeerGroupChecker interface. -func (n *sorter) InSameGroup(i, j int) (bool, error) { - cmp, err := n.Compare(i, j) - return cmp == 0, err -} - -func (n *sorter) Compare(i, j int) (int, error) { +func (n *rowsSorter) Compare(i, j int) (int, error) { ra, rb := n.rows.rows[i], n.rows.rows[j] for _, o := range n.ordering { da, err := ra.GetDatum(o.ColIdx)