Skip to content

Commit

Permalink
ARROW-17600: [Go] Implement Casting for Nested types (#14056)
Browse files Browse the repository at this point in the history
Authored-by: Matt Topol <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
zeroshade authored Sep 7, 2022
1 parent 21491ec commit d123277
Show file tree
Hide file tree
Showing 17 changed files with 821 additions and 37 deletions.
15 changes: 14 additions & 1 deletion ci/scripts/go_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@

set -ex

# simplistic semver comparison
verlte() {
[ "$1" = "`echo -e "$1\n$2" | sort -V | head -n1`" ]
}
verlt() {
[ "$1" = "$2" ] && return 1 || verlte $1 $2
}

ver=`go env GOVERSION`

source_dir=${1}/go

testargs="-race"
if [[ "${ver#go}" =~ ^1\.1[8-9] ]] && [ "$(go env GOOS)" != "darwin" ]; then
if verlte "1.18" "${ver#go}" && [ "$(go env GOOS)" != "darwin" ]; then
# asan not supported on darwin/amd64
testargs="-asan"
fi
Expand Down Expand Up @@ -65,6 +73,11 @@ fi

go test $testargs -tags $TAGS ./...

# only test compute when Go is >= 1.18
if verlte "1.18" "${ver#go}"; then
go test $testargs -tags $TAGS ./compute/...
fi

popd

export PARQUET_TEST_DATA=${1}/cpp/submodules/parquet-testing/data
Expand Down
1 change: 1 addition & 0 deletions go/arrow/array/array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type testDataType struct {
func (d *testDataType) ID() arrow.Type { return d.id }
func (d *testDataType) Name() string { panic("implement me") }
func (d *testDataType) BitWidth() int { return 8 }
func (d *testDataType) Bytes() int { return 1 }
func (d *testDataType) Fingerprint() string { return "" }
func (testDataType) Layout() arrow.DataTypeLayout { return arrow.DataTypeLayout{} }
func (testDataType) String() string { return "" }
Expand Down
34 changes: 34 additions & 0 deletions go/arrow/array/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,40 @@ type Struct struct {
fields []arrow.Array
}

// NewStructArray constructs a new Struct Array out of the columns passed
// in and the field names. The length of all cols must be the same and
// there should be the same number of columns as names.
func NewStructArray(cols []arrow.Array, names []string) (*Struct, error) {
return NewStructArrayWithNulls(cols, names, nil, 0, 0)
}

// NewStructArrayWithNulls is like NewStructArray as a convenience function,
// but also takes in a null bitmap, the number of nulls, and an optional offset
// to use for creating the Struct Array.
func NewStructArrayWithNulls(cols []arrow.Array, names []string, nullBitmap *memory.Buffer, nullCount int, offset int) (*Struct, error) {
if len(cols) != len(names) {
return nil, fmt.Errorf("%w: mismatching number of fields and child arrays", arrow.ErrInvalid)
}
if len(cols) == 0 {
return nil, fmt.Errorf("%w: can't infer struct array length with 0 child arrays", arrow.ErrInvalid)
}
length := cols[0].Len()
children := make([]arrow.ArrayData, len(cols))
fields := make([]arrow.Field, len(cols))
for i, c := range cols {
if length != c.Len() {
return nil, fmt.Errorf("%w: mismatching child array lengths", arrow.ErrInvalid)
}
children[i] = c.Data()
fields[i].Name = names[i]
fields[i].Type = c.DataType()
fields[i].Nullable = true
}
data := NewData(arrow.StructOf(fields...), length, []*memory.Buffer{nullBitmap}, children, nullCount, offset)
defer data.Release()
return NewStructData(data), nil
}

// NewStructData returns a new Struct array value from data.
func NewStructData(data arrow.ArrayData) *Struct {
a := &Struct{}
Expand Down
202 changes: 202 additions & 0 deletions go/arrow/compute/cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/apache/arrow/go/v10/arrow"
"github.com/apache/arrow/go/v10/arrow/array"
"github.com/apache/arrow/go/v10/arrow/bitutil"
"github.com/apache/arrow/go/v10/arrow/compute/internal/exec"
"github.com/apache/arrow/go/v10/arrow/compute/internal/kernels"
)
Expand Down Expand Up @@ -150,6 +151,156 @@ func CastFromExtension(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.Exec
return nil
}

func CastList[SrcOffsetT, DestOffsetT int32 | int64](ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
var (
opts = ctx.State.(kernels.CastState)
childType = out.Type.(arrow.NestedType).Fields()[0].Type
input = &batch.Values[0].Array
offsets = exec.GetSpanOffsets[SrcOffsetT](input, 1)
isDowncast = kernels.SizeOf[SrcOffsetT]() > kernels.SizeOf[DestOffsetT]()
)

out.Buffers[0] = input.Buffers[0]
out.Buffers[1] = input.Buffers[1]

if input.Offset != 0 && len(input.Buffers[0].Buf) > 0 {
out.Buffers[0].WrapBuffer(ctx.AllocateBitmap(input.Len))
bitutil.CopyBitmap(input.Buffers[0].Buf, int(input.Offset), int(input.Len),
out.Buffers[0].Buf, 0)
}

// Handle list offsets
// Several cases possible:
// - The source offset is non-zero, in which case we slice the
// underlying values and shift the list offsets (regardless of
// their respective types)
// - the source offset is zero but the source and destination types
// have different list offset types, in which case we cast the offsets
// - otherwise we simply keep the original offsets
if isDowncast {
if offsets[input.Len] > SrcOffsetT(kernels.MaxOf[DestOffsetT]()) {
return fmt.Errorf("%w: array of type %s too large to convert to %s",
arrow.ErrInvalid, input.Type, out.Type)
}
}

values := input.Children[0].MakeArray()
defer values.Release()

if input.Offset != 0 {
out.Buffers[1].WrapBuffer(
ctx.Allocate(out.Type.(arrow.OffsetsDataType).
OffsetTypeTraits().BytesRequired(int(input.Len) + 1)))

shiftedOffsets := exec.GetSpanOffsets[DestOffsetT](out, 1)
for i := 0; i < int(input.Len)+1; i++ {
shiftedOffsets[i] = DestOffsetT(offsets[i] - offsets[0])
}

values = array.NewSlice(values, int64(offsets[0]), int64(offsets[input.Len]))
defer values.Release()
} else if kernels.SizeOf[SrcOffsetT]() != kernels.SizeOf[DestOffsetT]() {
out.Buffers[1].WrapBuffer(ctx.Allocate(out.Type.(arrow.OffsetsDataType).
OffsetTypeTraits().BytesRequired(int(input.Len) + 1)))

kernels.DoStaticCast(exec.GetSpanOffsets[SrcOffsetT](input, 1),
exec.GetSpanOffsets[DestOffsetT](out, 1))
}

// handle values
opts.ToType = childType

castedValues, err := CastArray(ctx.Ctx, values, &opts)
if err != nil {
return err
}
defer castedValues.Release()

out.Children = make([]exec.ArraySpan, 1)
out.Children[0].SetMembers(castedValues.Data())
for i, b := range out.Children[0].Buffers {
if b.Owner != nil && b.Owner != values.Data().Buffers()[i] {
b.Owner.Retain()
b.SelfAlloc = true
}
}
return nil
}

func CastStruct(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
var (
opts = ctx.State.(kernels.CastState)
inType = batch.Values[0].Array.Type.(*arrow.StructType)
outType = out.Type.(*arrow.StructType)
inFieldCount = len(inType.Fields())
outFieldCount = len(outType.Fields())
)

fieldsToSelect := make([]int, outFieldCount)
for i := range fieldsToSelect {
fieldsToSelect[i] = -1
}

outFieldIndex := 0
for inFieldIndex := 0; inFieldIndex < inFieldCount && outFieldIndex < outFieldCount; inFieldIndex++ {
inField := inType.Field(inFieldIndex)
outField := outType.Field(outFieldIndex)
if inField.Name == outField.Name {
if inField.Nullable && !outField.Nullable {
return fmt.Errorf("%w: cannot cast nullable field to non-nullable field: %s %s",
arrow.ErrType, inType, outType)
}
fieldsToSelect[outFieldIndex] = inFieldIndex
outFieldIndex++
}
}

if outFieldIndex < outFieldCount {
return fmt.Errorf("%w: struct fields don't match or are in the wrong order: Input: %s Output: %s",
arrow.ErrType, inType, outType)
}

input := &batch.Values[0].Array
if len(input.Buffers[0].Buf) > 0 {
out.Buffers[0].WrapBuffer(ctx.AllocateBitmap(input.Len))
bitutil.CopyBitmap(input.Buffers[0].Buf, int(input.Offset), int(input.Len),
out.Buffers[0].Buf, 0)
}

out.Children = make([]exec.ArraySpan, outFieldCount)
for outFieldIndex, idx := range fieldsToSelect {
values := input.Children[idx].MakeArray()
defer values.Release()
values = array.NewSlice(values, input.Offset, input.Len)
defer values.Release()

opts.ToType = outType.Field(outFieldIndex).Type
castedValues, err := CastArray(ctx.Ctx, values, &opts)
if err != nil {
return err
}
defer castedValues.Release()

out.Children[outFieldIndex].TakeOwnership(castedValues.Data())
}
return nil
}

func addListCast[SrcOffsetT, DestOffsetT int32 | int64](fn *castFunction, inType arrow.Type) error {
kernel := exec.NewScalarKernel([]exec.InputType{exec.NewIDInput(inType)},
kernels.OutputTargetType, CastList[SrcOffsetT, DestOffsetT], nil)
kernel.NullHandling = exec.NullComputedNoPrealloc
kernel.MemAlloc = exec.MemNoPrealloc
return fn.AddTypeCast(inType, kernel)
}

func addStructToStructCast(fn *castFunction) error {
kernel := exec.NewScalarKernel([]exec.InputType{exec.NewIDInput(arrow.STRUCT)},
kernels.OutputTargetType, CastStruct, nil)
kernel.NullHandling = exec.NullComputedNoPrealloc
return fn.AddTypeCast(arrow.STRUCT, kernel)
}

func addCastFuncs(fn []*castFunction) {
for _, f := range fn {
f.AddNewTypeCast(arrow.EXTENSION, []exec.InputType{exec.NewIDInput(arrow.EXTENSION)},
Expand All @@ -165,6 +316,12 @@ func initCastTable() {
addCastFuncs(getNumericCasts())
addCastFuncs(getBinaryLikeCasts())
addCastFuncs(getTemporalCasts())
addCastFuncs(getNestedCasts())

nullToExt := newCastFunction("cast_extension", arrow.EXTENSION)
nullToExt.AddNewTypeCast(arrow.NULL, []exec.InputType{exec.NewExactInput(arrow.Null)},
kernels.OutputTargetType, kernels.CastFromNull, exec.NullComputedNoPrealloc, exec.MemNoPrealloc)
castTable[arrow.EXTENSION] = nullToExt
}

func getCastFunction(to arrow.DataType) (*castFunction, error) {
Expand All @@ -178,6 +335,51 @@ func getCastFunction(to arrow.DataType) (*castFunction, error) {
return nil, fmt.Errorf("%w: unsupported cast to %s", arrow.ErrNotImplemented, to)
}

func getNestedCasts() []*castFunction {
out := make([]*castFunction, 0)

addKernels := func(fn *castFunction, kernels []exec.ScalarKernel) {
for _, k := range kernels {
if err := fn.AddTypeCast(k.Signature.InputTypes[0].MatchID(), k); err != nil {
panic(err)
}
}
}

castLists := newCastFunction("cast_list", arrow.LIST)
addKernels(castLists, kernels.GetCommonCastKernels(arrow.LIST, kernels.OutputTargetType))
if err := addListCast[int32, int32](castLists, arrow.LIST); err != nil {
panic(err)
}
if err := addListCast[int64, int32](castLists, arrow.LARGE_LIST); err != nil {
panic(err)
}
out = append(out, castLists)

castLargeLists := newCastFunction("cast_large_list", arrow.LARGE_LIST)
addKernels(castLargeLists, kernels.GetCommonCastKernels(arrow.LARGE_LIST, kernels.OutputTargetType))
if err := addListCast[int32, int64](castLargeLists, arrow.LIST); err != nil {
panic(err)
}
if err := addListCast[int64, int64](castLargeLists, arrow.LARGE_LIST); err != nil {
panic(err)
}
out = append(out, castLargeLists)

castFsl := newCastFunction("cast_fixed_size_list", arrow.FIXED_SIZE_LIST)
addKernels(castFsl, kernels.GetCommonCastKernels(arrow.FIXED_SIZE_LIST, kernels.OutputTargetType))
out = append(out, castFsl)

castStruct := newCastFunction("cast_struct", arrow.STRUCT)
addKernels(castStruct, kernels.GetCommonCastKernels(arrow.STRUCT, kernels.OutputTargetType))
if err := addStructToStructCast(castStruct); err != nil {
panic(err)
}
out = append(out, castStruct)

return out
}

func getBooleanCasts() []*castFunction {
fn := newCastFunction("cast_boolean", arrow.BOOL)
kns := kernels.GetBooleanCastKernels()
Expand Down
Loading

0 comments on commit d123277

Please sign in to comment.