Skip to content

Commit

Permalink
refactor: Add wrappers around callbacks to improve syntax and debuggability (#338)
Browse files Browse the repository at this point in the history

This PR adds thin wrappers around C function pointers (as is done in
Arrow C++) for Arrow C Data/Stream interface structures and uses them in
all tests and internal code. The motivation for this was:

- Calling `x.release(&x)` is a tiny bit verbose and involves specifying
`x` twice
- Wrappers make it easier to debug (by breakpoint or via debug checks)
- Iterating through streams is very verbose if you want to catch all the
error messages (in some places in the code it looks like I didn't
bother, and in other places it looks like I assumed that the result of
`get_last_error` was non-NULL, which is not guaranteed). The wrappers let you
do `NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetNext(stream, out,
error))`, which integrates better into the nanoarrow error handling
pattern.

The changes are:

- Release functions were added to nanoarrow_types.h
- A few error handling functions were moved to nanoarrow_types.h since
they were used in the wrappers
- All function pointer calls I could find were updated to use the
wrapper functions
  • Loading branch information
paleolimbot authored Dec 19, 2023
1 parent 3e02822 commit 248b498
Show file tree
Hide file tree
Showing 30 changed files with 544 additions and 405 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ jobs:
sudo ldconfig
mkdir build
cd build
cmake .. -DNANOARROW_BUILD_TESTS=ON ${{ matrix.config.cmake_args }}
cmake .. -DNANOARROW_BUILD_TESTS=ON -DCMAKE_POSITION_INDEPENDENT_CODE=ON \
${{ matrix.config.cmake_args }}
cmake --build .
- name: Check for non-namespaced symbols in namespaced build
Expand Down
2 changes: 1 addition & 1 deletion docs/source/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ NANOARROW_RETURN_NOT_OK(ArrowSchemaInitFromType(&schema, NANOARROW_TYPE_STRING))
// or it will leak.
int code = ArrowArrayInitFromSchema(&array, &schema, NULL);
if (code != NANOARROW_OK) {
schema.release(&schema);
ArrowSchemaRelease(&schema);
return code;
}
```
Expand Down
10 changes: 5 additions & 5 deletions extensions/nanoarrow_device/src/nanoarrow/nanoarrow_device.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ static int ArrowDeviceBasicArrayStreamGetNext(struct ArrowDeviceArrayStream* arr
private_data->naive_stream.get_next(&private_data->naive_stream, &tmp));
int result = ArrowDeviceArrayInit(private_data->device, device_array, &tmp);
if (result != NANOARROW_OK) {
tmp.release(&tmp);
ArrowArrayRelease(&tmp);
return result;
}

Expand All @@ -244,7 +244,7 @@ static void ArrowDeviceBasicArrayStreamRelease(
struct ArrowDeviceArrayStream* array_stream) {
struct ArrowBasicDeviceArrayStreamPrivate* private_data =
(struct ArrowBasicDeviceArrayStreamPrivate*)array_stream->private_data;
private_data->naive_stream.release(&private_data->naive_stream);
ArrowArrayStreamRelease(&private_data->naive_stream);
ArrowFree(private_data);
array_stream->release = NULL;
}
Expand Down Expand Up @@ -439,19 +439,19 @@ ArrowErrorCode ArrowDeviceArrayViewCopy(struct ArrowDeviceArrayView* src,
int result =
ArrowDeviceArrayViewCopyInternal(src->device, &src->array_view, device_dst, &tmp);
if (result != NANOARROW_OK) {
tmp.release(&tmp);
ArrowArrayRelease(&tmp);
return result;
}

result = ArrowArrayFinishBuilding(&tmp, NANOARROW_VALIDATION_LEVEL_MINIMAL, NULL);
if (result != NANOARROW_OK) {
tmp.release(&tmp);
ArrowArrayRelease(&tmp);
return result;
}

result = ArrowDeviceArrayInit(device_dst, dst, &tmp);
if (result != NANOARROW_OK) {
tmp.release(&tmp);
ArrowArrayRelease(&tmp);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ static inline void move_pointer(struct ArrowDeviceArray* src,

static inline void release_pointer(struct ArrowDeviceArray* data) {
if (data->array.release != nullptr) {
data->array.release(&data->array);
ArrowArrayRelease(&data->array);
}

data->sync_event = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ static void ArrowDeviceCudaArrayRelease(struct ArrowArray* array) {
struct ArrowDeviceCudaArrayPrivate* private_data =
(struct ArrowDeviceCudaArrayPrivate*)array->private_data;
cudaEventDestroy(private_data->sync_event);
private_data->parent.release(&private_data->parent);
ArrowArrayRelease(&private_data->parent);
ArrowFree(private_data);
array->release = NULL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ TEST_P(StringTypeParameterizedTestFixture, ArrowDeviceCudaArrayViewString) {
ASSERT_EQ(ArrowDeviceArrayMoveToDevice(&device_array, gpu, &device_array2), ENOTSUP);
ASSERT_EQ(ArrowDeviceArrayViewCopy(&device_array_view, gpu, &device_array2),
NANOARROW_OK);
device_array.array.release(&device_array.array);
ArrowArrayRelease(&device_array.array);

ASSERT_NE(device_array2.array.release, nullptr);
ASSERT_EQ(device_array2.device_id, gpu->device_id);
Expand All @@ -201,7 +201,7 @@ TEST_P(StringTypeParameterizedTestFixture, ArrowDeviceCudaArrayViewString) {
} else {
ASSERT_EQ(ArrowDeviceArrayViewCopy(&device_array_view, cpu, &device_array),
NANOARROW_OK);
device_array2.array.release(&device_array2.array);
ArrowArrayRelease(&device_array2.array);
}

ASSERT_NE(device_array.array.release, nullptr);
Expand All @@ -213,7 +213,7 @@ TEST_P(StringTypeParameterizedTestFixture, ArrowDeviceCudaArrayViewString) {
EXPECT_EQ(memcmp(device_array_view.array_view.buffer_views[2].data.data, "abcdefg", 7),
0);

device_array.array.release(&device_array.array);
ArrowArrayRelease(&device_array.array);
ArrowDeviceArrayViewReset(&device_array_view);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ static void ArrowDeviceMetalArrayRelease(struct ArrowArray* array) {
struct ArrowDeviceMetalArrayPrivate* private_data =
(struct ArrowDeviceMetalArrayPrivate*)array->private_data;
private_data->event->release();
private_data->parent.release(&private_data->parent);
ArrowArrayRelease(&private_data->parent);
ArrowFree(private_data);
array->release = NULL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ TEST_P(StringTypeParameterizedTestFixture, ArrowDeviceMetalArrayViewString) {
ASSERT_EQ(ArrowDeviceArrayMoveToDevice(&device_array, metal, &device_array2), ENOTSUP);
ASSERT_EQ(ArrowDeviceArrayViewCopy(&device_array_view, metal, &device_array2),
NANOARROW_OK);
device_array.array.release(&device_array.array);
ArrowArrayRelease(&device_array.array);

ASSERT_NE(device_array2.array.release, nullptr);
ASSERT_EQ(device_array2.device_id, metal->device_id);
Expand All @@ -261,7 +261,7 @@ TEST_P(StringTypeParameterizedTestFixture, ArrowDeviceMetalArrayViewString) {
EXPECT_EQ(memcmp(device_array_view.array_view.buffer_views[2].data.data, "abcdefg", 7),
0);

device_array.array.release(&device_array.array);
ArrowArrayRelease(&device_array.array);
ArrowDeviceArrayViewReset(&device_array_view);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ TEST_P(StringTypeParameterizedTestFixture, ArrowDeviceCpuArrayViewString) {
ASSERT_NE(device_array2.array.release, nullptr);
ASSERT_EQ(device_array2.device_id, cpu->device_id);

device_array2.array.release(&device_array2.array);
ArrowArrayRelease(&device_array2.array);
ArrowDeviceArrayViewReset(&device_array_view);
}

Expand Down
30 changes: 11 additions & 19 deletions extensions/nanoarrow_ipc/src/apps/dump_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,11 @@ int main(int argc, char* argv[]) {
clock_t begin = clock();

struct ArrowSchema schema;
result = stream.get_schema(&stream, &schema);
result = ArrowArrayStreamGetSchema(&stream, &schema, NULL);
if (result != NANOARROW_OK) {
const char* message = stream.get_last_error(&stream);
if (message == NULL) {
message = "";
}

fprintf(stderr, "stream.get_schema() returned %d with error '%s'\n", result, message);
stream.release(&stream);
fprintf(stderr, "stream.get_schema() returned %d with error '%s'\n", result,
ArrowArrayStreamGetLastError(&stream));
ArrowArrayStreamRelease(&stream);
return 1;
}

Expand All @@ -96,7 +92,7 @@ int main(int argc, char* argv[]) {
char schema_tmp[8096];
memset(schema_tmp, 0, sizeof(schema_tmp));
dump_schema_to_stdout(&schema, 0, schema_tmp, sizeof(schema_tmp));
schema.release(&schema);
ArrowSchemaRelease(&schema);

struct ArrowArray array;
array.release = NULL;
Expand All @@ -106,22 +102,18 @@ int main(int argc, char* argv[]) {
begin = clock();

while (1) {
result = stream.get_next(&stream, &array);
result = ArrowArrayStreamGetNext(&stream, &array, NULL);
if (result != NANOARROW_OK) {
const char* message = stream.get_last_error(&stream);
if (message == NULL) {
message = "";
}

fprintf(stderr, "stream.get_next() returned %d with error '%s'\n", result, message);
stream.release(&stream);
fprintf(stderr, "stream.get_next() returned %d with error '%s'\n", result,
ArrowArrayStreamGetLastError(&stream));
ArrowArrayStreamRelease(&stream);
return 1;
}

if (array.release != NULL) {
row_count += array.length;
batch_count++;
array.release(&array);
ArrowArrayRelease(&array);
} else {
break;
}
Expand All @@ -132,7 +124,7 @@ int main(int argc, char* argv[]) {
fprintf(stdout, "Read %ld rows in %ld batch(es) <%.06f seconds>\n", (long)row_count,
(long)batch_count, elapsed);

stream.release(&stream);
ArrowArrayStreamRelease(&stream);
fclose(file_ptr);
return 0;
}
14 changes: 7 additions & 7 deletions extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder) {
ArrowArrayViewReset(&private_data->array_view);

if (private_data->array.release != NULL) {
private_data->array.release(&private_data->array);
ArrowArrayRelease(&private_data->array);
}

if (private_data->fields != NULL) {
Expand Down Expand Up @@ -1117,21 +1117,21 @@ ArrowErrorCode ArrowIpcDecoderDecodeSchema(struct ArrowIpcDecoder* decoder,
ArrowSchemaInit(&tmp);
int result = ArrowSchemaSetTypeStruct(&tmp, n_fields);
if (result != NANOARROW_OK) {
tmp.release(&tmp);
ArrowSchemaRelease(&tmp);
ArrowErrorSet(error, "Failed to allocate struct schema with %ld children",
(long)n_fields);
return result;
}

result = ArrowIpcDecoderSetChildren(&tmp, fields, error);
if (result != NANOARROW_OK) {
tmp.release(&tmp);
ArrowSchemaRelease(&tmp);
return result;
}

result = ArrowIpcDecoderSetMetadata(&tmp, ns(Schema_custom_metadata(schema)), error);
if (result != NANOARROW_OK) {
tmp.release(&tmp);
ArrowSchemaRelease(&tmp);
return result;
}

Expand Down Expand Up @@ -1178,7 +1178,7 @@ ArrowErrorCode ArrowIpcDecoderSetSchema(struct ArrowIpcDecoder* decoder,
private_data->n_fields = 0;
ArrowArrayViewReset(&private_data->array_view);
if (private_data->array.release != NULL) {
private_data->array.release(&private_data->array);
ArrowArrayRelease(&private_data->array);
}
if (private_data->fields != NULL) {
ArrowFree(private_data->fields);
Expand Down Expand Up @@ -1675,7 +1675,7 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder,
int result =
ArrowIpcDecoderDecodeArrayInternal(decoder, i, &temp, validation_level, error);
if (result != NANOARROW_OK && temp.release != NULL) {
temp.release(&temp);
ArrowArrayRelease(&temp);
} else if (result != NANOARROW_OK) {
return result;
}
Expand All @@ -1699,7 +1699,7 @@ ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared(
int result =
ArrowIpcDecoderDecodeArrayInternal(decoder, i, &temp, validation_level, error);
if (result != NANOARROW_OK && temp.release != NULL) {
temp.release(&temp);
ArrowArrayRelease(&temp);
} else if (result != NANOARROW_OK) {
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleSchema) {
EXPECT_EQ(schema.children[0]->flags, ARROW_FLAG_NULLABLE);
EXPECT_STREQ(schema.children[0]->format, "i");

schema.release(&schema);
ArrowSchemaRelease(&schema);
ArrowIpcDecoderReset(&decoder);
}

Expand Down Expand Up @@ -351,7 +351,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatch) {
memcmp(array.children[0]->buffers[1], one_two_three_le, sizeof(one_two_three_le)),
0);

array.release(&array);
ArrowArrayRelease(&array);

// Check field extract
EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array,
Expand All @@ -362,7 +362,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatch) {
EXPECT_EQ(array.null_count, 0);
EXPECT_EQ(memcmp(array.buffers[1], one_two_three_le, sizeof(one_two_three_le)), 0);

array.release(&array);
ArrowArrayRelease(&array);

// Field extract should fail if compression was set
decoder.codec = NANOARROW_IPC_COMPRESSION_TYPE_ZSTD;
Expand All @@ -389,7 +389,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatch) {
EXPECT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, &error), EINVAL);
EXPECT_STREQ(error.message, "Expected 0 field nodes in message but found 1");

schema.release(&schema);
ArrowSchemaRelease(&schema);
ArrowIpcDecoderReset(&decoder);
}

Expand Down Expand Up @@ -421,7 +421,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcSetSchema) {
EXPECT_EQ(decoder_private->n_fields, 2);
EXPECT_EQ(decoder_private->n_buffers, 3);

schema.release(&schema);
ArrowSchemaRelease(&schema);
ArrowIpcDecoderReset(&decoder);
}

Expand All @@ -442,7 +442,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcSetSchemaErrors) {
EXPECT_EQ(ArrowIpcDecoderSetSchema(&decoder, &schema, &error), EINVAL);
EXPECT_STREQ(error.message, "schema must be a struct type");

schema.release(&schema);
ArrowSchemaRelease(&schema);
ArrowIpcDecoderReset(&decoder);
}

Expand Down Expand Up @@ -531,7 +531,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatchFromShared) {
memcmp(array.children[0]->buffers[1], one_two_three_le, sizeof(one_two_three_le)),
0);

array.release(&array);
ArrowArrayRelease(&array);

// Check field extract
EXPECT_EQ(ArrowIpcDecoderDecodeArrayFromShared(
Expand All @@ -546,8 +546,8 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatchFromShared) {
EXPECT_EQ(array.null_count, 0);
EXPECT_EQ(memcmp(array.buffers[1], one_two_three_le, sizeof(one_two_three_le)), 0);

array.release(&array);
schema.release(&schema);
ArrowArrayRelease(&array);
ArrowSchemaRelease(&schema);
ArrowBufferReset(&body);
ArrowIpcDecoderReset(&decoder);
}
Expand Down Expand Up @@ -596,15 +596,15 @@ TEST(NanoarrowIpcTest, NanoarrowIpcSharedBufferThreadSafeDecode) {
// Clean up
ArrowIpcSharedBufferReset(&shared);
ArrowIpcDecoderReset(&decoder);
schema.release(&schema);
ArrowSchemaRelease(&schema);

// Access the data and release from another thread
std::thread threads[10];
for (int i = 0; i < 10; i++) {
threads[i] = std::thread([&arrays, i, &one_two_three_le] {
memcmp(arrays[i].children[0]->buffers[1], one_two_three_le,
sizeof(one_two_three_le));
arrays[i].release(arrays + i);
ArrowArrayRelease(arrays + i);
});
}

Expand Down Expand Up @@ -677,7 +677,7 @@ TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcArrowArrayRoundtrip) {
EXPECT_EQ(maybe_batch.ValueUnsafe()->ToString(), nulls->ToString());
EXPECT_TRUE(maybe_batch.ValueUnsafe()->Equals(*nulls));

schema.release(&schema);
ArrowSchemaRelease(&schema);
ArrowIpcDecoderReset(&decoder);
}

Expand Down Expand Up @@ -928,7 +928,7 @@ TEST_P(ArrowTypeIdParameterizedTestFixture, NanoarrowIpcDecodeSwapEndian) {
array_view->buffer_views[1].size_bytes),
0);

schema.release(&schema);
ArrowSchemaRelease(&schema);
ArrowIpcDecoderReset(&decoder);
}

Expand Down
Loading

0 comments on commit 248b498

Please sign in to comment.