Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix missing streams #9767

Merged
merged 8 commits into from
Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/copying/copy.cu
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ std::unique_ptr<column> copy_if_else(Left const& lhs,

if (boolean_mask.is_empty()) { return cudf::empty_like(lhs); }

auto bool_mask_device_p = column_device_view::create(boolean_mask);
auto bool_mask_device_p = column_device_view::create(boolean_mask, stream);
column_device_view bool_mask_device = *bool_mask_device_p;

auto const has_nulls = boolean_mask.has_nulls();
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/copying/scatter.cu
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ struct column_scalar_scatterer_impl<struct_view, MapIterator> {

// Null mask pushdown inside factory method
return make_structs_column(
target.size(), std::move(fields), null_count, std::move(*contents.null_mask));
target.size(), std::move(fields), null_count, std::move(*contents.null_mask), stream, mr);
}
};

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/copying/segmented_shift.cu
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ std::unique_ptr<column> segmented_shift(column_view const& segmented_values,
rmm::mr::device_memory_resource* mr)
{
if (segmented_values.is_empty()) { return empty_like(segmented_values); }
if (offset == 0) { return std::make_unique<column>(segmented_values); };
if (offset == 0) { return std::make_unique<column>(segmented_values, stream, mr); };

return type_dispatcher<dispatch_storage_type>(segmented_values.type(),
segmented_shift_functor_forwarder{},
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/copying/shift.cu
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ struct shift_functor {
using ScalarType = cudf::scalar_type_t<T>;
auto& scalar = static_cast<ScalarType const&>(fill_value);

auto device_input = column_device_view::create(input);
auto device_input = column_device_view::create(input, stream);
auto output =
detail::allocate_like(input, input.size(), mask_allocation_policy::NEVER, stream, mr);
auto device_output = mutable_column_device_view::create(*output);
auto device_output = mutable_column_device_view::create(*output, stream);

auto const scalar_is_valid = scalar.is_valid(stream);

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/filling/fill.cu
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void in_place_fill(cudf::mutable_column_view& destination,
using ScalarType = cudf::scalar_type_t<T>;
auto p_scalar = static_cast<ScalarType const*>(&value);
T fill_value = p_scalar->value(stream);
bool is_valid = p_scalar->is_valid();
bool is_valid = p_scalar->is_valid(stream);
cudf::detail::copy_range(thrust::make_constant_iterator(fill_value),
thrust::make_constant_iterator(is_valid),
destination,
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/groupby/common/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ namespace detail {

template <typename RequestType>
inline std::vector<aggregation_result> extract_results(host_span<RequestType const> requests,
cudf::detail::result_cache& cache)
cudf::detail::result_cache& cache,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
std::vector<aggregation_result> results(requests.size());
std::unordered_map<std::pair<column_view, std::reference_wrapper<aggregation const>>,
Expand All @@ -45,7 +47,7 @@ inline std::vector<aggregation_result> extract_results(host_span<RequestType con
} else {
auto it = repeated_result.find({requests[i].values, *agg});
if (it != repeated_result.end()) {
results[i].results.emplace_back(std::make_unique<column>(it->second));
results[i].results.emplace_back(std::make_unique<column>(it->second, stream, mr));
} else {
CUDF_FAIL("Cannot extract result from the cache");
}
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/groupby/hash/groupby.cu
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,13 @@ class hash_compound_agg_finalizer final : public cudf::detail::aggregation_final
column_view sum_result = sparse_results->get_result(col, *sum_agg);
column_view count_result = sparse_results->get_result(col, *count_agg);

auto values_view = column_device_view::create(col);
auto sum_view = column_device_view::create(sum_result);
auto count_view = column_device_view::create(count_result);
auto values_view = column_device_view::create(col, stream);
auto sum_view = column_device_view::create(sum_result, stream);
auto count_view = column_device_view::create(count_result, stream);

auto var_result = make_fixed_width_column(
cudf::detail::target_type(result_type, agg.kind), col.size(), mask_state::ALL_NULL, stream);
nvdbaranec marked this conversation as resolved.
Show resolved Hide resolved
auto var_result_view = mutable_column_device_view::create(var_result->mutable_view());
auto var_result_view = mutable_column_device_view::create(var_result->mutable_view(), stream);
mutable_table_view var_table_view{{var_result->mutable_view()}};
cudf::detail::initialize_with_identity(var_table_view, {agg.kind}, stream);

Expand Down Expand Up @@ -668,7 +668,7 @@ std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby(
std::unique_ptr<table> unique_keys =
groupby(keys, requests, &cache, has_nulls(keys), include_null_keys, stream, mr);

return std::make_pair(std::move(unique_keys), extract_results(requests, cache));
return std::make_pair(std::move(unique_keys), extract_results(requests, cache, stream, mr));
}
} // namespace hash
} // namespace detail
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/groupby/sort/aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby::sort
}
}

auto results = detail::extract_results(requests, cache);
auto results = detail::extract_results(requests, cache, stream, mr);

return std::make_pair(helper().unique_keys(stream, mr), std::move(results));
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/groupby/sort/group_count.cu
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ std::unique_ptr<column> group_count_valid(column_view const& values,
if (num_groups == 0) { return result; }

if (values.nullable()) {
auto values_view = column_device_view::create(values);
auto values_view = column_device_view::create(values, stream);

// make_validity_iterator returns a boolean iterator that sums to 1 (1+1=1)
// so we need to transform it to cast it to an integer type
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/groupby/sort/group_nth_element.cu
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ std::unique_ptr<column> group_nth_element(column_view const& values,
});
} else { // skip nulls (equivalent to pandas nth(dropna='any'))
// Returns index of nth value.
auto values_view = column_device_view::create(values);
auto values_view = column_device_view::create(values, stream);
auto bitmask_iterator =
thrust::make_transform_iterator(cudf::detail::make_validity_iterator(*values_view),
[] __device__(auto b) { return static_cast<size_type>(b); });
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/groupby/sort/group_scan_util.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,9 @@ struct group_scan_functor<K,
return make_structs_column(values.size(),
std::move(scanned_children),
values.null_count(),
cudf::detail::copy_bitmask(values, stream, mr));
cudf::detail::copy_bitmask(values, stream, mr),
stream,
mr);
}
};

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/groupby/sort/group_tdigest.cu
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ struct typed_group_tdigest {

// device column view. handy because the .element() function
// automatically handles fixed-point conversions for us
auto d_col = cudf::column_device_view::create(col);
auto d_col = cudf::column_device_view::create(col, stream);

// compute min and max columns
auto min_col = cudf::make_numeric_column(
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/groupby/sort/scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby::sort
}
}

auto results = detail::extract_results(requests, cache);
auto results = detail::extract_results(requests, cache, stream, mr);

return std::make_pair(helper().sorted_keys(stream, mr), std::move(results));
}
Expand Down
26 changes: 16 additions & 10 deletions cpp/src/interop/from_arrow.cu
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,14 @@ std::unique_ptr<column> dispatch_to_cudf_column::operator()<cudf::string_view>(
UNKNOWN_NULL_COUNT,
std::move(*get_mask_buffer(array, stream, mr)));

