Skip to content

Commit

Permalink
ARROW-17734: [Go] Implement Take for Lists and Dense Union (apache#14130
Browse files Browse the repository at this point in the history
)

Authored-by: Matt Topol <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
zeroshade authored and fatemehp committed Oct 17, 2022
1 parent b3b9f15 commit 8001207
Show file tree
Hide file tree
Showing 8 changed files with 398 additions and 285 deletions.
3 changes: 3 additions & 0 deletions go/arrow/array/bufferbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ func (b *bufferBuilder) Finish() (buffer *memory.Buffer) {
buffer = b.buffer
b.buffer = nil
b.Reset()
if buffer == nil {
buffer = memory.NewBufferBytes(nil)
}
return
}

Expand Down
5 changes: 5 additions & 0 deletions go/arrow/compute/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math"
"runtime"
"sync"

"github.com/apache/arrow/go/v10/arrow"
Expand All @@ -44,6 +45,7 @@ type ExecCtx struct {
PreallocContiguous bool
Registry FunctionRegistry
ExecChannelSize int
NP int
}

type ctxExecKey struct{}
Expand All @@ -70,6 +72,9 @@ func init() {
defaultExecCtx.PreallocContiguous = true
defaultExecCtx.Registry = GetFunctionRegistry()
defaultExecCtx.ExecChannelSize = 10
// default level of parallelism
// set to 1 to disable parallelization
defaultExecCtx.NP = runtime.NumCPU()
}

// SetExecCtx returns a new child context containing the passed in ExecCtx
Expand Down
11 changes: 7 additions & 4 deletions go/arrow/compute/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,28 @@ replace github.com/apache/arrow/go/v10 => ../../
require (
github.com/apache/arrow/go/v10 v10.0.0-00010101000000-000000000000
github.com/stretchr/testify v1.8.0
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f
)

