From 46caf6253026fda893f067fc3a6cbf92394f251c Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 20 Mar 2023 15:38:48 -0300 Subject: [PATCH 01/23] rename nanoarrow_ipc.c to nanoarrow_ipc_decoder.c --- extensions/nanoarrow_ipc/CMakeLists.txt | 12 ++++++------ .../{nanoarrow_ipc.c => nanoarrow_ipc_decoder.c} | 0 ...row_ipc_test.cc => nanoarrow_ipc_decoder_test.cc} | 0 3 files changed, 6 insertions(+), 6 deletions(-) rename extensions/nanoarrow_ipc/src/nanoarrow/{nanoarrow_ipc.c => nanoarrow_ipc_decoder.c} (100%) rename extensions/nanoarrow_ipc/src/nanoarrow/{nanoarrow_ipc_test.cc => nanoarrow_ipc_decoder_test.cc} (100%) diff --git a/extensions/nanoarrow_ipc/CMakeLists.txt b/extensions/nanoarrow_ipc/CMakeLists.txt index 38517f767..3b8a3163b 100644 --- a/extensions/nanoarrow_ipc/CMakeLists.txt +++ b/extensions/nanoarrow_ipc/CMakeLists.txt @@ -91,11 +91,11 @@ if (NANOARROW_IPC_BUNDLE) file(READ src/nanoarrow/nanoarrow_ipc.h SRC_FILE_CONTENTS) file(WRITE ${NANOARROW_IPC_H_TEMP} "${SRC_FILE_CONTENTS}") - # combine flatcc-generated headers and nanoarrow_ipc.c + # combine flatcc-generated headers and nanoarrow_ipc sources set(NANOARROW_IPC_C_TEMP ${CMAKE_BINARY_DIR}/amalgamation/nanoarrow/nanoarrow_ipc.c) file(READ src/nanoarrow/nanoarrow_ipc_flatcc_generated.h SRC_FILE_CONTENTS) file(WRITE ${NANOARROW_IPC_C_TEMP} "${SRC_FILE_CONTENTS}") - file(READ src/nanoarrow/nanoarrow_ipc.c SRC_FILE_CONTENTS) + file(READ src/nanoarrow/nanoarrow_ipc_decoder.c SRC_FILE_CONTENTS) file(APPEND ${NANOARROW_IPC_C_TEMP} "${SRC_FILE_CONTENTS}") # remove the include for the generated files in the bundled version @@ -143,7 +143,7 @@ if (NANOARROW_IPC_BUNDLE) install(DIRECTORY thirdparty/flatcc/include/flatcc DESTINATION ".") else() # This is a normal CMake build that builds + installs some includes and a static lib - add_library(nanoarrow_ipc src/nanoarrow/nanoarrow_ipc.c) + add_library(nanoarrow_ipc src/nanoarrow/nanoarrow_ipc_decoder.c) target_link_libraries(nanoarrow_ipc PRIVATE 
flatccrt) target_include_directories(nanoarrow_ipc PUBLIC @@ -185,7 +185,7 @@ if (NANOARROW_IPC_BUILD_TESTS) enable_testing() - add_executable(nanoarrow_ipc_test src/nanoarrow/nanoarrow_ipc_test.cc) + add_executable(nanoarrow_ipc_decoder_test src/nanoarrow/nanoarrow_ipc_decoder_test.cc) if(NANOARROW_IPC_CODE_COVERAGE) target_compile_options(ipc_coverage_config INTERFACE -O0 -g --coverage) @@ -193,8 +193,8 @@ if (NANOARROW_IPC_BUILD_TESTS) target_link_libraries(nanoarrow_ipc PRIVATE ipc_coverage_config) endif() - target_link_libraries(nanoarrow_ipc_test nanoarrow_ipc nanoarrow arrow_shared gtest_main) + target_link_libraries(nanoarrow_ipc_decoder_test nanoarrow_ipc nanoarrow arrow_shared gtest_main) include(GoogleTest) - gtest_discover_tests(nanoarrow_ipc_test) + gtest_discover_tests(nanoarrow_ipc_decoder_test) endif() diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c similarity index 100% rename from extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c rename to extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc similarity index 100% rename from extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc rename to extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc From 2c0fc34bd678707ccacadbe0b182ac3a987fea4e Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 20 Mar 2023 15:53:23 -0300 Subject: [PATCH 02/23] add reader source file/test --- extensions/nanoarrow_ipc/CMakeLists.txt | 8 +++++- .../src/nanoarrow/nanoarrow_ipc.h | 9 ++++++ .../src/nanoarrow/nanoarrow_ipc_reader.c | 28 +++++++++++++++++++ .../nanoarrow/nanoarrow_ipc_reader_test.cc | 24 ++++++++++++++++ 4 files changed, 68 insertions(+), 1 deletion(-) create mode 100644 extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c create mode 
100644 extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc diff --git a/extensions/nanoarrow_ipc/CMakeLists.txt b/extensions/nanoarrow_ipc/CMakeLists.txt index 3b8a3163b..3f750f374 100644 --- a/extensions/nanoarrow_ipc/CMakeLists.txt +++ b/extensions/nanoarrow_ipc/CMakeLists.txt @@ -97,6 +97,8 @@ if (NANOARROW_IPC_BUNDLE) file(WRITE ${NANOARROW_IPC_C_TEMP} "${SRC_FILE_CONTENTS}") file(READ src/nanoarrow/nanoarrow_ipc_decoder.c SRC_FILE_CONTENTS) file(APPEND ${NANOARROW_IPC_C_TEMP} "${SRC_FILE_CONTENTS}") + file(READ src/nanoarrow/nanoarrow_ipc_reader.c SRC_FILE_CONTENTS) + file(APPEND ${NANOARROW_IPC_C_TEMP} "${SRC_FILE_CONTENTS}") # remove the include for the generated files in the bundled version file(READ ${NANOARROW_IPC_C_TEMP} SRC_FILE_CONTENTS) @@ -143,7 +145,9 @@ if (NANOARROW_IPC_BUNDLE) install(DIRECTORY thirdparty/flatcc/include/flatcc DESTINATION ".") else() # This is a normal CMake build that builds + installs some includes and a static lib - add_library(nanoarrow_ipc src/nanoarrow/nanoarrow_ipc_decoder.c) + add_library(nanoarrow_ipc + src/nanoarrow/nanoarrow_ipc_decoder.c + src/nanoarrow/nanoarrow_ipc_reader.c) target_link_libraries(nanoarrow_ipc PRIVATE flatccrt) target_include_directories(nanoarrow_ipc PUBLIC @@ -186,6 +190,7 @@ if (NANOARROW_IPC_BUILD_TESTS) enable_testing() add_executable(nanoarrow_ipc_decoder_test src/nanoarrow/nanoarrow_ipc_decoder_test.cc) + add_executable(nanoarrow_ipc_reader_test src/nanoarrow/nanoarrow_ipc_reader_test.cc) if(NANOARROW_IPC_CODE_COVERAGE) target_compile_options(ipc_coverage_config INTERFACE -O0 -g --coverage) @@ -194,6 +199,7 @@ if (NANOARROW_IPC_BUILD_TESTS) endif() target_link_libraries(nanoarrow_ipc_decoder_test nanoarrow_ipc nanoarrow arrow_shared gtest_main) + target_link_libraries(nanoarrow_ipc_reader_test nanoarrow_ipc nanoarrow gtest_main) include(GoogleTest) gtest_discover_tests(nanoarrow_ipc_decoder_test) diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h index 155150ede..2e1b7915a 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h @@ -219,6 +219,15 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder, struct ArrowArray* out, struct ArrowError* error); +struct ArrowIpcInputStream { + ArrowErrorCode (*read)(struct ArrowIpcInputStream* stream, void* buf, int64_t n_bytes); + void (*release)(struct ArrowIpcInputStream* stream); + void* private_data; +}; + +ArrowErrorCode ArrowIpcInputStreamInitLiteral(struct ArrowIpcInputStream* stream, + struct ArrowBuffer* input); + #ifdef __cplusplus } #endif diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c new file mode 100644 index 000000000..c2663f4eb --- /dev/null +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include + +#include "nanoarrow.h" +#include "nanoarrow_ipc.h" + +ArrowErrorCode ArrowIpcInputStreamInitLiteral(struct ArrowIpcInputStream* stream, + struct ArrowBuffer* input) { + return ENOTSUP; +} diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc new file mode 100644 index 000000000..9f1168f73 --- /dev/null +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include + +#include "nanoarrow_ipc.h" + +TEST(NanoarrowIpcReader, LiteralStream) { + EXPECT_EQ(1, 1); +} From 3b4935f19c0a7387b614d5fa94c04154b676ee65 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 20 Mar 2023 16:17:40 -0300 Subject: [PATCH 03/23] stub literal input stream impl --- .../src/nanoarrow/nanoarrow_ipc.h | 4 +- .../src/nanoarrow/nanoarrow_ipc_reader.c | 55 ++++++++++++++++++- 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h index 2e1b7915a..5ee8d52d4 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h @@ -220,7 +220,9 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder, struct ArrowError* error); struct ArrowIpcInputStream { - ArrowErrorCode (*read)(struct ArrowIpcInputStream* stream, void* buf, int64_t n_bytes); + ArrowErrorCode (*read)(struct ArrowIpcInputStream* stream, void* buf, + int64_t buf_size_bytes, int64_t* size_read_out, + struct ArrowError* error); void (*release)(struct ArrowIpcInputStream* stream); void* private_data; }; diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c index c2663f4eb..2afd1a158 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c @@ -22,7 +22,60 @@ #include "nanoarrow.h" #include "nanoarrow_ipc.h" +struct ArrowIpcInputStreamLiteralPrivate { + struct ArrowBuffer input; + int64_t cursor_bytes; +}; + +static ArrowErrorCode ArrowIpcInputStreamLiteralRead(struct ArrowIpcInputStream* stream, + void* buf, int64_t buf_size_bytes, + int64_t* size_read_out, + struct ArrowError* error) { + if (buf_size_bytes == 0) { + return NANOARROW_OK; + } + + struct ArrowIpcInputStreamLiteralPrivate* private_data = + (struct 
ArrowIpcInputStreamLiteralPrivate*)stream->private_data; + int64_t bytes_remaining = private_data->input.size_bytes - private_data->cursor_bytes; + int64_t bytes_to_read; + if (bytes_remaining > buf_size_bytes) { + bytes_to_read = buf_size_bytes; + } else { + bytes_to_read = bytes_remaining; + } + + if (bytes_to_read > 0) { + memcpy(buf, private_data->input.data + private_data->cursor_bytes, bytes_to_read); + } + + *size_read_out = bytes_to_read; + private_data->cursor_bytes += bytes_to_read; + return NANOARROW_OK; +} + +static void ArrowIpcInputStreamLiteralRelease(struct ArrowIpcInputStream* stream) { + struct ArrowIpcInputStreamLiteralPrivate* private_data = + (struct ArrowIpcInputStreamLiteralPrivate*)stream->private_data; + ArrowBufferReset(&private_data->input); + ArrowFree(private_data); + stream->release = NULL; +} + ArrowErrorCode ArrowIpcInputStreamInitLiteral(struct ArrowIpcInputStream* stream, struct ArrowBuffer* input) { - return ENOTSUP; + struct ArrowIpcInputStreamLiteralPrivate* private_data = + (struct ArrowIpcInputStreamLiteralPrivate*)ArrowMalloc( + sizeof(struct ArrowIpcInputStreamLiteralPrivate)); + if (private_data == NULL) { + return ENOMEM; + } + + ArrowBufferMove(input, &private_data->input); + private_data->cursor_bytes = 0; + stream->read = &ArrowIpcInputStreamLiteralRead; + stream->release = &ArrowIpcInputStreamLiteralRelease; + stream->private_data = private_data; + + return NANOARROW_OK; } From a29b24d26208041e0ef5e7ef5dfc048538efa240 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 21 Mar 2023 10:11:19 -0300 Subject: [PATCH 04/23] test the literal input --- .../nanoarrow/nanoarrow_ipc_reader_test.cc | 37 ++++++++++++++++++- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc index 9f1168f73..535b202d4 100644 --- 
a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc @@ -19,6 +19,39 @@ #include "nanoarrow_ipc.h" -TEST(NanoarrowIpcReader, LiteralStream) { - EXPECT_EQ(1, 1); +TEST(NanoarrowIpcReader, InputStreamLiteral) { + uint8_t input_data[] = {0x01, 0x02, 0x03, 0x04, 0x05}; + struct ArrowBuffer input; + ArrowBufferInit(&input); + ASSERT_EQ(ArrowBufferAppend(&input, input_data, sizeof(input_data)), NANOARROW_OK); + + struct ArrowIpcInputStream stream; + uint8_t output_data[] = {0xff, 0xff, 0xff, 0xff, 0xff}; + int64_t size_read_bytes; + + ASSERT_EQ(ArrowIpcInputStreamInitLiteral(&stream, &input), NANOARROW_OK); + EXPECT_EQ(input.data, nullptr); + + EXPECT_EQ(stream.read(&stream, output_data, 2, &size_read_bytes, nullptr), NANOARROW_OK); + EXPECT_EQ(size_read_bytes, 2); + uint8_t output_data1[] = {0x01, 0x02, 0xff, 0xff, 0xff}; + EXPECT_EQ(memcmp(output_data, output_data1, sizeof(output_data)), 0); + + EXPECT_EQ(stream.read(&stream, output_data + 2, 2, &size_read_bytes, nullptr), NANOARROW_OK); + EXPECT_EQ(size_read_bytes, 2); + uint8_t output_data2[] = {0x01, 0x02, 0x03, 0x04, 0xff}; + EXPECT_EQ(memcmp(output_data, output_data2, sizeof(output_data)), 0); + + EXPECT_EQ(stream.read(&stream, output_data + 4, 2, &size_read_bytes, nullptr), NANOARROW_OK); + EXPECT_EQ(size_read_bytes, 1); + uint8_t output_data3[] = {0x01, 0x02, 0x03, 0x04, 0x05}; + EXPECT_EQ(memcmp(output_data, output_data3, sizeof(output_data)), 0); + + EXPECT_EQ(stream.read(&stream, nullptr, 2, &size_read_bytes, nullptr), NANOARROW_OK); + EXPECT_EQ(size_read_bytes, 0); + + EXPECT_EQ(stream.read(&stream, nullptr, 0, &size_read_bytes, nullptr), NANOARROW_OK); + EXPECT_EQ(size_read_bytes, 0); + + stream.release(&stream); } From a15a7a9ea706ffeb78fbaa0d132387e9920674a9 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 21 Mar 2023 10:11:56 -0300 Subject: [PATCH 05/23] maybe fix symbol error --- 
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h index 5ee8d52d4..972227c28 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h @@ -39,6 +39,8 @@ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetSchema) #define ArrowIpcDecoderSetEndianness \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetEndianness) +#define ArrowIpcInputStreamInitLiteral \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamInitLiteral) #endif From 00a13c410f401cacd3c1c6e5be1e05303239ee2a Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 21 Mar 2023 10:26:21 -0300 Subject: [PATCH 06/23] literal -> buffer --- .../src/nanoarrow/nanoarrow_ipc.h | 9 ++--- .../src/nanoarrow/nanoarrow_ipc_reader.c | 34 +++++++++---------- .../nanoarrow/nanoarrow_ipc_reader_test.cc | 13 ++++--- 3 files changed, 30 insertions(+), 26 deletions(-) diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h index 972227c28..ace2a517a 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h @@ -39,8 +39,8 @@ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetSchema) #define ArrowIpcDecoderSetEndianness \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetEndianness) -#define ArrowIpcInputStreamInitLiteral \ - NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamInitLiteral) +#define ArrowIpcInputStreamInitBuffer \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamInitBuffer) #endif @@ -229,8 +229,9 @@ struct ArrowIpcInputStream { void* private_data; }; -ArrowErrorCode ArrowIpcInputStreamInitLiteral(struct ArrowIpcInputStream* stream, - struct ArrowBuffer* input); +/// \brief Create an input 
stream from an ArrowBuffer +ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream* stream, + struct ArrowBuffer* input); #ifdef __cplusplus } diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c index 2afd1a158..46562b581 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c @@ -22,21 +22,21 @@ #include "nanoarrow.h" #include "nanoarrow_ipc.h" -struct ArrowIpcInputStreamLiteralPrivate { +struct ArrowIpcInputStreamBufferPrivate { struct ArrowBuffer input; int64_t cursor_bytes; }; -static ArrowErrorCode ArrowIpcInputStreamLiteralRead(struct ArrowIpcInputStream* stream, - void* buf, int64_t buf_size_bytes, - int64_t* size_read_out, - struct ArrowError* error) { +static ArrowErrorCode ArrowIpcInputStreamBufferRead(struct ArrowIpcInputStream* stream, + void* buf, int64_t buf_size_bytes, + int64_t* size_read_out, + struct ArrowError* error) { if (buf_size_bytes == 0) { return NANOARROW_OK; } - struct ArrowIpcInputStreamLiteralPrivate* private_data = - (struct ArrowIpcInputStreamLiteralPrivate*)stream->private_data; + struct ArrowIpcInputStreamBufferPrivate* private_data = + (struct ArrowIpcInputStreamBufferPrivate*)stream->private_data; int64_t bytes_remaining = private_data->input.size_bytes - private_data->cursor_bytes; int64_t bytes_to_read; if (bytes_remaining > buf_size_bytes) { @@ -54,27 +54,27 @@ static ArrowErrorCode ArrowIpcInputStreamLiteralRead(struct ArrowIpcInputStream* return NANOARROW_OK; } -static void ArrowIpcInputStreamLiteralRelease(struct ArrowIpcInputStream* stream) { - struct ArrowIpcInputStreamLiteralPrivate* private_data = - (struct ArrowIpcInputStreamLiteralPrivate*)stream->private_data; +static void ArrowIpcInputStreamBufferRelease(struct ArrowIpcInputStream* stream) { + struct ArrowIpcInputStreamBufferPrivate* private_data = + (struct 
ArrowIpcInputStreamBufferPrivate*)stream->private_data; ArrowBufferReset(&private_data->input); ArrowFree(private_data); stream->release = NULL; } -ArrowErrorCode ArrowIpcInputStreamInitLiteral(struct ArrowIpcInputStream* stream, - struct ArrowBuffer* input) { - struct ArrowIpcInputStreamLiteralPrivate* private_data = - (struct ArrowIpcInputStreamLiteralPrivate*)ArrowMalloc( - sizeof(struct ArrowIpcInputStreamLiteralPrivate)); +ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream* stream, + struct ArrowBuffer* input) { + struct ArrowIpcInputStreamBufferPrivate* private_data = + (struct ArrowIpcInputStreamBufferPrivate*)ArrowMalloc( + sizeof(struct ArrowIpcInputStreamBufferPrivate)); if (private_data == NULL) { return ENOMEM; } ArrowBufferMove(input, &private_data->input); private_data->cursor_bytes = 0; - stream->read = &ArrowIpcInputStreamLiteralRead; - stream->release = &ArrowIpcInputStreamLiteralRelease; + stream->read = &ArrowIpcInputStreamBufferRead; + stream->release = &ArrowIpcInputStreamBufferRelease; stream->private_data = private_data; return NANOARROW_OK; diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc index 535b202d4..4c5b97035 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc @@ -19,7 +19,7 @@ #include "nanoarrow_ipc.h" -TEST(NanoarrowIpcReader, InputStreamLiteral) { +TEST(NanoarrowIpcReader, InputStreamBuffer) { uint8_t input_data[] = {0x01, 0x02, 0x03, 0x04, 0x05}; struct ArrowBuffer input; ArrowBufferInit(&input); @@ -29,20 +29,23 @@ TEST(NanoarrowIpcReader, InputStreamLiteral) { uint8_t output_data[] = {0xff, 0xff, 0xff, 0xff, 0xff}; int64_t size_read_bytes; - ASSERT_EQ(ArrowIpcInputStreamInitLiteral(&stream, &input), NANOARROW_OK); + ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&stream, &input), NANOARROW_OK); 
EXPECT_EQ(input.data, nullptr); - EXPECT_EQ(stream.read(&stream, output_data, 2, &size_read_bytes, nullptr), NANOARROW_OK); + EXPECT_EQ(stream.read(&stream, output_data, 2, &size_read_bytes, nullptr), + NANOARROW_OK); EXPECT_EQ(size_read_bytes, 2); uint8_t output_data1[] = {0x01, 0x02, 0xff, 0xff, 0xff}; EXPECT_EQ(memcmp(output_data, output_data1, sizeof(output_data)), 0); - EXPECT_EQ(stream.read(&stream, output_data + 2, 2, &size_read_bytes, nullptr), NANOARROW_OK); + EXPECT_EQ(stream.read(&stream, output_data + 2, 2, &size_read_bytes, nullptr), + NANOARROW_OK); EXPECT_EQ(size_read_bytes, 2); uint8_t output_data2[] = {0x01, 0x02, 0x03, 0x04, 0xff}; EXPECT_EQ(memcmp(output_data, output_data2, sizeof(output_data)), 0); - EXPECT_EQ(stream.read(&stream, output_data + 4, 2, &size_read_bytes, nullptr), NANOARROW_OK); + EXPECT_EQ(stream.read(&stream, output_data + 4, 2, &size_read_bytes, nullptr), + NANOARROW_OK); EXPECT_EQ(size_read_bytes, 1); uint8_t output_data3[] = {0x01, 0x02, 0x03, 0x04, 0x05}; EXPECT_EQ(memcmp(output_data, output_data3, sizeof(output_data)), 0); From 10e2cc0ba544baaca8f4fad3e60af81327237827 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 21 Mar 2023 11:08:20 -0300 Subject: [PATCH 07/23] document the struct --- .../nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h index ace2a517a..c1d1a4264 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h @@ -221,11 +221,24 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder, struct ArrowArray* out, struct ArrowError* error); +/// \brief A user-extensible input data source struct ArrowIpcInputStream { + /// \brief Read up to buf_size_bytes from stream into buf + /// + /// The actual number of bytes read is placed in the value 
pointed to by + /// size_read_out. Returns NANOARROW_OK on success. ArrowErrorCode (*read)(struct ArrowIpcInputStream* stream, void* buf, int64_t buf_size_bytes, int64_t* size_read_out, struct ArrowError* error); + + /// \brief Release the stream and any resources it may be holding + /// + /// Release callback implementations must set the release member to NULL. + /// Callers must check that the release callback is not NULL before calling + /// read() or release(). void (*release)(struct ArrowIpcInputStream* stream); + + /// \brief Private implementation-defined data void* private_data; }; From 15dbc327752d3cbb266e10145947fc2405a6f9ef Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 21 Mar 2023 15:25:48 -0300 Subject: [PATCH 08/23] sketch reader --- .../src/nanoarrow/nanoarrow_ipc.h | 8 + .../src/nanoarrow/nanoarrow_ipc_reader.c | 244 ++++++++++++++++++ 2 files changed, 252 insertions(+) diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h index c1d1a4264..be693fc53 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h @@ -246,6 +246,14 @@ struct ArrowIpcInputStream { ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream* stream, struct ArrowBuffer* input); +struct ArrowIpcArrayStreamReaderOptions { + int64_t field_index; +}; + +ArrowErrorCode ArrowIpcArrayStreamReaderInit(struct ArrowArrayStream* out, + struct ArrowIpcInputStream* input_stream, + struct ArrowIpcArrayStreamReaderOptions options); + #ifdef __cplusplus } #endif diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c index 46562b581..330f86087 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c @@ -79,3 +79,247 @@ ArrowErrorCode 
ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream* stream, return NANOARROW_OK; } + +struct ArrowIpcArrayStreamReaderPrivate { + struct ArrowIpcInputStream input; + struct ArrowIpcDecoder decoder; + struct ArrowSchema out_schema; + int64_t field_index; + struct ArrowBuffer header; + struct ArrowBuffer body; + struct ArrowError error; +}; + +static void ArrowIpcArrayStreamReaderRelease(struct ArrowArrayStream* stream) { + struct ArrowIpcArrayStreamReaderPrivate* private_data = + (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data; + + if (private_data->input.release != NULL) { + private_data->input.release(&private_data->input); + } + + ArrowIpcDecoderReset(&private_data->decoder); + + if (private_data->out_schema.release != NULL) { + private_data->out_schema.release(&private_data->out_schema); + } + + ArrowBufferReset(&private_data->header); + ArrowBufferReset(&private_data->body); + + ArrowFree(private_data); + stream->release = NULL; +} + +#define NANOARROW_IPC_ARRAY_STREAM_READER_CHUNK_SIZE 65536 + +static int ArrowIpcArrayStreamReaderRead( + struct ArrowIpcArrayStreamReaderPrivate* private_data, struct ArrowBuffer* buffer, + int64_t* bytes_read) { + NANOARROW_RETURN_NOT_OK(ArrowBufferReserve( + &private_data->header, NANOARROW_IPC_ARRAY_STREAM_READER_CHUNK_SIZE)); + + NANOARROW_RETURN_NOT_OK(private_data->input.read( + &private_data->input, buffer->data + buffer->size_bytes, + NANOARROW_IPC_ARRAY_STREAM_READER_CHUNK_SIZE, bytes_read, &private_data->error)); + + buffer->size_bytes += *bytes_read; + return NANOARROW_OK; +} + +static int ArrowIpcArrayStreamReaderNextHeader( + struct ArrowIpcArrayStreamReaderPrivate* private_data) { + private_data->header.size_bytes = 0; + struct ArrowBufferView input_view; + + int64_t bytes_read = 0; + int result; + do { + NANOARROW_RETURN_NOT_OK( + ArrowIpcArrayStreamReaderRead(private_data, &private_data->header, &bytes_read)); + input_view.data.data = private_data->header.data; + input_view.size_bytes = 
private_data->header.size_bytes; + result = ArrowIpcDecoderVerifyHeader(&private_data->decoder, input_view, + &private_data->error); + } while (result == ESPIPE || bytes_read == 0); + + if (result != NANOARROW_OK && bytes_read == 0) { + return ENODATA; + } + + NANOARROW_RETURN_NOT_OK(result); + NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeHeader(&private_data->decoder, input_view, + &private_data->error)); + return NANOARROW_OK; +} + +static int ArrowIpcArrayStreamReaderNextBody( + struct ArrowIpcArrayStreamReaderPrivate* private_data) { + int64_t bytes_read; + int64_t bytes_to_read = private_data->decoder.body_size_bytes; + + // Reserve space in the body buffer + private_data->body.size_bytes = 0; + NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(&private_data->body, bytes_to_read)); + + // Copy any body bytes from the header buffer + int64_t extra_bytes_in_header = + private_data->header.size_bytes - private_data->decoder.header_size_bytes; + memcpy( + private_data->body.data, + private_data->header.data + private_data->header.size_bytes - extra_bytes_in_header, + extra_bytes_in_header); + + // Read the rest of the body buffer + NANOARROW_RETURN_NOT_OK(private_data->input.read( + &private_data->input, private_data->body.data + extra_bytes_in_header, + bytes_to_read - extra_bytes_in_header, &bytes_read, &private_data->error)); + + // Set the size of the buffer + private_data->body.size_bytes = bytes_to_read; + + return NANOARROW_OK; +} + +static int ArrowIpcArrayStreamReaderReadSchemaIfNeeded( + struct ArrowIpcArrayStreamReaderPrivate* private_data) { + if (private_data->out_schema.release != NULL) { + return NANOARROW_OK; + } + + NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextHeader(private_data)); + + // Error if this isn't a schema message + if (private_data->decoder.message_type != NANOARROW_IPC_MESSAGE_TYPE_SCHEMA) { + ArrowErrorSet(&private_data->error, + "Unexpected message type at start of input (expected Schema)"); + return EINVAL; + } + + // ...or if it 
uses features we don't support + if (private_data->decoder.feature_flags & NANOARROW_IPC_FEATURE_COMPRESSED_BODY) { + ArrowErrorSet(&private_data->error, + "This stream uses unsupported feature COMPRESSED_BODY"); + return EINVAL; + } + + if (private_data->decoder.feature_flags & + NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT) { + ArrowErrorSet(&private_data->error, + "This stream uses unsupported feature DICTIONARY_REPLACEMENT"); + return EINVAL; + } + + // Notify the decoder of buffer endianness + NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetEndianness(&private_data->decoder, + private_data->decoder.endianness)); + + struct ArrowSchema tmp; + NANOARROW_RETURN_NOT_OK( + ArrowIpcDecoderDecodeSchema(&private_data->decoder, &tmp, &private_data->error)); + + // Only support "read the whole thing" for now + if (private_data->field_index != -1) { + tmp.release(&tmp); + ArrowErrorSet(&private_data->error, "Field index != -1 is not yet supported"); + return ENOTSUP; + } + + // Notify the decoder of the schema for forthcoming messages + int result = + ArrowIpcDecoderSetSchema(&private_data->decoder, &tmp, &private_data->error); + if (result != NANOARROW_OK) { + tmp.release(&tmp); + return result; + } + + ArrowSchemaMove(&tmp, &private_data->out_schema); + return NANOARROW_OK; +} + +static int ArrowIpcArrayStreamReaderGetSchema(struct ArrowArrayStream* stream, + struct ArrowSchema* out) { + struct ArrowIpcArrayStreamReaderPrivate* private_data = + (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data; + private_data->error.message[0] = '\0'; + NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderReadSchemaIfNeeded(private_data)); + return ArrowSchemaDeepCopy(&private_data->out_schema, out); +} + +static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream, + struct ArrowArray* out) { + struct ArrowIpcArrayStreamReaderPrivate* private_data = + (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data; + // Check if we are all done + if 
(private_data->input.release == NULL) { + out->release = NULL; + return NANOARROW_OK; + } + + private_data->error.message[0] = '\0'; + NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderReadSchemaIfNeeded(private_data)); + + // Read + decode the next header + int result = ArrowIpcArrayStreamReaderNextHeader(private_data); + if (result == ENODATA) { + // If the stream is finished, release the input + private_data->input.release(&private_data->input); + out->release = NULL; + return NANOARROW_OK; + } + + // Make sure we have a RecordBatch message + if (private_data->decoder.message_type != NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH) { + ArrowErrorSet(&private_data->error, "Unexpected message type (expected RecordBatch)"); + return EINVAL; + } + + // Read in the body + NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextBody(private_data)); + + struct ArrowBufferView body_view; + body_view.data.data = private_data->body.data + private_data->body.size_bytes; + body_view.size_bytes = 0; + + NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArray(&private_data->decoder, body_view, + private_data->field_index, out, + &private_data->error)); + + return NANOARROW_OK; +} + +static const char* ArrowIpcArrayStreamReaderGetLastError( + struct ArrowArrayStream* stream) { + struct ArrowIpcArrayStreamReaderPrivate* private_data = + (struct ArrowIpcArrayStreamReaderPrivate*)stream->private_data; + return private_data->error.message; +} + +ArrowErrorCode ArrowIpcArrayStreamReaderInit( + struct ArrowArrayStream* out, struct ArrowIpcInputStream* input_stream, + struct ArrowIpcArrayStreamReaderOptions options) { + struct ArrowIpcArrayStreamReaderPrivate* private_data = + (struct ArrowIpcArrayStreamReaderPrivate*)ArrowMalloc( + sizeof(struct ArrowIpcArrayStreamReaderPrivate)); + if (private_data == NULL) { + return ENOMEM; + } + + int result = ArrowIpcDecoderInit(&private_data->decoder); + if (result != NANOARROW_OK) { + ArrowFree(private_data); + return result; + } + + 
ArrowBufferInit(&private_data->header); + ArrowBufferInit(&private_data->body); + private_data->out_schema.release = NULL; + + out->private_data = private_data; + out->get_schema = &ArrowIpcArrayStreamReaderGetSchema; + out->get_next = &ArrowIpcArrayStreamReaderGetNext; + out->get_last_error = &ArrowIpcArrayStreamReaderGetLastError; + out->release = &ArrowIpcArrayStreamReaderRelease; + + return NANOARROW_OK; +} From 6aab96616c30d881e25b5aaa14f6dc4a714faed4 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 21 Mar 2023 15:35:28 -0300 Subject: [PATCH 09/23] basic lifecycle test --- .../nanoarrow/nanoarrow_ipc_reader_test.cc | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc index 4c5b97035..753ae63bb 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc @@ -19,6 +19,42 @@ #include "nanoarrow_ipc.h" +static uint8_t kSimpleSchema[] = { + 0xff, 0xff, 0xff, 0xff, 0x10, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x0a, 0x00, 0x0e, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, + 0x00, 0x01, 0x04, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x0c, 0x00, + 0x00, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00, + 0x04, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x84, 0xff, + 0xff, 0xff, 0x18, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, + 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x00, 0x00, 0x08, 0x00, + 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x00, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x00, 0x00, 0x12, 0x00, 0x18, 0x00, + 0x08, 0x00, 0x06, 0x00, 0x07, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x10, 0x00, 0x14, 0x00, + 0x12, 0x00, 0x00, 0x00, 
0x00, 0x00, 0x01, 0x02, 0x14, 0x00, 0x00, 0x00, 0x70, 0x00, + 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x08, 0x00, 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x63, 0x6f, 0x6c, 0x00, 0x00, + 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00, + 0x04, 0x00, 0x08, 0x00, 0x08, 0x00, 0x00, 0x00, 0x20, 0x00, 0x00, 0x00, 0x04, 0x00, + 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x5f, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x00, + 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x66, 0x69, 0x65, + 0x6c, 0x64, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x08, 0x00, 0x07, 0x00, 0x08, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; + +static uint8_t kSimpleRecordBatch[] = { + 0xff, 0xff, 0xff, 0xff, 0x88, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x0c, 0x00, 0x16, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0c, 0x00, + 0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x18, 0x00, 0x00, 0x00, 0x10, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x18, 0x00, 0x0c, 0x00, + 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00, 0x10, 0x00, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; + TEST(NanoarrowIpcReader, InputStreamBuffer) { uint8_t input_data[] = {0x01, 0x02, 0x03, 0x04, 0x05}; struct ArrowBuffer input; @@ 
-58,3 +94,21 @@ TEST(NanoarrowIpcReader, InputStreamBuffer) { stream.release(&stream); } + +TEST(NanoarrowIpcReader, StreamReaderBasic) { + struct ArrowBuffer input_buffer; + ArrowBufferInit(&input_buffer); + ASSERT_EQ(ArrowBufferAppend(&input_buffer, kSimpleSchema, sizeof(kSimpleSchema)), + NANOARROW_OK); + ASSERT_EQ( + ArrowBufferAppend(&input_buffer, kSimpleRecordBatch, sizeof(kSimpleRecordBatch)), + NANOARROW_OK); + + struct ArrowIpcInputStream input; + ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK); + + struct ArrowArrayStream stream; + ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, {-1}), NANOARROW_OK); + + stream.release(&stream); +} From b5c4f52678934f487a9f12430ae4e805692500f3 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 21 Mar 2023 15:43:03 -0300 Subject: [PATCH 10/23] fix segfault, add more tests --- .../nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h | 2 ++ .../src/nanoarrow/nanoarrow_ipc_reader.c | 7 +++++++ .../src/nanoarrow/nanoarrow_ipc_reader_test.cc | 14 ++++++++++++++ 3 files changed, 23 insertions(+) diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h index be693fc53..a7949bc99 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h @@ -242,6 +242,8 @@ struct ArrowIpcInputStream { void* private_data; }; +void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src, struct ArrowIpcInputStream* dst); + /// \brief Create an input stream from an ArrowBuffer ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream* stream, struct ArrowBuffer* input); diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c index 330f86087..318a1413a 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c +++ 
b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c @@ -22,6 +22,12 @@ #include "nanoarrow.h" #include "nanoarrow_ipc.h" +void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src, + struct ArrowIpcInputStream* dst) { + memcpy(dst, src, sizeof(struct ArrowIpcInputStream)); + src->release = NULL; +} + struct ArrowIpcInputStreamBufferPrivate { struct ArrowBuffer input; int64_t cursor_bytes; @@ -314,6 +320,7 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit( ArrowBufferInit(&private_data->header); ArrowBufferInit(&private_data->body); private_data->out_schema.release = NULL; + ArrowIpcInputStreamMove(input_stream, &private_data->input); out->private_data = private_data; out->get_schema = &ArrowIpcArrayStreamReaderGetSchema; diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc index 753ae63bb..13d5f5b7b 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc @@ -110,5 +110,19 @@ TEST(NanoarrowIpcReader, StreamReaderBasic) { struct ArrowArrayStream stream; ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, {-1}), NANOARROW_OK); + struct ArrowSchema schema; + ASSERT_EQ(stream.get_schema(&stream, &schema), NANOARROW_OK); + schema.release(&schema); + + struct ArrowArray array; + ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK); + array.release(&array); + + ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK); + EXPECT_EQ(array.release, nullptr); + + ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK); + EXPECT_EQ(array.release, nullptr); + stream.release(&stream); } From 5292b0997d248a574abb506916e5a6b0be840d09 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 21 Mar 2023 16:31:43 -0300 Subject: [PATCH 11/23] whee! 
passing without checking much about decoded values --- .../src/nanoarrow/nanoarrow_ipc_decoder.c | 20 ++-- .../src/nanoarrow/nanoarrow_ipc_reader.c | 95 +++++++++---------- 2 files changed, 56 insertions(+), 59 deletions(-) diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c index 91011b8e1..e9c673e71 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c @@ -801,21 +801,22 @@ static inline int ArrowIpcDecoderCheckHeader(struct ArrowIpcDecoder* decoder, } int swap_endian = private_data->system_endianness == NANOARROW_IPC_ENDIANNESS_BIG; - *message_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian); - if ((*message_size_bytes) < 0) { + int32_t header_body_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian); + *message_size_bytes = header_body_size_bytes + (2 * sizeof(int32_t)); + if (header_body_size_bytes < 0) { ArrowErrorSet( error, "Expected message body size > 0 but found message body size of %ld bytes", - (long)(*message_size_bytes)); + (long)header_body_size_bytes); return EINVAL; - } else if ((*message_size_bytes) > data_mut->size_bytes) { + } else if (header_body_size_bytes > data_mut->size_bytes) { ArrowErrorSet(error, "Expected 0 <= message body size <= %ld bytes but found message " "body size of %ld bytes", - (long)data_mut->size_bytes, (long)(*message_size_bytes)); + (long)data_mut->size_bytes, (long)header_body_size_bytes); return ESPIPE; } - if (*message_size_bytes == 0) { + if (header_body_size_bytes == 0) { ArrowErrorSet(error, "End of Arrow stream"); return ENODATA; } @@ -832,7 +833,6 @@ ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder, ArrowIpcDecoderResetHeaderInfo(decoder); NANOARROW_RETURN_NOT_OK( ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, error)); - decoder->header_size_bytes += 2 * sizeof(int32_t); return 
NANOARROW_OK; } @@ -847,14 +847,14 @@ ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct ArrowIpcDecoder* decoder, ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, error)); // Run flatbuffers verification - if (ns(Message_verify_as_root(data.data.as_uint8, decoder->header_size_bytes)) != + if (ns(Message_verify_as_root(data.data.as_uint8, + decoder->header_size_bytes - (2 * sizeof(int32_t)))) != flatcc_verify_ok) { ArrowErrorSet(error, "Message flatbuffer verification failed"); return EINVAL; } // Read some basic information from the message - decoder->header_size_bytes += 2 * sizeof(int32_t); ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8)); decoder->metadata_version = ns(Message_version(message)); decoder->message_type = ns(Message_header_type(message)); @@ -873,7 +873,6 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder, ArrowIpcDecoderResetHeaderInfo(decoder); NANOARROW_RETURN_NOT_OK( ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, error)); - decoder->header_size_bytes += 2 * sizeof(int32_t); ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8)); if (!message) { @@ -1046,6 +1045,7 @@ ArrowErrorCode ArrowIpcDecoderSetEndianness(struct ArrowIpcDecoder* decoder, case NANOARROW_IPC_ENDIANNESS_LITTLE: case NANOARROW_IPC_ENDIANNESS_BIG: private_data->endianness = endianness; + return NANOARROW_OK; default: return EINVAL; } diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c index 318a1413a..008783572 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c @@ -117,43 +117,51 @@ static void ArrowIpcArrayStreamReaderRelease(struct ArrowArrayStream* stream) { stream->release = NULL; } -#define NANOARROW_IPC_ARRAY_STREAM_READER_CHUNK_SIZE 65536 - -static int ArrowIpcArrayStreamReaderRead( - 
struct ArrowIpcArrayStreamReaderPrivate* private_data, struct ArrowBuffer* buffer, - int64_t* bytes_read) { - NANOARROW_RETURN_NOT_OK(ArrowBufferReserve( - &private_data->header, NANOARROW_IPC_ARRAY_STREAM_READER_CHUNK_SIZE)); - - NANOARROW_RETURN_NOT_OK(private_data->input.read( - &private_data->input, buffer->data + buffer->size_bytes, - NANOARROW_IPC_ARRAY_STREAM_READER_CHUNK_SIZE, bytes_read, &private_data->error)); - - buffer->size_bytes += *bytes_read; - return NANOARROW_OK; -} - static int ArrowIpcArrayStreamReaderNextHeader( struct ArrowIpcArrayStreamReaderPrivate* private_data) { private_data->header.size_bytes = 0; - struct ArrowBufferView input_view; - int64_t bytes_read = 0; - int result; - do { - NANOARROW_RETURN_NOT_OK( - ArrowIpcArrayStreamReaderRead(private_data, &private_data->header, &bytes_read)); - input_view.data.data = private_data->header.data; - input_view.size_bytes = private_data->header.size_bytes; - result = ArrowIpcDecoderVerifyHeader(&private_data->decoder, input_view, - &private_data->error); - } while (result == ESPIPE || bytes_read == 0); - - if (result != NANOARROW_OK && bytes_read == 0) { + + // Read 8 bytes (continuation + header size in bytes) + NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(&private_data->header, 8)); + NANOARROW_RETURN_NOT_OK(private_data->input.read(&private_data->input, + private_data->header.data, 8, + &bytes_read, &private_data->error)); + private_data->header.size_bytes += bytes_read; + + if (bytes_read == 0) { return ENODATA; + } else if (bytes_read != 8) { + ArrowErrorSet(&private_data->error, + "Expected at least 8 bytes in remainder of stream"); + return EINVAL; } - NANOARROW_RETURN_NOT_OK(result); + struct ArrowBufferView input_view; + input_view.data.data = private_data->header.data; + input_view.size_bytes = private_data->header.size_bytes; + + // Use PeekHeader to fill in decoder.header_size_bytes + int result = + ArrowIpcDecoderPeekHeader(&private_data->decoder, input_view, &private_data->error); + 
if (result == ENODATA) { + return result; + } + + // Read the header bytes + int64_t expected_header_bytes = private_data->decoder.header_size_bytes - 8; + NANOARROW_RETURN_NOT_OK( + ArrowBufferReserve(&private_data->header, expected_header_bytes)); + NANOARROW_RETURN_NOT_OK( + private_data->input.read(&private_data->input, private_data->header.data + 8, + expected_header_bytes, &bytes_read, &private_data->error)); + private_data->header.size_bytes += bytes_read; + + // Verify + decode the header + input_view.data.data = private_data->header.data; + input_view.size_bytes = private_data->header.size_bytes; + NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderVerifyHeader(&private_data->decoder, input_view, + &private_data->error)); NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeHeader(&private_data->decoder, input_view, &private_data->error)); return NANOARROW_OK; @@ -164,25 +172,13 @@ static int ArrowIpcArrayStreamReaderNextBody( int64_t bytes_read; int64_t bytes_to_read = private_data->decoder.body_size_bytes; - // Reserve space in the body buffer + // Read the body bytes private_data->body.size_bytes = 0; NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(&private_data->body, bytes_to_read)); - - // Copy any body bytes from the header buffer - int64_t extra_bytes_in_header = - private_data->header.size_bytes - private_data->decoder.header_size_bytes; - memcpy( - private_data->body.data, - private_data->header.data + private_data->header.size_bytes - extra_bytes_in_header, - extra_bytes_in_header); - - // Read the rest of the body buffer - NANOARROW_RETURN_NOT_OK(private_data->input.read( - &private_data->input, private_data->body.data + extra_bytes_in_header, - bytes_to_read - extra_bytes_in_header, &bytes_read, &private_data->error)); - - // Set the size of the buffer - private_data->body.size_bytes = bytes_to_read; + NANOARROW_RETURN_NOT_OK(private_data->input.read(&private_data->input, + private_data->body.data, bytes_to_read, + &bytes_read, &private_data->error)); + 
private_data->body.size_bytes += bytes_read; return NANOARROW_OK; } @@ -284,8 +280,8 @@ static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream, NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextBody(private_data)); struct ArrowBufferView body_view; - body_view.data.data = private_data->body.data + private_data->body.size_bytes; - body_view.size_bytes = 0; + body_view.data.data = private_data->body.data; + body_view.size_bytes = private_data->body.size_bytes; NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArray(&private_data->decoder, body_view, private_data->field_index, out, @@ -321,6 +317,7 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit( ArrowBufferInit(&private_data->body); private_data->out_schema.release = NULL; ArrowIpcInputStreamMove(input_stream, &private_data->input); + private_data->field_index = options.field_index; out->private_data = private_data; out->get_schema = &ArrowIpcArrayStreamReaderGetSchema; From da73f3de7eb31e3cdc974ef551456300b25e34e8 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 21 Mar 2023 16:32:52 -0300 Subject: [PATCH 12/23] test some output --- .../nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc index 13d5f5b7b..db45b0642 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc @@ -112,10 +112,12 @@ TEST(NanoarrowIpcReader, StreamReaderBasic) { struct ArrowSchema schema; ASSERT_EQ(stream.get_schema(&stream, &schema), NANOARROW_OK); + EXPECT_STREQ(schema.format, "+s"); schema.release(&schema); struct ArrowArray array; ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK); + EXPECT_EQ(array.length, 3); array.release(&array); ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK); From 
a0225c3ab749a62d8fe5acb19653a7b5b8f17d3d Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 21 Mar 2023 16:33:58 -0300 Subject: [PATCH 13/23] maybe fix namespace --- .../nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h index a7949bc99..7cab8912f 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h @@ -41,6 +41,10 @@ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetEndianness) #define ArrowIpcInputStreamInitBuffer \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamInitBuffer) +#define ArrowIpcInputStreamMove \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamMove) +#define ArrowIpcArrayStreamReaderInit \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcArrayStreamReaderInit) #endif @@ -242,7 +246,8 @@ struct ArrowIpcInputStream { void* private_data; }; -void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src, struct ArrowIpcInputStream* dst); +void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src, + struct ArrowIpcInputStream* dst); /// \brief Create an input stream from an ArrowBuffer ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream* stream, @@ -252,9 +257,9 @@ struct ArrowIpcArrayStreamReaderOptions { int64_t field_index; }; -ArrowErrorCode ArrowIpcArrayStreamReaderInit(struct ArrowArrayStream* out, - struct ArrowIpcInputStream* input_stream, - struct ArrowIpcArrayStreamReaderOptions options); +ArrowErrorCode ArrowIpcArrayStreamReaderInit( + struct ArrowArrayStream* out, struct ArrowIpcInputStream* input_stream, + struct ArrowIpcArrayStreamReaderOptions options); #ifdef __cplusplus } From 8fdc449a7fc325354fca3e69305ca12fc018d992 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 22 Mar 2023 10:44:57 -0300 Subject: [PATCH 14/23] file 
stream implementation --- .../src/nanoarrow/nanoarrow_ipc.h | 4 + .../src/nanoarrow/nanoarrow_ipc_reader.c | 75 +++++++++++++++++++ .../nanoarrow/nanoarrow_ipc_reader_test.cc | 42 +++++++++++ 3 files changed, 121 insertions(+) diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h index 7cab8912f..a3f6165bd 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h @@ -253,6 +253,10 @@ void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src, ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream* stream, struct ArrowBuffer* input); +/// \brief Create an input stream from a C FILE* pointer +ArrowErrorCode ArrowIpcInputStreamInitFile(struct ArrowIpcInputStream* stream, + void* file_ptr, int close_on_release); + struct ArrowIpcArrayStreamReaderOptions { int64_t field_index; }; diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c index 008783572..0446a9eea 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c @@ -86,6 +86,81 @@ ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream* stream, return NANOARROW_OK; } +struct ArrowIpcInputStreamFilePrivate { + FILE* file_ptr; + int stream_finished; + int close_on_release; +}; + +static void ArrowIpcInputStreamFileRelease(struct ArrowIpcInputStream* stream) { + struct ArrowIpcInputStreamFilePrivate* private_data = + (struct ArrowIpcInputStreamFilePrivate*)stream->private_data; + + if (private_data->file_ptr != NULL && private_data->close_on_release) { + fclose(private_data->file_ptr); + } + + ArrowFree(private_data); + stream->release = NULL; +} + +static ArrowErrorCode ArrowIpcInputStreamFileRead(struct ArrowIpcInputStream* stream, void* buf, + int64_t buf_size_bytes, int64_t* 
size_read_out, + struct ArrowError* error) { + struct ArrowIpcInputStreamFilePrivate* private_data = + (struct ArrowIpcInputStreamFilePrivate*)stream->private_data; + + if (private_data->stream_finished) { + *size_read_out = 0; + return NANOARROW_OK; + } + + // Do the read + int64_t bytes_read = (int64_t)fread(buf, 1, buf_size_bytes, private_data->file_ptr); + *size_read_out = bytes_read; + + if (bytes_read != buf_size_bytes) { + private_data->stream_finished = 1; + + // Inspect error + int has_error = !feof(private_data->file_ptr) && ferror(private_data->file_ptr); + + // Try to close the file now + if (private_data->close_on_release) { + if (fclose(private_data->file_ptr) == 0) { + private_data->file_ptr = NULL; + } + } + + // Maybe return error + if (has_error) { + ArrowErrorSet(error, "ArrowIpcInputStreamFile IO error"); + return EIO; + } + } + + return NANOARROW_OK; +} + +ArrowErrorCode ArrowIpcInputStreamInitFile(struct ArrowIpcInputStream* stream, + void* file_ptr, int close_on_release) { + struct ArrowIpcInputStreamFilePrivate* private_data = + (struct ArrowIpcInputStreamFilePrivate*)ArrowMalloc( + sizeof(struct ArrowIpcInputStreamFilePrivate)); + if (private_data == NULL) { + return ENOMEM; + } + + private_data->file_ptr = (FILE*)file_ptr; + private_data->close_on_release = close_on_release; + private_data->stream_finished = 0; + + stream->read = &ArrowIpcInputStreamFileRead; + stream->release = &ArrowIpcInputStreamFileRelease; + stream->private_data = private_data; + return NANOARROW_OK; +} + struct ArrowIpcArrayStreamReaderPrivate { struct ArrowIpcInputStream input; struct ArrowIpcDecoder decoder; diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc index db45b0642..4eebf58f1 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc @@ -17,6 +17,8 @@ #include 
+#include + #include "nanoarrow_ipc.h" static uint8_t kSimpleSchema[] = { @@ -95,6 +97,46 @@ TEST(NanoarrowIpcReader, InputStreamBuffer) { stream.release(&stream); } +TEST(NanoarrowIpcReader, InputStreamFile) { + uint8_t input_data[] = {0x01, 0x02, 0x03, 0x04, 0x05}; + FILE* file_ptr = tmpfile(); + ASSERT_NE(file_ptr, nullptr); + ASSERT_EQ(fwrite(input_data, 1, sizeof(input_data), file_ptr), sizeof(input_data)); + fseek(file_ptr, 0, SEEK_SET); + + struct ArrowIpcInputStream stream; + uint8_t output_data[] = {0xff, 0xff, 0xff, 0xff, 0xff}; + int64_t size_read_bytes; + + ASSERT_EQ(ArrowIpcInputStreamInitFile(&stream, file_ptr, 1), NANOARROW_OK); + + EXPECT_EQ(stream.read(&stream, output_data, 2, &size_read_bytes, nullptr), + NANOARROW_OK); + EXPECT_EQ(size_read_bytes, 2); + uint8_t output_data1[] = {0x01, 0x02, 0xff, 0xff, 0xff}; + EXPECT_EQ(memcmp(output_data, output_data1, sizeof(output_data)), 0); + + EXPECT_EQ(stream.read(&stream, output_data + 2, 2, &size_read_bytes, nullptr), + NANOARROW_OK); + EXPECT_EQ(size_read_bytes, 2); + uint8_t output_data2[] = {0x01, 0x02, 0x03, 0x04, 0xff}; + EXPECT_EQ(memcmp(output_data, output_data2, sizeof(output_data)), 0); + + EXPECT_EQ(stream.read(&stream, output_data + 4, 2, &size_read_bytes, nullptr), + NANOARROW_OK); + EXPECT_EQ(size_read_bytes, 1); + uint8_t output_data3[] = {0x01, 0x02, 0x03, 0x04, 0x05}; + EXPECT_EQ(memcmp(output_data, output_data3, sizeof(output_data)), 0); + + EXPECT_EQ(stream.read(&stream, nullptr, 2, &size_read_bytes, nullptr), NANOARROW_OK); + EXPECT_EQ(size_read_bytes, 0); + + EXPECT_EQ(stream.read(&stream, nullptr, 0, &size_read_bytes, nullptr), NANOARROW_OK); + EXPECT_EQ(size_read_bytes, 0); + + stream.release(&stream); +} + TEST(NanoarrowIpcReader, StreamReaderBasic) { struct ArrowBuffer input_buffer; ArrowBufferInit(&input_buffer); From a86b9af8d30baa6a6e58708fb4349c09d5443a36 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 22 Mar 2023 10:52:50 -0300 Subject: [PATCH 15/23] document --- 
.../nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h | 16 +++++++++++++++- .../src/nanoarrow/nanoarrow_ipc_reader.c | 16 +++++++++++----- .../src/nanoarrow/nanoarrow_ipc_reader_test.cc | 2 +- 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h index a3f6165bd..75b1f935f 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h @@ -246,6 +246,7 @@ struct ArrowIpcInputStream { void* private_data; }; +/// \brief Transfer ownership of an ArrowIpcInputStream void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src, struct ArrowIpcInputStream* dst); @@ -254,16 +255,29 @@ ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream* stream, struct ArrowBuffer* input); /// \brief Create an input stream from a C FILE* pointer +/// +/// Note that the ArrowIpcInputStream has no mechanism to communicate an error +/// if file_ptr fails to close. If this behaviour is needed, pass false to +/// close_on_release and handle closing the file independently from stream. ArrowErrorCode ArrowIpcInputStreamInitFile(struct ArrowIpcInputStream* stream, void* file_ptr, int close_on_release); +/// \brief Options for ArrowIpcArrayStreamReaderInit() struct ArrowIpcArrayStreamReaderOptions { + /// \brief The field index to extract. Defaults to -1 (i.e., read all fields). int64_t field_index; }; +/// \brief Initialize an ArrowArrayStream from an input stream of bytes +/// +/// The stream of bytes must begin with a Schema message and be followed by +/// zero or more RecordBatch messages as described in the Arrow IPC stream +/// format specification. Returns NANOARROW_OK on success. If NANOARROW_OK +/// is returned, the ArrowArrayStream takes ownership of input_stream and +/// the caller is responsible for releasing out. 
ArrowErrorCode ArrowIpcArrayStreamReaderInit( struct ArrowArrayStream* out, struct ArrowIpcInputStream* input_stream, - struct ArrowIpcArrayStreamReaderOptions options); + struct ArrowIpcArrayStreamReaderOptions* options); #ifdef __cplusplus } diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c index 0446a9eea..afaa20ba5 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c @@ -104,9 +104,10 @@ static void ArrowIpcInputStreamFileRelease(struct ArrowIpcInputStream* stream) { stream->release = NULL; } -static ArrowErrorCode ArrowIpcInputStreamFileRead(struct ArrowIpcInputStream* stream, void* buf, - int64_t buf_size_bytes, int64_t* size_read_out, - struct ArrowError* error) { +static ArrowErrorCode ArrowIpcInputStreamFileRead(struct ArrowIpcInputStream* stream, + void* buf, int64_t buf_size_bytes, + int64_t* size_read_out, + struct ArrowError* error) { struct ArrowIpcInputStreamFilePrivate* private_data = (struct ArrowIpcInputStreamFilePrivate*)stream->private_data; @@ -374,7 +375,7 @@ static const char* ArrowIpcArrayStreamReaderGetLastError( ArrowErrorCode ArrowIpcArrayStreamReaderInit( struct ArrowArrayStream* out, struct ArrowIpcInputStream* input_stream, - struct ArrowIpcArrayStreamReaderOptions options) { + struct ArrowIpcArrayStreamReaderOptions* options) { struct ArrowIpcArrayStreamReaderPrivate* private_data = (struct ArrowIpcArrayStreamReaderPrivate*)ArrowMalloc( sizeof(struct ArrowIpcArrayStreamReaderPrivate)); @@ -392,7 +393,12 @@ ArrowErrorCode ArrowIpcArrayStreamReaderInit( ArrowBufferInit(&private_data->body); private_data->out_schema.release = NULL; ArrowIpcInputStreamMove(input_stream, &private_data->input); - private_data->field_index = options.field_index; + + if (options != NULL) { + private_data->field_index = options->field_index; + } else { + private_data->field_index = 
-1; + } out->private_data = private_data; out->get_schema = &ArrowIpcArrayStreamReaderGetSchema; diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc index 4eebf58f1..1f88775f1 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc @@ -150,7 +150,7 @@ TEST(NanoarrowIpcReader, StreamReaderBasic) { ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK); struct ArrowArrayStream stream; - ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, {-1}), NANOARROW_OK); + ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK); struct ArrowSchema schema; ASSERT_EQ(stream.get_schema(&stream, &schema), NANOARROW_OK); From 7942a33a24cea3fb1a766bb5e67a7fc3864b5b19 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 22 Mar 2023 10:53:57 -0300 Subject: [PATCH 16/23] fix namespace --- extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h index 75b1f935f..2dc16e77d 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h @@ -41,6 +41,8 @@ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetEndianness) #define ArrowIpcInputStreamInitBuffer \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamInitBuffer) +#define ArrowIpcInputStreamInitFile \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamInitFile) #define ArrowIpcInputStreamMove \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamMove) #define ArrowIpcArrayStreamReaderInit \ From 756dcda8e982003192b70063c13e5af8d6e76e7f Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 22 Mar 2023 13:01:59 -0300 Subject: [PATCH 
17/23] utility app --- extensions/nanoarrow_ipc/CMakeLists.txt | 6 + .../nanoarrow_ipc/src/apps/dump_stream.c | 122 ++++++++++++++++++ 2 files changed, 128 insertions(+) create mode 100644 extensions/nanoarrow_ipc/src/apps/dump_stream.c diff --git a/extensions/nanoarrow_ipc/CMakeLists.txt b/extensions/nanoarrow_ipc/CMakeLists.txt index 3f750f374..8830b356b 100644 --- a/extensions/nanoarrow_ipc/CMakeLists.txt +++ b/extensions/nanoarrow_ipc/CMakeLists.txt @@ -22,6 +22,7 @@ include(FetchContent) project(nanoarrow_ipc) option(NANOARROW_IPC_BUILD_TESTS "Build tests" OFF) +option(NANOARROW_IPC_BUILD_APPS "Build utility applications" OFF) option(NANOARROW_IPC_BUNDLE "Create bundled nanoarrow_ipc.h and nanoarrow_ipc.c" OFF) option(NANOARROW_IPC_FLATCC_ROOT_DIR "Root directory for flatcc include and lib directories" OFF) option(NANOARROW_IPC_FLATCC_INCLUDE_DIR "Include directory for flatcc includes" OFF) @@ -204,3 +205,8 @@ if (NANOARROW_IPC_BUILD_TESTS) include(GoogleTest) gtest_discover_tests(nanoarrow_ipc_decoder_test) endif() + +if (NANOARROW_IPC_BUILD_APPS) + add_executable(dump_stream src/apps/dump_stream.c) + target_link_libraries(dump_stream nanoarrow_ipc nanoarrow) +endif() diff --git a/extensions/nanoarrow_ipc/src/apps/dump_stream.c b/extensions/nanoarrow_ipc/src/apps/dump_stream.c new file mode 100644 index 000000000..ad9b31a8f --- /dev/null +++ b/extensions/nanoarrow_ipc/src/apps/dump_stream.c @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "nanoarrow/nanoarrow_ipc.h" + +#include <stdio.h> +#include <string.h> + +void dump_schema_to_stdout(struct ArrowSchema* schema, int level, char* buf, + int buf_size) { + int n_chars = ArrowSchemaToString(schema, buf, buf_size, 0); + + const char* name; + if (schema->name == NULL) { + name = ""; + } else { + name = schema->name; + } + + for (int i = 0; i < level; i++) { + fprintf(stdout, " "); + } + fprintf(stdout, "%s: %s\n", name, buf); + + for (int64_t i = 0; i < schema->n_children; i++) { + dump_schema_to_stdout(schema->children[i], level + 1, buf, buf_size); + } +} + +int main(int argc, char* argv[]) { + // Parse arguments + if (argc != 2) { + fprintf(stderr, "Usage: dump_stream FILENAME (or - for stdin)\n"); + return 1; + } + + // Sort the input stream + FILE* file_ptr; + if (strcmp(argv[1], "-") == 0) { + file_ptr = freopen(NULL, "rb", stdin); + } else { + file_ptr = fopen(argv[1], "rb"); + } + + if (file_ptr == NULL) { + fprintf(stderr, "Failed to open input '%s'\n", argv[1]); + return 1; + } + + struct ArrowIpcInputStream input; + int result = ArrowIpcInputStreamInitFile(&input, file_ptr, 0); + if (result != NANOARROW_OK) { + fprintf(stderr, "ArrowIpcInputStreamInitFile() failed\n"); + return 1; + } + + struct ArrowArrayStream stream; + result = ArrowIpcArrayStreamReaderInit(&stream, &input, NULL); + if (result != NANOARROW_OK) { + fprintf(stderr, "ArrowIpcArrayStreamReaderInit() failed\n"); + return 1; + } + + struct ArrowSchema schema; + result = stream.get_schema(&stream, &schema); + if (result != NANOARROW_OK) { + const char* message = 
stream.get_last_error(&stream); + if (message == NULL) { + message = ""; + } + + fprintf(stderr, "stream.get_schema() returned %d with error '%s'\n", result, message); + stream.release(&stream); + return 1; + } + + char schema_tmp[8096]; + memset(schema_tmp, 0, sizeof(schema_tmp)); + dump_schema_to_stdout(&schema, 0, schema_tmp, sizeof(schema_tmp)); + schema.release(&schema); + + struct ArrowArray array; + array.release = NULL; + int64_t batch_count = 0; + while (1) { + result = stream.get_next(&stream, &array); + if (result != NANOARROW_OK) { + const char* message = stream.get_last_error(&stream); + if (message == NULL) { + message = ""; + } + + fprintf(stderr, "stream.get_next() returned %d with error '%s'\n", result, message); + stream.release(&stream); + return 1; + } + + if (array.release != NULL) { + array.release(&array); + } else { + break; + } + } + + stream.release(&stream); + fclose(file_ptr); + return 0; +} From 362f524bda81f7f93192690821191f730dff808d Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 22 Mar 2023 13:26:13 -0300 Subject: [PATCH 18/23] less jarring output --- extensions/nanoarrow_ipc/src/apps/dump_stream.c | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/extensions/nanoarrow_ipc/src/apps/dump_stream.c b/extensions/nanoarrow_ipc/src/apps/dump_stream.c index ad9b31a8f..dc9bc86e3 100644 --- a/extensions/nanoarrow_ipc/src/apps/dump_stream.c +++ b/extensions/nanoarrow_ipc/src/apps/dump_stream.c @@ -24,17 +24,16 @@ void dump_schema_to_stdout(struct ArrowSchema* schema, int level, char* buf, int buf_size) { int n_chars = ArrowSchemaToString(schema, buf, buf_size, 0); - const char* name; + for (int i = 0; i < level; i++) { + fprintf(stdout, " "); + } + if (schema->name == NULL) { - name = ""; + fprintf(stdout, "%s\n", buf); } else { - name = schema->name; + fprintf(stdout, "%s: %s\n", schema->name, buf); } - for (int i = 0; i < level; i++) { - fprintf(stdout, " "); - } - fprintf(stdout, "%s: %s\n", name, 
buf); for (int64_t i = 0; i < schema->n_children; i++) { dump_schema_to_stdout(schema->children[i], level + 1, buf, buf_size); From 510c2638cff84ab533b193489a49f03c63c3a4de Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 22 Mar 2023 13:32:01 -0300 Subject: [PATCH 19/23] minimal dump_stream test --- .github/workflows/build-and-test-ipc.yaml | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build-and-test-ipc.yaml b/.github/workflows/build-and-test-ipc.yaml index 5c5b7801c..af2d5438f 100644 --- a/.github/workflows/build-and-test-ipc.yaml +++ b/.github/workflows/build-and-test-ipc.yaml @@ -41,7 +41,7 @@ jobs: fail-fast: false matrix: config: - - {label: default-build, cmake_args: ""} + - {label: default-build, cmake_args: "-DNANOARROW_IPC_BUILD_APPS=ON"} - {label: namespaced-build, cmake_args: "-DNANOARROW_NAMESPACE=SomeUserNamespace"} - {label: bundled-build, cmake_args: "-DNANOARROW_IPC_BUNDLE=ON"} @@ -101,6 +101,15 @@ jobs: cd build ctest -T test --output-on-failure . 
+ - name: Test dump_stream + if: matrix.config.label == 'default-build' + run: | + $SUBDIR/build/dump_stream || true + $SUBDIR/build/dump_stream this_is_not_a_file || true + $SUBDIR/build/dump_stream examples/cmake-ipc/invalid.arrows || true + $SUBDIR/build/dump_stream examples/cmake-ipc/schema-valid.arrows + cat examples/cmake-ipc/schema-valid.arrows | $SUBDIR/build/dump_stream - + - name: Run tests with valgrind if: matrix.config.label == 'default-build' run: | From 0e835877bc235e92b0e09896127b9bf652a10be9 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 22 Mar 2023 15:05:40 -0300 Subject: [PATCH 20/23] add some other output --- .../nanoarrow_ipc/src/apps/dump_stream.c | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/extensions/nanoarrow_ipc/src/apps/dump_stream.c b/extensions/nanoarrow_ipc/src/apps/dump_stream.c index dc9bc86e3..1f1aa7ff6 100644 --- a/extensions/nanoarrow_ipc/src/apps/dump_stream.c +++ b/extensions/nanoarrow_ipc/src/apps/dump_stream.c @@ -19,6 +19,7 @@ #include <stdio.h> #include <string.h> +#include <time.h> void dump_schema_to_stdout(struct ArrowSchema* schema, int level, char* buf, int buf_size) { @@ -34,7 +35,6 @@ void dump_schema_to_stdout(struct ArrowSchema* schema, int level, char* buf, fprintf(stdout, "%s: %s\n", schema->name, buf); } - for (int64_t i = 0; i < schema->n_children; i++) { dump_schema_to_stdout(schema->children[i], level + 1, buf, buf_size); } @@ -47,6 +47,10 @@ int main(int argc, char* argv[]) { return 1; } + // Allocate a buffer for file IO. The default size (4096 bytes) results in + // very slow IO operations. 
+ char io_buffer[1048576]; + // Sort the input stream FILE* file_ptr; if (strcmp(argv[1], "-") == 0) { @@ -74,6 +78,8 @@ int main(int argc, char* argv[]) { return 1; } + clock_t begin = clock(); + struct ArrowSchema schema; result = stream.get_schema(&stream, &schema); if (result != NANOARROW_OK) { @@ -87,6 +93,10 @@ int main(int argc, char* argv[]) { return 1; } + clock_t end = clock(); + double elapsed = (end - begin) / ((double)CLOCKS_PER_SEC); + fprintf(stdout, "Read Schema <%.06f seconds>\n", elapsed); + char schema_tmp[8096]; memset(schema_tmp, 0, sizeof(schema_tmp)); dump_schema_to_stdout(&schema, 0, schema_tmp, sizeof(schema_tmp)); @@ -94,7 +104,11 @@ int main(int argc, char* argv[]) { struct ArrowArray array; array.release = NULL; + int64_t batch_count = 0; + int64_t row_count = 0; + begin = clock(); + while (1) { result = stream.get_next(&stream, &array); if (result != NANOARROW_OK) { @@ -109,12 +123,19 @@ int main(int argc, char* argv[]) { } if (array.release != NULL) { + row_count += array.length; + batch_count++; array.release(&array); } else { break; } } + end = clock(); + elapsed = (end - begin) / ((double)CLOCKS_PER_SEC); + fprintf(stdout, "Read %ld rows in %ld batch(es) <%.06f seconds>\n", (long)row_count, + (long)batch_count, elapsed); + stream.release(&stream); fclose(file_ptr); return 0; From 8d9a655d4eabaf800297bd2c18a798bf132fd138 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 22 Mar 2023 15:47:46 -0300 Subject: [PATCH 21/23] test errors --- .../src/nanoarrow/nanoarrow_ipc_reader.c | 20 ++- .../nanoarrow/nanoarrow_ipc_reader_test.cc | 142 ++++++++++++++++++ 2 files changed, 159 insertions(+), 3 deletions(-) diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c index afaa20ba5..8288eceea 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c @@ -194,7 +194,8 @@ 
static void ArrowIpcArrayStreamReaderRelease(struct ArrowArrayStream* stream) { } static int ArrowIpcArrayStreamReaderNextHeader( - struct ArrowIpcArrayStreamReaderPrivate* private_data) { + struct ArrowIpcArrayStreamReaderPrivate* private_data, + enum ArrowIpcMessageType message_type) { private_data->header.size_bytes = 0; int64_t bytes_read = 0; @@ -206,6 +207,10 @@ static int ArrowIpcArrayStreamReaderNextHeader( private_data->header.size_bytes += bytes_read; if (bytes_read == 0) { + // The caller might not use this error message (e.g., if the end of the stream + // is one of the valid outcomes) but we set the error anyway in case it gets + // propagated higher (e.g., if the stream is empty and there's no schema message) + ArrowErrorSet(&private_data->error, "No data available on stream"); return ENODATA; } else if (bytes_read != 8) { ArrowErrorSet(&private_data->error, @@ -238,6 +243,13 @@ static int ArrowIpcArrayStreamReaderNextHeader( input_view.size_bytes = private_data->header.size_bytes; NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderVerifyHeader(&private_data->decoder, input_view, &private_data->error)); + + // Don't decode the message if it's of the wrong type (because the error message + // is better communicated by the caller) + if (private_data->decoder.message_type != message_type) { + return NANOARROW_OK; + } + NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeHeader(&private_data->decoder, input_view, &private_data->error)); return NANOARROW_OK; @@ -265,7 +277,8 @@ static int ArrowIpcArrayStreamReaderReadSchemaIfNeeded( return NANOARROW_OK; } - NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextHeader(private_data)); + NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderNextHeader( + private_data, NANOARROW_IPC_MESSAGE_TYPE_SCHEMA)); // Error if this isn't a schema message if (private_data->decoder.message_type != NANOARROW_IPC_MESSAGE_TYPE_SCHEMA) { @@ -338,7 +351,8 @@ static int ArrowIpcArrayStreamReaderGetNext(struct ArrowArrayStream* stream, 
NANOARROW_RETURN_NOT_OK(ArrowIpcArrayStreamReaderReadSchemaIfNeeded(private_data)); // Read + decode the next header - int result = ArrowIpcArrayStreamReaderNextHeader(private_data); + int result = ArrowIpcArrayStreamReaderNextHeader( + private_data, NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH); if (result == ENODATA) { // If the stream is finished, release the input private_data->input.release(&private_data->input); diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc index 1f88775f1..126aaf32f 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc @@ -57,6 +57,8 @@ static uint8_t kSimpleRecordBatch[] = { 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; +static uint8_t kEndOfStream[] = {0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00}; + TEST(NanoarrowIpcReader, InputStreamBuffer) { uint8_t input_data[] = {0x01, 0x02, 0x03, 0x04, 0x05}; struct ArrowBuffer input; @@ -170,3 +172,143 @@ TEST(NanoarrowIpcReader, StreamReaderBasic) { stream.release(&stream); } + +TEST(NanoarrowIpcReader, StreamReaderBasicWithEndOfStream) { + struct ArrowBuffer input_buffer; + ArrowBufferInit(&input_buffer); + ASSERT_EQ(ArrowBufferAppend(&input_buffer, kSimpleSchema, sizeof(kSimpleSchema)), + NANOARROW_OK); + ASSERT_EQ( + ArrowBufferAppend(&input_buffer, kSimpleRecordBatch, sizeof(kSimpleRecordBatch)), + NANOARROW_OK); + ASSERT_EQ(ArrowBufferAppend(&input_buffer, kEndOfStream, sizeof(kEndOfStream)), + NANOARROW_OK); + + struct ArrowIpcInputStream input; + ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK); + + struct ArrowArrayStream stream; + ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK); + + struct ArrowSchema schema; + ASSERT_EQ(stream.get_schema(&stream, 
&schema), NANOARROW_OK); + EXPECT_STREQ(schema.format, "+s"); + schema.release(&schema); + + struct ArrowArray array; + ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK); + EXPECT_EQ(array.length, 3); + array.release(&array); + + ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK); + EXPECT_EQ(array.release, nullptr); + + stream.release(&stream); +} + +TEST(NanoarrowIpcReader, StreamReaderExpectedRecordBatch) { + struct ArrowBuffer input_buffer; + ArrowBufferInit(&input_buffer); + ASSERT_EQ(ArrowBufferAppend(&input_buffer, kSimpleSchema, sizeof(kSimpleSchema)), + NANOARROW_OK); + ASSERT_EQ(ArrowBufferAppend(&input_buffer, kSimpleSchema, sizeof(kSimpleSchema)), + NANOARROW_OK); + + struct ArrowIpcInputStream input; + ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK); + + struct ArrowArrayStream stream; + ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK); + + struct ArrowSchema schema; + ASSERT_EQ(stream.get_schema(&stream, &schema), NANOARROW_OK); + EXPECT_STREQ(schema.format, "+s"); + schema.release(&schema); + + struct ArrowArray array; + ASSERT_EQ(stream.get_next(&stream, &array), EINVAL); + EXPECT_STREQ(stream.get_last_error(&stream), + "Unexpected message type (expected RecordBatch)"); + + stream.release(&stream); +} + +TEST(NanoarrowIpcReader, StreamReaderExpectedSchema) { + struct ArrowBuffer input_buffer; + ArrowBufferInit(&input_buffer); + ASSERT_EQ(ArrowBufferAppend(&input_buffer, kSimpleRecordBatch, sizeof(kSimpleSchema)), + NANOARROW_OK); + + struct ArrowIpcInputStream input; + ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK); + + struct ArrowArrayStream stream; + ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK); + + struct ArrowSchema schema; + ASSERT_EQ(stream.get_schema(&stream, &schema), EINVAL); + EXPECT_STREQ(stream.get_last_error(&stream), + "Unexpected message type at start of input (expected Schema)"); + + 
stream.release(&stream); +} + +TEST(NanoarrowIpcReader, StreamReaderUnsupportedFieldIndex) { + struct ArrowBuffer input_buffer; + ArrowBufferInit(&input_buffer); + ASSERT_EQ(ArrowBufferAppend(&input_buffer, kSimpleSchema, sizeof(kSimpleSchema)), + NANOARROW_OK); + ASSERT_EQ( + ArrowBufferAppend(&input_buffer, kSimpleRecordBatch, sizeof(kSimpleRecordBatch)), + NANOARROW_OK); + + struct ArrowIpcInputStream input; + ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK); + + struct ArrowArrayStream stream; + struct ArrowIpcArrayStreamReaderOptions options; + options.field_index = 0; + ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, &options), NANOARROW_OK); + + struct ArrowSchema schema; + ASSERT_EQ(stream.get_schema(&stream, &schema), ENOTSUP); + EXPECT_STREQ(stream.get_last_error(&stream), "Field index != -1 is not yet supported"); + + stream.release(&stream); +} + +TEST(NanoarrowIpcReader, StreamReaderEmptyInput) { + struct ArrowBuffer input_buffer; + ArrowBufferInit(&input_buffer); + + struct ArrowIpcInputStream input; + ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK); + + struct ArrowArrayStream stream; + ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK); + + struct ArrowSchema schema; + ASSERT_EQ(stream.get_schema(&stream, &schema), ENODATA); + EXPECT_STREQ(stream.get_last_error(&stream), "No data available on stream"); + + stream.release(&stream); +} + +TEST(NanoarrowIpcReader, StreamReaderIncompletePrefix) { + struct ArrowBuffer input_buffer; + ArrowBufferInit(&input_buffer); + ASSERT_EQ(ArrowBufferAppendUInt8(&input_buffer, 0x00), NANOARROW_OK); + + struct ArrowIpcInputStream input; + ASSERT_EQ(ArrowIpcInputStreamInitBuffer(&input, &input_buffer), NANOARROW_OK); + + struct ArrowArrayStream stream; + ASSERT_EQ(ArrowIpcArrayStreamReaderInit(&stream, &input, nullptr), NANOARROW_OK); + + struct ArrowSchema schema; + ASSERT_EQ(stream.get_schema(&stream, &schema), EINVAL); 
+ EXPECT_STREQ(stream.get_last_error(&stream), + "Expected at least 8 bytes in remainder of stream"); + + stream.release(&stream); +} From 7045cb0da4947d5c5ac2adb0ed42c3aed2ea7e1b Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 22 Mar 2023 15:59:18 -0300 Subject: [PATCH 22/23] remove unused declaration --- extensions/nanoarrow_ipc/src/apps/dump_stream.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/extensions/nanoarrow_ipc/src/apps/dump_stream.c b/extensions/nanoarrow_ipc/src/apps/dump_stream.c index 1f1aa7ff6..1a6b323d9 100644 --- a/extensions/nanoarrow_ipc/src/apps/dump_stream.c +++ b/extensions/nanoarrow_ipc/src/apps/dump_stream.c @@ -47,10 +47,6 @@ int main(int argc, char* argv[]) { return 1; } - // Allocate a buffer for file IO. The default size (4096 bytes) results in - // very slow IO operations. - char io_buffer[1048576]; - // Sort the input stream FILE* file_ptr; if (strcmp(argv[1], "-") == 0) { From d21096d9e02f2d620e7d00ef371bfe7bbd80f27e Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Wed, 22 Mar 2023 16:47:14 -0300 Subject: [PATCH 23/23] review suggestions --- extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h | 2 +- .../nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h index 2dc16e77d..071b2991d 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h @@ -233,7 +233,7 @@ struct ArrowIpcInputStream { /// /// The actual number of bytes read is placed in the value pointed to by /// size_read_out. Returns NANOARROW_OK on success. 
- ArrowErrorCode (*read)(struct ArrowIpcInputStream* stream, void* buf, + ArrowErrorCode (*read)(struct ArrowIpcInputStream* stream, uint8_t* buf, int64_t buf_size_bytes, int64_t* size_read_out, struct ArrowError* error); diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c index 8288eceea..f813cab46 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c @@ -34,10 +34,11 @@ struct ArrowIpcInputStreamBufferPrivate { }; static ArrowErrorCode ArrowIpcInputStreamBufferRead(struct ArrowIpcInputStream* stream, - void* buf, int64_t buf_size_bytes, + uint8_t* buf, int64_t buf_size_bytes, int64_t* size_read_out, struct ArrowError* error) { if (buf_size_bytes == 0) { + *size_read_out = 0; return NANOARROW_OK; } @@ -105,7 +106,7 @@ static void ArrowIpcInputStreamFileRelease(struct ArrowIpcInputStream* stream) { } static ArrowErrorCode ArrowIpcInputStreamFileRead(struct ArrowIpcInputStream* stream, - void* buf, int64_t buf_size_bytes, + uint8_t* buf, int64_t buf_size_bytes, int64_t* size_read_out, struct ArrowError* error) { struct ArrowIpcInputStreamFilePrivate* private_data =