Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(extensions/nanoarrow_ipc): Add single-threaded stream reader #164

Merged
merged 23 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .github/workflows/build-and-test-ipc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
fail-fast: false
matrix:
config:
- {label: default-build, cmake_args: ""}
- {label: default-build, cmake_args: "-DNANOARROW_IPC_BUILD_APPS=ON"}
- {label: namespaced-build, cmake_args: "-DNANOARROW_NAMESPACE=SomeUserNamespace"}
- {label: bundled-build, cmake_args: "-DNANOARROW_IPC_BUNDLE=ON"}

Expand Down Expand Up @@ -101,6 +101,15 @@ jobs:
cd build
ctest -T test --output-on-failure .

- name: Test dump_stream
if: matrix.config.label == 'default-build'
run: |
$SUBDIR/build/dump_stream || true
$SUBDIR/build/dump_stream this_is_not_a_file || true
$SUBDIR/build/dump_stream examples/cmake-ipc/invalid.arrows || true
$SUBDIR/build/dump_stream examples/cmake-ipc/schema-valid.arrows
cat examples/cmake-ipc/schema-valid.arrows | $SUBDIR/build/dump_stream -

- name: Run tests with valgrind
if: matrix.config.label == 'default-build'
run: |
Expand Down
24 changes: 18 additions & 6 deletions extensions/nanoarrow_ipc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ include(FetchContent)
project(nanoarrow_ipc)

option(NANOARROW_IPC_BUILD_TESTS "Build tests" OFF)
option(NANOARROW_IPC_BUILD_APPS "Build utility applications" OFF)
option(NANOARROW_IPC_BUNDLE "Create bundled nanoarrow_ipc.h and nanoarrow_ipc.c" OFF)
option(NANOARROW_IPC_FLATCC_ROOT_DIR "Root directory for flatcc include and lib directories" OFF)
option(NANOARROW_IPC_FLATCC_INCLUDE_DIR "Include directory for flatcc includes" OFF)
Expand Down Expand Up @@ -91,11 +92,13 @@ if (NANOARROW_IPC_BUNDLE)
file(READ src/nanoarrow/nanoarrow_ipc.h SRC_FILE_CONTENTS)
file(WRITE ${NANOARROW_IPC_H_TEMP} "${SRC_FILE_CONTENTS}")

# combine flatcc-generated headers and nanoarrow_ipc.c
# combine flatcc-generated headers and nanoarrow_ipc sources
set(NANOARROW_IPC_C_TEMP ${CMAKE_BINARY_DIR}/amalgamation/nanoarrow/nanoarrow_ipc.c)
file(READ src/nanoarrow/nanoarrow_ipc_flatcc_generated.h SRC_FILE_CONTENTS)
file(WRITE ${NANOARROW_IPC_C_TEMP} "${SRC_FILE_CONTENTS}")
file(READ src/nanoarrow/nanoarrow_ipc.c SRC_FILE_CONTENTS)
file(READ src/nanoarrow/nanoarrow_ipc_decoder.c SRC_FILE_CONTENTS)
file(APPEND ${NANOARROW_IPC_C_TEMP} "${SRC_FILE_CONTENTS}")
file(READ src/nanoarrow/nanoarrow_ipc_reader.c SRC_FILE_CONTENTS)
file(APPEND ${NANOARROW_IPC_C_TEMP} "${SRC_FILE_CONTENTS}")

# remove the include for the generated files in the bundled version
Expand Down Expand Up @@ -143,7 +146,9 @@ if (NANOARROW_IPC_BUNDLE)
install(DIRECTORY thirdparty/flatcc/include/flatcc DESTINATION ".")
else()
# This is a normal CMake build that builds + installs some includes and a static lib
add_library(nanoarrow_ipc src/nanoarrow/nanoarrow_ipc.c)
add_library(nanoarrow_ipc
src/nanoarrow/nanoarrow_ipc_decoder.c
src/nanoarrow/nanoarrow_ipc_reader.c)
target_link_libraries(nanoarrow_ipc PRIVATE flatccrt)

