Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] Add COLLECT groupby aggregation #5874

Merged
merged 32 commits into from
Sep 1, 2020
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
a6664fb
Add group_collect groupby aggregation
shwina Aug 6, 2020
b532d0b
Need only grouped_values(), not sorted_values() for a collect
shwina Aug 6, 2020
e81f18a
Add group collect tests
shwina Aug 6, 2020
d182833
Collect() is the only supported agg for lists
shwina Aug 6, 2020
4e2e040
Merge branch 'branch-0.15' of https://github.com/rapidsai/cudf into g…
shwina Aug 7, 2020
46544fe
Fix is_numerical_dtype to check for categoricals and lists first
shwina Aug 7, 2020
d0672b7
Merge branch 'branch-0.15' of https://github.com/rapidsai/cudf into g…
shwina Aug 10, 2020
0819151
Merge branch 'branch-0.16' of https://github.com/rapidsai/cudf into g…
shwina Aug 18, 2020
b61ae7d
Add and pass first groupby-list pytest
shwina Aug 19, 2020
3f39b02
Remove stale files
shwina Aug 19, 2020
a344d53
Duplicate changelog entry
shwina Aug 19, 2020
63cedea
More groupby-list tests
shwina Aug 19, 2020
6ec72ad
Doc
shwina Aug 19, 2020
6fa14de
Changelog
shwina Aug 19, 2020
901e6e7
Merge branch 'branch-0.16' of https://github.com/rapidsai/cudf into g…
shwina Aug 19, 2020
8465d4a
Undo unintended changelog change
shwina Aug 19, 2020
566ff58
Merge branch 'branch-0.16' of https://github.com/rapidsai/cudf into g…
shwina Aug 27, 2020
fd42010
Fix group collect tests
shwina Aug 27, 2020
bb923cc
Additional groupby collect testing
shwina Aug 28, 2020
94f0e35
Update cpp/src/aggregation/aggregation.cpp
shwina Aug 28, 2020
8cdcdd4
Update cpp/src/groupby/sort/group_collect.cu
shwina Aug 28, 2020
de79a05
Update cpp/src/groupby/sort/group_collect.cu
shwina Aug 28, 2020
d610e89
Don't reference group_labels
shwina Aug 28, 2020
fd51b06
Merge branch 'groupby-collect' of github.com:shwina/cudf into groupby…
shwina Aug 28, 2020
48ebc07
We are collecting, not summing
shwina Aug 28, 2020
c657fce
ROW->COLLECT
shwina Aug 28, 2020
87f1ed1
Merge branch 'branch-0.16' into groupby-collect
shwina Aug 28, 2020
e9ce1b0
Update cpp/src/groupby/sort/group_collect.cu
shwina Aug 31, 2020
bf942af
Construct and pass empty null mask directly
shwina Aug 31, 2020
5008f9b
Merge branch 'groupby-collect' of github.com:shwina/cudf into groupby…
shwina Aug 31, 2020
6e1531e
Merge branch 'branch-0.16' of https://github.com/rapidsai/cudf into g…
shwina Aug 31, 2020
5259dee
Fix dtype arg to column constructor
shwina Aug 31, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## New Features

- PR #5974 Use libcudf instead of cupy for `arange` or column creation from a scalar.
- PR #5874 Add `COLLECT` groupby aggregation

## Improvements

