Skip to content

Commit

Permalink
Remove the internal use of cudf's default stream in cuIO (#13903)
Browse files Browse the repository at this point in the history
`cudf::get_default_stream()` is used as the default stream when calling `detail` APIs. From this point, the stream passed to this `detail` implementation should be propagated and used consistently. The exceptions are the parts of the implementation that use a separate stream pool.
All in all, libcudf code should not use `cudf::get_default_stream()` outside of dispatching the detail APIs. This PR removes most of such uses in cuIO.
One notable exception is `datasource::host_read`, as it simply does not have a stream parameter (and adding it would be a breaking change).
Also removed comments that mention default stream value that no longer exists.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Nghia Truong (https://github.com/ttnghia)

URL: #13903
  • Loading branch information
vuule authored Aug 18, 2023
1 parent f233422 commit b798a70
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 32 deletions.
2 changes: 1 addition & 1 deletion cpp/src/io/avro/avro_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ __global__ void __launch_bounds__(num_warps * 32, 2)
* @param[in] avro_data Raw block data
* @param[in] schema_len Number of entries in schema
* @param[in] min_row_size Minimum size in bytes of a row
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use
*/
void DecodeAvroColumnData(device_span<block_desc_s const> blocks,
schemadesc_s* schema,
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/csv/csv_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ device_span<uint64_t> remove_blank_rows(cudf::io::parse_options_view const& opti
* @param[in] data The row-column data
* @param[in] column_flags Flags that control individual column parsing
* @param[in] row_offsets List of row data start positions (offsets)
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use
*
* @return stats Histogram of each dtypes' occurrence for each column
*/
Expand All @@ -218,7 +218,7 @@ std::vector<column_type_histogram> detect_column_types(
* @param[out] columns Device memory output of column data
* @param[out] valids Device memory output of column valids bitmap data
* @param[out] valid_counts Device memory output of the number of valid fields in each column
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use
*/
void decode_row_column_data(cudf::io::parse_options_view const& options,
device_span<char const> data,
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/fst/logical_stack.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ void sparse_stack_op_to_top_of_stack(StackSymbolItT d_symbols,
StackSymbolT const empty_stack_symbol,
StackSymbolT const read_symbol,
std::size_t const num_symbols_out,
rmm::cuda_stream_view stream = cudf::get_default_stream())
rmm::cuda_stream_view stream)
{
rmm::device_buffer temp_storage{};

Expand Down
12 changes: 6 additions & 6 deletions cpp/src/io/json/json_column.cu
Original file line number Diff line number Diff line change
Expand Up @@ -355,16 +355,15 @@ std::vector<std::string> copy_strings_to_host(device_span<SymbolT const> input,
options_view,
stream,
rmm::mr::get_current_device_resource());
auto to_host = [](auto const& col) {
auto to_host = [stream](auto const& col) {
if (col.is_empty()) return std::vector<std::string>{};
auto const scv = cudf::strings_column_view(col);
auto const h_chars = cudf::detail::make_std_vector_sync<char>(
cudf::device_span<char const>(scv.chars().data<char>(), scv.chars().size()),
cudf::get_default_stream());
cudf::device_span<char const>(scv.chars().data<char>(), scv.chars().size()), stream);
auto const h_offsets = cudf::detail::make_std_vector_sync(
cudf::device_span<cudf::size_type const>(scv.offsets().data<cudf::size_type>() + scv.offset(),
scv.size() + 1),
cudf::get_default_stream());
stream);

// build std::string vector from chars and offsets
std::vector<std::string> host_data;
Expand Down Expand Up @@ -719,7 +718,8 @@ void make_device_json_column(device_span<SymbolT const> input,
* @param options The reader options to influence the relevant type inference and type casting
* options
*/
cudf::io::parse_options parsing_options(cudf::io::json_reader_options const& options);
cudf::io::parse_options parsing_options(cudf::io::json_reader_options const& options,
rmm::cuda_stream_view stream);

std::pair<std::unique_ptr<column>, std::vector<column_name_info>> device_json_column_to_cudf_column(
device_json_column& json_col,
Expand Down Expand Up @@ -976,7 +976,7 @@ table_with_metadata device_parse_nested_json(device_span<SymbolT const> d_input,
// Initialize meta data to be populated while recursing through the tree of columns
std::vector<std::unique_ptr<column>> out_columns;
std::vector<column_name_info> out_column_names;
auto parse_opt = parsing_options(options);
auto parse_opt = parsing_options(options, stream);

// Iterate over the struct's child columns and convert to cudf column
size_type column_index = 0;
Expand Down
14 changes: 9 additions & 5 deletions cpp/src/io/json/nested_json_gpu.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1889,12 +1889,12 @@ void make_json_column(json_column& root_column,
*
* @param options The reader options to influence the relevant type inference and type casting
* options
* @param stream The CUDA stream to which kernels are dispatched
*/
auto parsing_options(cudf::io::json_reader_options const& options)
auto parsing_options(cudf::io::json_reader_options const& options, rmm::cuda_stream_view stream)
{
auto parse_opts = cudf::io::parse_options{',', '\n', '\"', '.'};

auto const stream = cudf::get_default_stream();
parse_opts.dayfirst = options.is_enabled_dayfirst();
parse_opts.keepquotes = options.is_enabled_keep_quotes();
parse_opts.trie_true = cudf::detail::create_serialized_trie({"true"}, stream);
Expand Down Expand Up @@ -1975,8 +1975,12 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> json_column_to
}
// Infer column type, if we don't have an explicit type for it
else {
target_type = cudf::io::detail::infer_data_type(
parsing_options(options).json_view(), d_input, string_ranges_it, col_size, stream);
target_type =
cudf::io::detail::infer_data_type(parsing_options(options, stream).json_view(),
d_input,
string_ranges_it,
col_size,
stream);
}

auto [result_bitmask, null_count] = make_validity(json_col);
Expand All @@ -1987,7 +1991,7 @@ std::pair<std::unique_ptr<column>, std::vector<column_name_info>> json_column_to
target_type,
std::move(result_bitmask),
null_count,
parsing_options(options).view(),
parsing_options(options, stream).view(),
stream,
mr);

Expand Down
14 changes: 7 additions & 7 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ constexpr bool is_string_col(ColumnChunkDesc const& chunk)
*
* @param[in] chunks List of column chunks
* @param[in] num_chunks Number of column chunks
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use
*/
void DecodePageHeaders(ColumnChunkDesc* chunks, int32_t num_chunks, rmm::cuda_stream_view stream);

Expand All @@ -452,7 +452,7 @@ void DecodePageHeaders(ColumnChunkDesc* chunks, int32_t num_chunks, rmm::cuda_st
*
* @param[in] chunks List of column chunks
* @param[in] num_chunks Number of column chunks
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use
*/
void BuildStringDictionaryIndex(ColumnChunkDesc* chunks,
int32_t num_chunks,
Expand Down Expand Up @@ -480,7 +480,7 @@ void BuildStringDictionaryIndex(ColumnChunkDesc* chunks,
* @param compute_string_sizes If set to true, the str_bytes field in PageInfo will
* be computed
* @param level_type_size Size in bytes of the type for level decoding
* @param stream CUDA stream to use, default 0
* @param stream CUDA stream to use
*/
void ComputePageSizes(cudf::detail::hostdevice_vector<PageInfo>& pages,
cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
Expand All @@ -504,7 +504,7 @@ void ComputePageSizes(cudf::detail::hostdevice_vector<PageInfo>& pages,
* @param[in] min_rows crop all rows below min_row
* @param[in] num_rows Maximum number of rows to read
* @param[in] level_type_size Size in bytes of the type for level decoding
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use
*/
void ComputePageStringSizes(cudf::detail::hostdevice_vector<PageInfo>& pages,
cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
Expand All @@ -524,7 +524,7 @@ void ComputePageStringSizes(cudf::detail::hostdevice_vector<PageInfo>& pages,
* @param[in] num_rows Total number of rows to read
* @param[in] min_row Minimum number of rows to read
* @param[in] level_type_size Size in bytes of the type for level decoding
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use
*/
void DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
Expand All @@ -544,7 +544,7 @@ void DecodePageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
* @param[in] num_rows Total number of rows to read
* @param[in] min_row Minimum number of rows to read
* @param[in] level_type_size Size in bytes of the type for level decoding
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use
*/
void DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
Expand Down Expand Up @@ -654,7 +654,7 @@ void get_dictionary_indices(cudf::detail::device_2dspan<gpu::PageFragment const>
* @param[in] write_v2_headers True if V2 page headers should be written
* @param[in] chunk_grstats Setup for chunk-level stats
* @param[in] max_page_comp_data_size Calculated maximum compressed data size of pages
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use
*/
void InitEncoderPages(cudf::detail::device_2dspan<EncColumnChunk> chunks,
device_span<gpu::EncPage> pages,
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/io/parquet/predicate_pushdown.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,10 @@ class stats_expression_converter : public ast::detail::expression_transformer {
std::optional<std::vector<std::vector<size_type>>> aggregate_reader_metadata::filter_row_groups(
host_span<std::vector<size_type> const> row_group_indices,
host_span<data_type const> output_dtypes,
std::reference_wrapper<ast::expression const> filter) const
std::reference_wrapper<ast::expression const> filter,
rmm::cuda_stream_view stream) const
{
auto stream = cudf::get_default_stream();
auto mr = rmm::mr::get_current_device_resource();
auto mr = rmm::mr::get_current_device_resource();
// Create row group indices.
std::vector<std::vector<size_type>> filtered_row_group_indices;
std::vector<std::vector<size_type>> all_row_group_indices;
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,8 @@ void reader::impl::prepare_data(int64_t skip_rows,
[](auto const& col) { return col.type; });
}
auto const [skip_rows_corrected, num_rows_corrected, row_groups_info] =
_metadata->select_row_groups(row_group_indices, skip_rows, num_rows, output_types, filter);
_metadata->select_row_groups(
row_group_indices, skip_rows, num_rows, output_types, filter, _stream);

if (num_rows_corrected > 0 && not row_groups_info.empty() && not _input_columns.empty()) {
load_and_decompress_data(row_groups_info, num_rows_corrected);
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,12 +350,13 @@ aggregate_reader_metadata::select_row_groups(
int64_t skip_rows_opt,
std::optional<size_type> const& num_rows_opt,
host_span<data_type const> output_dtypes,
std::optional<std::reference_wrapper<ast::expression const>> filter) const
std::optional<std::reference_wrapper<ast::expression const>> filter,
rmm::cuda_stream_view stream) const
{
std::optional<std::vector<std::vector<size_type>>> filtered_row_group_indices;
if (filter.has_value()) {
filtered_row_group_indices =
filter_row_groups(row_group_indices, output_dtypes, filter.value());
filter_row_groups(row_group_indices, output_dtypes, filter.value(), stream);
if (filtered_row_group_indices.has_value()) {
row_group_indices =
host_span<std::vector<size_type> const>(filtered_row_group_indices.value());
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/io/parquet/reader_impl_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ class aggregate_reader_metadata {
* @param row_group_indices Lists of row groups to read, one per source
* @param output_dtypes List of output column datatypes
* @param filter AST expression to filter row groups based on Column chunk statistics
* @param stream CUDA stream used for device memory operations and kernel launches
* @return Filtered row group indices, if any is filtered.
*/
[[nodiscard]] std::optional<std::vector<std::vector<size_type>>> filter_row_groups(
host_span<std::vector<size_type> const> row_group_indices,
host_span<data_type const> output_dtypes,
std::reference_wrapper<ast::expression const> filter) const;
std::reference_wrapper<ast::expression const> filter,
rmm::cuda_stream_view stream) const;

/**
* @brief Filters and reduces down to a selection of row groups
Expand All @@ -188,7 +190,7 @@ class aggregate_reader_metadata {
* @param row_count Total number of rows selected
* @param output_dtypes List of output column datatypes
* @param filter Optional AST expression to filter row groups based on Column chunk statistics
*
* @param stream CUDA stream used for device memory operations and kernel launches
* @return A tuple of corrected row_start, row_count and list of row group indexes and its
* starting row
*/
Expand All @@ -197,7 +199,8 @@ class aggregate_reader_metadata {
int64_t row_start,
std::optional<size_type> const& row_count,
host_span<data_type const> output_dtypes,
std::optional<std::reference_wrapper<ast::expression const>> filter) const;
std::optional<std::reference_wrapper<ast::expression const>> filter,
rmm::cuda_stream_view stream) const;

/**
* @brief Filters and reduces down to a selection of columns
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1178,7 +1178,7 @@ std::vector<gpu::chunk_read_info> find_splits(std::vector<cumulative_row_info> c
* @param id Additional intermediate information required to process the pages
* @param num_rows Total number of rows to read
* @param chunk_read_limit Limit on total number of bytes to be returned per read, for all columns
* @param stream CUDA stream to use, default 0
* @param stream CUDA stream to use
*/
std::vector<gpu::chunk_read_info> compute_splits(
cudf::detail::hostdevice_vector<gpu::PageInfo>& pages,
Expand Down

0 comments on commit b798a70

Please sign in to comment.