Skip to content

Commit

Permalink
Add groupby scan operations (sort groupby) (#7387)
Browse files Browse the repository at this point in the history
Adds support for groupby scan operations. 

Addresses part of 
#1298 cumsum
#1296 cumcount

- sum
- min
- max
- count

Authors:
  - Karthikeyan (@karthikeyann)
  - Michael Wang (@isVoid)

Approvers:
  - Vukasin Milovanovic (@vuule)
  - Jake Hemstad (@jrhemstad)
  - Nghia Truong (@ttnghia)
  - David (@davidwendt)

URL: #7387
  • Loading branch information
karthikeyann authored Mar 23, 2021
1 parent 4e9241e commit 500f42c
Show file tree
Hide file tree
Showing 24 changed files with 1,610 additions and 193 deletions.
9 changes: 7 additions & 2 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#=============================================================================
# Copyright (c) 2018-2020, NVIDIA CORPORATION.
# Copyright (c) 2018-2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -194,7 +194,7 @@ add_library(cudf
src/groupby/hash/groupby.cu
src/groupby/sort/group_argmax.cu
src/groupby/sort/group_argmin.cu
src/groupby/sort/groupby.cu
src/groupby/sort/aggregate.cpp
src/groupby/sort/group_collect.cu
src/groupby/sort/group_count.cu
src/groupby/sort/group_max.cu
Expand All @@ -204,6 +204,11 @@ add_library(cudf
src/groupby/sort/group_quantiles.cu
src/groupby/sort/group_std.cu
src/groupby/sort/group_sum.cu
src/groupby/sort/scan.cpp
src/groupby/sort/group_count_scan.cu
src/groupby/sort/group_max_scan.cu
src/groupby/sort/group_min_scan.cu
src/groupby/sort/group_sum_scan.cu
src/groupby/sort/sort_helper.cu
src/hash/hashing.cu
src/interop/dlpack.cpp
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/cudf/detail/groupby/sort_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ struct sort_groupby_helper {
sorted keys_pre_sorted = sorted::NO)
: _keys(keys),
_num_keys(-1),
_include_null_keys(include_null_keys),
_keys_pre_sorted(keys_pre_sorted)
_keys_pre_sorted(keys_pre_sorted),
_include_null_keys(include_null_keys)
{
if (keys_pre_sorted == sorted::YES and include_null_keys == null_policy::EXCLUDE and
has_nulls(keys)) {
Expand Down
2 changes: 0 additions & 2 deletions cpp/include/cudf/detail/null_mask.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

#include <rmm/cuda_stream_view.hpp>

using cudf::device_span;

namespace cudf {
namespace detail {
/**
Expand Down
62 changes: 61 additions & 1 deletion cpp/include/cudf/groupby.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -166,6 +166,61 @@ class groupby {
std::vector<aggregation_request> const& requests,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Performs grouped scans on the specified values.
*
* The values to aggregate and the aggregations to perform are specified in an
* `aggregation_request`. Each request contains a `column_view` of values to
* aggregate and a set of `aggregation`s to perform on those elements.
*
* For each `aggregation` in a request, `values[i]` is scan aggregated with
* all previous `values[j]` where rows `i` and `j` in `keys` are equivalent.
*
* The `size()` of the request column must equal `keys.num_rows()`.
*
* For every `aggregation_request` an `aggregation_result` will be returned.
* The `aggregation_result` holds the resulting column(s) for each requested
* aggregation on the `request`s values. The order of the columns in each
* result is the same order as was specified in the request.
*
* The returned `table` contains the group labels for each row, i.e., the
* `keys` given to groupby object. Element `i` across all aggregation results
* belongs to the group at row `i` in the group labels table.
*
* The order of the rows in the group labels is arbitrary. Furthermore,
* successive `groupby::scan` calls may return results in different orders.
*
* @throws cudf::logic_error If `requests[i].values.size() !=
* keys.num_rows()`.
*
* Example:
* ```
* Input:
* keys: {1 2 1 3 1}
* {1 2 1 4 1}
* request:
* values: {3 1 4 9 2}
* aggregations: {{SUM}, {MIN}}
*
* result:
*
* keys: {3 1 1 1 2}
* {4 1 1 1 2}
* values:
* SUM: {9 3 7 9 1}
* MIN: {9 3 3 2 1}
* ```
*
* @param requests The set of columns to scan and the scans to perform
* @param mr Device memory resource used to allocate the returned table and columns' device memory
* @return Pair containing the table with each group's key and
* a vector of aggregation_results for each request in the same order as
* specified in `requests`.
*/
std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> scan(
std::vector<aggregation_request> const& requests,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief The grouped data corresponding to a groupby operation on a set of values.
*
Expand Down Expand Up @@ -231,6 +286,11 @@ class groupby {
std::vector<aggregation_request> const& requests,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> sort_scan(
std::vector<aggregation_request> const& requests,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);
};
/** @} */
} // namespace groupby
Expand Down
2 changes: 0 additions & 2 deletions cpp/src/bitmask/null_mask.cu
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@
#include <numeric>
#include <type_traits>

using cudf::device_span;

namespace cudf {
size_type state_null_count(mask_state state, size_type size)
{
Expand Down
20 changes: 19 additions & 1 deletion cpp/src/groupby/groupby.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -159,6 +159,24 @@ std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby::aggr
return dispatch_aggregation(requests, 0, mr);
}

// Grouped-scan entry point: checks the requests, then forwards to sort_scan.
std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby::scan(
  std::vector<aggregation_request> const& requests, rmm::mr::device_memory_resource* mr)
{
  CUDF_FUNC_RANGE();

  // A request is acceptable only when its values column has exactly as many
  // rows as the keys table this groupby object was built from.
  auto const matches_key_rows = [this](auto const& request) {
    return request.values.size() == _keys.num_rows();
  };
  CUDF_EXPECTS(std::all_of(requests.begin(), requests.end(), matches_key_rows),
               "Size mismatch between request values and groupby keys.");

  verify_valid_requests(requests);

  // With zero key rows there is nothing to scan; return the results of
  // empty_like / empty_results without dispatching to sort_scan.
  if (_keys.num_rows() == 0) {
    return std::make_pair(empty_like(_keys), empty_results(requests));
  }

  // All scans currently go through the sort-based path on the default stream.
  return sort_scan(requests, rmm::cuda_stream_default, mr);
}

groupby::groups groupby::get_groups(table_view values, rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,8 @@
*/

#include <groupby/common/utils.hpp>
#include "group_reductions.hpp"
#include <groupby/sort/functors.hpp>
#include <groupby/sort/group_reductions.hpp>

#include <cudf/aggregation.hpp>
#include <cudf/column/column.hpp>
Expand Down Expand Up @@ -51,71 +52,16 @@ namespace detail {
* memoised sorted and/or grouped values and re-using will save on computation
* of these values.
*/
struct store_result_functor {
store_result_functor(size_type col_idx,
column_view const& values,
sort::sort_groupby_helper& helper,
cudf::detail::result_cache& cache,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: col_idx(col_idx), helper(helper), cache(cache), values(values), stream(stream), mr(mr)
{
}

struct aggregrate_result_functor final : store_result_functor {
using store_result_functor::store_result_functor;
template <aggregation::Kind k>
void operator()(aggregation const& agg)
{
}

private:
/**
* @brief Get the grouped values
*
* Computes the grouped values from @p values on first invocation and returns
* the stored result on subsequent invocation
*/
column_view get_grouped_values()
{
// TODO (dm): After implementing single pass multi-agg, explore making a
// cache of all grouped value columns rather than one at a time
if (grouped_values)
return grouped_values->view();
else if (sorted_values)
// TODO (dm): When we implement scan, it wouldn't be ok to return sorted
// values when asked for grouped values. Change this then.
return sorted_values->view();
else
grouped_values = helper.grouped_values(values);
return grouped_values->view();
};

/**
* @brief Get the grouped and sorted values
*
* Computes the grouped and sorted (within each group) values from @p values
* on first invocation and returns the stored result on subsequent invocation
*/
column_view get_sorted_values()
{
if (not sorted_values) sorted_values = helper.sorted_values(values);
return sorted_values->view();
};

private:
size_type col_idx; ///< Index of column in requests being operated on
sort::sort_groupby_helper& helper; ///< Sort helper
cudf::detail::result_cache& cache; ///< cache of results to store into
column_view const& values; ///< Column of values to group and aggregate

rmm::cuda_stream_view stream; ///< CUDA stream on which to execute kernels
rmm::mr::device_memory_resource* mr; ///< Memory resource to allocate space for results

std::unique_ptr<column> sorted_values; ///< Memoised grouped and sorted values
std::unique_ptr<column> grouped_values; ///< Memoised grouped values
};

template <>
void store_result_functor::operator()<aggregation::COUNT_VALID>(aggregation const& agg)
void aggregrate_result_functor::operator()<aggregation::COUNT_VALID>(aggregation const& agg)
{
if (cache.has_result(col_idx, agg)) return;

Expand All @@ -129,7 +75,7 @@ void store_result_functor::operator()<aggregation::COUNT_VALID>(aggregation cons
}

template <>
void store_result_functor::operator()<aggregation::COUNT_ALL>(aggregation const& agg)
void aggregrate_result_functor::operator()<aggregation::COUNT_ALL>(aggregation const& agg)
{
if (cache.has_result(col_idx, agg)) return;

Expand All @@ -138,7 +84,7 @@ void store_result_functor::operator()<aggregation::COUNT_ALL>(aggregation const&
}

template <>
void store_result_functor::operator()<aggregation::SUM>(aggregation const& agg)
void aggregrate_result_functor::operator()<aggregation::SUM>(aggregation const& agg)
{
if (cache.has_result(col_idx, agg)) return;

Expand All @@ -149,7 +95,7 @@ void store_result_functor::operator()<aggregation::SUM>(aggregation const& agg)
};

template <>
void store_result_functor::operator()<aggregation::ARGMAX>(aggregation const& agg)
void aggregrate_result_functor::operator()<aggregation::ARGMAX>(aggregation const& agg)
{
if (cache.has_result(col_idx, agg)) return;

Expand All @@ -164,7 +110,7 @@ void store_result_functor::operator()<aggregation::ARGMAX>(aggregation const& ag
};

template <>
void store_result_functor::operator()<aggregation::ARGMIN>(aggregation const& agg)
void aggregrate_result_functor::operator()<aggregation::ARGMIN>(aggregation const& agg)
{
if (cache.has_result(col_idx, agg)) return;

Expand All @@ -179,7 +125,7 @@ void store_result_functor::operator()<aggregation::ARGMIN>(aggregation const& ag
};

template <>
void store_result_functor::operator()<aggregation::MIN>(aggregation const& agg)
void aggregrate_result_functor::operator()<aggregation::MIN>(aggregation const& agg)
{
if (cache.has_result(col_idx, agg)) return;

Expand Down Expand Up @@ -216,7 +162,7 @@ void store_result_functor::operator()<aggregation::MIN>(aggregation const& agg)
};

template <>
void store_result_functor::operator()<aggregation::MAX>(aggregation const& agg)
void aggregrate_result_functor::operator()<aggregation::MAX>(aggregation const& agg)
{
if (cache.has_result(col_idx, agg)) return;

Expand Down Expand Up @@ -253,7 +199,7 @@ void store_result_functor::operator()<aggregation::MAX>(aggregation const& agg)
};

template <>
void store_result_functor::operator()<aggregation::MEAN>(aggregation const& agg)
void aggregrate_result_functor::operator()<aggregation::MEAN>(aggregation const& agg)
{
if (cache.has_result(col_idx, agg)) return;

Expand All @@ -277,7 +223,7 @@ void store_result_functor::operator()<aggregation::MEAN>(aggregation const& agg)
};

template <>
void store_result_functor::operator()<aggregation::VARIANCE>(aggregation const& agg)
void aggregrate_result_functor::operator()<aggregation::VARIANCE>(aggregation const& agg)
{
if (cache.has_result(col_idx, agg)) return;

Expand All @@ -300,7 +246,7 @@ void store_result_functor::operator()<aggregation::VARIANCE>(aggregation const&
};

template <>
void store_result_functor::operator()<aggregation::STD>(aggregation const& agg)
void aggregrate_result_functor::operator()<aggregation::STD>(aggregation const& agg)
{
if (cache.has_result(col_idx, agg)) return;

Expand All @@ -314,7 +260,7 @@ void store_result_functor::operator()<aggregation::STD>(aggregation const& agg)
};

template <>
void store_result_functor::operator()<aggregation::QUANTILE>(aggregation const& agg)
void aggregrate_result_functor::operator()<aggregation::QUANTILE>(aggregation const& agg)
{
if (cache.has_result(col_idx, agg)) return;

Expand All @@ -335,7 +281,7 @@ void store_result_functor::operator()<aggregation::QUANTILE>(aggregation const&
};

template <>
void store_result_functor::operator()<aggregation::MEDIAN>(aggregation const& agg)
void aggregrate_result_functor::operator()<aggregation::MEDIAN>(aggregation const& agg)
{
if (cache.has_result(col_idx, agg)) return;

Expand All @@ -355,7 +301,7 @@ void store_result_functor::operator()<aggregation::MEDIAN>(aggregation const& ag
};

template <>
void store_result_functor::operator()<aggregation::NUNIQUE>(aggregation const& agg)
void aggregrate_result_functor::operator()<aggregation::NUNIQUE>(aggregation const& agg)
{
if (cache.has_result(col_idx, agg)) return;

Expand All @@ -372,7 +318,7 @@ void store_result_functor::operator()<aggregation::NUNIQUE>(aggregation const& a
};

template <>
void store_result_functor::operator()<aggregation::NTH_ELEMENT>(aggregation const& agg)
void aggregrate_result_functor::operator()<aggregation::NTH_ELEMENT>(aggregation const& agg)
{
if (cache.has_result(col_idx, agg)) return;

Expand Down Expand Up @@ -401,7 +347,7 @@ void store_result_functor::operator()<aggregation::NTH_ELEMENT>(aggregation cons
}

template <>
void store_result_functor::operator()<aggregation::COLLECT>(aggregation const& agg)
void aggregrate_result_functor::operator()<aggregation::COLLECT>(aggregation const& agg)
{
auto null_handling =
static_cast<cudf::detail::collect_list_aggregation const&>(agg)._null_handling;
Expand Down Expand Up @@ -431,7 +377,7 @@ std::pair<std::unique_ptr<table>, std::vector<aggregation_result>> groupby::sort

for (size_t i = 0; i < requests.size(); i++) {
auto store_functor =
detail::store_result_functor(i, requests[i].values, helper(), cache, stream, mr);
detail::aggregrate_result_functor(i, requests[i].values, helper(), cache, stream, mr);
for (size_t j = 0; j < requests[i].aggregations.size(); j++) {
// TODO (dm): single pass compute all supported reductions
cudf::detail::aggregation_dispatcher(
Expand Down
Loading

0 comments on commit 500f42c

Please sign in to comment.