[ENH] Implement groupby.sample #12882

Merged (21 commits) · Mar 23, 2023
Changes from 10 commits
9 changes: 8 additions & 1 deletion python/cudf/cudf/_lib/cpp/sorting.pxd
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2023, NVIDIA CORPORATION.

from libcpp cimport bool
from libcpp.memory cimport unique_ptr
@@ -38,3 +38,10 @@ cdef extern from "cudf/sorting.hpp" namespace "cudf" nogil:
const table_view& table,
vector[libcudf_types.order] column_order,
vector[libcudf_types.null_order] null_precedence) except +

cdef unique_ptr[table] segmented_sort_by_key(
const table_view& values,
const table_view& keys,
const column_view& segment_offsets,
vector[libcudf_types.order] column_order,
vector[libcudf_types.null_order] null_precedence) except +
70 changes: 68 additions & 2 deletions python/cudf/cudf/_lib/sort.pyx
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2023, NVIDIA CORPORATION.

from cudf.core.buffer import acquire_spill_lock

@@ -18,11 +18,13 @@ from cudf._lib.cpp.search cimport lower_bound, upper_bound
from cudf._lib.cpp.sorting cimport (
is_sorted as cpp_is_sorted,
rank,
segmented_sort_by_key as cpp_segmented_sort_by_key,
sorted_order,
)
from cudf._lib.cpp.table.table cimport table
from cudf._lib.cpp.table.table_view cimport table_view
from cudf._lib.cpp.types cimport null_order, null_policy, order
from cudf._lib.utils cimport table_view_from_columns
from cudf._lib.utils cimport columns_from_unique_ptr, table_view_from_columns


@acquire_spill_lock()
@@ -143,6 +145,70 @@ def order_by(list columns_from_table, object ascending, str na_position):
return Column.from_unique_ptr(move(c_result))


def segmented_sort_by_key(
list values,
list keys,
Column segment_offsets,
list column_order=None,
list null_precedence=None,
):
"""
Sort segments of a table by given keys

Parameters
----------
values : list[Column]
Columns of the table which will be sorted
keys : list[Column]
Columns making up the sort key
segment_offsets : Column
Segment offsets
column_order : list[bool], optional
Sequence of booleans, one per key column, giving the
sort order (default all True): True for ascending,
False for descending.
null_precedence : list[str], optional
Sequence of "first" or "last" values (default "first")
indicating the position of null values when sorting the keys.

Returns
-------
list[Column]
list of value columns sorted by keys
"""
cdef table_view values_view = table_view_from_columns(values)
cdef table_view keys_view = table_view_from_columns(keys)
cdef column_view offsets_view = segment_offsets.view()
cdef vector[order] c_column_order
cdef vector[null_order] c_null_precedence
cdef unique_ptr[table] result
ncol = len(keys)
column_order = column_order or [True] * ncol
null_precedence = null_precedence or ["first"] * ncol
for asc, null in zip(column_order, null_precedence):
if asc:
c_column_order.push_back(order.ASCENDING)
else:
c_column_order.push_back(order.DESCENDING)
Contributor (review comment), suggested change:

if asc:
c_column_order.push_back(order.ASCENDING)
else:
c_column_order.push_back(order.DESCENDING)

becomes:

c_column_order.push_back(order.ASCENDING if asc else order.DESCENDING)

Contributor Author: I guess I was just copying the previous order_by implementation, but done.
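# null_order is interpreted by libcudf relative to the column's
# comparison order (as in order_by above), so a descending column
# flips first/last. The xor below collapses the four cases:
#   asc=True,  "first" -> BEFORE   asc=True,  "last" -> AFTER
#   asc=False, "first" -> AFTER    asc=False, "last" -> BEFORE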

if null not in ("first", "last"):
raise ValueError(f"Invalid null precedence {null}")
if asc ^ (null == "first"):
c_null_precedence.push_back(null_order.AFTER)
else:
c_null_precedence.push_back(null_order.BEFORE)
with nogil:
result = move(
cpp_segmented_sort_by_key(
values_view,
keys_view,
offsets_view,
c_column_order,
c_null_precedence,
)
)
return columns_from_unique_ptr(move(result))


@acquire_spill_lock()
def digitize(list source_columns, list bins, bool right=False):
"""
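A minimal sketch of driving the new wrapper (not from the PR; it assumes cudf's internal as_column helper and an int32 offsets column, matching the call in groupby.py below, and a working CUDA device):

import numpy as np
import cudf
from cudf.core.column import as_column
from cudf._lib.sort import segmented_sort_by_key

# Two segments (rows 0-2 and rows 3-5); each is sorted independently.
values = [as_column([30, 10, 20, 90, 70, 80])]
keys = [as_column([3, 1, 2, 9, 7, 8])]
offsets = as_column(np.array([0, 3, 6], dtype=np.int32))
(sorted_col,) = segmented_sort_by_key(values, keys, offsets, [True], ["first"])
print(cudf.Series(sorted_col))  # 10, 20, 30, 70, 80, 90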
122 changes: 121 additions & 1 deletion python/cudf/cudf/core/groupby/groupby.py
@@ -6,7 +6,7 @@
import warnings
from collections import abc
from functools import cached_property
from typing import Any, Iterable, List, Tuple, Union
from typing import Any, Iterable, List, Optional, Tuple, Union

import cupy as cp
import numpy as np
@@ -16,6 +16,7 @@
from cudf._lib import groupby as libgroupby
from cudf._lib.null_mask import bitmask_or
from cudf._lib.reshape import interleave_columns
from cudf._lib.sort import segmented_sort_by_key
from cudf._typing import AggType, DataFrameOrSeries, MultiColumnAggType
from cudf.api.types import is_list_like
from cudf.core.abc import Serializable
@@ -699,6 +700,125 @@ def ngroup(self, ascending=True):
group_ids._index = index
return self._broadcast(group_ids)

def sample(
self,
n: Optional[int] = None,
frac: Optional[float] = None,
replace: bool = False,
weights: Union[abc.Sequence, "cudf.Series", None] = None,
random_state: Union[np.random.RandomState, int, None] = None,
):
"""Return a random sample of items in each group.

Parameters
----------
n
Number of items to return for each group. If sampling
without replacement, this must be at most the size of
the smallest group. Cannot be used with frac. Defaults
to ``n=1`` if frac is None.
frac
Fraction of items to return. Cannot be used with n.
replace
Should sampling occur with or without replacement?
weights
Sampling probability for each element. Must be the same
length as the grouped frame. Not currently supported.
random_state
Seed for random number generation.

Returns
-------
New dataframe or series with samples of appropriate size drawn
from each group

"""
if weights is not None:
raise NotImplementedError(
"Sorry, sampling with weights is not supported"
)
# Can't wait for match/case
if frac is not None and n is not None:
raise ValueError("Cannot supply both of frac and n")
elif n is None and frac is None:
n = 1
elif frac is not None and not (0 <= frac <= 1):
raise ValueError(
"Sampling with fraction must provide fraction in "
f"[0, 1], got {frac=}"
)
# TODO: handle random states properly.
if random_state is not None and not isinstance(random_state, int):
raise NotImplementedError(
"Sorry, only integer seeds are supported for random_state "
"in this case"
)
# Get the groups
try:
_, (index,), group_offsets = self._groupby.groups(
[*self.obj._index._columns] # type: ignore
)
except ValueError:
raise NotImplementedError(
"Sorry groupby.sample with multiindex not implemented"
)
group_offsets = np.asarray(group_offsets).astype(np.int32)
size_per_group = np.diff(group_offsets)
if n is not None:
samples_per_group = np.broadcast_to(
np.int32(n), size_per_group.shape
)
if not replace and (minsize := size_per_group.min()) < n:
raise ValueError(
f"Cannot sample {n=} without replacement, "
f"smallest group is {minsize}"
)
else:
# Pandas uses round-to-nearest, ties-to-even to pick sample
# sizes in the fractional case; np.round implements the same
# rounding (unlike round-half-away-from-zero).
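# For example, np.round([0.5, 1.5, 2.5]) == [0., 2., 2.].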
samples_per_group = np.round(
size_per_group * frac, decimals=0
).astype(np.int32)
if replace:
# Generate all sample indices at once with numpy; this is
# fast for many groups and large samples.
lo = 0
hi = np.repeat(size_per_group, samples_per_group)
rng = np.random.default_rng(seed=random_state)
# Would be nice to use cupy here, but their rng.integers
# interface doesn't take array lo and hi arguments.
indices = rng.integers(lo, hi, dtype=np.int32).reshape(-1)
indices += np.repeat(group_offsets[:-1], samples_per_group)
else:
# Approach: do a segmented argsort of the index array and take
# the first samples_per_group entries from sorted array.
if len(size_per_group) < 500:
# Empirically shuffling with cupy is faster at this scale
indices = cp.asarray(index.data_array_view(mode="read"))
rs = cp.random.get_random_state()
rs.seed(seed=random_state)
for off, size in zip(group_offsets, size_per_group):
rs.shuffle(indices[off : off + size])
else:
rng = cp.random.default_rng(seed=random_state)
offsets_col = as_column(group_offsets)
values = [index]
keys = [as_column(rng.random(size=len(index)))]
(indices,) = segmented_sort_by_key(
values, keys, offsets_col, [], []
)
indices = cp.asarray(indices.data_array_view(mode="read"))
# Which indices are we going to want?
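# e.g. offsets [0, 2, 5, 9] with samples [2, 1, 2] keeps rows 0, 1, 2, 5, 6.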
mask = np.zeros(len(index), dtype=bool)
for offset, sample_size in zip(group_offsets, samples_per_group):
mask[offset : offset + sample_size] = True
indices = indices[mask]
# This is the index into the original dataframe ordered by the groups
index = cp.asarray(index.data_array_view(mode="read"))
return self.obj.iloc[index[indices]]

def serialize(self):
header = {}
frames = []
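The with-replacement branch above relies on a numpy trick that is easier to see in isolation: a single rng.integers call with an array upper bound draws a within-group position for every requested sample at once. A standalone sketch (plain numpy; the sizes here are made up):

import numpy as np

group_offsets = np.array([0, 2, 5, 9], dtype=np.int32)  # 3 groups: sizes 2, 3, 4
size_per_group = np.diff(group_offsets)                 # [2, 3, 4]
samples_per_group = np.array([2, 2, 2], dtype=np.int32)

rng = np.random.default_rng(seed=42)
# Upper bounds, one per requested sample: [2, 2, 3, 3, 4, 4].
hi = np.repeat(size_per_group, samples_per_group)
# Within-group positions for all samples in a single call...
indices = rng.integers(0, hi, dtype=np.int32)
# ...shifted by each group's starting row to become global positions.
indices += np.repeat(group_offsets[:-1], samples_per_group)
print(indices)  # two row indices drawn from each of the three groups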
71 changes: 71 additions & 0 deletions python/cudf/cudf/tests/test_groupby.py
@@ -1,5 +1,6 @@
# Copyright (c) 2018-2023, NVIDIA CORPORATION.

import collections
import datetime
import itertools
import textwrap
@@ -2972,3 +2973,73 @@ def test_groupby_dtypes(groups):
pdf = df.to_pandas()

assert_eq(pdf.groupby(groups).dtypes, df.groupby(groups).dtypes)


class TestSample:
@pytest.fixture(
params=[
["a", "a", "b", "b", "c", "c", "c", "d", "d", "d", "d", "d"],
[1, 1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 4],
],
ids=["str-group", "int-group"],
)
def df(self, request):
return cudf.DataFrame(
{"a": request.param, "b": request.param, "v": request.param}
)

@pytest.fixture(params=["a", ["a", "b"]], ids=["single-col", "two-col"])
def by(self, request):
return request.param

def expected(self, df, *, n=None, frac=None):
value_counts = collections.Counter(df.a.values_host)
if n is not None:
values = list(
itertools.chain.from_iterable(
itertools.repeat(v, n) for v in value_counts.keys()
)
)
elif frac is not None:
values = list(
itertools.chain.from_iterable(
itertools.repeat(v, round(count * frac))
for v, count in value_counts.items()
)
)
else:
raise AssertionError("Invalid usage")
values = cudf.Series(sorted(values), dtype=df.a.dtype)
return cudf.DataFrame({"a": values, "b": values, "v": values})

@pytest.mark.parametrize("n", [None, 0, 1, 2])
def test_constant_n_no_replace(self, df, by, n):
result = df.groupby(by).sample(n=n).sort_values("a")
n = 1 if n is None else n
assert_eq(self.expected(df, n=n), result.reset_index(drop=True))

def test_constant_n_no_replace_too_large_raises(self, df):
with pytest.raises(ValueError):
df.groupby("a").sample(n=3)

@pytest.mark.parametrize("n", [1, 2, 3])
def test_constant_n_replace(self, df, by, n):
result = df.groupby(by).sample(n=n, replace=True).sort_values("a")
assert_eq(self.expected(df, n=n), result.reset_index(drop=True))

def test_invalid_arguments(self, df):
with pytest.raises(ValueError):
df.groupby("a").sample(n=1, frac=0.1)

def test_not_implemented_arguments(self, df):
with pytest.raises(NotImplementedError):
# These are valid weights, but we haven't implemented this yet.
df.groupby("a").sample(n=1, weights=[1 / len(df)] * len(df))

@pytest.mark.parametrize("frac", [0, 1 / 3, 1 / 2, 2 / 3, 1])
@pytest.mark.parametrize("replace", [False, True])
def test_fraction_rounding(self, df, by, frac, replace):
result = (
df.groupby(by).sample(frac=frac, replace=replace).sort_values("a")
)
assert_eq(self.expected(df, frac=frac), result.reset_index(drop=True))
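For reference, a quick illustration of the API these tests exercise (not part of the diff; the exact rows returned depend on the seed):

import cudf

df = cudf.DataFrame({"a": ["x", "x", "y", "y", "y"], "v": [1, 2, 3, 4, 5]})

# Two rows per group, sampling with replacement.
print(df.groupby("a").sample(n=2, replace=True, random_state=1))

# Half of each group; sizes use round-half-to-even, so group "y" (3 rows) yields 2.
print(df.groupby("a").sample(frac=0.5, random_state=1))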
Show resolved Hide resolved