From b798a70d608cbbe2c7f372a8c21354455ba56f74 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Fri, 18 Aug 2023 11:08:24 -0700 Subject: [PATCH] Remove the internal use of the cudf's default stream in cuIO (#13903) `cudf::get_default_stream()` is used as the default stream when calling `detail` APIs. From this point, the stream passed to this `detail` implementation should be propagated and used consistently. The exceptions are the parts of the implementation that use a separate stream pool. All in all, libcudf code should not use `cudf::get_default_stream()` outside of dispatching the detail APIs. This PR removes most of such uses in cuIO. One notable exception is `datasource::host_read`, as it simply does not have a stream parameter (and adding it would be a breaking change). Also removed comments that mention default stream value that no longer exists. Authors: - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Bradley Dice (https://github.com/bdice) - Nghia Truong (https://github.com/ttnghia) URL: https://github.com/rapidsai/cudf/pull/13903 --- cpp/src/io/avro/avro_gpu.cu | 2 +- cpp/src/io/csv/csv_gpu.hpp | 4 ++-- cpp/src/io/fst/logical_stack.cuh | 2 +- cpp/src/io/json/json_column.cu | 12 ++++++------ cpp/src/io/json/nested_json_gpu.cu | 14 +++++++++----- cpp/src/io/parquet/parquet_gpu.hpp | 14 +++++++------- cpp/src/io/parquet/predicate_pushdown.cpp | 6 +++--- cpp/src/io/parquet/reader_impl.cpp | 3 ++- cpp/src/io/parquet/reader_impl_helpers.cpp | 5 +++-- cpp/src/io/parquet/reader_impl_helpers.hpp | 9 ++++++--- cpp/src/io/parquet/reader_impl_preprocess.cu | 2 +- 11 files changed, 41 insertions(+), 32 deletions(-) diff --git a/cpp/src/io/avro/avro_gpu.cu b/cpp/src/io/avro/avro_gpu.cu index 3a663fca041..2c634d9b590 100644 --- a/cpp/src/io/avro/avro_gpu.cu +++ b/cpp/src/io/avro/avro_gpu.cu @@ -419,7 +419,7 @@ __global__ void __launch_bounds__(num_warps * 32, 2) * @param[in] avro_data Raw block data * @param[in] schema_len Number of entries in schema * @param[in] min_row_size Minimum size in bytes of a row - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use */ void DecodeAvroColumnData(device_span blocks, schemadesc_s* schema, diff --git a/cpp/src/io/csv/csv_gpu.hpp b/cpp/src/io/csv/csv_gpu.hpp index 4b51368f14d..62bd8f1eff2 100644 --- a/cpp/src/io/csv/csv_gpu.hpp +++ b/cpp/src/io/csv/csv_gpu.hpp @@ -195,7 +195,7 @@ device_span remove_blank_rows(cudf::io::parse_options_view const& opti * @param[in] data The row-column data * @param[in] column_flags Flags that control individual column parsing * @param[in] row_offsets List of row data start positions (offsets) - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use * * @return stats Histogram of each dtypes' occurrence for each column */ @@ -218,7 +218,7 @@ std::vector detect_column_types( * @param[out] columns Device memory output of column data * @param[out] valids Device memory output of column valids bitmap data * @param[out] valid_counts Device memory output of the number of valid fields in each column - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use */ void decode_row_column_data(cudf::io::parse_options_view const& options, device_span data, diff --git a/cpp/src/io/fst/logical_stack.cuh b/cpp/src/io/fst/logical_stack.cuh index a5d32cba125..c4f99736306 100644 --- a/cpp/src/io/fst/logical_stack.cuh +++ b/cpp/src/io/fst/logical_stack.cuh @@ -274,7 +274,7 @@ void sparse_stack_op_to_top_of_stack(StackSymbolItT d_symbols, StackSymbolT const empty_stack_symbol, StackSymbolT const read_symbol, std::size_t const num_symbols_out, - rmm::cuda_stream_view stream = cudf::get_default_stream()) + rmm::cuda_stream_view stream) { rmm::device_buffer temp_storage{}; diff --git a/cpp/src/io/json/json_column.cu b/cpp/src/io/json/json_column.cu index 487a4bc4068..bdad16bd9f1 100644 --- a/cpp/src/io/json/json_column.cu +++ b/cpp/src/io/json/json_column.cu @@ -355,16 +355,15 @@ std::vector copy_strings_to_host(device_span input, options_view, stream, rmm::mr::get_current_device_resource()); - auto to_host = [](auto const& col) { + auto to_host = [stream](auto const& col) { if (col.is_empty()) return std::vector{}; auto const scv = cudf::strings_column_view(col); auto const h_chars = cudf::detail::make_std_vector_sync( - cudf::device_span(scv.chars().data(), scv.chars().size()), - cudf::get_default_stream()); + cudf::device_span(scv.chars().data(), scv.chars().size()), stream); auto const h_offsets = cudf::detail::make_std_vector_sync( cudf::device_span(scv.offsets().data() + scv.offset(), scv.size() + 1), - cudf::get_default_stream()); + stream); // build std::string vector from chars and offsets std::vector host_data; @@ -719,7 +718,8 @@ void make_device_json_column(device_span input, * @param options The reader options to influence the relevant type inference and type casting * options */ -cudf::io::parse_options parsing_options(cudf::io::json_reader_options const& options); +cudf::io::parse_options parsing_options(cudf::io::json_reader_options const& options, + rmm::cuda_stream_view stream); std::pair, std::vector> device_json_column_to_cudf_column( device_json_column& json_col, @@ -976,7 +976,7 @@ table_with_metadata device_parse_nested_json(device_span d_input, // Initialize meta data to be populated while recursing through the tree of columns std::vector> out_columns; std::vector out_column_names; - auto parse_opt = parsing_options(options); + auto parse_opt = parsing_options(options, stream); // Iterate over the struct's child columns and convert to cudf column size_type column_index = 0; diff --git a/cpp/src/io/json/nested_json_gpu.cu b/cpp/src/io/json/nested_json_gpu.cu index 9a08b5f9353..b691eaa8caf 100644 --- a/cpp/src/io/json/nested_json_gpu.cu +++ b/cpp/src/io/json/nested_json_gpu.cu @@ -1889,12 +1889,12 @@ void make_json_column(json_column& root_column, * * @param options The reader options to influence the relevant type inference and type casting * options + * @param stream The CUDA stream to which kernels are dispatched */ -auto parsing_options(cudf::io::json_reader_options const& options) +auto parsing_options(cudf::io::json_reader_options const& options, rmm::cuda_stream_view stream) { auto parse_opts = cudf::io::parse_options{',', '\n', '\"', '.'}; - auto const stream = cudf::get_default_stream(); parse_opts.dayfirst = options.is_enabled_dayfirst(); parse_opts.keepquotes = options.is_enabled_keep_quotes(); parse_opts.trie_true = cudf::detail::create_serialized_trie({"true"}, stream); @@ -1975,8 +1975,12 @@ std::pair, std::vector> json_column_to } // Infer column type, if we don't have an explicit type for it else { - target_type = cudf::io::detail::infer_data_type( - parsing_options(options).json_view(), d_input, string_ranges_it, col_size, stream); + target_type = + cudf::io::detail::infer_data_type(parsing_options(options, stream).json_view(), + d_input, + string_ranges_it, + col_size, + stream); } auto [result_bitmask, null_count] = make_validity(json_col); @@ -1987,7 +1991,7 @@ std::pair, std::vector> json_column_to target_type, std::move(result_bitmask), null_count, - parsing_options(options).view(), + parsing_options(options, stream).view(), stream, mr); diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index fc4ad026b61..b7a8f4e2157 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -442,7 +442,7 @@ constexpr bool is_string_col(ColumnChunkDesc const& chunk) * * @param[in] chunks List of column chunks * @param[in] num_chunks Number of column chunks - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use */ void DecodePageHeaders(ColumnChunkDesc* chunks, int32_t num_chunks, rmm::cuda_stream_view stream); @@ -452,7 +452,7 @@ void DecodePageHeaders(ColumnChunkDesc* chunks, int32_t num_chunks, rmm::cuda_st * * @param[in] chunks List of column chunks * @param[in] num_chunks Number of column chunks - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use */ void BuildStringDictionaryIndex(ColumnChunkDesc* chunks, int32_t num_chunks, @@ -480,7 +480,7 @@ void BuildStringDictionaryIndex(ColumnChunkDesc* chunks, * @param compute_string_sizes If set to true, the str_bytes field in PageInfo will * be computed * @param level_type_size Size in bytes of the type for level decoding - * @param stream CUDA stream to use, default 0 + * @param stream CUDA stream to use */ void ComputePageSizes(cudf::detail::hostdevice_vector& pages, cudf::detail::hostdevice_vector const& chunks, @@ -504,7 +504,7 @@ void ComputePageSizes(cudf::detail::hostdevice_vector& pages, * @param[in] min_rows crop all rows below min_row * @param[in] num_rows Maximum number of rows to read * @param[in] level_type_size Size in bytes of the type for level decoding - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use */ void ComputePageStringSizes(cudf::detail::hostdevice_vector& pages, cudf::detail::hostdevice_vector const& chunks, @@ -524,7 +524,7 @@ void ComputePageStringSizes(cudf::detail::hostdevice_vector& pages, * @param[in] num_rows Total number of rows to read * @param[in] min_row Minimum number of rows to read * @param[in] level_type_size Size in bytes of the type for level decoding - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use */ void DecodePageData(cudf::detail::hostdevice_vector& pages, cudf::detail::hostdevice_vector const& chunks, @@ -544,7 +544,7 @@ void DecodePageData(cudf::detail::hostdevice_vector& pages, * @param[in] num_rows Total number of rows to read * @param[in] min_row Minimum number of rows to read * @param[in] level_type_size Size in bytes of the type for level decoding - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use */ void DecodeStringPageData(cudf::detail::hostdevice_vector& pages, cudf::detail::hostdevice_vector const& chunks, @@ -654,7 +654,7 @@ void get_dictionary_indices(cudf::detail::device_2dspan * @param[in] write_v2_headers True if V2 page headers should be written * @param[in] chunk_grstats Setup for chunk-level stats * @param[in] max_page_comp_data_size Calculated maximum compressed data size of pages - * @param[in] stream CUDA stream to use, default 0 + * @param[in] stream CUDA stream to use */ void InitEncoderPages(cudf::detail::device_2dspan chunks, device_span pages, diff --git a/cpp/src/io/parquet/predicate_pushdown.cpp b/cpp/src/io/parquet/predicate_pushdown.cpp index 53ebd4900eb..805d082c71e 100644 --- a/cpp/src/io/parquet/predicate_pushdown.cpp +++ b/cpp/src/io/parquet/predicate_pushdown.cpp @@ -375,10 +375,10 @@ class stats_expression_converter : public ast::detail::expression_transformer { std::optional>> aggregate_reader_metadata::filter_row_groups( host_span const> row_group_indices, host_span output_dtypes, - std::reference_wrapper filter) const + std::reference_wrapper filter, + rmm::cuda_stream_view stream) const { - auto stream = cudf::get_default_stream(); - auto mr = rmm::mr::get_current_device_resource(); + auto mr = rmm::mr::get_current_device_resource(); // Create row group indices. std::vector> filtered_row_group_indices; std::vector> all_row_group_indices; diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 5a44eb6baa0..b9f3639da79 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -326,7 +326,8 @@ void reader::impl::prepare_data(int64_t skip_rows, [](auto const& col) { return col.type; }); } auto const [skip_rows_corrected, num_rows_corrected, row_groups_info] = - _metadata->select_row_groups(row_group_indices, skip_rows, num_rows, output_types, filter); + _metadata->select_row_groups( + row_group_indices, skip_rows, num_rows, output_types, filter, _stream); if (num_rows_corrected > 0 && not row_groups_info.empty() && not _input_columns.empty()) { load_and_decompress_data(row_groups_info, num_rows_corrected); diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 9444ffbcf02..f6dbeb275fc 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -350,12 +350,13 @@ aggregate_reader_metadata::select_row_groups( int64_t skip_rows_opt, std::optional const& num_rows_opt, host_span output_dtypes, - std::optional> filter) const + std::optional> filter, + rmm::cuda_stream_view stream) const { std::optional>> filtered_row_group_indices; if (filter.has_value()) { filtered_row_group_indices = - filter_row_groups(row_group_indices, output_dtypes, filter.value()); + filter_row_groups(row_group_indices, output_dtypes, filter.value(), stream); if (filtered_row_group_indices.has_value()) { row_group_indices = host_span const>(filtered_row_group_indices.value()); diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 1dbcacf0a94..751ffc33123 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -170,12 +170,14 @@ class aggregate_reader_metadata { * @param row_group_indices Lists of row groups to read, one per source * @param output_dtypes List of output column datatypes * @param filter AST expression to filter row groups based on Column chunk statistics + * @param stream CUDA stream used for device memory operations and kernel launches * @return Filtered row group indices, if any is filtered. */ [[nodiscard]] std::optional>> filter_row_groups( host_span const> row_group_indices, host_span output_dtypes, - std::reference_wrapper filter) const; + std::reference_wrapper filter, + rmm::cuda_stream_view stream) const; /** * @brief Filters and reduces down to a selection of row groups @@ -188,7 +190,7 @@ class aggregate_reader_metadata { * @param row_count Total number of rows selected * @param output_dtypes List of output column datatypes * @param filter Optional AST expression to filter row groups based on Column chunk statistics - * + * @param stream CUDA stream used for device memory operations and kernel launches * @return A tuple of corrected row_start, row_count and list of row group indexes and its * starting row */ @@ -197,7 +199,8 @@ class aggregate_reader_metadata { int64_t row_start, std::optional const& row_count, host_span output_dtypes, - std::optional> filter) const; + std::optional> filter, + rmm::cuda_stream_view stream) const; /** * @brief Filters and reduces down to a selection of columns diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 1ea89f5f694..7cdccf0b273 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1178,7 +1178,7 @@ std::vector find_splits(std::vector c * @param id Additional intermediate information required to process the pages * @param num_rows Total number of rows to read * @param chunk_read_limit Limit on total number of bytes to be returned per read, for all columns - * @param stream CUDA stream to use, default 0 + * @param stream CUDA stream to use */ std::vector compute_splits( cudf::detail::hostdevice_vector& pages,