From c7f6a22bb3edd3cea377d5405ca48a9eee353bc4 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Wed, 25 Sep 2024 12:59:58 -1000 Subject: [PATCH 1/4] Add string.attributes APIs to pylibcudf (#16785) Contributes to https://github.com/rapidsai/cudf/issues/15162 Authors: - Matthew Roeschke (https://github.com/mroeschke) - Matthew Murray (https://github.com/Matt711) Approvers: - Matthew Murray (https://github.com/Matt711) - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/16785 --- python/cudf/cudf/_lib/strings/attributes.pyx | 46 ++++------- .../pylibcudf/strings/CMakeLists.txt | 17 ++++- .../pylibcudf/pylibcudf/strings/__init__.pxd | 19 +++++ .../pylibcudf/pylibcudf/strings/__init__.py | 19 +++++ .../pylibcudf/strings/attributes.pxd | 10 +++ .../pylibcudf/strings/attributes.pyx | 76 +++++++++++++++++++ .../pylibcudf/tests/test_string_attributes.py | 32 ++++++++ 7 files changed, 185 insertions(+), 34 deletions(-) create mode 100644 python/pylibcudf/pylibcudf/strings/attributes.pxd create mode 100644 python/pylibcudf/pylibcudf/strings/attributes.pyx create mode 100644 python/pylibcudf/pylibcudf/tests/test_string_attributes.py diff --git a/python/cudf/cudf/_lib/strings/attributes.pyx b/python/cudf/cudf/_lib/strings/attributes.pyx index fe8c17c9e31..df81b3942b4 100644 --- a/python/cudf/cudf/_lib/strings/attributes.pyx +++ b/python/cudf/cudf/_lib/strings/attributes.pyx @@ -2,19 +2,10 @@ from cudf.core.buffer import acquire_spill_lock -from libcpp.memory cimport unique_ptr -from libcpp.utility cimport move - -from pylibcudf.libcudf.column.column cimport column -from pylibcudf.libcudf.column.column_view cimport column_view -from pylibcudf.libcudf.strings.attributes cimport ( - code_points as cpp_code_points, - count_bytes as cpp_count_bytes, - count_characters as cpp_count_characters, -) - from cudf._lib.column cimport Column +import pylibcudf as plc + @acquire_spill_lock() def count_characters(Column source_strings): @@ -22,13 +13,10 @@ def count_characters(Column source_strings): Returns an integer numeric column containing the length of each string in characters. """ - cdef unique_ptr[column] c_result - cdef column_view source_view = source_strings.view() - - with nogil: - c_result = move(cpp_count_characters(source_view)) - - return Column.from_unique_ptr(move(c_result)) + plc_column = plc.strings.attributes.count_characters( + source_strings.to_pylibcudf(mode="read") + ) + return Column.from_pylibcudf(plc_column) @acquire_spill_lock() @@ -37,13 +25,10 @@ def count_bytes(Column source_strings): Returns an integer numeric column containing the number of bytes of each string. """ - cdef unique_ptr[column] c_result - cdef column_view source_view = source_strings.view() - - with nogil: - c_result = move(cpp_count_bytes(source_view)) - - return Column.from_unique_ptr(move(c_result)) + plc_column = plc.strings.attributes.count_bytes( + source_strings.to_pylibcudf(mode="read") + ) + return Column.from_pylibcudf(plc_column) @acquire_spill_lock() @@ -52,10 +37,7 @@ def code_points(Column source_strings): Creates a numeric column with code point values (integers) for each character of each string. """ - cdef unique_ptr[column] c_result - cdef column_view source_view = source_strings.view() - - with nogil: - c_result = move(cpp_code_points(source_view)) - - return Column.from_unique_ptr(move(c_result)) + plc_column = plc.strings.attributes.code_points( + source_strings.to_pylibcudf(mode="read") + ) + return Column.from_pylibcudf(plc_column) diff --git a/python/pylibcudf/pylibcudf/strings/CMakeLists.txt b/python/pylibcudf/pylibcudf/strings/CMakeLists.txt index 77f20b0b917..142bc124ca2 100644 --- a/python/pylibcudf/pylibcudf/strings/CMakeLists.txt +++ b/python/pylibcudf/pylibcudf/strings/CMakeLists.txt @@ -13,8 +13,21 @@ # ============================================================================= set(cython_sources - capitalize.pyx case.pyx char_types.pyx contains.pyx extract.pyx find.pyx findall.pyx - regex_flags.pyx regex_program.pyx repeat.pyx replace.pyx side_type.pyx slice.pyx strip.pyx + attributes.pyx + capitalize.pyx + case.pyx + char_types.pyx + contains.pyx + extract.pyx + find.pyx + findall.pyx + regex_flags.pyx + regex_program.pyx + repeat.pyx + replace.pyx + side_type.pyx + slice.pyx + strip.pyx ) set(linked_libraries cudf::cudf) diff --git a/python/pylibcudf/pylibcudf/strings/__init__.pxd b/python/pylibcudf/pylibcudf/strings/__init__.pxd index 91d884b294b..d8afccc7336 100644 --- a/python/pylibcudf/pylibcudf/strings/__init__.pxd +++ b/python/pylibcudf/pylibcudf/strings/__init__.pxd @@ -1,6 +1,7 @@ # Copyright (c) 2024, NVIDIA CORPORATION. from . cimport ( + attributes, capitalize, case, char_types, @@ -16,3 +17,21 @@ from . cimport ( strip, ) from .side_type cimport side_type + +__all__ = [ + "attributes", + "capitalize", + "case", + "char_types", + "contains", + "convert", + "extract", + "find", + "findall", + "regex_flags", + "regex_program", + "replace", + "slice", + "strip", + "side_type", +] diff --git a/python/pylibcudf/pylibcudf/strings/__init__.py b/python/pylibcudf/pylibcudf/strings/__init__.py index b4856784390..22452812e42 100644 --- a/python/pylibcudf/pylibcudf/strings/__init__.py +++ b/python/pylibcudf/pylibcudf/strings/__init__.py @@ -1,6 +1,7 @@ # Copyright (c) 2024, NVIDIA CORPORATION. from . import ( + attributes, capitalize, case, char_types, @@ -17,3 +18,21 @@ strip, ) from .side_type import SideType + +__all__ = [ + "attributes", + "capitalize", + "case", + "char_types", + "contains", + "convert", + "extract", + "find", + "findall", + "regex_flags", + "regex_program", + "replace", + "slice", + "strip", + "SideType", +] diff --git a/python/pylibcudf/pylibcudf/strings/attributes.pxd b/python/pylibcudf/pylibcudf/strings/attributes.pxd new file mode 100644 index 00000000000..27398766924 --- /dev/null +++ b/python/pylibcudf/pylibcudf/strings/attributes.pxd @@ -0,0 +1,10 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from pylibcudf.column cimport Column + + +cpdef Column count_characters(Column source_strings) + +cpdef Column count_bytes(Column source_strings) + +cpdef Column code_points(Column source_strings) diff --git a/python/pylibcudf/pylibcudf/strings/attributes.pyx b/python/pylibcudf/pylibcudf/strings/attributes.pyx new file mode 100644 index 00000000000..36bee7bd1d9 --- /dev/null +++ b/python/pylibcudf/pylibcudf/strings/attributes.pyx @@ -0,0 +1,76 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from libcpp.memory cimport unique_ptr +from libcpp.utility cimport move +from pylibcudf.column cimport Column +from pylibcudf.libcudf.column.column cimport column +from pylibcudf.libcudf.strings cimport attributes as cpp_attributes + + +cpdef Column count_characters(Column source_strings): + """ + Returns a column containing character lengths of each string + in the given column. + + Parameters + ---------- + source_strings : Column + Column of strings. + + Returns + ------- + Column + New column with lengths for each string + """ + cdef unique_ptr[column] c_result + + with nogil: + c_result = move(cpp_attributes.count_characters(source_strings.view())) + + return Column.from_libcudf(move(c_result)) + + +cpdef Column count_bytes(Column source_strings): + """ + Returns a column containing byte lengths of each string + in the given column. + + Parameters + ---------- + source_strings : Column + Column of strings. + + Returns + ------- + Column + New column with the number of bytes for each string + """ + cdef unique_ptr[column] c_result + + with nogil: + c_result = move(cpp_attributes.count_bytes(source_strings.view())) + + return Column.from_libcudf(move(c_result)) + + +cpdef Column code_points(Column source_strings): + """ + Creates a numeric column with code point values (integers) + for each character of each string. + + Parameters + ---------- + source_strings : Column + Column of strings. + + Returns + ------- + Column + New column with code point integer values for each character + """ + cdef unique_ptr[column] c_result + + with nogil: + c_result = move(cpp_attributes.code_points(source_strings.view())) + + return Column.from_libcudf(move(c_result)) diff --git a/python/pylibcudf/pylibcudf/tests/test_string_attributes.py b/python/pylibcudf/pylibcudf/tests/test_string_attributes.py new file mode 100644 index 00000000000..a1820def0b1 --- /dev/null +++ b/python/pylibcudf/pylibcudf/tests/test_string_attributes.py @@ -0,0 +1,32 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +import pyarrow as pa +import pyarrow.compute as pc +import pylibcudf as plc +import pytest +from utils import assert_column_eq + + +@pytest.fixture() +def str_data(): + pa_data = pa.array(["A", None]) + return pa_data, plc.interop.from_arrow(pa_data) + + +def test_count_characters(str_data): + result = plc.strings.attributes.count_characters(str_data[1]) + expected = pc.utf8_length(str_data[0]) + assert_column_eq(expected, result) + + +def test_count_bytes(str_data): + result = plc.strings.attributes.count_characters(str_data[1]) + expected = pc.binary_length(str_data[0]) + assert_column_eq(expected, result) + + +def test_code_points(str_data): + result = plc.strings.attributes.code_points(str_data[1]) + exp_data = [ord(str_data[0].to_pylist()[0])] + expected = pa.chunked_array([exp_data], type=pa.int32()) + assert_column_eq(expected, result) From 12ee360048473ddd06019090c7d19c67d6959f7a Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Wed, 25 Sep 2024 20:13:45 -0400 Subject: [PATCH 2/4] [REVIEW] JSON host tree algorithms (#16545) Depends on #16836 This change adds a new host tree building algorithms for JSON reader and utf8 field name support. This constructs the device_column_tree using an adjacency list created from parent information. This adjacency list is pruned based on input schema, and also types are enforced as per schema. `mark_is_pruned` Tree is constructed from pruned adjacency list, (with mixed types handling). `construct_tree` utf8 field name support added: (spark requested) utf8 decoding of field names during hashing of field nodes so that utf8 encoded field names also match to same column. All unit tests passes, 1 unit test added where old algorithm fails. This code is kept under experimental flag. Authors: - Shruti Shivakumar (https://github.com/shrshi) - Karthikeyan (https://github.com/karthikeyann) Approvers: - Robert (Bobby) Evans (https://github.com/revans2) - Vukasin Milovanovic (https://github.com/vuule) - Karthikeyan (https://github.com/karthikeyann) URL: https://github.com/rapidsai/cudf/pull/16545 --- cpp/include/cudf/io/json.hpp | 36 + cpp/src/io/json/host_tree_algorithms.cu | 776 ++++++++++++++++-- cpp/src/io/json/json_column.cu | 46 +- cpp/src/io/json/json_tree.cu | 153 +++- cpp/src/io/json/nested_json.hpp | 29 +- cpp/tests/io/json/json_test.cpp | 53 ++ cpp/tests/io/json/json_tree.cpp | 1 + cpp/tests/io/json/json_tree_csr.cu | 1 + .../main/java/ai/rapids/cudf/JSONOptions.java | 15 + java/src/main/java/ai/rapids/cudf/Table.java | 9 + java/src/main/native/src/TableJni.cpp | 12 +- 11 files changed, 1011 insertions(+), 120 deletions(-) diff --git a/cpp/include/cudf/io/json.hpp b/cpp/include/cudf/io/json.hpp index ff25a5bacae..6798557e14e 100644 --- a/cpp/include/cudf/io/json.hpp +++ b/cpp/include/cudf/io/json.hpp @@ -105,6 +105,8 @@ class json_reader_options { char _delimiter = '\n'; // Prune columns on read, selected based on the _dtypes option bool _prune_columns = false; + // Experimental features: new column tree construction + bool _experimental = false; // Bytes to skip from the start size_t _byte_range_offset = 0; @@ -277,6 +279,15 @@ class json_reader_options { */ [[nodiscard]] bool is_enabled_prune_columns() const { return _prune_columns; } + /** + * @brief Whether to enable experimental features. + * + * When set to true, experimental features, such as the new column tree construction, + * utf-8 matching of field names will be enabled. + * @return true if experimental features are enabled + */ + [[nodiscard]] bool is_enabled_experimental() const { return _experimental; } + /** * @brief Whether to parse dates as DD/MM versus MM/DD. * @@ -453,6 +464,16 @@ class json_reader_options { */ void enable_prune_columns(bool val) { _prune_columns = val; } + /** + * @brief Set whether to enable experimental features. + * + * When set to true, experimental features, such as the new column tree construction, + * utf-8 matching of field names will be enabled. + * + * @param val Boolean value to enable/disable experimental features + */ + void enable_experimental(bool val) { _experimental = val; } + /** * @brief Set whether to parse dates as DD/MM versus MM/DD. * @@ -695,6 +716,21 @@ class json_reader_options_builder { return *this; } + /** + * @brief Set whether to enable experimental features. + * + * When set to true, experimental features, such as the new column tree construction, + * utf-8 matching of field names will be enabled. + * + * @param val Boolean value to enable/disable experimental features + * @return this for chaining + */ + json_reader_options_builder& experimental(bool val) + { + options._experimental = val; + return *this; + } + /** * @brief Set whether to parse dates as DD/MM versus MM/DD. * diff --git a/cpp/src/io/json/host_tree_algorithms.cu b/cpp/src/io/json/host_tree_algorithms.cu index 70d61132b42..5855f1b5a5f 100644 --- a/cpp/src/io/json/host_tree_algorithms.cu +++ b/cpp/src/io/json/host_tree_algorithms.cu @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -43,6 +44,7 @@ #include #include +#include namespace cudf::io::json::detail { @@ -58,16 +60,15 @@ namespace cudf::io::json::detail { */ rmm::device_uvector get_values_column_indices(TreeDepthT const row_array_children_level, tree_meta_t const& d_tree, - device_span col_ids, + device_span col_ids, size_type const num_columns, rmm::cuda_stream_view stream) { - CUDF_FUNC_RANGE(); auto [level2_nodes, level2_indices] = get_array_children_indices( row_array_children_level, d_tree.node_levels, d_tree.parent_node_ids, stream); auto col_id_location = thrust::make_permutation_iterator(col_ids.begin(), level2_nodes.begin()); rmm::device_uvector values_column_indices(num_columns, stream); - thrust::scatter(rmm::exec_policy(stream), + thrust::scatter(rmm::exec_policy_nosync(stream), level2_indices.begin(), level2_indices.end(), col_id_location, @@ -90,12 +91,11 @@ std::vector copy_strings_to_host_sync( device_span node_range_end, rmm::cuda_stream_view stream) { - CUDF_FUNC_RANGE(); auto const num_strings = node_range_begin.size(); rmm::device_uvector string_offsets(num_strings, stream); rmm::device_uvector string_lengths(num_strings, stream); auto d_offset_pairs = thrust::make_zip_iterator(node_range_begin.begin(), node_range_end.begin()); - thrust::transform(rmm::exec_policy(stream), + thrust::transform(rmm::exec_policy_nosync(stream), d_offset_pairs, d_offset_pairs + num_strings, thrust::make_zip_iterator(string_offsets.begin(), string_lengths.begin()), @@ -161,18 +161,18 @@ std::vector copy_strings_to_host_sync( rmm::device_uvector is_all_nulls_each_column(device_span input, tree_meta_t const& d_column_tree, tree_meta_t const& tree, - device_span col_ids, + device_span col_ids, cudf::io::json_reader_options const& options, rmm::cuda_stream_view stream) { auto const num_nodes = col_ids.size(); auto const num_cols = d_column_tree.node_categories.size(); rmm::device_uvector is_all_nulls(num_cols, stream); - thrust::fill(rmm::exec_policy(stream), is_all_nulls.begin(), is_all_nulls.end(), true); + thrust::fill(rmm::exec_policy_nosync(stream), is_all_nulls.begin(), is_all_nulls.end(), true); auto parse_opt = parsing_options(options, stream); thrust::for_each_n( - rmm::exec_policy(stream), + rmm::exec_policy_nosync(stream), thrust::counting_iterator(0), num_nodes, [options = parse_opt.view(), @@ -193,7 +193,7 @@ rmm::device_uvector is_all_nulls_each_column(device_span return is_all_nulls; } -NodeIndexT get_row_array_parent_col_id(device_span col_ids, +NodeIndexT get_row_array_parent_col_id(device_span col_ids, bool is_enabled_lines, rmm::cuda_stream_view stream) { @@ -221,33 +221,34 @@ struct json_column_data { bitmask_type* validity; }; -std::pair, - std::unordered_map>> -build_tree(device_json_column& root, - std::vector const& is_str_column_all_nulls, - tree_meta_t& d_column_tree, - device_span d_unique_col_ids, - device_span d_max_row_offsets, - std::vector const& column_names, - NodeIndexT row_array_parent_col_id, - bool is_array_of_arrays, - cudf::io::json_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); -void scatter_offsets( - tree_meta_t& tree, - device_span col_ids, - device_span row_offsets, - device_span node_ids, - device_span sorted_col_ids, // Reuse this for parent_col_ids +using hashmap_of_device_columns = + std::unordered_map>; + +std::pair, hashmap_of_device_columns> build_tree( + device_json_column& root, + host_span is_str_column_all_nulls, tree_meta_t& d_column_tree, - host_span ignore_vals, - std::unordered_map>& columns, - rmm::cuda_stream_view stream); + device_span d_unique_col_ids, + device_span d_max_row_offsets, + std::vector const& column_names, + NodeIndexT row_array_parent_col_id, + bool is_array_of_arrays, + cudf::io::json_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); +void scatter_offsets(tree_meta_t const& tree, + device_span col_ids, + device_span row_offsets, + device_span node_ids, + device_span sorted_col_ids, // Reuse this for parent_col_ids + tree_meta_t const& d_column_tree, + host_span ignore_vals, + hashmap_of_device_columns const& columns, + rmm::cuda_stream_view stream); /** * @brief Constructs `d_json_column` from node tree representation - * Newly constructed columns are insert into `root`'s children. + * Newly constructed columns are inserted into `root`'s children. * `root` must be a list type. * * @param input Input JSON string device data @@ -265,28 +266,28 @@ void scatter_offsets( * of child_offets and validity members of `d_json_column` */ void make_device_json_column(device_span input, - tree_meta_t& tree, - device_span col_ids, - device_span row_offsets, + tree_meta_t const& tree, + device_span col_ids, + device_span row_offsets, device_json_column& root, bool is_array_of_arrays, cudf::io::json_reader_options const& options, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - CUDF_FUNC_RANGE(); - bool const is_enabled_lines = options.is_enabled_lines(); bool const is_enabled_mixed_types_as_string = options.is_enabled_mixed_types_as_string(); - auto const num_nodes = col_ids.size(); - rmm::device_uvector sorted_col_ids(col_ids.size(), stream); // make a copy - thrust::copy(rmm::exec_policy(stream), col_ids.begin(), col_ids.end(), sorted_col_ids.begin()); + // make a copy + auto sorted_col_ids = cudf::detail::make_device_uvector_async( + col_ids, stream, cudf::get_current_device_resource_ref()); // sort by {col_id} on {node_ids} stable rmm::device_uvector node_ids(col_ids.size(), stream); - thrust::sequence(rmm::exec_policy(stream), node_ids.begin(), node_ids.end()); - thrust::stable_sort_by_key( - rmm::exec_policy(stream), sorted_col_ids.begin(), sorted_col_ids.end(), node_ids.begin()); + thrust::sequence(rmm::exec_policy_nosync(stream), node_ids.begin(), node_ids.end()); + thrust::stable_sort_by_key(rmm::exec_policy_nosync(stream), + sorted_col_ids.begin(), + sorted_col_ids.end(), + node_ids.begin()); NodeIndexT const row_array_parent_col_id = get_row_array_parent_col_id(col_ids, is_enabled_lines, stream); @@ -316,7 +317,7 @@ void make_device_json_column(device_span input, cudf::detail::make_host_vector_sync(values_column_indices, stream); std::transform(unique_col_ids.begin(), unique_col_ids.end(), - column_names.begin(), + column_names.cbegin(), column_names.begin(), [&h_values_column_indices, &column_parent_ids, row_array_parent_col_id]( auto col_id, auto name) mutable { @@ -333,17 +334,17 @@ void make_device_json_column(device_span input, } return std::vector(); }(); - auto [ignore_vals, columns] = build_tree(root, - is_str_column_all_nulls, - d_column_tree, - d_unique_col_ids, - d_max_row_offsets, - column_names, - row_array_parent_col_id, - is_array_of_arrays, - options, - stream, - mr); + auto const [ignore_vals, columns] = build_tree(root, + is_str_column_all_nulls, + d_column_tree, + d_unique_col_ids, + d_max_row_offsets, + column_names, + row_array_parent_col_id, + is_array_of_arrays, + options, + stream, + mr); scatter_offsets(tree, col_ids, @@ -356,19 +357,18 @@ void make_device_json_column(device_span input, stream); } -std::pair, - std::unordered_map>> -build_tree(device_json_column& root, - std::vector const& is_str_column_all_nulls, - tree_meta_t& d_column_tree, - device_span d_unique_col_ids, - device_span d_max_row_offsets, - std::vector const& column_names, - NodeIndexT row_array_parent_col_id, - bool is_array_of_arrays, - cudf::io::json_reader_options const& options, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) +std::pair, hashmap_of_device_columns> build_tree( + device_json_column& root, + host_span is_str_column_all_nulls, + tree_meta_t& d_column_tree, + device_span d_unique_col_ids, + device_span d_max_row_offsets, + std::vector const& column_names, + NodeIndexT row_array_parent_col_id, + bool is_array_of_arrays, + cudf::io::json_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { bool const is_enabled_mixed_types_as_string = options.is_enabled_mixed_types_as_string(); auto unique_col_ids = cudf::detail::make_host_vector_async(d_unique_col_ids, stream); @@ -380,6 +380,7 @@ build_tree(device_json_column& root, cudf::detail::make_host_vector_async(d_column_tree.node_range_begin, stream); auto const max_row_offsets = cudf::detail::make_host_vector_async(d_max_row_offsets, stream); auto num_columns = d_unique_col_ids.size(); + stream.synchronize(); auto to_json_col_type = [](auto category) { switch (category) { @@ -439,11 +440,12 @@ build_tree(device_json_column& root, }); // use hash map because we may skip field name's col_ids - std::unordered_map> columns; + hashmap_of_device_columns columns; // map{parent_col_id, child_col_name}> = child_col_id, used for null value column tracking std::map, NodeIndexT> mapped_columns; // find column_ids which are values, but should be ignored in validity - auto ignore_vals = cudf::detail::make_host_vector(num_columns, stream); + auto ignore_vals = cudf::detail::make_host_vector(num_columns, stream); + std::fill(ignore_vals.begin(), ignore_vals.end(), false); std::vector is_mixed_type_column(num_columns, 0); std::vector is_pruned(num_columns, 0); // for columns that are not mixed type but have been forced as string @@ -452,7 +454,7 @@ build_tree(device_json_column& root, std::function remove_child_columns = [&](NodeIndexT this_col_id, device_json_column& col) { - for (auto col_name : col.column_order) { + for (auto const& col_name : col.column_order) { auto child_id = mapped_columns[{this_col_id, col_name}]; is_mixed_type_column[child_id] = 1; remove_child_columns(child_id, col.child_columns.at(col_name)); @@ -523,7 +525,7 @@ build_tree(device_json_column& root, if (parent_col_id != parent_node_sentinel && (is_mixed_type_column[parent_col_id] || is_pruned[this_col_id]) || forced_as_string_column[parent_col_id]) { - ignore_vals[this_col_id] = 1; + ignore_vals[this_col_id] = true; if (is_mixed_type_column[parent_col_id]) { is_mixed_type_column[this_col_id] = 1; } if (forced_as_string_column[parent_col_id]) { forced_as_string_column[this_col_id] = true; } continue; @@ -569,12 +571,12 @@ build_tree(device_json_column& root, } if (column_categories[this_col_id] == NC_VAL || column_categories[this_col_id] == NC_STR) { - ignore_vals[this_col_id] = 1; + ignore_vals[this_col_id] = true; continue; } if (column_categories[old_col_id] == NC_VAL || column_categories[old_col_id] == NC_STR) { // remap - ignore_vals[old_col_id] = 1; + ignore_vals[old_col_id] = true; mapped_columns.erase({parent_col_id, name}); columns.erase(old_col_id); parent_col.child_columns.erase(name); @@ -624,7 +626,7 @@ build_tree(device_json_column& root, auto parent_col_id = column_parent_ids[this_col_id]; if (parent_col_id != parent_node_sentinel and is_mixed_type_column[parent_col_id] == 1) { is_mixed_type_column[this_col_id] = 1; - ignore_vals[this_col_id] = 1; + ignore_vals[this_col_id] = true; columns.erase(this_col_id); } // Convert only mixed type columns as string (so to copy), but not its children @@ -644,7 +646,7 @@ build_tree(device_json_column& root, auto parent_col_id = column_parent_ids[this_col_id]; if (parent_col_id != parent_node_sentinel and forced_as_string_column[parent_col_id]) { forced_as_string_column[this_col_id] = true; - ignore_vals[this_col_id] = 1; + ignore_vals[this_col_id] = true; } // Convert only mixed type columns as string (so to copy), but not its children if (parent_col_id != parent_node_sentinel and not forced_as_string_column[parent_col_id] and @@ -664,16 +666,15 @@ build_tree(device_json_column& root, return {ignore_vals, columns}; } -void scatter_offsets( - tree_meta_t& tree, - device_span col_ids, - device_span row_offsets, - device_span node_ids, - device_span sorted_col_ids, // Reuse this for parent_col_ids - tree_meta_t& d_column_tree, - host_span ignore_vals, - std::unordered_map>& columns, - rmm::cuda_stream_view stream) +void scatter_offsets(tree_meta_t const& tree, + device_span col_ids, + device_span row_offsets, + device_span node_ids, + device_span sorted_col_ids, // Reuse this for parent_col_ids + tree_meta_t const& d_column_tree, + host_span ignore_vals, + hashmap_of_device_columns const& columns, + rmm::cuda_stream_view stream) { auto const num_nodes = col_ids.size(); auto const num_columns = d_column_tree.node_categories.size(); @@ -695,7 +696,7 @@ void scatter_offsets( // 3. scatter string offsets to respective columns, set validity bits thrust::for_each_n( - rmm::exec_policy(stream), + rmm::exec_policy_nosync(stream), thrust::counting_iterator(0), num_nodes, [column_categories = d_column_tree.node_categories.begin(), @@ -739,7 +740,7 @@ void scatter_offsets( : col_ids[parent_node_ids[node_id]]; })); auto const list_children_end = thrust::copy_if( - rmm::exec_policy(stream), + rmm::exec_policy_nosync(stream), thrust::make_zip_iterator(thrust::make_counting_iterator(0), parent_col_id), thrust::make_zip_iterator(thrust::make_counting_iterator(0), parent_col_id) + num_nodes, @@ -757,12 +758,12 @@ void scatter_offsets( auto const num_list_children = list_children_end - thrust::make_zip_iterator(node_ids.begin(), parent_col_ids.begin()); - thrust::stable_sort_by_key(rmm::exec_policy(stream), + thrust::stable_sort_by_key(rmm::exec_policy_nosync(stream), parent_col_ids.begin(), parent_col_ids.begin() + num_list_children, node_ids.begin()); thrust::for_each_n( - rmm::exec_policy(stream), + rmm::exec_policy_nosync(stream), thrust::make_counting_iterator(0), num_list_children, [node_ids = node_ids.begin(), @@ -805,4 +806,599 @@ void scatter_offsets( stream.synchronize(); } +namespace experimental { + +std::map unified_schema(cudf::io::json_reader_options const& options) +{ + return std::visit( + cudf::detail::visitor_overload{ + [](std::vector const& user_dtypes) { + std::map dnew; + std::transform(thrust::counting_iterator(0), + thrust::counting_iterator(user_dtypes.size()), + std::inserter(dnew, dnew.end()), + [&user_dtypes](auto i) { + return std::pair(std::to_string(i), schema_element{user_dtypes[i]}); + }); + return dnew; + }, + [](std::map const& user_dtypes) { + std::map dnew; + std::transform(user_dtypes.begin(), + user_dtypes.end(), + std::inserter(dnew, dnew.end()), + [](auto key_dtype) { + return std::pair(key_dtype.first, schema_element{key_dtype.second}); + }); + return dnew; + }, + [](std::map const& user_dtypes) { return user_dtypes; }}, + options.get_dtypes()); +} + +std::pair, hashmap_of_device_columns> build_tree( + device_json_column& root, + host_span is_str_column_all_nulls, + tree_meta_t& d_column_tree, + device_span d_unique_col_ids, + device_span d_max_row_offsets, + std::vector const& column_names, + NodeIndexT row_array_parent_col_id, + bool is_array_of_arrays, + cudf::io::json_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +/** + * @brief Constructs `d_json_column` from node tree representation + * Newly constructed columns are inserted into `root`'s children. + * `root` must be a list type. + * + * @param input Input JSON string device data + * @param tree Node tree representation of the JSON string + * @param col_ids Column ids of the nodes in the tree + * @param row_offsets Row offsets of the nodes in the tree + * @param root Root node of the `d_json_column` tree + * @param is_array_of_arrays Whether the tree is an array of arrays + * @param options Parsing options specifying the parsing behaviour + * options affecting behaviour are + * is_enabled_lines: Whether the input is a line-delimited JSON + * is_enabled_mixed_types_as_string: Whether to enable reading mixed types as string + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the device memory + * of child_offets and validity members of `d_json_column` + */ +void make_device_json_column(device_span input, + tree_meta_t const& tree, + device_span col_ids, + device_span row_offsets, + device_json_column& root, + bool is_array_of_arrays, + cudf::io::json_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + bool const is_enabled_lines = options.is_enabled_lines(); + bool const is_enabled_mixed_types_as_string = options.is_enabled_mixed_types_as_string(); + // make a copy + auto sorted_col_ids = cudf::detail::make_device_uvector_async( + col_ids, stream, cudf::get_current_device_resource_ref()); + + // sort by {col_id} on {node_ids} stable + rmm::device_uvector node_ids(col_ids.size(), stream); + thrust::sequence(rmm::exec_policy_nosync(stream), node_ids.begin(), node_ids.end()); + thrust::stable_sort_by_key(rmm::exec_policy_nosync(stream), + sorted_col_ids.begin(), + sorted_col_ids.end(), + node_ids.begin()); + + NodeIndexT const row_array_parent_col_id = + get_row_array_parent_col_id(col_ids, is_enabled_lines, stream); + + // 1. gather column information. + auto [d_column_tree, d_unique_col_ids, d_max_row_offsets] = + reduce_to_column_tree(tree, + col_ids, + sorted_col_ids, + node_ids, + row_offsets, + is_array_of_arrays, + row_array_parent_col_id, + stream); + + auto num_columns = d_unique_col_ids.size(); + std::vector column_names = copy_strings_to_host_sync( + input, d_column_tree.node_range_begin, d_column_tree.node_range_end, stream); + // array of arrays column names + if (is_array_of_arrays) { + auto const unique_col_ids = cudf::detail::make_host_vector_async(d_unique_col_ids, stream); + auto const column_parent_ids = + cudf::detail::make_host_vector_async(d_column_tree.parent_node_ids, stream); + TreeDepthT const row_array_children_level = is_enabled_lines ? 1 : 2; + auto values_column_indices = + get_values_column_indices(row_array_children_level, tree, col_ids, num_columns, stream); + auto h_values_column_indices = + cudf::detail::make_host_vector_sync(values_column_indices, stream); + std::transform(unique_col_ids.begin(), + unique_col_ids.end(), + column_names.cbegin(), + column_names.begin(), + [&h_values_column_indices, &column_parent_ids, row_array_parent_col_id]( + auto col_id, auto name) mutable { + return column_parent_ids[col_id] == row_array_parent_col_id + ? std::to_string(h_values_column_indices[col_id]) + : name; + }); + } + + auto const is_str_column_all_nulls = [&, &column_tree = d_column_tree]() { + if (is_enabled_mixed_types_as_string) { + return cudf::detail::make_std_vector_sync( + is_all_nulls_each_column(input, column_tree, tree, col_ids, options, stream), stream); + } + return std::vector(); + }(); + auto const [ignore_vals, columns] = build_tree(root, + is_str_column_all_nulls, + d_column_tree, + d_unique_col_ids, + d_max_row_offsets, + column_names, + row_array_parent_col_id, + is_array_of_arrays, + options, + stream, + mr); + if (ignore_vals.empty()) return; + scatter_offsets(tree, + col_ids, + row_offsets, + node_ids, + sorted_col_ids, + d_column_tree, + ignore_vals, + columns, + stream); +} + +std::pair, hashmap_of_device_columns> build_tree( + device_json_column& root, + host_span is_str_column_all_nulls, + tree_meta_t& d_column_tree, + device_span d_unique_col_ids, + device_span d_max_row_offsets, + std::vector const& column_names, + NodeIndexT row_array_parent_col_id, + bool is_array_of_arrays, + cudf::io::json_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + bool const is_enabled_lines = options.is_enabled_lines(); + bool const is_enabled_mixed_types_as_string = options.is_enabled_mixed_types_as_string(); + auto unique_col_ids = cudf::detail::make_host_vector_async(d_unique_col_ids, stream); + auto column_categories = + cudf::detail::make_host_vector_async(d_column_tree.node_categories, stream); + auto const column_parent_ids = + cudf::detail::make_host_vector_async(d_column_tree.parent_node_ids, stream); + auto column_range_beg = + cudf::detail::make_host_vector_async(d_column_tree.node_range_begin, stream); + auto const max_row_offsets = cudf::detail::make_host_vector_async(d_max_row_offsets, stream); + auto num_columns = d_unique_col_ids.size(); + stream.synchronize(); + + auto to_json_col_type = [](auto category) { + switch (category) { + case NC_STRUCT: return json_col_t::StructColumn; + case NC_LIST: return json_col_t::ListColumn; + case NC_STR: [[fallthrough]]; + case NC_VAL: return json_col_t::StringColumn; + default: return json_col_t::Unknown; + } + }; + + auto initialize_json_columns = [&](auto i, auto& col_ref, auto column_category) { + auto& col = col_ref.get(); + if (col.type != json_col_t::Unknown) { return; } + if (column_category == NC_ERR || column_category == NC_FN) { + return; + } else if (column_category == NC_VAL || column_category == NC_STR) { + col.string_offsets.resize(max_row_offsets[i] + 1, stream); + col.string_lengths.resize(max_row_offsets[i] + 1, stream); + thrust::fill( + rmm::exec_policy_nosync(stream), + thrust::make_zip_iterator(col.string_offsets.begin(), col.string_lengths.begin()), + thrust::make_zip_iterator(col.string_offsets.end(), col.string_lengths.end()), + thrust::make_tuple(0, 0)); + } else if (column_category == NC_LIST) { + col.child_offsets.resize(max_row_offsets[i] + 2, stream); + thrust::uninitialized_fill( + rmm::exec_policy_nosync(stream), col.child_offsets.begin(), col.child_offsets.end(), 0); + } + col.num_rows = max_row_offsets[i] + 1; + col.validity = + cudf::detail::create_null_mask(col.num_rows, cudf::mask_state::ALL_NULL, stream, mr); + col.type = to_json_col_type(column_category); + }; + + // 2. generate nested columns tree and its device_memory + // reorder unique_col_ids w.r.t. column_range_begin for order of column to be in field order. + auto h_range_col_id_it = + thrust::make_zip_iterator(column_range_beg.begin(), unique_col_ids.begin()); + std::sort(h_range_col_id_it, h_range_col_id_it + num_columns, [](auto const& a, auto const& b) { + return thrust::get<0>(a) < thrust::get<0>(b); + }); + // adjacency list construction + std::map> adj; + for (auto const this_col_id : unique_col_ids) { + auto parent_col_id = column_parent_ids[this_col_id]; + adj[parent_col_id].push_back(this_col_id); + } + + // Pruning + auto is_pruned = cudf::detail::make_host_vector(num_columns, stream); + std::fill_n(is_pruned.begin(), num_columns, options.is_enabled_prune_columns()); + + // prune all children of a column, but not self. + auto ignore_all_children = [&](auto parent_col_id) { + std::deque offspring; + if (adj.count(parent_col_id)) { + for (auto const& child : adj[parent_col_id]) { + offspring.push_back(child); + } + } + while (!offspring.empty()) { + auto this_id = offspring.front(); + offspring.pop_front(); + is_pruned[this_id] = true; + if (adj.count(this_id)) { + for (auto const& child : adj[this_id]) { + offspring.push_back(child); + } + } + } + }; + + // Pruning: iterate through schema and mark only those columns and enforce type. + // NoPruning: iterate through schema and enforce type. + + if (adj[parent_node_sentinel].empty()) + return {cudf::detail::make_host_vector(0, stream), {}}; // for empty file + CUDF_EXPECTS(adj[parent_node_sentinel].size() == 1, "Should be 1"); + auto expected_types = cudf::detail::make_host_vector(num_columns, stream); + std::fill_n(expected_types.begin(), num_columns, NUM_NODE_CLASSES); + + auto lookup_names = [&column_names](auto child_ids, auto name) { + for (auto const& child_id : child_ids) { + if (column_names[child_id] == name) return child_id; + } + return -1; + }; + // recursive lambda on schema to mark columns as pruned. + std::function mark_is_pruned; + mark_is_pruned = [&is_pruned, + &mark_is_pruned, + &adj, + &lookup_names, + &column_categories, + &expected_types, + &ignore_all_children](NodeIndexT root, schema_element const& schema) -> void { + if (root == -1) return; + bool pass = + (schema.type == data_type{type_id::STRUCT} and column_categories[root] == NC_STRUCT) or + (schema.type == data_type{type_id::LIST} and column_categories[root] == NC_LIST) or + (schema.type != data_type{type_id::STRUCT} and schema.type != data_type{type_id::LIST} and + column_categories[root] != NC_FN); + if (!pass) { + // ignore all children of this column and prune this column. + is_pruned[root] = true; + ignore_all_children(root); + return; + } + is_pruned[root] = false; + auto expected_type = [](auto type, auto cat) { + if (type == data_type{type_id::STRUCT} and cat == NC_STRUCT) return NC_STRUCT; + if (type == data_type{type_id::LIST} and cat == NC_LIST) return NC_LIST; + if (type != data_type{type_id::STRUCT} and type != data_type{type_id::LIST}) return NC_STR; + return NC_ERR; + }(schema.type, column_categories[root]); + expected_types[root] = expected_type; // forced type. + // ignore children of nested columns, but not self. + if (expected_type == NC_STR and + (column_categories[root] == NC_STRUCT or column_categories[root] == NC_LIST)) + ignore_all_children(root); + if (not(schema.type == data_type{type_id::STRUCT} or schema.type == data_type{type_id::LIST})) + return; // no children to mark for non-nested. + auto child_ids = adj.count(root) ? adj[root] : std::vector{}; + if (schema.type == data_type{type_id::STRUCT}) { + for (auto const& key_pair : schema.child_types) { + auto col_id = lookup_names(child_ids, key_pair.first); + if (col_id == -1) continue; + is_pruned[col_id] = false; + for (auto const& child_id : adj[col_id]) // children of field (>1 if mixed) + mark_is_pruned(child_id, key_pair.second); + } + } else if (schema.type == data_type{type_id::LIST}) { + // partial solution for list children to have any name. + auto this_list_child_name = + schema.child_types.size() == 1 ? schema.child_types.begin()->first : list_child_name; + if (schema.child_types.count(this_list_child_name) == 0) return; + auto list_child = schema.child_types.at(this_list_child_name); + for (auto const& child_id : child_ids) + mark_is_pruned(child_id, list_child); + } + }; + if (is_array_of_arrays) { + if (adj[adj[parent_node_sentinel][0]].empty()) + return {cudf::detail::make_host_vector(0, stream), {}}; + auto root_list_col_id = + is_enabled_lines ? adj[parent_node_sentinel][0] : adj[adj[parent_node_sentinel][0]][0]; + // mark root and row array col_id as not pruned. + if (!is_enabled_lines) { + auto top_level_list_id = adj[parent_node_sentinel][0]; + is_pruned[top_level_list_id] = false; + } + is_pruned[root_list_col_id] = false; + std::visit(cudf::detail::visitor_overload{ + [&root_list_col_id, &adj, &mark_is_pruned, &column_names]( + std::vector const& user_dtypes) -> void { + for (size_t i = 0; i < adj[root_list_col_id].size() && i < user_dtypes.size(); + i++) { + NodeIndexT const first_child_id = adj[root_list_col_id][i]; + auto name = column_names[first_child_id]; + auto value_id = std::stol(name); + if (value_id >= 0 and value_id < static_cast(user_dtypes.size())) + mark_is_pruned(first_child_id, schema_element{user_dtypes[value_id]}); + // Note: mixed type - forced type, will work here. + } + }, + [&root_list_col_id, &adj, &mark_is_pruned, &column_names]( + std::map const& user_dtypes) -> void { + for (size_t i = 0; i < adj[root_list_col_id].size(); i++) { + auto const first_child_id = adj[root_list_col_id][i]; + auto name = column_names[first_child_id]; + if (user_dtypes.count(name)) + mark_is_pruned(first_child_id, schema_element{user_dtypes.at(name)}); + } + }, + [&root_list_col_id, &adj, &mark_is_pruned, &column_names]( + std::map const& user_dtypes) -> void { + for (size_t i = 0; i < adj[root_list_col_id].size(); i++) { + auto const first_child_id = adj[root_list_col_id][i]; + auto name = column_names[first_child_id]; + if (user_dtypes.count(name)) + mark_is_pruned(first_child_id, user_dtypes.at(name)); + } + }}, + options.get_dtypes()); + } else { + auto root_struct_col_id = + is_enabled_lines + ? adj[parent_node_sentinel][0] + : (adj[adj[parent_node_sentinel][0]].empty() ? -1 : adj[adj[parent_node_sentinel][0]][0]); + // mark root and row struct col_id as not pruned. + if (!is_enabled_lines) { + auto top_level_list_id = adj[parent_node_sentinel][0]; + is_pruned[top_level_list_id] = false; + } + is_pruned[root_struct_col_id] = false; + schema_element u_schema{data_type{type_id::STRUCT}}; + u_schema.child_types = unified_schema(options); + std::visit( + cudf::detail::visitor_overload{ + [&is_pruned, &root_struct_col_id, &adj, &mark_is_pruned]( + std::vector const& user_dtypes) -> void { + for (size_t i = 0; i < adj[root_struct_col_id].size() && i < user_dtypes.size(); i++) { + NodeIndexT const first_field_id = adj[root_struct_col_id][i]; + is_pruned[first_field_id] = false; + for (auto const& child_id : adj[first_field_id]) // children of field (>1 if mixed) + mark_is_pruned(child_id, schema_element{user_dtypes[i]}); + } + }, + [&root_struct_col_id, &adj, &mark_is_pruned, &u_schema]( + std::map const& user_dtypes) -> void { + mark_is_pruned(root_struct_col_id, u_schema); + }, + [&root_struct_col_id, &adj, &mark_is_pruned, &u_schema]( + std::map const& user_dtypes) -> void { + mark_is_pruned(root_struct_col_id, u_schema); + }}, + options.get_dtypes()); + } + // Useful for array of arrays + auto named_level = + is_enabled_lines + ? adj[parent_node_sentinel][0] + : (adj[adj[parent_node_sentinel][0]].empty() ? -1 : adj[adj[parent_node_sentinel][0]][0]); + + auto handle_mixed_types = [&column_categories, + &is_str_column_all_nulls, + &is_pruned, + &expected_types, + &is_enabled_mixed_types_as_string, + &ignore_all_children](std::vector& child_ids) { + // do these on unpruned columns only. + // when mixed types is disabled, ignore string sibling of nested column. + // when mixed types is disabled, and both list and struct columns are siblings, error out. + // when mixed types is enabled, force string type on all columns + + // Remove pruned children (forced type will not clash here because other types are already + // pruned) + child_ids.erase( + std::remove_if(child_ids.begin(), + child_ids.end(), + [&is_pruned](NodeIndexT child_id) { return is_pruned[child_id]; }), + child_ids.end()); + // find string id, struct id, list id. + NodeIndexT str_col_id{-1}, struct_col_id{-1}, list_col_id{-1}; + for (auto const& child_id : child_ids) { + if (column_categories[child_id] == NC_VAL || column_categories[child_id] == NC_STR) + str_col_id = child_id; + else if (column_categories[child_id] == NC_STRUCT) + struct_col_id = child_id; + else if (column_categories[child_id] == NC_LIST) + list_col_id = child_id; + } + // conditions for handling mixed types. + if (is_enabled_mixed_types_as_string) { + if (struct_col_id != -1 and list_col_id != -1) { + expected_types[struct_col_id] = NC_STR; + expected_types[list_col_id] = NC_STR; + // ignore children of nested columns. + ignore_all_children(struct_col_id); + ignore_all_children(list_col_id); + } + if ((struct_col_id != -1 or list_col_id != -1) and str_col_id != -1) { + if (is_str_column_all_nulls[str_col_id]) + is_pruned[str_col_id] = true; + else { + // ignore children of nested columns. + if (struct_col_id != -1) { + expected_types[struct_col_id] = NC_STR; + ignore_all_children(struct_col_id); + } + if (list_col_id != -1) { + expected_types[list_col_id] = NC_STR; + ignore_all_children(list_col_id); + } + } + } + } else { + // if both are present, error out. + CUDF_EXPECTS(struct_col_id == -1 or list_col_id == -1, + "A mix of lists and structs within the same column is not supported"); + // either one only: so ignore str column. + if ((struct_col_id != -1 or list_col_id != -1) and str_col_id != -1) { + is_pruned[str_col_id] = true; + } + } + }; + + using dev_ref = std::reference_wrapper; + std::unordered_map columns; + columns.try_emplace(parent_node_sentinel, std::ref(root)); + // convert adjaceny list to tree. + dev_ref parent_ref = std::ref(root); + // creates children column + std::function construct_tree; + construct_tree = [&](NodeIndexT root, dev_ref ref) -> void { + if (is_pruned[root]) return; + auto expected_category = + expected_types[root] == NUM_NODE_CLASSES ? column_categories[root] : expected_types[root]; + initialize_json_columns(root, ref, expected_category); + auto child_ids = adj.count(root) ? adj[root] : std::vector{}; + if (expected_category == NC_STRUCT) { + // find field column ids, and its children and create columns. + for (auto const& field_id : child_ids) { + auto name = column_names[field_id]; + if (is_pruned[field_id]) continue; + auto inserted = + ref.get().child_columns.try_emplace(name, device_json_column(stream, mr)).second; + ref.get().column_order.emplace_back(name); + CUDF_EXPECTS(inserted, + "struct child column insertion failed, duplicate column name in the parent"); + auto this_ref = std::ref(ref.get().child_columns.at(name)); + // Mixed type handling + auto& value_col_ids = adj[field_id]; + handle_mixed_types(value_col_ids); + if (value_col_ids.empty()) { + // If no column is present, remove the uninitialized column. + ref.get().child_columns.erase(name); + ref.get().column_order.pop_back(); + continue; + } + for (auto const& child_id : value_col_ids) // children of field (>1 if mixed) + { + if (is_pruned[child_id]) continue; + columns.try_emplace(child_id, this_ref); + construct_tree(child_id, this_ref); + } + } + } else if (expected_category == NC_LIST) { + // array of arrays interpreted as array of structs. + if (is_array_of_arrays and root == named_level) { + // create column names + std::map> array_values; + for (auto const& child_id : child_ids) { + if (is_pruned[child_id]) continue; + auto name = column_names[child_id]; + array_values[std::stoi(name)].push_back(child_id); + } + // + for (auto const& value_id_pair : array_values) { + auto [value_id, value_col_ids] = value_id_pair; + auto name = std::to_string(value_id); + auto inserted = + ref.get().child_columns.try_emplace(name, device_json_column(stream, mr)).second; + ref.get().column_order.emplace_back(name); + CUDF_EXPECTS(inserted, + "list child column insertion failed, duplicate column name in the parent"); + auto this_ref = std::ref(ref.get().child_columns.at(name)); + handle_mixed_types(value_col_ids); + if (value_col_ids.empty()) { + // If no column is present, remove the uninitialized column. + ref.get().child_columns.erase(name); + ref.get().column_order.pop_back(); + continue; + } + for (auto const& child_id : value_col_ids) // children of field (>1 if mixed) + { + if (is_pruned[child_id]) continue; + columns.try_emplace(child_id, this_ref); + construct_tree(child_id, this_ref); + } + } + } else { + if (child_ids.empty()) return; + auto inserted = + ref.get() + .child_columns.try_emplace(list_child_name, device_json_column(stream, mr)) + .second; + CUDF_EXPECTS(inserted, + "list child column insertion failed, duplicate column name in the parent"); + ref.get().column_order.emplace_back(list_child_name); + auto this_ref = std::ref(ref.get().child_columns.at(list_child_name)); + // Mixed type handling + handle_mixed_types(child_ids); + if (child_ids.empty()) { + // If no column is present, remove the uninitialized column. + ref.get().child_columns.erase(list_child_name); + } + for (auto const& child_id : child_ids) { + if (is_pruned[child_id]) continue; + columns.try_emplace(child_id, this_ref); + construct_tree(child_id, this_ref); + } + } + } + }; + auto inserted = parent_ref.get() + .child_columns.try_emplace(list_child_name, device_json_column(stream, mr)) + .second; + CUDF_EXPECTS(inserted, "child column insertion failed, duplicate column name in the parent"); + parent_ref = std::ref(parent_ref.get().child_columns.at(list_child_name)); + columns.try_emplace(adj[parent_node_sentinel][0], parent_ref); + construct_tree(adj[parent_node_sentinel][0], parent_ref); + + // Forced string type due to input schema and mixed type as string. + for (size_t i = 0; i < expected_types.size(); i++) { + if (expected_types[i] == NC_STR) { + if (columns.count(i)) { columns.at(i).get().forced_as_string_column = true; } + } + } + std::transform(expected_types.cbegin(), + expected_types.cend(), + column_categories.cbegin(), + expected_types.begin(), + [](auto exp, auto cat) { return exp == NUM_NODE_CLASSES ? cat : exp; }); + cudaMemcpyAsync(d_column_tree.node_categories.begin(), + expected_types.data(), + expected_types.size() * sizeof(column_categories[0]), + cudaMemcpyDefault, + stream.value()); + + return {is_pruned, columns}; +} +} // namespace experimental + } // namespace cudf::io::json::detail diff --git a/cpp/src/io/json/json_column.cu b/cpp/src/io/json/json_column.cu index dfd9285f682..912e93d52ae 100644 --- a/cpp/src/io/json/json_column.cu +++ b/cpp/src/io/json/json_column.cu @@ -104,7 +104,7 @@ void print_tree(host_span input, * max row offsets of columns */ std::tuple, rmm::device_uvector> -reduce_to_column_tree(tree_meta_t& tree, +reduce_to_column_tree(tree_meta_t const& tree, device_span original_col_ids, device_span sorted_col_ids, device_span ordered_node_ids, @@ -317,7 +317,7 @@ std::pair, std::vector> device_json_co // Note: json_col modified here, moves this memory }; - auto get_child_schema = [schema](auto child_name) -> std::optional { + auto get_child_schema = [&schema](auto child_name) -> std::optional { if (schema.has_value()) { auto const result = schema.value().child_types.find(child_name); if (result != std::end(schema.value().child_types)) { return result->second; } @@ -325,6 +325,13 @@ std::pair, std::vector> device_json_co return {}; }; + auto get_list_child_schema = [&schema]() -> std::optional { + if (schema.has_value()) { + if (schema.value().child_types.size() > 0) return schema.value().child_types.begin()->second; + } + return {}; + }; + switch (json_col.type) { case json_col_t::StringColumn: { // move string_offsets to GPU and transform to string column @@ -439,9 +446,8 @@ std::pair, std::vector> device_json_co rmm::device_buffer{}, 0); // Create children column - auto child_schema_element = json_col.child_columns.empty() - ? std::optional{} - : get_child_schema(json_col.child_columns.begin()->first); + auto child_schema_element = + json_col.child_columns.empty() ? std::optional{} : get_list_child_schema(); auto [child_column, names] = json_col.child_columns.empty() or (prune_columns and !child_schema_element.has_value()) ? std::pair, @@ -479,6 +485,16 @@ std::pair, std::vector> device_json_co } } +template +auto make_device_json_column_dispatch(bool experimental, Args&&... args) +{ + if (experimental) { + return experimental::make_device_json_column(std::forward(args)...); + } else { + return make_device_json_column(std::forward(args)...); + } +} + table_with_metadata device_parse_nested_json(device_span d_input, cudf::io::json_reader_options const& options, rmm::cuda_stream_view stream, @@ -524,6 +540,7 @@ table_with_metadata device_parse_nested_json(device_span d_input, gpu_tree, is_array_of_arrays, options.is_enabled_lines(), + options.is_enabled_experimental(), stream, cudf::get_current_device_resource_ref()); @@ -536,15 +553,16 @@ table_with_metadata device_parse_nested_json(device_span d_input, 0); // Get internal JSON column - make_device_json_column(d_input, - gpu_tree, - gpu_col_id, - gpu_row_offsets, - root_column, - is_array_of_arrays, - options, - stream, - mr); + make_device_json_column_dispatch(options.is_enabled_experimental(), + d_input, + gpu_tree, + gpu_col_id, + gpu_row_offsets, + root_column, + is_array_of_arrays, + options, + stream, + mr); // data_root refers to the root column of the data represented by the given JSON string auto& data_root = diff --git a/cpp/src/io/json/json_tree.cu b/cpp/src/io/json/json_tree.cu index 4d0dc010c57..d949635c1cc 100644 --- a/cpp/src/io/json/json_tree.cu +++ b/cpp/src/io/json/json_tree.cu @@ -14,17 +14,18 @@ * limitations under the License. */ -#include "io/utilities/hostdevice_vector.hpp" +#include "io/utilities/parsing_utils.cuh" +#include "io/utilities/string_parsing.hpp" #include "nested_json.hpp" #include #include -#include #include #include #include #include #include +#include #include #include #include @@ -34,12 +35,14 @@ #include #include +#include #include #include #include #include #include #include +#include #include #include #include @@ -492,6 +495,85 @@ tree_meta_t get_tree_representation(device_span tokens, std::move(node_range_end)}; } +// Return field node ids after unicode decoding of field names and matching them to same field names +std::pair> remapped_field_nodes_after_unicode_decode( + device_span d_input, + tree_meta_t const& d_tree, + device_span keys, + rmm::cuda_stream_view stream) +{ + size_t num_keys = keys.size(); + if (num_keys == 0) { return {num_keys, rmm::device_uvector(num_keys, stream)}; } + rmm::device_uvector offsets(num_keys, stream); + rmm::device_uvector lengths(num_keys, stream); + auto offset_length_it = thrust::make_zip_iterator(offsets.begin(), lengths.begin()); + thrust::transform(rmm::exec_policy_nosync(stream), + keys.begin(), + keys.end(), + offset_length_it, + [node_range_begin = d_tree.node_range_begin.data(), + node_range_end = d_tree.node_range_end.data()] __device__(auto key) { + return thrust::make_tuple(node_range_begin[key], + node_range_end[key] - node_range_begin[key]); + }); + cudf::io::parse_options_view opt{',', '\n', '\0', '.'}; + opt.keepquotes = true; + + auto utf8_decoded_fields = parse_data(d_input.data(), + offset_length_it, + num_keys, + data_type{type_id::STRING}, + rmm::device_buffer{}, + 0, + opt, + stream, + cudf::get_current_device_resource_ref()); + // hash using iter, create a hashmap for 0-num_keys. + // insert and find. -> array + // store to static_map with keys as field key[index], and values as key[array[index]] + + auto str_view = strings_column_view{utf8_decoded_fields->view()}; + auto const char_ptr = str_view.chars_begin(stream); + auto const offset_ptr = str_view.offsets().begin(); + + // String hasher + auto const d_hasher = cuda::proclaim_return_type< + typename cudf::hashing::detail::default_hash::result_type>( + [char_ptr, offset_ptr] __device__(auto node_id) { + auto const field_name = cudf::string_view(char_ptr + offset_ptr[node_id], + offset_ptr[node_id + 1] - offset_ptr[node_id]); + return cudf::hashing::detail::default_hash{}(field_name); + }); + auto const d_equal = [char_ptr, offset_ptr] __device__(auto node_id1, auto node_id2) { + auto const field_name1 = cudf::string_view(char_ptr + offset_ptr[node_id1], + offset_ptr[node_id1 + 1] - offset_ptr[node_id1]); + auto const field_name2 = cudf::string_view(char_ptr + offset_ptr[node_id2], + offset_ptr[node_id2 + 1] - offset_ptr[node_id2]); + return field_name1 == field_name2; + }; + + using hasher_type = decltype(d_hasher); + constexpr size_type empty_node_index_sentinel = -1; + auto key_set = cuco::static_set{ + cuco::extent{compute_hash_table_size(num_keys)}, + cuco::empty_key{empty_node_index_sentinel}, + d_equal, + cuco::linear_probing<1, hasher_type>{d_hasher}, + {}, + {}, + cudf::detail::cuco_allocator{rmm::mr::polymorphic_allocator{}, stream}, + stream.value()}; + auto const counting_iter = thrust::make_counting_iterator(0); + rmm::device_uvector found_keys(num_keys, stream); + key_set.insert_and_find_async(counting_iter, + counting_iter + num_keys, + found_keys.begin(), + thrust::make_discard_iterator(), + stream.value()); + // set.size will synchronize the stream before return. + return {key_set.size(stream), std::move(found_keys)}; +} + /** * @brief Generates unique node_type id for each node. * Field nodes with the same name are assigned the same node_type id. @@ -500,11 +582,14 @@ tree_meta_t get_tree_representation(device_span tokens, * All inputs and outputs are in node_id order. * @param d_input JSON string in device memory * @param d_tree Tree representation of the JSON + * @param is_enabled_experimental Whether to enable experimental features such as + * utf8 field name support * @param stream CUDA stream used for device memory operations and kernel launches. * @return Vector of node_type ids */ rmm::device_uvector hash_node_type_with_field_name(device_span d_input, tree_meta_t const& d_tree, + bool is_enabled_experimental, rmm::cuda_stream_view stream) { CUDF_FUNC_RANGE(); @@ -536,7 +621,7 @@ rmm::device_uvector hash_node_type_with_field_name(device_span(0); + auto const counting_iter = thrust::make_counting_iterator(0); auto const is_field_name_node = [node_categories = d_tree.node_categories.data()] __device__(auto node_id) { @@ -554,15 +639,61 @@ rmm::device_uvector hash_node_type_with_field_name(device_span{rmm::mr::polymorphic_allocator{}, stream}, stream.value()}; - key_set.insert_if_async(iter, - iter + num_nodes, + key_set.insert_if_async(counting_iter, + counting_iter + num_nodes, thrust::counting_iterator(0), // stencil is_field_name_node, stream.value()); + // experimental feature: utf8 field name support + // parse_data on field names, + // rehash it using another map, + // reassign the reverse map values to new matched node indices. + auto get_utf8_matched_field_nodes = [&]() { + auto make_map = [&stream](auto num_keys) { + using hasher_type3 = cudf::hashing::detail::default_hash; + return cuco::static_map{ + cuco::extent{compute_hash_table_size(num_keys, 100)}, // 100% occupancy + cuco::empty_key{empty_node_index_sentinel}, + cuco::empty_value{empty_node_index_sentinel}, + {}, + cuco::linear_probing<1, hasher_type3>{hasher_type3{}}, + {}, + {}, + cudf::detail::cuco_allocator{rmm::mr::polymorphic_allocator{}, stream}, + stream.value()}; + }; + if (!is_enabled_experimental) { return std::pair{false, make_map(0)}; } + // get all unique field node ids for utf8 decoding + auto num_keys = key_set.size(stream); + rmm::device_uvector keys(num_keys, stream); + key_set.retrieve_all(keys.data(), stream.value()); + + auto [num_unique_fields, found_keys] = + remapped_field_nodes_after_unicode_decode(d_input, d_tree, keys, stream); + + auto is_need_remap = num_unique_fields != num_keys; + if (!is_need_remap) { return std::pair{false, make_map(0)}; } + + // store to static_map with keys as field keys[index], and values as keys[found_keys[index]] + auto reverse_map = make_map(num_keys); + auto matching_keys_iter = thrust::make_permutation_iterator(keys.begin(), found_keys.begin()); + auto pair_iter = + thrust::make_zip_iterator(thrust::make_tuple(keys.begin(), matching_keys_iter)); + reverse_map.insert_async(pair_iter, pair_iter + num_keys, stream); + return std::pair{is_need_remap, std::move(reverse_map)}; + }; + auto [is_need_remap, reverse_map] = get_utf8_matched_field_nodes(); + auto const get_hash_value = - [key_set = key_set.ref(cuco::op::find)] __device__(auto node_id) -> size_type { + [key_set = key_set.ref(cuco::op::find), + is_need_remap = is_need_remap, + rm = reverse_map.ref(cuco::op::find)] __device__(auto node_id) -> size_type { auto const it = key_set.find(node_id); + if (it != key_set.end() and is_need_remap) { + auto const it2 = rm.find(*it); + return (it2 == rm.end()) ? size_type{0} : it2->second; + } return (it == key_set.end()) ? size_type{0} : *it; }; @@ -771,6 +902,8 @@ std::pair, rmm::device_uvector> hash_n * @param d_tree Tree representation of the JSON * @param is_array_of_arrays Whether the tree is an array of arrays * @param is_enabled_lines Whether the input is a line-delimited JSON + * @param is_enabled_experimental Whether the experimental feature is enabled such as + * utf8 field name support * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned column's device memory * @return column_id, parent_column_id @@ -780,6 +913,7 @@ std::pair, rmm::device_uvector> gene tree_meta_t const& d_tree, bool is_array_of_arrays, bool is_enabled_lines, + bool is_enabled_experimental, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { @@ -793,7 +927,7 @@ std::pair, rmm::device_uvector> gene auto [col_id, unique_keys] = [&]() { // Convert node_category + field_name to node_type. rmm::device_uvector node_type = - hash_node_type_with_field_name(d_input, d_tree, stream); + hash_node_type_with_field_name(d_input, d_tree, is_enabled_experimental, stream); // hash entire path from node to root. return hash_node_path(d_tree.node_levels, @@ -948,12 +1082,13 @@ records_orient_tree_traversal(device_span d_input, tree_meta_t const& d_tree, bool is_array_of_arrays, bool is_enabled_lines, + bool is_enabled_experimental, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); - auto [new_col_id, new_parent_col_id] = - generate_column_id(d_input, d_tree, is_array_of_arrays, is_enabled_lines, stream, mr); + auto [new_col_id, new_parent_col_id] = generate_column_id( + d_input, d_tree, is_array_of_arrays, is_enabled_lines, is_enabled_experimental, stream, mr); auto row_offsets = compute_row_offsets( std::move(new_parent_col_id), d_tree, is_array_of_arrays, is_enabled_lines, stream, mr); diff --git a/cpp/src/io/json/nested_json.hpp b/cpp/src/io/json/nested_json.hpp index 93ef2b46be1..3d9a51833e0 100644 --- a/cpp/src/io/json/nested_json.hpp +++ b/cpp/src/io/json/nested_json.hpp @@ -316,6 +316,8 @@ tree_meta_t get_tree_representation(device_span tokens, * index, level, begin index, and end index in the input JSON string * @param is_array_of_arrays Whether the tree is an array of arrays * @param is_enabled_lines Whether the input is a line-delimited JSON + * @param is_enabled_experimental Whether to enable experimental features such as utf-8 field name + * support * @param stream The CUDA stream to which kernels are dispatched * @param mr Optional, resource with which to allocate * @return A tuple of the output column indices and the row offsets within each column for each node @@ -326,6 +328,7 @@ records_orient_tree_traversal(device_span d_input, tree_meta_t const& d_tree, bool is_array_of_arrays, bool is_enabled_lines, + bool is_enabled_experimental, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); @@ -352,7 +355,7 @@ get_array_children_indices(TreeDepthT row_array_children_level, /** * @brief Reduces node tree representation to column tree representation. * - * @param node_tree Node tree representation of JSON string + * @param tree Node tree representation of JSON string * @param original_col_ids Column ids of nodes * @param sorted_col_ids Sorted column ids of nodes * @param ordered_node_ids Node ids of nodes sorted by column ids @@ -365,7 +368,7 @@ get_array_children_indices(TreeDepthT row_array_children_level, */ CUDF_EXPORT std::tuple, rmm::device_uvector> -reduce_to_column_tree(tree_meta_t& node_tree, +reduce_to_column_tree(tree_meta_t const& tree, device_span original_col_ids, device_span sorted_col_ids, device_span ordered_node_ids, @@ -393,14 +396,30 @@ reduce_to_column_tree(tree_meta_t& node_tree, * of child_offets and validity members of `d_json_column` */ void make_device_json_column(device_span input, - tree_meta_t& tree, - device_span col_ids, - device_span row_offsets, + tree_meta_t const& tree, + device_span col_ids, + device_span row_offsets, device_json_column& root, bool is_array_of_arrays, cudf::io::json_reader_options const& options, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); + +namespace experimental { +/** + * @copydoc cudf::io::json::detail::make_device_json_column + */ +void make_device_json_column(device_span input, + tree_meta_t const& tree, + device_span col_ids, + device_span row_offsets, + device_json_column& root, + bool is_array_of_arrays, + cudf::io::json_reader_options const& options, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); +} // namespace experimental + /** * @brief Retrieves the parse_options to be used for type inference and type casting * diff --git a/cpp/tests/io/json/json_test.cpp b/cpp/tests/io/json/json_test.cpp index 48bc982d0e3..68ec255b39d 100644 --- a/cpp/tests/io/json/json_test.cpp +++ b/cpp/tests/io/json/json_test.cpp @@ -2856,6 +2856,59 @@ TEST_F(JsonReaderTest, JSONMixedTypeChildren) } } +TEST_F(JsonReaderTest, MixedTypesWithSchema) +{ + std::string data = "{\"data\": {\"A\": 0, \"B\": 1}}\n{\"data\": [1,0]}\n"; + + std::map data_types; + std::map child_types; + child_types.insert( + std::pair{"element", cudf::io::schema_element{cudf::data_type{cudf::type_id::STRING, 0}, {}}}); + data_types.insert(std::pair{ + "data", cudf::io::schema_element{cudf::data_type{cudf::type_id::LIST, 0}, child_types}}); + + cudf::io::json_reader_options in_options = + cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()}) + .dtypes(data_types) + .recovery_mode(cudf::io::json_recovery_mode_t::RECOVER_WITH_NULL) + .normalize_single_quotes(true) + .normalize_whitespace(true) + .mixed_types_as_string(true) + .experimental(true) + .keep_quotes(true) + .lines(true); + cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + EXPECT_EQ(result.tbl->num_columns(), 1); + EXPECT_EQ(result.tbl->num_rows(), 2); + EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::LIST); + EXPECT_EQ(result.tbl->get_column(0).child(1).type().id(), cudf::type_id::STRING); +} + +TEST_F(JsonReaderTest, UnicodeFieldname) +{ + // unicode at nested and leaf levels + std::string data = R"({"data": {"a": 0, "b c": 1}} + {"data": {"\u0061": 2, "\u0062\tc": 3}} + {"d\u0061ta": {"a": 4}})"; + + cudf::io::json_reader_options in_options = + cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()}) + .recovery_mode(cudf::io::json_recovery_mode_t::RECOVER_WITH_NULL) + .experimental(true) + .lines(true); + cudf::io::table_with_metadata result = cudf::io::read_json(in_options); + EXPECT_EQ(result.tbl->num_columns(), 1); + EXPECT_EQ(result.tbl->num_rows(), 3); + EXPECT_EQ(result.tbl->get_column(0).type().id(), cudf::type_id::STRUCT); + EXPECT_EQ(result.tbl->get_column(0).num_children(), 2); + EXPECT_EQ(result.tbl->get_column(0).child(0).type().id(), cudf::type_id::INT64); + EXPECT_EQ(result.tbl->get_column(0).child(1).type().id(), cudf::type_id::INT64); + EXPECT_EQ(result.metadata.schema_info.at(0).name, "data"); + EXPECT_EQ(result.metadata.schema_info.at(0).children.at(0).name, "a"); + EXPECT_EQ(result.metadata.schema_info.at(0).children.at(1).name, "b\tc"); + EXPECT_EQ(result.metadata.schema_info.at(0).children.size(), 2); +} + TEST_F(JsonReaderTest, JsonDtypeSchema) { std::string data = R"( diff --git a/cpp/tests/io/json/json_tree.cpp b/cpp/tests/io/json/json_tree.cpp index 875cc467b6a..15682c6ae6b 100644 --- a/cpp/tests/io/json/json_tree.cpp +++ b/cpp/tests/io/json/json_tree.cpp @@ -889,6 +889,7 @@ TEST_P(JsonTreeTraversalTest, CPUvsGPUTraversal) gpu_tree, is_array_of_arrays, json_lines, + false, stream, cudf::get_current_device_resource_ref()); #if LIBCUDF_JSON_DEBUG_DUMP diff --git a/cpp/tests/io/json/json_tree_csr.cu b/cpp/tests/io/json/json_tree_csr.cu index a336b327732..f988ae24b38 100644 --- a/cpp/tests/io/json/json_tree_csr.cu +++ b/cpp/tests/io/json/json_tree_csr.cu @@ -168,6 +168,7 @@ void run_test(std::string const& input, bool enable_lines = true) gpu_tree, is_array_of_arrays, options.is_enabled_lines(), + false, stream, rmm::mr::get_current_device_resource()); auto& gpu_col_id = std::get<0>(tup); diff --git a/java/src/main/java/ai/rapids/cudf/JSONOptions.java b/java/src/main/java/ai/rapids/cudf/JSONOptions.java index 2bb74c3e3b1..e41cc15712f 100644 --- a/java/src/main/java/ai/rapids/cudf/JSONOptions.java +++ b/java/src/main/java/ai/rapids/cudf/JSONOptions.java @@ -39,6 +39,7 @@ public final class JSONOptions extends ColumnFilterOptions { private final boolean allowNonNumericNumbers; private final boolean allowUnquotedControlChars; private final boolean cudfPruneSchema; + private final boolean experimental; private final byte lineDelimiter; private JSONOptions(Builder builder) { @@ -55,6 +56,7 @@ private JSONOptions(Builder builder) { allowNonNumericNumbers = builder.allowNonNumericNumbers; allowUnquotedControlChars = builder.allowUnquotedControlChars; cudfPruneSchema = builder.cudfPruneSchema; + experimental = builder.experimental; lineDelimiter = builder.lineDelimiter; } @@ -111,6 +113,10 @@ public boolean unquotedControlChars() { return allowUnquotedControlChars; } + public boolean experimental() { + return experimental; + } + @Override String[] getIncludeColumnNames() { throw new UnsupportedOperationException("JSON reader didn't support column prune"); @@ -136,6 +142,7 @@ public static final class Builder extends ColumnFilterOptions.Builder(line_delimiter)) .strict_validation(strict_validation) + .experimental(experimental) .keep_quotes(keep_quotes) .prune_columns(false); if (strict_validation) { @@ -1680,6 +1682,7 @@ Java_ai_rapids_cudf_Table_readAndInferJSON(JNIEnv* env, jboolean allow_leading_zeros, jboolean allow_nonnumeric_numbers, jboolean allow_unquoted_control, + jboolean experimental, jbyte line_delimiter) { JNI_NULL_CHECK(env, buffer, "buffer cannot be null", 0); @@ -1705,6 +1708,7 @@ Java_ai_rapids_cudf_Table_readAndInferJSON(JNIEnv* env, .strict_validation(strict_validation) .mixed_types_as_string(mixed_types_as_string) .prune_columns(false) + .experimental(experimental) .delimiter(static_cast(line_delimiter)) .keep_quotes(keep_quotes); if (strict_validation) { @@ -1821,6 +1825,7 @@ Java_ai_rapids_cudf_Table_readJSONFromDataSource(JNIEnv* env, jboolean allow_nonnumeric_numbers, jboolean allow_unquoted_control, jboolean prune_columns, + jboolean experimental, jbyte line_delimiter, jlong ds_handle) { @@ -1859,7 +1864,8 @@ Java_ai_rapids_cudf_Table_readJSONFromDataSource(JNIEnv* env, .delimiter(static_cast(line_delimiter)) .strict_validation(strict_validation) .keep_quotes(keep_quotes) - .prune_columns(prune_columns); + .prune_columns(prune_columns) + .experimental(experimental); if (strict_validation) { opts.numeric_leading_zeros(allow_leading_zeros) .nonnumeric_numbers(allow_nonnumeric_numbers) @@ -1920,6 +1926,7 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Table_readJSON(JNIEnv* env, jboolean allow_nonnumeric_numbers, jboolean allow_unquoted_control, jboolean prune_columns, + jboolean experimental, jbyte line_delimiter) { bool read_buffer = true; @@ -1972,7 +1979,8 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Table_readJSON(JNIEnv* env, .delimiter(static_cast(line_delimiter)) .strict_validation(strict_validation) .keep_quotes(keep_quotes) - .prune_columns(prune_columns); + .prune_columns(prune_columns) + .experimental(experimental); if (strict_validation) { opts.numeric_leading_zeros(allow_leading_zeros) .nonnumeric_numbers(allow_nonnumeric_numbers) From 61af76978e97d94c1c9c7297fc73900d7827b433 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Wed, 25 Sep 2024 16:48:51 -1000 Subject: [PATCH 3/4] Add io/timezone APIs to pylibcudf (#16771) Contributes to https://github.com/rapidsai/cudf/issues/15162 Authors: - Matthew Roeschke (https://github.com/mroeschke) - Vyas Ramasubramani (https://github.com/vyasr) - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - Vyas Ramasubramani (https://github.com/vyasr) - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/16771 --- .../api_docs/pylibcudf/io/index.rst | 1 + .../api_docs/pylibcudf/io/timezone.rst | 6 +++ python/cudf/cudf/_lib/timezone.pyx | 27 ++---------- python/pylibcudf/pylibcudf/io/CMakeLists.txt | 4 +- python/pylibcudf/pylibcudf/io/__init__.pxd | 2 +- python/pylibcudf/pylibcudf/io/__init__.py | 2 +- python/pylibcudf/pylibcudf/io/timezone.pxd | 6 +++ python/pylibcudf/pylibcudf/io/timezone.pyx | 43 +++++++++++++++++++ .../pylibcudf/tests/io/test_timezone.py | 16 +++++++ 9 files changed, 81 insertions(+), 26 deletions(-) create mode 100644 docs/cudf/source/user_guide/api_docs/pylibcudf/io/timezone.rst create mode 100644 python/pylibcudf/pylibcudf/io/timezone.pxd create mode 100644 python/pylibcudf/pylibcudf/io/timezone.pyx create mode 100644 python/pylibcudf/pylibcudf/tests/io/test_timezone.py diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst index c8933981736..53638f071cc 100644 --- a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst @@ -19,3 +19,4 @@ I/O Functions csv json parquet + timezone diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/timezone.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/timezone.rst new file mode 100644 index 00000000000..20c1ffc2e93 --- /dev/null +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/timezone.rst @@ -0,0 +1,6 @@ +======== +Timezone +======== + +.. automodule:: pylibcudf.io.timezone + :members: diff --git a/python/cudf/cudf/_lib/timezone.pyx b/python/cudf/cudf/_lib/timezone.pyx index bff3b2c4ce4..54624a5a2fd 100644 --- a/python/cudf/cudf/_lib/timezone.pyx +++ b/python/cudf/cudf/_lib/timezone.pyx @@ -1,29 +1,10 @@ # Copyright (c) 2023-2024, NVIDIA CORPORATION. -from libcpp.memory cimport unique_ptr -from libcpp.optional cimport make_optional -from libcpp.string cimport string -from libcpp.utility cimport move +import pylibcudf as plc -from pylibcudf.libcudf.io.timezone cimport ( - make_timezone_transition_table as cpp_make_timezone_transition_table, -) -from pylibcudf.libcudf.table.table cimport table - -from cudf._lib.utils cimport columns_from_unique_ptr +from cudf._lib.column cimport Column def make_timezone_transition_table(tzdir, tzname): - cdef unique_ptr[table] c_result - cdef string c_tzdir = tzdir.encode() - cdef string c_tzname = tzname.encode() - - with nogil: - c_result = move( - cpp_make_timezone_transition_table( - make_optional[string](c_tzdir), - c_tzname - ) - ) - - return columns_from_unique_ptr(move(c_result)) + plc_table = plc.io.timezone.make_timezone_transition_table(tzdir, tzname) + return [Column.from_pylibcudf(col) for col in plc_table.columns()] diff --git a/python/pylibcudf/pylibcudf/io/CMakeLists.txt b/python/pylibcudf/pylibcudf/io/CMakeLists.txt index 529a71a48ce..965724a47b1 100644 --- a/python/pylibcudf/pylibcudf/io/CMakeLists.txt +++ b/python/pylibcudf/pylibcudf/io/CMakeLists.txt @@ -12,7 +12,9 @@ # the License. # ============================================================================= -set(cython_sources avro.pyx csv.pyx datasource.pyx json.pyx orc.pyx parquet.pyx types.pyx) +set(cython_sources avro.pyx csv.pyx datasource.pyx json.pyx orc.pyx parquet.pyx timezone.pyx + types.pyx +) set(linked_libraries cudf::cudf) rapids_cython_create_modules( diff --git a/python/pylibcudf/pylibcudf/io/__init__.pxd b/python/pylibcudf/pylibcudf/io/__init__.pxd index 5927a19dc69..1bcc0a3f963 100644 --- a/python/pylibcudf/pylibcudf/io/__init__.pxd +++ b/python/pylibcudf/pylibcudf/io/__init__.pxd @@ -1,5 +1,5 @@ # Copyright (c) 2024, NVIDIA CORPORATION. # CSV is removed since it is def not cpdef (to force kw-only arguments) -from . cimport avro, datasource, json, orc, parquet, types +from . cimport avro, datasource, json, orc, parquet, timezone, types from .types cimport SourceInfo, TableWithMetadata diff --git a/python/pylibcudf/pylibcudf/io/__init__.py b/python/pylibcudf/pylibcudf/io/__init__.py index 5d899ee0808..2e4f215b12c 100644 --- a/python/pylibcudf/pylibcudf/io/__init__.py +++ b/python/pylibcudf/pylibcudf/io/__init__.py @@ -1,4 +1,4 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from . import avro, csv, datasource, json, orc, parquet, types +from . import avro, csv, datasource, json, orc, parquet, timezone, types from .types import SinkInfo, SourceInfo, TableWithMetadata diff --git a/python/pylibcudf/pylibcudf/io/timezone.pxd b/python/pylibcudf/pylibcudf/io/timezone.pxd new file mode 100644 index 00000000000..2aa755dbbd8 --- /dev/null +++ b/python/pylibcudf/pylibcudf/io/timezone.pxd @@ -0,0 +1,6 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from ..table cimport Table + + +cpdef Table make_timezone_transition_table(str tzif_dir, str timezone_name) diff --git a/python/pylibcudf/pylibcudf/io/timezone.pyx b/python/pylibcudf/pylibcudf/io/timezone.pyx new file mode 100644 index 00000000000..e02239d7252 --- /dev/null +++ b/python/pylibcudf/pylibcudf/io/timezone.pyx @@ -0,0 +1,43 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from libcpp.memory cimport unique_ptr +from libcpp.optional cimport make_optional +from libcpp.string cimport string +from libcpp.utility cimport move +from pylibcudf.libcudf.io.timezone cimport ( + make_timezone_transition_table as cpp_make_timezone_transition_table, +) +from pylibcudf.libcudf.table.table cimport table + +from ..table cimport Table + + +cpdef Table make_timezone_transition_table(str tzif_dir, str timezone_name): + """ + Creates a transition table to convert ORC timestamps to UTC. + + Parameters + ---------- + tzif_dir : str + The directory where the TZif files are located + timezone_name : str + standard timezone name + + Returns + ------- + Table + The transition table for the given timezone. + """ + cdef unique_ptr[table] c_result + cdef string c_tzdir = tzif_dir.encode() + cdef string c_tzname = timezone_name.encode() + + with nogil: + c_result = move( + cpp_make_timezone_transition_table( + make_optional[string](c_tzdir), + c_tzname + ) + ) + + return Table.from_libcudf(move(c_result)) diff --git a/python/pylibcudf/pylibcudf/tests/io/test_timezone.py b/python/pylibcudf/pylibcudf/tests/io/test_timezone.py new file mode 100644 index 00000000000..76b0424b2af --- /dev/null +++ b/python/pylibcudf/pylibcudf/tests/io/test_timezone.py @@ -0,0 +1,16 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +import zoneinfo + +import pylibcudf as plc +import pytest + + +def test_make_timezone_transition_table(): + if len(zoneinfo.TZPATH) == 0: + pytest.skip("No TZPATH available.") + tz_path = zoneinfo.TZPATH[0] + result = plc.io.timezone.make_timezone_transition_table( + tz_path, "America/Los_Angeles" + ) + assert isinstance(result, plc.Table) + assert result.num_rows() > 0 From b00a718a7980fadc91c8b37d6bbe829e4b8549e8 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Wed, 25 Sep 2024 16:51:18 -1000 Subject: [PATCH 4/4] Add partitioning APIs to pylibcudf (#16781) Contributes to https://github.com/rapidsai/cudf/issues/15162 Authors: - Matthew Roeschke (https://github.com/mroeschke) - Matthew Murray (https://github.com/Matt711) - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Matthew Murray (https://github.com/Matt711) - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/16781 --- .../user_guide/api_docs/pylibcudf/index.rst | 1 + .../api_docs/pylibcudf/partitioning.rst | 6 + python/cudf/cudf/_lib/hash.pyx | 35 ++--- python/cudf/cudf/_lib/partitioning.pyx | 35 +---- python/pylibcudf/pylibcudf/CMakeLists.txt | 1 + python/pylibcudf/pylibcudf/__init__.pxd | 2 + python/pylibcudf/pylibcudf/__init__.py | 2 + .../pylibcudf/libcudf/partitioning.pxd | 7 + python/pylibcudf/pylibcudf/partitioning.pxd | 19 +++ python/pylibcudf/pylibcudf/partitioning.pyx | 120 ++++++++++++++++++ .../pylibcudf/tests/test_partitioning.py | 55 ++++++++ 11 files changed, 229 insertions(+), 54 deletions(-) create mode 100644 docs/cudf/source/user_guide/api_docs/pylibcudf/partitioning.rst create mode 100644 python/pylibcudf/pylibcudf/partitioning.pxd create mode 100644 python/pylibcudf/pylibcudf/partitioning.pyx create mode 100644 python/pylibcudf/pylibcudf/tests/test_partitioning.py diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst index edb0963ed29..e21536e2e97 100644 --- a/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst @@ -25,6 +25,7 @@ This page provides API documentation for pylibcudf. lists merge null_mask + partitioning quantiles reduce replace diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/partitioning.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/partitioning.rst new file mode 100644 index 00000000000..6951dbecca0 --- /dev/null +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/partitioning.rst @@ -0,0 +1,6 @@ +============ +partitioning +============ + +.. automodule:: pylibcudf.partitioning + :members: diff --git a/python/cudf/cudf/_lib/hash.pyx b/python/cudf/cudf/_lib/hash.pyx index 48f75b12a73..9b7ab0888d2 100644 --- a/python/cudf/cudf/_lib/hash.pyx +++ b/python/cudf/cudf/_lib/hash.pyx @@ -3,11 +3,8 @@ from cudf.core.buffer import acquire_spill_lock from libcpp.memory cimport unique_ptr -from libcpp.pair cimport pair from libcpp.utility cimport move -from libcpp.vector cimport vector -cimport pylibcudf.libcudf.types as libcudf_types from pylibcudf.libcudf.column.column cimport column from pylibcudf.libcudf.hash cimport ( md5, @@ -19,37 +16,23 @@ from pylibcudf.libcudf.hash cimport ( sha512, xxhash_64, ) -from pylibcudf.libcudf.partitioning cimport ( - hash_partition as cpp_hash_partition, -) -from pylibcudf.libcudf.table.table cimport table from pylibcudf.libcudf.table.table_view cimport table_view from cudf._lib.column cimport Column -from cudf._lib.utils cimport columns_from_unique_ptr, table_view_from_columns +from cudf._lib.utils cimport table_view_from_columns + +import pylibcudf as plc @acquire_spill_lock() -def hash_partition(list source_columns, object columns_to_hash, +def hash_partition(list source_columns, list columns_to_hash, int num_partitions): - cdef vector[libcudf_types.size_type] c_columns_to_hash = columns_to_hash - cdef int c_num_partitions = num_partitions - cdef table_view c_source_view = table_view_from_columns(source_columns) - - cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result - with nogil: - c_result = move( - cpp_hash_partition( - c_source_view, - c_columns_to_hash, - c_num_partitions - ) - ) - - return ( - columns_from_unique_ptr(move(c_result.first)), - list(c_result.second) + plc_table, offsets = plc.partitioning.hash_partition( + plc.Table([col.to_pylibcudf(mode="read") for col in source_columns]), + columns_to_hash, + num_partitions ) + return [Column.from_pylibcudf(col) for col in plc_table.columns()], offsets @acquire_spill_lock() diff --git a/python/cudf/cudf/_lib/partitioning.pyx b/python/cudf/cudf/_lib/partitioning.pyx index d94f0e1b564..13997da8403 100644 --- a/python/cudf/cudf/_lib/partitioning.pyx +++ b/python/cudf/cudf/_lib/partitioning.pyx @@ -2,24 +2,13 @@ from cudf.core.buffer import acquire_spill_lock -from libcpp.memory cimport unique_ptr -from libcpp.pair cimport pair -from libcpp.utility cimport move -from libcpp.vector cimport vector - -from pylibcudf.libcudf.column.column_view cimport column_view -from pylibcudf.libcudf.partitioning cimport partition as cpp_partition -from pylibcudf.libcudf.table.table cimport table -from pylibcudf.libcudf.table.table_view cimport table_view - from cudf._lib.column cimport Column -from cudf._lib.utils cimport columns_from_unique_ptr, table_view_from_columns + +import pylibcudf as plc from cudf._lib.reduce import minmax from cudf._lib.stream_compaction import distinct_count as cpp_distinct_count -cimport pylibcudf.libcudf.types as libcudf_types - @acquire_spill_lock() def partition(list source_columns, Column partition_map, @@ -50,25 +39,15 @@ def partition(list source_columns, Column partition_map, if num_partitions is None: num_partitions = cpp_distinct_count(partition_map, ignore_nulls=True) - cdef int c_num_partitions = num_partitions - cdef table_view c_source_view = table_view_from_columns(source_columns) - - cdef column_view c_partition_map_view = partition_map.view() - cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result if partition_map.size > 0: lo, hi = minmax(partition_map) if lo < 0 or hi >= num_partitions: raise ValueError("Partition map has invalid values") - with nogil: - c_result = move( - cpp_partition( - c_source_view, - c_partition_map_view, - c_num_partitions - ) - ) - return ( - columns_from_unique_ptr(move(c_result.first)), list(c_result.second) + plc_table, offsets = plc.partitioning.partition( + plc.Table([col.to_pylibcudf(mode="read") for col in source_columns]), + partition_map.to_pylibcudf(mode="read"), + num_partitions ) + return [Column.from_pylibcudf(col) for col in plc_table.columns()], offsets diff --git a/python/pylibcudf/pylibcudf/CMakeLists.txt b/python/pylibcudf/pylibcudf/CMakeLists.txt index fb3a6c13a70..a7cb66d7b16 100644 --- a/python/pylibcudf/pylibcudf/CMakeLists.txt +++ b/python/pylibcudf/pylibcudf/CMakeLists.txt @@ -31,6 +31,7 @@ set(cython_sources lists.pyx merge.pyx null_mask.pyx + partitioning.pyx quantiles.pyx reduce.pyx replace.pyx diff --git a/python/pylibcudf/pylibcudf/__init__.pxd b/python/pylibcudf/pylibcudf/__init__.pxd index 66d9c3d6165..a384edd456d 100644 --- a/python/pylibcudf/pylibcudf/__init__.pxd +++ b/python/pylibcudf/pylibcudf/__init__.pxd @@ -17,6 +17,7 @@ from . cimport ( lists, merge, null_mask, + partitioning, quantiles, reduce, replace, @@ -61,6 +62,7 @@ __all__ = [ "lists", "merge", "null_mask", + "partitioning", "quantiles", "reduce", "replace", diff --git a/python/pylibcudf/pylibcudf/__init__.py b/python/pylibcudf/pylibcudf/__init__.py index 0a3615fa941..2a5365e8fad 100644 --- a/python/pylibcudf/pylibcudf/__init__.py +++ b/python/pylibcudf/pylibcudf/__init__.py @@ -28,6 +28,7 @@ lists, merge, null_mask, + partitioning, quantiles, reduce, replace, @@ -75,6 +76,7 @@ "lists", "merge", "null_mask", + "partitioning", "quantiles", "reduce", "replace", diff --git a/python/pylibcudf/pylibcudf/libcudf/partitioning.pxd b/python/pylibcudf/pylibcudf/libcudf/partitioning.pxd index 1ea10e8a194..89bddbffab5 100644 --- a/python/pylibcudf/pylibcudf/libcudf/partitioning.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/partitioning.pxd @@ -25,3 +25,10 @@ cdef extern from "cudf/partitioning.hpp" namespace "cudf" nogil: const column_view& partition_map, int num_partitions ) except + + + cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] \ + round_robin_partition "cudf::round_robin_partition" ( + const table_view& input, + int num_partitions, + int start_partition + ) except + diff --git a/python/pylibcudf/pylibcudf/partitioning.pxd b/python/pylibcudf/pylibcudf/partitioning.pxd new file mode 100644 index 00000000000..aad60149fc4 --- /dev/null +++ b/python/pylibcudf/pylibcudf/partitioning.pxd @@ -0,0 +1,19 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from .column cimport Column +from .table cimport Table + + +cpdef tuple[Table, list] hash_partition( + Table input, + list columns_to_hash, + int num_partitions +) + +cpdef tuple[Table, list] partition(Table t, Column partition_map, int num_partitions) + +cpdef tuple[Table, list] round_robin_partition( + Table input, + int num_partitions, + int start_partition=* +) diff --git a/python/pylibcudf/pylibcudf/partitioning.pyx b/python/pylibcudf/pylibcudf/partitioning.pyx new file mode 100644 index 00000000000..8fa70daab5a --- /dev/null +++ b/python/pylibcudf/pylibcudf/partitioning.pyx @@ -0,0 +1,120 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +cimport pylibcudf.libcudf.types as libcudf_types +from libcpp.memory cimport unique_ptr +from libcpp.pair cimport pair +from libcpp.utility cimport move +from libcpp.vector cimport vector +from pylibcudf.libcudf cimport partitioning as cpp_partitioning +from pylibcudf.libcudf.table.table cimport table + +from .column cimport Column +from .table cimport Table + + +cpdef tuple[Table, list] hash_partition( + Table input, + list columns_to_hash, + int num_partitions +): + """ + Partitions rows from the input table into multiple output tables. + + For details, see :cpp:func:`hash_partition`. + + Parameters + ---------- + input : Table + The table to partition + columns_to_hash : list[int] + Indices of input columns to hash + num_partitions : int + The number of partitions to use + + Returns + ------- + tuple[Table, list[int]] + An output table and a vector of row offsets to each partition + """ + cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result + cdef vector[libcudf_types.size_type] c_columns_to_hash = columns_to_hash + cdef int c_num_partitions = num_partitions + + with nogil: + c_result = move( + cpp_partitioning.hash_partition( + input.view(), c_columns_to_hash, c_num_partitions + ) + ) + + return Table.from_libcudf(move(c_result.first)), list(c_result.second) + +cpdef tuple[Table, list] partition(Table t, Column partition_map, int num_partitions): + """ + Partitions rows of `t` according to the mapping specified by `partition_map`. + + For details, see :cpp:func:`partition`. + + Parameters + ---------- + t : Table + The table to partition + partition_map : Column + Non-nullable column of integer values that map each row + in `t` to it's partition. + num_partitions : int + The total number of partitions + + Returns + ------- + tuple[Table, list[int]] + An output table and a list of row offsets to each partition + """ + cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result + cdef int c_num_partitions = num_partitions + + with nogil: + c_result = move( + cpp_partitioning.partition(t.view(), partition_map.view(), c_num_partitions) + ) + + return Table.from_libcudf(move(c_result.first)), list(c_result.second) + + +cpdef tuple[Table, list] round_robin_partition( + Table input, + int num_partitions, + int start_partition=0 +): + """ + Round-robin partition. + + For details, see :cpp:func:`round_robin_partition`. + + Parameters + ---------- + input : Table + The input table to be round-robin partitioned + num_partitions : int + Number of partitions for the table + start_partition : int, default 0 + Index of the 1st partition + + Returns + ------- + tuple[Table, list[int]] + The partitioned table and the partition offsets + for each partition within the table. + """ + cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result + cdef int c_num_partitions = num_partitions + cdef int c_start_partition = start_partition + + with nogil: + c_result = move( + cpp_partitioning.round_robin_partition( + input.view(), c_num_partitions, c_start_partition + ) + ) + + return Table.from_libcudf(move(c_result.first)), list(c_result.second) diff --git a/python/pylibcudf/pylibcudf/tests/test_partitioning.py b/python/pylibcudf/pylibcudf/tests/test_partitioning.py new file mode 100644 index 00000000000..444d0089d2c --- /dev/null +++ b/python/pylibcudf/pylibcudf/tests/test_partitioning.py @@ -0,0 +1,55 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +import pyarrow as pa +import pylibcudf as plc +import pytest +from utils import assert_table_eq + + +@pytest.fixture(scope="module") +def partitioning_data(): + data = {"a": [1, 2, 3], "b": [1, 2, 5], "c": [1, 2, 10]} + pa_table = pa.table(data) + plc_table = plc.interop.from_arrow(pa_table) + return data, plc_table, pa_table + + +def test_partition(partitioning_data): + raw_data, plc_table, pa_table = partitioning_data + result, result_offsets = plc.partitioning.partition( + plc_table, + plc.interop.from_arrow(pa.array([0, 0, 0])), + 1, + ) + expected = pa.table( + list(raw_data.values()), + schema=pa.schema([pa.field("", pa.int64(), nullable=False)] * 3), + ) + assert_table_eq(expected, result) + assert result_offsets == [0, 3] + + +def test_hash_partition(partitioning_data): + raw_data, plc_table, pa_table = partitioning_data + result, result_offsets = plc.partitioning.hash_partition( + plc_table, [0, 1], 1 + ) + expected = pa.table( + list(raw_data.values()), + schema=pa.schema([pa.field("", pa.int64(), nullable=False)] * 3), + ) + assert_table_eq(expected, result) + assert result_offsets == [0] + + +def test_round_robin_partition(partitioning_data): + raw_data, plc_table, pa_table = partitioning_data + result, result_offsets = plc.partitioning.round_robin_partition( + plc_table, 1, 0 + ) + expected = pa.table( + list(raw_data.values()), + schema=pa.schema([pa.field("", pa.int64(), nullable=False)] * 3), + ) + assert_table_eq(expected, result) + assert result_offsets == [0]