Skip to content

Commit

Permalink
Fix missing streams (#9767)
Browse files Browse the repository at this point in the history
fix missing `stream` argument in default argument of functions.
And also in some cases, `mr` on returned objects creation.

This cleanup is done as a follow up after PR #9679

Almost all of libcudf functions usages of stream arg are cleaned up.
Missing `mr` still might need another clean up.

Authors:
  - Karthikeyan (https://github.com/karthikeyann)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - https://github.com/nvdbaranec

URL: #9767
  • Loading branch information
karthikeyann authored Dec 9, 2021
1 parent e6b0661 commit 024003c
Show file tree
Hide file tree
Showing 39 changed files with 134 additions and 94 deletions.
2 changes: 1 addition & 1 deletion cpp/src/copying/copy.cu
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ std::unique_ptr<column> copy_if_else(Left const& lhs,

if (boolean_mask.is_empty()) { return cudf::empty_like(lhs); }

auto bool_mask_device_p = column_device_view::create(boolean_mask);
auto bool_mask_device_p = column_device_view::create(boolean_mask, stream);
column_device_view bool_mask_device = *bool_mask_device_p;

auto const has_nulls = boolean_mask.has_nulls();
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/copying/scatter.cu
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ struct column_scalar_scatterer_impl<struct_view, MapIterator> {

// Null mask pushdown inside factory method
return make_structs_column(
target.size(), std::move(fields), null_count, std::move(*contents.null_mask));
target.size(), std::move(fields), null_count, std::move(*contents.null_mask), stream, mr);
}
};

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/copying/segmented_shift.cu
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ std::unique_ptr<column> segmented_shift(column_view const& segmented_values,
rmm::mr::device_memory_resource* mr)
{
if (segmented_values.is_empty()) { return empty_like(segmented_values); }
if (offset == 0) { return std::make_unique<column>(segmented_values); };
if (offset == 0) { return std::make_unique<column>(segmented_values, stream, mr); };

return type_dispatcher<dispatch_storage_type>(segmented_values.type(),
segmented_shift_functor_forwarder{},
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/copying/shift.cu
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ struct shift_functor {
using ScalarType = cudf::scalar_type_t<T>;
auto& scalar = static_cast<ScalarType const&>(fill_value);

auto device_input = column_device_view::create(input);
auto device_input = column_device_view::create(input, stream);
auto output =
detail::allocate_like(input, input.size(), mask_allocation_policy::NEVER, stream, mr);
auto device_output = mutable_column_device_view::create(*output);
auto device_output = mutable_column_device_view::create(*output, stream);

auto const scalar_is_valid = scalar.is_valid(stream);

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/filling/fill.cu
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void in_place_fill(cudf::mutable_column_view& destination,
using ScalarType = cudf::scalar_type_t<T>;
auto p_scalar = static_cast<ScalarType const*>(&value);
T fill_value = p_scalar->value(stream);
bool is_valid = p_scalar->is_valid();
bool is_valid = p_scalar->is_valid(stream);
cudf::detail::copy_range(thrust::make_constant_iterator(fill_value),
thrust::make_constant_iterator(is_valid),
destination,
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/groupby/common/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ namespace detail {

template <typename RequestType>
inline std::vector<aggregation_result> extract_results(host_span<RequestType const> requests,
cudf::detail::result_cache& cache)
cudf::detail::result_cache& cache,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
std::vector<aggregation_result> results(requests.size());
std::unordered_map<std::pair<column_view, std::reference_wrapper<aggregation const>>,
Expand All @@ -45,7 +47,7 @@ inline std::vector<aggregation_result> extract_results(host_span<RequestType con
} else {
auto it = repeated_result.find({requests[i].values, *agg});
if (it != repeated_result.end()) {
results[i].results.emplace_back(std::make_unique<column>(it->second));
results[i].results.emplace_back(std::make_unique<column>(it->second, stream, mr));
} else {
CUDF_FAIL("Cannot extract result from the cache");
}
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/groupby/hash/groupby.cu
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,13 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
column_view sum_result = sparse_results->get_result(col, *sum_agg);
column_view count_result = sparse_results->get_result(col, *count_agg);

auto values_view = column_device_view::create(col);
auto sum_view = column_device_view::create(sum_result);
auto count_view = column_device_view::create(count_result);
auto values_view = column_device_view::create(col, stream);
auto sum_view = column_device_view::create(sum_result, stream);
auto count_view = column_device_view::create(count_result, stream);

auto var_result = make_fixed_width_column(
cudf::detail::target_type(result_type, agg.kind), col.size(), mask_state::ALL_NULL, stream);
auto var_result_view = mutable_column_device_view::create(var_result->mutable_view());
auto var_result_view = mutable_column_device_view::create(var_result->mutable_view(), stream);
mutable_table_view var_table_view{{var_result->mutable_view()}};
cudf::detail::initialize_with_identity(var_table_view, {agg.kind}, stream);

Expand Down Expand Up @@ -668,7 +668,7 @@ std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby(
std::unique_ptr<table> unique_keys =
groupby(keys, requests, &cache, has_nulls(keys), include_null_keys, stream, mr);

return std::make_pair(std::move(unique_keys), extract_results(requests, cache));
return std::make_pair(std::move(unique_keys), extract_results(requests, cache, stream, mr));
}
} // namespace hash
} // namespace detail
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/groupby/sort/aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby::sort
}
}

auto results = detail::extract_results(requests, cache);
auto results = detail::extract_results(requests, cache, stream, mr);

return std::make_pair(helper().unique_keys(stream, mr), std::move(results));
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/groupby/sort/group_count.cu
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ std::unique_ptr<column> group_count_valid(column_view const& values,
if (num_groups == 0) { return result; }

if (values.nullable()) {
auto values_view = column_device_view::create(values);
auto values_view = column_device_view::create(values, stream);

// make_validity_iterator returns a boolean iterator that sums to 1 (1+1=1)
// so we need to transform it to cast it to an integer type
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/groupby/sort/group_nth_element.cu
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ std::unique_ptr<column> group_nth_element(column_view const& values,
});
} else { // skip nulls (equivalent to pandas nth(dropna='any'))
// Returns index of nth value.
auto values_view = column_device_view::create(values);
auto values_view = column_device_view::create(values, stream);
auto bitmask_iterator =
thrust::make_transform_iterator(cudf::detail::make_validity_iterator(*values_view),
[] __device__(auto b) { return static_cast<size_type>(b); });
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/groupby/sort/group_scan_util.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,9 @@ struct group_scan_functor<K,
return make_structs_column(values.size(),
std::move(scanned_children),
values.null_count(),
cudf::detail::copy_bitmask(values, stream, mr));
cudf::detail::copy_bitmask(values, stream, mr),
stream,
mr);
}
};

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/groupby/sort/group_tdigest.cu
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ struct typed_group_tdigest {

// device column view. handy because the .element() function
// automatically handles fixed-point conversions for us
auto d_col = cudf::column_device_view::create(col);
auto d_col = cudf::column_device_view::create(col, stream);

// compute min and max columns
auto min_col = cudf::make_numeric_column(
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/groupby/sort/scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby::sort
}
}

auto results = detail::extract_results(requests, cache);
auto results = detail::extract_results(requests, cache, stream, mr);

return std::make_pair(helper().sorted_keys(stream, mr), std::move(results));
}
Expand Down
26 changes: 16 additions & 10 deletions cpp/src/interop/from_arrow.cu
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,14 @@ std::unique_ptr<column> dispatch_to_cudf_column::operator()<cudf::string_view>(
UNKNOWN_NULL_COUNT,
std::move(*get_mask_buffer(array, stream, mr)));

return num_rows == array.length() ? std::move(out_col)
: std::make_unique<column>(cudf::detail::slice(
out_col->view(),
static_cast<size_type>(array.offset()),
static_cast<size_type>(array.offset() + array.length())));
return num_rows == array.length()
? std::move(out_col)
: std::make_unique<column>(
cudf::detail::slice(out_col->view(),
static_cast<size_type>(array.offset()),
static_cast<size_type>(array.offset() + array.length())),
stream,
mr);
}

