Skip to content

Commit

Permalink
Merge pull request #5874 from shwina/groupby-collect
Browse files Browse the repository at this point in the history
[REVIEW] Add `COLLECT` groupby aggregation
  • Loading branch information
Keith Kraus authored Sep 1, 2020
2 parents 0d83d64 + 5259dee commit da6c03a
Show file tree
Hide file tree
Showing 14 changed files with 275 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- PR #5843 Add `filters` parameter to Python `read_parquet` function for filtering row groups
- PR #5974 Use libcudf instead of cupy for `arange` or column creation from a scalar.
- PR #5874 Add `COLLECT` groupby aggregation
- PR #6119 Add support for `dayofweek` property in `DateTimeIndex` and `DatetimeProperties`

## Improvements
Expand Down
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ add_library(cudf
src/groupby/sort/group_nth_element.cu
src/groupby/sort/group_std.cu
src/groupby/sort/group_quantiles.cu
src/groupby/sort/group_collect.cu
src/aggregation/aggregation.cpp
src/aggregation/aggregation.cu
src/aggregation/result_cache.cpp
Expand Down
4 changes: 4 additions & 0 deletions cpp/include/cudf/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class aggregation {
NUNIQUE, ///< count number of unique elements
NTH_ELEMENT, ///< get the nth element
ROW_NUMBER, ///< get row-number of element
COLLECT, ///< collect values into a list
PTX, ///< PTX UDF based reduction
CUDA ///< CUDA UDf based reduction
};
Expand Down Expand Up @@ -192,6 +193,9 @@ std::unique_ptr<aggregation> make_nth_element_aggregation(
/// Factory to create a ROW_NUMBER aggregation
std::unique_ptr<aggregation> make_row_number_aggregation();

/// Factory to create a COLLECT_NUMBER aggregation
std::unique_ptr<aggregation> make_collect_aggregation();

/**
* @brief Factory to create an aggregation base on UDF for PTX or CUDA
*
Expand Down
8 changes: 8 additions & 0 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,12 @@ struct target_type_impl<Source, aggregation::ROW_NUMBER> {
using type = cudf::size_type;
};

// Always use list for COLLECT
template <typename Source>
struct target_type_impl<Source, aggregation::COLLECT> {
using type = cudf::list_view;
};

/**
* @brief Helper alias to get the accumulator type for performing aggregation
* `k` on elements of type `Source`
Expand Down Expand Up @@ -440,6 +446,8 @@ CUDA_HOST_DEVICE_CALLABLE decltype(auto) aggregation_dispatcher(aggregation::Kin
return f.template operator()<aggregation::NTH_ELEMENT>(std::forward<Ts>(args)...);
case aggregation::ROW_NUMBER:
return f.template operator()<aggregation::ROW_NUMBER>(std::forward<Ts>(args)...);
case aggregation::COLLECT:
return f.template operator()<aggregation::COLLECT>(std::forward<Ts>(args)...);
default: {
#ifndef __CUDA_ARCH__
CUDF_FAIL("Unsupported aggregation.");
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/aggregation/aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ std::unique_ptr<aggregation> make_row_number_aggregation()
{
return std::make_unique<aggregation>(aggregation::ROW_NUMBER);
}
/// Factory to create a COLLECT aggregation
std::unique_ptr<aggregation> make_collect_aggregation()
{
return std::make_unique<aggregation>(aggregation::COLLECT);
}
/// Factory to create a UDF aggregation
std::unique_ptr<aggregation> make_udf_aggregation(udf_type type,
std::string const& user_defined_aggregator,
Expand Down
48 changes: 48 additions & 0 deletions cpp/src/groupby/sort/group_collect.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/gather.cuh>
#include <cudf/types.hpp>

namespace cudf {
namespace groupby {
namespace detail {
std::unique_ptr<column> group_collect(column_view const &values,
rmm::device_vector<size_type> const &group_offsets,
size_type num_groups,
rmm::mr::device_memory_resource *mr,
cudaStream_t stream)
{
rmm::device_buffer offsets_data(
group_offsets.data().get(), group_offsets.size() * sizeof(cudf::size_type), stream, mr);

auto offsets = std::make_unique<cudf::column>(
cudf::data_type(cudf::type_to_id<cudf::size_type>()), num_groups + 1, std::move(offsets_data));

return make_lists_column(num_groups,
std::move(offsets),
std::make_unique<cudf::column>(values, stream, mr),
0,
rmm::device_buffer{0, stream, mr},
stream,
mr);
}
} // namespace detail
} // namespace groupby
} // namespace cudf
15 changes: 15 additions & 0 deletions cpp/src/groupby/sort/group_reductions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,21 @@ std::unique_ptr<column> group_nth_element(column_view const& values,
null_policy null_handling,
rmm::mr::device_memory_resource* mr,
cudaStream_t stream = 0);
/**
* @brief Internal API to collect grouped values into a lists column
*
* @param values Grouped values to collect
* @param group_offsets Offsets of groups' starting points within @p values
* @param num_groups Number of groups
* @param mr Device memory resource used to allocate the returned column's device memory
* @param stream CUDA stream used for device memory operations and kernel launches.
*/
std::unique_ptr<column> group_collect(column_view const& values,
rmm::device_vector<size_type> const& group_offsets,
size_type num_groups,
rmm::mr::device_memory_resource* mr,
cudaStream_t stream = 0);

} // namespace detail
} // namespace groupby
} // namespace cudf
12 changes: 12 additions & 0 deletions cpp/src/groupby/sort/groupby.cu
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,18 @@ void store_result_functor::operator()<aggregation::NTH_ELEMENT>(aggregation cons
mr,
stream));
}

