Skip to content

Commit

Permalink
ARROW-17646: [Go][CI] Switch C Data to use cgo.Handle (bumps to Go1.1…
Browse files Browse the repository at this point in the history
…7) (#14067)

Authored-by: Matt Topol <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
zeroshade authored Sep 7, 2022
1 parent d123277 commit 47314c3
Show file tree
Hide file tree
Showing 14 changed files with 180 additions and 59 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ CUDA=11.0.3
DASK=latest
DOTNET=6.0
GCC_VERSION=""
GO=1.16
GO=1.17
STATICCHECK=v0.2.2
HDFS=3.2.1
JDK=8
Expand Down
24 changes: 12 additions & 12 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ jobs:
strategy:
fail-fast: false
matrix:
go: [1.16, 1.18]
go: [1.17, 1.18]
include:
- go: 1.16
- go: 1.17
staticcheck: v0.2.2
- go: 1.18
staticcheck: latest
Expand Down Expand Up @@ -86,9 +86,9 @@ jobs:
strategy:
fail-fast: false
matrix:
go: [1.16, 1.18]
go: [1.17, 1.18]
include:
- go: 1.16
- go: 1.17
staticcheck: v0.2.2
- go: 1.18
staticcheck: latest
Expand Down Expand Up @@ -123,9 +123,9 @@ jobs:
strategy:
fail-fast: false
matrix:
go: [1.16, 1.18]
go: [1.17, 1.18]
include:
- go: 1.16
- go: 1.17
staticcheck: v0.2.2
- go: 1.18
staticcheck: latest
Expand Down Expand Up @@ -158,9 +158,9 @@ jobs:
strategy:
fail-fast: false
matrix:
go: [1.16, 1.18]
go: [1.17, 1.18]
include:
- go: 1.16
- go: 1.17
staticcheck: v0.2.2
- go: 1.18
staticcheck: latest
Expand Down Expand Up @@ -193,9 +193,9 @@ jobs:
strategy:
fail-fast: false
matrix:
go: [1.16, 1.18]
go: [1.17, 1.18]
include:
- go: 1.16
- go: 1.17
staticcheck: v0.2.2
- go: 1.18
staticcheck: latest
Expand Down Expand Up @@ -228,9 +228,9 @@ jobs:
strategy:
fail-fast: false
matrix:
go: [1.16, 1.18]
go: [1.17, 1.18]
include:
- go: 1.16
- go: 1.17
staticcheck: v0.2.2
- go: 1.18
staticcheck: latest
Expand Down
2 changes: 1 addition & 1 deletion ci/docker/debian-10-go.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.

ARG arch=amd64
ARG go=1.16
ARG go=1.17
ARG staticcheck=v0.2.2
FROM ${arch}/golang:${go}-buster

Expand Down
2 changes: 1 addition & 1 deletion ci/docker/debian-11-go.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.

ARG arch=amd64
ARG go=1.16
ARG go=1.17
ARG staticcheck=v0.2.2
FROM ${arch}/golang:${go}-bullseye

Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/go_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ set -ex
source_dir=${1}/go
ARCH=`uname -m`

# Arm64 CI is triggered by travis and run in arm64v8/golang:1.16-bullseye
# Arm64 CI is triggered by travis and run in arm64v8/golang:1.17-bullseye
if [ "aarch64" == "$ARCH" ]; then
# Install `staticcheck`
GO111MODULE=on go install honnef.co/go/tools/cmd/[email protected]
Expand Down
2 changes: 1 addition & 1 deletion dev/release/verify-release-candidate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ install_go() {
return 0
fi

local version=1.16.12
local version=1.17.13
show_info "Installing go version ${version}..."

local arch="$(uname -m)"
Expand Down
4 changes: 2 additions & 2 deletions dev/tasks/tasks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1449,13 +1449,13 @@ tasks:
ci: github
template: r/github.linux.revdepcheck.yml

test-debian-11-go-1.16:
test-debian-11-go-1.17:
ci: azure
template: docker-tests/azure.linux.yml
params:
env:
DEBIAN: 11
GO: 1.16
GO: 1.17
image: debian-go

test-ubuntu-default-docs:
Expand Down
18 changes: 12 additions & 6 deletions go/arrow/cdata/cdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ package cdata
// int stream_get_schema(struct ArrowArrayStream* st, struct ArrowSchema* out) { return st->get_schema(st, out); }
// int stream_get_next(struct ArrowArrayStream* st, struct ArrowArray* out) { return st->get_next(st, out); }
// const char* stream_get_last_error(struct ArrowArrayStream* st) { return st->get_last_error(st); }
// struct ArrowArray* get_arr() { return (struct ArrowArray*)(malloc(sizeof(struct ArrowArray))); }
// struct ArrowArray* get_arr() {
// struct ArrowArray* out = (struct ArrowArray*)(malloc(sizeof(struct ArrowArray)));
// memset(out, 0, sizeof(struct ArrowArray));
// return out;
// }
// struct ArrowArrayStream* get_stream() { return (struct ArrowArrayStream*)malloc(sizeof(struct ArrowArrayStream)); }
//
import "C"
Expand Down Expand Up @@ -655,18 +659,22 @@ func importCArrayAsType(arr *CArrowArray, dt arrow.DataType) (imp *cimporter, er
func initReader(rdr *nativeCRecordBatchReader, stream *CArrowArrayStream) {
rdr.stream = C.get_stream()
C.ArrowArrayStreamMove(stream, rdr.stream)
rdr.arr = C.get_arr()
runtime.SetFinalizer(rdr, func(r *nativeCRecordBatchReader) {
if r.cur != nil {
r.cur.Release()
}
C.ArrowArrayStreamRelease(r.stream)
C.ArrowArrayRelease(r.arr)
C.free(unsafe.Pointer(r.stream))
C.free(unsafe.Pointer(r.arr))
})
}

// Record Batch reader that conforms to arrio.Reader for the ArrowArrayStream interface
type nativeCRecordBatchReader struct {
stream *CArrowArrayStream
arr *CArrowArray
schema *arrow.Schema

cur arrow.Record
Expand Down Expand Up @@ -713,18 +721,16 @@ func (n *nativeCRecordBatchReader) next() error {
n.cur = nil
}

arr := C.get_arr()
defer C.free(unsafe.Pointer(arr))
errno := C.stream_get_next(n.stream, arr)
errno := C.stream_get_next(n.stream, n.arr)
if errno != 0 {
return n.getError(int(errno))
}

if C.ArrowArrayIsReleased(arr) == 1 {
if C.ArrowArrayIsReleased(n.arr) == 1 {
return io.EOF
}

rec, err := ImportCRecordBatchWithSchema(arr, n.schema)
rec, err := ImportCRecordBatchWithSchema(n.arr, n.schema)
if err != nil {
return err
}
Expand Down
27 changes: 26 additions & 1 deletion go/arrow/cdata/cdata_exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"encoding/binary"
"fmt"
"reflect"
"runtime/cgo"
"strings"
"unsafe"

Expand Down Expand Up @@ -362,7 +363,9 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) {
out.buffers = (*unsafe.Pointer)(unsafe.Pointer(&buffers[0]))
}

out.private_data = unsafe.Pointer(storeData(arr.Data()))
arr.Data().Retain()
h := cgo.NewHandle(arr.Data())
out.private_data = unsafe.Pointer(&h)
out.release = (*[0]byte)(C.goReleaseArray)
switch arr := arr.(type) {
case *array.List:
Expand Down Expand Up @@ -400,3 +403,25 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) {
out.children = nil
}
}

type cRecordReader struct {
rdr array.RecordReader
}

func (rr cRecordReader) getSchema(out *CArrowSchema) int {
ExportArrowSchema(rr.rdr.Schema(), out)
return 0
}

func (rr cRecordReader) next(out *CArrowArray) int {
if rr.rdr.Next() {
ExportArrowRecordBatch(rr.rdr.Record(), out, nil)
return 0
}
releaseArr(out)
return 0
}

func (rr cRecordReader) release() {
rr.rdr.Release()
}
35 changes: 35 additions & 0 deletions go/arrow/cdata/cdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import (
"errors"
"io"
"runtime"
"runtime/cgo"
"testing"
"time"
"unsafe"

"github.com/apache/arrow/go/v10/arrow"
"github.com/apache/arrow/go/v10/arrow/array"
"github.com/apache/arrow/go/v10/arrow/decimal128"
"github.com/apache/arrow/go/v10/arrow/internal/arrdata"
"github.com/apache/arrow/go/v10/arrow/memory"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -659,3 +661,36 @@ func TestRecordReaderStream(t *testing.T) {
assert.Equal(t, "baz", rec.Column(1).(*array.String).Value(2))
}
}

func TestExportRecordReaderStream(t *testing.T) {
reclist := arrdata.Records["primitives"]
rdr, _ := array.NewRecordReader(reclist[0].Schema(), reclist)

out := createTestStreamObj()
ExportRecordReader(rdr, out)

assert.NotNil(t, out.get_schema)
assert.NotNil(t, out.get_next)
assert.NotNil(t, out.get_last_error)
assert.NotNil(t, out.release)
assert.NotNil(t, out.private_data)

h := *(*cgo.Handle)(out.private_data)
assert.Same(t, rdr, h.Value().(cRecordReader).rdr)

importedRdr := ImportCArrayStream(out, nil)
i := 0
for {
rec, err := importedRdr.Read()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
assert.NoError(t, err)
}

assert.Truef(t, array.RecordEqual(reclist[i], rec), "expected: %s\ngot: %s", reclist[i], rec)
i++
}
assert.EqualValues(t, len(reclist), i)
}
10 changes: 9 additions & 1 deletion go/arrow/cdata/cdata_test_framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ package cdata
//
// void setup_array_stream_test(const int n_batches, struct ArrowArrayStream* out);
// struct ArrowArray* get_test_arr() { return (struct ArrowArray*)(malloc(sizeof(struct ArrowArray))); }
// struct ArrowArrayStream* get_test_stream() { return (struct ArrowArrayStream*)malloc(sizeof(struct ArrowArrayStream)); }
// struct ArrowArrayStream* get_test_stream() {
// struct ArrowArrayStream* out = (struct ArrowArrayStream*)malloc(sizeof(struct ArrowArrayStream));
// memset(out, 0, sizeof(struct ArrowArrayStream));
// return out;
// }
//
// void release_test_arr(struct ArrowArray* arr) {
// for (int i = 0; i < arr->n_buffers; ++i) {
Expand Down Expand Up @@ -251,6 +255,10 @@ func createCArr(arr arrow.Array) *CArrowArray {
return carr
}

func createTestStreamObj() *CArrowArrayStream {
return C.get_test_stream()
}

func arrayStreamTest() *CArrowArrayStream {
st := C.get_test_stream()
C.setup_array_stream_test(2, st)
Expand Down
Loading

0 comments on commit 47314c3

Please sign in to comment.