Merge pull request #17302 from RaduBerinde/row-container-stuff
sqlbase, distsql: (memory) row container fixes
RaduBerinde authored Jul 31, 2017
2 parents c25b377 + 9712cc2 commit af380d9
Showing 10 changed files with 89 additions and 76 deletions.
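For orientation before the per-file diffs: the value-returning constructors (makeRowContainer, MakeRowContainer) become init methods on pointer receivers, and callers now hold a single container and pass pointers to it, so the struct that owns a memory account is never copied. The following is a minimal usage sketch of the new pattern, not code from the commit; it assumes it lives in pkg/sql/distsqlrun with that package's existing imports, and that the context, column types, ordering, EvalContext, temp engine, and input rows are prepared as in the tests below.

func exampleRowContainerUsage(
	ctx context.Context,
	types []sqlbase.ColumnType,
	ordering sqlbase.ColumnOrdering,
	evalCtx *parser.EvalContext,
	tempEngine engine.Engine,
	input []sqlbase.EncDatumRow,
) error {
	// Old pattern (removed below): rows := makeRowContainer(ordering, types, evalCtx)
	// New pattern: initialize in place and only ever pass a pointer.
	var rows memRowContainer
	rows.init(ordering, types, evalCtx)
	defer rows.Close(ctx)

	for _, row := range input {
		if err := rows.AddRow(ctx, row); err != nil {
			return err
		}
	}
	rows.Sort()

	// Components that previously took a memRowContainer by value now take a
	// pointer, e.g. the disk-backed container (its lifetime management is
	// omitted in this sketch).
	_, err := makeDiskRowContainer(ctx, types, ordering, &rows, tempEngine)
	return err
}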
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/disk_row_container.go
@@ -73,7 +73,7 @@ func makeDiskRowContainer(
ctx context.Context,
types []sqlbase.ColumnType,
ordering sqlbase.ColumnOrdering,
rowContainer memRowContainer,
rowContainer *memRowContainer,
e engine.Engine,
) (diskRowContainer, error) {
diskMap := engine.NewRocksDBMap(e)
10 changes: 6 additions & 4 deletions pkg/sql/distsqlrun/disk_row_container_test.go
@@ -112,7 +112,7 @@ func TestDiskRowContainer(t *testing.T) {
row := sqlbase.EncDatumRow(sqlbase.RandEncDatumSliceOfTypes(rng, types))
func() {
d, err := makeDiskRowContainer(
ctx, types, ordering, memRowContainer{}, tempEngine,
ctx, types, ordering, &memRowContainer{}, tempEngine,
)
if err != nil {
t.Fatal(err)
@@ -171,7 +171,8 @@ func TestDiskRowContainer(t *testing.T) {
func() {
// Make the diskRowContainer with half of these rows and insert
// the other half normally.
memoryContainer := makeRowContainer(ordering, types, &evalCtx)
var memoryContainer memRowContainer
memoryContainer.init(ordering, types, &evalCtx)
defer memoryContainer.Close(ctx)
midIdx := len(rows) / 2
for i := 0; i < midIdx; i++ {
@@ -184,7 +185,7 @@ func TestDiskRowContainer(t *testing.T) {
ctx,
types,
ordering,
memoryContainer,
&memoryContainer,
tempEngine,
)
if err != nil {
@@ -199,7 +200,8 @@ func TestDiskRowContainer(t *testing.T) {

// Make another row container that stores all the rows then sort
// it to compare equality.
sortedRows := makeRowContainer(ordering, types, &evalCtx)
var sortedRows memRowContainer
sortedRows.init(ordering, types, &evalCtx)
defer sortedRows.Close(ctx)
for _, row := range rows {
if err := sortedRows.AddRow(ctx, row); err != nil {
4 changes: 2 additions & 2 deletions pkg/sql/distsqlrun/hashjoiner.go
@@ -105,8 +105,8 @@ func newHashJoiner(
buckets: make(map[string]bucket),
bucketsAcc: flowCtx.evalCtx.Mon.MakeBoundAccount(),
}
h.rows[leftSide] = makeRowContainer(nil /* ordering */, leftSource.Types(), &flowCtx.evalCtx)
h.rows[rightSide] = makeRowContainer(nil /* ordering */, rightSource.Types(), &flowCtx.evalCtx)
h.rows[leftSide].init(nil /* ordering */, leftSource.Types(), &flowCtx.evalCtx)
h.rows[rightSide].init(nil /* ordering */, rightSource.Types(), &flowCtx.evalCtx)

numMergedColumns := 0
if spec.MergedColumns {
74 changes: 36 additions & 38 deletions pkg/sql/distsqlrun/row_container.go
@@ -94,91 +94,89 @@ type memRowContainer struct {
var _ heap.Interface = &memRowContainer{}
var _ sortableRowContainer = &memRowContainer{}

func makeRowContainer(
func (mc *memRowContainer) init(
ordering sqlbase.ColumnOrdering, types []sqlbase.ColumnType, evalCtx *parser.EvalContext,
) memRowContainer {
) {
acc := evalCtx.Mon.MakeBoundAccount()
return memRowContainer{
RowContainer: sqlbase.MakeRowContainer(acc, sqlbase.ColTypeInfoFromColTypes(types), 0),
types: types,
ordering: ordering,
scratchRow: make(parser.Datums, len(types)),
scratchEncRow: make(sqlbase.EncDatumRow, len(types)),
evalCtx: evalCtx,
}
mc.RowContainer.Init(acc, sqlbase.ColTypeInfoFromColTypes(types), 0)
mc.types = types
mc.ordering = ordering
mc.scratchRow = make(parser.Datums, len(types))
mc.scratchEncRow = make(sqlbase.EncDatumRow, len(types))
mc.evalCtx = evalCtx
}

// Less is part of heap.Interface and is only meant to be used internally.
func (sv *memRowContainer) Less(i, j int) bool {
cmp := sqlbase.CompareDatums(sv.ordering, sv.evalCtx, sv.At(i), sv.At(j))
if sv.invertSorting {
func (mc *memRowContainer) Less(i, j int) bool {
cmp := sqlbase.CompareDatums(mc.ordering, mc.evalCtx, mc.At(i), mc.At(j))
if mc.invertSorting {
cmp = -cmp
}
return cmp < 0
}

// EncRow returns the idx-th row as an EncDatumRow. The slice itself is reused
// so it is only valid until the next call to EncRow.
func (sv *memRowContainer) EncRow(idx int) sqlbase.EncDatumRow {
datums := sv.At(idx)
func (mc *memRowContainer) EncRow(idx int) sqlbase.EncDatumRow {
datums := mc.At(idx)
for i, d := range datums {
sv.scratchEncRow[i] = sqlbase.DatumToEncDatum(sv.types[i], d)
mc.scratchEncRow[i] = sqlbase.DatumToEncDatum(mc.types[i], d)
}
return sv.scratchEncRow
return mc.scratchEncRow
}

// AddRow adds a row to the container.
func (sv *memRowContainer) AddRow(ctx context.Context, row sqlbase.EncDatumRow) error {
if len(row) != len(sv.types) {
log.Fatalf(ctx, "invalid row length %d, expected %d", len(row), len(sv.types))
func (mc *memRowContainer) AddRow(ctx context.Context, row sqlbase.EncDatumRow) error {
if len(row) != len(mc.types) {
log.Fatalf(ctx, "invalid row length %d, expected %d", len(row), len(mc.types))
}
for i := range row {
err := row[i].EnsureDecoded(&sv.datumAlloc)
err := row[i].EnsureDecoded(&mc.datumAlloc)
if err != nil {
return err
}
sv.scratchRow[i] = row[i].Datum
mc.scratchRow[i] = row[i].Datum
}
_, err := sv.RowContainer.AddRow(ctx, sv.scratchRow)
_, err := mc.RowContainer.AddRow(ctx, mc.scratchRow)
return err
}

func (sv *memRowContainer) Sort() {
sv.invertSorting = false
sort.Sort(sv)
func (mc *memRowContainer) Sort() {
mc.invertSorting = false
sort.Sort(mc)
}

// Push is part of heap.Interface.
func (sv *memRowContainer) Push(_ interface{}) { panic("unimplemented") }
func (mc *memRowContainer) Push(_ interface{}) { panic("unimplemented") }

// Pop is part of heap.Interface.
func (sv *memRowContainer) Pop() interface{} { panic("unimplemented") }
func (mc *memRowContainer) Pop() interface{} { panic("unimplemented") }

// MaybeReplaceMax replaces the maximum element with the given row, if it is smaller.
// Assumes InitMaxHeap was called.
func (sv *memRowContainer) MaybeReplaceMax(row sqlbase.EncDatumRow) error {
max := sv.At(0)
cmp, err := row.CompareToDatums(&sv.datumAlloc, sv.ordering, sv.evalCtx, max)
func (mc *memRowContainer) MaybeReplaceMax(row sqlbase.EncDatumRow) error {
max := mc.At(0)
cmp, err := row.CompareToDatums(&mc.datumAlloc, mc.ordering, mc.evalCtx, max)
if err != nil {
return err
}
if cmp < 0 {
// row is smaller than the max; replace.
for i := range row {
if err := row[i].EnsureDecoded(&sv.datumAlloc); err != nil {
if err := row[i].EnsureDecoded(&mc.datumAlloc); err != nil {
return err
}
max[i] = row[i].Datum
}
heap.Fix(sv, 0)
heap.Fix(mc, 0)
}
return nil
}

// InitMaxHeap rearranges the rows in the rowContainer into a Max-Heap.
func (sv *memRowContainer) InitMaxHeap() {
sv.invertSorting = true
heap.Init(sv)
func (mc *memRowContainer) InitMaxHeap() {
mc.invertSorting = true
heap.Init(mc)
}

// memRowIterator is a rowIterator that iterates over a memRowContainer. This
@@ -194,8 +192,8 @@ var _ rowIterator = memRowIterator{}
// memRowContainer. Note that this iterator doesn't iterate over a snapshot
// of memRowContainer and that it deletes rows as soon as they are iterated
// over.
func (sv *memRowContainer) NewIterator(_ context.Context) rowIterator {
return memRowIterator{memRowContainer: sv}
func (mc *memRowContainer) NewIterator(_ context.Context) rowIterator {
return memRowIterator{memRowContainer: mc}
}

// Rewind implements the rowIterator interface.
10 changes: 5 additions & 5 deletions pkg/sql/distsqlrun/sorter.go
@@ -118,9 +118,9 @@ func (s *sorter) Run(ctx context.Context, wg *sync.WaitGroup) {

evalCtx := s.flowCtx.evalCtx
evalCtx.Mon = &limitedMon
sv = makeRowContainer(s.ordering, s.rawInput.Types(), &evalCtx)
sv.init(s.ordering, s.rawInput.Types(), &evalCtx)
} else {
sv = makeRowContainer(s.ordering, s.rawInput.Types(), &s.flowCtx.evalCtx)
sv.init(s.ordering, s.rawInput.Types(), &s.flowCtx.evalCtx)
}
// Construct the optimal sorterStrategy.
var ss sorterStrategy
@@ -130,13 +130,13 @@ func (s *sorter) Run(ctx context.Context, wg *sync.WaitGroup) {
// optimizations are possible so we simply load all rows into memory and
// sort all values in-place. It has a worst-case time complexity of
// O(n*log(n)) and a worst-case space complexity of O(n).
ss = newSortAllStrategy(sv, useTempStorage)
ss = newSortAllStrategy(&sv, useTempStorage)
} else {
// No specified ordering match length but specified limit; we can optimize
// our sort procedure by maintaining a max-heap populated with only the
// smallest k rows seen. It has a worst-case time complexity of
// O(n*log(k)) and a worst-case space complexity of O(k).
ss = newSortTopKStrategy(sv, s.count)
ss = newSortTopKStrategy(&sv, s.count)
}
} else {
// Ordering match length is specified. We will be able to use existing
@@ -146,7 +146,7 @@ func (s *sorter) Run(ctx context.Context, wg *sync.WaitGroup) {
// chunk and then output.
// TODO(irfansharif): Add optimization for case where both ordering match
// length and limit is specified.
ss = newSortChunksStrategy(sv)
ss = newSortChunksStrategy(&sv)
}

sortErr := ss.Execute(ctx, s)
14 changes: 7 additions & 7 deletions pkg/sql/distsqlrun/sorterstrategy.go
@@ -44,13 +44,13 @@ type sorterStrategy interface {
//
// The strategy is intended to be used when all values need to be sorted.
type sortAllStrategy struct {
rows memRowContainer
rows *memRowContainer
useTempStorage bool
}

var _ sorterStrategy = &sortAllStrategy{}

func newSortAllStrategy(rows memRowContainer, useTempStorage bool) sorterStrategy {
func newSortAllStrategy(rows *memRowContainer, useTempStorage bool) sorterStrategy {
return &sortAllStrategy{
rows: rows,
useTempStorage: useTempStorage,
@@ -61,7 +61,7 @@ func newSortAllStrategy(rows memRowContainer, useTempStorage bool) sorterStrateg
// memory error, the strategy will fall back to use disk.
func (ss *sortAllStrategy) Execute(ctx context.Context, s *sorter) error {
defer ss.rows.Close(ctx)
row, err := ss.executeImpl(ctx, s, &ss.rows)
row, err := ss.executeImpl(ctx, s, ss.rows)
// TODO(asubiotto): A memory error could also be returned if a limit other
// than the COCKROACH_WORK_MEM was reached. We should distinguish between
// these cases and log the event to facilitate debugging of queries that
@@ -162,13 +162,13 @@ func (ss *sortAllStrategy) executeImpl(
//
// TODO(asubiotto): Use diskRowContainer for these other strategies.
type sortTopKStrategy struct {
rows memRowContainer
rows *memRowContainer
k int64
}

var _ sorterStrategy = &sortTopKStrategy{}

func newSortTopKStrategy(rows memRowContainer, k int64) sorterStrategy {
func newSortTopKStrategy(rows *memRowContainer, k int64) sorterStrategy {
ss := &sortTopKStrategy{
rows: rows,
k: k,
@@ -227,13 +227,13 @@ func (ss *sortTopKStrategy) Execute(ctx context.Context, s *sorter) error {
// If we're scanning an index with a prefix matching an ordering prefix, we only accumulate values
// for equal fields in this prefix, sort the accumulated chunk and then output.
type sortChunksStrategy struct {
rows memRowContainer
rows *memRowContainer
alloc sqlbase.DatumAlloc
}

var _ sorterStrategy = &sortChunksStrategy{}

func newSortChunksStrategy(rows memRowContainer) sorterStrategy {
func newSortChunksStrategy(rows *memRowContainer) sorterStrategy {
return &sortChunksStrategy{
rows: rows,
}
3 changes: 0 additions & 3 deletions pkg/sql/logictest/testdata/logic_test/distsql_agg
@@ -414,9 +414,6 @@ SELECT COUNT(*) FROM one AS a, one AS b, two AS c
----
1000

statement error memory budget exceeded
SELECT SUM(d1.a), MAX(d1.a), MIN(d1.a) FROM data as d1, data as d2 GROUP BY (d1.b, d1.c, d1.d, d2.a, d2.b, d2.c, d2.d)

query T
SELECT "URL" FROM [EXPLAIN (DISTSQL) SELECT SUM(a), SUM(b), SUM(c) FROM data GROUP BY d HAVING SUM(a+b) > 10]
----
7 changes: 6 additions & 1 deletion pkg/sql/mon/mem_usage.go
@@ -541,11 +541,16 @@ func (b *BoundAccount) ResizeItem(ctx context.Context, oldSz, newSz int64) error
return b.mon.ResizeItem(ctx, &b.MemoryAccount, oldSz, newSz)
}

// Grow is an accessor for b.mon.Grow.
// Grow is an accessor for b.mon.GrowAccount.
func (b *BoundAccount) Grow(ctx context.Context, x int64) error {
return b.mon.GrowAccount(ctx, &b.MemoryAccount, x)
}

// Shrink is an accessor for b.mon.ShrinkAccount.
func (b *BoundAccount) Shrink(ctx context.Context, x int64) {
b.mon.ShrinkAccount(ctx, &b.MemoryAccount, x)
}
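Shrink is the releasing counterpart to Grow. A hedged sketch of how a caller pairs the two, not code from this commit, assuming mm is a started MemoryMonitor and with a purely illustrative reservation size:

func exampleGrowShrink(ctx context.Context, mm *MemoryMonitor) error {
	acc := mm.MakeBoundAccount()
	defer acc.Close(ctx)

	const blockSize = 4096 // hypothetical reservation, in bytes
	if err := acc.Grow(ctx, blockSize); err != nil {
		return err // the monitor's budget was exceeded
	}
	// ... use the reserved memory ...
	acc.Shrink(ctx, blockSize) // hand the reservation back to the monitor
	return nil
}

The RowContainer.PopFirst change further down is such a caller: once a whole chunk of rows has been popped, it Shrinks the account by the chunk's previously accounted size.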

// reserveMemory declares an allocation to this monitor. An error is
// returned if the allocation is denied.
func (mm *MemoryMonitor) reserveMemory(ctx context.Context, x int64) error {
39 changes: 25 additions & 14 deletions pkg/sql/sqlbase/row_container.go
@@ -25,6 +25,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/sql/mon"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/util"
)

const (
@@ -72,6 +73,10 @@ type RowContainer struct {
// memAcc tracks the current memory consumption of this
// RowContainer.
memAcc mon.BoundAccount

// We should not copy this structure around; each copy would have a different
// memAcc (among other things like aliasing chunks).
noCopy util.NoCopy
}
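The rationale in the comment is worth spelling out: a copied RowContainer would share the chunk slices with the original while carrying its own memAcc, so closing either copy would corrupt the accounting. Assuming util.NoCopy is the usual vet-visible marker type (zero-sized, with a Lock method), embedding it lets go vet's copylocks analysis flag value copies statically. A hypothetical example of code the field now catches, not taken from the commit:

// Copying a RowContainer by value aliases its chunks while duplicating its
// memAcc; with the noCopy field, `go vet` (copylocks check) reports this copy.
func leakyCopy(c *RowContainer) RowContainer {
	return *c
}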

// ColTypeInfo is a type that allows multiple representations of column type
@@ -139,21 +144,19 @@ func (ti ColTypeInfo) Type(idx int) parser.Type {
// column selections could cause unchecked and potentially dangerous
// memory growth.
func NewRowContainer(acc mon.BoundAccount, ti ColTypeInfo, rowCapacity int) *RowContainer {
c := MakeRowContainer(acc, ti, rowCapacity)
return &c
c := &RowContainer{}
c.Init(acc, ti, rowCapacity)
return c
}

// MakeRowContainer is the non-pointer version of NewRowContainer, suitable to
// avoid unnecessary indirections when RowContainer is already part of an on-heap
// structure.
func MakeRowContainer(acc mon.BoundAccount, ti ColTypeInfo, rowCapacity int) RowContainer {
// Init can be used instead of NewRowContainer if we have a RowContainer that is
// already part of an on-heap structure.
func (c *RowContainer) Init(acc mon.BoundAccount, ti ColTypeInfo, rowCapacity int) {
nCols := ti.NumColumns()

c := RowContainer{
numCols: nCols,
memAcc: acc,
preallocChunks: 1,
}
c.numCols = nCols
c.memAcc = acc
c.preallocChunks = 1

if nCols != 0 {
c.rowsPerChunk = (targetChunkSize + nCols - 1) / nCols
@@ -179,8 +182,6 @@ func MakeRowContainer(acc mon.BoundAccount, ti ColTypeInfo, rowCapacity int) Row
// chunk and the slice pointing at the chunk.
c.chunkMemSize = SizeOfDatum * int64(c.rowsPerChunk*c.numCols)
c.chunkMemSize += SizeOfDatums

return c
}
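With MakeRowContainer gone, there are two entry points: NewRowContainer for a standalone, heap-allocated container, and Init for a container embedded by value in a larger struct (as memRowContainer does in distsqlrun above). A minimal sketch of the embedded case, not from the commit, assuming it sits in this package and that acc and ti are built as in the surrounding code:

// rowHolder is a hypothetical wrapper used only for illustration.
type rowHolder struct {
	rows RowContainer // held by value; initialized in place, never copied
}

func exampleInit(ctx context.Context, acc mon.BoundAccount, ti ColTypeInfo) {
	var h rowHolder
	h.rows.Init(acc, ti, 0 /* rowCapacity */)
	defer h.rows.Close(ctx)
	// A standalone container would instead use rc := NewRowContainer(acc, ti, 0).
}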

// Clear resets the container and releases the associated memory. This allows
Expand Down Expand Up @@ -304,10 +305,20 @@ func (c *RowContainer) PopFirst() {
if c.numCols != 0 {
c.deletedRows++
if c.deletedRows == c.rowsPerChunk {
c.deletedRows = 0
// We release the memory for rows in chunks. This includes the
// chunk slice (allocated by allocChunks) and the Datums.
size := c.chunkMemSize
for i, pos := 0, 0; i < c.rowsPerChunk; i, pos = i+1, pos+c.numCols {
size += c.rowSize(c.chunks[0][pos : pos+c.numCols])
}
// Reset the pointer so the slice can be garbage collected.
c.chunks[0] = nil
c.deletedRows = 0
c.chunks = c.chunks[1:]

// We don't have a context plumbed here, but that's ok: it's not actually
// used in the shrink paths.
c.memAcc.Shrink(context.TODO(), size)
}
}
}