Skip to content

Commit

Permalink
ARROW-17671: [Go] Filter kernels for Binary/String (#14098)
Browse files Browse the repository at this point in the history
Authored-by: Matt Topol <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
zeroshade authored Sep 13, 2022
1 parent 6fc33e7 commit 8bf60b5
Show file tree
Hide file tree
Showing 4 changed files with 421 additions and 4 deletions.
10 changes: 6 additions & 4 deletions go/arrow/compute/internal/exec/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,14 @@ func GetSpanOffsets[T int32 | int64](span *ArraySpan, i int) []T {
return ret[span.Offset:]
}

func GetBytes[T constraints.Integer](in []T) []byte {
return unsafe.Slice((*byte)(unsafe.Pointer(&in[0])), len(in)*int(unsafe.Sizeof(T(0))))
func GetBytes[T FixedWidthTypes](in []T) []byte {
var z T
return unsafe.Slice((*byte)(unsafe.Pointer(&in[0])), len(in)*int(unsafe.Sizeof(z)))
}

func GetData[T NumericTypes](in []byte) []T {
return unsafe.Slice((*T)(unsafe.Pointer(&in[0])), len(in)/int(unsafe.Sizeof(T(0))))
func GetData[T FixedWidthTypes](in []byte) []T {
var z T
return unsafe.Slice((*T)(unsafe.Pointer(&in[0])), len(in)/int(unsafe.Sizeof(z)))
}

func Min[T constraints.Ordered](a, b T) T {
Expand Down
132 changes: 132 additions & 0 deletions go/arrow/compute/internal/kernels/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package kernels

import (
"fmt"
"unsafe"

"github.com/apache/arrow/go/v10/arrow"
"github.com/apache/arrow/go/v10/arrow/bitutil"
Expand Down Expand Up @@ -513,3 +514,134 @@ func (v *validityBuilder) Finish() (buf *memory.Buffer) {
v.buffer = nil
return
}

type execBufBuilder struct {
mem memory.Allocator
buffer *memory.Buffer
data []byte
sz int
}

func (bldr *execBufBuilder) resize(newcap int) {
if bldr.buffer == nil {
bldr.buffer = memory.NewResizableBuffer(bldr.mem)
}

bldr.buffer.ResizeNoShrink(newcap)
bldr.data = bldr.buffer.Bytes()
}

func (bldr *execBufBuilder) reserve(additional int) {
if bldr.buffer == nil {
bldr.buffer = memory.NewResizableBuffer(bldr.mem)
}

mincap := bldr.sz + additional
if mincap <= cap(bldr.data) {
return
}
bldr.buffer.ResizeNoShrink(mincap)
bldr.data = bldr.buffer.Buf()
}

func (bldr *execBufBuilder) unsafeAppend(data []byte) {
copy(bldr.data[bldr.sz:], data)
bldr.sz += len(data)
}

func (bldr *execBufBuilder) unsafeAppendN(n int, val byte) {
bldr.data[bldr.sz] = val
for i := 1; i < n; i *= 2 {
copy(bldr.data[bldr.sz+i:], bldr.data[bldr.sz:bldr.sz+i])
}
bldr.sz += n
}

func (bldr *execBufBuilder) append(data []byte) {
if bldr.sz+len(data) > cap(bldr.data) {
bldr.resize(bldr.sz + len(data))
}
bldr.unsafeAppend(data)
}

func (bldr *execBufBuilder) appendN(n int, val byte) {
bldr.reserve(n)
bldr.unsafeAppendN(n, val)
}

func (bldr *execBufBuilder) advance(n int) {
bldr.reserve(n)
bldr.sz += n
}

func (bldr *execBufBuilder) unsafeAdvance(n int) {
bldr.sz += n
}

func (bldr *execBufBuilder) finish() (buf *memory.Buffer) {
bldr.buffer.Resize(bldr.sz)
buf = bldr.buffer
bldr.buffer, bldr.sz = nil, 0
return
}

type bufferBuilder[T exec.FixedWidthTypes] struct {
execBufBuilder
zero T
}

func newBufferBuilder[T exec.FixedWidthTypes](mem memory.Allocator) *bufferBuilder[T] {
return &bufferBuilder[T]{
execBufBuilder: execBufBuilder{
mem: mem,
},
}
}

func (b *bufferBuilder[T]) reserve(additional int) {
b.execBufBuilder.reserve(additional * int(unsafe.Sizeof(b.zero)))
}

func (b *bufferBuilder[T]) resize(newcap int) {
b.execBufBuilder.resize(newcap * int(unsafe.Sizeof(b.zero)))
}

func (b *bufferBuilder[T]) unsafeAppend(value T) {
b.execBufBuilder.unsafeAppend(exec.GetBytes([]T{value}))
}

func (b *bufferBuilder[T]) unsafeAppendN(n int, value T) {
data := exec.GetData[T](b.data)[b.len():]
b.execBufBuilder.unsafeAdvance(n * int(unsafe.Sizeof(value)))
data[0] = value
for i := 1; i < n; i *= 2 {
copy(data[i:], data[:i])
}
}

func (b *bufferBuilder[T]) unsafeAppendSlice(values []T) {
b.execBufBuilder.unsafeAppend(exec.GetBytes(values))
}

func (b *bufferBuilder[T]) advance(n int) {
b.execBufBuilder.advance(n * int(unsafe.Sizeof(b.zero)))
}

func (b *bufferBuilder[T]) append(value T) {
b.execBufBuilder.append(exec.GetBytes([]T{value}))
}

func (b *bufferBuilder[T]) len() int { return b.sz / int(unsafe.Sizeof(b.zero)) }

func (b *bufferBuilder[T]) appendN(n int, value T) {
b.reserve(n + b.len())
b.unsafeAppendN(n, value)
}

func (b *bufferBuilder[T]) appendSlice(values []T) {
b.execBufBuilder.append(exec.GetBytes(values))
}

func (b *bufferBuilder[T]) cap() int {
return cap(b.data) / int(unsafe.Sizeof(b.zero))
}
Loading

0 comments on commit 8bf60b5

Please sign in to comment.