return num_rows == array.length() ? std::move(out_col)
: std::make_unique<column>(cudf::detail::slice(
out_col->view(),
static_cast<size_type>(array.offset()),
static_cast<size_type>(array.offset() + array.length())));
return num_rows == array.length()
? std::move(out_col)
: std::make_unique<column>(
cudf::detail::slice(out_col->view(),
static_cast<size_type>(array.offset()),
static_cast<size_type>(array.offset() + array.length())),
stream,
mr);
}

template <>
Expand Down Expand Up @@ -383,11 +386,14 @@ std::unique_ptr<column> dispatch_to_cudf_column::operator()<cudf::list_view>(
stream,
mr);

return num_rows == array.length() ? std::move(out_col)
: std::make_unique<column>(cudf::detail::slice(
out_col->view(),
static_cast<size_type>(array.offset()),
static_cast<size_type>(array.offset() + array.length())));
return num_rows == array.length()
? std::move(out_col)
: std::make_unique<column>(
cudf::detail::slice(out_col->view(),
static_cast<size_type>(array.offset()),
static_cast<size_type>(array.offset() + array.length())),
stream,
mr);
}

std::unique_ptr<column> get_column(arrow::Array const& array,
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/interop/to_arrow.cu
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ std::shared_ptr<arrow::Array> dispatch_to_arrow::operator()<cudf::string_view>(
std::unique_ptr<column> tmp_column =
((input.offset() != 0) or
((input.num_children() == 2) and (input.child(0).size() - 1 != input.size())))
? std::make_unique<cudf::column>(input)
? std::make_unique<cudf::column>(input, stream)
: nullptr;

column_view input_view = (tmp_column != nullptr) ? tmp_column->view() : input;
Expand Down Expand Up @@ -245,7 +245,7 @@ std::shared_ptr<arrow::Array> dispatch_to_arrow::operator()<cudf::struct_view>(
"Number of field names and number of children doesn't match\n");
std::unique_ptr<column> tmp_column = nullptr;

if (input.offset() != 0) { tmp_column = std::make_unique<cudf::column>(input); }
if (input.offset() != 0) { tmp_column = std::make_unique<cudf::column>(input, stream); }

column_view input_view = (tmp_column != nullptr) ? tmp_column->view() : input;
auto child_arrays = fetch_child_array(input_view, metadata.children_meta, ar_mr, stream);
Expand Down Expand Up @@ -280,7 +280,7 @@ std::shared_ptr<arrow::Array> dispatch_to_arrow::operator()<cudf::list_view>(
std::unique_ptr<column> tmp_column = nullptr;
if ((input.offset() != 0) or
((input.num_children() == 2) and (input.child(0).size() - 1 != input.size()))) {
tmp_column = std::make_unique<cudf::column>(input);
tmp_column = std::make_unique<cudf::column>(input, stream);
}

column_view input_view = (tmp_column != nullptr) ? tmp_column->view() : input;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/avro/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ rmm::device_buffer decompress_data(datasource& source,
0);
uncompressed_data_offsets.host_to_device(stream);

thrust::tabulate(rmm::exec_policy(),
thrust::tabulate(rmm::exec_policy(stream),
uncompressed_data_ptrs.begin(),
uncompressed_data_ptrs.end(),
[off = uncompressed_data_offsets.device_ptr(),
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/labeling/label_bins.cu
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,9 @@ std::unique_ptr<column> label_bins(column_view const& input,
left_begin, left_end, right_begin));
}

const auto mask_and_count = valid_if(output_begin, output_end, filter_null_sentinel());
auto mask_and_count = valid_if(output_begin, output_end, filter_null_sentinel(), stream, mr);

output->set_null_mask(mask_and_count.first, mask_and_count.second);
output->set_null_mask(std::move(mask_and_count.first), mask_and_count.second);
return output;
}

Expand Down
4 changes: 3 additions & 1 deletion cpp/src/lists/combine/concatenate_list_elements.cu
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ std::unique_ptr<column> concatenate_lists_ignore_null(column_view const& input,

// The child column of the output lists column is just copied from the input column.
auto out_entries = std::make_unique<column>(
lists_column_view(lists_column_view(input).get_sliced_child(stream)).get_sliced_child(stream));
lists_column_view(lists_column_view(input).get_sliced_child(stream)).get_sliced_child(stream),
stream,
mr);

auto [null_mask, null_count] = [&] {
if (!build_null_mask)
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/lists/copying/copying.cu
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ std::unique_ptr<cudf::column> copy_slice(lists_column_view const& lists,
std::move(offsets),
std::move(child),
cudf::UNKNOWN_NULL_COUNT,
std::move(null_mask));
std::move(null_mask),
stream,
mr);
}

} // namespace detail
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/lists/copying/gather.cu
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,9 @@ std::unique_ptr<column> gather_list_nested(cudf::lists_column_view const& list,
std::move(child_gd.offsets),
std::move(child),
null_count,
std::move(null_mask));
std::move(null_mask),
stream,
mr);
}