Expand Down
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ add_library(cudf
src/groupby/sort/group_nth_element.cu
src/groupby/sort/group_std.cu
src/groupby/sort/group_quantiles.cu
src/groupby/sort/group_collect.cu
src/aggregation/aggregation.cpp
src/aggregation/aggregation.cu
src/aggregation/result_cache.cpp
Expand Down
4 changes: 4 additions & 0 deletions cpp/include/cudf/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class aggregation {
NUNIQUE, ///< count number of unique elements
NTH_ELEMENT, ///< get the nth element
ROW_NUMBER, ///< get row-number of element
COLLECT, ///< collect values into a list
PTX, ///< PTX UDF based reduction
CUDA ///< CUDA UDf based reduction
};
Expand Down Expand Up @@ -192,6 +193,9 @@ std::unique_ptr<aggregation> make_nth_element_aggregation(
/// Factory to create a ROW_NUMBER aggregation
std::unique_ptr<aggregation> make_row_number_aggregation();

/// Factory to create a ROW_NUMBER aggregation
shwina marked this conversation as resolved.
Show resolved Hide resolved
std::unique_ptr<aggregation> make_collect_aggregation();

/**
* @brief Factory to create an aggregation base on UDF for PTX or CUDA
*
Expand Down
8 changes: 8 additions & 0 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,12 @@ struct target_type_impl<Source, aggregation::ROW_NUMBER> {
using type = cudf::size_type;
};

// Always use list for COLLECT
template <typename Source>
struct target_type_impl<Source, aggregation::COLLECT> {
using type = cudf::list_view;
};

Comment on lines +358 to +363
Copy link
Contributor

@jrhemstad jrhemstad Aug 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this actually needed? The target_type logic is for determining the type to use for an accumulator for ops like sum/min/max. I wouldn't think it would be needed for COLLECT.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes -- otherwise we fail this check: https://github.com/rapidsai/cudf/blob/branch-0.16/cpp/src/groupby/groupby.cu#L102

which eventually calls:

return (not std::is_void<target_type_t<Source, k>>::value);

/**
* @brief Helper alias to get the accumulator type for performing aggregation
* `k` on elements of type `Source`
Expand Down Expand Up @@ -440,6 +446,8 @@ CUDA_HOST_DEVICE_CALLABLE decltype(auto) aggregation_dispatcher(aggregation::Kin
return f.template operator()<aggregation::NTH_ELEMENT>(std::forward<Ts>(args)...);
case aggregation::ROW_NUMBER:
return f.template operator()<aggregation::ROW_NUMBER>(std::forward<Ts>(args)...);
case aggregation::COLLECT:
return f.template operator()<aggregation::COLLECT>(std::forward<Ts>(args)...);
default: {
#ifndef __CUDA_ARCH__
CUDF_FAIL("Unsupported aggregation.");
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/aggregation/aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ std::unique_ptr<aggregation> make_row_number_aggregation()
{
return std::make_unique<aggregation>(aggregation::ROW_NUMBER);
}
/// Factory to create a COLLECT aggregation
std::unique_ptr<aggregation> make_collect_aggregation()
{
return std::make_unique<aggregation>(aggregation::COLLECT);
}
/// Factory to create a UDF aggregation
std::unique_ptr<aggregation> make_udf_aggregation(udf_type type,
std::string const& user_defined_aggregator,
Expand Down
49 changes: 49 additions & 0 deletions cpp/src/groupby/sort/group_collect.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/gather.cuh>
#include <cudf/types.hpp>

namespace cudf {
namespace groupby {
namespace detail {
std::unique_ptr<column> group_collect(column_view const &values,
rmm::device_vector<size_type> const &group_offsets,
size_type num_groups,
rmm::mr::device_memory_resource *mr,
cudaStream_t stream)
{
rmm::device_buffer offsets_data(
group_offsets.data().get(), group_offsets.size() * sizeof(cudf::size_type), stream, mr);
rmm::device_buffer null_mask(0, stream, mr);

auto offsets = std::make_unique<cudf::column>(
cudf::data_type{cudf::type_id::INT32}, num_groups + 1, offsets_data);
shwina marked this conversation as resolved.
Show resolved Hide resolved

return make_lists_column(num_groups,
std::move(offsets),
std::make_unique<cudf::column>(values, stream, mr),
0,
std::move(null_mask),
jrhemstad marked this conversation as resolved.
Show resolved Hide resolved
stream,
mr);
}
} // namespace detail
} // namespace groupby
} // namespace cudf
15 changes: 15 additions & 0 deletions cpp/src/groupby/sort/group_reductions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,21 @@ std::unique_ptr<column> group_nth_element(column_view const& values,
null_policy null_handling,
rmm::mr::device_memory_resource* mr,
cudaStream_t stream = 0);
/**
* @brief Internal API to collect grouped values into a lists column
*
* @param values Grouped values to collect
* @param group_offsets Offsets of groups' starting points within @p values
* @param num_groups Number of groups
* @param mr Device memory resource used to allocate the returned column's device memory
* @param stream CUDA stream used for device memory operations and kernel launches.
*/
std::unique_ptr<column> group_collect(column_view const& values,
rmm::device_vector<size_type> const& group_offsets,
size_type num_groups,
rmm::mr::device_memory_resource* mr,
cudaStream_t stream = 0);

} // namespace detail
} // namespace groupby
} // namespace cudf
12 changes: 12 additions & 0 deletions cpp/src/groupby/sort/groupby.cu
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,18 @@ void store_result_functor::operator()<aggregation::NTH_ELEMENT>(aggregation cons
mr,
stream));
}

