diff --git a/cpp/include/cudf/dictionary/dictionary_factories.hpp b/cpp/include/cudf/dictionary/dictionary_factories.hpp index 7cdfa3bf9e5..21f593e1aec 100644 --- a/cpp/include/cudf/dictionary/dictionary_factories.hpp +++ b/cpp/include/cudf/dictionary/dictionary_factories.hpp @@ -87,12 +87,17 @@ std::unique_ptr make_dictionary_column( * @param indices_column Indices to use for the new dictionary column. * @param null_mask Null mask for the output column. * @param null_count Number of nulls for the output column. + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned column's device memory. * @return New dictionary column. */ -std::unique_ptr make_dictionary_column(std::unique_ptr keys_column, - std::unique_ptr indices_column, - rmm::device_buffer&& null_mask, - size_type null_count); +std::unique_ptr make_dictionary_column( + std::unique_ptr keys_column, + std::unique_ptr indices_column, + rmm::device_buffer&& null_mask, + size_type null_count, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** * @brief Construct a dictionary column by taking ownership of the provided keys diff --git a/cpp/include/cudf/io/json.hpp b/cpp/include/cudf/io/json.hpp index 7af90766ad0..d47266fdd12 100644 --- a/cpp/include/cudf/io/json.hpp +++ b/cpp/include/cudf/io/json.hpp @@ -333,14 +333,14 @@ class json_reader_options { * * @param offset Number of bytes of offset */ - void set_byte_range_offset(size_type offset) { _byte_range_offset = offset; } + void set_byte_range_offset(size_t offset) { _byte_range_offset = offset; } /** * @brief Set number of bytes to read. 
* * @param size Number of bytes to read */ - void set_byte_range_size(size_type size) { _byte_range_size = size; } + void set_byte_range_size(size_t size) { _byte_range_size = size; } /** * @brief Set delimiter separating records in JSON lines diff --git a/cpp/include/cudf/lists/explode.hpp b/cpp/include/cudf/lists/explode.hpp index 81d82dcfa09..303f182ce8c 100644 --- a/cpp/include/cudf/lists/explode.hpp +++ b/cpp/include/cudf/lists/explode.hpp @@ -66,6 +66,7 @@ namespace cudf { * * @param input_table Table to explode. * @param explode_column_idx Column index to explode inside the table. + * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. * * @return A new table with explode_col exploded. @@ -73,6 +74,7 @@ namespace cudf { std::unique_ptr explode( table_view const& input_table, size_type explode_column_idx, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -109,6 +111,7 @@ std::unique_ptr
explode( * * @param input_table Table to explode. * @param explode_column_idx Column index to explode inside the table. + * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. * * @return A new table with exploded value and position. The column order of return table is @@ -117,6 +120,7 @@ std::unique_ptr
explode( std::unique_ptr
explode_position( table_view const& input_table, size_type explode_column_idx, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -152,6 +156,7 @@ std::unique_ptr
explode_position( * * @param input_table Table to explode. * @param explode_column_idx Column index to explode inside the table. + * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. * * @return A new table with explode_col exploded. @@ -159,6 +164,7 @@ std::unique_ptr
explode_position( std::unique_ptr
explode_outer( table_view const& input_table, size_type explode_column_idx, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @@ -196,6 +202,7 @@ std::unique_ptr
explode_outer( * * @param input_table Table to explode. * @param explode_column_idx Column index to explode inside the table. + * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned column's device memory. * * @return A new table with explode_col exploded. @@ -203,6 +210,7 @@ std::unique_ptr
explode_outer( std::unique_ptr
explode_outer_position( table_view const& input_table, size_type explode_column_idx, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); /** @} */ // end of group diff --git a/cpp/include/cudf/lists/set_operations.hpp b/cpp/include/cudf/lists/set_operations.hpp index b8abfd62461..871e66b2d83 100644 --- a/cpp/include/cudf/lists/set_operations.hpp +++ b/cpp/include/cudf/lists/set_operations.hpp @@ -53,8 +53,8 @@ namespace cudf::lists { * @param nulls_equal Flag to specify whether null elements should be considered as equal, default * to be `UNEQUAL` which means only non-null elements are checked for overlapping * @param nans_equal Flag to specify whether floating-point NaNs should be considered as equal - * @param mr Device memory resource used to allocate the returned object * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned object * @return A column of type BOOL containing the check results */ std::unique_ptr have_overlap( diff --git a/cpp/include/cudf_test/stream_checking_resource_adaptor.hpp b/cpp/include/cudf_test/stream_checking_resource_adaptor.hpp index 5a077e86a0f..4f3c723d195 100644 --- a/cpp/include/cudf_test/stream_checking_resource_adaptor.hpp +++ b/cpp/include/cudf_test/stream_checking_resource_adaptor.hpp @@ -24,13 +24,11 @@ #include +namespace cudf::test { + /** * @brief Resource that verifies that the default stream is not used in any allocation. - * - * @tparam Upstream Type of the upstream resource used for - * allocation/deallocation. 
*/ -template class stream_checking_resource_adaptor final : public rmm::mr::device_memory_resource { public: /** @@ -40,14 +38,13 @@ class stream_checking_resource_adaptor final : public rmm::mr::device_memory_res * * @param upstream The resource used for allocating/deallocating device memory */ - stream_checking_resource_adaptor(Upstream* upstream, + stream_checking_resource_adaptor(rmm::device_async_resource_ref upstream, bool error_on_invalid_stream, bool check_default_stream) : upstream_{upstream}, error_on_invalid_stream_{error_on_invalid_stream}, check_default_stream_{check_default_stream} { - CUDF_EXPECTS(nullptr != upstream, "Unexpected null upstream resource pointer."); } stream_checking_resource_adaptor() = delete; @@ -86,7 +83,7 @@ class stream_checking_resource_adaptor final : public rmm::mr::device_memory_res void* do_allocate(std::size_t bytes, rmm::cuda_stream_view stream) override { verify_stream(stream); - return upstream_->allocate(bytes, stream); + return upstream_.allocate_async(bytes, rmm::CUDA_ALLOCATION_ALIGNMENT, stream); } /** @@ -101,7 +98,7 @@ class stream_checking_resource_adaptor final : public rmm::mr::device_memory_res void do_deallocate(void* ptr, std::size_t bytes, rmm::cuda_stream_view stream) override { verify_stream(stream); - upstream_->deallocate(ptr, bytes, stream); + upstream_.deallocate_async(ptr, bytes, rmm::CUDA_ALLOCATION_ALIGNMENT, stream); } /** @@ -113,8 +110,8 @@ class stream_checking_resource_adaptor final : public rmm::mr::device_memory_res [[nodiscard]] bool do_is_equal(device_memory_resource const& other) const noexcept override { if (this == &other) { return true; } - auto cast = dynamic_cast const*>(&other); - if (cast == nullptr) { return upstream_->is_equal(other); } + auto cast = dynamic_cast(&other); + if (cast == nullptr) { return false; } return get_upstream_resource() == cast->get_upstream_resource(); } @@ -150,7 +147,8 @@ class stream_checking_resource_adaptor final : public rmm::mr::device_memory_res } 
} - Upstream* upstream_; // the upstream resource used for satisfying allocation requests + rmm::device_async_resource_ref + upstream_; // the upstream resource used for satisfying allocation requests bool error_on_invalid_stream_; // If true, throw an exception when the wrong stream is detected. // If false, simply print to stdout. bool check_default_stream_; // If true, throw an exception when the default stream is observed. @@ -162,13 +160,12 @@ class stream_checking_resource_adaptor final : public rmm::mr::device_memory_res * @brief Convenience factory to return a `stream_checking_resource_adaptor` around the * upstream resource `upstream`. * - * @tparam Upstream Type of the upstream `device_memory_resource`. - * @param upstream Pointer to the upstream resource + * @param upstream Reference to the upstream resource */ -template -stream_checking_resource_adaptor make_stream_checking_resource_adaptor( - Upstream* upstream, bool error_on_invalid_stream, bool check_default_stream) +inline stream_checking_resource_adaptor make_stream_checking_resource_adaptor( + rmm::device_async_resource_ref upstream, bool error_on_invalid_stream, bool check_default_stream) { - return stream_checking_resource_adaptor{ - upstream, error_on_invalid_stream, check_default_stream}; + return stream_checking_resource_adaptor{upstream, error_on_invalid_stream, check_default_stream}; } + +} // namespace cudf::test diff --git a/cpp/include/cudf_test/testing_main.hpp b/cpp/include/cudf_test/testing_main.hpp index 66b831b917f..3ad4b127f80 100644 --- a/cpp/include/cudf_test/testing_main.hpp +++ b/cpp/include/cudf_test/testing_main.hpp @@ -32,8 +32,7 @@ #include #include -namespace cudf { -namespace test { +namespace cudf::test { /// MR factory functions inline auto make_cuda() { return std::make_shared(); } @@ -91,8 +90,7 @@ inline std::shared_ptr create_memory_resource( CUDF_FAIL("Invalid RMM allocation mode: " + allocation_mode); } -} // namespace test -} // namespace cudf +} // namespace 
cudf::test /** * @brief Parses the cuDF test command line options. @@ -182,8 +180,8 @@ inline auto make_stream_mode_adaptor(cxxopts::ParseResult const& cmd_opts) auto const stream_error_mode = cmd_opts["stream_error_mode"].as(); auto const error_on_invalid_stream = (stream_error_mode == "error"); auto const check_default_stream = (stream_mode == "new_cudf_default"); - auto adaptor = - make_stream_checking_resource_adaptor(resource, error_on_invalid_stream, check_default_stream); + auto adaptor = cudf::test::make_stream_checking_resource_adaptor( + resource, error_on_invalid_stream, check_default_stream); if ((stream_mode == "new_cudf_default") || (stream_mode == "new_testing_default")) { rmm::mr::set_current_device_resource(&adaptor); } diff --git a/cpp/src/dictionary/dictionary_factories.cu b/cpp/src/dictionary/dictionary_factories.cu index 37f8fa7a05b..0617d71fa51 100644 --- a/cpp/src/dictionary/dictionary_factories.cu +++ b/cpp/src/dictionary/dictionary_factories.cu @@ -77,7 +77,9 @@ std::unique_ptr make_dictionary_column(column_view const& keys_column, std::unique_ptr make_dictionary_column(std::unique_ptr keys_column, std::unique_ptr indices_column, rmm::device_buffer&& null_mask, - size_type null_count) + size_type null_count, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { CUDF_EXPECTS(!keys_column->has_nulls(), "keys column must not have nulls"); CUDF_EXPECTS(!indices_column->has_nulls(), "indices column must not have nulls"); @@ -89,7 +91,7 @@ std::unique_ptr make_dictionary_column(std::unique_ptr keys_colu children.emplace_back(std::move(keys_column)); return std::make_unique(data_type{type_id::DICTIONARY32}, count, - rmm::device_buffer{}, + rmm::device_buffer{0, stream, mr}, std::move(null_mask), null_count, std::move(children)); @@ -134,8 +136,11 @@ std::unique_ptr make_dictionary_column(std::unique_ptr keys, auto indices_column = [&] { // If the types match, then just commandeer the column's data buffer. 
if (new_type.id() == indices_type) { - return std::make_unique( - new_type, indices_size, std::move(*(contents.data.release())), rmm::device_buffer{}, 0); + return std::make_unique(new_type, + indices_size, + std::move(*(contents.data.release())), + rmm::device_buffer{0, stream, mr}, + 0); } // If the new type does not match, then convert the data. cudf::column_view cast_view{ diff --git a/cpp/src/io/json/read_json.cu b/cpp/src/io/json/read_json.cu index 9cd39038348..0ba4dedfc34 100644 --- a/cpp/src/io/json/read_json.cu +++ b/cpp/src/io/json/read_json.cu @@ -148,20 +148,12 @@ device_span ingest_raw_input(device_span buffer, return buffer.first(uncomp_data.size()); } -size_type find_first_delimiter_in_chunk(host_span> sources, - json_reader_options const& reader_opts, - char const delimiter, - rmm::cuda_stream_view stream) +size_t estimate_size_per_subchunk(size_t chunk_size) { - auto total_source_size = sources_size(sources, 0, 0) + (sources.size() - 1); - rmm::device_uvector buffer(total_source_size, stream); - auto readbufspan = ingest_raw_input(buffer, - sources, - reader_opts.get_compression(), - reader_opts.get_byte_range_offset(), - reader_opts.get_byte_range_size(), - stream); - return find_first_delimiter(readbufspan, '\n', stream); + auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); }; + // NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to + // 10kb) and the byte range size + return geometric_mean(std::ceil((double)chunk_size / num_subchunks), min_subchunk_size); } /** @@ -183,7 +175,6 @@ datasource::owning_buffer> get_record_range_raw_input( rmm::cuda_stream_view stream) { CUDF_FUNC_RANGE(); - auto geometric_mean = [](double a, double b) { return std::sqrt(a * b); }; size_t const total_source_size = sources_size(sources, 0, 0); auto constexpr num_delimiter_chars = 1; @@ -198,17 +189,8 @@ datasource::owning_buffer> get_record_range_raw_input( auto should_load_all_sources = !chunk_size || 
chunk_size >= total_source_size - chunk_offset; chunk_size = should_load_all_sources ? total_source_size - chunk_offset : chunk_size; - // Some magic numbers - constexpr int num_subchunks = 10; // per chunk_size - constexpr size_t min_subchunk_size = 10000; - int const num_subchunks_prealloced = should_load_all_sources ? 0 : 3; - constexpr int estimated_compression_ratio = 4; - - // NOTE: heuristic for choosing subchunk size: geometric mean of minimum subchunk size (set to - // 10kb) and the byte range size - - size_t const size_per_subchunk = - geometric_mean(std::ceil((double)chunk_size / num_subchunks), min_subchunk_size); + int const num_subchunks_prealloced = should_load_all_sources ? 0 : max_subchunks_prealloced; + size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size); // The allocation for single source compressed input is estimated by assuming a ~4:1 // compression ratio. For uncompressed inputs, we can getter a better estimate using the idea @@ -308,67 +290,78 @@ table_with_metadata read_json(host_span> sources, "Multiple inputs are supported only for JSON Lines format"); } - std::for_each(sources.begin(), sources.end(), [](auto const& source) { - CUDF_EXPECTS(source->size() < std::numeric_limits::max(), - "The size of each source file must be less than INT_MAX bytes"); - }); - - constexpr size_t batch_size_ub = std::numeric_limits::max(); - size_t const chunk_offset = reader_opts.get_byte_range_offset(); + /* + * The batched JSON reader enforces that the size of each batch is at most INT_MAX + * bytes (~2.14GB). Batches are defined to be byte range chunks - characterized by + * chunk offset and chunk size - that may span across multiple source files. + * Note that the batched reader does not work for compressed inputs or for regular + * JSON inputs. 
+ */ + size_t const total_source_size = sources_size(sources, 0, 0); + size_t chunk_offset = reader_opts.get_byte_range_offset(); size_t chunk_size = reader_opts.get_byte_range_size(); - chunk_size = !chunk_size ? sources_size(sources, 0, 0) : chunk_size; - - // Identify the position of starting source file from which to begin batching based on - // byte range offset. If the offset is larger than the sum of all source - // sizes, then start_source is total number of source files i.e. no file is read - size_t const start_source = [&]() { - size_t sum = 0; + chunk_size = !chunk_size ? total_source_size - chunk_offset + : std::min(chunk_size, total_source_size - chunk_offset); + + size_t const size_per_subchunk = estimate_size_per_subchunk(chunk_size); + size_t const batch_size_ub = + std::numeric_limits::max() - (max_subchunks_prealloced * size_per_subchunk); + + /* + * Identify the position (zero-indexed) of starting source file from which to begin + * batching based on byte range offset. If the offset is larger than the sum of all + * source sizes, then start_source is total number of source files i.e. no file is + * read + */ + + // Prefix sum of source file sizes + size_t pref_source_size = 0; + // Starting source file from which to begin batching, evaluated using byte range offset + size_t const start_source = [chunk_offset, &sources, &pref_source_size]() { for (size_t src_idx = 0; src_idx < sources.size(); ++src_idx) { - if (sum + sources[src_idx]->size() > chunk_offset) return src_idx; - sum += sources[src_idx]->size(); + if (pref_source_size + sources[src_idx]->size() > chunk_offset) { return src_idx; } + pref_source_size += sources[src_idx]->size(); } return sources.size(); }(); - - // Construct batches of source files, with starting position of batches indicated by - // batch_positions. The size of each batch i.e. the sum of sizes of the source files in the batch - // is capped at INT_MAX bytes. 
- size_t cur_size = 0; - std::vector batch_positions; - std::vector batch_sizes; - batch_positions.push_back(0); - for (size_t i = start_source; i < sources.size(); i++) { - cur_size += sources[i]->size(); - if (cur_size >= batch_size_ub) { - batch_positions.push_back(i); - batch_sizes.push_back(cur_size - sources[i]->size()); - cur_size = sources[i]->size(); + /* + * Construct batches of byte ranges spanning source files, with the starting position of batches + * indicated by `batch_offsets`. `pref_bytes_size` gives the bytes position from which the current + * batch begins, and `end_bytes_size` gives the terminal bytes position after which reading + * stops. + */ + size_t pref_bytes_size = chunk_offset; + size_t end_bytes_size = chunk_offset + chunk_size; + std::vector batch_offsets{pref_bytes_size}; + for (size_t i = start_source; i < sources.size() && pref_bytes_size < end_bytes_size;) { + pref_source_size += sources[i]->size(); + // If the current source file can subsume multiple batches, we split the file until the + // boundary of the last batch exceeds the end of the file (indexed by `pref_source_size`) + while (pref_bytes_size < end_bytes_size && + pref_source_size >= std::min(pref_bytes_size + batch_size_ub, end_bytes_size)) { + auto next_batch_size = std::min(batch_size_ub, end_bytes_size - pref_bytes_size); + batch_offsets.push_back(batch_offsets.back() + next_batch_size); + pref_bytes_size += next_batch_size; } + i++; } - batch_positions.push_back(sources.size()); - batch_sizes.push_back(cur_size); - - // If there is a single batch, then we can directly return the table without the - // unnecessary concatenate - if (batch_sizes.size() == 1) return read_batch(sources, reader_opts, stream, mr); + /* + * If there is a single batch, then we can directly return the table without the + * unnecessary concatenate. The size of batch_offsets is 1 if all sources are empty, + * or if end_bytes_size is larger than total_source_size. 
+ */ + if (batch_offsets.size() <= 2) return read_batch(sources, reader_opts, stream, mr); std::vector partial_tables; json_reader_options batched_reader_opts{reader_opts}; - // Dispatch individual batches to read_batch and push the resulting table into // partial_tables array. Note that the reader options need to be updated for each // batch to adjust byte range offset and byte range size. - for (size_t i = 0; i < batch_sizes.size(); i++) { - batched_reader_opts.set_byte_range_size(std::min(batch_sizes[i], chunk_size)); - partial_tables.emplace_back(read_batch( - host_span>(sources.begin() + batch_positions[i], - batch_positions[i + 1] - batch_positions[i]), - batched_reader_opts, - stream, - rmm::mr::get_current_device_resource())); - if (chunk_size <= batch_sizes[i]) break; - chunk_size -= batch_sizes[i]; - batched_reader_opts.set_byte_range_offset(0); + for (size_t i = 0; i < batch_offsets.size() - 1; i++) { + batched_reader_opts.set_byte_range_offset(batch_offsets[i]); + batched_reader_opts.set_byte_range_size(batch_offsets[i + 1] - batch_offsets[i]); + partial_tables.emplace_back( + read_batch(sources, batched_reader_opts, stream, rmm::mr::get_current_device_resource())); } auto expects_schema_equality = diff --git a/cpp/src/io/json/read_json.hpp b/cpp/src/io/json/read_json.hpp index 0c30b4cad46..ff69f9b7627 100644 --- a/cpp/src/io/json/read_json.hpp +++ b/cpp/src/io/json/read_json.hpp @@ -29,6 +29,19 @@ namespace cudf::io::json::detail { +// Some magic numbers +constexpr int num_subchunks = 10; // per chunk_size +constexpr size_t min_subchunk_size = 10000; +constexpr int estimated_compression_ratio = 4; +constexpr int max_subchunks_prealloced = 3; + +device_span ingest_raw_input(device_span buffer, + host_span> sources, + compression_type compression, + size_t range_offset, + size_t range_size, + rmm::cuda_stream_view stream); + table_with_metadata read_json(host_span> sources, json_reader_options const& reader_opts, rmm::cuda_stream_view stream, @@ -38,9 
+51,4 @@ size_type find_first_delimiter(device_span d_data, char const delimiter, rmm::cuda_stream_view stream); -size_type find_first_delimiter_in_chunk(host_span> sources, - json_reader_options const& reader_opts, - char const delimiter, - rmm::cuda_stream_view stream); - } // namespace cudf::io::json::detail diff --git a/cpp/src/lists/explode.cu b/cpp/src/lists/explode.cu index 370d7480578..46c4fc78a6f 100644 --- a/cpp/src/lists/explode.cu +++ b/cpp/src/lists/explode.cu @@ -229,8 +229,8 @@ std::unique_ptr
explode_outer(table_view const& input_table, if (null_or_empty_count == 0) { // performance penalty to run the below loop if there are no nulls or empty lists. // run simple explode instead - return include_position ? explode_position(input_table, explode_column_idx, stream, mr) - : explode(input_table, explode_column_idx, stream, mr); + return include_position ? detail::explode_position(input_table, explode_column_idx, stream, mr) + : detail::explode(input_table, explode_column_idx, stream, mr); } auto gather_map_size = sliced_child.size() + null_or_empty_count; @@ -300,58 +300,63 @@ std::unique_ptr
explode_outer(table_view const& input_table, } // namespace detail /** - * @copydoc cudf::explode(table_view const&, size_type, rmm::device_async_resource_ref) + * @copydoc cudf::explode(table_view const&, size_type, rmm::cuda_stream_view, + * rmm::device_async_resource_ref) */ std::unique_ptr
explode(table_view const& input_table, size_type explode_column_idx, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); CUDF_EXPECTS(input_table.column(explode_column_idx).type().id() == type_id::LIST, "Unsupported non-list column"); - return detail::explode(input_table, explode_column_idx, cudf::get_default_stream(), mr); + return detail::explode(input_table, explode_column_idx, stream, mr); } /** - * @copydoc cudf::explode_position(table_view const&, size_type, rmm::device_async_resource_ref) + * @copydoc cudf::explode_position(table_view const&, size_type, rmm::cuda_stream_view, + * rmm::device_async_resource_ref) */ std::unique_ptr
explode_position(table_view const& input_table, size_type explode_column_idx, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); CUDF_EXPECTS(input_table.column(explode_column_idx).type().id() == type_id::LIST, "Unsupported non-list column"); - return detail::explode_position(input_table, explode_column_idx, cudf::get_default_stream(), mr); + return detail::explode_position(input_table, explode_column_idx, stream, mr); } /** - * @copydoc cudf::explode_outer(table_view const&, size_type, rmm::device_async_resource_ref) + * @copydoc cudf::explode_outer(table_view const&, size_type, rmm::cuda_stream_view, + * rmm::device_async_resource_ref) */ std::unique_ptr
explode_outer(table_view const& input_table, size_type explode_column_idx, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); CUDF_EXPECTS(input_table.column(explode_column_idx).type().id() == type_id::LIST, "Unsupported non-list column"); - return detail::explode_outer( - input_table, explode_column_idx, false, cudf::get_default_stream(), mr); + return detail::explode_outer(input_table, explode_column_idx, false, stream, mr); } /** * @copydoc cudf::explode_outer_position(table_view const&, size_type, - * rmm::device_async_resource_ref) + * rmm::cuda_stream_view, rmm::device_async_resource_ref) */ std::unique_ptr
explode_outer_position(table_view const& input_table, size_type explode_column_idx, + rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { CUDF_FUNC_RANGE(); CUDF_EXPECTS(input_table.column(explode_column_idx).type().id() == type_id::LIST, "Unsupported non-list column"); - return detail::explode_outer( - input_table, explode_column_idx, true, cudf::get_default_stream(), mr); + return detail::explode_outer(input_table, explode_column_idx, true, stream, mr); } } // namespace cudf diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 8e2017ccb97..05e9759632f 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -313,17 +313,17 @@ ConfigureTest( PERCENT 30 ) ConfigureTest( - JSON_TEST io/json_test.cpp io/json_chunked_reader.cpp + JSON_TEST io/json/json_test.cpp io/json/json_chunked_reader.cu GPUS 1 PERCENT 30 ) -ConfigureTest(JSON_WRITER_TEST io/json_writer.cpp) -ConfigureTest(JSON_TYPE_CAST_TEST io/json_type_cast_test.cu) -ConfigureTest(NESTED_JSON_TEST io/nested_json_test.cpp io/json_tree.cpp) +ConfigureTest(JSON_WRITER_TEST io/json/json_writer.cpp) +ConfigureTest(JSON_TYPE_CAST_TEST io/json/json_type_cast_test.cu) +ConfigureTest(NESTED_JSON_TEST io/json/nested_json_test.cpp io/json/json_tree.cpp) ConfigureTest(ARROW_IO_SOURCE_TEST io/arrow_io_source_test.cpp) ConfigureTest(MULTIBYTE_SPLIT_TEST io/text/multibyte_split_test.cpp) -ConfigureTest(JSON_QUOTE_NORMALIZATION io/json_quote_normalization_test.cpp) -ConfigureTest(JSON_WHITESPACE_NORMALIZATION io/json_whitespace_normalization_test.cu) +ConfigureTest(JSON_QUOTE_NORMALIZATION io/json/json_quote_normalization_test.cpp) +ConfigureTest(JSON_WHITESPACE_NORMALIZATION io/json/json_whitespace_normalization_test.cu) ConfigureTest( DATA_CHUNK_SOURCE_TEST io/text/data_chunk_source_test.cpp GPUS 1 @@ -572,7 +572,7 @@ ConfigureTest( LARGE_STRINGS_TEST large_strings/concatenate_tests.cpp large_strings/case_tests.cpp - large_strings/json_tests.cpp + large_strings/json_tests.cu 
large_strings/large_strings_fixture.cpp large_strings/merge_tests.cpp large_strings/parquet_tests.cpp diff --git a/cpp/tests/io/json_chunked_reader.cpp b/cpp/tests/io/json/json_chunked_reader.cu similarity index 64% rename from cpp/tests/io/json_chunked_reader.cpp rename to cpp/tests/io/json/json_chunked_reader.cu index 23d54f7263c..b9dee54752c 100644 --- a/cpp/tests/io/json_chunked_reader.cpp +++ b/cpp/tests/io/json/json_chunked_reader.cu @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "io/json/read_json.hpp" +#include "json_utils.cuh" #include #include @@ -37,65 +37,6 @@ cudf::test::TempDirTestEnvironment* const temp_env = static_cast( ::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment)); -// function to extract first delimiter in the string in each chunk, -// collate together and form byte_range for each chunk, -// parse separately. -std::vector skeleton_for_parellel_chunk_reader( - cudf::host_span> sources, - cudf::io::json_reader_options const& reader_opts, - int32_t chunk_size, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - using namespace cudf::io::json::detail; - using cudf::size_type; - size_t total_source_size = 0; - for (auto const& source : sources) { - total_source_size += source->size(); - } - size_t num_chunks = (total_source_size + chunk_size - 1) / chunk_size; - constexpr size_type no_min_value = -1; - - // Get the first delimiter in each chunk. 
- std::vector first_delimiter_index(num_chunks); - auto reader_opts_chunk = reader_opts; - for (size_t i = 0; i < num_chunks; i++) { - auto const chunk_start = i * chunk_size; - reader_opts_chunk.set_byte_range_offset(chunk_start); - reader_opts_chunk.set_byte_range_size(chunk_size); - first_delimiter_index[i] = - find_first_delimiter_in_chunk(sources, reader_opts_chunk, '\n', stream); - if (first_delimiter_index[i] != no_min_value) { first_delimiter_index[i] += chunk_start; } - } - - // Process and allocate record start, end for each worker. - using record_range = std::pair; - std::vector record_ranges; - record_ranges.reserve(num_chunks); - first_delimiter_index[0] = 0; - auto prev = first_delimiter_index[0]; - for (size_t i = 1; i < num_chunks; i++) { - if (first_delimiter_index[i] == no_min_value) continue; - record_ranges.emplace_back(prev, first_delimiter_index[i]); - prev = first_delimiter_index[i]; - } - record_ranges.emplace_back(prev, total_source_size); - - std::vector tables; - // Process each chunk in parallel. - for (auto const& [chunk_start, chunk_end] : record_ranges) { - if (chunk_start == -1 or chunk_end == -1 or - static_cast(chunk_start) >= total_source_size) - continue; - reader_opts_chunk.set_byte_range_offset(chunk_start); - reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start); - tables.push_back(read_json(sources, reader_opts_chunk, stream, mr)); - } - // assume all records have same number of columns, and inferred same type. (or schema is passed) - // TODO a step before to merge all columns, types and infer final schema. 
- return tables; -} - TEST_F(JsonReaderTest, ByteRange_SingleSource) { std::string const json_string = R"( @@ -118,11 +59,11 @@ TEST_F(JsonReaderTest, ByteRange_SingleSource) // Test for different chunk sizes for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500}) { - auto const tables = skeleton_for_parellel_chunk_reader(datasources, - json_lines_options, - chunk_size, - cudf::get_default_stream(), - rmm::mr::get_current_device_resource()); + auto const tables = split_byte_range_reading(datasources, + json_lines_options, + chunk_size, + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); auto table_views = std::vector(tables.size()); std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) { @@ -213,11 +154,11 @@ TEST_F(JsonReaderTest, ByteRange_MultiSource) // Test for different chunk sizes for (auto chunk_size : {7, 10, 15, 20, 40, 50, 100, 200, 500, 1000, 2000}) { - auto const tables = skeleton_for_parellel_chunk_reader(datasources, - json_lines_options, - chunk_size, - cudf::get_default_stream(), - rmm::mr::get_current_device_resource()); + auto const tables = split_byte_range_reading(datasources, + json_lines_options, + chunk_size, + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); auto table_views = std::vector(tables.size()); std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) { diff --git a/cpp/tests/io/json_quote_normalization_test.cpp b/cpp/tests/io/json/json_quote_normalization_test.cpp similarity index 100% rename from cpp/tests/io/json_quote_normalization_test.cpp rename to cpp/tests/io/json/json_quote_normalization_test.cpp diff --git a/cpp/tests/io/json_test.cpp b/cpp/tests/io/json/json_test.cpp similarity index 100% rename from cpp/tests/io/json_test.cpp rename to cpp/tests/io/json/json_test.cpp diff --git a/cpp/tests/io/json_tree.cpp b/cpp/tests/io/json/json_tree.cpp similarity index 100% rename from cpp/tests/io/json_tree.cpp rename to 
cpp/tests/io/json/json_tree.cpp diff --git a/cpp/tests/io/json_type_cast_test.cu b/cpp/tests/io/json/json_type_cast_test.cu similarity index 100% rename from cpp/tests/io/json_type_cast_test.cu rename to cpp/tests/io/json/json_type_cast_test.cu diff --git a/cpp/tests/io/json/json_utils.cuh b/cpp/tests/io/json/json_utils.cuh new file mode 100644 index 00000000000..9383797d91b --- /dev/null +++ b/cpp/tests/io/json/json_utils.cuh @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "io/json/read_json.hpp" + +#include +#include +#include +#include + +#include + +#include + +// Helper function to test correctness of JSON byte range reading. 
+// We split the input source files into a set of byte range chunks each of size +// `chunk_size` and return an array of partial tables constructed from each chunk +template +std::vector split_byte_range_reading( + cudf::host_span> sources, + cudf::io::json_reader_options const& reader_opts, + IndexType chunk_size, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto total_source_size = [&sources]() { + return std::accumulate(sources.begin(), sources.end(), 0ul, [=](size_t sum, auto& source) { + auto const size = source->size(); + return sum + size; + }); + }(); + auto find_first_delimiter_in_chunk = + [total_source_size, &sources, &stream]( + cudf::io::json_reader_options const& reader_opts) -> IndexType { + rmm::device_uvector buffer(total_source_size, stream); + auto readbufspan = cudf::io::json::detail::ingest_raw_input(buffer, + sources, + reader_opts.get_compression(), + reader_opts.get_byte_range_offset(), + reader_opts.get_byte_range_size(), + stream); + // Note: we cannot reuse cudf::io::json::detail::find_first_delimiter since the + // return type of that function is size_type. However, when the chunk_size is + // larger than INT_MAX, the position of the delimiter can also be larger than + // INT_MAX. We do not encounter this overflow error in the detail function + // since the batched JSON reader splits the byte_range_size into chunk_sizes + // smaller than INT_MAX bytes + auto const first_delimiter_position_it = + thrust::find(rmm::exec_policy(stream), readbufspan.begin(), readbufspan.end(), '\n'); + return first_delimiter_position_it != readbufspan.end() + ? thrust::distance(readbufspan.begin(), first_delimiter_position_it) + : -1; + }; + size_t num_chunks = (total_source_size + chunk_size - 1) / chunk_size; + constexpr IndexType no_min_value = -1; + + // Get the first delimiter in each chunk. 
+ std::vector first_delimiter_index(num_chunks); + auto reader_opts_chunk = reader_opts; + for (size_t i = 0; i < num_chunks; i++) { + auto const chunk_start = i * chunk_size; + // We are updating reader_opts_chunk to store offset and size information for the current chunk + reader_opts_chunk.set_byte_range_offset(chunk_start); + reader_opts_chunk.set_byte_range_size(chunk_size); + first_delimiter_index[i] = find_first_delimiter_in_chunk(reader_opts_chunk); + } + + // Process and allocate record start, end for each worker. + using record_range = std::pair; + std::vector record_ranges; + record_ranges.reserve(num_chunks); + size_t prev = 0; + for (size_t i = 1; i < num_chunks; i++) { + // In the case where chunk_size is smaller than row size, the chunk needs to be skipped + if (first_delimiter_index[i] == no_min_value) continue; + size_t next = static_cast(first_delimiter_index[i]) + (i * chunk_size); + record_ranges.emplace_back(prev, next); + prev = next; + } + record_ranges.emplace_back(prev, total_source_size); + + std::vector tables; + for (auto const& [chunk_start, chunk_end] : record_ranges) { + reader_opts_chunk.set_byte_range_offset(chunk_start); + reader_opts_chunk.set_byte_range_size(chunk_end - chunk_start); + tables.push_back(cudf::io::json::detail::read_json(sources, reader_opts_chunk, stream, mr)); + } + // assume all records have same number of columns, and inferred same type. (or schema is passed) + // TODO a step before to merge all columns, types and infer final schema. 
+ return tables; +} diff --git a/cpp/tests/io/json_whitespace_normalization_test.cu b/cpp/tests/io/json/json_whitespace_normalization_test.cu similarity index 100% rename from cpp/tests/io/json_whitespace_normalization_test.cu rename to cpp/tests/io/json/json_whitespace_normalization_test.cu diff --git a/cpp/tests/io/json_writer.cpp b/cpp/tests/io/json/json_writer.cpp similarity index 100% rename from cpp/tests/io/json_writer.cpp rename to cpp/tests/io/json/json_writer.cpp diff --git a/cpp/tests/io/nested_json_test.cpp b/cpp/tests/io/json/nested_json_test.cpp similarity index 100% rename from cpp/tests/io/nested_json_test.cpp rename to cpp/tests/io/json/nested_json_test.cpp diff --git a/cpp/tests/large_strings/json_tests.cpp b/cpp/tests/large_strings/json_tests.cu similarity index 50% rename from cpp/tests/large_strings/json_tests.cpp rename to cpp/tests/large_strings/json_tests.cu index bf16d131ba7..49abf7b484d 100644 --- a/cpp/tests/large_strings/json_tests.cpp +++ b/cpp/tests/large_strings/json_tests.cu @@ -14,8 +14,13 @@ * limitations under the License. 
*/ +#include "../io/json/json_utils.cuh" #include "large_strings_fixture.hpp" +#include + +#include +#include #include #include @@ -28,31 +33,57 @@ TEST_F(JsonLargeReaderTest, MultiBatch) { "a": { "y" : 6}, "b" : [4, 5 ], "c": 12 } { "a": { "y" : 6}, "b" : [6 ], "c": 13 } { "a": { "y" : 6}, "b" : [7 ], "c": 14 })"; - constexpr size_t expected_file_size = std::numeric_limits::max() / 2; + constexpr size_t batch_size_ub = std::numeric_limits::max(); + constexpr size_t expected_file_size = 1.5 * static_cast(batch_size_ub); std::size_t const log_repetitions = static_cast(std::ceil(std::log2(expected_file_size / json_string.size()))); json_string.reserve(json_string.size() * (1UL << log_repetitions)); - std::size_t numrows = 4; for (std::size_t i = 0; i < log_repetitions; i++) { json_string += json_string; - numrows <<= 1; } constexpr int num_sources = 2; - std::vector> hostbufs( - num_sources, cudf::host_span(json_string.data(), json_string.size())); + std::vector> hostbufs( + num_sources, + cudf::host_span(reinterpret_cast(json_string.data()), + json_string.size())); // Initialize parsing options (reading json lines) cudf::io::json_reader_options json_lines_options = cudf::io::json_reader_options::builder( cudf::io::source_info{ - cudf::host_span>(hostbufs.data(), hostbufs.size())}) + cudf::host_span>(hostbufs.data(), hostbufs.size())}) .lines(true) .compression(cudf::io::compression_type::NONE) .recovery_mode(cudf::io::json_recovery_mode_t::FAIL); // Read full test data via existing, nested JSON lines reader cudf::io::table_with_metadata current_reader_table = cudf::io::read_json(json_lines_options); - ASSERT_EQ(current_reader_table.tbl->num_rows(), numrows * num_sources); + + std::vector> datasources; + for (auto& hb : hostbufs) { + datasources.emplace_back(cudf::io::datasource::create(hb)); + } + // Test for different chunk sizes + std::vector chunk_sizes{ + batch_size_ub / 4, batch_size_ub / 2, batch_size_ub, static_cast(batch_size_ub * 2)}; + for (auto chunk_size 
: chunk_sizes) { + auto const tables = + split_byte_range_reading(datasources, + json_lines_options, + chunk_size, + cudf::get_default_stream(), + rmm::mr::get_current_device_resource()); + + auto table_views = std::vector(tables.size()); + std::transform(tables.begin(), tables.end(), table_views.begin(), [](auto& table) { + return table.tbl->view(); + }); + auto result = cudf::concatenate(table_views); + + // Verify that the data read via chunked reader matches the data read via nested JSON reader + // cannot use EQUAL due to concatenate removing null mask + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(current_reader_table.tbl->view(), result->view()); + } } diff --git a/cpp/tests/streams/dictionary_test.cpp b/cpp/tests/streams/dictionary_test.cpp index 9e81c8574b8..03e4cf47470 100644 --- a/cpp/tests/streams/dictionary_test.cpp +++ b/cpp/tests/streams/dictionary_test.cpp @@ -26,6 +26,52 @@ class DictionaryTest : public cudf::test::BaseFixture {}; +TEST_F(DictionaryTest, FactoryColumnViews) +{ + cudf::test::strings_column_wrapper keys({"aaa", "ccc", "ddd", "www"}); + cudf::test::fixed_width_column_wrapper values{2, 0, 3, 1, 2, 2, 2, 3, 0}; + + auto dictionary = cudf::make_dictionary_column(keys, values, cudf::test::get_default_stream()); + cudf::dictionary_column_view view(dictionary->view()); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(view.keys(), keys); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(view.indices(), values); +} + +TEST_F(DictionaryTest, FactoryColumns) +{ + std::vector h_keys{"aaa", "ccc", "ddd", "www"}; + cudf::test::strings_column_wrapper keys(h_keys.begin(), h_keys.end()); + std::vector h_values{2, 0, 3, 1, 2, 2, 2, 3, 0}; + cudf::test::fixed_width_column_wrapper values(h_values.begin(), h_values.end()); + + auto dictionary = cudf::make_dictionary_column( + keys.release(), values.release(), cudf::test::get_default_stream()); + cudf::dictionary_column_view view(dictionary->view()); + + cudf::test::strings_column_wrapper keys_expected(h_keys.begin(), h_keys.end()); + 
cudf::test::fixed_width_column_wrapper values_expected(h_values.begin(), h_values.end()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(view.keys(), keys_expected); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(view.indices(), values_expected); +} + +TEST_F(DictionaryTest, FactoryColumnsNullMaskCount) +{ + std::vector h_keys{"aaa", "ccc", "ddd", "www"}; + cudf::test::strings_column_wrapper keys(h_keys.begin(), h_keys.end()); + std::vector h_values{2, 0, 3, 1, 2, 2, 2, 3, 0}; + cudf::test::fixed_width_column_wrapper values(h_values.begin(), h_values.end()); + + auto dictionary = cudf::make_dictionary_column( + keys.release(), values.release(), rmm::device_buffer{}, 0, cudf::test::get_default_stream()); + cudf::dictionary_column_view view(dictionary->view()); + + cudf::test::strings_column_wrapper keys_expected(h_keys.begin(), h_keys.end()); + cudf::test::fixed_width_column_wrapper values_expected(h_values.begin(), h_values.end()); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(view.keys(), keys_expected); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(view.indices(), values_expected); +} + TEST_F(DictionaryTest, Encode) { cudf::test::fixed_width_column_wrapper col({1, 2, 3, 4, 5}); diff --git a/cpp/tests/streams/lists_test.cpp b/cpp/tests/streams/lists_test.cpp index 711e20e4b17..7963dced292 100644 --- a/cpp/tests/streams/lists_test.cpp +++ b/cpp/tests/streams/lists_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -212,3 +213,57 @@ TEST_F(ListTest, HaveOverlap) cudf::nan_equality::ALL_EQUAL, cudf::test::get_default_stream()); } + +TEST_F(ListTest, Explode) +{ + cudf::test::fixed_width_column_wrapper list_col_a{100, 200, 300}; + cudf::test::lists_column_wrapper list_col_b{ + cudf::test::lists_column_wrapper{1, 2, 7}, + cudf::test::lists_column_wrapper{5, 6}, + cudf::test::lists_column_wrapper{0, 3}}; + cudf::test::strings_column_wrapper list_col_c{"string0", "string1", "string2"}; + cudf::table_view lists_table({list_col_a, list_col_b, list_col_c}); + cudf::explode(lists_table, 1, cudf::test::get_default_stream()); +} + +TEST_F(ListTest, ExplodePosition) +{ + cudf::test::fixed_width_column_wrapper list_col_a{100, 200, 300}; + cudf::test::lists_column_wrapper list_col_b{ + cudf::test::lists_column_wrapper{1, 2, 7}, + cudf::test::lists_column_wrapper{5, 6}, + cudf::test::lists_column_wrapper{0, 3}}; + cudf::test::strings_column_wrapper list_col_c{"string0", "string1", "string2"}; + cudf::table_view lists_table({list_col_a, list_col_b, list_col_c}); + cudf::explode_position(lists_table, 1, cudf::test::get_default_stream()); +} + +TEST_F(ListTest, ExplodeOuter) +{ + constexpr auto null = 0; + auto valids = + cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i % 2 == 0; }); + cudf::test::lists_column_wrapper list_col_a{ + cudf::test::lists_column_wrapper({1, null, 7}, valids), + cudf::test::lists_column_wrapper({5, null, 0, null}, valids), + cudf::test::lists_column_wrapper{}, + cudf::test::lists_column_wrapper({0, null, 8}, valids)}; + cudf::test::fixed_width_column_wrapper list_col_b{100, 200, 300, 400}; + cudf::table_view lists_table({list_col_a, list_col_b}); + cudf::explode_outer(lists_table, 0, cudf::test::get_default_stream()); +} + +TEST_F(ListTest, ExplodeOuterPosition) +{ + constexpr auto null = 0; + auto valids = + cudf::detail::make_counting_transform_iterator(0, 
[](auto i) { return i % 2 == 0; }); + cudf::test::lists_column_wrapper list_col_a{ + cudf::test::lists_column_wrapper({1, null, 7}, valids), + cudf::test::lists_column_wrapper({5, null, 0, null}, valids), + cudf::test::lists_column_wrapper{}, + cudf::test::lists_column_wrapper({0, null, 8}, valids)}; + cudf::test::fixed_width_column_wrapper list_col_b{100, 200, 300, 400}; + cudf::table_view lists_table({list_col_a, list_col_b}); + cudf::explode_outer_position(lists_table, 0, cudf::test::get_default_stream()); +} diff --git a/python/cudf/cudf/_lib/lists.pyx b/python/cudf/cudf/_lib/lists.pyx index 76f37c3b845..50061f6e468 100644 --- a/python/cudf/cudf/_lib/lists.pyx +++ b/python/cudf/cudf/_lib/lists.pyx @@ -11,9 +11,6 @@ from cudf._lib.pylibcudf.libcudf.column.column cimport column from cudf._lib.pylibcudf.libcudf.lists.lists_column_view cimport ( lists_column_view, ) -from cudf._lib.pylibcudf.libcudf.lists.sorting cimport ( - sort_lists as cpp_sort_lists, -) from cudf._lib.pylibcudf.libcudf.lists.stream_compaction cimport ( distinct as cpp_distinct, ) @@ -21,7 +18,6 @@ from cudf._lib.pylibcudf.libcudf.types cimport ( nan_equality, null_equality, null_order, - order, size_type, ) from cudf._lib.utils cimport columns_from_pylibcudf_table @@ -80,24 +76,14 @@ def distinct(Column col, bool nulls_equal, bool nans_all_equal): @acquire_spill_lock() def sort_lists(Column col, bool ascending, str na_position): - cdef shared_ptr[lists_column_view] list_view = ( - make_shared[lists_column_view](col.view()) - ) - cdef order c_sort_order = ( - order.ASCENDING if ascending else order.DESCENDING - ) - cdef null_order c_null_prec = ( - null_order.BEFORE if na_position == "first" else null_order.AFTER - ) - - cdef unique_ptr[column] c_result - - with nogil: - c_result = move( - cpp_sort_lists(list_view.get()[0], c_sort_order, c_null_prec) + return Column.from_pylibcudf( + pylibcudf.lists.sort_lists( + col.to_pylibcudf(mode="read"), + ascending, + null_order.BEFORE if na_position == 
"first" else null_order.AFTER, + False, ) - - return Column.from_unique_ptr(move(c_result)) + ) @acquire_spill_lock() diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/lists/sorting.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/lists/sorting.pxd index 145ab41302f..337ac73908b 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/lists/sorting.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/lists/sorting.pxd @@ -15,3 +15,9 @@ cdef extern from "cudf/lists/sorting.hpp" namespace "cudf::lists" nogil: order column_order, null_order null_precedence ) except + + + cdef unique_ptr[column] stable_sort_lists( + const lists_column_view source_column, + order column_order, + null_order null_precedence + ) except + diff --git a/python/cudf/cudf/_lib/pylibcudf/lists.pxd b/python/cudf/cudf/_lib/pylibcudf/lists.pxd index 38eb575ee8d..cacecae6010 100644 --- a/python/cudf/cudf/_lib/pylibcudf/lists.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/lists.pxd @@ -2,7 +2,7 @@ from libcpp cimport bool -from cudf._lib.pylibcudf.libcudf.types cimport size_type +from cudf._lib.pylibcudf.libcudf.types cimport null_order, size_type from .column cimport Column from .scalar cimport Scalar @@ -35,3 +35,5 @@ cpdef Column segmented_gather(Column, Column) cpdef Column extract_list_element(Column, ColumnOrSizeType) cpdef Column count_elements(Column) + +cpdef Column sort_lists(Column, bool, null_order, bool stable = *) diff --git a/python/cudf/cudf/_lib/pylibcudf/lists.pyx b/python/cudf/cudf/_lib/pylibcudf/lists.pyx index ea469642dd5..b5661a3e634 100644 --- a/python/cudf/cudf/_lib/pylibcudf/lists.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/lists.pyx @@ -23,8 +23,12 @@ from cudf._lib.pylibcudf.libcudf.lists.count_elements cimport ( from cudf._lib.pylibcudf.libcudf.lists.extract cimport ( extract_list_element as cpp_extract_list_element, ) +from cudf._lib.pylibcudf.libcudf.lists.sorting cimport ( + sort_lists as cpp_sort_lists, + stable_sort_lists as cpp_stable_sort_lists, +) from 
cudf._lib.pylibcudf.libcudf.table.table cimport table -from cudf._lib.pylibcudf.libcudf.types cimport size_type +from cudf._lib.pylibcudf.libcudf.types cimport null_order, order, size_type from cudf._lib.pylibcudf.lists cimport ColumnOrScalar, ColumnOrSizeType from .column cimport Column, ListColumnView @@ -320,3 +324,54 @@ cpdef Column count_elements(Column input): c_result = move(cpp_count_elements(list_view.view())) return Column.from_libcudf(move(c_result)) + + +cpdef Column sort_lists( + Column input, + bool ascending, + null_order na_position, + bool stable = False +): + """Sort the elements within a list in each row of a list column. + + For details, see :cpp:func:`sort_lists`. + + Parameters + ---------- + input : Column + The input column. + ascending : bool + If true, the sort order is ascending. Otherwise, the sort order is descending. + na_position : NullOrder + If na_position equals NullOrder.BEFORE, then the null values in the output + column are placed first. Otherwise, they are placed last. + stable : bool + If true, :cpp:func:`stable_sort_lists` is used. Otherwise, + :cpp:func:`sort_lists` is used. + + Returns + ------- + Column + A new Column with elements in each list sorted. + """ + cdef unique_ptr[column] c_result + cdef ListColumnView list_view = input.list_view() + + cdef order c_sort_order = ( + order.ASCENDING if ascending else order.DESCENDING + ) + + with nogil: + if stable: + c_result = move(cpp_stable_sort_lists( + list_view.view(), + c_sort_order, + na_position, + )) + else: + c_result = move(cpp_sort_lists( + list_view.view(), + c_sort_order, + na_position, + )) + return Column.from_libcudf(move(c_result)) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 288bdfd39b3..1d7136e61e3 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -4525,7 +4525,6 @@ def apply( If False, the funcs will be passed the whole Series at once. Currently not supported. 
- engine : {'python', 'numba'}, default 'python' Unused. Added for compatibility with pandas. engine_kwargs : dict diff --git a/python/cudf/cudf/pylibcudf_tests/test_lists.py b/python/cudf/cudf/pylibcudf_tests/test_lists.py index 7cfed884f90..87472f6d59b 100644 --- a/python/cudf/cudf/pylibcudf_tests/test_lists.py +++ b/python/cudf/cudf/pylibcudf_tests/test_lists.py @@ -22,6 +22,11 @@ def column(): return pa.array([3, 2, 5, 6]), pa.array([-1, 0, 0, 0], type=pa.int32()) +@pytest.fixture +def lists_column(): + return [[4, 2, 3, 1], [1, 2, None, 4], [-10, 10, 10, 0]] + + def test_concatenate_rows(test_data): arrow_tbl = pa.Table.from_arrays(test_data[0], names=["a", "b"]) plc_tbl = plc.interop.from_arrow(arrow_tbl) @@ -191,3 +196,44 @@ def test_count_elements(test_data): expect = pa.array([1, 1, 0, 3], type=pa.int32()) assert_column_eq(expect, res) + + +@pytest.mark.parametrize( + "ascending,na_position,expected", + [ + ( + True, + plc.types.NullOrder.BEFORE, + [[1, 2, 3, 4], [None, 1, 2, 4], [-10, 0, 10, 10]], + ), + ( + True, + plc.types.NullOrder.AFTER, + [[1, 2, 3, 4], [1, 2, 4, None], [-10, 0, 10, 10]], + ), + ( + False, + plc.types.NullOrder.BEFORE, + [[4, 3, 2, 1], [4, 2, 1, None], [10, 10, 0, -10]], + ), + ( + False, + plc.types.NullOrder.AFTER, + [[4, 3, 2, 1], [None, 4, 2, 1], [10, 10, 0, -10]], + ), + ( + False, + plc.types.NullOrder.AFTER, + [[4, 3, 2, 1], [None, 4, 2, 1], [10, 10, 0, -10]], + ), + ], +) +def test_sort_lists(lists_column, ascending, na_position, expected): + plc_column = plc.interop.from_arrow(pa.array(lists_column)) + res = plc.lists.sort_lists(plc_column, ascending, na_position, False) + res_stable = plc.lists.sort_lists(plc_column, ascending, na_position, True) + + expect = pa.array(expected) + + assert_column_eq(expect, res) + assert_column_eq(expect, res_stable) diff --git a/python/cudf_polars/cudf_polars/containers/dataframe.py b/python/cudf_polars/cudf_polars/containers/dataframe.py index cbeadf1426a..dba76855329 100644 --- 
a/python/cudf_polars/cudf_polars/containers/dataframe.py +++ b/python/cudf_polars/cudf_polars/containers/dataframe.py @@ -23,8 +23,6 @@ from typing_extensions import Self - import cudf - from cudf_polars.containers import Column @@ -83,16 +81,6 @@ def num_rows(self) -> int: """Number of rows.""" return 0 if len(self.columns) == 0 else self.table.num_rows() - @classmethod - def from_cudf(cls, df: cudf.DataFrame) -> Self: - """Create from a cudf dataframe.""" - return cls( - [ - NamedColumn(c.to_pylibcudf(mode="read"), name) - for name, c in df._data.items() - ] - ) - @classmethod def from_polars(cls, df: pl.DataFrame) -> Self: """ diff --git a/python/cudf_polars/cudf_polars/dsl/expr.py b/python/cudf_polars/cudf_polars/dsl/expr.py index 4694805a6e7..6325feced94 100644 --- a/python/cudf_polars/cudf_polars/dsl/expr.py +++ b/python/cudf_polars/cudf_polars/dsl/expr.py @@ -882,7 +882,14 @@ def __init__( self.name = name self.options = options self.children = children - if self.name not in ("mask_nans", "round", "setsorted", "unique"): + if self.name not in ( + "mask_nans", + "round", + "setsorted", + "unique", + "dropnull", + "fill_null", + ): raise NotImplementedError(f"Unary function {name=}") def do_evaluate( @@ -968,6 +975,27 @@ def do_evaluate( order=order, null_order=null_order, ) + elif self.name == "dropnull": + (column,) = ( + child.evaluate(df, context=context, mapping=mapping) + for child in self.children + ) + return Column( + plc.stream_compaction.drop_nulls( + plc.Table([column.obj]), [0], 1 + ).columns()[0] + ) + elif self.name == "fill_null": + column = self.children[0].evaluate(df, context=context, mapping=mapping) + if isinstance(self.children[1], Literal): + arg = plc.interop.from_arrow(self.children[1].value) + else: + evaluated = self.children[1].evaluate( + df, context=context, mapping=mapping + ) + arg = evaluated.obj_scalar if evaluated.is_scalar else evaluated.obj + return Column(plc.replace.replace_nulls(column.obj, arg)) + raise 
NotImplementedError( f"Unimplemented unary function {self.name=}" ) # pragma: no cover; init trips first @@ -1160,6 +1188,10 @@ class Cast(Expr): def __init__(self, dtype: plc.DataType, value: Expr) -> None: super().__init__(dtype) self.children = (value,) + if not plc.unary.is_supported_cast(self.dtype, value.dtype): + raise NotImplementedError( + f"Can't cast {self.dtype.id().name} to {value.dtype.id().name}" + ) def do_evaluate( self, diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index b934869ffef..e5691cba7dd 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -25,7 +25,6 @@ import polars as pl -import cudf import cudf._lib.pylibcudf as plc import cudf_polars.dsl.expr as expr @@ -205,8 +204,6 @@ class Scan(IR): def __post_init__(self) -> None: """Validate preconditions.""" - if self.file_options.n_rows is not None: - raise NotImplementedError("row limit in scan") if self.typ not in ("csv", "parquet"): raise NotImplementedError(f"Unhandled scan type: {self.typ}") if self.cloud_options is not None and any( @@ -241,6 +238,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: options = self.file_options with_columns = options.with_columns row_index = options.row_index + nrows = self.file_options.n_rows if self.file_options.n_rows is not None else -1 if self.typ == "csv": parse_options = self.reader_options["parse_options"] sep = chr(parse_options["separator"]) @@ -295,6 +293,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: comment=comment, decimal=decimal, dtypes=self.schema, + nrows=nrows, ) pieces.append(tbl_w_meta) tables, colnames = zip( @@ -308,9 +307,16 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: colnames[0], ) elif self.typ == "parquet": - cdf = cudf.read_parquet(self.paths, columns=with_columns) - assert isinstance(cdf, cudf.DataFrame) - df = DataFrame.from_cudf(cdf) 
+ tbl_w_meta = plc.io.parquet.read_parquet( + plc.io.SourceInfo(self.paths), + columns=with_columns, + num_rows=nrows, + ) + df = DataFrame.from_table( + tbl_w_meta.tbl, + # TODO: consider nested column names? + tbl_w_meta.column_names(include_children=False), + ) else: raise NotImplementedError( f"Unhandled scan type: {self.typ}" @@ -337,13 +343,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: null_order=plc.types.NullOrder.AFTER, ) df = DataFrame([index, *df.columns]) - # TODO: should be true, but not the case until we get - # cudf-classic out of the loop for IO since it converts date32 - # to datetime. - # assert all( - # c.obj.type() == dtype - # for c, dtype in zip(df.columns, self.schema.values()) - # ) + assert all(c.obj.type() == self.schema[c.name] for c in df.columns) if self.predicate is None: return df else: diff --git a/python/cudf_polars/tests/expressions/test_casting.py b/python/cudf_polars/tests/expressions/test_casting.py new file mode 100644 index 00000000000..3e003054338 --- /dev/null +++ b/python/cudf_polars/tests/expressions/test_casting.py @@ -0,0 +1,52 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. 
+# SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +import pytest + +import polars as pl + +from cudf_polars.testing.asserts import ( + assert_gpu_result_equal, + assert_ir_translation_raises, +) + +_supported_dtypes = [(pl.Int8(), pl.Int64())] + +_unsupported_dtypes = [ + (pl.String(), pl.Int64()), +] + + +@pytest.fixture +def dtypes(request): + return request.param + + +@pytest.fixture +def tests(dtypes): + fromtype, totype = dtypes + if fromtype == pl.String(): + data = ["a", "b", "c"] + else: + data = [1, 2, 3] + return pl.DataFrame( + { + "a": pl.Series(data, dtype=fromtype), + } + ).lazy(), totype + + +@pytest.mark.parametrize("dtypes", _supported_dtypes, indirect=True) +def test_cast_supported(tests): + df, totype = tests + q = df.select(pl.col("a").cast(totype)) + assert_gpu_result_equal(q) + + +@pytest.mark.parametrize("dtypes", _unsupported_dtypes, indirect=True) +def test_cast_unsupported(tests): + df, totype = tests + assert_ir_translation_raises( + df.select(pl.col("a").cast(totype)), NotImplementedError + ) diff --git a/python/cudf_polars/tests/test_drop_nulls.py b/python/cudf_polars/tests/test_drop_nulls.py new file mode 100644 index 00000000000..5dfe9f66a97 --- /dev/null +++ b/python/cudf_polars/tests/test_drop_nulls.py @@ -0,0 +1,65 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. 
+# SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +import pytest + +import polars as pl + +from cudf_polars.testing.asserts import ( + assert_gpu_result_equal, + assert_ir_translation_raises, +) + + +@pytest.fixture( + params=[ + [1, 2, 1, 3, 5, None, None], + [1.5, 2.5, None, 1.5, 3, float("nan"), 3], + [], + [None, None], + [1, 2, 3, 4, 5], + ] +) +def null_data(request): + is_empty = pl.Series(request.param).dtype == pl.Null + return pl.DataFrame( + { + "a": pl.Series(request.param, dtype=pl.Float64 if is_empty else None), + "b": pl.Series(request.param, dtype=pl.Float64 if is_empty else None), + } + ).lazy() + + +def test_drop_null(null_data): + q = null_data.select(pl.col("a").drop_nulls()) + assert_gpu_result_equal(q) + + +@pytest.mark.parametrize( + "value", + [0, pl.col("a").mean(), pl.col("b")], + ids=["scalar", "aggregation", "column_expression"], +) +def test_fill_null(null_data, value): + q = null_data.select(pl.col("a").fill_null(value)) + assert_gpu_result_equal(q) + + +@pytest.mark.parametrize( + "strategy", ["forward", "backward", "min", "max", "mean", "zero", "one"] +) +def test_fill_null_with_strategy(null_data, strategy): + q = null_data.select(pl.col("a").fill_null(strategy=strategy)) + + # Not yet exposed to python from rust + assert_ir_translation_raises(q, NotImplementedError) + + +@pytest.mark.parametrize("strategy", ["forward", "backward"]) +@pytest.mark.parametrize("limit", [0, 1, 2]) +def test_fill_null_with_limit(null_data, strategy, limit): + q = null_data.select(pl.col("a").fill_null(strategy=strategy, limit=limit)) + + # Not yet exposed to python from rust + assert_ir_translation_raises(q, NotImplementedError) diff --git a/python/cudf_polars/tests/test_scan.py b/python/cudf_polars/tests/test_scan.py index 0981a96a34a..642b6ae8a37 100644 --- a/python/cudf_polars/tests/test_scan.py +++ b/python/cudf_polars/tests/test_scan.py @@ -24,15 +24,7 @@ def row_index(request): @pytest.fixture( - params=[ - None, - 
pytest.param( - 2, marks=pytest.mark.xfail(reason="No handling of row limit in scan") - ), - pytest.param( - 3, marks=pytest.mark.xfail(reason="No handling of row limit in scan") - ), - ], + params=[None, 2, 3], ids=["all-rows", "n_rows-with-skip", "n_rows-no-skip"], ) def n_rows(request):