require (
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/goccy/go-json v0.9.10 // indirect
github.com/goccy/go-json v0.9.11 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v2.0.6+incompatible // indirect
github.com/google/flatbuffers v2.0.8+incompatible // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/tools v0.1.12 // indirect
Expand Down
280 changes: 17 additions & 263 deletions go/arrow/compute/go.sum

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions go/arrow/compute/internal/exec/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ func (a *ArraySpan) MakeData() arrow.ArrayData {
defer dict.Release()
result.SetDictionary(dict)
return result
} else if dt.ID() == arrow.DENSE_UNION || dt.ID() == arrow.SPARSE_UNION {
bufs[0] = nil
}

if len(a.Children) > 0 {
Expand Down
175 changes: 159 additions & 16 deletions go/arrow/compute/internal/kernels/vector_selection.go
Original file line number Diff line number Diff line change
Expand Up @@ -1070,32 +1070,32 @@ func takeExec(ctx *exec.KernelCtx, outputLen int64, values, indices *exec.ArrayS
}
}

type outputFn func(*exec.KernelCtx, int64, *exec.ArraySpan, *exec.ArraySpan, *exec.ExecResult, func(int64) error, func() error) error
type implFn func(*exec.KernelCtx, *exec.ExecSpan, int64, *exec.ExecResult, outputFn) error
type selectionOutputFn func(*exec.KernelCtx, int64, *exec.ArraySpan, *exec.ArraySpan, *exec.ExecResult, func(int64) error, func() error) error
type selectionImplFn func(*exec.KernelCtx, *exec.ExecSpan, int64, *exec.ExecResult, selectionOutputFn) error

func FilterExec(impl implFn, fn outputFn) exec.ArrayKernelExec {
func FilterExec(impl selectionImplFn) exec.ArrayKernelExec {
return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
var (
selection = &batch.Values[1].Array
outputLength = getFilterOutputSize(selection, ctx.State.(FilterState).NullSelection)
)
return impl(ctx, batch, outputLength, out, fn)
return impl(ctx, batch, outputLength, out, filterExec)
}
}

func TakeExec(impl implFn, fn outputFn) exec.ArrayKernelExec {
func TakeExec(impl selectionImplFn) exec.ArrayKernelExec {
return func(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
if ctx.State.(TakeState).BoundsCheck {
if err := checkIndexBounds(&batch.Values[1].Array, uint64(batch.Values[0].Array.Len)); err != nil {
return err
}
}

return impl(ctx, batch, batch.Values[1].Array.Len, out, fn)
return impl(ctx, batch, batch.Values[1].Array.Len, out, takeExec)
}
}

func VarBinaryImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn outputFn) error {
func VarBinaryImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn selectionOutputFn) error {
var (
values = &batch.Values[0].Array
selection = &batch.Values[1].Array
Expand Down Expand Up @@ -1144,7 +1144,7 @@ func VarBinaryImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, batch *exec.ExecS
return nil
}

func FSBImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn outputFn) error {
func FSBImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn selectionOutputFn) error {
var (
values = &batch.Values[0].Array
selection = &batch.Values[1].Array
Expand Down Expand Up @@ -1177,6 +1177,149 @@ func FSBImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out
return nil
}

func ListImpl[OffsetT int32 | int64](ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn selectionOutputFn) error {
var (
values = &batch.Values[0].Array
selection = &batch.Values[1].Array

rawOffsets = exec.GetSpanOffsets[OffsetT](values, 1)
mem = exec.GetAllocator(ctx.Ctx)
offsetBuilder = newBufferBuilder[OffsetT](mem)
childIdxBuilder = newBufferBuilder[OffsetT](mem)
)

if values.Len > 0 {
dataLength := rawOffsets[values.Len] - rawOffsets[0]
meanListLen := float64(dataLength) / float64(values.Len)
childIdxBuilder.reserve(int(meanListLen))
}

offsetBuilder.reserve(int(outputLength) + 1)
var offset OffsetT
err := fn(ctx, outputLength, values, selection, out,
func(idx int64) error {
offsetBuilder.unsafeAppend(offset)
valueOffset := rawOffsets[idx]
valueLength := rawOffsets[idx+1] - valueOffset
offset += valueLength
childIdxBuilder.reserve(int(valueLength))
for j := valueOffset; j < valueOffset+valueLength; j++ {
childIdxBuilder.unsafeAppend(j)
}
return nil
}, func() error {
offsetBuilder.unsafeAppend(offset)
return nil
})

if err != nil {
return err
}

offsetBuilder.unsafeAppend(offset)
out.Buffers[1].WrapBuffer(offsetBuilder.finish())

out.Children = make([]exec.ArraySpan, 1)
out.Children[0].Type = exec.GetDataType[OffsetT]()
out.Children[0].Len = int64(childIdxBuilder.len())
out.Children[0].Buffers[1].WrapBuffer(childIdxBuilder.finish())

return nil
}

func FSLImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn selectionOutputFn) error {
var (
values = &batch.Values[0].Array
selection = &batch.Values[1].Array

listSize = values.Type.(*arrow.FixedSizeListType).Len()
baseOffset = values.Offset

childIdxBuilder = array.NewInt64Builder(exec.GetAllocator(ctx.Ctx))
)

// we need to take listSize elements even for null elements of indices
childIdxBuilder.Reserve(int(outputLength) * int(listSize))
err := fn(ctx, outputLength, values, selection, out,
func(idx int64) error {
offset := (baseOffset + idx) * int64(listSize)
for j := offset; j < (offset + int64(listSize)); j++ {
childIdxBuilder.UnsafeAppend(j)
}
return nil
}, func() error {
for n := int32(0); n < listSize; n++ {
childIdxBuilder.AppendNull()
}
return nil
})

if err != nil {
return err
}

arr := childIdxBuilder.NewArray()
defer arr.Release()
out.Children = make([]exec.ArraySpan, 1)
out.Children[0].TakeOwnership(arr.Data())
return nil
}

func DenseUnionImpl(ctx *exec.KernelCtx, batch *exec.ExecSpan, outputLength int64, out *exec.ExecResult, fn selectionOutputFn) error {
var (
values = &batch.Values[0].Array
selection = &batch.Values[1].Array

mem = exec.GetAllocator(ctx.Ctx)
valueOffsetBldr = newBufferBuilder[int32](mem)
childIdBldr = newBufferBuilder[int8](mem)
typeCodes = values.Type.(arrow.UnionType).TypeCodes()
childIndicesBldrs = make([]*array.Int32Builder, len(typeCodes))
)

for i := range childIndicesBldrs {
childIndicesBldrs[i] = array.NewInt32Builder(mem)
}

childIdBldr.reserve(int(outputLength))
valueOffsetBldr.reserve(int(outputLength))

typedValues := values.MakeArray().(*array.DenseUnion)
defer typedValues.Release()

err := fn(ctx, outputLength, values, selection, out,
func(idx int64) error {
childID := typedValues.ChildID(int(idx))
childIdBldr.unsafeAppend(typeCodes[childID])
valueOffset := typedValues.ValueOffset(int(idx))
valueOffsetBldr.unsafeAppend(int32(childIndicesBldrs[childID].Len()))
childIndicesBldrs[childID].Append(valueOffset)
return nil
}, func() error {
childID := 0
childIdBldr.unsafeAppend(typeCodes[childID])
valueOffsetBldr.unsafeAppend(int32(childIndicesBldrs[childID].Len()))
childIndicesBldrs[childID].AppendNull()
return nil
})
if err != nil {
return err
}

out.Type = typedValues.DataType()
out.Buffers[1].WrapBuffer(childIdBldr.finish())
out.Buffers[2].WrapBuffer(valueOffsetBldr.finish())

out.Children = make([]exec.ArraySpan, len(childIndicesBldrs))
for i, b := range childIndicesBldrs {
arr := b.NewArray()
out.Children[i].TakeOwnership(arr.Data())
arr.Release()
b.Release()
}
return nil
}

func FilterBinary(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
var (
nullSelect = ctx.State.(FilterState).NullSelection
Expand Down Expand Up @@ -1228,21 +1371,21 @@ func GetVectorSelectionKernels() (filterkernels, takeKernels []SelectionKernelDa
filterkernels = []SelectionKernelData{
{In: exec.NewMatchedInput(exec.Primitive()), Exec: PrimitiveFilter},
{In: exec.NewExactInput(arrow.Null), Exec: NullFilter},
{In: exec.NewIDInput(arrow.DECIMAL128), Exec: FilterExec(FSBImpl, filterExec)},
{In: exec.NewIDInput(arrow.DECIMAL256), Exec: FilterExec(FSBImpl, filterExec)},
{In: exec.NewIDInput(arrow.FIXED_SIZE_BINARY), Exec: FilterExec(FSBImpl, filterExec)},
{In: exec.NewIDInput(arrow.DECIMAL128), Exec: FilterExec(FSBImpl)},
{In: exec.NewIDInput(arrow.DECIMAL256), Exec: FilterExec(FSBImpl)},
{In: exec.NewIDInput(arrow.FIXED_SIZE_BINARY), Exec: FilterExec(FSBImpl)},
{In: exec.NewMatchedInput(exec.BinaryLike()), Exec: FilterBinary},
{In: exec.NewMatchedInput(exec.LargeBinaryLike()), Exec: FilterBinary},
}

takeKernels = []SelectionKernelData{
{In: exec.NewExactInput(arrow.Null), Exec: NullTake},
{In: exec.NewMatchedInput(exec.Primitive()), Exec: PrimitiveTake},
{In: exec.NewIDInput(arrow.DECIMAL128), Exec: TakeExec(FSBImpl, takeExec)},
{In: exec.NewIDInput(arrow.DECIMAL256), Exec: TakeExec(FSBImpl, takeExec)},
{In: exec.NewIDInput(arrow.FIXED_SIZE_BINARY), Exec: TakeExec(FSBImpl, takeExec)},
{In: exec.NewMatchedInput(exec.BinaryLike()), Exec: TakeExec(VarBinaryImpl[int32], takeExec)},
{In: exec.NewMatchedInput(exec.LargeBinaryLike()), Exec: TakeExec(VarBinaryImpl[int64], takeExec)},
{In: exec.NewIDInput(arrow.DECIMAL128), Exec: TakeExec(FSBImpl)},
{In: exec.NewIDInput(arrow.DECIMAL256), Exec: TakeExec(FSBImpl)},
{In: exec.NewIDInput(arrow.FIXED_SIZE_BINARY), Exec: TakeExec(FSBImpl)},
{In: exec.NewMatchedInput(exec.BinaryLike()), Exec: TakeExec(VarBinaryImpl[int32])},
{In: exec.NewMatchedInput(exec.LargeBinaryLike()), Exec: TakeExec(VarBinaryImpl[int64])},
}
return
}
Loading

0 comments on commit 8001207

Please sign in to comment.