target_include_directories(nanoarrow_ipc PUBLIC
Expand Down Expand Up @@ -185,16 +190,23 @@ if (NANOARROW_IPC_BUILD_TESTS)

enable_testing()

add_executable(nanoarrow_ipc_test src/nanoarrow/nanoarrow_ipc_test.cc)
add_executable(nanoarrow_ipc_decoder_test src/nanoarrow/nanoarrow_ipc_decoder_test.cc)
add_executable(nanoarrow_ipc_reader_test src/nanoarrow/nanoarrow_ipc_reader_test.cc)

if(NANOARROW_IPC_CODE_COVERAGE)
target_compile_options(ipc_coverage_config INTERFACE -O0 -g --coverage)
target_link_options(ipc_coverage_config INTERFACE --coverage)
target_link_libraries(nanoarrow_ipc PRIVATE ipc_coverage_config)
endif()

target_link_libraries(nanoarrow_ipc_test nanoarrow_ipc nanoarrow arrow_shared gtest_main)
target_link_libraries(nanoarrow_ipc_decoder_test nanoarrow_ipc nanoarrow arrow_shared gtest_main)
target_link_libraries(nanoarrow_ipc_reader_test nanoarrow_ipc nanoarrow gtest_main)

include(GoogleTest)
gtest_discover_tests(nanoarrow_ipc_test)
gtest_discover_tests(nanoarrow_ipc_decoder_test)
endif()

if (NANOARROW_IPC_BUILD_APPS)
add_executable(dump_stream src/apps/dump_stream.c)
target_link_libraries(dump_stream nanoarrow_ipc nanoarrow)
endif()
138 changes: 138 additions & 0 deletions extensions/nanoarrow_ipc/src/apps/dump_stream.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "nanoarrow/nanoarrow_ipc.h"

#include <stdio.h>
#include <string.h>
#include <time.h>

void dump_schema_to_stdout(struct ArrowSchema* schema, int level, char* buf,
int buf_size) {
int n_chars = ArrowSchemaToString(schema, buf, buf_size, 0);

for (int i = 0; i < level; i++) {
fprintf(stdout, " ");
}

if (schema->name == NULL) {
fprintf(stdout, "%s\n", buf);
} else {
fprintf(stdout, "%s: %s\n", schema->name, buf);
}

for (int64_t i = 0; i < schema->n_children; i++) {
dump_schema_to_stdout(schema->children[i], level + 1, buf, buf_size);
}
}

int main(int argc, char* argv[]) {
// Parse arguments
if (argc != 2) {
fprintf(stderr, "Usage: dump_stream FILENAME (or - for stdin)\n");
return 1;
}

// Sort the input stream
FILE* file_ptr;
if (strcmp(argv[1], "-") == 0) {
file_ptr = freopen(NULL, "rb", stdin);
} else {
file_ptr = fopen(argv[1], "rb");
}

if (file_ptr == NULL) {
fprintf(stderr, "Failed to open input '%s'\n", argv[1]);
return 1;
}

struct ArrowIpcInputStream input;
int result = ArrowIpcInputStreamInitFile(&input, file_ptr, 0);
if (result != NANOARROW_OK) {
fprintf(stderr, "ArrowIpcInputStreamInitFile() failed\n");
return 1;
}

struct ArrowArrayStream stream;
result = ArrowIpcArrayStreamReaderInit(&stream, &input, NULL);
if (result != NANOARROW_OK) {
fprintf(stderr, "ArrowIpcArrayStreamReaderInit() failed\n");
return 1;
}

clock_t begin = clock();

struct ArrowSchema schema;
result = stream.get_schema(&stream, &schema);
if (result != NANOARROW_OK) {
const char* message = stream.get_last_error(&stream);
if (message == NULL) {
message = "";
}

fprintf(stderr, "stream.get_schema() returned %d with error '%s'\n", result, message);
stream.release(&stream);
return 1;
}

clock_t end = clock();
double elapsed = (end - begin) / ((double)CLOCKS_PER_SEC);
fprintf(stdout, "Read Schema <%.06f seconds>\n", elapsed);

char schema_tmp[8096];
memset(schema_tmp, 0, sizeof(schema_tmp));
dump_schema_to_stdout(&schema, 0, schema_tmp, sizeof(schema_tmp));
schema.release(&schema);

struct ArrowArray array;
array.release = NULL;

int64_t batch_count = 0;
int64_t row_count = 0;
begin = clock();

while (1) {
result = stream.get_next(&stream, &array);
if (result != NANOARROW_OK) {
const char* message = stream.get_last_error(&stream);
if (message == NULL) {
message = "";
}

fprintf(stderr, "stream.get_next() returned %d with error '%s'\n", result, message);
stream.release(&stream);
return 1;
}

if (array.release != NULL) {
row_count += array.length;
batch_count++;
array.release(&array);
} else {
break;
}
}

end = clock();
elapsed = (end - begin) / ((double)CLOCKS_PER_SEC);
fprintf(stdout, "Read %ld rows in %ld batch(es) <%.06f seconds>\n", (long)row_count,
(long)batch_count, elapsed);

stream.release(&stream);
fclose(file_ptr);
return 0;
}
62 changes: 62 additions & 0 deletions extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetSchema)
#define ArrowIpcDecoderSetEndianness \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcDecoderSetEndianness)
#define ArrowIpcInputStreamInitBuffer \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamInitBuffer)
#define ArrowIpcInputStreamInitFile \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamInitFile)
#define ArrowIpcInputStreamMove \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcInputStreamMove)
#define ArrowIpcArrayStreamReaderInit \
NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowIpcArrayStreamReaderInit)

#endif

Expand Down Expand Up @@ -219,6 +227,60 @@ ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder,
struct ArrowArray* out,
struct ArrowError* error);