// it's a leaf. do a regular gather
Expand All @@ -187,7 +189,9 @@ std::unique_ptr<column> gather_list_nested(cudf::lists_column_view const& list,
std::move(child_gd.offsets),
std::move(child),
null_count,
std::move(null_mask));
std::move(null_mask),
stream,
mr);
}

} // namespace detail
Expand Down
10 changes: 7 additions & 3 deletions cpp/src/lists/drop_list_duplicates.cu
Original file line number Diff line number Diff line change
Expand Up @@ -634,17 +634,21 @@ std::pair<std::unique_ptr<column>, std::unique_ptr<column>> drop_list_duplicates
// If the values lists column is not given, its corresponding output will be nullptr.
auto out_values =
values ? make_lists_column(keys.size(),
std::make_unique<column>(output_offsets->view()),
std::make_unique<column>(output_offsets->view(), stream, mr),
std::move(unique_entries_and_list_indices[1]),
values.value().null_count(),
cudf::detail::copy_bitmask(values.value().parent(), stream, mr))
cudf::detail::copy_bitmask(values.value().parent(), stream, mr),
stream,
mr)
: nullptr;

auto out_keys = make_lists_column(keys.size(),
std::move(output_offsets),
std::move(unique_entries_and_list_indices[0]),
keys.null_count(),
cudf::detail::copy_bitmask(keys.parent(), stream, mr));
cudf::detail::copy_bitmask(keys.parent(), stream, mr),
stream,
mr);

