From 51b596447de80624c41b8d4eca52ce521a7734bd Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 30 May 2024 21:54:11 +0000 Subject: [PATCH 01/23] First pass at adding an implementation for arrow streams --- cpp/CMakeLists.txt | 1 + cpp/include/cudf/interop.hpp | 20 +++++++ cpp/src/interop/from_arrow_stream.cu | 88 ++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+) create mode 100644 cpp/src/interop/from_arrow_stream.cu diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index f637db66c2c..26593090bc1 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -365,6 +365,7 @@ add_library( src/interop/to_arrow_device.cu src/interop/from_arrow_device.cu src/interop/from_arrow_host.cu + src/interop/from_arrow_stream.cu src/interop/to_arrow_schema.cpp src/interop/detail/arrow_allocator.cpp src/io/avro/avro.cpp diff --git a/cpp/include/cudf/interop.hpp b/cpp/include/cudf/interop.hpp index f3ff0009d5c..efde1cb151c 100644 --- a/cpp/include/cudf/interop.hpp +++ b/cpp/include/cudf/interop.hpp @@ -48,6 +48,8 @@ struct ArrowSchema; struct ArrowArray; +struct ArrowArrayStream; + namespace cudf { /** * @addtogroup interop_dlpack @@ -412,6 +414,24 @@ std::unique_ptr from_arrow_host( rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +/** + * @brief Create `cudf::table` from given ArrowArrayStream input + * + * @throws std::invalid_argument if input is NULL + * + * The conversion will not call release on the input ArrayArrayStream or its + * constituent arrays or schema. + * + * @param input `ArrowArrayStream` pointer to object that will produce ArrowArray data + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to perform cuda allocation + * @return cudf table generated from the given Arrow data + */ +std::unique_ptr
from_arrow_stream( + ArrowArrayStream* input, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + /** * @brief Create `cudf::column` from given ArrowDeviceArray input * diff --git a/cpp/src/interop/from_arrow_stream.cu b/cpp/src/interop/from_arrow_stream.cu new file mode 100644 index 00000000000..ae34433bbec --- /dev/null +++ b/cpp/src/interop/from_arrow_stream.cu @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +namespace cudf { +namespace detail { + + +std::unique_ptr
from_arrow_stream( + ArrowArrayStream* input, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + CUDF_EXPECTS(input != nullptr, + "input ArrowArrayStream must not be NULL", + std::invalid_argument); + + // Potential future optimization: Since the from_arrow API accepts an + // ArrowSchema we're allocating one here instead of using a view, which we + // could avoid with a different underlying implementation. + ArrowSchema schema; + NANOARROW_THROW_NOT_OK(ArrowArrayStreamGetSchema(input, &schema, nullptr)); + + // Assume that each chunk is a column. + // TODO: Is that restrictive? If we need to support actual chunking of a + // single array, how do we differentiate? Do we need a parameter in the API, + // or a different overload? + ArrowArray chunk; + + std::vector> chunks; + while (true) { + NANOARROW_THROW_NOT_OK(ArrowArrayStreamGetNext(input, &chunk, nullptr)); + if (chunk.release == nullptr) { + break; + } + chunks.push_back(from_arrow(&schema, &chunk, stream, mr)); + } + + input->release(input); + auto chunk_views = std::vector{}; + chunk_views.reserve(chunks.size()); + std::transform(chunks.begin(), chunks.end(), std::back_inserter(chunk_views), [](auto const& chunk) { return chunk->view(); }); + return cudf::concatenate(chunk_views, stream, mr); +} + +} // namespace detail + + +std::unique_ptr
from_arrow_stream( + ArrowArrayStream* input, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + CUDF_FUNC_RANGE(); + + return detail::from_arrow_stream(input, stream, mr); +} +} // namespace cudf From 2378bb76d9b5241ec9cb57f6485021606e73ae4e Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 30 May 2024 22:25:41 +0000 Subject: [PATCH 02/23] Expose the new function to Python --- python/cudf/cudf/_lib/pylibcudf/interop.pyx | 10 ++++++++-- python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd | 9 +++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/_lib/pylibcudf/interop.pyx b/python/cudf/cudf/_lib/pylibcudf/interop.pyx index 07e9d1ead11..6ec77b5f227 100644 --- a/python/cudf/cudf/_lib/pylibcudf/interop.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/interop.pyx @@ -1,5 +1,6 @@ # Copyright (c) 2023-2024, NVIDIA CORPORATION. +from cpython cimport pycapsule from cython.operator cimport dereference from libcpp.memory cimport shared_ptr, unique_ptr from libcpp.utility cimport move @@ -12,8 +13,10 @@ from functools import singledispatch from pyarrow import lib as pa from cudf._lib.pylibcudf.libcudf.interop cimport ( + ArrowArrayStream, column_metadata, from_arrow as cpp_from_arrow, + from_arrow_stream as cpp_from_arrow_stream, to_arrow as cpp_to_arrow, ) from cudf._lib.pylibcudf.libcudf.scalar.scalar cimport ( @@ -124,11 +127,14 @@ def _from_arrow_datatype(pyarrow_object): def _from_arrow_table(pyarrow_object, *, DataType data_type=None): if data_type is not None: raise ValueError("data_type may not be passed for tables") - cdef shared_ptr[pa.CTable] arrow_table = pa.pyarrow_unwrap_table(pyarrow_object) + capsule = pyarrow_object.__arrow_c_stream__() + cdef ArrowArrayStream* c_stream = ( + pycapsule.PyCapsule_GetPointer(capsule, "arrow_array_stream") + ) cdef unique_ptr[table] c_result with nogil: - c_result = move(cpp_from_arrow(dereference(arrow_table))) + c_result = move(cpp_from_arrow_stream(c_stream)) 
return Table.from_libcudf(move(c_result)) diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd index 471b78505fb..e176f32265f 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd @@ -16,6 +16,13 @@ cdef extern from "dlpack/dlpack.h" nogil: ctypedef struct DLManagedTensor: void(*deleter)(DLManagedTensor*) except + + +# The Arrow structs are not namespaced. +cdef extern from "cudf/interop.hpp" nogil: + cdef struct ArrowArrayStream: + void (*release)(ArrowArrayStream*) noexcept nogil + + cdef extern from "cudf/interop.hpp" namespace "cudf" \ nogil: cdef unique_ptr[table] from_dlpack(const DLManagedTensor* tensor @@ -42,3 +49,5 @@ cdef extern from "cudf/interop.hpp" namespace "cudf" \ const scalar& input, column_metadata metadata, ) except + + + cdef unique_ptr[table] from_arrow_stream(ArrowArrayStream* input) except + From 080542dab4f9b7bcca0ff9ffe1db128ae21b17c1 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 30 May 2024 23:22:02 +0000 Subject: [PATCH 03/23] Update column overload in Python --- python/cudf/cudf/_lib/pylibcudf/interop.pyx | 24 +++++++++++++++---- .../cudf/_lib/pylibcudf/libcudf/interop.pxd | 10 ++++++++ 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/python/cudf/cudf/_lib/pylibcudf/interop.pyx b/python/cudf/cudf/_lib/pylibcudf/interop.pyx index 6ec77b5f227..4ceca974df9 100644 --- a/python/cudf/cudf/_lib/pylibcudf/interop.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/interop.pyx @@ -12,10 +12,14 @@ from functools import singledispatch from pyarrow import lib as pa +from cudf._lib.pylibcudf.libcudf.column.column cimport column from cudf._lib.pylibcudf.libcudf.interop cimport ( + ArrowArray, ArrowArrayStream, + ArrowSchema, column_metadata, from_arrow as cpp_from_arrow, + from_arrow_column as cpp_from_arrow_column, from_arrow_stream as cpp_from_arrow_stream, to_arrow as cpp_to_arrow, ) @@ -127,9 
+131,9 @@ def _from_arrow_datatype(pyarrow_object): def _from_arrow_table(pyarrow_object, *, DataType data_type=None): if data_type is not None: raise ValueError("data_type may not be passed for tables") - capsule = pyarrow_object.__arrow_c_stream__() + stream = pyarrow_object.__arrow_c_stream__() cdef ArrowArrayStream* c_stream = ( - pycapsule.PyCapsule_GetPointer(capsule, "arrow_array_stream") + pycapsule.PyCapsule_GetPointer(stream, "arrow_array_stream") ) cdef unique_ptr[table] c_result @@ -196,8 +200,20 @@ def _from_arrow_scalar(pyarrow_object, *, DataType data_type=None): def _from_arrow_column(pyarrow_object, *, DataType data_type=None): if data_type is not None: raise ValueError("data_type may not be passed for arrays") - pa_table = pa.table([pyarrow_object], [""]) - return from_arrow(pa_table).columns()[0] + + schema, array = pyarrow_object.__arrow_c_array__() + cdef ArrowSchema* c_schema = ( + pycapsule.PyCapsule_GetPointer(schema, "arrow_schema") + ) + cdef ArrowArray* c_array = ( + pycapsule.PyCapsule_GetPointer(array, "arrow_array") + ) + + cdef unique_ptr[column] c_result + with nogil: + c_result = move(cpp_from_arrow_column(c_schema, c_array)) + + return Column.from_libcudf(move(c_result)) @singledispatch diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd index e176f32265f..65b1f89fd97 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd @@ -7,6 +7,7 @@ from pyarrow.lib cimport CScalar, CTable from cudf._lib.types import cudf_to_np_types, np_to_cudf_types +from cudf._lib.pylibcudf.libcudf.column.column cimport column from cudf._lib.pylibcudf.libcudf.scalar.scalar cimport scalar from cudf._lib.pylibcudf.libcudf.table.table cimport table from cudf._lib.pylibcudf.libcudf.table.table_view cimport table_view @@ -19,6 +20,12 @@ cdef extern from "dlpack/dlpack.h" nogil: # The Arrow structs are not namespaced. 
cdef extern from "cudf/interop.hpp" nogil: + cdef struct ArrowSchema: + void (*release)(ArrowSchema*) noexcept nogil + + cdef struct ArrowArray: + void (*release)(ArrowArray*) noexcept nogil + cdef struct ArrowArrayStream: void (*release)(ArrowArrayStream*) noexcept nogil @@ -51,3 +58,6 @@ cdef extern from "cudf/interop.hpp" namespace "cudf" \ ) except + cdef unique_ptr[table] from_arrow_stream(ArrowArrayStream* input) except + + cdef unique_ptr[column] from_arrow_column( + const ArrowSchema* schema, const ArrowArray* input + ) except + From 49305e3bb8c8e9b5569085e36f3a632f9eb49e98 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 30 May 2024 23:23:35 +0000 Subject: [PATCH 04/23] Add missing default values --- cpp/include/cudf/interop.hpp | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/cpp/include/cudf/interop.hpp b/cpp/include/cudf/interop.hpp index efde1cb151c..0e83fa14e9d 100644 --- a/cpp/include/cudf/interop.hpp +++ b/cpp/include/cudf/interop.hpp @@ -367,10 +367,11 @@ std::unique_ptr from_arrow( * @param mr Device memory resource used to allocate `cudf::table` * @return cudf table generated from given arrow data */ -std::unique_ptr from_arrow(ArrowSchema const* schema, - ArrowArray const* input, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); +std::unique_ptr from_arrow( + ArrowSchema const* schema, + ArrowArray const* input, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** * @brief Create `cudf::column` from a given ArrowArray and ArrowSchema input @@ -385,10 +386,11 @@ std::unique_ptr from_arrow(ArrowSchema const* schema, * @param mr Device memory resource used to allocate `cudf::column` * @return cudf column generated from given arrow data */ -std::unique_ptr from_arrow_column(ArrowSchema const* schema, - ArrowArray const* input, - rmm::cuda_stream_view stream, - 
rmm::mr::device_memory_resource* mr); +std::unique_ptr from_arrow_column( + ArrowSchema const* schema, + ArrowArray const* input, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** * @brief Create `cudf::table` from given ArrowDeviceArray input From 37814c611b960da1e21db65fa38d27c9ac9aabc0 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 30 May 2024 23:25:47 +0000 Subject: [PATCH 05/23] Formatting --- python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd index 65b1f89fd97..2151da28d4b 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/interop.pxd @@ -59,5 +59,6 @@ cdef extern from "cudf/interop.hpp" namespace "cudf" \ cdef unique_ptr[table] from_arrow_stream(ArrowArrayStream* input) except + cdef unique_ptr[column] from_arrow_column( - const ArrowSchema* schema, const ArrowArray* input + const ArrowSchema* schema, + const ArrowArray* input ) except + From 04666b0e13a06bb4ebb2529bba9285e6b718e283 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Fri, 31 May 2024 00:30:12 +0000 Subject: [PATCH 06/23] Add basic handling for empty stream --- cpp/src/interop/from_arrow_stream.cu | 74 ++++++++++++++++++---------- 1 file changed, 48 insertions(+), 26 deletions(-) diff --git a/cpp/src/interop/from_arrow_stream.cu b/cpp/src/interop/from_arrow_stream.cu index ae34433bbec..37d8fd408b5 100644 --- a/cpp/src/interop/from_arrow_stream.cu +++ b/cpp/src/interop/from_arrow_stream.cu @@ -14,10 +14,13 @@ * limitations under the License. 
*/ -#include -#include +#include "arrow_utilities.hpp" + +#include #include #include +#include +#include #include #include @@ -26,7 +29,6 @@ #include #include -#include #include #include #include @@ -35,15 +37,39 @@ namespace cudf { namespace detail { +namespace { -std::unique_ptr
from_arrow_stream( - ArrowArrayStream* input, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) +std::unique_ptr
make_empty_table(ArrowSchema const& schema, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { - CUDF_EXPECTS(input != nullptr, - "input ArrowArrayStream must not be NULL", - std::invalid_argument); + if (schema.n_children == 0) { + // If there are no chunks but the schema has children, we need to construct a suitable empty + // table. + return std::make_unique(); + } + + std::vector> columns; + for (int i = 0; i < schema.n_children; i++) { + ArrowSchema* child = schema.children[i]; + CUDF_EXPECTS(child->n_children == 0, + "Nested types in empty columns not yet supported", + std::invalid_argument); + // If the child has children, we need to construct a suitable empty table. + ArrowSchemaView schema_view; + NANOARROW_THROW_NOT_OK(ArrowSchemaViewInit(&schema_view, child, nullptr)); + columns.push_back(cudf::make_empty_column(arrow_to_cudf_type(&schema_view))); + } + return std::make_unique(std::move(columns)); +} + +} // namespace + +std::unique_ptr
from_arrow_stream(ArrowArrayStream* input, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + CUDF_EXPECTS(input != nullptr, "input ArrowArrayStream must not be NULL", std::invalid_argument); // Potential future optimization: Since the from_arrow API accepts an // ArrowSchema we're allocating one here instead of using a view, which we @@ -51,35 +77,31 @@ std::unique_ptr
from_arrow_stream( ArrowSchema schema; NANOARROW_THROW_NOT_OK(ArrowArrayStreamGetSchema(input, &schema, nullptr)); - // Assume that each chunk is a column. - // TODO: Is that restrictive? If we need to support actual chunking of a - // single array, how do we differentiate? Do we need a parameter in the API, - // or a different overload? - ArrowArray chunk; - std::vector> chunks; + ArrowArray chunk; while (true) { NANOARROW_THROW_NOT_OK(ArrowArrayStreamGetNext(input, &chunk, nullptr)); - if (chunk.release == nullptr) { - break; - } + if (chunk.release == nullptr) { break; } chunks.push_back(from_arrow(&schema, &chunk, stream, mr)); } - input->release(input); + + if (chunks.empty()) { return make_empty_table(schema, stream, mr); } + auto chunk_views = std::vector{}; chunk_views.reserve(chunks.size()); - std::transform(chunks.begin(), chunks.end(), std::back_inserter(chunk_views), [](auto const& chunk) { return chunk->view(); }); + std::transform( + chunks.begin(), chunks.end(), std::back_inserter(chunk_views), [](auto const& chunk) { + return chunk->view(); + }); return cudf::concatenate(chunk_views, stream, mr); } } // namespace detail - -std::unique_ptr
from_arrow_stream( - ArrowArrayStream* input, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) +std::unique_ptr
from_arrow_stream(ArrowArrayStream* input, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { CUDF_FUNC_RANGE(); From d925a84367977532f64f0afe9256e50072773492 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Fri, 31 May 2024 18:54:57 +0000 Subject: [PATCH 07/23] Properly support null columns, requires concatenate support too --- cpp/src/copying/concatenate.cu | 10 ++++++++++ cpp/src/interop/from_arrow_stream.cu | 11 +++++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/cpp/src/copying/concatenate.cu b/cpp/src/copying/concatenate.cu index 47e74a5cb48..c97a3b2b231 100644 --- a/cpp/src/copying/concatenate.cu +++ b/cpp/src/copying/concatenate.cu @@ -477,6 +477,7 @@ void bounds_and_type_check(host_span cols, rmm::cuda_stream_v std::overflow_error); // traverse children + if (cols.front().type().id() == cudf::type_id::EMPTY) { return; } cudf::type_dispatcher(cols.front().type(), traverse_children{}, cols, stream); } @@ -498,6 +499,15 @@ std::unique_ptr concatenate(host_span columns_to_conc return empty_like(columns_to_concat.front()); } + // For empty columns, we can just create an EMPTY column of the appropriate length. + if (columns_to_concat.front().type().id() == cudf::type_id::EMPTY) { + auto length = std::accumulate( + columns_to_concat.begin(), columns_to_concat.end(), 0, [](auto a, auto const& b) { + return a + b.size(); + }); + return std::make_unique( + data_type(type_id::EMPTY), length, rmm::device_buffer{}, rmm::device_buffer{}, length); + } return type_dispatcher( columns_to_concat.front().type(), concatenate_dispatch{columns_to_concat, stream, mr}); } diff --git a/cpp/src/interop/from_arrow_stream.cu b/cpp/src/interop/from_arrow_stream.cu index 37d8fd408b5..4eb379be7b2 100644 --- a/cpp/src/interop/from_arrow_stream.cu +++ b/cpp/src/interop/from_arrow_stream.cu @@ -51,14 +51,21 @@ std::unique_ptr
make_empty_table(ArrowSchema const& schema, std::vector> columns; for (int i = 0; i < schema.n_children; i++) { - ArrowSchema* child = schema.children[i]; + ArrowSchema* child{schema.children[i]}; CUDF_EXPECTS(child->n_children == 0, "Nested types in empty columns not yet supported", std::invalid_argument); // If the child has children, we need to construct a suitable empty table. ArrowSchemaView schema_view; NANOARROW_THROW_NOT_OK(ArrowSchemaViewInit(&schema_view, child, nullptr)); - columns.push_back(cudf::make_empty_column(arrow_to_cudf_type(&schema_view))); + + auto const type{arrow_to_cudf_type(&schema_view)}; + columns.push_back([&type] { + return type.id() != type_id::EMPTY + ? cudf::make_empty_column(type) + : std::make_unique( + data_type(type_id::EMPTY), 0, rmm::device_buffer{}, rmm::device_buffer{}, 0); + }()); } return std::make_unique(std::move(columns)); } From 4de47cad5480fd1c93ff15d569a802c1e35691ee Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sat, 1 Jun 2024 00:19:09 +0000 Subject: [PATCH 08/23] Handle nested types in empty table generation --- cpp/src/interop/from_arrow_stream.cu | 59 ++++++++++++++++++---------- 1 file changed, 39 insertions(+), 20 deletions(-) diff --git a/cpp/src/interop/from_arrow_stream.cu b/cpp/src/interop/from_arrow_stream.cu index 4eb379be7b2..851e9beef01 100644 --- a/cpp/src/interop/from_arrow_stream.cu +++ b/cpp/src/interop/from_arrow_stream.cu @@ -39,33 +39,52 @@ namespace detail { namespace { +std::unique_ptr make_empty_column_from_schema(ArrowSchema const* schema, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + ArrowSchemaView schema_view; + NANOARROW_THROW_NOT_OK(ArrowSchemaViewInit(&schema_view, schema, nullptr)); + + auto const type{arrow_to_cudf_type(&schema_view)}; + switch (type.id()) { + case type_id::EMPTY: { + return std::make_unique( + data_type(type_id::EMPTY), 0, rmm::device_buffer{}, rmm::device_buffer{}, 0); + } + case type_id::LIST: { + return 
cudf::make_lists_column(0, + cudf::make_empty_column(data_type{type_id::INT32}), + make_empty_column_from_schema(schema->children[0], stream, mr), + 0, + {}, + stream, + mr); + } + case type_id::STRUCT: { + std::vector> child_columns; + for (int i = 0; i < schema->n_children; i++) { + child_columns.push_back(make_empty_column_from_schema(schema->children[i], stream, mr)); + } + return cudf::make_structs_column(0, std::move(child_columns), 0, {}, stream, mr); + } + default: { + return cudf::make_empty_column(type); + } + } +} + std::unique_ptr
make_empty_table(ArrowSchema const& schema, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - if (schema.n_children == 0) { - // If there are no chunks but the schema has children, we need to construct a suitable empty - // table. - return std::make_unique(); - } + if (schema.n_children == 0) { return std::make_unique(); } + // If there are no chunks but the schema has children, we need to construct a suitable empty + // table. std::vector> columns; for (int i = 0; i < schema.n_children; i++) { - ArrowSchema* child{schema.children[i]}; - CUDF_EXPECTS(child->n_children == 0, - "Nested types in empty columns not yet supported", - std::invalid_argument); - // If the child has children, we need to construct a suitable empty table. - ArrowSchemaView schema_view; - NANOARROW_THROW_NOT_OK(ArrowSchemaViewInit(&schema_view, child, nullptr)); - - auto const type{arrow_to_cudf_type(&schema_view)}; - columns.push_back([&type] { - return type.id() != type_id::EMPTY - ? cudf::make_empty_column(type) - : std::make_unique( - data_type(type_id::EMPTY), 0, rmm::device_buffer{}, rmm::device_buffer{}, 0); - }()); + columns.push_back(make_empty_column_from_schema(schema.children[i], stream, mr)); } return std::make_unique(std::move(columns)); } From 03c76c85c7a70e704362a69fcfc0f47052d6c7fa Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Sat, 1 Jun 2024 00:29:32 +0000 Subject: [PATCH 09/23] Some cleanup and improved use of algorithms --- cpp/src/interop/from_arrow_stream.cu | 39 ++++++++++++++-------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/cpp/src/interop/from_arrow_stream.cu b/cpp/src/interop/from_arrow_stream.cu index 851e9beef01..094259f37a7 100644 --- a/cpp/src/interop/from_arrow_stream.cu +++ b/cpp/src/interop/from_arrow_stream.cu @@ -63,9 +63,12 @@ std::unique_ptr make_empty_column_from_schema(ArrowSchema const* schema, } case type_id::STRUCT: { std::vector> child_columns; - for (int i = 0; i < schema->n_children; i++) { - 
child_columns.push_back(make_empty_column_from_schema(schema->children[i], stream, mr)); - } + child_columns.reserve(schema->n_children); + std::transform( + schema->children, + schema->children + schema->n_children, + std::back_inserter(child_columns), + [&](auto const& child) { return make_empty_column_from_schema(child, stream, mr); }); return cudf::make_structs_column(0, std::move(child_columns), 0, {}, stream, mr); } default: { @@ -74,21 +77,6 @@ std::unique_ptr make_empty_column_from_schema(ArrowSchema const* schema, } } -std::unique_ptr
make_empty_table(ArrowSchema const& schema, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) -{ - if (schema.n_children == 0) { return std::make_unique(); } - - // If there are no chunks but the schema has children, we need to construct a suitable empty - // table. - std::vector> columns; - for (int i = 0; i < schema.n_children; i++) { - columns.push_back(make_empty_column_from_schema(schema.children[i], stream, mr)); - } - return std::make_unique(std::move(columns)); -} - } // namespace std::unique_ptr
from_arrow_stream(ArrowArrayStream* input, @@ -112,7 +100,20 @@ std::unique_ptr
from_arrow_stream(ArrowArrayStream* input, } input->release(input); - if (chunks.empty()) { return make_empty_table(schema, stream, mr); } + if (chunks.empty()) { + if (schema.n_children == 0) { return std::make_unique(); } + + // If there are no chunks but the schema has children, we need to construct a suitable empty + // table. + std::vector> columns; + columns.reserve(chunks.size()); + std::transform( + schema.children, + schema.children + schema.n_children, + std::back_inserter(columns), + [&](auto const& child) { return make_empty_column_from_schema(child, stream, mr); }); + return std::make_unique(std::move(columns)); + } auto chunk_views = std::vector{}; chunk_views.reserve(chunks.size()); From 6454a8a204549e33ce1a791740f1628d44574407 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 3 Jun 2024 22:58:06 +0000 Subject: [PATCH 10/23] Generalize check in concatenate --- cpp/src/copying/concatenate.cu | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/cpp/src/copying/concatenate.cu b/cpp/src/copying/concatenate.cu index c97a3b2b231..6acbafd24fb 100644 --- a/cpp/src/copying/concatenate.cu +++ b/cpp/src/copying/concatenate.cu @@ -463,10 +463,6 @@ void traverse_children::operator()(host_span */ void bounds_and_type_check(host_span cols, rmm::cuda_stream_view stream) { - CUDF_EXPECTS(cudf::all_have_same_types(cols.begin(), cols.end()), - "Type mismatch in columns to concatenate.", - cudf::data_type_error); - // total size of all concatenated rows size_t const total_row_count = std::accumulate(cols.begin(), cols.end(), std::size_t{}, [](size_t a, auto const& b) { @@ -476,8 +472,22 @@ void bounds_and_type_check(host_span cols, rmm::cuda_stream_v "Total number of concatenated rows exceeds the column size limit", std::overflow_error); + if (std::any_of(cols.begin(), cols.end(), [](column_view const& c) { + return c.type().id() == cudf::type_id::EMPTY; + })) { + CUDF_EXPECTS( + std::all_of(cols.begin(), + cols.end(), + 
[](column_view const& c) { return c.type().id() == cudf::type_id::EMPTY; }), + "Mismatch in columns to concatenate.", + cudf::data_type_error); + return; + } + CUDF_EXPECTS(cudf::all_have_same_types(cols.begin(), cols.end()), + "Type mismatch in columns to concatenate.", + cudf::data_type_error); + // traverse children - if (cols.front().type().id() == cudf::type_id::EMPTY) { return; } cudf::type_dispatcher(cols.front().type(), traverse_children{}, cols, stream); } From e5f0c35cd9e1c58555c6a5769bdad59af5af4576 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 3 Jun 2024 22:59:36 +0000 Subject: [PATCH 11/23] Add concatenate tests for EMPTY dtype --- cpp/tests/copying/concatenate_tests.cpp | 60 +++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/cpp/tests/copying/concatenate_tests.cpp b/cpp/tests/copying/concatenate_tests.cpp index 3b7bff69938..b132985da6a 100644 --- a/cpp/tests/copying/concatenate_tests.cpp +++ b/cpp/tests/copying/concatenate_tests.cpp @@ -1658,3 +1658,63 @@ TEST_F(DictionaryConcatTest, ErrorsTest) std::vector empty; EXPECT_THROW(cudf::concatenate(empty), cudf::logic_error); } + +struct EmptyColumnTest : public cudf::test::BaseFixture {}; + +TEST_F(EmptyColumnTest, SimpleTest) +{ + std::vector columns; + constexpr auto num_copies = 10; + constexpr auto num_rows = 10; + for (auto i = 0; i < num_copies; ++i) { + columns.emplace_back(cudf::data_type(cudf::type_id::EMPTY), + num_rows, + rmm::device_buffer{}, + rmm::device_buffer{}, + 0); + } + + // Create views from columns + std::vector views; + for (auto& col : columns) { + views.push_back(col.view()); + } + auto result = cudf::concatenate(views); + + ASSERT_EQ(result->size(), num_copies * num_rows); + ASSERT_EQ(result->type().id(), cudf::type_id::EMPTY); +} + +struct TableOfEmptyColumnsTest : public cudf::test::BaseFixture {}; + +TEST_F(TableOfEmptyColumnsTest, SimpleTest) +{ + std::vector tables; + constexpr auto num_copies = 10; + constexpr auto num_rows = 10; + 
constexpr auto num_columns = 10; + for (auto i = 0; i < num_copies; ++i) { + std::vector> columns; + for (auto j = 0; j < num_columns; ++j) { + columns.push_back(std::make_unique(cudf::data_type(cudf::type_id::EMPTY), + num_rows, + rmm::device_buffer{}, + rmm::device_buffer{}, + 0)); + } + tables.emplace_back(std::move(columns)); + } + + // Create views from columns + std::vector views; + for (auto& tbl : tables) { + views.push_back(tbl.view()); + } + auto result = cudf::concatenate(views); + + ASSERT_EQ(result->num_rows(), num_copies * num_rows); + ASSERT_EQ(result->num_columns(), num_columns); + for (auto i = 0; i < num_columns; ++i) { + ASSERT_EQ(result->get_column(i).type().id(), cudf::type_id::EMPTY); + } +} From b5dd6d8a4cfeddab724e031539f21de0506452cd Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Tue, 4 Jun 2024 06:21:21 +0000 Subject: [PATCH 12/23] Add main test for from_arrow_stream --- cpp/tests/CMakeLists.txt | 1 + cpp/tests/interop/from_arrow_stream_test.cpp | 189 +++++++++++++++++++ 2 files changed, 190 insertions(+) create mode 100644 cpp/tests/interop/from_arrow_stream_test.cpp diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 2f2c12f265c..4d27dbb45b6 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -270,6 +270,7 @@ ConfigureTest( interop/from_arrow_test.cpp interop/from_arrow_device_test.cpp interop/from_arrow_host_test.cpp + interop/from_arrow_stream_test.cpp interop/dlpack_test.cpp EXTRA_LIB nanoarrow diff --git a/cpp/tests/interop/from_arrow_stream_test.cpp b/cpp/tests/interop/from_arrow_stream_test.cpp new file mode 100644 index 00000000000..6501073122f --- /dev/null +++ b/cpp/tests/interop/from_arrow_stream_test.cpp @@ -0,0 +1,189 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "nanoarrow_utils.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +// create a cudf::table and equivalent arrow table with host memory +std::tuple, nanoarrow::UniqueSchema, nanoarrow::UniqueArray> +get_nanoarrow_host_tables_for_stream(cudf::size_type length) +{ + auto [table, schema, test_data] = get_nanoarrow_cudf_table(length); + + auto int64_array = get_nanoarrow_array(test_data.int64_data, test_data.validity); + auto string_array = + get_nanoarrow_array(test_data.string_data, test_data.validity); + cudf::dictionary_column_view view(table->get_column(2).view()); + auto keys = cudf::test::to_host(view.keys()).first; + auto indices = cudf::test::to_host(view.indices()).first; + auto dict_array = get_nanoarrow_dict_array(std::vector(keys.begin(), keys.end()), + std::vector(indices.begin(), indices.end()), + test_data.validity); + auto boolarray = get_nanoarrow_array(test_data.bool_data, test_data.bool_validity); + auto list_array = get_nanoarrow_list_array(test_data.list_int64_data, + test_data.list_offsets, + test_data.list_int64_data_validity, + test_data.bool_data_validity); + + nanoarrow::UniqueArray arrow; + NANOARROW_THROW_NOT_OK(ArrowArrayInitFromSchema(arrow.get(), schema.get(), nullptr)); + arrow->length = length; + + int64_array.move(arrow->children[0]); + string_array.move(arrow->children[1]); + dict_array.move(arrow->children[2]); + boolarray.move(arrow->children[3]); + 
list_array.move(arrow->children[4]); + + int64_array = get_nanoarrow_array(test_data.int64_data, test_data.validity); + string_array = get_nanoarrow_array(test_data.string_data, test_data.validity); + int64_array.move(arrow->children[5]->children[0]); + string_array.move(arrow->children[5]->children[1]); + + ArrowBitmap struct_validity; + ArrowBitmapInit(&struct_validity); + NANOARROW_THROW_NOT_OK(ArrowBitmapReserve(&struct_validity, length)); + ArrowBitmapAppendInt8Unsafe( + &struct_validity, reinterpret_cast(test_data.bool_data_validity.data()), length); + arrow->children[5]->length = length; + ArrowArraySetValidityBitmap(arrow->children[5], &struct_validity); + arrow->children[5]->null_count = + length - ArrowBitCountSet(ArrowArrayValidityBitmap(arrow->children[5])->buffer.data, 0, length); + + ArrowError error; + if (ArrowArrayFinishBuilding(arrow.get(), NANOARROW_VALIDATION_LEVEL_MINIMAL, &error) != + NANOARROW_OK) { + std::cerr << ArrowErrorMessage(&error) << std::endl; + CUDF_FAIL("failed to build example arrays"); + } + + return std::make_tuple(std::move(table), std::move(schema), std::move(arrow)); +} + +static void null_release_schema(ArrowSchema* stream) {} +static void null_release_array(ArrowArray* stream) {} + +struct VectorOfArrays { + std::vector arrays; + nanoarrow::UniqueSchema schema; + size_t index{0}; + + static int get_schema(ArrowArrayStream* stream, ArrowSchema* out_schema) + { + auto private_data = static_cast(stream->private_data); + // Copy the schema attributes every time, but leave the release function as null since the + // schema is owned by the VectorOfArrays and released upon its destruction. 
+ out_schema->format = private_data->schema->format; + out_schema->name = private_data->schema->name; + out_schema->metadata = private_data->schema->metadata; + out_schema->flags = private_data->schema->flags; + out_schema->n_children = private_data->schema->n_children; + out_schema->children = private_data->schema->children; + out_schema->dictionary = private_data->schema->dictionary; + out_schema->release = null_release_schema; + + return 0; + } + + static int get_next(ArrowArrayStream* stream, ArrowArray* out_array) + { + auto private_data = static_cast(stream->private_data); + if (private_data->index >= private_data->arrays.size()) { + out_array->release = nullptr; + return 0; + } + // Copy the attributes, but leave the release function as null since the + // array is owned by the VectorOfArrays and released upon its destruction. + auto ret_array = private_data->arrays[private_data->index++].get(); + out_array->length = ret_array->length; + out_array->null_count = ret_array->null_count; + out_array->offset = ret_array->offset; + out_array->n_buffers = ret_array->n_buffers; + out_array->buffers = ret_array->buffers; + out_array->n_children = ret_array->n_children; + out_array->children = ret_array->children; + out_array->dictionary = ret_array->dictionary; + out_array->release = null_release_array; + + return 0; + } + + static const char* get_last_error(ArrowArrayStream* stream) { return nullptr; } + + static void release(ArrowArrayStream* stream) + { + delete static_cast(stream->private_data); + } +}; + +struct FromArrowStreamTest : public cudf::test::BaseFixture {}; + +void makeStreamFromArrays(std::vector arrays, + nanoarrow::UniqueSchema schema, + ArrowArrayStream* out) +{ + auto* private_data = new VectorOfArrays{std::move(arrays), std::move(schema)}; + out->get_schema = VectorOfArrays::get_schema; + out->get_next = VectorOfArrays::get_next; + out->get_last_error = VectorOfArrays::get_last_error; + out->release = VectorOfArrays::release; + out->private_data 
= private_data; +} + +TEST_F(FromArrowStreamTest, BasicTest) +{ + constexpr auto num_copies = 3; + std::vector> tables; + // The schema is unique across all tables. + nanoarrow::UniqueSchema schema; + std::vector arrays; + for (auto i = 0; i < num_copies; ++i) { + auto [tbl, sch, arr] = get_nanoarrow_host_tables_for_stream(0); + tables.push_back(std::move(tbl)); + arrays.push_back(std::move(arr)); + if (i == 0) { sch.move(schema.get()); } + } + std::vector table_views; + for (auto const& table : tables) { + table_views.push_back(table->view()); + } + auto expected = cudf::concatenate(table_views); + + ArrowArrayStream stream; + makeStreamFromArrays(std::move(arrays), std::move(schema), &stream); + auto result = cudf::from_arrow_stream(&stream); + CUDF_TEST_EXPECT_TABLES_EQUAL(expected->view(), result->view()); +} From 01c4de214f8d8ffae0d0b57fbd19637ba16c917a Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Tue, 4 Jun 2024 18:08:23 +0000 Subject: [PATCH 13/23] Add a test with an empty stream --- cpp/tests/interop/from_arrow_stream_test.cpp | 40 +++++++++++++------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/cpp/tests/interop/from_arrow_stream_test.cpp b/cpp/tests/interop/from_arrow_stream_test.cpp index 6501073122f..7ff823d526b 100644 --- a/cpp/tests/interop/from_arrow_stream_test.cpp +++ b/cpp/tests/interop/from_arrow_stream_test.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include @@ -93,7 +94,6 @@ get_nanoarrow_host_tables_for_stream(cudf::size_type length) return std::make_tuple(std::move(table), std::move(schema), std::move(arrow)); } -static void null_release_schema(ArrowSchema* stream) {} static void null_release_array(ArrowArray* stream) {} struct VectorOfArrays { @@ -104,16 +104,11 @@ struct VectorOfArrays { static int get_schema(ArrowArrayStream* stream, ArrowSchema* out_schema) { auto private_data = static_cast(stream->private_data); - // Copy the schema attributes every time, but leave the release function as 
null since the - // schema is owned by the VectorOfArrays and released upon its destruction. - out_schema->format = private_data->schema->format; - out_schema->name = private_data->schema->name; - out_schema->metadata = private_data->schema->metadata; - out_schema->flags = private_data->schema->flags; - out_schema->n_children = private_data->schema->n_children; - out_schema->children = private_data->schema->children; - out_schema->dictionary = private_data->schema->dictionary; - out_schema->release = null_release_schema; + // TODO: Can the deep copy be avoided here? I tried creating a new schema + // with a shallow copy of the fields and seeing the release function to a + // no-op, but that resulted in the children being freed somehow in the + // EmptyTest (I didn't investigate further). + ArrowSchemaDeepCopy(private_data->schema.get(), out_schema); return 0; } @@ -125,9 +120,14 @@ struct VectorOfArrays { out_array->release = nullptr; return 0; } - // Copy the attributes, but leave the release function as null since the - // array is owned by the VectorOfArrays and released upon its destruction. - auto ret_array = private_data->arrays[private_data->index++].get(); + auto ret_array = private_data->arrays[private_data->index++].get(); + // TODO: This shallow copy seems to work, but is it safe, especially with + // respect to the children? I believe we should be safe from double-freeing + // because everything will check for a null release pointer before freeing, + // but that could produce use-after-free bugs especially with the children + // since that pointer is just copied over. My current tests won't reflect + // that but creating multiple streams from the same set of arrays would. + // Is that even a valid use case? 
out_array->length = ret_array->length; out_array->null_count = ret_array->null_count; out_array->offset = ret_array->offset; @@ -187,3 +187,15 @@ TEST_F(FromArrowStreamTest, BasicTest) auto result = cudf::from_arrow_stream(&stream); CUDF_TEST_EXPECT_TABLES_EQUAL(expected->view(), result->view()); } + +TEST_F(FromArrowStreamTest, EmptyTest) +{ + auto [tbl, sch, arr] = get_nanoarrow_host_tables_for_stream(0); + std::vector table_views{tbl->view()}; + auto expected = cudf::concatenate(table_views); + + ArrowArrayStream stream; + makeStreamFromArrays({}, std::move(sch), &stream); + auto result = cudf::from_arrow_stream(&stream); + cudf::have_same_types(expected->view(), result->view()); +} From 5bf1469c387145821378b228428babe027f87aa9 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Tue, 4 Jun 2024 18:18:38 +0000 Subject: [PATCH 14/23] Reuse host table creation --- cpp/tests/interop/from_arrow_stream_test.cpp | 60 +------------------- cpp/tests/interop/nanoarrow_utils.hpp | 3 + 2 files changed, 5 insertions(+), 58 deletions(-) diff --git a/cpp/tests/interop/from_arrow_stream_test.cpp b/cpp/tests/interop/from_arrow_stream_test.cpp index 7ff823d526b..1082de826eb 100644 --- a/cpp/tests/interop/from_arrow_stream_test.cpp +++ b/cpp/tests/interop/from_arrow_stream_test.cpp @@ -38,62 +38,6 @@ #include -// create a cudf::table and equivalent arrow table with host memory -std::tuple, nanoarrow::UniqueSchema, nanoarrow::UniqueArray> -get_nanoarrow_host_tables_for_stream(cudf::size_type length) -{ - auto [table, schema, test_data] = get_nanoarrow_cudf_table(length); - - auto int64_array = get_nanoarrow_array(test_data.int64_data, test_data.validity); - auto string_array = - get_nanoarrow_array(test_data.string_data, test_data.validity); - cudf::dictionary_column_view view(table->get_column(2).view()); - auto keys = cudf::test::to_host(view.keys()).first; - auto indices = cudf::test::to_host(view.indices()).first; - auto dict_array = 
get_nanoarrow_dict_array(std::vector(keys.begin(), keys.end()), - std::vector(indices.begin(), indices.end()), - test_data.validity); - auto boolarray = get_nanoarrow_array(test_data.bool_data, test_data.bool_validity); - auto list_array = get_nanoarrow_list_array(test_data.list_int64_data, - test_data.list_offsets, - test_data.list_int64_data_validity, - test_data.bool_data_validity); - - nanoarrow::UniqueArray arrow; - NANOARROW_THROW_NOT_OK(ArrowArrayInitFromSchema(arrow.get(), schema.get(), nullptr)); - arrow->length = length; - - int64_array.move(arrow->children[0]); - string_array.move(arrow->children[1]); - dict_array.move(arrow->children[2]); - boolarray.move(arrow->children[3]); - list_array.move(arrow->children[4]); - - int64_array = get_nanoarrow_array(test_data.int64_data, test_data.validity); - string_array = get_nanoarrow_array(test_data.string_data, test_data.validity); - int64_array.move(arrow->children[5]->children[0]); - string_array.move(arrow->children[5]->children[1]); - - ArrowBitmap struct_validity; - ArrowBitmapInit(&struct_validity); - NANOARROW_THROW_NOT_OK(ArrowBitmapReserve(&struct_validity, length)); - ArrowBitmapAppendInt8Unsafe( - &struct_validity, reinterpret_cast(test_data.bool_data_validity.data()), length); - arrow->children[5]->length = length; - ArrowArraySetValidityBitmap(arrow->children[5], &struct_validity); - arrow->children[5]->null_count = - length - ArrowBitCountSet(ArrowArrayValidityBitmap(arrow->children[5])->buffer.data, 0, length); - - ArrowError error; - if (ArrowArrayFinishBuilding(arrow.get(), NANOARROW_VALIDATION_LEVEL_MINIMAL, &error) != - NANOARROW_OK) { - std::cerr << ArrowErrorMessage(&error) << std::endl; - CUDF_FAIL("failed to build example arrays"); - } - - return std::make_tuple(std::move(table), std::move(schema), std::move(arrow)); -} - static void null_release_array(ArrowArray* stream) {} struct VectorOfArrays { @@ -171,7 +115,7 @@ TEST_F(FromArrowStreamTest, BasicTest) nanoarrow::UniqueSchema schema; 
std::vector arrays; for (auto i = 0; i < num_copies; ++i) { - auto [tbl, sch, arr] = get_nanoarrow_host_tables_for_stream(0); + auto [tbl, sch, arr] = get_nanoarrow_host_tables(0); tables.push_back(std::move(tbl)); arrays.push_back(std::move(arr)); if (i == 0) { sch.move(schema.get()); } @@ -190,7 +134,7 @@ TEST_F(FromArrowStreamTest, BasicTest) TEST_F(FromArrowStreamTest, EmptyTest) { - auto [tbl, sch, arr] = get_nanoarrow_host_tables_for_stream(0); + auto [tbl, sch, arr] = get_nanoarrow_host_tables(0); std::vector table_views{tbl->view()}; auto expected = cudf::concatenate(table_views); diff --git a/cpp/tests/interop/nanoarrow_utils.hpp b/cpp/tests/interop/nanoarrow_utils.hpp index a79e6fdc49c..1ae544c39e2 100644 --- a/cpp/tests/interop/nanoarrow_utils.hpp +++ b/cpp/tests/interop/nanoarrow_utils.hpp @@ -375,3 +375,6 @@ nanoarrow::UniqueArray get_nanoarrow_list_array(std::initializer_list data, std::tuple, nanoarrow::UniqueSchema, generated_test_data> get_nanoarrow_cudf_table(cudf::size_type length); + +std::tuple, nanoarrow::UniqueSchema, nanoarrow::UniqueArray> +get_nanoarrow_host_tables(cudf::size_type length); From 655ad9fc9ca22a91f3c7de14e4e441ae092ebb3f Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Thu, 6 Jun 2024 00:04:59 +0000 Subject: [PATCH 15/23] Address PR comments --- cpp/tests/interop/from_arrow_stream_test.cpp | 26 +------------------- 1 file changed, 1 insertion(+), 25 deletions(-) diff --git a/cpp/tests/interop/from_arrow_stream_test.cpp b/cpp/tests/interop/from_arrow_stream_test.cpp index 1082de826eb..418ec057303 100644 --- a/cpp/tests/interop/from_arrow_stream_test.cpp +++ b/cpp/tests/interop/from_arrow_stream_test.cpp @@ -38,8 +38,6 @@ #include -static void null_release_array(ArrowArray* stream) {} - struct VectorOfArrays { std::vector arrays; nanoarrow::UniqueSchema schema; @@ -48,12 +46,7 @@ struct VectorOfArrays { static int get_schema(ArrowArrayStream* stream, ArrowSchema* out_schema) { auto private_data = 
static_cast(stream->private_data); - // TODO: Can the deep copy be avoided here? I tried creating a new schema - // with a shallow copy of the fields and seeing the release function to a - // no-op, but that resulted in the children being freed somehow in the - // EmptyTest (I didn't investigate further). ArrowSchemaDeepCopy(private_data->schema.get(), out_schema); - return 0; } @@ -64,24 +57,7 @@ struct VectorOfArrays { out_array->release = nullptr; return 0; } - auto ret_array = private_data->arrays[private_data->index++].get(); - // TODO: This shallow copy seems to work, but is it safe, especially with - // respect to the children? I believe we should be safe from double-freeing - // because everything will check for a null release pointer before freeing, - // but that could produce use-after-free bugs especially with the children - // since that pointer is just copied over. My current tests won't reflect - // that but creating multiple streams from the same set of arrays would. - // Is that even a valid use case? 
- out_array->length = ret_array->length; - out_array->null_count = ret_array->null_count; - out_array->offset = ret_array->offset; - out_array->n_buffers = ret_array->n_buffers; - out_array->buffers = ret_array->buffers; - out_array->n_children = ret_array->n_children; - out_array->children = ret_array->children; - out_array->dictionary = ret_array->dictionary; - out_array->release = null_release_array; - + ArrowArrayMove(private_data->arrays[private_data->index++].get(), out_array); return 0; } From cb2c17c08514aa64b9627c78c197472590028908 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Tue, 18 Jun 2024 23:54:57 +0000 Subject: [PATCH 16/23] Change from_arrow_stream to release the arrays --- cpp/include/cudf/interop.hpp | 4 ++-- cpp/src/interop/from_arrow_stream.cu | 9 ++++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/cpp/include/cudf/interop.hpp b/cpp/include/cudf/interop.hpp index cd67e90b7d5..502ffb9ba4f 100644 --- a/cpp/include/cudf/interop.hpp +++ b/cpp/include/cudf/interop.hpp @@ -423,8 +423,8 @@ std::unique_ptr
from_arrow_host( * * @throws std::invalid_argument if input is NULL * - * The conversion will not call release on the input ArrayArrayStream or its - * constituent arrays or schema. + * The conversion WILL release the input ArrowArrayStream and its constituent + * arrays and schema since Arrow streams are not suitable for multiple reads. * * @param input `ArrowArrayStream` pointer to object that will produce ArrowArray data * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to perform cuda allocation diff --git a/cpp/src/interop/from_arrow_stream.cu b/cpp/src/interop/from_arrow_stream.cu index 094259f37a7..cb3c19ce570 100644 --- a/cpp/src/interop/from_arrow_stream.cu +++ b/cpp/src/interop/from_arrow_stream.cu @@ -97,11 +97,15 @@ std::unique_ptr
from_arrow_stream(ArrowArrayStream* input, NANOARROW_THROW_NOT_OK(ArrowArrayStreamGetNext(input, &chunk, nullptr)); if (chunk.release == nullptr) { break; } chunks.push_back(from_arrow(&schema, &chunk, stream, mr)); + chunk.release(&chunk); } input->release(input); if (chunks.empty()) { - if (schema.n_children == 0) { return std::make_unique(); } + if (schema.n_children == 0) { + schema.release(&schema); + return std::make_unique(); + } // If there are no chunks but the schema has children, we need to construct a suitable empty // table. @@ -112,9 +116,12 @@ std::unique_ptr
from_arrow_stream(ArrowArrayStream* input, schema.children + schema.n_children, std::back_inserter(columns), [&](auto const& child) { return make_empty_column_from_schema(child, stream, mr); }); + schema.release(&schema); return std::make_unique(std::move(columns)); } + schema.release(&schema); + auto chunk_views = std::vector{}; chunk_views.reserve(chunks.size()); std::transform( From 3ef157ca53165866b45ace0bcc0276ce8c2ccd7c Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 19 Jun 2024 04:33:02 +0000 Subject: [PATCH 17/23] Properly release in Python --- python/cudf/cudf/_lib/pylibcudf/interop.pyx | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/cudf/cudf/_lib/pylibcudf/interop.pyx b/python/cudf/cudf/_lib/pylibcudf/interop.pyx index 4ceca974df9..adf7e1fd7e8 100644 --- a/python/cudf/cudf/_lib/pylibcudf/interop.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/interop.pyx @@ -138,6 +138,7 @@ def _from_arrow_table(pyarrow_object, *, DataType data_type=None): cdef unique_ptr[table] c_result with nogil: + # The libcudf function here will release the stream. c_result = move(cpp_from_arrow_stream(c_stream)) return Table.from_libcudf(move(c_result)) @@ -213,6 +214,11 @@ def _from_arrow_column(pyarrow_object, *, DataType data_type=None): with nogil: c_result = move(cpp_from_arrow_column(c_schema, c_array)) + # The capsule destructors should release automatically for us, but we + # choose to do it explicitly here for clarity. 
+ c_schema.release(c_schema) + c_array.release(c_array) + return Column.from_libcudf(move(c_result)) From 93cb5124bc0fa9b1875fb8dcc4e5a76e37328b92 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Tue, 25 Jun 2024 20:54:13 +0000 Subject: [PATCH 18/23] Initial attempts to get large strings working --- cpp/src/interop/arrow_utilities.cpp | 3 ++- cpp/src/interop/from_arrow_host.cu | 35 ++++++++++++++++++++++++----- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/cpp/src/interop/arrow_utilities.cpp b/cpp/src/interop/arrow_utilities.cpp index dd9e9600a87..605d813ed1e 100644 --- a/cpp/src/interop/arrow_utilities.cpp +++ b/cpp/src/interop/arrow_utilities.cpp @@ -39,7 +39,8 @@ data_type arrow_to_cudf_type(ArrowSchemaView const* arrow_view) case NANOARROW_TYPE_FLOAT: return data_type(type_id::FLOAT32); case NANOARROW_TYPE_DOUBLE: return data_type(type_id::FLOAT64); case NANOARROW_TYPE_DATE32: return data_type(type_id::TIMESTAMP_DAYS); - case NANOARROW_TYPE_STRING: return data_type(type_id::STRING); + case NANOARROW_TYPE_STRING: + case NANOARROW_TYPE_LARGE_STRING: return data_type(type_id::STRING); case NANOARROW_TYPE_LIST: return data_type(type_id::LIST); case NANOARROW_TYPE_DICTIONARY: return data_type(type_id::DICTIONARY32); case NANOARROW_TYPE_STRUCT: return data_type(type_id::STRUCT); diff --git a/cpp/src/interop/from_arrow_host.cu b/cpp/src/interop/from_arrow_host.cu index 854a1d68fdc..3ba97485290 100644 --- a/cpp/src/interop/from_arrow_host.cu +++ b/cpp/src/interop/from_arrow_host.cu @@ -188,8 +188,8 @@ std::unique_ptr dispatch_copy_from_arrow_host::operator()(offset_buffers[1])[input->length + input->offset]; + int64_t const char_data_length = + reinterpret_cast(offset_buffers[1])[input->length + input->offset]; void const* char_buffers[2] = {nullptr, input->buffers[2]}; ArrowArray char_array = { .length = char_data_length, @@ -210,15 +210,38 @@ std::unique_ptr dispatch_copy_from_arrow_host::operator()operator()(&view, &offsets_array, 
data_type(type_id::INT32), true); + auto offsets_column = [&]() { + if (schema->type == NANOARROW_TYPE_LARGE_STRING) { + return this->operator()(&view, &offsets_array, data_type(type_id::INT64), true); + } else if (schema->type == NANOARROW_TYPE_STRING) { + return this->operator()(&view, &offsets_array, data_type(type_id::INT32), true); + } else { + CUDF_FAIL("Unsupported string type", cudf::data_type_error); + } + }(); NANOARROW_THROW_NOT_OK(ArrowSchemaViewInit(&view, char_data_schema.get(), nullptr)); - auto chars_column = this->operator()(&view, &char_array, data_type(type_id::INT8), true); + rmm::device_buffer chars = [&] { + if (schema->type == NANOARROW_TYPE_LARGE_STRING) { + rmm::device_buffer chars(char_data_length, stream, mr); + CUDF_CUDA_TRY(cudaMemcpyAsync(chars.data(), + reinterpret_cast(char_array.buffers[1]), + chars.size(), + cudaMemcpyDefault, + stream.value())); + return chars; + } else if (schema->type == NANOARROW_TYPE_STRING) { + auto chars_column = + this->operator()(&view, &char_array, data_type(type_id::INT8), true); + return std::move(*chars_column->release().data.release()); + } else { + CUDF_FAIL("Unsupported string type", cudf::data_type_error); + } + }(); auto const num_rows = offsets_column->size() - 1; auto out_col = make_strings_column(num_rows, std::move(offsets_column), - std::move(chars_column->release().data.release()[0]), + std::move(chars), input->null_count, std::move(*get_mask_buffer(input))); From e972907a2a17b3eb068571276700eacbf72113e5 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 26 Jun 2024 01:20:01 +0000 Subject: [PATCH 19/23] Properly access string lengths --- cpp/src/interop/from_arrow_host.cu | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/cpp/src/interop/from_arrow_host.cu b/cpp/src/interop/from_arrow_host.cu index 3ba97485290..a58e0ad72f7 100644 --- a/cpp/src/interop/from_arrow_host.cu +++ b/cpp/src/interop/from_arrow_host.cu @@ -188,8 +188,16 @@ std::unique_ptr 
dispatch_copy_from_arrow_host::operator()(offset_buffers[1])[input->length + input->offset]; + int64_t const char_data_length = [&]() { + if (schema->type == NANOARROW_TYPE_LARGE_STRING) { + return reinterpret_cast(offset_buffers[1])[input->length + input->offset]; + } else if (schema->type == NANOARROW_TYPE_STRING) { + return static_cast( + reinterpret_cast(offset_buffers[1])[input->length + input->offset]); + } else { + CUDF_FAIL("Unsupported string type", cudf::data_type_error); + } + }(); void const* char_buffers[2] = {nullptr, input->buffers[2]}; ArrowArray char_array = { .length = char_data_length, From 3d227dd1c84ca49bea0fd608f7793e095232b417 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 26 Jun 2024 17:27:03 +0000 Subject: [PATCH 20/23] Drop the exact equivalence comparison since libcudf will compress large strings to strings if the compression is lossless --- python/cudf/cudf/tests/test_series.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/cudf/cudf/tests/test_series.py b/python/cudf/cudf/tests/test_series.py index 467d0c46ae7..66bec9daf2d 100644 --- a/python/cudf/cudf/tests/test_series.py +++ b/python/cudf/cudf/tests/test_series.py @@ -2757,8 +2757,6 @@ def test_series_from_large_string(pa_type): assert_eq(expected, got) - assert pa_string_array.equals(got.to_arrow()) - @pytest.mark.parametrize( "scalar", From 6d255d54ba3f74740d96f6c75f39dfe7a007c686 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 26 Jun 2024 17:40:54 +0000 Subject: [PATCH 21/23] Block from_arrow_device on large strings --- cpp/src/interop/from_arrow_device.cu | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cpp/src/interop/from_arrow_device.cu b/cpp/src/interop/from_arrow_device.cu index 002a8ec1f14..73c1a474310 100644 --- a/cpp/src/interop/from_arrow_device.cu +++ b/cpp/src/interop/from_arrow_device.cu @@ -143,6 +143,9 @@ dispatch_tuple_t dispatch_from_arrow_device::operator()( rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* 
mr) { + CUDF_EXPECTS(schema->type != NANOARROW_TYPE_LARGE_STRING, + "Large strings are not yet supported in from_arrow_device", + cudf::data_type_error); if (input->length == 0) { return std::make_tuple( {type, From 1abe0727958b9c7117e0aa50cbcb08e9be5552ee Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 26 Jun 2024 17:47:19 +0000 Subject: [PATCH 22/23] Remove unnecessary code specialization for chars buffer --- cpp/src/interop/from_arrow_host.cu | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/cpp/src/interop/from_arrow_host.cu b/cpp/src/interop/from_arrow_host.cu index a58e0ad72f7..b7e07056686 100644 --- a/cpp/src/interop/from_arrow_host.cu +++ b/cpp/src/interop/from_arrow_host.cu @@ -229,23 +229,12 @@ std::unique_ptr dispatch_copy_from_arrow_host::operator()type == NANOARROW_TYPE_LARGE_STRING) { - rmm::device_buffer chars(char_data_length, stream, mr); - CUDF_CUDA_TRY(cudaMemcpyAsync(chars.data(), - reinterpret_cast(char_array.buffers[1]), - chars.size(), - cudaMemcpyDefault, - stream.value())); - return chars; - } else if (schema->type == NANOARROW_TYPE_STRING) { - auto chars_column = - this->operator()(&view, &char_array, data_type(type_id::INT8), true); - return std::move(*chars_column->release().data.release()); - } else { - CUDF_FAIL("Unsupported string type", cudf::data_type_error); - } - }(); + rmm::device_buffer chars(char_data_length, stream, mr); + CUDF_CUDA_TRY(cudaMemcpyAsync(chars.data(), + reinterpret_cast(char_array.buffers[1]), + chars.size(), + cudaMemcpyDefault, + stream.value())); auto const num_rows = offsets_column->size() - 1; auto out_col = make_strings_column(num_rows, std::move(offsets_column), From 26fcaaf828729a0d24fb4b900ff7a49b7a04e477 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Mon, 1 Jul 2024 19:00:42 +0000 Subject: [PATCH 23/23] Address reviews --- cpp/src/interop/from_arrow_stream.cu | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git 
a/cpp/src/interop/from_arrow_stream.cu b/cpp/src/interop/from_arrow_stream.cu index cb3c19ce570..0c85b561944 100644 --- a/cpp/src/interop/from_arrow_stream.cu +++ b/cpp/src/interop/from_arrow_stream.cu @@ -17,7 +17,7 @@ #include "arrow_utilities.hpp" #include -#include +#include #include #include #include @@ -128,7 +128,7 @@ std::unique_ptr
from_arrow_stream(ArrowArrayStream* input, chunks.begin(), chunks.end(), std::back_inserter(chunk_views), [](auto const& chunk) { return chunk->view(); }); - return cudf::concatenate(chunk_views, stream, mr); + return cudf::detail::concatenate(chunk_views, stream, mr); } } // namespace detail @@ -138,7 +138,6 @@ std::unique_ptr
from_arrow_stream(ArrowArrayStream* input, rmm::mr::device_memory_resource* mr) { CUDF_FUNC_RANGE(); - return detail::from_arrow_stream(input, stream, mr); } } // namespace cudf