From 996cb8d870b7b6153802bde670435e8cd3b8775d Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Mon, 22 Jul 2024 16:15:16 -0400 Subject: [PATCH 1/3] Migrate lists/sorting to pylibcudf (#16179) Apart of #15162 Authors: - Matthew Murray (https://github.com/Matt711) Approvers: - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/16179 --- python/cudf/cudf/_lib/lists.pyx | 28 +++------ .../_lib/pylibcudf/libcudf/lists/sorting.pxd | 6 ++ python/cudf/cudf/_lib/pylibcudf/lists.pxd | 4 +- python/cudf/cudf/_lib/pylibcudf/lists.pyx | 57 ++++++++++++++++++- .../cudf/cudf/pylibcudf_tests/test_lists.py | 46 +++++++++++++++ 5 files changed, 118 insertions(+), 23 deletions(-) diff --git a/python/cudf/cudf/_lib/lists.pyx b/python/cudf/cudf/_lib/lists.pyx index 76f37c3b845..50061f6e468 100644 --- a/python/cudf/cudf/_lib/lists.pyx +++ b/python/cudf/cudf/_lib/lists.pyx @@ -11,9 +11,6 @@ from cudf._lib.pylibcudf.libcudf.column.column cimport column from cudf._lib.pylibcudf.libcudf.lists.lists_column_view cimport ( lists_column_view, ) -from cudf._lib.pylibcudf.libcudf.lists.sorting cimport ( - sort_lists as cpp_sort_lists, -) from cudf._lib.pylibcudf.libcudf.lists.stream_compaction cimport ( distinct as cpp_distinct, ) @@ -21,7 +18,6 @@ from cudf._lib.pylibcudf.libcudf.types cimport ( nan_equality, null_equality, null_order, - order, size_type, ) from cudf._lib.utils cimport columns_from_pylibcudf_table @@ -80,24 +76,14 @@ def distinct(Column col, bool nulls_equal, bool nans_all_equal): @acquire_spill_lock() def sort_lists(Column col, bool ascending, str na_position): - cdef shared_ptr[lists_column_view] list_view = ( - make_shared[lists_column_view](col.view()) - ) - cdef order c_sort_order = ( - order.ASCENDING if ascending else order.DESCENDING - ) - cdef null_order c_null_prec = ( - null_order.BEFORE if na_position == "first" else null_order.AFTER - ) - - cdef 
unique_ptr[column] c_result - - with nogil: - c_result = move( - cpp_sort_lists(list_view.get()[0], c_sort_order, c_null_prec) + return Column.from_pylibcudf( + pylibcudf.lists.sort_lists( + col.to_pylibcudf(mode="read"), + ascending, + null_order.BEFORE if na_position == "first" else null_order.AFTER, + False, ) - - return Column.from_unique_ptr(move(c_result)) + ) @acquire_spill_lock() diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/lists/sorting.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/lists/sorting.pxd index 145ab41302f..337ac73908b 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/lists/sorting.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/lists/sorting.pxd @@ -15,3 +15,9 @@ cdef extern from "cudf/lists/sorting.hpp" namespace "cudf::lists" nogil: order column_order, null_order null_precedence ) except + + + cdef unique_ptr[column] stable_sort_lists( + const lists_column_view source_column, + order column_order, + null_order null_precedence + ) except + diff --git a/python/cudf/cudf/_lib/pylibcudf/lists.pxd b/python/cudf/cudf/_lib/pylibcudf/lists.pxd index 38eb575ee8d..cacecae6010 100644 --- a/python/cudf/cudf/_lib/pylibcudf/lists.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/lists.pxd @@ -2,7 +2,7 @@ from libcpp cimport bool -from cudf._lib.pylibcudf.libcudf.types cimport size_type +from cudf._lib.pylibcudf.libcudf.types cimport null_order, size_type from .column cimport Column from .scalar cimport Scalar @@ -35,3 +35,5 @@ cpdef Column segmented_gather(Column, Column) cpdef Column extract_list_element(Column, ColumnOrSizeType) cpdef Column count_elements(Column) + +cpdef Column sort_lists(Column, bool, null_order, bool stable = *) diff --git a/python/cudf/cudf/_lib/pylibcudf/lists.pyx b/python/cudf/cudf/_lib/pylibcudf/lists.pyx index ea469642dd5..b5661a3e634 100644 --- a/python/cudf/cudf/_lib/pylibcudf/lists.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/lists.pyx @@ -23,8 +23,12 @@ from cudf._lib.pylibcudf.libcudf.lists.count_elements cimport ( from 
cudf._lib.pylibcudf.libcudf.lists.extract cimport ( extract_list_element as cpp_extract_list_element, ) +from cudf._lib.pylibcudf.libcudf.lists.sorting cimport ( + sort_lists as cpp_sort_lists, + stable_sort_lists as cpp_stable_sort_lists, +) from cudf._lib.pylibcudf.libcudf.table.table cimport table -from cudf._lib.pylibcudf.libcudf.types cimport size_type +from cudf._lib.pylibcudf.libcudf.types cimport null_order, order, size_type from cudf._lib.pylibcudf.lists cimport ColumnOrScalar, ColumnOrSizeType from .column cimport Column, ListColumnView @@ -320,3 +324,54 @@ cpdef Column count_elements(Column input): c_result = move(cpp_count_elements(list_view.view())) return Column.from_libcudf(move(c_result)) + + +cpdef Column sort_lists( + Column input, + bool ascending, + null_order na_position, + bool stable = False +): + """Sort the elements within a list in each row of a list column. + + For details, see :cpp:func:`sort_lists`. + + Parameters + ---------- + input : Column + The input column. + ascending : bool + If true, the sort order is ascending. Otherwise, the sort order is descending. + na_position : NullOrder + If na_position equals NullOrder.FIRST, then the null values in the output + column are placed first. Otherwise, they are be placed after. + stable: bool + If true :cpp:func:`stable_sort_lists` is used, Otherwise, + :cpp:func:`sort_lists` is used. + + Returns + ------- + Column + A new Column with elements in each list sorted. 
+ """ + cdef unique_ptr[column] c_result + cdef ListColumnView list_view = input.list_view() + + cdef order c_sort_order = ( + order.ASCENDING if ascending else order.DESCENDING + ) + + with nogil: + if stable: + c_result = move(cpp_stable_sort_lists( + list_view.view(), + c_sort_order, + na_position, + )) + else: + c_result = move(cpp_sort_lists( + list_view.view(), + c_sort_order, + na_position, + )) + return Column.from_libcudf(move(c_result)) diff --git a/python/cudf/cudf/pylibcudf_tests/test_lists.py b/python/cudf/cudf/pylibcudf_tests/test_lists.py index 7cfed884f90..87472f6d59b 100644 --- a/python/cudf/cudf/pylibcudf_tests/test_lists.py +++ b/python/cudf/cudf/pylibcudf_tests/test_lists.py @@ -22,6 +22,11 @@ def column(): return pa.array([3, 2, 5, 6]), pa.array([-1, 0, 0, 0], type=pa.int32()) +@pytest.fixture +def lists_column(): + return [[4, 2, 3, 1], [1, 2, None, 4], [-10, 10, 10, 0]] + + def test_concatenate_rows(test_data): arrow_tbl = pa.Table.from_arrays(test_data[0], names=["a", "b"]) plc_tbl = plc.interop.from_arrow(arrow_tbl) @@ -191,3 +196,44 @@ def test_count_elements(test_data): expect = pa.array([1, 1, 0, 3], type=pa.int32()) assert_column_eq(expect, res) + + +@pytest.mark.parametrize( + "ascending,na_position,expected", + [ + ( + True, + plc.types.NullOrder.BEFORE, + [[1, 2, 3, 4], [None, 1, 2, 4], [-10, 0, 10, 10]], + ), + ( + True, + plc.types.NullOrder.AFTER, + [[1, 2, 3, 4], [1, 2, 4, None], [-10, 0, 10, 10]], + ), + ( + False, + plc.types.NullOrder.BEFORE, + [[4, 3, 2, 1], [4, 2, 1, None], [10, 10, 0, -10]], + ), + ( + False, + plc.types.NullOrder.AFTER, + [[4, 3, 2, 1], [None, 4, 2, 1], [10, 10, 0, -10]], + ), + ( + False, + plc.types.NullOrder.AFTER, + [[4, 3, 2, 1], [None, 4, 2, 1], [10, 10, 0, -10]], + ), + ], +) +def test_sort_lists(lists_column, ascending, na_position, expected): + plc_column = plc.interop.from_arrow(pa.array(lists_column)) + res = plc.lists.sort_lists(plc_column, ascending, na_position, False) + res_stable = 
plc.lists.sort_lists(plc_column, ascending, na_position, True) + + expect = pa.array(expected) + + assert_column_eq(expect, res) + assert_column_eq(expect, res_stable) From 81e65ee312af5133ca2b98d52efaeb29c274a825 Mon Sep 17 00:00:00 2001 From: GALI PREM SAGAR Date: Mon, 22 Jul 2024 15:18:40 -0500 Subject: [PATCH 2/3] Fix docstring of `DataFrame.apply` (#16351) This PR fixes docstring of `DataFrame.apply` Authors: - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - Matthew Roeschke (https://github.com/mroeschke) URL: https://github.com/rapidsai/cudf/pull/16351 --- python/cudf/cudf/core/dataframe.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 288bdfd39b3..1d7136e61e3 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -4525,7 +4525,6 @@ def apply( If False, the funcs will be passed the whole Series at once. Currently not supported. - engine : {'python', 'numba'}, default 'python' Unused. Added for compatibility with pandas. engine_kwargs : dict From 0cac2a9d68341a38721be16132ead14cf4a0d70b Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Mon, 22 Jul 2024 14:18:21 -0700 Subject: [PATCH 3/3] Remove size constraints on source files in batched JSON reading (#16162) Addresses https://github.com/rapidsai/cudf/issues/16138 The batched multi-source JSON reader fails when the size of any of the input source buffers exceeds `INT_MAX` bytes. The goal of this PR is to remove this constraint by modifying the batching behavior of the reader. 
Instead of constructing batches that include entire source files, the batches are now constructed at the granularity of byte ranges of size at most `INT_MAX` bytes, Authors: - Shruti Shivakumar (https://github.com/shrshi) Approvers: - Vukasin Milovanovic (https://github.com/vuule) - Karthikeyan (https://github.com/karthikeyann) URL: https://github.com/rapidsai/cudf/pull/16162 --- cpp/include/cudf/io/json.hpp | 4 +- cpp/src/io/json/read_json.cu | 139 +++++++++--------- cpp/src/io/json/read_json.hpp | 18 ++- cpp/tests/CMakeLists.txt | 14 +- .../json_chunked_reader.cu} | 81 ++-------- .../json_quote_normalization_test.cpp | 0 cpp/tests/io/{ => json}/json_test.cpp | 0 cpp/tests/io/{ => json}/json_tree.cpp | 0 .../io/{ => json}/json_type_cast_test.cu | 0 cpp/tests/io/json/json_utils.cuh | 105 +++++++++++++ .../json_whitespace_normalization_test.cu | 0 cpp/tests/io/{ => json}/json_writer.cpp | 0 cpp/tests/io/{ => json}/nested_json_test.cpp | 0 .../{json_tests.cpp => json_tests.cu} | 45 +++++- 14 files changed, 242 insertions(+), 164 deletions(-) rename cpp/tests/io/{json_chunked_reader.cpp => json/json_chunked_reader.cu} (64%) rename cpp/tests/io/{ => json}/json_quote_normalization_test.cpp (100%) rename cpp/tests/io/{ => json}/json_test.cpp (100%) rename cpp/tests/io/{ => json}/json_tree.cpp (100%) rename cpp/tests/io/{ => json}/json_type_cast_test.cu (100%) create mode 100644 cpp/tests/io/json/json_utils.cuh rename cpp/tests/io/{ => json}/json_whitespace_normalization_test.cu (100%) rename cpp/tests/io/{ => json}/json_writer.cpp (100%) rename cpp/tests/io/{ => json}/nested_json_test.cpp (100%) rename cpp/tests/large_strings/{json_tests.cpp => json_tests.cu} (50%) diff --git a/cpp/include/cudf/io/json.hpp b/cpp/include/cudf/io/json.hpp index 7af90766ad0..d47266fdd12 100644 --- a/cpp/include/cudf/io/json.hpp +++ b/cpp/include/cudf/io/json.hpp @@ -333,14 +333,14 @@ class json_reader_options { * * @param offset Number of bytes of offset */ - 
void set_byte_range_offset(size_type offset) { _byte_range_offset = offset; } + void set_byte_range_offset(size_t offset) { _byte_range_offset = offset; } /** * @brief Set number of bytes to read. * * @param size Number of bytes to read */ - void set_byte_range_size(size_type size) { _byte_range_size = size; } + void set_byte_range_size(size_t size) { _byte_range_size = size; } /** * @brief Set delimiter separating records in JSON lines diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 9cd39038348..0ba4dedfc34 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -148,20 +148,12 @@ device_span ingest_raw_input(device_span buffer, return buffer.first(uncomp_data.size()); } -size_type find_first_delimiter_in_chunk(host_span> sources, - json_reader_options const& reader_opts, - char const delimiter, - rmm::cuda_stream_view stream) +size_t estimate_size_per_subchunk(size_t chunk_size) { - auto total_source_size = sources_size(sources, 0, 0) + (sources.size() - 1); - rmm::device_uvector buffer(total_source_size, stream); - auto readbufspan = ingest_raw_input(buffer, - sources, - reader_opts.get_compression(), - reader_opts.get_byte_range_offset(), - reader_opts.get_byte_range_size(), - stream); - return find_first_delimiter(readbufspan, '\n', stream); + auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); }; + // NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to + // 10kb) and the byte range size + return geometric_mean(std::ceil((double)chunk_size / num_subchunks), min_subchunk_size); } /** @@ -183,7 +175,6 @@ datasource::owning_buffer> get_record_range_raw_input( rmm::cuda_stream_view stream) { CUDF_FUNC_RANGE(); - auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); }; size_t const total_source_size = sources_size(sources, 0, 0); auto constexpr num_delimiter_chars = 1; @@ -198,17 +189,8 @@ datasource::owning_buffer> 
get_record_range_raw_input( auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset; chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size; - // Some magic numbers - constexpr int num_subchunks = 10; // per chunk_size - constexpr size_t min_subchunk_size = 10000; - int const num_subchunks_prealloced = should_load_all_sources ? 0 : 3; - constexpr int estimated_compression_ratio = 4; - - // NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to - // 10kb) and the byte range size - - size_t const size_per_subchunk = - geometric_mean(std::ceil((double)chunk_size / num_subchunks), min_subchunk_size); + int const num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced; + size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size); // The allocation for single source compressed input is estimated by assuming a ~4:1 // compression ratio. For uncompressed inputs, we can getter a better estimate using the idea @@ -308,67 +290,78 @@ table_with_metadata read_json(host_span> sources, "Multiple inputs are supported only for JSON Lines format"); } - std::for_each(sources.begin(), sources.end(), [](auto const& source) { - CUDF_EXPECTS(source->size() < std::numeric_limits::max(), - "The size of each source file must be less than INT_MAX bytes"); - }); - - constexpr size_t batch_size_ub = std::numeric_limits::max(); - size_t const chunk_offset = reader_opts.get_byte_range_offset(); + /* + * The batched JSON reader enforces that the size of each batch is at most INT_MAX + * bytes (~2.14GB). Batches are defined to be byte range chunks - characterized by + * chunk offset and chunk size - that may span across multiple source files. + * Note that the batched reader does not work for compressed inputs or for regular + * JSON inputs. 
+ */ + size_t const total_source_size = sources_size(sources, 0, 0); + size_t chunk_offset = reader_opts.get_byte_range_offset(); size_t chunk_size = reader_opts.get_byte_range_size(); - chunk_size = !chunk_size ? sources_size(sources, 0, 0) : chunk_size; - - // Identify the position of starting source file from which to begin batching based on - // byte range offset. If the offset is larger than the sum of all source - // sizes, then start_source is total number of source files i.e. no file is read - size_t const start_source = [&]() { - size_t sum = 0; + chunk_size = !chunk_size ? total_source_size - chunk_offset + : std::min(chunk_size, total_source_size - chunk_offset); + + size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size); + size_t const batch_size_ub = + std::numeric_limits::max() - (max_subchunks_prealloced * size_per_subchunk); + + /* + * Identify the position (zero-indexed) of starting source file from which to begin + * batching based on byte range offset. If the offset is larger than the sum of all + * source sizes, then start_source is total number of source files i.e. no file is + * read + */ + + // Prefix sum of source file sizes + size_t pref_source_size = 0; + // Starting source file from which to being batching evaluated using byte range offset + size_t const start_source = [chunk_offset, &sources, &pref_source_size]() { for (size_t src_idx = 0; src_idx < sources.size(); ++src_idx) { - if (sum + sources[src_idx]->size() > chunk_offset) return src_idx; - sum += sources[src_idx]->size(); + if (pref_source_size + sources[src_idx]->size() > chunk_offset) { return src_idx; } + pref_source_size += sources[src_idx]->size(); } return sources.size(); }(); - - // Construct batches of source files, with starting position of batches indicated by - // batch_positions. The size of each batch i.e. the sum of sizes of the source files in the batch - // is capped at INT_MAX bytes. 
- size_t cur_size = 0; - std::vector batch_positions; - std::vector batch_sizes; - batch_positions.push_back(0); - for (size_t i = start_source; i < sources.size(); i++) { - cur_size += sources[i]->size(); - if (cur_size >= batch_size_ub) { - batch_positions.push_back(i); - batch_sizes.push_back(cur_size - sources[i]->size()); - cur_size = sources[i]->size(); + /* + * Construct batches of byte ranges spanning source files, with the starting position of batches + * indicated by `batch_offsets`. `pref_bytes_size` gives the bytes position from which the current + * batch begins, and `end_bytes_size` gives the terminal bytes position after which reading + * stops. + */ + size_t pref_bytes_size = chunk_offset; + size_t end_bytes_size = chunk_offset + chunk_size; + std::vector batch_offsets{pref_bytes_size}; + for (size_t i = start_source; i < sources.size() && pref_bytes_size < end_bytes_size;) { + pref_source_size += sources[i]->size(); + // If the current source file can subsume multiple batches, we split the file until the + // boundary of the last batch exceeds the end of the file (indexed by `pref_source_size`) + while (pref_bytes_size < end_bytes_size && + pref_source_size >= std::min(pref_bytes_size + batch_size_ub, end_bytes_size)) { + auto next_batch_size = std::min(batch_size_ub, end_bytes_size - pref_bytes_size); + batch_offsets.push_back(batch_offsets.back() + next_batch_size); + pref_bytes_size += next_batch_size; } + i++; } - batch_positions.push_back(sources.size()); - batch_sizes.push_back(cur_size); - - // If there is a single batch, then we can directly return the table without the - // unnecessary concatenate - if (batch_sizes.size() == 1) return read_batch(sources, reader_opts, stream, mr); + /* + * If there is a single batch, then we can directly return the table without the + * unnecessary concatenate. The size of batch_offsets is 1 if all sources are empty, + * or if end_bytes_size is larger than total_source_size. 
+ */ + if (batch_offsets.size() <= 2) return read_batch(sources, reader_opts, stream, mr); std::vector partial_tables; json_reader_options batched_reader_opts{reader_opts}; - // Dispatch individual batches to read_batch and push the resulting table into // partial_tables array. Note that the reader options need to be updated for each // batch to adjust byte range offset and byte range size. - for (size_t i = 0; i < batch_sizes.size(); i++) { - batched_reader_opts.set_byte_range_size(std::min(batch_sizes[i], chunk_size)); - partial_tables.emplace_back(read_batch( - host_span>(sources.begin() + batch_positions[i], - batch_positions[i + 1] - batch_positions[i]), - batched_reader_opts, - stream, - rmm::mr::get_current_device_resource())); - if (chunk_size <= batch_sizes[i]) break; - chunk_size -= batch_sizes[i]; - batched_reader_opts.set_byte_range_offset(0); + for (size_t i = 0; i < batch_offsets.size() - 1; i++) { + batched_reader_opts.set_byte_range_offset(batch_offsets[i]); + batched_reader_opts.set_byte_range_size(batch_offsets[i + 1] - batch_offsets[i]); + partial_tables.emplace_back( + read_batch(sources, batched_reader_opts, stream, rmm::mr::get_current_device_resource())); } auto expects_schema_equality = diff --git a/cpp/src/io/json/read_json.hpp b/cpp/src/io/json/read_json.hpp index 0c30b4cad46..ff69f9b7627 100644 --- a/cpp/src/io/json/read_json.hpp +++ b/cpp/src/io/json/read_json.hpp @@ -29,6 +29,19 @@ namespace cudf::io::json::detail { +// Some magic numbers +constexpr int num_subchunks = 10; // per chunk_size +constexpr size_t min_subchunk_size = 10000; +constexpr int estimated_compression_ratio = 4; +constexpr int max_subchunks_prealloced = 3; + +device_span ingest_raw_input(device_span buffer, + host_span> sources, + compression_type compression, + size_t range_offset, + size_t range_size, + rmm::cuda_stream_view stream); + table_with_metadata read_json(host_span> sources, json_reader_options const& reader_opts, rmm::cuda_stream_view stream, @@ -38,9 
+51,4 @@ size_type find_first_delimiter(device_span d_data, char const delimiter, rmm::cuda_stream_view stream); -size_type find_first_delimiter_in_chunk(host_span> sources, - json_reader_options const& reader_opts, - char const delimiter, - rmm::cuda_stream_view stream); - } // namespace cudf::io::json::detail diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 8e2017ccb97..05e9759632f 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -313,17 +313,17 @@ ConfigureTest( PERCENT 30 ) ConfigureTest( - JSON_TEST io/json_test.cpp io/json_chunked_reader.cpp + JSON_TEST io/json/json_test.cpp io/json/json_chunked_reader.cu GPUS 1 PERCENT 30 ) -ConfigureTest(JSON_WRITER_TEST io/json_writer.cpp) -ConfigureTest(JSON_TYPE_CAST_TEST io/json_type_cast_test.cu) -ConfigureTest(NESTED_JSON_TEST io/nested_json_test.cpp io/json_tree.cpp) +ConfigureTest(JSON_WRITER_TEST io/json/json_writer.cpp) +ConfigureTest(JSON_TYPE_CAST_TEST io/json/json_type_cast_test.cu) +ConfigureTest(NESTED_JSON_TEST io/json/nested_json_test.cpp io/json/json_tree.cpp) ConfigureTest(ARROW_IO_SOURCE_TEST io/arrow_io_source_test.cpp) ConfigureTest(MULTIBYTE_SPLIT_TEST io/text/multibyte_split_test.cpp) -ConfigureTest(JSON_QUOTE_NORMALIZATION io/json_quote_normalization_test.cpp) -ConfigureTest(JSON_WHITESPACE_NORMALIZATION io/json_whitespace_normalization_test.cu) +ConfigureTest(JSON_QUOTE_NORMALIZATION io/json/json_quote_normalization_test.cpp) +ConfigureTest(JSON_WHITESPACE_NORMALIZATION io/json/json_whitespace_normalization_test.cu) ConfigureTest( DATA_CHUNK_SOURCE_TEST io/text/data_chunk_source_test.cpp GPUS 1 @@ -572,7 +572,7 @@ ConfigureTest( LARGE_STRINGS_TEST large_strings/concatenate_tests.cpp large_strings/case_tests.cpp - large_strings/json_tests.cpp + large_strings/json_tests.cu large_strings/large_strings_fixture.cpp large_strings/merge_tests.cpp large_strings/parquet_tests.cpp diff --git a/cpp/tests/io/json_chunked_reader.cpp 
b/cpp/tests/io/json/json_chunked_reader.cu similarity index 64% rename from cpp/tests/io/json_chunked_reader.cpp rename to cpp/tests/io/json/json_chunked_reader.cu index 23d54f7263c..b9dee54752c 100644 --- a/cpp/tests/io/json_chunked_reader.cpp +++ b/cpp/tests/io/json/json_chunked_reader.cu @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "io/json/read_json.hpp" +#include "json_utils.cuh" #include #include @@ -37,65 +37,6 @@ cudf::test::TempDirTestEnvironment* const temp_env = static_cast( ::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment)); -// function to extract first delimiter in the string in each chunk, -// collate together and form byte_range for each chunk, -// parse separately. -std::vector skeleton_for_parellel_chunk_reader( - cudf::host_span> sources, - cudf::io::json_reader_options const& reader_opts, - int32_t chunk_size, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - using namespace cudf::io::json::detail; - using cudf::size_type; - size_t total_source_size = 0; - for (auto const& source : sources) { - total_source_size += source->size(); - } - size_t num_chunks = (total_source_size + chunk_size - 1) / chunk_size; - constexpr size_type no_min_value = -1; - - // Get the first delimiter in each chunk. - std::vector first_delimiter_index(num_chunks); - auto reader_opts_chunk = reader_opts; - for (size_t i = 0; i < num_chunks; i++) { - auto const chunk_start = i * chunk_size; - reader_opts_chunk.set_byte_range_offset(chunk_start); - reader_opts_chunk.set_byte_range_size(chunk_size); - first_delimiter_index[i] = - find_first_delimiter_in_chunk(sources, reader_opts_chunk, '\n', stream); - if (first_delimiter_index[i] != no_min_value) { first_delimiter_index[i] += chunk_start; } - } - - // Process and allocate record start, end for each worker. 
- using record_range = std::pair; - std::vector record_ranges; - record_ranges.reserve(num_chunks); - first_delimiter_index[0] = 0; - auto prev = first_delimiter_index[0]; - for (size_t i = 1; i < num_chunks; i++) { - if (first_delimiter_index[i] == no_min_value) continue; - record_ranges.emplace_back(prev, first_delimiter_index[i]); - prev = first_delimiter_index[i]; - } - record_ranges.emplace_back(prev, total_source_size); - - std::vector tables; - // Process each chunk in parallel. - for (auto const& [chunk_start, chunk_end] : record_ranges) { - if (chunk_start == -1 or chunk_end == -1 or - static_cast(chunk_start) >= total_source_size) - continue; - reader_opts_chunk.set_byte_range_offset(chunk_start); - reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start); - tables.push_back(read_json(sources, reader_opts_chunk, stream, mr)); - } - // assume all records have same number of columns, and inferred same type. (or schema is passed) - // TODO a step before to merge all columns, types and infer final schema. 
- return tables; -} - TEST_F(JsonReaderTest, ByteRange_SingleSource) { std::string const json_string = R"( @@ -118,11 +59,11 @@ TEST_F(JsonReaderTest, ByteRange_SingleSource) // Test for different chunk sizes for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500}) { - auto const tables = skeleton_for_parellel_chunk_reader(datasources, - json_lines_options, - chunk_size, - cudf::get_default_stream(), - rmm::mr::get_current_device_resource()); + auto const tables = split_byte_range_reading(datasources, + json_lines_options, + chunk_size, + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); auto table_views = std::vector(tables.size()); std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) { @@ -213,11 +154,11 @@ TEST_F(JsonReaderTest, ByteRange_MultiSource) // Test for different chunk sizes for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500, 1000, 2000}) { - auto const tables = skeleton_for_parellel_chunk_reader(datasources, - json_lines_options, - chunk_size, - cudf::get_default_stream(), - rmm::mr::get_current_device_resource()); + auto const tables = split_byte_range_reading(datasources, + json_lines_options, + chunk_size, + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); auto table_views = std::vector(tables.size()); std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) { diff --git a/cpp/tests/io/json_quote_normalization_test.cpp b/cpp/tests/io/json/json_quote_normalization_test.cpp similarity index 100% rename from cpp/tests/io/json_quote_normalization_test.cpp rename to cpp/tests/io/json/json_quote_normalization_test.cpp diff --git a/cpp/tests/io/json_test.cpp b/cpp/tests/io/json/json_test.cpp similarity index 100% rename from cpp/tests/io/json_test.cpp rename to cpp/tests/io/json/json_test.cpp diff --git a/cpp/tests/io/json_tree.cpp b/cpp/tests/io/json/json_tree.cpp similarity index 100% rename from cpp/tests/io/json_tree.cpp rename to 
cpp/tests/io/json/json_tree.cpp diff --git a/cpp/tests/io/json_type_cast_test.cu b/cpp/tests/io/json/json_type_cast_test.cu similarity index 100% rename from cpp/tests/io/json_type_cast_test.cu rename to cpp/tests/io/json/json_type_cast_test.cu diff --git a/cpp/tests/io/json/json_utils.cuh b/cpp/tests/io/json/json_utils.cuh new file mode 100644 index 00000000000..9383797d91b --- /dev/null +++ b/cpp/tests/io/json/json_utils.cuh @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "io/json/read_json.hpp" + +#include +#include +#include +#include + +#include + +#include + +// Helper function to test correctness of JSON byte range reading. 
+// We split the input source files into a set of byte range chunks each of size +// `chunk_size` and return an array of partial tables constructed from each chunk +template +std::vector split_byte_range_reading( + cudf::host_span> sources, + cudf::io::json_reader_options const& reader_opts, + IndexType chunk_size, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto total_source_size = [&sources]() { + return std::accumulate(sources.begin(), sources.end(), 0ul, [=](size_t sum, auto& source) { + auto const size = source->size(); + return sum + size; + }); + }(); + auto find_first_delimiter_in_chunk = + [total_source_size, &sources, &stream]( + cudf::io::json_reader_options const& reader_opts) -> IndexType { + rmm::device_uvector buffer(total_source_size, stream); + auto readbufspan = cudf::io::json::detail::ingest_raw_input(buffer, + sources, + reader_opts.get_compression(), + reader_opts.get_byte_range_offset(), + reader_opts.get_byte_range_size(), + stream); + // Note: we cannot reuse cudf::io::json::detail::find_first_delimiter since the + // return type of that function is size_type. However, when the chunk_size is + // larger than INT_MAX, the position of the delimiter can also be larger than + // INT_MAX. We do not encounter this overflow error in the detail function + // since the batched JSON reader splits the byte_range_size into chunk_sizes + // smaller than INT_MAX bytes + auto const first_delimiter_position_it = + thrust::find(rmm::exec_policy(stream), readbufspan.begin(), readbufspan.end(), '\n'); + return first_delimiter_position_it != readbufspan.end() + ? thrust::distance(readbufspan.begin(), first_delimiter_position_it) + : -1; + }; + size_t num_chunks = (total_source_size + chunk_size - 1) / chunk_size; + constexpr IndexType no_min_value = -1; + + // Get the first delimiter in each chunk. 
+ std::vector first_delimiter_index(num_chunks); + auto reader_opts_chunk = reader_opts; + for (size_t i = 0; i < num_chunks; i++) { + auto const chunk_start = i * chunk_size; + // We are updating reader_opt_chunks to store offset and size information for the current chunk + reader_opts_chunk.set_byte_range_offset(chunk_start); + reader_opts_chunk.set_byte_range_size(chunk_size); + first_delimiter_index[i] = find_first_delimiter_in_chunk(reader_opts_chunk); + } + + // Process and allocate record start, end for each worker. + using record_range = std::pair; + std::vector record_ranges; + record_ranges.reserve(num_chunks); + size_t prev = 0; + for (size_t i = 1; i < num_chunks; i++) { + // In the case where chunk_size is smaller than row size, the chunk needs to be skipped + if (first_delimiter_index[i] == no_min_value) continue; + size_t next = static_cast(first_delimiter_index[i]) + (i * chunk_size); + record_ranges.emplace_back(prev, next); + prev = next; + } + record_ranges.emplace_back(prev, total_source_size); + + std::vector tables; + for (auto const& [chunk_start, chunk_end] : record_ranges) { + reader_opts_chunk.set_byte_range_offset(chunk_start); + reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start); + tables.push_back(cudf::io::json::detail::read_json(sources, reader_opts_chunk, stream, mr)); + } + // assume all records have same number of columns, and inferred same type. (or schema is passed) + // TODO a step before to merge all columns, types and infer final schema. 
+ return tables; +} diff --git a/cpp/tests/io/json_whitespace_normalization_test.cu b/cpp/tests/io/json/json_whitespace_normalization_test.cu similarity index 100% rename from cpp/tests/io/json_whitespace_normalization_test.cu rename to cpp/tests/io/json/json_whitespace_normalization_test.cu diff --git a/cpp/tests/io/json_writer.cpp b/cpp/tests/io/json/json_writer.cpp similarity index 100% rename from cpp/tests/io/json_writer.cpp rename to cpp/tests/io/json/json_writer.cpp diff --git a/cpp/tests/io/nested_json_test.cpp b/cpp/tests/io/json/nested_json_test.cpp similarity index 100% rename from cpp/tests/io/nested_json_test.cpp rename to cpp/tests/io/json/nested_json_test.cpp diff --git a/cpp/tests/large_strings/json_tests.cpp b/cpp/tests/large_strings/json_tests.cu similarity index 50% rename from cpp/tests/large_strings/json_tests.cpp rename to cpp/tests/large_strings/json_tests.cu index bf16d131ba7..49abf7b484d 100644 --- a/cpp/tests/large_strings/json_tests.cpp +++ b/cpp/tests/large_strings/json_tests.cu @@ -14,8 +14,13 @@ * limitations under the License. 
 */
 
+#include "../io/json/json_utils.cuh"
 #include "large_strings_fixture.hpp"
 
+#include <cudf/concatenate.hpp>
+
+#include <rmm/cuda_stream_view.hpp>
+#include <rmm/mr/device/per_device_resource.hpp>
 
 #include <cudf/io/json.hpp>
 #include <cudf/utilities/span.hpp>
 
@@ -28,31 +33,57 @@ TEST_F(JsonLargeReaderTest, MultiBatch)
 { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 }
 { "a": { "y" : 6}, "b" : [6 ], "c": 13 }
 { "a": { "y" : 6}, "b" : [7 ], "c": 14 })";
-  constexpr size_t expected_file_size = std::numeric_limits<int>::max() / 2;
+  constexpr size_t batch_size_ub      = std::numeric_limits<int>::max();
+  constexpr size_t expected_file_size = 1.5 * static_cast<double>(batch_size_ub);
   std::size_t const log_repetitions =
     static_cast<std::size_t>(std::ceil(std::log2(expected_file_size / json_string.size())));
   json_string.reserve(json_string.size() * (1UL << log_repetitions));
-  std::size_t numrows = 4;
   for (std::size_t i = 0; i < log_repetitions; i++) {
     json_string += json_string;
-    numrows <<= 1;
   }
 
   constexpr int num_sources = 2;
-  std::vector<cudf::host_span<char>> hostbufs(
-    num_sources, cudf::host_span<char>(json_string.data(), json_string.size()));
+  std::vector<cudf::host_span<std::byte>> hostbufs(
+    num_sources,
+    cudf::host_span<std::byte>(reinterpret_cast<std::byte*>(json_string.data()),
+                               json_string.size()));
 
   // Initialize parsing options (reading json lines)
   cudf::io::json_reader_options json_lines_options =
     cudf::io::json_reader_options::builder(
       cudf::io::source_info{
-        cudf::host_span<cudf::host_span<char>>(hostbufs.data(), hostbufs.size())})
+        cudf::host_span<cudf::host_span<std::byte>>(hostbufs.data(), hostbufs.size())})
       .lines(true)
       .compression(cudf::io::compression_type::NONE)
       .recovery_mode(cudf::io::json_recovery_mode_t::FAIL);
 
   // Read full test data via existing, nested JSON lines reader
   cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options);
-  ASSERT_EQ(current_reader_table.tbl->num_rows(), numrows * num_sources);
+
+  std::vector<std::unique_ptr<cudf::io::datasource>> datasources;
+  for (auto& hb : hostbufs) {
+    datasources.emplace_back(cudf::io::datasource::create(hb));
+  }
+  // Test for different chunk sizes
+  std::vector<std::size_t> chunk_sizes{
+    batch_size_ub / 4, batch_size_ub / 2, batch_size_ub, static_cast<std::size_t>(batch_size_ub * 2)};
+  for (auto chunk_size : chunk_sizes) {
+    auto const tables =
+      split_byte_range_reading(datasources,
+                               json_lines_options,
+                               chunk_size,
+                               cudf::get_default_stream(),
+                               rmm::mr::get_current_device_resource());
+
+    auto table_views = std::vector<cudf::table_view>(tables.size());
+    std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) {
+      return table.tbl->view();
+    });
+    auto result = cudf::concatenate(table_views);
+
+    // Verify that the data read via chunked reader matches the data read via nested JSON reader
+    // cannot use EQUAL due to concatenate removing null mask
+    CUDF_TEST_EXPECT_TABLES_EQUIVALENT(current_reader_table.tbl->view(), result->view());
+  }
 }