template <>
Expand Down Expand Up @@ -383,11 +386,14 @@ std::unique_ptr<column> dispatch_to_cudf_column::operator()<cudf::list_view>(
stream,
mr);

return num_rows == array.length() ? std::move(out_col)
: std::make_unique<column>(cudf::detail::slice(
out_col->view(),
static_cast<size_type>(array.offset()),
static_cast<size_type>(array.offset() + array.length())));
return num_rows == array.length()
? std::move(out_col)
: std::make_unique<column>(
cudf::detail::slice(out_col->view(),
static_cast<size_type>(array.offset()),
static_cast<size_type>(array.offset() + array.length())),
stream,
mr);
}

std::unique_ptr<column> get_column(arrow::Array const& array,
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/interop/to_arrow.cu
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ std::shared_ptr<arrow::Array> dispatch_to_arrow::operator()<cudf::string_view>(
std::unique_ptr<column> tmp_column =
((input.offset() != 0) or
((input.num_children() == 2) and (input.child(0).size() - 1 != input.size())))
? std::make_unique<cudf::column>(input)
? std::make_unique<cudf::column>(input, stream)
: nullptr;

column_view input_view = (tmp_column != nullptr) ? tmp_column->view() : input;
Expand Down Expand Up @@ -245,7 +245,7 @@ std::shared_ptr<arrow::Array> dispatch_to_arrow::operator()<cudf::struct_view>(
"Number of field names and number of children doesn't match\n");
std::unique_ptr<column> tmp_column = nullptr;

if (input.offset() != 0) { tmp_column = std::make_unique<cudf::column>(input); }
if (input.offset() != 0) { tmp_column = std::make_unique<cudf::column>(input, stream); }

column_view input_view = (tmp_column != nullptr) ? tmp_column->view() : input;
auto child_arrays = fetch_child_array(input_view, metadata.children_meta, ar_mr, stream);
Expand Down Expand Up @@ -280,7 +280,7 @@ std::shared_ptr<arrow::Array> dispatch_to_arrow::operator()<cudf::list_view>(
std::unique_ptr<column> tmp_column = nullptr;
if ((input.offset() != 0) or
((input.num_children() == 2) and (input.child(0).size() - 1 != input.size()))) {
tmp_column = std::make_unique<cudf::column>(input);
tmp_column = std::make_unique<cudf::column>(input, stream);
}

column_view input_view = (tmp_column != nullptr) ? tmp_column->view() : input;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/avro/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ rmm::device_buffer decompress_data(datasource& source,
0);
uncompressed_data_offsets.host_to_device(stream);

thrust::tabulate(rmm::exec_policy(),
thrust::tabulate(rmm::exec_policy(stream),
uncompressed_data_ptrs.begin(),
uncompressed_data_ptrs.end(),
[off = uncompressed_data_offsets.device_ptr(),
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/labeling/label_bins.cu
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,9 @@ std::unique_ptr<column> label_bins(column_view const& input,
left_begin, left_end, right_begin));
}

const auto mask_and_count = valid_if(output_begin, output_end, filter_null_sentinel());
auto mask_and_count = valid_if(output_begin, output_end, filter_null_sentinel(), stream, mr);

output->set_null_mask(mask_and_count.first, mask_and_count.second);
output->set_null_mask(std::move(mask_and_count.first), mask_and_count.second);
return output;
}

Expand Down
4 changes: 3 additions & 1 deletion cpp/src/lists/combine/concatenate_list_elements.cu
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ std::unique_ptr<column> concatenate_lists_ignore_null(column_view const& input,

// The child column of the output lists column is just copied from the input column.
auto out_entries = std::make_unique<column>(
lists_column_view(lists_column_view(input).get_sliced_child(stream)).get_sliced_child(stream));
lists_column_view(lists_column_view(input).get_sliced_child(stream)).get_sliced_child(stream),
stream,
mr);

auto [null_mask, null_count] = [&] {
if (!build_null_mask)
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/lists/copying/copying.cu
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ std::unique_ptr<cudf::column> copy_slice(lists_column_view const& lists,
std::move(offsets),
std::move(child),
cudf::UNKNOWN_NULL_COUNT,
std::move(null_mask));
std::move(null_mask),
stream,
mr);
}

} // namespace detail
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/lists/copying/gather.cu
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,9 @@ std::unique_ptr<column> gather_list_nested(cudf::lists_column_view const& list,
std::move(child_gd.offsets),
std::move(child),
null_count,
std::move(null_mask));
std::move(null_mask),
stream,
mr);
}

