colserde: perform memory accounting for scratch slices in the converter
This commit introduces memory accounting for the values and offsets
scratch slices that have recently started being reused across
`BatchToArrow` calls. This required a lot of plumbing to propagate the
memory account from all call sites. The converter is careful to grow
and shrink the account only by its own usage, so the same memory
account can be reused across multiple converters, as long as there is
no concurrent use. The only place with concurrency is the hash router
outputs, so each of those outputs is given a separate account.
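
For illustration, below is a minimal, self-contained sketch of the
"resize by own usage" pattern described above. The `boundAccount`,
`converter`, `accountForScratch`, and `release` names are toy stand-ins,
not the actual `util/mon` or `colserde` code; the real converter
achieves the same effect with `acc.Resize(ctx, c.accountedFor, footprint)`
and `acc.Shrink(ctx, c.accountedFor)`.

package main

import "fmt"

// boundAccount is a toy stand-in for a shared memory account (think
// util/mon.BoundAccount); it only tracks the total bytes registered with it.
type boundAccount struct{ used int64 }

// resize adjusts the account by the difference between old and new sizes.
func (a *boundAccount) resize(oldSz, newSz int64) { a.used += newSz - oldSz }

// converter remembers only the bytes it itself has registered, so several
// converters can share one account as long as they are not used concurrently.
type converter struct {
	acc          *boundAccount
	accountedFor int64 // this converter's own contribution to acc
}

// accountForScratch re-registers the current scratch footprint, growing or
// shrinking the shared account only by this converter's delta.
func (c *converter) accountForScratch(footprint int64) {
	c.acc.resize(c.accountedFor, footprint)
	c.accountedFor = footprint
}

// release returns everything this converter accounted for.
func (c *converter) release() {
	c.acc.resize(c.accountedFor, 0)
	c.accountedFor = 0
}

func main() {
	shared := &boundAccount{}
	c1, c2 := &converter{acc: shared}, &converter{acc: shared}
	c1.accountForScratch(1024) // c1's scratch grows to 1KiB
	c2.accountForScratch(2048) // c2's scratch grows to 2KiB
	c1.accountForScratch(512)  // c1's scratch shrank; only its delta is returned
	fmt.Println(shared.used)   // 2560
	c1.release()
	c2.release()
	fmt.Println(shared.used) // 0
}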

An additional minor change is that `diskQueue.Enqueue` now properly
`Close`s the `FileSerializer` before `nil`-ing it out. Not doing so
wasn't a problem per se, since it is the caller's responsibility to
close the account used by the serializer, but it's nice to release the
accounted-for bytes promptly.
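
A rough sketch of that ordering follows; `diskQueue` and `fileSerializer`
here are toy stand-ins, not the actual implementation: the point is only
that Close runs (returning the accounted-for bytes) before the reference
is dropped.

package main

import "context"

// fileSerializer is a toy stand-in for colserde.FileSerializer.
type fileSerializer struct{ accountedFor int64 }

// Close releases the bytes the serializer registered with its memory account.
func (s *fileSerializer) Close(ctx context.Context) { s.accountedFor = 0 }

// diskQueue is a toy stand-in for the disk queue that owns the serializer.
type diskQueue struct{ serializer *fileSerializer }

// doneEnqueueing mirrors the ordering described above: Close first so the
// accounted-for bytes are returned, then drop the reference.
func (q *diskQueue) doneEnqueueing(ctx context.Context) {
	if q.serializer != nil {
		q.serializer.Close(ctx)
		q.serializer = nil
	}
}

func main() {
	q := &diskQueue{serializer: &fileSerializer{accountedFor: 4096}}
	q.doneEnqueueing(context.Background())
}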

Release note: None
yuzefovich committed Jan 4, 2023
1 parent 5c892eb commit 2922777
Showing 57 changed files with 434 additions and 205 deletions.
1 change: 1 addition & 0 deletions pkg/col/colserde/BUILD.bazel
@@ -20,6 +20,7 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/duration",
"//pkg/util/mon",
"@com_github_apache_arrow_go_arrow//array",
"@com_github_apache_arrow_go_arrow//memory",
"@com_github_cockroachdb_errors//:errors",
58 changes: 51 additions & 7 deletions pkg/col/colserde/arrowbatchconverter.go
@@ -11,6 +11,7 @@
package colserde

import (
"context"
"encoding/binary"
"fmt"
"reflect"
@@ -24,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
)

@@ -47,8 +49,14 @@ const (
type ArrowBatchConverter struct {
typs []*types.T

// TODO(yuzefovich): perform memory accounting for these slices.
scratch struct {
// Fields below are only used during batch-to-arrow conversion.

// usesOffsets indicates whether at least one vector needs values and
// offsets scratch slices (in other words, it needs three buffers).
usesOffsets bool
acc *mon.BoundAccount
accountedFor int64
scratch struct {
// arrowData is used as scratch space returned as the corresponding
// conversion result.
arrowData []array.Data
@@ -68,13 +76,23 @@ type ArrowBatchConverter struct {
// NewArrowBatchConverter converts coldata.Batches to []array.Data and back
// again according to the schema specified by typs. Converting data that does
// not conform to typs results in undefined behavior.
func NewArrowBatchConverter(typs []*types.T, mode ConversionMode) (*ArrowBatchConverter, error) {
c := &ArrowBatchConverter{typs: typs}
//
// acc must be a non-nil unlimited memory account unless ArrowToBatchOnly mode
// is used. The account will be shrunk in Release, however, it is the caller's
// responsibility to close the account.
// NOTE: Release can only be called before the account is closed.
func NewArrowBatchConverter(
typs []*types.T, mode ConversionMode, acc *mon.BoundAccount,
) (*ArrowBatchConverter, error) {
c := &ArrowBatchConverter{typs: typs, acc: acc}
if mode == ArrowToBatchOnly {
// All the allocations below are only used in BatchToArrow, so we don't
// need to allocate them.
return c, nil
}
if acc == nil {
return nil, errors.AssertionFailedf("nil account is given with the conversion mode other than ArrowToBatchOnly")
}
c.scratch.arrowData = make([]array.Data, len(typs))
c.scratch.buffers = make([][]*memory.Buffer, len(typs))
// Calculate the number of buffers needed for all types to be able to batch
@@ -99,6 +117,7 @@ func NewArrowBatchConverter(typs []*types.T, mode ConversionMode) (*ArrowBatchConverter, error) {
}
}
if needOffsets {
c.usesOffsets = true
c.scratch.values = make([][]byte, len(typs))
c.scratch.offsets = make([][]int32, len(typs))
}
@@ -109,7 +128,9 @@ func NewArrowBatchConverter(typs []*types.T, mode ConversionMode) (*ArrowBatchConverter, error) {
// arrow []array.Data. It is assumed that the batch is not larger than
// coldata.BatchSize(). The returned []array.Data may only be used until the
// next call to BatchToArrow.
func (c *ArrowBatchConverter) BatchToArrow(batch coldata.Batch) ([]array.Data, error) {
func (c *ArrowBatchConverter) BatchToArrow(
ctx context.Context, batch coldata.Batch,
) ([]array.Data, error) {
if batch.Width() != len(c.typs) {
return nil, errors.AssertionFailedf("mismatched batch width and schema length: %d != %d", batch.Width(), len(c.typs))
}
@@ -314,7 +335,7 @@ func (c *ArrowBatchConverter) BatchToArrow(batch coldata.Batch) ([]array.Data, error) {
nil /* dtype */, n, c.scratch.buffers[vecIdx], nil /* childData */, 0 /* nulls */, 0, /* offset */
)
}
return c.scratch.arrowData, nil
return c.scratch.arrowData, c.accountForScratch(ctx)
}

// unsafeCastOffsetsArray unsafe-casts the input offsetsBytes slice to point at
@@ -328,6 +349,26 @@ func unsafeCastOffsetsArray(offsetsInt32 []int32, offsetsBytes *[]byte) {
bytesHeader.Cap = int32Header.Cap * int(memsize.Int32)
}

// accountForScratch performs memory accounting for values and offsets scratch
// slices. Note that we ignore the overhead of the slices themselves as well as
// of other scratch objects since those should be negligible in footprint.
func (c *ArrowBatchConverter) accountForScratch(ctx context.Context) error {
if !c.usesOffsets {
return nil
}
var footprint int64
for i := range c.scratch.values {
// Some of these might be nil in which case footprint won't change.
footprint += int64(cap(c.scratch.values[i]))
footprint += int64(cap(c.scratch.offsets[i])) * memsize.Int32
}
if err := c.acc.Resize(ctx, c.accountedFor, footprint); err != nil {
return err
}
c.accountedFor = footprint
return nil
}

// ArrowToBatch converts []array.Data to a coldata.Batch. There must not be
// more than coldata.BatchSize() elements in data.
//
@@ -528,6 +569,9 @@ func getValueBytesAndOffsets(

// Release should be called once the converter is no longer needed so that its
// memory could be GCed.
func (c *ArrowBatchConverter) Release() {
func (c *ArrowBatchConverter) Release(ctx context.Context) {
if c.acc != nil {
c.acc.Shrink(ctx, c.accountedFor)
}
*c = ArrowBatchConverter{}
}
22 changes: 14 additions & 8 deletions pkg/col/colserde/arrowbatchconverter_test.go
@@ -12,6 +12,7 @@ package colserde_test

import (
"bytes"
"context"
"fmt"
"testing"

@@ -47,14 +48,15 @@ func TestArrowBatchConverterRandom(t *testing.T) {
defer leaktest.AfterTest(t)()

typs, b := randomBatch(testAllocator)
c, err := colserde.NewArrowBatchConverter(typs, colserde.BiDirectional)
c, err := colserde.NewArrowBatchConverter(typs, colserde.BiDirectional, testMemAcc)
require.NoError(t, err)
defer c.Release(context.Background())

// Make a copy of the original batch because the converter modifies and casts
// data without copying for performance reasons.
expected := coldatatestutils.CopyBatch(b, typs, testColumnFactory)

arrowData, err := c.BatchToArrow(b)
arrowData, err := c.BatchToArrow(context.Background(), b)
require.NoError(t, err)
actual := testAllocator.NewMemBatchWithFixedCapacity(typs, b.Length())
require.NoError(t, c.ArrowToBatch(arrowData, b.Length(), actual))
@@ -68,7 +70,7 @@ func roundTripBatch(
src, dest coldata.Batch, c *colserde.ArrowBatchConverter, r *colserde.RecordBatchSerializer,
) error {
var buf bytes.Buffer
arrowDataIn, err := c.BatchToArrow(src)
arrowDataIn, err := c.BatchToArrow(context.Background(), src)
if err != nil {
return err
}
@@ -100,8 +102,9 @@ func TestRecordBatchRoundtripThroughBytes(t *testing.T) {
typs, src = randomBatch(testAllocator)
}
dest := testAllocator.NewMemBatchWithMaxCapacity(typs)
c, err := colserde.NewArrowBatchConverter(typs, colserde.BiDirectional)
c, err := colserde.NewArrowBatchConverter(typs, colserde.BiDirectional, testMemAcc)
require.NoError(t, err)
defer c.Release(context.Background())
r, err := colserde.NewRecordBatchSerializer(typs)
require.NoError(t, err)

@@ -215,16 +218,18 @@ func runConversionBenchmarks(
}

func BenchmarkArrowBatchConverter(b *testing.B) {
ctx := context.Background()
runConversionBenchmarks(
b,
"BatchToArrow",
func(b *testing.B, batch coldata.Batch, typ *types.T) {
c, err := colserde.NewArrowBatchConverter([]*types.T{typ}, colserde.BiDirectional)
c, err := colserde.NewArrowBatchConverter([]*types.T{typ}, colserde.BiDirectional, testMemAcc)
require.NoError(b, err)
defer c.Release(ctx)
var data []array.Data
b.ResetTimer()
for i := 0; i < b.N; i++ {
data, _ = c.BatchToArrow(batch)
data, _ = c.BatchToArrow(ctx, batch)
if len(data) != 1 {
b.Fatal("expected arrow batch of length 1")
}
@@ -235,9 +240,10 @@ func BenchmarkArrowBatchConverter(b *testing.B) {
},
"ArrowToBatch",
func(b *testing.B, batch coldata.Batch, typ *types.T) {
c, err := colserde.NewArrowBatchConverter([]*types.T{typ}, colserde.BiDirectional)
c, err := colserde.NewArrowBatchConverter([]*types.T{typ}, colserde.BiDirectional, testMemAcc)
require.NoError(b, err)
data, err := c.BatchToArrow(batch)
defer c.Release(ctx)
data, err := c.BatchToArrow(ctx, batch)
dataCopy := make([]array.Data, len(data))
require.NoError(b, err)
result := testAllocator.NewMemBatchWithMaxCapacity([]*types.T{typ})
12 changes: 8 additions & 4 deletions pkg/col/colserde/conversion_test.go
@@ -12,6 +12,7 @@ package colserde_test

import (
"bytes"
"context"
"testing"

"github.com/apache/arrow/go/arrow/array"
@@ -22,8 +23,9 @@ import (
)

func BenchmarkConversion(b *testing.B) {
ctx := context.Background()
createSerializer := func(typ *types.T) (*colserde.ArrowBatchConverter, *colserde.RecordBatchSerializer) {
c, err := colserde.NewArrowBatchConverter([]*types.T{typ}, colserde.BatchToArrowOnly)
c, err := colserde.NewArrowBatchConverter([]*types.T{typ}, colserde.BatchToArrowOnly, testMemAcc)
require.NoError(b, err)
s, err := colserde.NewRecordBatchSerializer([]*types.T{typ})
require.NoError(b, err)
@@ -34,10 +36,11 @@
"Serialize",
func(b *testing.B, batch coldata.Batch, typ *types.T) {
c, s := createSerializer(typ)
defer c.Release(ctx)
var buf bytes.Buffer
b.ResetTimer()
for i := 0; i < b.N; i++ {
data, _ := c.BatchToArrow(batch)
data, _ := c.BatchToArrow(ctx, batch)
if len(data) != 1 {
b.Fatal("expected arrow batch of length 1")
}
@@ -56,8 +59,9 @@
var serialized []byte
{
c, s := createSerializer(typ)
defer c.Release(ctx)
var buf bytes.Buffer
data, _ := c.BatchToArrow(batch)
data, _ := c.BatchToArrow(ctx, batch)
if len(data) != 1 {
b.Fatal("expected arrow batch of length 1")
}
@@ -70,7 +74,7 @@
}
serialized = buf.Bytes()
}
c, err := colserde.NewArrowBatchConverter([]*types.T{typ}, colserde.ArrowToBatchOnly)
c, err := colserde.NewArrowBatchConverter([]*types.T{typ}, colserde.ArrowToBatchOnly, nil /* acc */)
require.NoError(b, err)
s, err := colserde.NewRecordBatchSerializer([]*types.T{typ})
require.NoError(b, err)
24 changes: 14 additions & 10 deletions pkg/col/colserde/file.go
@@ -12,6 +12,7 @@ package colserde

import (
"bytes"
"context"
"encoding/binary"
"io"
"os"
@@ -23,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
mmap "github.com/edsrzf/mmap-go"
flatbuffers "github.com/google/flatbuffers/go"
@@ -54,9 +56,11 @@ type FileSerializer struct {
}

// NewFileSerializer creates a FileSerializer for the given types. The caller is
// responsible for closing the given writer.
func NewFileSerializer(w io.Writer, typs []*types.T) (*FileSerializer, error) {
a, err := NewArrowBatchConverter(typs, BatchToArrowOnly)
// responsible for closing the given writer as well as the given memory account.
func NewFileSerializer(
w io.Writer, typs []*types.T, acc *mon.BoundAccount,
) (*FileSerializer, error) {
a, err := NewArrowBatchConverter(typs, BatchToArrowOnly, acc)
if err != nil {
return nil, err
}
@@ -104,10 +108,10 @@ func (s *FileSerializer) Reset(w io.Writer) error {
}

// AppendBatch adds one batch of columnar data to the file.
func (s *FileSerializer) AppendBatch(batch coldata.Batch) error {
func (s *FileSerializer) AppendBatch(ctx context.Context, batch coldata.Batch) error {
offset := int64(s.w.written)

arrow, err := s.a.BatchToArrow(batch)
arrow, err := s.a.BatchToArrow(ctx, batch)
if err != nil {
return err
}
@@ -153,8 +157,8 @@ func (s *FileSerializer) Finish() error {
}

// Close releases the resources of the serializer.
func (s *FileSerializer) Close() {
s.a.Release()
func (s *FileSerializer) Close(ctx context.Context) {
s.a.Release(ctx)
}

// FileDeserializer decodes columnar data batches from files encoded according
@@ -213,7 +217,7 @@ func newFileDeserializer(
}
d.typs = typs

if d.a, err = NewArrowBatchConverter(typs, ArrowToBatchOnly); err != nil {
if d.a, err = NewArrowBatchConverter(typs, ArrowToBatchOnly, nil /* acc */); err != nil {
return nil, err
}
if d.rb, err = NewRecordBatchSerializer(typs); err != nil {
Expand All @@ -225,8 +229,8 @@ func newFileDeserializer(
}

// Close releases any resources held by this deserializer.
func (d *FileDeserializer) Close() error {
d.a.Release()
func (d *FileDeserializer) Close(ctx context.Context) error {
d.a.Release(ctx)
return d.bufCloseFn()
}