template <>
void store_result_functor::operator()<aggregation::COLLECT>(aggregation const& agg)
{
if (cache.has_result(col_idx, agg)) return;

auto result = detail::group_collect(
get_grouped_values(), helper.group_offsets(), helper.num_groups(), mr, stream);

cache.add_result(col_idx, agg, std::move(result));
};

} // namespace detail

// Sort-based groupby
Expand Down
3 changes: 2 additions & 1 deletion cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ set(GROUPBY_TEST_SRC
"${CMAKE_CURRENT_SOURCE_DIR}/groupby/group_median_test.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/groupby/group_quantile_test.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/groupby/group_nunique_test.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/groupby/group_nth_element_test.cpp")
"${CMAKE_CURRENT_SOURCE_DIR}/groupby/group_nth_element_test.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/groupby/group_collect_test.cpp")

ConfigureTest(GROUPBY_TEST "${GROUPBY_TEST_SRC}")

Expand Down
91 changes: 91 additions & 0 deletions cpp/tests/groupby/group_collect_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <tests/groupby/groupby_test_util.hpp>

#include <tests/utilities/base_fixture.hpp>
#include <tests/utilities/column_wrapper.hpp>
#include <tests/utilities/type_lists.hpp>

#include <cudf/detail/aggregation/aggregation.hpp>

namespace cudf {
namespace test {

template <typename V>
struct groupby_collect_test : public cudf::test::BaseFixture {
};

using FixedWidthTypesNotBool = cudf::test::Concat<cudf::test::IntegralTypesNotBool,
cudf::test::FloatingPointTypes,
cudf::test::TimestampTypes>;
TYPED_TEST_CASE(groupby_collect_test, FixedWidthTypesNotBool);

TYPED_TEST(groupby_collect_test, CollectWithoutNulls)
{
using K = int32_t;
using V = TypeParam;

fixed_width_column_wrapper<K, int32_t> keys{1, 1, 1, 2, 2, 2};
fixed_width_column_wrapper<V, int32_t> values{1, 2, 3, 4, 5, 6};

fixed_width_column_wrapper<K, int32_t> expect_keys{1, 2};
lists_column_wrapper<V, int32_t> expect_vals{{1, 2, 3}, {4, 5, 6}};

auto agg = cudf::make_collect_aggregation();
test_single_agg(keys, values, expect_keys, expect_vals, std::move(agg));
}

TYPED_TEST(groupby_collect_test, CollectWithNulls)
{
using K = int32_t;
using V = TypeParam;

fixed_width_column_wrapper<K, int32_t> keys{1, 1, 2, 2, 3, 3};
fixed_width_column_wrapper<V, int32_t> values{{1, 2, 3, 4, 5, 6},
{true, false, true, false, true, false}};

fixed_width_column_wrapper<K, int32_t> expect_keys{1, 2, 3};

std::vector<int32_t> validity({true, false});
lists_column_wrapper<V, int32_t> expect_vals{
{{1, 2}, validity.begin()}, {{3, 4}, validity.begin()}, {{5, 6}, validity.begin()}};

auto agg = cudf::make_collect_aggregation();
test_single_agg(keys, values, expect_keys, expect_vals, std::move(agg));
}

TYPED_TEST(groupby_collect_test, CollectLists)
{
using K = int32_t;
using V = TypeParam;

using LCW = cudf::test::lists_column_wrapper<TypeParam, int32_t>;

fixed_width_column_wrapper<K, int32_t> keys{1, 1, 2, 2, 3, 3};
lists_column_wrapper<V, int32_t> values{{1, 2}, {3, 4}, {5, 6, 7}, LCW{}, {9, 10}, {11}};

fixed_width_column_wrapper<K, int32_t> expect_keys{1, 2, 3};

lists_column_wrapper<V, int32_t> expect_vals{
{{1, 2}, {3, 4}}, {{5, 6, 7}, LCW{}}, {{9, 10}, {11}}};

auto agg = cudf::make_collect_aggregation();
test_single_agg(keys, values, expect_keys, expect_vals, std::move(agg));
}

} // namespace test
} // namespace cudf
11 changes: 10 additions & 1 deletion python/cudf/cudf/_lib/aggregation.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class AggregationKind(Enum):
ARGMIN = libcudf_aggregation.aggregation.Kind.ARGMIN
NUNIQUE = libcudf_aggregation.aggregation.Kind.NUNIQUE
NTH = libcudf_aggregation.aggregation.Kind.NTH_ELEMENT
COLLECT = libcudf_aggregation.aggregation.Kind.COLLECT
PTX = libcudf_aggregation.aggregation.Kind.PTX
CUDA = libcudf_aggregation.aggregation.Kind.CUDA

