[ENH] Implement groupby.sample #12882

Merged · 21 commits · Mar 23, 2023
Changes from 18 commits
26 changes: 25 additions & 1 deletion python/cudf/benchmarks/API/bench_dataframe.py
@@ -1,4 +1,4 @@
# Copyright (c) 2022, NVIDIA CORPORATION.
# Copyright (c) 2022-2023, NVIDIA CORPORATION.

"""Benchmarks of DataFrame methods."""

@@ -104,6 +104,30 @@ def bench_groupby_agg(benchmark, dataframe, agg, num_key_cols, as_index, sort):
benchmark(dataframe.groupby(by=by, as_index=as_index, sort=sort).agg, agg)


@benchmark_with_object(cls="dataframe", dtype="int", nulls=False, cols=6)
@pytest.mark.parametrize(
    "num_key_cols",
    [2, 3, 4],
)
@pytest.mark.parametrize("use_frac", [True, False])
@pytest.mark.parametrize("replace", [True, False])
@pytest.mark.parametrize("target_sample_frac", [0.1, 0.5, 1])
def bench_groupby_sample(
    benchmark, dataframe, num_key_cols, use_frac, replace, target_sample_frac
):
    grouper = dataframe.groupby(by=list(dataframe.columns[:num_key_cols]))
    if use_frac:
        kwargs = {"frac": target_sample_frac, "replace": replace}
    else:
        minsize = grouper.size().min()
        target_size = numpy.round(
            target_sample_frac * minsize, decimals=0
        ).astype(int)
        kwargs = {"n": target_size, "replace": replace}

    benchmark(grouper.sample, **kwargs)


@benchmark_with_object(cls="dataframe", dtype="int")
@pytest.mark.parametrize("num_cols_to_sort", [1])
def bench_sort_values(benchmark, dataframe, num_cols_to_sort):
9 changes: 8 additions & 1 deletion python/cudf/cudf/_lib/cpp/sorting.pxd
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2023, NVIDIA CORPORATION.

from libcpp cimport bool
from libcpp.memory cimport unique_ptr
@@ -38,3 +38,10 @@ cdef extern from "cudf/sorting.hpp" namespace "cudf" nogil:
        const table_view& table,
        vector[libcudf_types.order] column_order,
        vector[libcudf_types.null_order] null_precedence) except +

    cdef unique_ptr[table] segmented_sort_by_key(
        const table_view& values,
        const table_view& keys,
        const column_view& segment_offsets,
        vector[libcudf_types.order] column_order,
        vector[libcudf_types.null_order] null_precedence) except +
67 changes: 65 additions & 2 deletions python/cudf/cudf/_lib/sort.pyx
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2023, NVIDIA CORPORATION.

from cudf.core.buffer import acquire_spill_lock

@@ -18,11 +18,13 @@ from cudf._lib.cpp.search cimport lower_bound, upper_bound
from cudf._lib.cpp.sorting cimport (
    is_sorted as cpp_is_sorted,
    rank,
    segmented_sort_by_key as cpp_segmented_sort_by_key,
    sorted_order,
)
from cudf._lib.cpp.table.table cimport table
from cudf._lib.cpp.table.table_view cimport table_view
from cudf._lib.cpp.types cimport null_order, null_policy, order
from cudf._lib.utils cimport table_view_from_columns
from cudf._lib.utils cimport columns_from_unique_ptr, table_view_from_columns


@acquire_spill_lock()
@@ -143,6 +145,67 @@ def order_by(list columns_from_table, object ascending, str na_position):
return Column.from_unique_ptr(move(c_result))


def segmented_sort_by_key(
    list values,
    list keys,
    Column segment_offsets,
    list column_order=None,
    list null_precedence=None,
):
    """
    Sort segments of a table by given keys

    Parameters
    ----------
    values : list[Column]
        Columns of the table which will be sorted
    keys : list[Column]
        Columns making up the sort key
    segment_offsets : Column
        Segment offsets
    column_order : list[bool], optional
        Sequence of boolean values which correspond to each column in
        keys providing the sort order (default all True).
        With True <=> ascending; False <=> descending.
    null_precedence : list[str], optional
        Sequence of "first" or "last" values (default "first")
        indicating the position of null values when sorting the keys.

    Returns
    -------
    list[Column]
        list of value columns sorted by keys
    """
    cdef table_view values_view = table_view_from_columns(values)
    cdef table_view keys_view = table_view_from_columns(keys)
    cdef column_view offsets_view = segment_offsets.view()
    cdef vector[order] c_column_order
    cdef vector[null_order] c_null_precedence
    cdef unique_ptr[table] result
    ncol = len(values)
    column_order = column_order or [True] * ncol
    null_precedence = null_precedence or ["first"] * ncol
    for asc, null in zip(column_order, null_precedence):
        c_column_order.push_back(order.ASCENDING if asc else order.DESCENDING)
        if asc ^ (null == "first"):
            c_null_precedence.push_back(null_order.AFTER)
        elif asc ^ (null == "last"):
            c_null_precedence.push_back(null_order.BEFORE)
        else:
            raise ValueError(f"Invalid null precedence {null}")
    with nogil:
        result = move(
            cpp_segmented_sort_by_key(
                values_view,
                keys_view,
                offsets_view,
                c_column_order,
                c_null_precedence,
            )
        )
    return columns_from_unique_ptr(move(result))
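
# A minimal usage sketch of the wrapper above (illustrative only, not part of
# this module): sort one values column within two segments, ascending by key.
# The data and the `as_column` helper are assumptions for the example.
#
#   from cudf.core.column import as_column
#   (sorted_vals,) = segmented_sort_by_key(
#       [as_column([3, 1, 2, 9, 7])],          # values
#       [as_column([30, 10, 20, 90, 70])],     # keys
#       as_column([0, 3, 5], dtype="int32"),   # offsets: segments [0, 3), [3, 5)
#   )
#   # sorted_vals should then contain [1, 2, 3, 7, 9].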


@acquire_spill_lock()
def digitize(list source_columns, list bins, bool right=False):
"""
136 changes: 133 additions & 3 deletions python/cudf/cudf/core/groupby/groupby.py
@@ -6,7 +6,7 @@
import warnings
from collections import abc
from functools import cached_property
from typing import Any, Iterable, List, Tuple, Union
from typing import Any, Iterable, List, Optional, Tuple, Union

import cupy as cp
import numpy as np
@@ -16,6 +16,8 @@
from cudf._lib import groupby as libgroupby
from cudf._lib.null_mask import bitmask_or
from cudf._lib.reshape import interleave_columns
from cudf._lib.sort import segmented_sort_by_key
from cudf._lib.types import size_type_dtype
from cudf._typing import AggType, DataFrameOrSeries, MultiColumnAggType
from cudf.api.types import is_list_like
from cudf.core.abc import Serializable
@@ -637,7 +639,7 @@ def _head_tail(self, n, *, take_head: bool, preserve_order: bool):
# aggregation scheme in libcudf. This is probably "fast
# enough" for most reasonable input sizes.
_, offsets, _, group_values = self._grouped()
group_offsets = np.asarray(offsets, dtype=np.int32)
group_offsets = np.asarray(offsets, dtype=size_type_dtype)
size_per_group = np.diff(group_offsets)
# "Out of bounds" n for the group size either means no entries
# (negative) or all the entries (positive)
@@ -651,7 +653,7 @@
group_offsets = group_offsets[:-1]
else:
group_offsets = group_offsets[1:] - size_per_group
to_take = np.arange(size_per_group.sum(), dtype=np.int32)
to_take = np.arange(size_per_group.sum(), dtype=size_type_dtype)
fixup = np.empty_like(size_per_group)
fixup[0] = 0
np.cumsum(size_per_group[:-1], out=fixup[1:])
@@ -870,6 +872,134 @@ def ngroup(self, ascending=True):
group_ids._index = index
return self._broadcast(group_ids)

    def sample(
        self,
        n: Optional[int] = None,
        frac: Optional[float] = None,
        replace: bool = False,
        weights: Union[abc.Sequence, "cudf.Series", None] = None,
        random_state: Union[np.random.RandomState, int, None] = None,
    ):
        """Return a random sample of items in each group.

        Parameters
        ----------
        n
            Number of items to return for each group. If sampling
            without replacement, must be at most the size of the
            smallest group. Cannot be used with frac. Default is
            ``n=1`` if frac is None.
        frac
            Fraction of items to return. Cannot be used with n.
        replace
            Should sampling occur with or without replacement?
        weights
            Sampling probability for each element. Must be the same
            length as the grouped frame. Not currently supported.
        random_state
            Seed for random number generation.

        Returns
        -------
        New dataframe or series with samples of appropriate size drawn
        from each group.
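
        Examples
        --------
        A small illustrative example (hypothetical data; the sampled
        rows are random, so no output is shown here):

        >>> df = cudf.DataFrame({"a": [1, 1, 2, 2, 2], "b": [0, 1, 2, 3, 4]})
        >>> df.groupby("a").sample(n=1)        # doctest: +SKIP
        >>> df.groupby("a").sample(frac=0.5)   # doctest: +SKIP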

"""
        if weights is not None:
            # Implementing this requires different algorithms for the
            # with- and without-replacement cases.
            #
            # Without replacement, use the weighted reservoir sampling
            # approach of Efraimidis and Spirakis (2006)
            # https://doi.org/10.1016/j.ipl.2005.11.003; essentially,
            # do a segmented argsort on weight-scaled
            # logarithmic deviates. See
            # https://timvieira.github.io/blog/post/
            # 2019/09/16/algorithms-for-sampling-without-replacement/
            #
            # With replacement is trickier: one might be able to use
            # the alias method, otherwise we're back to bucketed
            # rejection sampling.
            raise NotImplementedError("Sampling with weights is not supported")
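            # A rough sketch of the without-replacement idea described
            # above (hypothetical, assuming a `weights` array aligned with
            # the grouped rows; not executed here):
            #
            #   rng = cp.random.default_rng(seed=random_state)
            #   # Exponential(1)/w_i keys: the k smallest keys in each
            #   # segment form a weighted sample without replacement
            #   # (Efraimidis & Spirakis).
            #   keys = rng.standard_exponential(size=nrows) / weights
            #   (shuffled,) = segmented_sort_by_key(
            #       [as_column(cp.arange(nrows, dtype=size_type_dtype))],
            #       [as_column(keys)],
            #       as_column(group_offsets), [], [],
            #   )
            #   # then keep the first samples_per_group entries of each
            #   # segment, exactly as in the unweighted path below.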
        if frac is not None and n is not None:
            raise ValueError("Cannot supply both of frac and n")
        elif n is None and frac is None:
            n = 1
        elif frac is not None and not (0 <= frac <= 1):
            raise ValueError(
                "Sampling with fraction must provide fraction in "
                f"[0, 1], got {frac=}"
            )
        # TODO: handle random states properly.
        if random_state is not None and not isinstance(random_state, int):
            raise NotImplementedError(
                "Only integer seeds are supported for random_state "
                "in this case"
            )
        # Get the groups
        # TODO: convince Cython to convert the std::vector offsets
        # into a numpy array directly, rather than a list.
        # TODO: this uses the sort-based groupby, could one use hash-based?
        _, offsets, _, group_values = self._grouped()
        group_offsets = np.asarray(offsets, dtype=size_type_dtype)
        size_per_group = np.diff(group_offsets)
        if n is not None:
            samples_per_group = np.broadcast_to(
                size_type_dtype.type(n), size_per_group.shape
            )
            if not replace and (minsize := size_per_group.min()) < n:
                raise ValueError(
                    f"Cannot sample {n=} without replacement, "
                    f"smallest group is {minsize}"
                )
        else:
            # Pandas picks sample sizes in the fractional case with
            # round-to-nearest, ties-to-even (the behaviour of np.round
            # and the IEEE-754 default), not round-half-away-from-zero
            # as in C's round().
            samples_per_group = np.round(
                size_per_group * frac, decimals=0
            ).astype(size_type_dtype)
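            # For example (illustrative): with frac=0.5 and group sizes
            # [3, 5], size_per_group * frac == [1.5, 2.5] and np.round
            # gives [2., 2.] (both ties round to the even neighbour), so
            # the samples per group are [2, 2].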
        if replace:
            # We would prefer to use cupy here, but their rng.integers
            # interface doesn't take array-based low and high
            # arguments.
            low = 0
            high = np.repeat(size_per_group, samples_per_group)
            rng = np.random.default_rng(seed=random_state)
            indices = rng.integers(low, high, dtype=size_type_dtype)
            indices += np.repeat(group_offsets[:-1], samples_per_group)
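            # For example (illustrative): with group_offsets [0, 3, 5]
            # (sizes [3, 2]) and samples_per_group [2, 1], high is
            # [3, 3, 2], so rng.integers draws one within-group position
            # per requested sample, and adding the repeated offsets
            # [0, 0, 3] turns those into global row indices.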
        else:
            # Approach: do a segmented argsort of the index array and take
            # the first samples_per_group entries from the sorted array.
            # We will shuffle the group indices and then pick them out
            # from the grouped dataframe index.
            nrows = len(group_values)
            indices = cp.arange(nrows, dtype=size_type_dtype)
            if len(size_per_group) < 500:
                # Empirically shuffling with cupy is faster at this scale
                rs = cp.random.get_random_state()
                rs.seed(seed=random_state)
                for off, size in zip(group_offsets, size_per_group):
                    rs.shuffle(indices[off : off + size])
            else:
                rng = cp.random.default_rng(seed=random_state)
                (indices,) = segmented_sort_by_key(
                    [as_column(indices)],
                    [as_column(rng.random(size=nrows))],
                    as_column(group_offsets),
                    [],
                    [],
                )
                indices = cp.asarray(indices.data_array_view(mode="read"))
            # Which indices are we going to want?
            want = np.arange(samples_per_group.sum(), dtype=size_type_dtype)
            scan = np.empty_like(samples_per_group)
            scan[0] = 0
            np.cumsum(samples_per_group[:-1], out=scan[1:])
            want += np.repeat(group_offsets[:-1] - scan, samples_per_group)
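            # For example (illustrative): with group_offsets [0, 3, 5]
            # and samples_per_group [2, 1], scan is [0, 2], so want
            # starts as [0, 1, 2] and becomes [0, 1, 3]: the first two
            # shuffled positions of the first group and the first
            # shuffled position of the second.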
            indices = indices[want]
        return group_values.iloc[indices]

def serialize(self):
header = {}
frames = []