From 7456690220eb1677aaf0aada0afa61a0307331d9 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Thu, 23 Mar 2023 16:50:31 +0000 Subject: [PATCH] Implement `groupby.sample` (#12882) To do so, obtain the group offsets and values (and hence index). Sample within each group, and then pull out rows from the original object. The fastest way to do this in Python is via the builtin random library, since neither numpy nor cupy offer a broadcasted/ufunc random.sample, and looping over the groups is very slow using either of them. Looping over the groups and using python random.sample is also slow, but less so. Authors: - Lawrence Mitchell (https://github.com/wence-) Approvers: - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/12882 --- python/cudf/benchmarks/API/bench_dataframe.py | 26 +++- python/cudf/cudf/_lib/cpp/sorting.pxd | 9 +- python/cudf/cudf/_lib/sort.pyx | 67 ++++++++- python/cudf/cudf/core/groupby/groupby.py | 136 +++++++++++++++++- python/cudf/cudf/tests/test_groupby.py | 87 +++++++++++ 5 files changed, 318 insertions(+), 7 deletions(-) diff --git a/python/cudf/benchmarks/API/bench_dataframe.py b/python/cudf/benchmarks/API/bench_dataframe.py index 42bfa854396..28777b23583 100644 --- a/python/cudf/benchmarks/API/bench_dataframe.py +++ b/python/cudf/benchmarks/API/bench_dataframe.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022, NVIDIA CORPORATION. +# Copyright (c) 2022-2023, NVIDIA CORPORATION. """Benchmarks of DataFrame methods.""" @@ -104,6 +104,30 @@ def bench_groupby_agg(benchmark, dataframe, agg, num_key_cols, as_index, sort): benchmark(dataframe.groupby(by=by, as_index=as_index, sort=sort).agg, agg) +@benchmark_with_object(cls="dataframe", dtype="int", nulls=False, cols=6) +@pytest.mark.parametrize( + "num_key_cols", + [2, 3, 4], +) +@pytest.mark.parametrize("use_frac", [True, False]) +@pytest.mark.parametrize("replace", [True, False]) +@pytest.mark.parametrize("target_sample_frac", [0.1, 0.5, 1]) +def bench_groupby_sample( + benchmark, dataframe, num_key_cols, use_frac, replace, target_sample_frac +): + grouper = dataframe.groupby(by=list(dataframe.columns[:num_key_cols])) + if use_frac: + kwargs = {"frac": target_sample_frac, "replace": replace} + else: + minsize = grouper.size().min() + target_size = numpy.round( + target_sample_frac * minsize, decimals=0 + ).astype(int) + kwargs = {"n": target_size, "replace": replace} + + benchmark(grouper.sample, **kwargs) + + @benchmark_with_object(cls="dataframe", dtype="int") @pytest.mark.parametrize("num_cols_to_sort", [1]) def bench_sort_values(benchmark, dataframe, num_cols_to_sort): diff --git a/python/cudf/cudf/_lib/cpp/sorting.pxd b/python/cudf/cudf/_lib/cpp/sorting.pxd index c6c42c327ac..b210ddf81dd 100644 --- a/python/cudf/cudf/_lib/cpp/sorting.pxd +++ b/python/cudf/cudf/_lib/cpp/sorting.pxd @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2022, NVIDIA CORPORATION. +# Copyright (c) 2020-2023, NVIDIA CORPORATION. from libcpp cimport bool from libcpp.memory cimport unique_ptr @@ -38,3 +38,10 @@ cdef extern from "cudf/sorting.hpp" namespace "cudf" nogil: const table_view& table, vector[libcudf_types.order] column_order, vector[libcudf_types.null_order] null_precedence) except + + + cdef unique_ptr[table] segmented_sort_by_key( + const table_view& values, + const table_view& keys, + const column_view& segment_offsets, + vector[libcudf_types.order] column_order, + vector[libcudf_types.null_order] null_precedence) except + diff --git a/python/cudf/cudf/_lib/sort.pyx b/python/cudf/cudf/_lib/sort.pyx index 3b96cc618dd..3c3f8cabda6 100644 --- a/python/cudf/cudf/_lib/sort.pyx +++ b/python/cudf/cudf/_lib/sort.pyx @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2022, NVIDIA CORPORATION. +# Copyright (c) 2020-2023, NVIDIA CORPORATION. from cudf.core.buffer import acquire_spill_lock @@ -18,11 +18,13 @@ from cudf._lib.cpp.search cimport lower_bound, upper_bound from cudf._lib.cpp.sorting cimport ( is_sorted as cpp_is_sorted, rank, + segmented_sort_by_key as cpp_segmented_sort_by_key, sorted_order, ) +from cudf._lib.cpp.table.table cimport table from cudf._lib.cpp.table.table_view cimport table_view from cudf._lib.cpp.types cimport null_order, null_policy, order -from cudf._lib.utils cimport table_view_from_columns +from cudf._lib.utils cimport columns_from_unique_ptr, table_view_from_columns @acquire_spill_lock() @@ -143,6 +145,67 @@ def order_by(list columns_from_table, object ascending, str na_position): return Column.from_unique_ptr(move(c_result)) +def segmented_sort_by_key( + list values, + list keys, + Column segment_offsets, + list column_order=None, + list null_precedence=None, +): + """ + Sort segments of a table by given keys + + Parameters + ---------- + values : list[Column] + Columns of the table which will be sorted + keys : list[Column] + Columns making up the sort key + offsets : Column + Segment offsets + column_order : list[bool], optional + Sequence of boolean values which correspond to each column in + keys providing the sort order (default all True). + With True <=> ascending; False <=> descending. + null_precedence : list[str], optional + Sequence of "first" or "last" values (default "first") + indicating the position of null values when sorting the keys. + + Returns + ------- + list[Column] + list of value columns sorted by keys + """ + cdef table_view values_view = table_view_from_columns(values) + cdef table_view keys_view = table_view_from_columns(keys) + cdef column_view offsets_view = segment_offsets.view() + cdef vector[order] c_column_order + cdef vector[null_order] c_null_precedence + cdef unique_ptr[table] result + ncol = len(values) + column_order = column_order or [True] * ncol + null_precedence = null_precedence or ["first"] * ncol + for asc, null in zip(column_order, null_precedence): + c_column_order.push_back(order.ASCENDING if asc else order.DESCENDING) + if asc ^ (null == "first"): + c_null_precedence.push_back(null_order.AFTER) + elif asc ^ (null == "last"): + c_null_precedence.push_back(null_order.BEFORE) + else: + raise ValueError(f"Invalid null precedence {null}") + with nogil: + result = move( + cpp_segmented_sort_by_key( + values_view, + keys_view, + offsets_view, + c_column_order, + c_null_precedence, + ) + ) + return columns_from_unique_ptr(move(result)) + + @acquire_spill_lock() def digitize(list source_columns, list bins, bool right=False): """ diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index 0e671cb6412..1ab1da92bc7 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -6,7 +6,7 @@ import warnings from collections import abc from functools import cached_property -from typing import Any, Iterable, List, Tuple, Union +from typing import Any, Iterable, List, Optional, Tuple, Union import cupy as cp import numpy as np @@ -16,6 +16,8 @@ from cudf._lib import groupby as libgroupby from cudf._lib.null_mask import bitmask_or from cudf._lib.reshape import interleave_columns +from cudf._lib.sort import segmented_sort_by_key +from cudf._lib.types import size_type_dtype from cudf._typing import AggType, DataFrameOrSeries, MultiColumnAggType from cudf.api.types import is_list_like from cudf.core.abc import Serializable @@ -637,7 +639,7 @@ def _head_tail(self, n, *, take_head: bool, preserve_order: bool): # aggregation scheme in libcudf. This is probably "fast # enough" for most reasonable input sizes. _, offsets, _, group_values = self._grouped() - group_offsets = np.asarray(offsets, dtype=np.int32) + group_offsets = np.asarray(offsets, dtype=size_type_dtype) size_per_group = np.diff(group_offsets) # "Out of bounds" n for the group size either means no entries # (negative) or all the entries (positive) @@ -651,7 +653,7 @@ def _head_tail(self, n, *, take_head: bool, preserve_order: bool): group_offsets = group_offsets[:-1] else: group_offsets = group_offsets[1:] - size_per_group - to_take = np.arange(size_per_group.sum(), dtype=np.int32) + to_take = np.arange(size_per_group.sum(), dtype=size_type_dtype) fixup = np.empty_like(size_per_group) fixup[0] = 0 np.cumsum(size_per_group[:-1], out=fixup[1:]) @@ -870,6 +872,134 @@ def ngroup(self, ascending=True): group_ids._index = index return self._broadcast(group_ids) + def sample( + self, + n: Optional[int] = None, + frac: Optional[float] = None, + replace: bool = False, + weights: Union[abc.Sequence, "cudf.Series", None] = None, + random_state: Union[np.random.RandomState, int, None] = None, + ): + """Return a random sample of items in each group. + + Parameters + ---------- + n + Number of items to return for each group, if sampling + without replacement must be at most the size of the + smallest group. Cannot be used with frac. Default is + ``n=1`` if frac is None. + frac + Fraction of items to return. Cannot be used with n. + replace + Should sampling occur with or without replacement? + weights + Sampling probability for each element. Must be the same + length as the grouped frame. Not currently supported. + random_state + Seed for random number generation. + + Returns + ------- + New dataframe or series with samples of appropriate size drawn + from each group. + + """ + if weights is not None: + # To implement this case again needs different algorithms + # in both cases. + # + # Without replacement, use the weighted reservoir sampling + # approach of Efraimidas and Spirakis (2006) + # https://doi.org/10.1016/j.ipl.2005.11.003, essentially, + # do a segmented argsort sorting on weight-scaled + # logarithmic deviates. See + # https://timvieira.github.io/blog/post/ + # 2019/09/16/algorithms-for-sampling-without-replacement/ + # + # With replacement is trickier, one might be able to use + # the alias method, otherwise we're back to bucketed + # rejection sampling. + raise NotImplementedError("Sampling with weights is not supported") + if frac is not None and n is not None: + raise ValueError("Cannot supply both of frac and n") + elif n is None and frac is None: + n = 1 + elif frac is not None and not (0 <= frac <= 1): + raise ValueError( + "Sampling with fraction must provide fraction in " + f"[0, 1], got {frac=}" + ) + # TODO: handle random states properly. + if random_state is not None and not isinstance(random_state, int): + raise NotImplementedError( + "Only integer seeds are supported for random_state " + "in this case" + ) + # Get the groups + # TODO: convince Cython to convert the std::vector offsets + # into a numpy array directly, rather than a list. + # TODO: this uses the sort-based groupby, could one use hash-based? + _, offsets, _, group_values = self._grouped() + group_offsets = np.asarray(offsets, dtype=size_type_dtype) + size_per_group = np.diff(group_offsets) + if n is not None: + samples_per_group = np.broadcast_to( + size_type_dtype.type(n), size_per_group.shape + ) + if not replace and (minsize := size_per_group.min()) < n: + raise ValueError( + f"Cannot sample {n=} without replacement, " + f"smallest group is {minsize}" + ) + else: + # Pandas uses round-to-nearest, ties to even to + # pick sample sizes for the fractional case (unlike IEEE + # which is round-to-nearest, ties to sgn(x) * inf). + samples_per_group = np.round( + size_per_group * frac, decimals=0 + ).astype(size_type_dtype) + if replace: + # We would prefer to use cupy here, but their rng.integers + # interface doesn't take array-based low and high + # arguments. + low = 0 + high = np.repeat(size_per_group, samples_per_group) + rng = np.random.default_rng(seed=random_state) + indices = rng.integers(low, high, dtype=size_type_dtype) + indices += np.repeat(group_offsets[:-1], samples_per_group) + else: + # Approach: do a segmented argsort of the index array and take + # the first samples_per_group entries from sorted array. + # We will shuffle the group indices and then pick them out + # from the grouped dataframe index. + nrows = len(group_values) + indices = cp.arange(nrows, dtype=size_type_dtype) + if len(size_per_group) < 500: + # Empirically shuffling with cupy is faster at this scale + rs = cp.random.get_random_state() + rs.seed(seed=random_state) + for off, size in zip(group_offsets, size_per_group): + rs.shuffle(indices[off : off + size]) + else: + rng = cp.random.default_rng(seed=random_state) + (indices,) = segmented_sort_by_key( + [as_column(indices)], + [as_column(rng.random(size=nrows))], + as_column(group_offsets), + [], + [], + ) + indices = cp.asarray(indices.data_array_view(mode="read")) + # Which indices are we going to want? + want = np.arange(samples_per_group.sum(), dtype=size_type_dtype) + scan = np.empty_like(samples_per_group) + scan[0] = 0 + np.cumsum(samples_per_group[:-1], out=scan[1:]) + want += np.repeat(group_offsets[:-1] - scan, samples_per_group) + indices = indices[want] + return group_values.iloc[indices] + def serialize(self): header = {} frames = [] diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py index 35a01b81042..1b86c68b582 100644 --- a/python/cudf/cudf/tests/test_groupby.py +++ b/python/cudf/cudf/tests/test_groupby.py @@ -1,8 +1,10 @@ # Copyright (c) 2018-2023, NVIDIA CORPORATION. +import collections import datetime import itertools import operator +import string import textwrap from decimal import Decimal @@ -2962,6 +2964,91 @@ def test_groupby_dtypes(groups): assert_eq(pdf.groupby(groups).dtypes, df.groupby(groups).dtypes) +class TestSample: + @pytest.fixture(params=["default", "rangeindex", "intindex", "strindex"]) + def index(self, request): + n = 12 + if request.param == "rangeindex": + return cudf.RangeIndex(2, n + 2) + elif request.param == "intindex": + return cudf.Index( + [2, 3, 4, 1, 0, 5, 6, 8, 7, 9, 10, 13], dtype="int32" + ) + elif request.param == "strindex": + return cudf.StringIndex(list(string.ascii_lowercase[:n])) + elif request.param == "default": + return None + + @pytest.fixture( + params=[ + ["a", "a", "b", "b", "c", "c", "c", "d", "d", "d", "d", "d"], + [1, 1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 4], + ], + ids=["str-group", "int-group"], + ) + def df(self, index, request): + return cudf.DataFrame( + {"a": request.param, "b": request.param, "v": request.param}, + index=index, + ) + + @pytest.fixture(params=["a", ["a", "b"]], ids=["single-col", "two-col"]) + def by(self, request): + return request.param + + def expected(self, df, *, n=None, frac=None): + value_counts = collections.Counter(df.a.values_host) + if n is not None: + values = list( + itertools.chain.from_iterable( + itertools.repeat(v, n) for v in value_counts.keys() + ) + ) + elif frac is not None: + values = list( + itertools.chain.from_iterable( + itertools.repeat(v, round(count * frac)) + for v, count in value_counts.items() + ) + ) + else: + raise ValueError("Must provide either n or frac") + values = cudf.Series(sorted(values), dtype=df.a.dtype) + return cudf.DataFrame({"a": values, "b": values, "v": values}) + + @pytest.mark.parametrize("n", [None, 0, 1, 2]) + def test_constant_n_no_replace(self, df, by, n): + result = df.groupby(by).sample(n=n).sort_values("a") + n = 1 if n is None else n + assert_eq(self.expected(df, n=n), result.reset_index(drop=True)) + + def test_constant_n_no_replace_too_large_raises(self, df): + with pytest.raises(ValueError): + df.groupby("a").sample(n=3) + + @pytest.mark.parametrize("n", [1, 2, 3]) + def test_constant_n_replace(self, df, by, n): + result = df.groupby(by).sample(n=n, replace=True).sort_values("a") + assert_eq(self.expected(df, n=n), result.reset_index(drop=True)) + + def test_invalid_arguments(self, df): + with pytest.raises(ValueError): + df.groupby("a").sample(n=1, frac=0.1) + + def test_not_implemented_arguments(self, df): + with pytest.raises(NotImplementedError): + # These are valid weights, but we don't implement this yet. + df.groupby("a").sample(n=1, weights=[1 / len(df)] * len(df)) + + @pytest.mark.parametrize("frac", [0, 1 / 3, 1 / 2, 2 / 3, 1]) + @pytest.mark.parametrize("replace", [False, True]) + def test_fraction_rounding(self, df, by, frac, replace): + result = ( + df.groupby(by).sample(frac=frac, replace=replace).sort_values("a") + ) + assert_eq(self.expected(df, frac=frac), result.reset_index(drop=True)) + + class TestHeadTail: @pytest.fixture(params=[-3, -2, -1, 0, 1, 2, 3], ids=lambda n: f"{n=}") def n(self, request):