Expand Down Expand Up @@ -86,7 +87,9 @@ cdef unique_ptr[aggregation] make_aggregation(op, kwargs={}) except *:
if isinstance(op, str):
agg = getattr(_AggregationFactory, op)(**kwargs)
elif callable(op):
if "dtype" in kwargs:
if op is list:
agg = _AggregationFactory.collect()
elif "dtype" in kwargs:
agg = _AggregationFactory.from_udf(op, **kwargs)
else:
agg = op(_AggregationFactory)
Expand Down Expand Up @@ -215,6 +218,12 @@ cdef class _AggregationFactory:
)
return agg

@classmethod
def collect(cls):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
agg.c_obj = move(libcudf_aggregation.make_collect_aggregation())
return agg

@classmethod
def from_udf(cls, op, *args, **kwargs):
cdef Aggregation agg = Aggregation.__new__(Aggregation)
Expand Down
3 changes: 3 additions & 0 deletions python/cudf/cudf/_lib/cpp/aggregation.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ cdef extern from "cudf/aggregation.hpp" namespace "cudf" nogil:
ARGMIN 'cudf::aggregation::ARGMIN'
NUNIQUE 'cudf::aggregation::NUNIQUE'
NTH_ELEMENT 'cudf::aggregation::NTH_ELEMENT'
COLLECT 'cudf::aggregation::COLLECT'
PTX 'cudf::aggregation::PTX'
CUDA 'cudf::aggregation::CUDA'
Kind kind
Expand Down Expand Up @@ -82,6 +83,8 @@ cdef extern from "cudf/aggregation.hpp" namespace "cudf" nogil:
size_type n
) except +

cdef unique_ptr[aggregation] make_collect_aggregation() except +

cdef unique_ptr[aggregation] make_udf_aggregation(
udf_type type,
string user_defined_aggregator,
Expand Down
15 changes: 13 additions & 2 deletions python/cudf/cudf/_lib/groupby.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ _GROUPBY_AGGS = {
"median",
"nunique",
"nth",
"collect"
}

_CATEGORICAL_AGGS = {
Expand All @@ -51,6 +52,9 @@ _STRING_AGGS = {
"nth",
}

_LIST_AGGS = {
"collect"
}

cdef class GroupBy:
cdef unique_ptr[libcudf_groupby.groupby] c_obj
Expand All @@ -70,7 +74,7 @@ cdef class GroupBy:
self.c_obj.reset(
new libcudf_groupby.groupby(
keys_view,
c_null_handling
c_null_handling,
)
)

Expand Down Expand Up @@ -175,12 +179,19 @@ def _drop_unsupported_aggs(Table values, aggs):

from cudf.utils.dtypes import (
is_categorical_dtype,
is_string_dtype
is_string_dtype,
is_list_dtype
)
result = aggs.copy()

for col_name in aggs:
if (
is_list_dtype(values._data[col_name].dtype)
):
for i, agg_name in enumerate(aggs[col_name]):
if Aggregation(agg_name).kind not in _LIST_AGGS:
del result[col_name][i]
elif (
is_string_dtype(values._data[col_name].dtype)
):
for i, agg_name in enumerate(aggs[col_name]):
Expand Down
Loading

0 comments on commit da6c03a

Please sign in to comment.