Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: reuse already allocated memory for the cache in a row container #34767

Merged
merged 1 commit into from
Feb 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 82 additions & 22 deletions pkg/sql/rowcontainer/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,8 +534,13 @@ type DiskBackedIndexedRowContainer struct {
// [firstCachedRowPos, nextPosToCache).
firstCachedRowPos int
nextPosToCache int
indexedRowsCache ring.Buffer // the cache of up to maxIndexedRowsCacheSize contiguous rows
cacheMemAcc mon.BoundAccount
// indexedRowsCache is the cache of up to maxCacheSize contiguous rows.
indexedRowsCache ring.Buffer
// maxCacheSize indicates the maximum number of rows to be cached. It is
// initialized to maxIndexedRowsCacheSize and dynamically adjusted if OOM
// error is encountered.
maxCacheSize int
cacheMemAcc mon.BoundAccount
}

// MakeDiskBackedIndexedRowContainer creates a DiskBackedIndexedRowContainer
Expand Down Expand Up @@ -570,6 +575,7 @@ func MakeDiskBackedIndexedRowContainer(
d.scratchEncRow = make(sqlbase.EncDatumRow, len(d.storedTypes))
d.DiskBackedRowContainer = &DiskBackedRowContainer{}
d.DiskBackedRowContainer.Init(ordering, d.storedTypes, evalCtx, engine, memoryMonitor, diskMonitor, rowCapacity)
d.maxCacheSize = maxIndexedRowsCacheSize
d.cacheMemAcc = memoryMonitor.MakeBoundAccount()
return &d
}
Expand Down Expand Up @@ -692,28 +698,38 @@ func (f *DiskBackedIndexedRowContainer) GetRow(
}
row, rowIdx := rowWithIdx[:len(rowWithIdx)-1], rowWithIdx[len(rowWithIdx)-1].Datum
if idx, ok := rowIdx.(*tree.DInt); ok {
if f.indexedRowsCache.Len() == maxIndexedRowsCacheSize {
// The cache size is capped at maxIndexedRowsCacheSize, so we first
// remove the row with the smallest pos and advance
if f.indexedRowsCache.Len() == f.maxCacheSize {
// The cache size is capped at f.maxCacheSize, so we reuse the row
// with the smallest pos, put it as the last row, and advance
// f.firstCachedRowPos.
usage := sizeOfInt + int64(f.indexedRowsCache.GetFirst().(IndexedRow).Row.Size())
// TODO(yuzefovich): extend ring.Buffer to allow for reusing the
// allocated memory instead of allocating new one for every row.
f.indexedRowsCache.RemoveFirst()
// TODO(yuzefovich): investigate whether the pattern of growing and
// shrinking the memory account can be optimized.
f.cacheMemAcc.Shrink(ctx, usage)
f.firstCachedRowPos++
if err := f.reuseFirstRowInCache(ctx, int(*idx), row); err != nil {
return nil, err
}
} else {
// We choose to ignore minor details like IndexedRow overhead and
// the cache overhead.
usage := sizeOfInt + int64(row.Size())
if err := f.cacheMemAcc.Grow(ctx, usage); err != nil {
if sqlbase.IsOutOfMemoryError(err) {
// We hit the memory limit, so we need to cap the cache size
// and reuse the memory underlying first row in the cache.
if f.indexedRowsCache.Len() == 0 {
// The cache is empty, so there is no memory to be reused.
return nil, err
}
f.maxCacheSize = f.indexedRowsCache.Len()
if err := f.reuseFirstRowInCache(ctx, int(*idx), row); err != nil {
return nil, err
}
} else {
return nil, err
}
} else {
// We actually need to copy the row into memory.
ir := IndexedRow{int(*idx), f.rowAlloc.CopyRow(row)}
f.indexedRowsCache.AddLast(ir)
}
}
// We choose to ignore minor details like IndexedRow overhead and
// the cache overhead.
usage := sizeOfInt + int64(row.Size())
if err := f.cacheMemAcc.Grow(ctx, usage); err != nil {
return nil, err
}
// We actually need to copy the row into memory.
ir := IndexedRow{int(*idx), f.rowAlloc.CopyRow(row)}
f.indexedRowsCache.AddLast(ir)
f.nextPosToCache++
} else {
return nil, errors.Errorf("unexpected last column type: should be DInt but found %T", idx)
Expand All @@ -733,6 +749,50 @@ func (f *DiskBackedIndexedRowContainer) GetRow(
return nil, errors.Errorf("unexpected last column type: should be DInt but found %T", rowIdx)
}

// reuseFirstRowInCache reuses the underlying memory of the first row in the
// cache to store 'row' and puts it as the last one in the cache. It adjusts
// the memory account accordingly and, if necessary, removes some first rows.
func (f *DiskBackedIndexedRowContainer) reuseFirstRowInCache(
ctx context.Context, idx int, row sqlbase.EncDatumRow,
) error {
newRowSize := row.Size()
for {
if f.indexedRowsCache.Len() == 0 {
return errors.Errorf("unexpectedly the cache of DiskBackedIndexedRowContainer contains zero rows")
}
indexedRowToReuse := f.indexedRowsCache.GetFirst().(IndexedRow)
oldRowSize := indexedRowToReuse.Row.Size()
delta := int64(newRowSize - oldRowSize)
if delta > 0 {
// New row takes up more memory than the old one.
if err := f.cacheMemAcc.Grow(ctx, delta); err != nil {
if sqlbase.IsOutOfMemoryError(err) {
// We need to actually reduce the cache size, so we remove the first
// row and adjust the memory account, maxCacheSize, and
// f.firstCachedRowPos accordingly.
f.indexedRowsCache.RemoveFirst()
f.cacheMemAcc.Shrink(ctx, int64(oldRowSize))
f.maxCacheSize--
f.firstCachedRowPos++
if f.indexedRowsCache.Len() == 0 {
return err
}
continue
}
return err
}
} else if delta < 0 {
f.cacheMemAcc.Shrink(ctx, -delta)
}
indexedRowToReuse.Idx = idx
copy(indexedRowToReuse.Row, row)
f.indexedRowsCache.RemoveFirst()
f.indexedRowsCache.AddLast(indexedRowToReuse)
f.firstCachedRowPos++
return nil
}
}

// getRowWithoutCache returns the row at requested position without using the
// cache. It utilizes the same disk row iterator along multiple consequent
// calls and rewinds the iterator only when it has been advanced further than
Expand Down
104 changes: 91 additions & 13 deletions pkg/sql/rowcontainer/row_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func TestDiskBackedRowContainer(t *testing.T) {
memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(1))
defer memoryMonitor.Stop(ctx)
diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(1))
defer diskMonitor.Stop(ctx)

defer func() {
if err := rc.UnsafeReset(ctx); err != nil {
Expand Down Expand Up @@ -411,7 +412,6 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
const numTestRuns = 10
const numRows = 10
const numCols = 2
rows := make([]sqlbase.EncDatumRow, numRows)
ordering := sqlbase.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}}
newOrdering := sqlbase.ColumnOrdering{{ColIdx: 1, Direction: encoding.Ascending}}

Expand All @@ -427,6 +427,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
// index).
t.Run("SpillingHalfway", func(t *testing.T) {
for i := 0; i < numTestRuns; i++ {
rows := make([]sqlbase.EncDatumRow, numRows)
types := sqlbase.RandSortingColumnTypes(rng, numCols)
for i := 0; i < numRows; i++ {
rows[i] = sqlbase.RandEncDatumRowOfTypes(rng, types)
Expand Down Expand Up @@ -482,14 +483,15 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
// to be returned. Then, it spills to disk and does the same check again.
t.Run("TestGetRow", func(t *testing.T) {
for i := 0; i < numTestRuns; i++ {
rows := make([]sqlbase.EncDatumRow, numRows)
sortedRows := indexedRows{rows: make([]IndexedRow, numRows)}
types := sqlbase.RandSortingColumnTypes(rng, numCols)
for i := 0; i < numRows; i++ {
rows[i] = sqlbase.RandEncDatumRowOfTypes(rng, types)
sortedRows.rows[i] = IndexedRow{Idx: i, Row: rows[i]}
}

sorter := sorter{evalCtx: &evalCtx, rows: sortedRows, ordering: ordering}
sorter := rowsSorter{evalCtx: &evalCtx, rows: sortedRows, ordering: ordering}
sort.Sort(&sorter)
if sorter.err != nil {
t.Fatal(sorter.err)
Expand Down Expand Up @@ -559,13 +561,94 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
}
})

// TestGetRowFromDiskWithLimitedMemory forces the container to spill to disk,
// adds all rows to it, sorts them, and checks that both the index and the
// row are what we expect by GetRow() to be returned. The goal is to test the
// behavior of capping the cache size and reusing the memory of the first
// rows in the cache, so we use the memory budget that accommodates only
// about half of all rows in the cache.
t.Run("TestGetRowWithLimitedMemory", func(t *testing.T) {
for i := 0; i < numTestRuns; i++ {
budget := int64(10240)
memoryUsage := int64(0)
rows := make([]sqlbase.EncDatumRow, 0, numRows)
sortedRows := indexedRows{rows: make([]IndexedRow, 0, numRows)}
types := sqlbase.RandSortingColumnTypes(rng, numCols)
for memoryUsage < 2*budget {
row := sqlbase.RandEncDatumRowOfTypes(rng, types)
memoryUsage += int64(row.Size())
rows = append(rows, row)
sortedRows.rows = append(sortedRows.rows, IndexedRow{Idx: len(sortedRows.rows), Row: row})
}

memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(budget))
defer memoryMonitor.Stop(ctx)
diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64))
defer diskMonitor.Stop(ctx)

sorter := rowsSorter{evalCtx: &evalCtx, rows: sortedRows, ordering: ordering}
sort.Sort(&sorter)
if sorter.err != nil {
t.Fatal(sorter.err)
}

func() {
rc := MakeDiskBackedIndexedRowContainer(ordering, types, &evalCtx, tempEngine, &memoryMonitor, &diskMonitor, 0 /* rowCapacity */)
defer rc.Close(ctx)
if err := rc.spillToDisk(ctx); err != nil {
t.Fatal(err)
}
for _, row := range rows {
if err := rc.AddRow(ctx, row); err != nil {
t.Fatal(err)
}
}
if !rc.Spilled() {
t.Fatal("unexpectedly using memory")
}
rc.Sort(ctx)

// Check that GetRow returns the row we expect at each position.
for i := 0; i < len(rows); i++ {
readRow, err := rc.GetRow(ctx, i)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
expectedRow := sortedRows.rows[i]
readOrderingDatum, err := readRow.GetDatum(ordering[0].ColIdx)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if readOrderingDatum.Compare(&evalCtx, expectedRow.Row[ordering[0].ColIdx].Datum) != 0 {
// We're skipping comparison if both rows are equal on the ordering
// column since in this case the order of indexed rows after
// sorting is nondeterministic.
if readRow.GetIdx() != expectedRow.GetIdx() {
t.Fatalf("read row has different idx that what we expect")
}
for col, expectedDatum := range expectedRow.Row {
readDatum, err := readRow.GetDatum(col)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if cmp := readDatum.Compare(&evalCtx, expectedDatum.Datum); cmp != 0 {
t.Fatalf("read row is not equal to expected one")
}
}
}
}
}()
}
})

// ReorderingInMemory initializes a DiskBackedIndexedRowContainer with one
// ordering, adds all rows to it, sorts it and makes sure that the rows are
// sorted as expected. Then, it reorders the container to a different
// ordering, sorts it and verifies that the rows are in the order we expect.
// Only in-memory containers should be used.
t.Run("ReorderingInMemory", func(t *testing.T) {
for i := 0; i < numTestRuns; i++ {
rows := make([]sqlbase.EncDatumRow, numRows)
types := sqlbase.RandSortingColumnTypes(rng, numCols)
for i := 0; i < numRows; i++ {
rows[i] = sqlbase.RandEncDatumRowOfTypes(rng, types)
Expand Down Expand Up @@ -606,6 +689,7 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) {
// container is forced to spill to disk right after initialization.
t.Run("ReorderingOnDisk", func(t *testing.T) {
for i := 0; i < numTestRuns; i++ {
rows := make([]sqlbase.EncDatumRow, numRows)
types := sqlbase.RandSortingColumnTypes(rng, numCols)
for i := 0; i < numRows; i++ {
rows[i] = sqlbase.RandEncDatumRowOfTypes(rng, types)
Expand Down Expand Up @@ -659,19 +743,19 @@ func (ir indexedRows) Len() int {
// TODO(yuzefovich): this is a duplicate of partitionSorter from windower.go.
// There are possibly couple of other duplicates as well in other files, so we
// should refactor it and probably extract the code into a new package.
type sorter struct {
type rowsSorter struct {
evalCtx *tree.EvalContext
rows indexedRows
ordering sqlbase.ColumnOrdering
err error
}

func (n *sorter) Len() int { return n.rows.Len() }
func (n *rowsSorter) Len() int { return n.rows.Len() }

func (n *sorter) Swap(i, j int) {
func (n *rowsSorter) Swap(i, j int) {
n.rows.rows[i], n.rows.rows[j] = n.rows.rows[j], n.rows.rows[i]
}
func (n *sorter) Less(i, j int) bool {
func (n *rowsSorter) Less(i, j int) bool {
if n.err != nil {
// An error occurred in previous calls to Less(). We want to be done with
// sorting and to propagate that error to the caller of Sort().
Expand All @@ -685,13 +769,7 @@ func (n *sorter) Less(i, j int) bool {
return cmp < 0
}

// sorter implements the tree.PeerGroupChecker interface.
func (n *sorter) InSameGroup(i, j int) (bool, error) {
cmp, err := n.Compare(i, j)
return cmp == 0, err
}

func (n *sorter) Compare(i, j int) (int, error) {
func (n *rowsSorter) Compare(i, j int) (int, error) {
ra, rb := n.rows.rows[i], n.rows.rows[j]
for _, o := range n.ordering {
da, err := ra.GetDatum(o.ColIdx)
Expand Down