template <>
void store_result_functor::operator()<aggregation::COLLECT>(aggregation const& agg)
{
if (cache.has_result(col_idx, agg)) return;

auto result = detail::group_collect(
get_grouped_values(), helper.group_offsets(), helper.num_groups(), mr, stream);

cache.add_result(col_idx, agg, std::move(result));
};

} // namespace detail

// Sort-based groupby
Expand Down
3 changes: 2 additions & 1 deletion cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ set(GROUPBY_TEST_SRC
"${CMAKE_CURRENT_SOURCE_DIR}/groupby/group_median_test.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/groupby/group_quantile_test.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/groupby/group_nunique_test.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/groupby/group_nth_element_test.cpp")
"${CMAKE_CURRENT_SOURCE_DIR}/groupby/group_nth_element_test.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/groupby/group_collect_test.cpp")

ConfigureTest(GROUPBY_TEST "${GROUPBY_TEST_SRC}")

Expand Down
91 changes: 91 additions & 0 deletions cpp/tests/groupby/group_collect_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <tests/groupby/groupby_test_util.hpp>

#include <tests/utilities/base_fixture.hpp>
#include <tests/utilities/column_wrapper.hpp>
#include <tests/utilities/type_lists.hpp>

#include <cudf/detail/aggregation/aggregation.hpp>

namespace cudf {
namespace test {

template <typename V>
struct groupby_collect_test : public cudf::test::BaseFixture {
};

using FixedWidthTypesNotBool = cudf::test::Concat<cudf::test::IntegralTypesNotBool,
cudf::test::FloatingPointTypes,
cudf::test::TimestampTypes>;
TYPED_TEST_CASE(groupby_collect_test, FixedWidthTypesNotBool);

TYPED_TEST(groupby_collect_test, CollectWithoutNulls)
{
using K = int32_t;
using V = TypeParam;

fixed_width_column_wrapper<K, int32_t> keys{1, 1, 1, 2, 2, 2};
fixed_width_column_wrapper<V, int32_t> values{1, 2, 3, 4, 5, 6};

fixed_width_column_wrapper<K, int32_t> expect_keys{1, 2};
lists_column_wrapper<V, int32_t> expect_vals{{1, 2, 3}, {4, 5, 6}};

auto agg = cudf::make_collect_aggregation();
test_single_agg(keys, values, expect_keys, expect_vals, std::move(agg));
}

TYPED_TEST(groupby_collect_test, CollectWithNulls)
{
using K = int32_t;
using V = TypeParam;

fixed_width_column_wrapper<K, int32_t> keys{1, 1, 2, 2, 3, 3};
fixed_width_column_wrapper<V, int32_t> values{{1, 2, 3, 4, 5, 6},
{true, false, true, false, true, false}};

fixed_width_column_wrapper<K, int32_t> expect_keys{1, 2, 3};

std::vector<int32_t> validity({true, false});
lists_column_wrapper<V, int32_t> expect_vals{
{{1, 2}, validity.begin()}, {{3, 4}, validity.begin()}, {{5, 6}, validity.begin()}};

auto agg = cudf::make_collect_aggregation();
test_single_agg(keys, values, expect_keys, expect_vals, std::move(agg));
}

TYPED_TEST(groupby_collect_test, CollectLists)
{
using K = int32_t;
using V = TypeParam;

using LCW = cudf::test::lists_column_wrapper<TypeParam, int32_t>;

fixed_width_column_wrapper<K, int32_t> keys{1, 1, 2, 2, 3, 3};
lists_column_wrapper<V, int32_t> values{{1, 2}, {3, 4}, {5, 6, 7}, LCW{}, {9, 10}, {11}};

fixed_width_column_wrapper<K, int32_t> expect_keys{1, 2, 3};

lists_column_wrapper<V, int32_t> expect_vals{
{{1, 2}, {3, 4}}, {{5, 6, 7}, LCW{}}, {{9, 10}, {11}}};

auto agg = cudf::make_collect_aggregation();
test_single_agg(keys, values, expect_keys, expect_vals, std::move(agg));
}

} // namespace test
} // namespace cudf
11 changes: 10 additions & 1 deletion python/cudf/cudf/_lib/aggregation.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class AggregationKind(Enum):
ARGMIN = libcudf_aggregation.aggregation.Kind.ARGMIN
NUNIQUE = libcudf_aggregation.aggregation.Kind.NUNIQUE
NTH = libcudf_aggregation.aggregation.Kind.NTH_ELEMENT
COLLECT = libcudf_aggregation.aggregation.Kind.COLLECT
PTX = libcudf_aggregation.aggregation.Kind.PTX
CUDA = libcudf_aggregation.aggregation.Kind.CUDA

