Skip to content

Commit

Permalink
Implement all methods of groupby rank aggregation in libcudf, python (#…
Browse files Browse the repository at this point in the history
…9569)

Addresses part of #3591

- [x] move RANK (min method), DENSE_RANK (dense method) into single RANK aggregation
- [x] max method
- [x] average method
- [x] first method
- [x] percentage
- [x] order, null order
RANK and DENSE_RANK were implemented to meet a Spark requirement. Pandas groupby has 3 more methods. `rank(column_view, rank_method)` already has all 5 methods implemented.

The current implementation has 2 separate aggregations, RANK and DENSE_RANK. These are merged into a single RANK aggregation with parameters `rank_aggregation(rank_method method, null_policy null_handling, bool percentage)`.
Groupby.rank support for the 3 additional methods will be added.

This PR is also pre-requisite for spearman correlation.


Additionally
- [x] Cython, Python plumbing
- [x] benchmark for groupby rank (all methods)
- [x] PERCENT_RANK aggregation is replaced with MIN_0_INDEXED rank_method in RANK aggregation

Authors:
  - Karthikeyan (https://github.com/karthikeyann)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - MithunR (https://github.com/mythrocks)
  - Jake Hemstad (https://github.com/jrhemstad)

URL: #9569
  • Loading branch information
karthikeyann authored Apr 28, 2022
1 parent 20569f6 commit 9ac2477
Show file tree
Hide file tree
Showing 28 changed files with 944 additions and 540 deletions.
12 changes: 10 additions & 2 deletions cpp/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,18 @@ ConfigureBench(FILL_BENCH filling/repeat.cpp)
# ##################################################################################################
# * groupby benchmark -----------------------------------------------------------------------------
ConfigureBench(
GROUPBY_BENCH groupby/group_sum.cu groupby/group_nth.cu groupby/group_shift.cu
groupby/group_struct.cu groupby/group_no_requests.cu groupby/group_scan.cu
GROUPBY_BENCH
groupby/group_sum.cu
groupby/group_nth.cu
groupby/group_shift.cu
groupby/group_struct.cu
groupby/group_no_requests.cu
groupby/group_scan.cu
)

# group_rank_benchmark.cu registers its benchmarks through nvbench (NVBENCH_BENCH_TYPES), so it is
# built only by the nvbench target and is not listed in the Google Benchmark target above.
ConfigureNVBench(GROUPBY_NVBENCH groupby/group_rank_benchmark.cu)

# ##################################################################################################
# * hashing benchmark -----------------------------------------------------------------------------
ConfigureBench(HASHING_BENCH hashing/hash.cpp hashing/partition.cpp)
Expand Down
109 changes: 109 additions & 0 deletions cpp/benchmarks/groupby/group_rank_benchmark.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <benchmarks/common/generate_input.hpp>
#include <benchmarks/fixture/rmm_pool_raii.hpp>
#include <benchmarks/synchronization/synchronization.hpp>

#include <cudf/groupby.hpp>
#include <cudf/sorting.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>

#include <nvbench/nvbench.cuh>

/**
 * @brief nvbench benchmark body for the groupby RANK scan aggregation.
 *
 * Generates a random two-column int64 table (column 0 is used as the group key, column 1 as the
 * order-by values), optionally sorts the whole table, and times `groupby::scan` with a single
 * rank aggregation request.
 *
 * @tparam method Rank tie-breaking method being benchmarked.
 * @param state nvbench state; the "is_sorted" and "data_size" int64 axes are read from it.
 */
template <cudf::rank_method method>
static void nvbench_groupby_rank(nvbench::state& state,
                                 nvbench::type_list<nvbench::enum_type<method>>)
{
  using namespace cudf;
  using type           = int64_t;
  constexpr auto dtype = type_to_id<type>();
  cudf::rmm_pool_raii pool_raii;

  bool const is_sorted              = state.get_int64("is_sorted");
  cudf::size_type const column_size = state.get_int64("data_size");
  constexpr int num_groups          = 100;

  // Both columns draw from a uniform distribution bounded by 0 and num_groups.
  data_profile profile;
  profile.set_null_frequency(std::nullopt);
  profile.set_cardinality(0);
  profile.set_distribution_params<type>(dtype, distribution_id::UNIFORM, 0, num_groups);

  auto source_table = create_random_table({dtype, dtype}, row_count{column_size}, profile);

  // values to be pre-sorted too for groupby rank
  if (is_sorted) source_table = cudf::sort(*source_table);

  table_view keys{{source_table->view().column(0)}};
  column_view order_by{source_table->view().column(1)};

  // One scan request: rank the order-by column with the method under test.
  std::vector<groupby::scan_request> requests;
  requests.emplace_back();
  requests[0].values = order_by;
  requests[0].aggregations.push_back(cudf::make_rank_aggregation<groupby_scan_aggregation>(method));

  groupby::groupby gb_obj(keys, null_policy::EXCLUDE, is_sorted ? sorted::YES : sorted::NO);

  // The launch object is unnamed: scan() is called without a stream argument, so the
  // launch's stream is never needed here.
  state.exec(nvbench::exec_tag::sync, [&](nvbench::launch&) {
    // groupby scan uses sort implementation
    auto result = gb_obj.scan(requests);
  });
}

// NOTE(review): empty enum declared in the global namespace. Nothing visible in this file refers
// to it (the benchmark is parameterized on cudf::rank_method), so it is presumably a leftover --
// confirm before removing.
enum class rank_method : int32_t {};

NVBENCH_DECLARE_ENUM_TYPE_STRINGS(
  cudf::rank_method,
  // First generator: short label for each rank method.
  [](cudf::rank_method method) {
    if (method == cudf::rank_method::FIRST) return "FIRST";
    if (method == cudf::rank_method::AVERAGE) return "AVERAGE";
    if (method == cudf::rank_method::MIN) return "MIN";
    if (method == cudf::rank_method::MAX) return "MAX";
    if (method == cudf::rank_method::DENSE) return "DENSE";
    return "unknown";
  },
  // Second generator: fully qualified enumerator name for each rank method.
  [](cudf::rank_method method) {
    if (method == cudf::rank_method::FIRST) return "cudf::rank_method::FIRST";
    if (method == cudf::rank_method::AVERAGE) return "cudf::rank_method::AVERAGE";
    if (method == cudf::rank_method::MIN) return "cudf::rank_method::MIN";
    if (method == cudf::rank_method::MAX) return "cudf::rank_method::MAX";
    if (method == cudf::rank_method::DENSE) return "cudf::rank_method::DENSE";
    return "unknown";
  })

// Rank methods swept by the benchmark's type axis (one instantiation of
// nvbench_groupby_rank per entry).
using methods = nvbench::enum_type_list<cudf::rank_method::AVERAGE,
cudf::rank_method::DENSE,
cudf::rank_method::FIRST,
cudf::rank_method::MAX,
cudf::rank_method::MIN>;

// Registers the benchmark. The int64 axis names below must match the strings passed to
// state.get_int64() in nvbench_groupby_rank: "data_size" is the row count of the generated
// table and "is_sorted" (0 or 1) selects whether the table is sorted before the groupby.
NVBENCH_BENCH_TYPES(nvbench_groupby_rank, NVBENCH_TYPE_AXES(methods))
.set_type_axes_names({"rank_method"})
.set_name("groupby_rank")
.add_int64_axis("data_size",
{
1000000, // 1M
10000000, // 10M
100000000, // 100M
})

.add_int64_axis("is_sorted", {0, 1});
186 changes: 64 additions & 122 deletions cpp/include/cudf/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,32 @@ namespace detail {
class simple_aggregations_collector;
class aggregation_finalizer;
} // namespace detail

/**
* @brief Tie-breaker method to use for ranking the column.
*
* Each method decides which rank is assigned to rows whose values compare equal (ties).
*
* @see cudf::make_rank_aggregation for more details.
* @ingroup column_sort
*/
enum class rank_method : int32_t {
FIRST, ///< ties are ranked in stable sort order, so every row gets a distinct rank
AVERAGE, ///< tied rows all receive the mean of the ranks they span
MIN, ///< tied rows all receive the lowest of the ranks they span
MAX, ///< tied rows all receive the highest of the ranks they span
DENSE ///< tied rows share a rank and the rank increases by exactly 1 between distinct values
};

/**
* @brief Selects whether ranks are returned as-is or converted to a percentage, and
* which normalization the conversion uses.
*
* In the formulas below, `count` is the number of rows in the group.
*/
enum class rank_percentage : int32_t {
NONE, ///< rank
ZERO_NORMALIZED, ///< rank / count
ONE_NORMALIZED ///< (rank - 1) / (count - 1)
};

/**
* @brief Abstract base class for specifying the desired aggregation in an
* `aggregation_request`.
Expand Down Expand Up @@ -77,9 +103,7 @@ class aggregation {
NUNIQUE, ///< count number of unique elements
NTH_ELEMENT, ///< get the nth element
ROW_NUMBER, ///< get row-number of current index (relative to rolling window)
RANK, ///< get rank of current index
DENSE_RANK, ///< get dense rank of current index
PERCENT_RANK, ///< get percent (i.e. fractional) rank of current index
RANK, ///< get rank of current index
COLLECT_LIST, ///< collect values into a list
COLLECT_SET, ///< collect values into a list without duplicate entries
LEAD, ///< window function, accesses row at specified offset following current row
Expand Down Expand Up @@ -323,20 +347,24 @@ std::unique_ptr<Base> make_row_number_aggregation();
/**
* @brief Factory to create a RANK aggregation
*
* `RANK` returns a non-nullable column of size_type "ranks": the number of rows preceding or
* equal to the current row plus one. As a result, ranks are not unique and gaps will appear in
* the ranking sequence.
* `RANK` returns a column of size_type or double "ranks" (see note 3 below for how the
* data type is determined) for a given rank method and column order.
* If nulls are excluded, the rank will be null for those rows, otherwise a non-nullable column is
* returned. A double precision column is returned only when percentage!=NONE or when the rank
* method is AVERAGE.
*
* This aggregation only works with "scan" algorithms. The input column into the group or
* ungrouped scan is an orderby column that orders the rows that the aggregate function ranks.
* If rows are ordered by more than one column, the orderby input column should be a struct
* column containing the ordering columns.
*
* Note:
* 1. This method requires that the rows are presorted by the group keys and order_by columns.
* 2. `RANK` aggregations will return a fully valid column regardless of null_handling policy
* specified in the scan.
* 3. `RANK` aggregations are not compatible with exclusive scans.
* 1. This method could work faster with the rows that are presorted by the group keys and order_by
* columns. Though groupby object does not require order_by column to be sorted, groupby rank
* scan aggregation does require the order_by column to be sorted if the keys are sorted.
* 2. `RANK` aggregations are not compatible with exclusive scans.
* 3. With percentage==NONE, all rank methods except AVERAGE return a size_type column.
* For the AVERAGE method, or whenever percentage!=NONE, the return type is a double column.
*
* @code{.pseudo}
* Example: Consider a motor-racing statistics dataset, containing the following columns:
Expand All @@ -362,123 +390,37 @@ std::unique_ptr<Base> make_row_number_aggregation();
* A grouped rank aggregation scan with:
* groupby column : venue
* input orderby column: time
* Produces the following rank column:
* { 1, 2, 3, 3, 5, 1, 2, 2, 4, 5}
* (This corresponds to the following grouping and `driver` rows:)
* { "HAM", "LEC", "BOT", "NOR", "RIC", "RIC", "NOR", "BOT", "LEC", "PER" }
* <----------silverstone----------->|<-------------monza-------------->
* @endcode
*/
template <typename Base = aggregation>
std::unique_ptr<Base> make_rank_aggregation();

/**
* @brief Factory to create a DENSE_RANK aggregation
*
* `DENSE_RANK` returns a non-nullable column of size_type "dense ranks": the preceding unique
* value's rank plus one. As a result, ranks are not unique but there are no gaps in the ranking
* sequence (unlike RANK aggregations).
*
* This aggregation only works with "scan" algorithms. The input column into the group or
* ungrouped scan is an orderby column that orders the rows that the aggregate function ranks.
* If rows are ordered by more than one column, the orderby input column should be a struct
* column containing the ordering columns.
*
* Note:
* 1. This method requires that the rows are presorted by the group keys and order_by columns.
* 2. `DENSE_RANK` aggregations will return a fully valid column regardless of null_handling
* policy specified in the scan.
* 3. `DENSE_RANK` aggregations are not compatible with exclusive scans.
*
* @code{.pseudo}
* Example: Consider a motor-racing statistics dataset, containing the following columns:
* 1. venue: (STRING) Location of the race event
* 2. driver: (STRING) Name of the car driver (abbreviated to 3 characters)
* 3. time: (INT32) Time taken to complete the circuit
*
* For the following presorted data:
* Produces the following rank column for each methods:
* first: { 1, 2, 3, 4, 5, 1, 2, 3, 4, 5}
* average: { 1, 2, 3.5, 3.5, 5, 1, 2.5, 2.5, 4, 5}
* min: { 1, 2, 3, 3, 5, 1, 2, 2, 4, 5}
* max: { 1, 2, 4, 4, 5, 1, 3, 3, 4, 5}
* dense: { 1, 2, 3, 3, 4, 1, 2, 2, 3, 4}
* This corresponds to the following grouping and `driver` rows:
* { "HAM", "LEC", "BOT", "NOR", "RIC", "RIC", "NOR", "BOT", "LEC", "PER" }
* <----------silverstone----------->|<-------------monza-------------->
*
* min rank for each percentage types:
* NONE: { 1, 2, 3, 3, 5, 1, 2, 2, 4, 5 }
* ZERO_NORMALIZED : { 0.20, 0.40, 0.60, 0.60, 1.00, 0.20, 0.40, 0.40, 0.80, 1.00 }
* ONE_NORMALIZED: { 0.00, 0.25, 0.50, 0.50, 1.00, 0.00, 0.25, 0.25, 0.75, 1.00 }
* where count corresponds to the number of rows in the group. @see cudf::rank_percentage
*
* [ // venue, driver, time
* { "silverstone", "HAM" ("hamilton"), 15823},
* { "silverstone", "LEC" ("leclerc"), 15827},
* { "silverstone", "BOT" ("bottas"), 15834}, // <-- Tied for 3rd place.
* { "silverstone", "NOR" ("norris"), 15834}, // <-- Tied for 3rd place.
* { "silverstone", "RIC" ("ricciardo"), 15905},
* { "monza", "RIC" ("ricciardo"), 12154},
* { "monza", "NOR" ("norris"), 12156}, // <-- Tied for 2nd place.
* { "monza", "BOT" ("bottas"), 12156}, // <-- Tied for 2nd place.
* { "monza", "LEC" ("leclerc"), 12201},
* { "monza", "PER" ("perez"), 12203}
* ]
*
* A grouped dense rank aggregation scan with:
* groupby column : venue
* input orderby column: time
* Produces the following dense rank column:
* { 1, 2, 3, 3, 4, 1, 2, 2, 3, 4}
* (This corresponds to the following grouping and `driver` rows:)
* { "HAM", "LEC", "BOT", "NOR", "RIC", "RIC", "NOR", "BOT", "LEC", "PER" }
* <----------silverstone----------->|<-------------monza-------------->
* @endcode
*/
template <typename Base = aggregation>
std::unique_ptr<Base> make_dense_rank_aggregation();

/**
* @brief Factory to create a PERCENT_RANK aggregation
*
* `PERCENT_RANK` returns a non-nullable column of double precision "fractional" ranks.
* For row index `i`, the percent rank of row `i` is defined as:
* percent_rank = (rank - 1) / (group_row_count - 1)
* where,
* 1. rank is the `RANK` of the row within the group
* 2. group_row_count is the number of rows in the group
*
* This aggregation only works with "scan" algorithms. The input to the grouped or
* ungrouped scan is an orderby column that orders the rows that the aggregate function ranks.
* If rows are ordered by more than one column, the orderby input column should be a struct
* column containing the ordering columns.
*
* Note:
* 1. This method requires that the rows are presorted by the group keys and order_by columns.
* 2. `PERCENT_RANK` aggregations will return a fully valid column regardless of null_handling
* policy specified in the scan.
* 3. `PERCENT_RANK` aggregations are not compatible with exclusive scans.
*
* @code{.pseudo}
* Example: Consider a motor-racing statistics dataset, containing the following columns:
* 1. venue: (STRING) Location of the race event
* 2. driver: (STRING) Name of the car driver (abbreviated to 3 characters)
* 3. time: (INT32) Time taken to complete the circuit
*
* For the following presorted data:
*
* [ // venue, driver, time
* { "silverstone", "HAM" ("hamilton"), 15823},
* { "silverstone", "LEC" ("leclerc"), 15827},
* { "silverstone", "BOT" ("bottas"), 15834}, // <-- Tied for 3rd place.
* { "silverstone", "NOR" ("norris"), 15834}, // <-- Tied for 3rd place.
* { "silverstone", "RIC" ("ricciardo"), 15905},
* { "monza", "RIC" ("ricciardo"), 12154},
* { "monza", "NOR" ("norris"), 12156}, // <-- Tied for 2nd place.
* { "monza", "BOT" ("bottas"), 12156}, // <-- Tied for 2nd place.
* { "monza", "LEC" ("leclerc"), 12201},
* { "monza", "PER" ("perez"), 12203}
* ]
*
* A grouped percent rank aggregation scan with:
* groupby column : venue
* input orderby column: time
* Produces the following percent rank column:
* { 0.00, 0.25, 0.50, 0.50, 1.00, 0.00, 0.25, 0.25, 0.75, 1.00 }
*
* (This corresponds to the following grouping and `driver` rows:)
* { "HAM", "LEC", "BOT", "NOR", "RIC", "RIC", "NOR", "BOT", "LEC", "PER" }
* <----------silverstone----------->|<-------------monza-------------->
* @endcode
* @param method The ranking method used for tie breaking (same values).
* @param column_order The desired sort order for ranking
* @param null_handling flag to include nulls during ranking. If nulls are not included,
* the corresponding rank will be null.
* @param null_precedence The desired order of null compared to other elements for column
* @param percentage enum to denote the type of conversion of ranks to percentage: the result is
* in range (0,1] for ZERO_NORMALIZED and [0,1] for ONE_NORMALIZED
*/
template <typename Base = aggregation>
std::unique_ptr<Base> make_percent_rank_aggregation();
std::unique_ptr<Base> make_rank_aggregation(rank_method method,
order column_order = order::ASCENDING,
null_policy null_handling = null_policy::EXCLUDE,
null_order null_precedence = null_order::AFTER,
rank_percentage percentage = rank_percentage::NONE);

/**
* @brief Factory to create a COLLECT_LIST aggregation
Expand Down
Loading

0 comments on commit 9ac2477

Please sign in to comment.