// it's a leaf. do a regular gather
Expand All @@ -187,7 +189,9 @@ std::unique_ptr<column> gather_list_nested(cudf::lists_column_view const& list,
std::move(child_gd.offsets),
std::move(child),
null_count,
std::move(null_mask));
std::move(null_mask),
stream,
mr);
}

} // namespace detail
Expand Down
10 changes: 7 additions & 3 deletions cpp/src/lists/drop_list_duplicates.cu
Original file line number Diff line number Diff line change
Expand Up @@ -634,17 +634,21 @@ std::pair<std::unique_ptr<column>, std::unique_ptr<column>> drop_list_duplicates
// If the values lists column is not given, its corresponding output will be nullptr.
auto out_values =
values ? make_lists_column(keys.size(),
std::make_unique<column>(output_offsets->view()),
std::make_unique<column>(output_offsets->view(), stream, mr),
std::move(unique_entries_and_list_indices[1]),
values.value().null_count(),
cudf::detail::copy_bitmask(values.value().parent(), stream, mr))
cudf::detail::copy_bitmask(values.value().parent(), stream, mr),
stream,
mr)
: nullptr;

auto out_keys = make_lists_column(keys.size(),
std::move(output_offsets),
std::move(unique_entries_and_list_indices[0]),
keys.null_count(),
cudf::detail::copy_bitmask(keys.parent(), stream, mr));
cudf::detail::copy_bitmask(keys.parent(), stream, mr),
stream,
mr);