/// \brief An user-extensible input data source
struct ArrowIpcInputStream {
/// \brief Read up to buf_size_bytes from stream into buf
///
/// The actual number of bytes read is placed in the value pointed to by
/// size_read_out. Returns NANOARROW_OK on success.
ArrowErrorCode (*read)(struct ArrowIpcInputStream* stream, uint8_t* buf,
int64_t buf_size_bytes, int64_t* size_read_out,
struct ArrowError* error);

/// \brief Release the stream and any resources it may be holding
///
/// Release callback implementations must set the release member to NULL.
/// Callers must check that the release callback is not NULL before calling
/// read() or release().
void (*release)(struct ArrowIpcInputStream* stream);

/// \brief Private implementation-defined data
void* private_data;
};

/// \brief Transfer ownership of an ArrowIpcInputStream
void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src,
struct ArrowIpcInputStream* dst);

/// \brief Create an input stream from an ArrowBuffer
ArrowErrorCode ArrowIpcInputStreamInitBuffer(struct ArrowIpcInputStream* stream,
struct ArrowBuffer* input);

/// \brief Create an input stream from a C FILE* pointer
///
/// Note that the ArrowIpcInputStream has no mechanism to communicate an error
/// if file_ptr fails to close. If this behaviour is needed, pass false to
/// close_on_release and handle closing the file independently from stream.
ArrowErrorCode ArrowIpcInputStreamInitFile(struct ArrowIpcInputStream* stream,
void* file_ptr, int close_on_release);

/// \brief Options for ArrowIpcArrayStreamReaderInit()
struct ArrowIpcArrayStreamReaderOptions {
/// \brief The field index to extract. Defaults to -1 (i.e., read all fields).
int64_t field_index;
};

/// \brief Initialize an ArrowArrayStream from an input stream of bytes
///
/// The stream of bytes must begin with a Schema message and be followed by
/// zero or more RecordBatch messages as described in the Arrow IPC stream
/// format specification. Returns NANOARROW_OK on success. If NANOARROW_OK
/// is returned, the ArrowArrayStream takes ownership of input_stream and
/// the caller is responsible for releasing out.
ArrowErrorCode ArrowIpcArrayStreamReaderInit(
struct ArrowArrayStream* out, struct ArrowIpcInputStream* input_stream,
struct ArrowIpcArrayStreamReaderOptions* options);

#ifdef __cplusplus
}
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,21 +801,22 @@ static inline int ArrowIpcDecoderCheckHeader(struct ArrowIpcDecoder* decoder,
}

int swap_endian = private_data->system_endianness == NANOARROW_IPC_ENDIANNESS_BIG;
*message_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian);
if ((*message_size_bytes) < 0) {
int32_t header_body_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian);
*message_size_bytes = header_body_size_bytes + (2 * sizeof(int32_t));
if (header_body_size_bytes < 0) {
ArrowErrorSet(
error, "Expected message body size > 0 but found message body size of %ld bytes",
(long)(*message_size_bytes));
(long)header_body_size_bytes);
return EINVAL;
} else if ((*message_size_bytes) > data_mut->size_bytes) {
} else if (header_body_size_bytes > data_mut->size_bytes) {
ArrowErrorSet(error,
"Expected 0 <= message body size <= %ld bytes but found message "
"body size of %ld bytes",
(long)data_mut->size_bytes, (long)(*message_size_bytes));
(long)data_mut->size_bytes, (long)header_body_size_bytes);
return ESPIPE;
}

if (*message_size_bytes == 0) {
if (header_body_size_bytes == 0) {
ArrowErrorSet(error, "End of Arrow stream");
return ENODATA;
}
Expand All @@ -832,7 +833,6 @@ ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder,
ArrowIpcDecoderResetHeaderInfo(decoder);
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, error));
decoder->header_size_bytes += 2 * sizeof(int32_t);
return NANOARROW_OK;
}

Expand All @@ -847,14 +847,14 @@ ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct ArrowIpcDecoder* decoder,
ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, error));

// Run flatbuffers verification
if (ns(Message_verify_as_root(data.data.as_uint8, decoder->header_size_bytes)) !=
if (ns(Message_verify_as_root(data.data.as_uint8,
decoder->header_size_bytes - (2 * sizeof(int32_t)))) !=
flatcc_verify_ok) {
ArrowErrorSet(error, "Message flatbuffer verification failed");
return EINVAL;
}

// Read some basic information from the message
decoder->header_size_bytes += 2 * sizeof(int32_t);
ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8));
decoder->metadata_version = ns(Message_version(message));
decoder->message_type = ns(Message_header_type(message));
Expand All @@ -873,7 +873,6 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
ArrowIpcDecoderResetHeaderInfo(decoder);
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, error));
decoder->header_size_bytes += 2 * sizeof(int32_t);

ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8));
if (!message) {
Expand Down Expand Up @@ -1046,6 +1045,7 @@ ArrowErrorCode ArrowIpcDecoderSetEndianness(struct ArrowIpcDecoder* decoder,
case NANOARROW_IPC_ENDIANNESS_LITTLE:
case NANOARROW_IPC_ENDIANNESS_BIG:
private_data->endianness = endianness;
return NANOARROW_OK;
default:
return EINVAL;
}
Expand Down
Loading