From ef11061911aa9ef77cf615fea042a2bfa6f6cdea Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Tue, 12 Dec 2023 13:50:32 +0000 Subject: [PATCH] Implement `IndexedFrame.duplicated` with `distinct_indices` + `scatter` (#14493) To obtain the duplicate rows in a dataframe we previously performed a drop-duplicates with a carrier column of row indices and then set entries in a boolean column to False for those row indices that remained. Furthermore, we were performing an unnecessary merge after the drop-duplicates call to obtain the row indices. Note that the carrier column provides exactly the information that is computed internally in `libcudf` by `cudf::detail::get_distinct_indices` (called as part of `cudf::distinct`). We therefore promote `get_distinct_indices` to a public function (as `cudf::distinct_indices`) and replace the (unnecessary) merge plus `iloc`-based setting of the result with a call to `libcudf.copying.scatter`. This provides a reasonable speedup (around 1.5x) for `duplicated()` on `Series`, and significantly improves performance of `duplicated()` on `DataFrame`s, especially when providing a `subset` argument. Previously we would pay the cost in drop-duplicates of moving all columns of the distinct rows to the output table, even though we only actually needed the carrier "indices" column. Now that we obtain those indices directly, `duplicated()` scales only with the number of "active" columns. In some simple benchmarking, this is between two and five times faster for tables with 10% distinct rows, depending on the number of passive additional columns. 
- Closes #14486 Authors: - Lawrence Mitchell (https://github.com/wence-) Approvers: - Bradley Dice (https://github.com/bdice) - Karthikeyan (https://github.com/karthikeyann) URL: https://github.com/rapidsai/cudf/pull/14493 --- cpp/include/cudf/detail/stream_compaction.hpp | 23 ++----- cpp/include/cudf/stream_compaction.hpp | 23 +++++++ cpp/src/stream_compaction/distinct.cu | 36 ++++++---- cpp/src/stream_compaction/stable_distinct.cu | 12 ++-- .../cudf/cudf/_lib/cpp/stream_compaction.pxd | 9 +++ python/cudf/cudf/_lib/stream_compaction.pyx | 65 +++++++++++++++++++ python/cudf/cudf/core/indexed_frame.py | 28 +++----- 7 files changed, 144 insertions(+), 52 deletions(-) diff --git a/cpp/include/cudf/detail/stream_compaction.hpp b/cpp/include/cudf/detail/stream_compaction.hpp index 5476000fc29..7f366c06a1c 100644 --- a/cpp/include/cudf/detail/stream_compaction.hpp +++ b/cpp/include/cudf/detail/stream_compaction.hpp @@ -99,25 +99,16 @@ std::unique_ptr stable_distinct(table_view const& input, rmm::mr::device_memory_resource* mr); /** - * @brief Create a column of indices of all distinct rows in the input table. + * @copydoc cudf::distinct_indices * - * Given an `input` table_view, an output vector of all row indices of the distinct rows is - * generated. If there are duplicate rows, which index is kept depends on the `keep` parameter. 
- * - * @param input The input table - * @param keep Get index of any, first, last, or none of the found duplicates - * @param nulls_equal Flag to specify whether null elements should be considered as equal - * @param nans_equal Flag to specify whether NaN elements should be considered as equal - * @param stream CUDA stream used for device memory operations and kernel launches - * @param mr Device memory resource used to allocate the returned vector * @return A device_uvector containing the result indices */ -rmm::device_uvector get_distinct_indices(table_view const& input, - duplicate_keep_option keep, - null_equality nulls_equal, - nan_equality nans_equal, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr); +rmm::device_uvector distinct_indices(table_view const& input, + duplicate_keep_option keep, + null_equality nulls_equal, + nan_equality nans_equal, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr); /** * @copydoc cudf::unique_count(column_view const&, null_policy, nan_policy) diff --git a/cpp/include/cudf/stream_compaction.hpp b/cpp/include/cudf/stream_compaction.hpp index 984e3037cd1..3e7bdf13707 100644 --- a/cpp/include/cudf/stream_compaction.hpp +++ b/cpp/include/cudf/stream_compaction.hpp @@ -17,6 +17,7 @@ #pragma once #include +#include #include @@ -277,6 +278,28 @@ std::unique_ptr
distinct( nan_equality nans_equal = nan_equality::ALL_EQUAL, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); +/** + * @brief Create a column of indices of all distinct rows in the input table. + * + * Given an `input` table_view, an output vector of all row indices of the distinct rows is + * generated. If there are duplicate rows, which index is kept depends on the `keep` parameter. + * + * @param input The input table + * @param keep Get index of any, first, last, or none of the found duplicates + * @param nulls_equal Flag to specify whether null elements should be considered as equal + * @param nans_equal Flag to specify whether NaN elements should be considered as equal + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned vector + * @return Column containing the result indices + */ +std::unique_ptr distinct_indices( + table_view const& input, + duplicate_keep_option keep = duplicate_keep_option::KEEP_ANY, + null_equality nulls_equal = null_equality::EQUAL, + nan_equality nans_equal = nan_equality::ALL_EQUAL, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + /** * @brief Create a new table without duplicate rows, preserving input order. 
* diff --git a/cpp/src/stream_compaction/distinct.cu b/cpp/src/stream_compaction/distinct.cu index 7adce5d3cbc..b867df1565a 100644 --- a/cpp/src/stream_compaction/distinct.cu +++ b/cpp/src/stream_compaction/distinct.cu @@ -40,12 +40,12 @@ namespace cudf { namespace detail { -rmm::device_uvector get_distinct_indices(table_view const& input, - duplicate_keep_option keep, - null_equality nulls_equal, - nan_equality nans_equal, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) +rmm::device_uvector distinct_indices(table_view const& input, + duplicate_keep_option keep, + null_equality nulls_equal, + nan_equality nans_equal, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) { if (input.num_rows() == 0 or input.num_columns() == 0) { return rmm::device_uvector(0, stream, mr); @@ -152,12 +152,12 @@ std::unique_ptr
distinct(table_view const& input, return empty_like(input); } - auto const gather_map = get_distinct_indices(input.select(keys), - keep, - nulls_equal, - nans_equal, - stream, - rmm::mr::get_current_device_resource()); + auto const gather_map = detail::distinct_indices(input.select(keys), + keep, + nulls_equal, + nans_equal, + stream, + rmm::mr::get_current_device_resource()); return detail::gather(input, gather_map, out_of_bounds_policy::DONT_CHECK, @@ -180,4 +180,16 @@ std::unique_ptr
distinct(table_view const& input, input, keys, keep, nulls_equal, nans_equal, cudf::get_default_stream(), mr); } +std::unique_ptr distinct_indices(table_view const& input, + duplicate_keep_option keep, + null_equality nulls_equal, + nan_equality nans_equal, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + CUDF_FUNC_RANGE(); + auto indices = detail::distinct_indices(input, keep, nulls_equal, nans_equal, stream, mr); + return std::make_unique(std::move(indices), rmm::device_buffer{}, 0); +} + } // namespace cudf diff --git a/cpp/src/stream_compaction/stable_distinct.cu b/cpp/src/stream_compaction/stable_distinct.cu index 45a2de9288b..63167b45b2d 100644 --- a/cpp/src/stream_compaction/stable_distinct.cu +++ b/cpp/src/stream_compaction/stable_distinct.cu @@ -40,12 +40,12 @@ std::unique_ptr
stable_distinct(table_view const& input, return empty_like(input); } - auto const distinct_indices = get_distinct_indices(input.select(keys), - keep, - nulls_equal, - nans_equal, - stream, - rmm::mr::get_current_device_resource()); + auto const distinct_indices = detail::distinct_indices(input.select(keys), + keep, + nulls_equal, + nans_equal, + stream, + rmm::mr::get_current_device_resource()); // The only difference between this implementation and the unstable version // is that the stable implementation must retain the input order. The diff --git a/python/cudf/cudf/_lib/cpp/stream_compaction.pxd b/python/cudf/cudf/_lib/cpp/stream_compaction.pxd index bba2d1ffb7c..aef2f639d76 100644 --- a/python/cudf/cudf/_lib/cpp/stream_compaction.pxd +++ b/python/cudf/cudf/_lib/cpp/stream_compaction.pxd @@ -6,10 +6,12 @@ from libcpp.vector cimport vector from cudf._lib.types import cudf_to_np_types, np_to_cudf_types +from cudf._lib.cpp.column.column cimport column from cudf._lib.cpp.column.column_view cimport column_view from cudf._lib.cpp.table.table cimport table from cudf._lib.cpp.table.table_view cimport table_view from cudf._lib.cpp.types cimport ( + nan_equality, nan_policy, null_equality, null_policy, @@ -45,3 +47,10 @@ cdef extern from "cudf/stream_compaction.hpp" namespace "cudf" \ duplicate_keep_option keep, null_equality nulls_equal, ) except + + + cdef unique_ptr[column] distinct_indices( + table_view input, + duplicate_keep_option keep, + null_equality nulls_equal, + nan_equality nans_equal, + ) except + diff --git a/python/cudf/cudf/_lib/stream_compaction.pyx b/python/cudf/cudf/_lib/stream_compaction.pyx index 37387b0e520..9b22728d2f0 100644 --- a/python/cudf/cudf/_lib/stream_compaction.pyx +++ b/python/cudf/cudf/_lib/stream_compaction.pyx @@ -8,10 +8,12 @@ from libcpp.utility cimport move from libcpp.vector cimport vector from cudf._lib.column cimport Column +from cudf._lib.cpp.column.column cimport column from cudf._lib.cpp.column.column_view cimport column_view 
from cudf._lib.cpp.stream_compaction cimport ( apply_boolean_mask as cpp_apply_boolean_mask, distinct_count as cpp_distinct_count, + distinct_indices as cpp_distinct_indices, drop_nulls as cpp_drop_nulls, duplicate_keep_option, stable_distinct as cpp_stable_distinct, @@ -19,6 +21,7 @@ from cudf._lib.cpp.stream_compaction cimport ( from cudf._lib.cpp.table.table cimport table from cudf._lib.cpp.table.table_view cimport table_view from cudf._lib.cpp.types cimport ( + nan_equality, nan_policy, null_equality, null_policy, @@ -160,6 +163,68 @@ def drop_duplicates(list columns, return columns_from_unique_ptr(move(c_result)) +@acquire_spill_lock() +def distinct_indices( + list columns, + object keep="first", + bool nulls_equal=True, + bool nans_equal=True, +): + """ + Return indices of the distinct rows in a table. + + Parameters + ---------- + columns : list of columns to check for duplicates + keep : treat "first", "last", or (False) none of any duplicate + rows as distinct + nulls_equal : Should nulls compare equal + nans_equal: Should nans compare equal + + Returns + ------- + Column of indices + + See Also + -------- + drop_duplicates + """ + cdef duplicate_keep_option cpp_keep_option + + if keep == 'first': + cpp_keep_option = duplicate_keep_option.KEEP_FIRST + elif keep == 'last': + cpp_keep_option = duplicate_keep_option.KEEP_LAST + elif keep is False: + cpp_keep_option = duplicate_keep_option.KEEP_NONE + else: + raise ValueError('keep must be either "first", "last", or False') + + # shifting the index number by number of index columns + cdef null_equality cpp_nulls_equal = ( + null_equality.EQUAL + if nulls_equal + else null_equality.UNEQUAL + ) + cdef nan_equality cpp_nans_equal = ( + nan_equality.ALL_EQUAL + if nans_equal + else nan_equality.NANS_UNEQUAL + ) + cdef table_view source = table_view_from_columns(columns) + cdef unique_ptr[column] c_result + with nogil: + c_result = move( + cpp_distinct_indices( + source, + cpp_keep_option, + cpp_nulls_equal, + 
cpp_nans_equal, + ) + ) + return Column.from_unique_ptr(move(c_result)) + + @acquire_spill_lock() def distinct_count(Column source_column, ignore_nulls=True, nan_as_null=False): """ diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py index 86a8e64b213..874cceea9af 100644 --- a/python/cudf/cudf/core/indexed_frame.py +++ b/python/cudf/cudf/core/indexed_frame.py @@ -31,7 +31,6 @@ import cudf import cudf._lib as libcudf -from cudf._lib.types import size_type_dtype from cudf._typing import ( ColumnLike, DataFrameOrSeries, @@ -2089,26 +2088,19 @@ def duplicated(self, subset=None, keep="first"): subset = self._preprocess_subset(subset) if isinstance(self, cudf.Series): - df = self.to_frame(name="None") - subset = ["None"] + columns = [self._column] else: - df = self.copy(deep=False) - df._data["index"] = cudf.core.column.arange( - 0, len(self), dtype=size_type_dtype + columns = [self._data[n] for n in subset] + distinct = libcudf.stream_compaction.distinct_indices( + columns, keep=keep ) - - new_df = df.drop_duplicates(subset=subset, keep=keep) - idx = df.merge(new_df, how="inner")["index"] - s = cudf.Series._from_data( - { - None: cudf.core.column.full( - size=len(self), fill_value=True, dtype="bool" - ) - }, - index=self.index, + (result,) = libcudf.copying.scatter( + [cudf.Scalar(False, dtype=bool)], + distinct, + [full(len(self), True, dtype=bool)], + bounds_check=False, ) - s.iloc[idx] = False - return s + return cudf.Series(result, index=self.index) @_cudf_nvtx_annotate def _empty_like(self, keep_index=True) -> Self: