Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use stream in groupby calls #7705

Merged
merged 10 commits into from
Mar 27, 2021
9 changes: 6 additions & 3 deletions cpp/include/cudf/detail/groupby/sort_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ struct sort_groupby_helper {
/**
* @brief Get the number of groups in `keys`
*/
size_type num_groups() { return group_offsets().size() - 1; }
size_type num_groups(rmm::cuda_stream_view stream = rmm::cuda_stream_default)
karthikeyann marked this conversation as resolved.
Show resolved Hide resolved
{
return group_offsets(stream).size() - 1;
}

/**
* @brief Return the effective number of keys
Expand Down Expand Up @@ -175,7 +178,7 @@ struct sort_groupby_helper {
* @brief Get the group labels corresponding to the sorted order of `keys`.
*
* Each group is assigned a unique numerical "label" in
* `[0, 1, 2, ... , num_groups() - 1, num_groups())`.
* `[0, 1, 2, ... , num_groups(stream) - 1, num_groups(stream))`.
* For a row in sorted `keys`, its corresponding group label indicates which
* group it belongs to.
*
Expand All @@ -192,7 +195,7 @@ struct sort_groupby_helper {
*
* Returns the group label for every row in the original `keys` table. For a
* given unique key row, its group label is equivalent to what is returned by
* `group_labels()`. However, if a row contains a null value, and
* `group_labels(stream)`. However, if a row contains a null value, and
* `include_null_keys == NO`, then its label is NULL.
*
* Computes and stores unsorted labels on first invocation and returns stored
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/groupby/groupby.cu
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby::aggr

if (_keys.num_rows() == 0) { return std::make_pair(empty_like(_keys), empty_results(requests)); }

return dispatch_aggregation(requests, 0, mr);
return dispatch_aggregation(requests, rmm::cuda_stream_default, mr);
}

// Compute scan requests
Expand Down
60 changes: 32 additions & 28 deletions cpp/src/groupby/sort/aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ void aggregrate_result_functor::operator()<aggregation::COUNT_VALID>(aggregation
agg,
get_grouped_values().nullable()
? detail::group_count_valid(
get_grouped_values(), helper.group_labels(), helper.num_groups(), stream, mr)
: detail::group_count_all(helper.group_offsets(), helper.num_groups(), stream, mr));
get_grouped_values(), helper.group_labels(stream), helper.num_groups(stream), stream, mr)
: detail::group_count_all(
helper.group_offsets(stream), helper.num_groups(stream), stream, mr));
}

template <>
Expand All @@ -80,18 +81,21 @@ void aggregrate_result_functor::operator()<aggregation::COUNT_ALL>(aggregation c
if (cache.has_result(col_idx, agg)) return;

cache.add_result(
col_idx, agg, detail::group_count_all(helper.group_offsets(), helper.num_groups(), stream, mr));
col_idx,
agg,
detail::group_count_all(helper.group_offsets(stream), helper.num_groups(stream), stream, mr));
}

template <>
void aggregrate_result_functor::operator()<aggregation::SUM>(aggregation const& agg)
{
if (cache.has_result(col_idx, agg)) return;

cache.add_result(col_idx,
agg,
detail::group_sum(
get_grouped_values(), helper.num_groups(), helper.group_labels(), stream, mr));
cache.add_result(
col_idx,
agg,
detail::group_sum(
get_grouped_values(), helper.num_groups(stream), helper.group_labels(stream), stream, mr));
};

template <>
Expand All @@ -102,9 +106,9 @@ void aggregrate_result_functor::operator()<aggregation::ARGMAX>(aggregation cons
cache.add_result(col_idx,
agg,
detail::group_argmax(get_grouped_values(),
helper.num_groups(),
helper.group_labels(),
helper.key_sort_order(),
helper.num_groups(stream),
helper.group_labels(stream),
helper.key_sort_order(stream),
stream,
mr));
};
Expand All @@ -117,9 +121,9 @@ void aggregrate_result_functor::operator()<aggregation::ARGMIN>(aggregation cons
cache.add_result(col_idx,
agg,
detail::group_argmin(get_grouped_values(),
helper.num_groups(),
helper.group_labels(),
helper.key_sort_order(),
helper.num_groups(stream),
helper.group_labels(stream),
helper.key_sort_order(stream),
stream,
mr));
};
Expand All @@ -132,7 +136,7 @@ void aggregrate_result_functor::operator()<aggregation::MIN>(aggregation const&
auto result = [&]() {
if (cudf::is_fixed_width(values.type())) {
return detail::group_min(
get_grouped_values(), helper.num_groups(), helper.group_labels(), stream, mr);
get_grouped_values(), helper.num_groups(stream), helper.group_labels(stream), stream, mr);
} else {
auto argmin_agg = make_argmin_aggregation();
operator()<aggregation::ARGMIN>(*argmin_agg);
Expand Down Expand Up @@ -169,7 +173,7 @@ void aggregrate_result_functor::operator()<aggregation::MAX>(aggregation const&
auto result = [&]() {
if (cudf::is_fixed_width(values.type())) {
return detail::group_max(
get_grouped_values(), helper.num_groups(), helper.group_labels(), stream, mr);
get_grouped_values(), helper.num_groups(stream), helper.group_labels(stream), stream, mr);
} else {
auto argmax_agg = make_argmax_aggregation();
operator()<aggregation::ARGMAX>(*argmax_agg);
Expand Down Expand Up @@ -238,7 +242,7 @@ void aggregrate_result_functor::operator()<aggregation::VARIANCE>(aggregation co
auto result = detail::group_var(get_grouped_values(),
mean_result,
group_sizes,
helper.group_labels(),
helper.group_labels(stream),
var_agg._ddof,
stream,
mr);
Expand Down Expand Up @@ -271,8 +275,8 @@ void aggregrate_result_functor::operator()<aggregation::QUANTILE>(aggregation co

auto result = detail::group_quantiles(get_sorted_values(),
group_sizes,
helper.group_offsets(),
helper.num_groups(),
helper.group_offsets(stream),
helper.num_groups(stream),
quantile_agg._quantiles,
quantile_agg._interpolation,
stream,
Expand All @@ -291,8 +295,8 @@ void aggregrate_result_functor::operator()<aggregation::MEDIAN>(aggregation cons

auto result = detail::group_quantiles(get_sorted_values(),
group_sizes,
helper.group_offsets(),
helper.num_groups(),
helper.group_offsets(stream),
helper.num_groups(stream),
{0.5},
interpolation::LINEAR,
stream,
Expand All @@ -308,9 +312,9 @@ void aggregrate_result_functor::operator()<aggregation::NUNIQUE>(aggregation con
auto nunique_agg = static_cast<cudf::detail::nunique_aggregation const&>(agg);

auto result = detail::group_nunique(get_sorted_values(),
helper.group_labels(),
helper.num_groups(),
helper.group_offsets(),
helper.group_labels(stream),
helper.num_groups(stream),
helper.group_offsets(stream),
nunique_agg._null_handling,
stream,
mr);
Expand All @@ -337,9 +341,9 @@ void aggregrate_result_functor::operator()<aggregation::NTH_ELEMENT>(aggregation
agg,
detail::group_nth_element(get_grouped_values(),
group_sizes,
helper.group_labels(),
helper.group_offsets(),
helper.num_groups(),
helper.group_labels(stream),
helper.group_offsets(stream),
helper.num_groups(stream),
nth_element_agg._n,
nth_element_agg._null_handling,
stream,
Expand All @@ -357,7 +361,7 @@ void aggregrate_result_functor::operator()<aggregation::COLLECT_LIST>(aggregatio
if (cache.has_result(col_idx, agg)) return;

auto result = detail::group_collect(
get_grouped_values(), helper.group_offsets(), helper.num_groups(), stream, mr);
get_grouped_values(), helper.group_offsets(stream), helper.num_groups(stream), stream, mr);

cache.add_result(col_idx, agg, std::move(result));
};
Expand All @@ -373,7 +377,7 @@ void aggregrate_result_functor::operator()<aggregation::COLLECT_SET>(aggregation
if (cache.has_result(col_idx, agg)) { return; }

auto const collect_result = detail::group_collect(
get_grouped_values(), helper.group_offsets(), helper.num_groups(), stream, mr);
get_grouped_values(), helper.group_offsets(stream), helper.num_groups(stream), stream, mr);
auto const nulls_equal =
static_cast<cudf::detail::collect_set_aggregation const&>(agg)._null_equal;
cache.add_result(col_idx,
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/groupby/sort/functors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ struct store_result_functor {
// It's overridden in scan implementation.
return sorted_values->view();
else
return (grouped_values = helper.grouped_values(values))->view();
return (grouped_values = helper.grouped_values(values, stream))->view();
};

/**
Expand All @@ -76,7 +76,7 @@ struct store_result_functor {
column_view get_sorted_values()
{
return sorted_values ? sorted_values->view()
: (sorted_values = helper.sorted_values(values))->view();
: (sorted_values = helper.sorted_values(values, stream))->view();
};

protected:
Expand Down
13 changes: 8 additions & 5 deletions cpp/src/groupby/sort/scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ struct scan_result_functor final : store_result_functor {
if (grouped_values)
return grouped_values->view();
else
return (grouped_values = helper.grouped_values(values))->view();
return (grouped_values = helper.grouped_values(values, stream))->view();
};
};

Expand All @@ -71,7 +71,8 @@ void scan_result_functor::operator()<aggregation::SUM>(aggregation const& agg)
cache.add_result(
col_idx,
agg,
detail::sum_scan(get_grouped_values(), helper.num_groups(), helper.group_labels(), stream, mr));
detail::sum_scan(
get_grouped_values(), helper.num_groups(stream), helper.group_labels(stream), stream, mr));
}

template <>
Expand All @@ -82,7 +83,8 @@ void scan_result_functor::operator()<aggregation::MIN>(aggregation const& agg)
cache.add_result(
col_idx,
agg,
detail::min_scan(get_grouped_values(), helper.num_groups(), helper.group_labels(), stream, mr));
detail::min_scan(
get_grouped_values(), helper.num_groups(stream), helper.group_labels(stream), stream, mr));
}

template <>
Expand All @@ -93,15 +95,16 @@ void scan_result_functor::operator()<aggregation::MAX>(aggregation const& agg)
cache.add_result(
col_idx,
agg,
detail::max_scan(get_grouped_values(), helper.num_groups(), helper.group_labels(), stream, mr));
detail::max_scan(
get_grouped_values(), helper.num_groups(stream), helper.group_labels(stream), stream, mr));
}

template <>
void scan_result_functor::operator()<aggregation::COUNT_ALL>(aggregation const& agg)
{
if (cache.has_result(col_idx, agg)) return;

cache.add_result(col_idx, agg, detail::count_scan(helper.group_labels(), stream, mr));
cache.add_result(col_idx, agg, detail::count_scan(helper.group_labels(stream), stream, mr));
}
} // namespace detail

Expand Down
26 changes: 13 additions & 13 deletions cpp/src/groupby/sort/sort_helper.cu
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ column_view sort_groupby_helper::key_sort_order(rmm::cuda_stream_view stream)
// presence of a null value within a row. This allows moving all rows that
// contain a null value to the end of the sorted order.

auto augmented_keys = table_view({table_view({keys_bitmask_column()}), _keys});
auto augmented_keys = table_view({table_view({keys_bitmask_column(stream)}), _keys});

_key_sorted_order = cudf::detail::stable_sorted_order(
augmented_keys,
Expand All @@ -164,7 +164,7 @@ sort_groupby_helper::index_vector const& sort_groupby_helper::group_offsets(
_group_offsets = std::make_unique<index_vector>(num_keys(stream) + 1, stream);

auto device_input_table = table_device_view::create(_keys, stream);
auto sorted_order = key_sort_order().data<size_type>();
auto sorted_order = key_sort_order(stream).data<size_type>();
decltype(_group_offsets->begin()) result_end;

if (has_nulls(_keys)) {
Expand Down Expand Up @@ -207,9 +207,9 @@ sort_groupby_helper::index_vector const& sort_groupby_helper::group_labels(
group_labels.end(),
index_vector::value_type{0});
thrust::scatter(rmm::exec_policy(stream),
thrust::make_constant_iterator(1, decltype(num_groups())(1)),
thrust::make_constant_iterator(1, num_groups()),
group_offsets().begin() + 1,
thrust::make_constant_iterator(1, decltype(num_groups(stream))(1)),
thrust::make_constant_iterator(1, num_groups(stream)),
group_offsets(stream).begin() + 1,
group_labels.begin());

thrust::inclusive_scan(
Expand All @@ -226,9 +226,9 @@ column_view sort_groupby_helper::unsorted_keys_labels(rmm::cuda_stream_view stre
data_type(type_to_id<size_type>()), _keys.num_rows(), mask_state::ALL_NULL, stream);

auto group_labels_view = cudf::column_view(
data_type(type_to_id<size_type>()), group_labels().size(), group_labels().data());
data_type(type_to_id<size_type>()), group_labels(stream).size(), group_labels(stream).data());

auto scatter_map = key_sort_order();
auto scatter_map = key_sort_order(stream);

std::unique_ptr<table> t_unsorted_keys_labels =
cudf::detail::scatter(table_view({group_labels_view}),
Expand Down Expand Up @@ -267,7 +267,7 @@ sort_groupby_helper::column_ptr sort_groupby_helper::sorted_values(
column_view const& values, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr)
{
column_ptr values_sort_order =
cudf::detail::stable_sorted_order(table_view({unsorted_keys_labels(), values}),
cudf::detail::stable_sorted_order(table_view({unsorted_keys_labels(stream), values}),
{},
std::vector<null_order>(2, null_order::AFTER),
stream,
Expand All @@ -289,7 +289,7 @@ sort_groupby_helper::column_ptr sort_groupby_helper::sorted_values(
sort_groupby_helper::column_ptr sort_groupby_helper::grouped_values(
column_view const& values, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr)
{
auto gather_map = key_sort_order();
auto gather_map = key_sort_order(stream);

auto grouped_values_table = cudf::detail::gather(table_view({values}),
gather_map,
Expand All @@ -304,14 +304,14 @@ sort_groupby_helper::column_ptr sort_groupby_helper::grouped_values(
std::unique_ptr<table> sort_groupby_helper::unique_keys(rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto idx_data = key_sort_order().data<size_type>();
auto idx_data = key_sort_order(stream).data<size_type>();

auto gather_map_it = thrust::make_transform_iterator(
group_offsets().begin(), [idx_data] __device__(size_type i) { return idx_data[i]; });
group_offsets(stream).begin(), [idx_data] __device__(size_type i) { return idx_data[i]; });

return cudf::detail::gather(_keys,
gather_map_it,
gather_map_it + num_groups(),
gather_map_it + num_groups(stream),
out_of_bounds_policy::DONT_CHECK,
stream,
mr);
Expand All @@ -321,7 +321,7 @@ std::unique_ptr<table> sort_groupby_helper::sorted_keys(rmm::cuda_stream_view st
rmm::mr::device_memory_resource* mr)
{
return cudf::detail::gather(_keys,
key_sort_order(),
key_sort_order(stream),
cudf::out_of_bounds_policy::DONT_CHECK,
cudf::detail::negative_index_policy::NOT_ALLOWED,
stream,
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/rolling/grouped_rolling.cu
Original file line number Diff line number Diff line change
Expand Up @@ -838,8 +838,8 @@ std::unique_ptr<column> grouped_time_range_rolling_window(table_view const& grou
index_vector group_offsets(0, stream), group_labels(0, stream);
if (group_keys.num_columns() > 0) {
sort_groupby_helper helper{group_keys, cudf::null_policy::INCLUDE, cudf::sorted::YES};
group_offsets = index_vector(helper.group_offsets(), stream);
group_labels = index_vector(helper.group_labels(), stream);
group_offsets = index_vector(helper.group_offsets(stream), stream);
group_labels = index_vector(helper.group_labels(stream), stream);
}

// Assumes that `timestamp_column` is actually of a timestamp type.
Expand Down