Skip to content

Commit

Permalink
Implement IndexedFrame.duplicated with distinct_indices + `scatte…
Browse files Browse the repository at this point in the history
…r` (#14493)

To obtain the duplicate rows in a dataframe we previously performed a drop-duplicates with a carrier column of row indices and then set entries in a boolean column to False for those row indices that remained. Furthermore, we were performing an unnecessary merge after the drop-duplicates call to obtain the row indices.

Note that the carrier column provides exactly the information that is computed internally in `libcudf` by `cudf::detail::get_distinct_indices` (called as part of `cudf::distinct`). We therefore promote `get_distinct_indices` to a public function (as `cudf::distinct_indices`) and replace the (unnecessary) merge plus `iloc`-based setting of the result with a call to `libcudf.copying.scatter`.

This provides a reasonable speedup (around 1.5x) for `duplicated()` on `Series`, and significantly improves performance of `duplicated()` on `DataFrames`, especially when providing a `subset` argument. Previously we would pay the cost in drop-duplicates of moving all columns of the distinct rows to the output table, even though we only actually needed the carrier "indices" column. Now we just obtain those indices directly, `duplicated()` scales only with the number of "active" columns. In some simple benchmarking this is between two and five times faster for tables with 10% distinct rows depending on the number of passive additional columns.

- Closes #14486

Authors:
  - Lawrence Mitchell (https://github.com/wence-)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Karthikeyan (https://github.com/karthikeyann)

URL: #14493
  • Loading branch information
wence- authored Dec 12, 2023
1 parent f8e891f commit ef11061
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 52 deletions.
23 changes: 7 additions & 16 deletions cpp/include/cudf/detail/stream_compaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,25 +99,16 @@ std::unique_ptr<table> stable_distinct(table_view const& input,
rmm::mr::device_memory_resource* mr);

/**
* @brief Create a column of indices of all distinct rows in the input table.
* @copydoc cudf::distinct_indices
*
* Given an `input` table_view, an output vector of all row indices of the distinct rows is
* generated. If there are duplicate rows, which index is kept depends on the `keep` parameter.
*
* @param input The input table
* @param keep Get index of any, first, last, or none of the found duplicates
* @param nulls_equal Flag to specify whether null elements should be considered as equal
* @param nans_equal Flag to specify whether NaN elements should be considered as equal
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned vector
* @return A device_uvector containing the result indices
*/
rmm::device_uvector<size_type> get_distinct_indices(table_view const& input,
duplicate_keep_option keep,
null_equality nulls_equal,
nan_equality nans_equal,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);
rmm::device_uvector<size_type> distinct_indices(table_view const& input,
duplicate_keep_option keep,
null_equality nulls_equal,
nan_equality nans_equal,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @copydoc cudf::unique_count(column_view const&, null_policy, nan_policy)
Expand Down
23 changes: 23 additions & 0 deletions cpp/include/cudf/stream_compaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include <cudf/types.hpp>
#include <cudf/utilities/default_stream.hpp>

#include <rmm/mr/device/per_device_resource.hpp>

Expand Down Expand Up @@ -277,6 +278,28 @@ std::unique_ptr<table> distinct(
nan_equality nans_equal = nan_equality::ALL_EQUAL,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Create a column of indices of all distinct rows in the input table.
*
* Given an `input` table_view, an output vector of all row indices of the distinct rows is
* generated. If there are duplicate rows, which index is kept depends on the `keep` parameter.
*
* @param input The input table
* @param keep Get index of any, first, last, or none of the found duplicates
* @param nulls_equal Flag to specify whether null elements should be considered as equal
* @param nans_equal Flag to specify whether NaN elements should be considered as equal
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned vector
* @return Column containing the result indices
*/
std::unique_ptr<column> distinct_indices(
table_view const& input,
duplicate_keep_option keep = duplicate_keep_option::KEEP_ANY,
null_equality nulls_equal = null_equality::EQUAL,
nan_equality nans_equal = nan_equality::ALL_EQUAL,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Create a new table without duplicate rows, preserving input order.
*
Expand Down
36 changes: 24 additions & 12 deletions cpp/src/stream_compaction/distinct.cu
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@
namespace cudf {
namespace detail {

rmm::device_uvector<size_type> get_distinct_indices(table_view const& input,
duplicate_keep_option keep,
null_equality nulls_equal,
nan_equality nans_equal,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
rmm::device_uvector<size_type> distinct_indices(table_view const& input,
duplicate_keep_option keep,
null_equality nulls_equal,
nan_equality nans_equal,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
if (input.num_rows() == 0 or input.num_columns() == 0) {
return rmm::device_uvector<size_type>(0, stream, mr);
Expand Down Expand Up @@ -152,12 +152,12 @@ std::unique_ptr<table> distinct(table_view const& input,
return empty_like(input);
}

auto const gather_map = get_distinct_indices(input.select(keys),
keep,
nulls_equal,
nans_equal,
stream,
rmm::mr::get_current_device_resource());
auto const gather_map = detail::distinct_indices(input.select(keys),
keep,
nulls_equal,
nans_equal,
stream,
rmm::mr::get_current_device_resource());
return detail::gather(input,
gather_map,
out_of_bounds_policy::DONT_CHECK,
Expand All @@ -180,4 +180,16 @@ std::unique_ptr<table> distinct(table_view const& input,
input, keys, keep, nulls_equal, nans_equal, cudf::get_default_stream(), mr);
}

std::unique_ptr<column> distinct_indices(table_view const& input,
duplicate_keep_option keep,
null_equality nulls_equal,
nan_equality nans_equal,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();
auto indices = detail::distinct_indices(input, keep, nulls_equal, nans_equal, stream, mr);
return std::make_unique<column>(std::move(indices), rmm::device_buffer{}, 0);
}

} // namespace cudf
12 changes: 6 additions & 6 deletions cpp/src/stream_compaction/stable_distinct.cu
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ std::unique_ptr<table> stable_distinct(table_view const& input,
return empty_like(input);
}

auto const distinct_indices = get_distinct_indices(input.select(keys),
keep,
nulls_equal,
nans_equal,
stream,
rmm::mr::get_current_device_resource());
auto const distinct_indices = detail::distinct_indices(input.select(keys),
keep,
nulls_equal,
nans_equal,
stream,
rmm::mr::get_current_device_resource());

// The only difference between this implementation and the unstable version
// is that the stable implementation must retain the input order. The
Expand Down
9 changes: 9 additions & 0 deletions python/cudf/cudf/_lib/cpp/stream_compaction.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ from libcpp.vector cimport vector

from cudf._lib.types import cudf_to_np_types, np_to_cudf_types

from cudf._lib.cpp.column.column cimport column
from cudf._lib.cpp.column.column_view cimport column_view
from cudf._lib.cpp.table.table cimport table
from cudf._lib.cpp.table.table_view cimport table_view
from cudf._lib.cpp.types cimport (
nan_equality,
nan_policy,
null_equality,
null_policy,
Expand Down Expand Up @@ -45,3 +47,10 @@ cdef extern from "cudf/stream_compaction.hpp" namespace "cudf" \
duplicate_keep_option keep,
null_equality nulls_equal,
) except +

cdef unique_ptr[column] distinct_indices(
table_view input,
duplicate_keep_option keep,
null_equality nulls_equal,
nan_equality nans_equal,
) except +
65 changes: 65 additions & 0 deletions python/cudf/cudf/_lib/stream_compaction.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,20 @@ from libcpp.utility cimport move
from libcpp.vector cimport vector

from cudf._lib.column cimport Column
from cudf._lib.cpp.column.column cimport column
from cudf._lib.cpp.column.column_view cimport column_view
from cudf._lib.cpp.stream_compaction cimport (
apply_boolean_mask as cpp_apply_boolean_mask,
distinct_count as cpp_distinct_count,
distinct_indices as cpp_distinct_indices,
drop_nulls as cpp_drop_nulls,
duplicate_keep_option,
stable_distinct as cpp_stable_distinct,
)
from cudf._lib.cpp.table.table cimport table
from cudf._lib.cpp.table.table_view cimport table_view
from cudf._lib.cpp.types cimport (
nan_equality,
nan_policy,
null_equality,
null_policy,
Expand Down Expand Up @@ -160,6 +163,68 @@ def drop_duplicates(list columns,
return columns_from_unique_ptr(move(c_result))


@acquire_spill_lock()
def distinct_indices(
list columns,
object keep="first",
bool nulls_equal=True,
bool nans_equal=True,
):
"""
Return indices of the distinct rows in a table.
Parameters
----------
columns : list of columns to check for duplicates
keep : treat "first", "last", or (False) none of any duplicate
rows as distinct
nulls_equal : Should nulls compare equal
nans_equal: Should nans compare equal
Returns
-------
Column of indices
See Also
--------
drop_duplicates
"""
cdef duplicate_keep_option cpp_keep_option

if keep == 'first':
cpp_keep_option = duplicate_keep_option.KEEP_FIRST
elif keep == 'last':
cpp_keep_option = duplicate_keep_option.KEEP_LAST
elif keep is False:
cpp_keep_option = duplicate_keep_option.KEEP_NONE
else:
raise ValueError('keep must be either "first", "last", or False')

# shifting the index number by number of index columns
cdef null_equality cpp_nulls_equal = (
null_equality.EQUAL
if nulls_equal
else null_equality.UNEQUAL
)
cdef nan_equality cpp_nans_equal = (
nan_equality.ALL_EQUAL
if nans_equal
else nan_equality.NANS_UNEQUAL
)
cdef table_view source = table_view_from_columns(columns)
cdef unique_ptr[column] c_result
with nogil:
c_result = move(
cpp_distinct_indices(
source,
cpp_keep_option,
cpp_nulls_equal,
cpp_nans_equal,
)
)
return Column.from_unique_ptr(move(c_result))


@acquire_spill_lock()
def distinct_count(Column source_column, ignore_nulls=True, nan_as_null=False):
"""
Expand Down
28 changes: 10 additions & 18 deletions python/cudf/cudf/core/indexed_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import cudf
import cudf._lib as libcudf
from cudf._lib.types import size_type_dtype
from cudf._typing import (
ColumnLike,
DataFrameOrSeries,
Expand Down Expand Up @@ -2089,26 +2088,19 @@ def duplicated(self, subset=None, keep="first"):
subset = self._preprocess_subset(subset)

if isinstance(self, cudf.Series):
df = self.to_frame(name="None")
subset = ["None"]
columns = [self._column]
else:
df = self.copy(deep=False)
df._data["index"] = cudf.core.column.arange(
0, len(self), dtype=size_type_dtype
columns = [self._data[n] for n in subset]
distinct = libcudf.stream_compaction.distinct_indices(
columns, keep=keep
)

new_df = df.drop_duplicates(subset=subset, keep=keep)
idx = df.merge(new_df, how="inner")["index"]
s = cudf.Series._from_data(
{
None: cudf.core.column.full(
size=len(self), fill_value=True, dtype="bool"
)
},
index=self.index,
(result,) = libcudf.copying.scatter(
[cudf.Scalar(False, dtype=bool)],
distinct,
[full(len(self), True, dtype=bool)],
bounds_check=False,
)
s.iloc[idx] = False
return s
return cudf.Series(result, index=self.index)

@_cudf_nvtx_annotate
def _empty_like(self, keep_index=True) -> Self:
Expand Down

0 comments on commit ef11061

Please sign in to comment.