Expand Down Expand Up @@ -86,7 +87,9 @@ cdef unique_ptr[aggregation] make_aggregation(op, kwargs={}) except *:
if isinstance(op, str):
agg = getattr(_AggregationFactory, op)(**kwargs)
elif callable(op):
if "dtype" in kwargs:
if op is list:
agg = _AggregationFactory.collect()
elif "dtype" in kwargs:
agg = _AggregationFactory.from_udf(op, **kwargs)
else:
agg = op(_AggregationFactory)
Expand Down Expand Up @@ -215,6 +218,12 @@ cdef class _AggregationFactory:
)
return agg

@classmethod
def collect(cls):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
agg.c_obj = move(libcudf_aggregation.make_collect_aggregation())
return agg

@classmethod
def from_udf(cls, op, *args, **kwargs):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
Expand Down
3 changes: 3 additions & 0 deletions python/cudf/cudf/_lib/cpp/aggregation.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ cdef extern from "cudf/aggregation.hpp" namespace "cudf" nogil:
ARGMIN 'cudf::aggregation::ARGMIN'
NUNIQUE 'cudf::aggregation::NUNIQUE'
NTH_ELEMENT 'cudf::aggregation::NTH_ELEMENT'
COLLECT 'cudf::aggregation::COLLECT'
PTX 'cudf::aggregation::PTX'
CUDA 'cudf::aggregation::CUDA'
Kind kind
Expand Down Expand Up @@ -82,6 +83,8 @@ cdef extern from "cudf/aggregation.hpp" namespace "cudf" nogil:
size_type n
) except +

cdef unique_ptr[aggregation] make_collect_aggregation() except +

cdef unique_ptr[aggregation] make_udf_aggregation(
udf_type type,
string user_defined_aggregator,
Expand Down
15 changes: 13 additions & 2 deletions python/cudf/cudf/_lib/groupby.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ _GROUPBY_AGGS = {
"median",
"nunique",
"nth",
"collect"
}

_CATEGORICAL_AGGS = {
Expand All @@ -51,6 +52,9 @@ _STRING_AGGS = {
"nth",
}

_LIST_AGGS = {
"collect"
}

cdef class GroupBy:
cdef unique_ptr[libcudf_groupby.groupby] c_obj
Expand All @@ -70,7 +74,7 @@ cdef class GroupBy:
self.c_obj.reset(
new libcudf_groupby.groupby(
keys_view,
c_null_handling
c_null_handling,
)
)

Expand Down Expand Up @@ -175,12 +179,19 @@ def _drop_unsupported_aggs(Table values, aggs):

from cudf.utils.dtypes import (
is_categorical_dtype,
is_string_dtype
is_string_dtype,
is_list_dtype
)
result = aggs.copy()

for col_name in aggs:
if (
is_list_dtype(values._data[col_name].dtype)
):
for i, agg_name in enumerate(aggs[col_name]):
if Aggregation(agg_name).kind not in _LIST_AGGS:
del result[col_name][i]
elif (
is_string_dtype(values._data[col_name].dtype)
):
for i, agg_name in enumerate(aggs[col_name]):
Expand Down
Loading