diff --git a/python/cudf/benchmarks/API/bench_dataframe.py b/python/cudf/benchmarks/API/bench_dataframe.py
index 42bfa854396..28777b23583 100644
--- a/python/cudf/benchmarks/API/bench_dataframe.py
+++ b/python/cudf/benchmarks/API/bench_dataframe.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2022, NVIDIA CORPORATION.
+# Copyright (c) 2022-2023, NVIDIA CORPORATION.
 
 """Benchmarks of DataFrame methods."""
 
@@ -104,6 +104,30 @@ def bench_groupby_agg(benchmark, dataframe, agg, num_key_cols, as_index, sort):
     benchmark(dataframe.groupby(by=by, as_index=as_index, sort=sort).agg, agg)
 
 
+@benchmark_with_object(cls="dataframe", dtype="int", nulls=False, cols=6)
+@pytest.mark.parametrize(
+    "num_key_cols",
+    [2, 3, 4],
+)
+@pytest.mark.parametrize("use_frac", [True, False])
+@pytest.mark.parametrize("replace", [True, False])
+@pytest.mark.parametrize("target_sample_frac", [0.1, 0.5, 1])
+def bench_groupby_sample(
+    benchmark, dataframe, num_key_cols, use_frac, replace, target_sample_frac
+):
+    grouper = dataframe.groupby(by=list(dataframe.columns[:num_key_cols]))
+    if use_frac:
+        kwargs = {"frac": target_sample_frac, "replace": replace}
+    else:
+        minsize = grouper.size().min()
+        target_size = numpy.round(
+            target_sample_frac * minsize, decimals=0
+        ).astype(int)
+        kwargs = {"n": target_size, "replace": replace}
+
+    benchmark(grouper.sample, **kwargs)
+
+
 @benchmark_with_object(cls="dataframe", dtype="int")
 @pytest.mark.parametrize("num_cols_to_sort", [1])
 def bench_sort_values(benchmark, dataframe, num_cols_to_sort):
diff --git a/python/cudf/cudf/_lib/cpp/sorting.pxd b/python/cudf/cudf/_lib/cpp/sorting.pxd
index c6c42c327ac..b210ddf81dd 100644
--- a/python/cudf/cudf/_lib/cpp/sorting.pxd
+++ b/python/cudf/cudf/_lib/cpp/sorting.pxd
@@ -1,4 +1,4 @@
-# Copyright (c) 2020-2022, NVIDIA CORPORATION.
+# Copyright (c) 2020-2023, NVIDIA CORPORATION.
 
 from libcpp cimport bool
 from libcpp.memory cimport unique_ptr
@@ -38,3 +38,10 @@ cdef extern from "cudf/sorting.hpp" namespace "cudf" nogil:
         const table_view& table,
         vector[libcudf_types.order] column_order,
         vector[libcudf_types.null_order] null_precedence) except +
+
+    cdef unique_ptr[table] segmented_sort_by_key(
+        const table_view& values,
+        const table_view& keys,
+        const column_view& segment_offsets,
+        vector[libcudf_types.order] column_order,
+        vector[libcudf_types.null_order] null_precedence) except +
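Note for reviewers: the semantics of the `cudf::segmented_sort_by_key` declaration above are easy to see in a minimal NumPy reference sketch. This is illustrative only (hypothetical helper name, not the libcudf implementation): within each segment `[offsets[i], offsets[i+1])`, the values are reordered by sorting the keys.

```python
# NumPy sketch of what segmented_sort_by_key computes (illustrative only).
import numpy as np

def segmented_sort_by_key_reference(values, keys, offsets):
    out = values.copy()
    for start, end in zip(offsets[:-1], offsets[1:]):
        # Sort each segment independently, stably, by its keys.
        order = np.argsort(keys[start:end], kind="stable")
        out[start:end] = values[start:end][order]
    return out

values = np.array([10, 20, 30, 40, 50, 60])
keys = np.array([3, 1, 2, 2, 1, 3])
offsets = np.array([0, 3, 6])  # two segments: [0, 3) and [3, 6)
print(segmented_sort_by_key_reference(values, keys, offsets))
# [20 30 10 50 40 60]
```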
diff --git a/python/cudf/cudf/_lib/sort.pyx b/python/cudf/cudf/_lib/sort.pyx
index 3b96cc618dd..3c3f8cabda6 100644
--- a/python/cudf/cudf/_lib/sort.pyx
+++ b/python/cudf/cudf/_lib/sort.pyx
@@ -1,4 +1,4 @@
-# Copyright (c) 2020-2022, NVIDIA CORPORATION.
+# Copyright (c) 2020-2023, NVIDIA CORPORATION.
 
 from cudf.core.buffer import acquire_spill_lock
 
@@ -18,11 +18,13 @@ from cudf._lib.cpp.search cimport lower_bound, upper_bound
 from cudf._lib.cpp.sorting cimport (
     is_sorted as cpp_is_sorted,
     rank,
+    segmented_sort_by_key as cpp_segmented_sort_by_key,
     sorted_order,
 )
+from cudf._lib.cpp.table.table cimport table
 from cudf._lib.cpp.table.table_view cimport table_view
 from cudf._lib.cpp.types cimport null_order, null_policy, order
-from cudf._lib.utils cimport table_view_from_columns
+from cudf._lib.utils cimport columns_from_unique_ptr, table_view_from_columns
 
 
 @acquire_spill_lock()
@@ -143,6 +145,69 @@ def order_by(list columns_from_table, object ascending, str na_position):
     return Column.from_unique_ptr(move(c_result))
 
 
+def segmented_sort_by_key(
+    list values,
+    list keys,
+    Column segment_offsets,
+    list column_order=None,
+    list null_precedence=None,
+):
+    """
+    Sort segments of a table by given keys
+
+    Parameters
+    ----------
+    values : list[Column]
+        Columns of the table which will be sorted
+    keys : list[Column]
+        Columns making up the sort key
+    segment_offsets : Column
+        Segment offsets
+    column_order : list[bool], optional
+        Sequence of boolean values which correspond to each column in
+        keys providing the sort order (default all True).
+        With True <=> ascending; False <=> descending.
+    null_precedence : list[str], optional
+        Sequence of "first" or "last" values (default "first")
+        indicating the position of null values when sorting the keys.
+
+    Returns
+    -------
+    list[Column]
+        List of value columns sorted by keys
+    """
+    cdef table_view values_view = table_view_from_columns(values)
+    cdef table_view keys_view = table_view_from_columns(keys)
+    cdef column_view offsets_view = segment_offsets.view()
+    cdef vector[order] c_column_order
+    cdef vector[null_order] c_null_precedence
+    cdef unique_ptr[table] result
+    ncol = len(values)
+    column_order = column_order or [True] * ncol
+    null_precedence = null_precedence or ["first"] * ncol
+    for asc, null in zip(column_order, null_precedence):
+        c_column_order.push_back(order.ASCENDING if asc else order.DESCENDING)
+        if null not in {"first", "last"}:
+            raise ValueError(f"Invalid null precedence {null}")
+        # libcudf's null_order is defined relative to ascending order,
+        # so flip it for descending keys to respect first/last.
+        if asc ^ (null == "first"):
+            c_null_precedence.push_back(null_order.AFTER)
+        else:
+            c_null_precedence.push_back(null_order.BEFORE)
+    with nogil:
+        result = move(
+            cpp_segmented_sort_by_key(
+                values_view,
+                keys_view,
+                offsets_view,
+                c_column_order,
+                c_null_precedence,
+            )
+        )
+    return columns_from_unique_ptr(move(result))
+
+
 @acquire_spill_lock()
 def digitize(list source_columns, list bins, bool right=False):
     """
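A hedged sketch of how the new binding above might be exercised. This is cudf-internal API; the module paths, `as_column` helper, column inputs, and int32 offsets are assumptions read off this diff, not a public or stable interface:

```python
# Hedged sketch: exercising the internal binding added in sort.pyx above.
import cudf
from cudf.core.column import as_column  # internal helper, assumed from this diff
from cudf._lib.sort import segmented_sort_by_key

values = as_column([10, 20, 30, 40, 50, 60])
keys = as_column([3, 1, 2, 2, 1, 3])
offsets = as_column([0, 3, 6], dtype="int32")  # two segments

# Empty lists fall back to the defaults (all ascending, nulls first),
# as the groupby code below relies on.
(sorted_values,) = segmented_sort_by_key([values], [keys], offsets, [], [])
print(cudf.Series(sorted_values).to_pandas().tolist())
# [20, 30, 10, 50, 40, 60]
```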
diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py
index 0e671cb6412..1ab1da92bc7 100644
--- a/python/cudf/cudf/core/groupby/groupby.py
+++ b/python/cudf/cudf/core/groupby/groupby.py
@@ -6,7 +6,7 @@
 import warnings
 from collections import abc
 from functools import cached_property
-from typing import Any, Iterable, List, Tuple, Union
+from typing import Any, Iterable, List, Optional, Tuple, Union
 
 import cupy as cp
 import numpy as np
@@ -16,6 +16,8 @@
 from cudf._lib import groupby as libgroupby
 from cudf._lib.null_mask import bitmask_or
 from cudf._lib.reshape import interleave_columns
+from cudf._lib.sort import segmented_sort_by_key
+from cudf._lib.types import size_type_dtype
 from cudf._typing import AggType, DataFrameOrSeries, MultiColumnAggType
 from cudf.api.types import is_list_like
 from cudf.core.abc import Serializable
@@ -637,7 +639,7 @@ def _head_tail(self, n, *, take_head: bool, preserve_order: bool):
         # aggregation scheme in libcudf. This is probably "fast
         # enough" for most reasonable input sizes.
         _, offsets, _, group_values = self._grouped()
-        group_offsets = np.asarray(offsets, dtype=np.int32)
+        group_offsets = np.asarray(offsets, dtype=size_type_dtype)
         size_per_group = np.diff(group_offsets)
         # "Out of bounds" n for the group size either means no entries
         # (negative) or all the entries (positive)
@@ -651,7 +653,7 @@ def _head_tail(self, n, *, take_head: bool, preserve_order: bool):
             group_offsets = group_offsets[:-1]
         else:
             group_offsets = group_offsets[1:] - size_per_group
-        to_take = np.arange(size_per_group.sum(), dtype=np.int32)
+        to_take = np.arange(size_per_group.sum(), dtype=size_type_dtype)
         fixup = np.empty_like(size_per_group)
         fixup[0] = 0
         np.cumsum(size_per_group[:-1], out=fixup[1:])
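The offsets/cumsum arithmetic in this hunk, which the `sample` method added in the next hunk reuses, is easiest to see standalone. A minimal NumPy sketch with made-up group sizes, building the "take" indices for the first k rows of each group from the group offsets alone:

```python
# NumPy sketch of the offsets arithmetic shared by _head_tail and sample.
import numpy as np

group_offsets = np.array([0, 3, 7, 12])  # three groups
size_per_group = np.diff(group_offsets)  # [3, 4, 5]
k = np.minimum(2, size_per_group)        # take at most 2 rows per group

take = np.arange(k.sum())
fixup = np.empty_like(k)
fixup[0] = 0
np.cumsum(k[:-1], out=fixup[1:])
# Shift each group's slot in `take` to that group's start offset.
take += np.repeat(group_offsets[:-1] - fixup, k)
print(take)  # [0 1 3 4 7 8] -> first two rows of each group
```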
@@ -870,6 +872,134 @@ def ngroup(self, ascending=True):
         group_ids._index = index
         return self._broadcast(group_ids)
 
+    def sample(
+        self,
+        n: Optional[int] = None,
+        frac: Optional[float] = None,
+        replace: bool = False,
+        weights: Union[abc.Sequence, "cudf.Series", None] = None,
+        random_state: Union[np.random.RandomState, int, None] = None,
+    ):
+        """Return a random sample of items in each group.
+
+        Parameters
+        ----------
+        n
+            Number of items to return for each group. If sampling
+            without replacement, this must be at most the size of the
+            smallest group. Cannot be used with frac. Default is
+            ``n=1`` if frac is None.
+        frac
+            Fraction of items to return. Cannot be used with n.
+        replace
+            Should sampling occur with or without replacement?
+        weights
+            Sampling probability for each element. Must be the same
+            length as the grouped frame. Not currently supported.
+        random_state
+            Seed for random number generation.
+
+        Returns
+        -------
+        New dataframe or series with samples of appropriate size drawn
+        from each group.
+
+        """
+        if weights is not None:
+            # Implementing this case requires a different algorithm
+            # for each of the with- and without-replacement cases.
+            #
+            # Without replacement, use the weighted reservoir sampling
+            # approach of Efraimidis and Spirakis (2006)
+            # https://doi.org/10.1016/j.ipl.2005.11.003; essentially,
+            # do a segmented argsort on weight-scaled logarithmic
+            # deviates. See
+            # https://timvieira.github.io/blog/post/
+            # 2019/09/16/algorithms-for-sampling-without-replacement/
+            #
+            # Sampling with replacement is trickier: one might be able
+            # to use the alias method; otherwise we're back to
+            # bucketed rejection sampling.
+            raise NotImplementedError("Sampling with weights is not supported")
+        if frac is not None and n is not None:
+            raise ValueError("Cannot supply both frac and n")
+        elif n is None and frac is None:
+            n = 1
+        elif frac is not None and not (0 <= frac <= 1):
+            raise ValueError(
+                "Sampling with fraction must provide fraction in "
+                f"[0, 1], got {frac=}"
+            )
+        # TODO: handle random states properly.
+        if random_state is not None and not isinstance(random_state, int):
+            raise NotImplementedError(
+                "Only integer seeds are supported for random_state "
+                "in this case"
+            )
+        # Get the groups
+        # TODO: convince Cython to convert the std::vector offsets
+        # into a numpy array directly, rather than a list.
+        # TODO: this uses the sort-based groupby, could one use hash-based?
+        _, offsets, _, group_values = self._grouped()
+        group_offsets = np.asarray(offsets, dtype=size_type_dtype)
+        size_per_group = np.diff(group_offsets)
+        if n is not None:
+            samples_per_group = np.broadcast_to(
+                size_type_dtype.type(n), size_per_group.shape
+            )
+            if not replace and (minsize := size_per_group.min()) < n:
+                raise ValueError(
+                    f"Cannot sample {n=} without replacement, "
+                    f"smallest group is {minsize}"
+                )
+        else:
+            # Pandas uses round-to-nearest, ties to even (Python's
+            # round) to pick sample sizes for the fractional case,
+            # rather than round-half-away-from-zero.
+            samples_per_group = np.round(
+                size_per_group * frac, decimals=0
+            ).astype(size_type_dtype)
+        if replace:
+            # We would prefer to use cupy here, but their rng.integers
+            # interface doesn't take array-based low and high
+            # arguments.
+            low = 0
+            high = np.repeat(size_per_group, samples_per_group)
+            rng = np.random.default_rng(seed=random_state)
+            indices = rng.integers(low, high, dtype=size_type_dtype)
+            indices += np.repeat(group_offsets[:-1], samples_per_group)
+        else:
+            # Approach: do a segmented argsort of the index array and
+            # take the first samples_per_group entries from the sorted
+            # array. We will shuffle the group indices and then pick
+            # them out from the grouped dataframe index.
+            nrows = len(group_values)
+            indices = cp.arange(nrows, dtype=size_type_dtype)
+            if len(size_per_group) < 500:
+                # Empirically shuffling with cupy is faster at this scale
+                rs = cp.random.get_random_state()
+                rs.seed(seed=random_state)
+                for off, size in zip(group_offsets, size_per_group):
+                    rs.shuffle(indices[off : off + size])
+            else:
+                rng = cp.random.default_rng(seed=random_state)
+                (indices,) = segmented_sort_by_key(
+                    [as_column(indices)],
+                    [as_column(rng.random(size=nrows))],
+                    as_column(group_offsets),
+                    [],
+                    [],
+                )
+                indices = cp.asarray(indices.data_array_view(mode="read"))
+            # Which indices are we going to want?
+            want = np.arange(samples_per_group.sum(), dtype=size_type_dtype)
+            scan = np.empty_like(samples_per_group)
+            scan[0] = 0
+            np.cumsum(samples_per_group[:-1], out=scan[1:])
+            want += np.repeat(group_offsets[:-1] - scan, samples_per_group)
+            indices = indices[want]
+        return group_values.iloc[indices]
+
     def serialize(self):
         header = {}
         frames = []
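The user-facing behaviour added above mirrors pandas' `DataFrameGroupBy.sample`. A short usage example (outputs vary with the seed handling, so none are asserted here):

```python
# Example usage of the new groupby.sample API.
import cudf

df = cudf.DataFrame({"a": [1, 1, 2, 2, 2], "v": [10, 20, 30, 40, 50]})
print(df.groupby("a").sample(n=1, random_state=42))       # one row per group
print(df.groupby("a").sample(frac=0.5, random_state=42))  # sizes rounded half-to-even
print(df.groupby("a").sample(n=4, replace=True, random_state=42))
```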
diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py
index 35a01b81042..1b86c68b582 100644
--- a/python/cudf/cudf/tests/test_groupby.py
+++ b/python/cudf/cudf/tests/test_groupby.py
@@ -1,8 +1,10 @@
 # Copyright (c) 2018-2023, NVIDIA CORPORATION.
 
+import collections
 import datetime
 import itertools
 import operator
+import string
 import textwrap
 from decimal import Decimal
 
@@ -2962,6 +2964,91 @@ def test_groupby_dtypes(groups):
     assert_eq(pdf.groupby(groups).dtypes, df.groupby(groups).dtypes)
 
 
+class TestSample:
+    @pytest.fixture(params=["default", "rangeindex", "intindex", "strindex"])
+    def index(self, request):
+        n = 12
+        if request.param == "rangeindex":
+            return cudf.RangeIndex(2, n + 2)
+        elif request.param == "intindex":
+            return cudf.Index(
+                [2, 3, 4, 1, 0, 5, 6, 8, 7, 9, 10, 13], dtype="int32"
+            )
+        elif request.param == "strindex":
+            return cudf.StringIndex(list(string.ascii_lowercase[:n]))
+        elif request.param == "default":
+            return None
+
+    @pytest.fixture(
+        params=[
+            ["a", "a", "b", "b", "c", "c", "c", "d", "d", "d", "d", "d"],
+            [1, 1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 4],
+        ],
+        ids=["str-group", "int-group"],
+    )
+    def df(self, index, request):
+        return cudf.DataFrame(
+            {"a": request.param, "b": request.param, "v": request.param},
+            index=index,
+        )
+
+    @pytest.fixture(params=["a", ["a", "b"]], ids=["single-col", "two-col"])
+    def by(self, request):
+        return request.param
+
+    def expected(self, df, *, n=None, frac=None):
+        value_counts = collections.Counter(df.a.values_host)
+        if n is not None:
+            values = list(
+                itertools.chain.from_iterable(
+                    itertools.repeat(v, n) for v in value_counts.keys()
+                )
+            )
+        elif frac is not None:
+            values = list(
+                itertools.chain.from_iterable(
+                    itertools.repeat(v, round(count * frac))
+                    for v, count in value_counts.items()
+                )
+            )
+        else:
+            raise ValueError("Must provide either n or frac")
+        values = cudf.Series(sorted(values), dtype=df.a.dtype)
+        return cudf.DataFrame({"a": values, "b": values, "v": values})
+
+    @pytest.mark.parametrize("n", [None, 0, 1, 2])
+    def test_constant_n_no_replace(self, df, by, n):
+        result = df.groupby(by).sample(n=n).sort_values("a")
+        n = 1 if n is None else n
+        assert_eq(self.expected(df, n=n), result.reset_index(drop=True))
+
+    def test_constant_n_no_replace_too_large_raises(self, df):
+        with pytest.raises(ValueError):
+            df.groupby("a").sample(n=3)
+
+    @pytest.mark.parametrize("n", [1, 2, 3])
+    def test_constant_n_replace(self, df, by, n):
+        result = df.groupby(by).sample(n=n, replace=True).sort_values("a")
+        assert_eq(self.expected(df, n=n), result.reset_index(drop=True))
+
+    def test_invalid_arguments(self, df):
+        with pytest.raises(ValueError):
+            df.groupby("a").sample(n=1, frac=0.1)
+
+    def test_not_implemented_arguments(self, df):
+        with pytest.raises(NotImplementedError):
+            # These are valid weights, but we don't implement this yet.
+            df.groupby("a").sample(n=1, weights=[1 / len(df)] * len(df))
+
+    @pytest.mark.parametrize("frac", [0, 1 / 3, 1 / 2, 2 / 3, 1])
+    @pytest.mark.parametrize("replace", [False, True])
+    def test_fraction_rounding(self, df, by, frac, replace):
+        result = (
+            df.groupby(by).sample(frac=frac, replace=replace).sort_values("a")
+        )
+        assert_eq(self.expected(df, frac=frac), result.reset_index(drop=True))
+
+
 class TestHeadTail:
    @pytest.fixture(params=[-3, -2, -1, 0, 1, 2, 3], ids=lambda n: f"{n=}")
    def n(self, request):
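For reference, the Efraimidis and Spirakis (2006) scheme cited in the `weights` comment in groupby.py above is simple to sketch for a single group. This is a minimal NumPy sketch of the idea, not an implementation plan: draw u ~ Uniform(0, 1) per item, form keys u**(1/w), and keep the k items with the largest keys; the grouped version would apply the same trick per segment via the segmented argsort added in this PR.

```python
# Minimal sketch of weighted sampling without replacement
# (Efraimidis & Spirakis 2006) for one group.
import numpy as np

rng = np.random.default_rng(42)
weights = np.array([0.1, 0.2, 0.3, 0.4])
k = 2
keys = rng.random(len(weights)) ** (1.0 / weights)
chosen = np.argsort(keys)[-k:]  # indices of the k largest keys
print(chosen)
```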