diff --git a/cpp/include/cudf/io/json.hpp b/cpp/include/cudf/io/json.hpp index 7af90766ad0..d47266fdd12 100644 --- a/cpp/include/cudf/io/json.hpp +++ b/cpp/include/cudf/io/json.hpp @@ -333,14 +333,14 @@ class json_reader_options { * * @param offset Number of bytes of offset */ - void set_byte_range_offset(size_type offset) { _byte_range_offset = offset; } + void set_byte_range_offset(size_t offset) { _byte_range_offset = offset; } /** * @brief Set number of bytes to read. * * @param size Number of bytes to read */ - void set_byte_range_size(size_type size) { _byte_range_size = size; } + void set_byte_range_size(size_t size) { _byte_range_size = size; } /** * @brief Set delimiter separating records in JSON lines diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 9cd39038348..0ba4dedfc34 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -148,20 +148,12 @@ device_span ingest_raw_input(device_span buffer, return buffer.first(uncomp_data.size()); } -size_type find_first_delimiter_in_chunk(host_span> sources, - json_reader_options const& reader_opts, - char const delimiter, - rmm::cuda_stream_view stream) +size_t estimate_size_per_subchunk(size_t chunk_size) { - auto total_source_size = sources_size(sources, 0, 0) + (sources.size() - 1); - rmm::device_uvector buffer(total_source_size, stream); - auto readbufspan = ingest_raw_input(buffer, - sources, - reader_opts.get_compression(), - reader_opts.get_byte_range_offset(), - reader_opts.get_byte_range_size(), - stream); - return find_first_delimiter(readbufspan, '\n', stream); + auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); }; + // NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to + // 10kb) and the byte range size + return geometric_mean(std::ceil((double)chunk_size / num_subchunks), min_subchunk_size); } /** @@ -183,7 +175,6 @@ datasource::owning_buffer> get_record_range_raw_input( rmm::cuda_stream_view stream) { CUDF_FUNC_RANGE(); - auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); }; size_t const total_source_size = sources_size(sources, 0, 0); auto constexpr num_delimiter_chars = 1; @@ -198,17 +189,8 @@ datasource::owning_buffer> get_record_range_raw_input( auto should_load_all_sources = !chunk_size || chunk_size >= total_source_size - chunk_offset; chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size; - // Some magic numbers - constexpr int num_subchunks = 10; // per chunk_size - constexpr size_t min_subchunk_size = 10000; - int const num_subchunks_prealloced = should_load_all_sources ? 0 : 3; - constexpr int estimated_compression_ratio = 4; - - // NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to - // 10kb) and the byte range size - - size_t const size_per_subchunk = - geometric_mean(std::ceil((double)chunk_size / num_subchunks), min_subchunk_size); + int const num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced; + size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size); // The allocation for single source compressed input is estimated by assuming a ~4:1 // compression ratio. 
For uncompressed inputs, we can get a better estimate using the idea @@ -308,67 +290,78 @@ table_with_metadata read_json(host_span> sources, "Multiple inputs are supported only for JSON Lines format"); } - std::for_each(sources.begin(), sources.end(), [](auto const& source) { - CUDF_EXPECTS(source->size() < std::numeric_limits::max(), - "The size of each source file must be less than INT_MAX bytes"); - }); - - constexpr size_t batch_size_ub = std::numeric_limits::max(); - size_t const chunk_offset = reader_opts.get_byte_range_offset(); + /* + * The batched JSON reader enforces that the size of each batch is at most INT_MAX + * bytes (~2.14GB). Batches are defined to be byte range chunks - characterized by + * chunk offset and chunk size - that may span across multiple source files. + * Note that the batched reader does not work for compressed inputs or for regular + * JSON inputs. + */ + size_t const total_source_size = sources_size(sources, 0, 0); + size_t chunk_offset = reader_opts.get_byte_range_offset(); size_t chunk_size = reader_opts.get_byte_range_size(); - chunk_size = !chunk_size ? sources_size(sources, 0, 0) : chunk_size; - - // Identify the position of starting source file from which to begin batching based on - // byte range offset. If the offset is larger than the sum of all source - // sizes, then start_source is total number of source files i.e. no file is read - size_t const start_source = [&]() { - size_t sum = 0; + chunk_size = !chunk_size ? total_source_size - chunk_offset : std::min(chunk_size, total_source_size - chunk_offset); + + size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size); + size_t const batch_size_ub = + std::numeric_limits::max() - (max_subchunks_prealloced * size_per_subchunk); + + /* + * Identify the position (zero-indexed) of starting source file from which to begin + * batching based on byte range offset. If the offset is larger than the sum of all + * source sizes, then start_source is total number of source files i.e. no file is + * read + */ + + // Prefix sum of source file sizes + size_t pref_source_size = 0; + // Starting source file from which to begin batching, evaluated using byte range offset + size_t const start_source = [chunk_offset, &sources, &pref_source_size]() { for (size_t src_idx = 0; src_idx < sources.size(); ++src_idx) { - if (sum + sources[src_idx]->size() > chunk_offset) return src_idx; - sum += sources[src_idx]->size(); + if (pref_source_size + sources[src_idx]->size() > chunk_offset) { return src_idx; } + pref_source_size += sources[src_idx]->size(); } return sources.size(); }(); - - // Construct batches of source files, with starting position of batches indicated by - // batch_positions. The size of each batch i.e. the sum of sizes of the source files in the batch - // is capped at INT_MAX bytes. - size_t cur_size = 0; - std::vector batch_positions; - std::vector batch_sizes; - batch_positions.push_back(0); - for (size_t i = start_source; i < sources.size(); i++) { - cur_size += sources[i]->size(); - if (cur_size >= batch_size_ub) { - batch_positions.push_back(i); - batch_sizes.push_back(cur_size - sources[i]->size()); - cur_size = sources[i]->size(); + /* + * Construct batches of byte ranges spanning source files, with the starting position of batches + * indicated by `batch_offsets`. `pref_bytes_size` gives the bytes position from which the current + * batch begins, and `end_bytes_size` gives the terminal bytes position after which reading + * stops. 
+ */ + size_t pref_bytes_size = chunk_offset; + size_t end_bytes_size = chunk_offset + chunk_size; + std::vector batch_offsets{pref_bytes_size}; + for (size_t i = start_source; i < sources.size() && pref_bytes_size < end_bytes_size;) { + pref_source_size += sources[i]->size(); + // If the current source file can subsume multiple batches, we split the file until the + // boundary of the last batch exceeds the end of the file (indexed by `pref_source_size`) + while (pref_bytes_size < end_bytes_size && + pref_source_size >= std::min(pref_bytes_size + batch_size_ub, end_bytes_size)) { + auto next_batch_size = std::min(batch_size_ub, end_bytes_size - pref_bytes_size); + batch_offsets.push_back(batch_offsets.back() + next_batch_size); + pref_bytes_size += next_batch_size; } + i++; } - batch_positions.push_back(sources.size()); - batch_sizes.push_back(cur_size); - - // If there is a single batch, then we can directly return the table without the - // unnecessary concatenate - if (batch_sizes.size() == 1) return read_batch(sources, reader_opts, stream, mr); + /* + * If there is a single batch, then we can directly return the table without the + * unnecessary concatenate. The size of batch_offsets is 1 if all sources are empty, + * or if end_bytes_size is larger than total_source_size. + */ + if (batch_offsets.size() <= 2) return read_batch(sources, reader_opts, stream, mr); std::vector partial_tables; json_reader_options batched_reader_opts{reader_opts}; - // Dispatch individual batches to read_batch and push the resulting table into // partial_tables array. Note that the reader options need to be updated for each // batch to adjust byte range offset and byte range size. - for (size_t i = 0; i < batch_sizes.size(); i++) { - batched_reader_opts.set_byte_range_size(std::min(batch_sizes[i], chunk_size)); - partial_tables.emplace_back(read_batch( - host_span>(sources.begin() + batch_positions[i], - batch_positions[i + 1] - batch_positions[i]), - batched_reader_opts, - stream, - rmm::mr::get_current_device_resource())); - if (chunk_size <= batch_sizes[i]) break; - chunk_size -= batch_sizes[i]; - batched_reader_opts.set_byte_range_offset(0); + for (size_t i = 0; i < batch_offsets.size() - 1; i++) { + batched_reader_opts.set_byte_range_offset(batch_offsets[i]); + batched_reader_opts.set_byte_range_size(batch_offsets[i + 1] - batch_offsets[i]); + partial_tables.emplace_back( + read_batch(sources, batched_reader_opts, stream, rmm::mr::get_current_device_resource())); } auto expects_schema_equality = diff --git a/cpp/src/io/json/read_json.hpp b/cpp/src/io/json/read_json.hpp index 0c30b4cad46..ff69f9b7627 100644 --- a/cpp/src/io/json/read_json.hpp +++ b/cpp/src/io/json/read_json.hpp @@ -29,6 +29,19 @@ namespace cudf::io::json::detail { +// Some magic numbers +constexpr int num_subchunks = 10; // per chunk_size +constexpr size_t min_subchunk_size = 10000; +constexpr int estimated_compression_ratio = 4; +constexpr int max_subchunks_prealloced = 3; + +device_span ingest_raw_input(device_span buffer, + host_span> sources, + compression_type compression, + size_t range_offset, + size_t range_size, + rmm::cuda_stream_view stream); + table_with_metadata read_json(host_span> sources, json_reader_options const& reader_opts, rmm::cuda_stream_view stream, @@ -38,9 +51,4 @@ size_type find_first_delimiter(device_span d_data, char const delimiter, rmm::cuda_stream_view stream); -size_type find_first_delimiter_in_chunk(host_span> sources, - json_reader_options const& reader_opts, - char const delimiter, - 
rmm::cuda_stream_view stream); - } // namespace cudf::io::json::detail diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 8e2017ccb97..05e9759632f 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -313,17 +313,17 @@ ConfigureTest( PERCENT 30 ) ConfigureTest( - JSON_TEST io/json_test.cpp io/json_chunked_reader.cpp + JSON_TEST io/json/json_test.cpp io/json/json_chunked_reader.cu GPUS 1 PERCENT 30 ) -ConfigureTest(JSON_WRITER_TEST io/json_writer.cpp) -ConfigureTest(JSON_TYPE_CAST_TEST io/json_type_cast_test.cu) -ConfigureTest(NESTED_JSON_TEST io/nested_json_test.cpp io/json_tree.cpp) +ConfigureTest(JSON_WRITER_TEST io/json/json_writer.cpp) +ConfigureTest(JSON_TYPE_CAST_TEST io/json/json_type_cast_test.cu) +ConfigureTest(NESTED_JSON_TEST io/json/nested_json_test.cpp io/json/json_tree.cpp) ConfigureTest(ARROW_IO_SOURCE_TEST io/arrow_io_source_test.cpp) ConfigureTest(MULTIBYTE_SPLIT_TEST io/text/multibyte_split_test.cpp) -ConfigureTest(JSON_QUOTE_NORMALIZATION io/json_quote_normalization_test.cpp) -ConfigureTest(JSON_WHITESPACE_NORMALIZATION io/json_whitespace_normalization_test.cu) +ConfigureTest(JSON_QUOTE_NORMALIZATION io/json/json_quote_normalization_test.cpp) +ConfigureTest(JSON_WHITESPACE_NORMALIZATION io/json/json_whitespace_normalization_test.cu) ConfigureTest( DATA_CHUNK_SOURCE_TEST io/text/data_chunk_source_test.cpp GPUS 1 @@ -572,7 +572,7 @@ ConfigureTest( LARGE_STRINGS_TEST large_strings/concatenate_tests.cpp large_strings/case_tests.cpp - large_strings/json_tests.cpp + large_strings/json_tests.cu large_strings/large_strings_fixture.cpp large_strings/merge_tests.cpp large_strings/parquet_tests.cpp diff --git a/cpp/tests/io/json_chunked_reader.cpp b/cpp/tests/io/json/json_chunked_reader.cu similarity index 64% rename from cpp/tests/io/json_chunked_reader.cpp rename to cpp/tests/io/json/json_chunked_reader.cu index 23d54f7263c..b9dee54752c 100644 --- a/cpp/tests/io/json_chunked_reader.cpp +++ b/cpp/tests/io/json/json_chunked_reader.cu @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "io/json/read_json.hpp" +#include "json_utils.cuh" #include #include @@ -37,65 +37,6 @@ cudf::test::TempDirTestEnvironment* const temp_env = static_cast( ::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment)); -// function to extract first delimiter in the string in each chunk, -// collate together and form byte_range for each chunk, -// parse separately. -std::vector skeleton_for_parellel_chunk_reader( - cudf::host_span> sources, - cudf::io::json_reader_options const& reader_opts, - int32_t chunk_size, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - using namespace cudf::io::json::detail; - using cudf::size_type; - size_t total_source_size = 0; - for (auto const& source : sources) { - total_source_size += source->size(); - } - size_t num_chunks = (total_source_size + chunk_size - 1) / chunk_size; - constexpr size_type no_min_value = -1; - - // Get the first delimiter in each chunk. 
- std::vector first_delimiter_index(num_chunks); - auto reader_opts_chunk = reader_opts; - for (size_t i = 0; i < num_chunks; i++) { - auto const chunk_start = i * chunk_size; - reader_opts_chunk.set_byte_range_offset(chunk_start); - reader_opts_chunk.set_byte_range_size(chunk_size); - first_delimiter_index[i] = - find_first_delimiter_in_chunk(sources, reader_opts_chunk, '\n', stream); - if (first_delimiter_index[i] != no_min_value) { first_delimiter_index[i] += chunk_start; } - } - - // Process and allocate record start, end for each worker. - using record_range = std::pair; - std::vector record_ranges; - record_ranges.reserve(num_chunks); - first_delimiter_index[0] = 0; - auto prev = first_delimiter_index[0]; - for (size_t i = 1; i < num_chunks; i++) { - if (first_delimiter_index[i] == no_min_value) continue; - record_ranges.emplace_back(prev, first_delimiter_index[i]); - prev = first_delimiter_index[i]; - } - record_ranges.emplace_back(prev, total_source_size); - - std::vector tables; - // Process each chunk in parallel. - for (auto const& [chunk_start, chunk_end] : record_ranges) { - if (chunk_start == -1 or chunk_end == -1 or - static_cast(chunk_start) >= total_source_size) - continue; - reader_opts_chunk.set_byte_range_offset(chunk_start); - reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start); - tables.push_back(read_json(sources, reader_opts_chunk, stream, mr)); - } - // assume all records have same number of columns, and inferred same type. (or schema is passed) - // TODO a step before to merge all columns, types and infer final schema. - return tables; -} - TEST_F(JsonReaderTest, ByteRange_SingleSource) { std::string const json_string = R"( @@ -118,11 +59,11 @@ TEST_F(JsonReaderTest, ByteRange_SingleSource) // Test for different chunk sizes for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500}) { - auto const tables = skeleton_for_parellel_chunk_reader(datasources, - json_lines_options, - chunk_size, - cudf::get_default_stream(), - rmm::mr::get_current_device_resource()); + auto const tables = split_byte_range_reading(datasources, + json_lines_options, + chunk_size, + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); auto table_views = std::vector(tables.size()); std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) { @@ -213,11 +154,11 @@ TEST_F(JsonReaderTest, ByteRange_MultiSource) // Test for different chunk sizes for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500, 1000, 2000}) { - auto const tables = skeleton_for_parellel_chunk_reader(datasources, - json_lines_options, - chunk_size, - cudf::get_default_stream(), - rmm::mr::get_current_device_resource()); + auto const tables = split_byte_range_reading(datasources, + json_lines_options, + chunk_size, + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); auto table_views = std::vector(tables.size()); std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) { diff --git a/cpp/tests/io/json_quote_normalization_test.cpp b/cpp/tests/io/json/json_quote_normalization_test.cpp similarity index 100% rename from cpp/tests/io/json_quote_normalization_test.cpp rename to cpp/tests/io/json/json_quote_normalization_test.cpp diff --git a/cpp/tests/io/json_test.cpp b/cpp/tests/io/json/json_test.cpp similarity index 100% rename from cpp/tests/io/json_test.cpp rename to cpp/tests/io/json/json_test.cpp diff --git a/cpp/tests/io/json_tree.cpp b/cpp/tests/io/json/json_tree.cpp similarity index 100% rename from 
cpp/tests/io/json_tree.cpp rename to cpp/tests/io/json/json_tree.cpp diff --git a/cpp/tests/io/json_type_cast_test.cu b/cpp/tests/io/json/json_type_cast_test.cu similarity index 100% rename from cpp/tests/io/json_type_cast_test.cu rename to cpp/tests/io/json/json_type_cast_test.cu diff --git a/cpp/tests/io/json/json_utils.cuh b/cpp/tests/io/json/json_utils.cuh new file mode 100644 index 00000000000..9383797d91b --- /dev/null +++ b/cpp/tests/io/json/json_utils.cuh @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "io/json/read_json.hpp" + +#include +#include +#include +#include + +#include + +#include + +// Helper function to test correctness of JSON byte range reading. +// We split the input source files into a set of byte range chunks each of size +// `chunk_size` and return an array of partial tables constructed from each chunk +template +std::vector split_byte_range_reading( + cudf::host_span> sources, + cudf::io::json_reader_options const& reader_opts, + IndexType chunk_size, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto total_source_size = [&sources]() { + return std::accumulate(sources.begin(), sources.end(), 0ul, [=](size_t sum, auto& source) { + auto const size = source->size(); + return sum + size; + }); + }(); + auto find_first_delimiter_in_chunk = + [total_source_size, &sources, &stream]( + cudf::io::json_reader_options const& reader_opts) -> IndexType { + rmm::device_uvector buffer(total_source_size, stream); + auto readbufspan = cudf::io::json::detail::ingest_raw_input(buffer, + sources, + reader_opts.get_compression(), + reader_opts.get_byte_range_offset(), + reader_opts.get_byte_range_size(), + stream); + // Note: we cannot reuse cudf::io::json::detail::find_first_delimiter since the + // return type of that function is size_type. However, when the chunk_size is + // larger than INT_MAX, the position of the delimiter can also be larger than + // INT_MAX. We do not encounter this overflow error in the detail function + // since the batched JSON reader splits the byte_range_size into chunk_sizes + // smaller than INT_MAX bytes + auto const first_delimiter_position_it = + thrust::find(rmm::exec_policy(stream), readbufspan.begin(), readbufspan.end(), '\n'); + return first_delimiter_position_it != readbufspan.end() + ? thrust::distance(readbufspan.begin(), first_delimiter_position_it) + : -1; + }; + size_t num_chunks = (total_source_size + chunk_size - 1) / chunk_size; + constexpr IndexType no_min_value = -1; + + // Get the first delimiter in each chunk. 
+ std::vector first_delimiter_index(num_chunks); + auto reader_opts_chunk = reader_opts; + for (size_t i = 0; i < num_chunks; i++) { + auto const chunk_start = i * chunk_size; + // We are updating reader_opt_chunks to store offset and size information for the current chunk + reader_opts_chunk.set_byte_range_offset(chunk_start); + reader_opts_chunk.set_byte_range_size(chunk_size); + first_delimiter_index[i] = find_first_delimiter_in_chunk(reader_opts_chunk); + } + + // Process and allocate record start, end for each worker. + using record_range = std::pair; + std::vector record_ranges; + record_ranges.reserve(num_chunks); + size_t prev = 0; + for (size_t i = 1; i < num_chunks; i++) { + // In the case where chunk_size is smaller than row size, the chunk needs to be skipped + if (first_delimiter_index[i] == no_min_value) continue; + size_t next = static_cast(first_delimiter_index[i]) + (i * chunk_size); + record_ranges.emplace_back(prev, next); + prev = next; + } + record_ranges.emplace_back(prev, total_source_size); + + std::vector tables; + for (auto const& [chunk_start, chunk_end] : record_ranges) { + reader_opts_chunk.set_byte_range_offset(chunk_start); + reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start); + tables.push_back(cudf::io::json::detail::read_json(sources, reader_opts_chunk, stream, mr)); + } + // assume all records have same number of columns, and inferred same type. (or schema is passed) + // TODO a step before to merge all columns, types and infer final schema. + return tables; +} diff --git a/cpp/tests/io/json_whitespace_normalization_test.cu b/cpp/tests/io/json/json_whitespace_normalization_test.cu similarity index 100% rename from cpp/tests/io/json_whitespace_normalization_test.cu rename to cpp/tests/io/json/json_whitespace_normalization_test.cu diff --git a/cpp/tests/io/json_writer.cpp b/cpp/tests/io/json/json_writer.cpp similarity index 100% rename from cpp/tests/io/json_writer.cpp rename to cpp/tests/io/json/json_writer.cpp diff --git a/cpp/tests/io/nested_json_test.cpp b/cpp/tests/io/json/nested_json_test.cpp similarity index 100% rename from cpp/tests/io/nested_json_test.cpp rename to cpp/tests/io/json/nested_json_test.cpp diff --git a/cpp/tests/large_strings/json_tests.cpp b/cpp/tests/large_strings/json_tests.cu similarity index 50% rename from cpp/tests/large_strings/json_tests.cpp rename to cpp/tests/large_strings/json_tests.cu index bf16d131ba7..49abf7b484d 100644 --- a/cpp/tests/large_strings/json_tests.cpp +++ b/cpp/tests/large_strings/json_tests.cu @@ -14,8 +14,13 @@ * limitations under the License. 
*/ +#include "../io/json/json_utils.cuh" #include "large_strings_fixture.hpp" +#include + +#include +#include #include #include @@ -28,31 +33,57 @@ TEST_F(JsonLargeReaderTest, MultiBatch) { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 } { "a": { "y" : 6}, "b" : [6 ], "c": 13 } { "a": { "y" : 6}, "b" : [7 ], "c": 14 })"; - constexpr size_t expected_file_size = std::numeric_limits::max() / 2; + constexpr size_t batch_size_ub = std::numeric_limits::max(); + constexpr size_t expected_file_size = 1.5 * static_cast(batch_size_ub); std::size_t const log_repetitions = static_cast(std::ceil(std::log2(expected_file_size / json_string.size()))); json_string.reserve(json_string.size() * (1UL << log_repetitions)); - std::size_t numrows = 4; for (std::size_t i = 0; i < log_repetitions; i++) { json_string += json_string; - numrows <<= 1; } constexpr int num_sources = 2; - std::vector> hostbufs( - num_sources, cudf::host_span(json_string.data(), json_string.size())); + std::vector> hostbufs( + num_sources, + cudf::host_span(reinterpret_cast(json_string.data()), + json_string.size())); // Initialize parsing options (reading json lines) cudf::io::json_reader_options json_lines_options = cudf::io::json_reader_options::builder( cudf::io::source_info{ - cudf::host_span>(hostbufs.data(), hostbufs.size())}) + cudf::host_span>(hostbufs.data(), hostbufs.size())}) .lines(true) .compression(cudf::io::compression_type::NONE) .recovery_mode(cudf::io::json_recovery_mode_t::FAIL); // Read full test data via existing, nested JSON lines reader cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options); - ASSERT_EQ(current_reader_table.tbl->num_rows(), numrows * num_sources); + + std::vector> datasources; + for (auto& hb : hostbufs) { + datasources.emplace_back(cudf::io::datasource::create(hb)); + } + // Test for different chunk sizes + std::vector chunk_sizes{ + batch_size_ub / 4, batch_size_ub / 2, batch_size_ub, static_cast(batch_size_ub * 2)}; + for (auto chunk_size : chunk_sizes) { + auto const tables = + split_byte_range_reading(datasources, + json_lines_options, + chunk_size, + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); + + auto table_views = std::vector(tables.size()); + std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) { + return table.tbl->view(); + }); + auto result = cudf::concatenate(table_views); + + // Verify that the data read via chunked reader matches the data read via nested JSON reader + // cannot use EQUAL due to concatenate removing null mask + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(current_reader_table.tbl->view(), result->view()); + } } diff --git a/python/cudf/cudf/_lib/lists.pyx b/python/cudf/cudf/_lib/lists.pyx index 76f37c3b845..50061f6e468 100644 --- a/python/cudf/cudf/_lib/lists.pyx +++ b/python/cudf/cudf/_lib/lists.pyx @@ -11,9 +11,6 @@ from cudf._lib.pylibcudf.libcudf.column.column cimport column from cudf._lib.pylibcudf.libcudf.lists.lists_column_view cimport ( lists_column_view, ) -from cudf._lib.pylibcudf.libcudf.lists.sorting cimport ( - sort_lists as cpp_sort_lists, -) from cudf._lib.pylibcudf.libcudf.lists.stream_compaction cimport ( distinct as cpp_distinct, ) @@ -21,7 +18,6 @@ from cudf._lib.pylibcudf.libcudf.types cimport ( nan_equality, null_equality, null_order, - order, size_type, ) from cudf._lib.utils cimport columns_from_pylibcudf_table @@ -80,24 +76,14 @@ def distinct(Column col, bool nulls_equal, bool nans_all_equal): @acquire_spill_lock() def sort_lists(Column col, bool ascending, str 
na_position): - cdef shared_ptr[lists_column_view] list_view = ( - make_shared[lists_column_view](col.view()) - ) - cdef order c_sort_order = ( - order.ASCENDING if ascending else order.DESCENDING - ) - cdef null_order c_null_prec = ( - null_order.BEFORE if na_position == "first" else null_order.AFTER - ) - - cdef unique_ptr[column] c_result - - with nogil: - c_result = move( - cpp_sort_lists(list_view.get()[0], c_sort_order, c_null_prec) + return Column.from_pylibcudf( + pylibcudf.lists.sort_lists( + col.to_pylibcudf(mode="read"), + ascending, + null_order.BEFORE if na_position == "first" else null_order.AFTER, + False, ) - - return Column.from_unique_ptr(move(c_result)) + ) @acquire_spill_lock() diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/lists/sorting.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/lists/sorting.pxd index 145ab41302f..337ac73908b 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/lists/sorting.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/lists/sorting.pxd @@ -15,3 +15,9 @@ cdef extern from "cudf/lists/sorting.hpp" namespace "cudf::lists" nogil: order column_order, null_order null_precedence ) except + + + cdef unique_ptr[column] stable_sort_lists( + const lists_column_view source_column, + order column_order, + null_order null_precedence + ) except + diff --git a/python/cudf/cudf/_lib/pylibcudf/lists.pxd b/python/cudf/cudf/_lib/pylibcudf/lists.pxd index 7e4262c466b..6e9bd5ff76b 100644 --- a/python/cudf/cudf/_lib/pylibcudf/lists.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/lists.pxd @@ -2,7 +2,7 @@ from libcpp cimport bool -from cudf._lib.pylibcudf.libcudf.types cimport size_type +from cudf._lib.pylibcudf.libcudf.types cimport null_order, size_type from .column cimport Column from .scalar cimport Scalar @@ -37,3 +37,5 @@ cpdef Column extract_list_element(Column, ColumnOrSizeType) cpdef Column count_elements(Column) cpdef Column sequences(Column, Column, Column steps = *) + +cpdef Column sort_lists(Column, bool, null_order, bool stable = *) diff --git a/python/cudf/cudf/_lib/pylibcudf/lists.pyx b/python/cudf/cudf/_lib/pylibcudf/lists.pyx index a2e17c3f351..3837eaaca78 100644 --- a/python/cudf/cudf/_lib/pylibcudf/lists.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/lists.pyx @@ -24,8 +24,12 @@ from cudf._lib.pylibcudf.libcudf.lists.count_elements cimport ( from cudf._lib.pylibcudf.libcudf.lists.extract cimport ( extract_list_element as cpp_extract_list_element, ) +from cudf._lib.pylibcudf.libcudf.lists.sorting cimport ( + sort_lists as cpp_sort_lists, + stable_sort_lists as cpp_stable_sort_lists, +) from cudf._lib.pylibcudf.libcudf.table.table cimport table -from cudf._lib.pylibcudf.libcudf.types cimport size_type +from cudf._lib.pylibcudf.libcudf.types cimport null_order, order, size_type from cudf._lib.pylibcudf.lists cimport ColumnOrScalar, ColumnOrSizeType from .column cimport Column, ListColumnView @@ -359,3 +363,53 @@ cpdef Column sequences(Column starts, Column sizes, Column steps = None): sizes.view(), )) return Column.from_libcudf(move(c_result)) + +cpdef Column sort_lists( + Column input, + bool ascending, + null_order na_position, + bool stable = False +): + """Sort the elements within a list in each row of a list column. + + For details, see :cpp:func:`sort_lists`. + + Parameters + ---------- + input : Column + The input column. + ascending : bool + If true, the sort order is ascending. Otherwise, the sort order is descending. + na_position : NullOrder + If na_position equals NullOrder.FIRST, then the null values in the output + column are placed first. 
Otherwise, they are placed after. + stable: bool + If true, :cpp:func:`stable_sort_lists` is used. Otherwise, + :cpp:func:`sort_lists` is used. + + Returns + ------- + Column + A new Column with elements in each list sorted. + """ + cdef unique_ptr[column] c_result + cdef ListColumnView list_view = input.list_view() + + cdef order c_sort_order = ( + order.ASCENDING if ascending else order.DESCENDING + ) + + with nogil: + if stable: + c_result = move(cpp_stable_sort_lists( + list_view.view(), + c_sort_order, + na_position, + )) + else: + c_result = move(cpp_sort_lists( + list_view.view(), + c_sort_order, + na_position, + )) + return Column.from_libcudf(move(c_result)) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 288bdfd39b3..1d7136e61e3 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -4525,7 +4525,6 @@ def apply( If False, the funcs will be passed the whole Series at once. Currently not supported. - engine : {'python', 'numba'}, default 'python' Unused. Added for compatibility with pandas. engine_kwargs : dict diff --git a/python/cudf/cudf/pylibcudf_tests/test_lists.py b/python/cudf/cudf/pylibcudf_tests/test_lists.py index 69acf78ea07..0b2e0e00ce8 100644 --- a/python/cudf/cudf/pylibcudf_tests/test_lists.py +++ b/python/cudf/cudf/pylibcudf_tests/test_lists.py @@ -22,6 +22,11 @@ def column(): return pa.array([3, 2, 5, 6]), pa.array([-1, 0, 0, 0], type=pa.int32()) +@pytest.fixture +def lists_column(): + return [[4, 2, 3, 1], [1, 2, None, 4], [-10, 10, 10, 0]] + + def test_concatenate_rows(test_data): arrow_tbl = pa.Table.from_arrays(test_data[0], names=["a", "b"]) plc_tbl = plc.interop.from_arrow(arrow_tbl) @@ -207,3 +212,44 @@ def test_sequences(): assert_column_eq(expect1, res1) assert_column_eq(expect2, res2) + + +@pytest.mark.parametrize( + "ascending,na_position,expected", + [ + ( + True, + plc.types.NullOrder.BEFORE, + [[1, 2, 3, 4], [None, 1, 2, 4], [-10, 0, 10, 10]], + ), + ( + True, + plc.types.NullOrder.AFTER, + [[1, 2, 3, 4], [1, 2, 4, None], [-10, 0, 10, 10]], + ), + ( + False, + plc.types.NullOrder.BEFORE, + [[4, 3, 2, 1], [4, 2, 1, None], [10, 10, 0, -10]], + ), + ( + False, + plc.types.NullOrder.AFTER, + [[4, 3, 2, 1], [None, 4, 2, 1], [10, 10, 0, -10]], + ), + ( + False, + plc.types.NullOrder.AFTER, + [[4, 3, 2, 1], [None, 4, 2, 1], [10, 10, 0, -10]], + ), + ], +) +def test_sort_lists(lists_column, ascending, na_position, expected): + plc_column = plc.interop.from_arrow(pa.array(lists_column)) + res = plc.lists.sort_lists(plc_column, ascending, na_position, False) + res_stable = plc.lists.sort_lists(plc_column, ascending, na_position, True) + + expect = pa.array(expected) + + assert_column_eq(expect, res) + assert_column_eq(expect, res_stable)
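
For reference, a minimal usage sketch of the new pylibcudf `sort_lists` binding exercised by the test above. The expected outputs mirror the parametrized cases in the diff; the `plc` import alias and the `assert_column_eq` helper are assumed to match the existing `pylibcudf_tests` setup rather than being defined by this change.

import pyarrow as pa

import cudf._lib.pylibcudf as plc  # assumed import alias, as in the existing pylibcudf tests
from utils import assert_column_eq  # assumed test helper from the pylibcudf_tests suite


def test_sort_lists_usage_sketch():
    # One row contains a null so the effect of na_position is visible.
    plc_column = plc.interop.from_arrow(pa.array([[4, 2, 3, 1], [1, 2, None, 4]]))

    # Ascending sort within each list, nulls placed after non-null values;
    # stable=False dispatches to cudf::lists::sort_lists.
    res = plc.lists.sort_lists(plc_column, True, plc.types.NullOrder.AFTER, False)
    assert_column_eq(pa.array([[1, 2, 3, 4], [1, 2, 4, None]]), res)

    # stable=True dispatches to cudf::lists::stable_sort_lists; same values here.
    res_stable = plc.lists.sort_lists(plc_column, True, plc.types.NullOrder.AFTER, True)
    assert_column_eq(pa.array([[1, 2, 3, 4], [1, 2, 4, None]]), res_stable)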