return std::pair{std::move(out_keys), std::move(out_values)};
}
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/lists/interleave_columns.cu
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ generate_list_offsets_and_validities(table_view const& input,
auto const num_cols = input.num_columns();
auto const num_rows = input.num_rows();
auto const num_output_lists = num_rows * num_cols;
auto const table_dv_ptr = table_device_view::create(input);
auto const table_dv_ptr = table_device_view::create(input, stream);

// The output offsets column.
static_assert(sizeof(offset_type) == sizeof(int32_t));
Expand Down Expand Up @@ -217,7 +217,7 @@ struct interleave_list_entries_impl<T, std::enable_if_t<std::is_same_v<T, cudf::
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr) const noexcept
{
auto const table_dv_ptr = table_device_view::create(input);
auto const table_dv_ptr = table_device_view::create(input, stream);
auto comp_fn = compute_string_sizes_and_interleave_lists_fn{
*table_dv_ptr, output_list_offsets.template begin<offset_type>(), data_has_null_mask};

Expand Down Expand Up @@ -250,15 +250,15 @@ struct interleave_list_entries_impl<T, std::enable_if_t<cudf::is_fixed_width<T>(
rmm::mr::device_memory_resource* mr) const noexcept
{
auto const num_cols = input.num_columns();
auto const table_dv_ptr = table_device_view::create(input);
auto const table_dv_ptr = table_device_view::create(input, stream);

// The output child column.
auto output = allocate_like(lists_column_view(*input.begin()).child(),
num_output_entries,
mask_allocation_policy::NEVER,
stream,
mr);
auto output_dv_ptr = mutable_column_device_view::create(*output);
auto output_dv_ptr = mutable_column_device_view::create(*output, stream);

// The array of int8_t to store entry validities.
auto validities =
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/lists/segmented_sort.cu
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,9 @@ std::unique_ptr<column> sort_lists(lists_column_view const& input,
std::move(output_offset),
std::move(output_child),
input.null_count(),
std::move(null_mask));
std::move(null_mask),
stream,
mr);
}

std::unique_ptr<column> stable_sort_lists(lists_column_view const& input,
Expand Down Expand Up @@ -300,7 +302,9 @@ std::unique_ptr<column> stable_sort_lists(lists_column_view const& input,
std::move(output_offset),
std::move(sorted_child_table->release().front()),
input.null_count(),
cudf::detail::copy_bitmask(input.parent(), stream, mr));
cudf::detail::copy_bitmask(input.parent(), stream, mr),
stream,
mr);
}
} // namespace detail

Expand Down
4 changes: 3 additions & 1 deletion cpp/src/merge/merge.cu
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,9 @@ table_ptr_type merge(std::vector<table_view> const& tables_to_merge,
});

// If there is only one non-empty table_view, return its copy
if (merge_queue.size() == 1) { return std::make_unique<cudf::table>(merge_queue.top().view); }
if (merge_queue.size() == 1) {
return std::make_unique<cudf::table>(merge_queue.top().view, stream, mr);
}
// No inputs have rows, return a table with same columns as the first one
if (merge_queue.empty()) { return empty_like(first_table); }

Expand Down
Loading

0 comments on commit 024003c

Please sign in to comment.