return std::pair{std::move(out_keys), std::move(out_values)};
}
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/lists/interleave_columns.cu
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ generate_list_offsets_and_validities(table_view const& input,
auto const num_cols = input.num_columns();
auto const num_rows = input.num_rows();
auto const num_output_lists = num_rows * num_cols;
auto const table_dv_ptr = table_device_view::create(input);
auto const table_dv_ptr = table_device_view::create(input, stream);

// The output offsets column.
static_assert(sizeof(offset_type) == sizeof(int32_t));
Expand Down Expand Up @@ -217,7 +217,7 @@ struct interleave_list_entries_impl<T, std::enable_if_t<std::is_same_v<T, cudf::
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr) const noexcept
{
auto const table_dv_ptr = table_device_view::create(input);
auto const table_dv_ptr = table_device_view::create(input, stream);
auto comp_fn = compute_string_sizes_and_interleave_lists_fn{
*table_dv_ptr, output_list_offsets.template begin<offset_type>(), data_has_null_mask};

Expand Down Expand Up @@ -250,15 +250,15 @@ struct interleave_list_entries_impl<T, std::enable_if_t<cudf::is_fixed_width<T>(
rmm::mr::device_memory_resource* mr) const noexcept
{
auto const num_cols = input.num_columns();
auto const table_dv_ptr = table_device_view::create(input);
auto const table_dv_ptr = table_device_view::create(input, stream);

// The output child column.
auto output = allocate_like(lists_column_view(*input.begin()).child(),
num_output_entries,
mask_allocation_policy::NEVER,
stream,
mr);
auto output_dv_ptr = mutable_column_device_view::create(*output);
auto output_dv_ptr = mutable_column_device_view::create(*output, stream);

// The array of int8_t to store entry validities.
auto validities =
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/lists/segmented_sort.cu
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,9 @@ std::unique_ptr<column> sort_lists(lists_column_view const& input,
std::move(output_offset),
std::move(output_child),
input.null_count(),
std::move(null_mask));
std::move(null_mask),
stream,
mr);
}

std::unique_ptr<column> stable_sort_lists(lists_column_view const& input,
Expand Down Expand Up @@ -300,7 +302,9 @@ std::unique_ptr<column> stable_sort_lists(lists_column_view const& input,
std::move(output_offset),
std::move(sorted_child_table->release().front()),
input.null_count(),
cudf::detail::copy_bitmask(input.parent(), stream, mr));
cudf::detail::copy_bitmask(input.parent(), stream, mr),
stream,
mr);
}
} // namespace detail

Expand Down
4 changes: 3 additions & 1 deletion cpp/src/merge/merge.cu
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,9 @@ table_ptr_type merge(std::vector<table_view> const& tables_to_merge,
});

// If there is only one non-empty table_view, return its copy
if (merge_queue.size() == 1) { return std::make_unique<cudf::table>(merge_queue.top().view); }
if (merge_queue.size() == 1) {
return std::make_unique<cudf::table>(merge_queue.top().view, stream, mr);
}
// No inputs have rows, return a table with same columns as the first one
if (merge_queue.empty()) { return empty_like(first_table); }

Expand Down
Loading