
[ENH] Implement groupby.sample #12882

Merged
21 commits merged into rapidsai:branch-23.04 from wence/fea/groupby-sample on Mar 23, 2023

Conversation

wence-
Contributor

@wence- wence- commented Mar 6, 2023

Description

To implement groupby.sample, obtain the group offsets and values (and hence the index), sample within each group, and then pull out the corresponding rows from the original object.

The fastest way to do this in Python is via the builtin random library, since neither numpy nor cupy offers a broadcasted/ufunc random.sample, and looping over the groups is very slow using either of them. Looping over the groups and using Python's random.sample is also slow, but less so.
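A rough CPU-side sketch of that approach (the helper and its names below are illustrative only, not the actual cudf internals):

import random

import numpy as np


def sample_rows_per_group(group_offsets, n, rng=random):
    # group_offsets are half-open group boundaries, e.g. [0, 5, 8, 12].
    # random.sample draws without replacement within each group's rows.
    positions = []
    for start, end in zip(group_offsets[:-1], group_offsets[1:]):
        positions.extend(rng.sample(range(start, end), n))
    return np.asarray(positions)

The resulting positions are then used as a gather map to pull the sampled rows (and hence the index) out of the original, grouped object.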

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

@github-actions github-actions bot added the "Python" (Affects Python cuDF API) label Mar 6, 2023
@wence- wence- self-assigned this Mar 6, 2023
@wence- wence- added the "improvement" (Improvement / enhancement to an existing function) and "non-breaking" (Non-breaking change) labels Mar 6, 2023
@wence-
Contributor Author

wence- commented Mar 6, 2023

We needed groupby-sample for some work; this was the fastest implementation I could come up with that didn't involve writing specific C++. This is only a partial implementation of the full pandas API, and I am using the Python random module (since both cupy and numpy are horribly slow when taking samples: there is no ufunc/vectorised version of xxx.random.choice).

Is this a reasonable approach? Or should I come up with some way of doing the sample all at once on device?

Trivial benchmark:

import cudf
import cupy as cp
df = cudf.DataFrame({"a": cp.random.randint(0, 1000, size=10_000_000), "b": cp.arange(10_000_000)})
pdf = df.to_pandas()
%time pdf.groupby("a").sample(2) # => 1.2s
%time df.groupby("a").sample(2) # => 40ms

Bumping to 100_000 groups, pandas takes 4s while this approach takes 260ms.

@wence- wence- force-pushed the wence/fea/groupby-sample branch from 1774fa7 to 008cfe1 on March 6, 2023 12:02
@shwina
Contributor

shwina commented Mar 6, 2023

A while ago I prototyped a version of group_sample that perhaps you could salvage something from?

https://gist.github.com/shwina/ac4cdffe00ce341a65e5e27b78d0b2a0

(feedback welcome -- I'm not sure if I might have overlooked something)

@wence-
Contributor Author

wence- commented Mar 7, 2023

A while ago I prototyped a version of group_sample that perhaps you could salvage something from?

https://gist.github.com/shwina/ac4cdffe00ce341a65e5e27b78d0b2a0

(feedback welcome -- I'm not sure if I might have overlooked something)

To inline that code for discussion:

def groupby_sample(df, by, n=2):
    # Randomise the order of the dataframe, then keep the first n rows of
    # each group ("first" ranks rows within a group in their current order).
    df = df.sample(frac=1).reset_index(drop=True)
    return df.loc[df.groupby(by)[by].rank("first") <= n, :]

Plus some futzing, which I've elided, to work around groupby.rank not working for non-integer data (which makes sense, because some dtypes might not admit a total order).

This works because, for replace=False with weights=None and frac=None, df.sample commutes with groupby.rank.

As soon as we introduce keyword arguments into the mix (and to be fair, I didn't get round to implementing those in this PR), the commutator is non-zero, and so we need an alternate approach (e.g. groupby.sample(replace=True) samples with replacement within each group, whereas df.sample(replace=True) would sample with replacement from the whole dataframe).
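As a toy illustration of that non-commutation (a made-up four-row frame, plain pandas):

import pandas as pd

df = pd.DataFrame({"a": [0, 0, 1, 1], "b": [1, 2, 3, 4]})

# Simple case: a global shuffle followed by a per-group rank cut-off is
# equivalent to groupby.sample(n=1, replace=False).
shuffled = df.sample(frac=1).reset_index(drop=True)
simple = shuffled.loc[shuffled.groupby("a")["a"].rank("first") <= 1]

# With replacement the equivalence breaks: this may legitimately return the
# same row twice within a group, which no shuffle-then-rank of the original
# frame can ever produce.
with_replacement = df.groupby("a").sample(n=2, replace=True)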

@shwina
Contributor

shwina commented Mar 7, 2023

I agree -- perhaps we could use the groupby.rank() based approach for the simplest case (no kwargs), and iterate over the groups for the more complex cases?

@wence- wence- marked this pull request as ready for review March 9, 2023 18:57
@wence- wence- requested a review from a team as a code owner March 9, 2023 18:57
@wence-
Contributor Author

wence- commented Mar 9, 2023

Benchmarks incoming.

@bdice suggested a way to squash one more slow path, but I worry that it might suffer from unfair sampling:

import numpy as np

group_sizes = np.array([5, 3, 4])
samples_from_each_group = np.array([2, 3, 0])

total_samples = np.sum(samples_from_each_group)

group_sample_modulus = np.repeat(group_sizes, samples_from_each_group)

# Technically we probably need a range [0, np.lcm.reduce(group_sizes)) to
# ensure fair sampling over the range?
rng = np.random.default_rng(12345)
rints = rng.integers(low=np.iinfo(np.int_).min, high=np.iinfo(np.int_).max, size=total_samples)

group_sample_ids = rints % group_sample_modulus

So I haven't implemented this one.
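To make the fairness worry concrete, a tiny demonstration of the modulo bias (the small range is chosen purely for illustration; the bias shrinks, but never vanishes, as the range grows):

import numpy as np

group_size = 3
# The range [0, 2**8) is not an exact multiple of 3, so after "% group_size"
# residue 0 is produced by 86 distinct integers but residues 1 and 2 by only
# 85 each -- a slight bias towards sampling element 0 of a size-3 group.
counts = np.bincount(np.arange(2**8) % group_size)
print(counts)                 # [86 85 85]
print(counts / counts.sum())  # not exactly [1/3, 1/3, 1/3]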

The general approach is ready to look at though.

@wence-
Contributor Author

wence- commented Mar 9, 2023

.head/.tail are pretty straightforward as well: one just concatenates indices from slices offset by the offset array that comes back from groupby._grouped().
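A rough sketch of the head case (assuming the offsets are half-open group boundaries; the helper name is made up):

import numpy as np


def group_head_indices(offsets, n):
    # Row positions of the first n rows of each group; tail would slice from
    # the end of each group instead.
    starts, ends = offsets[:-1], offsets[1:]
    return np.concatenate(
        [np.arange(s, min(s + n, e)) for s, e in zip(starts, ends)]
    )

For example, offsets [0, 5, 8, 12] with n=2 gives [0, 1, 5, 6, 8, 9], which is then used to gather from the grouped values.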

@bdice
Contributor

bdice commented Mar 9, 2023

@wence- If you can do a segmented argsort, I may have a solution for sampling without replacement (SWOR). Based on https://timvieira.github.io/blog/post/2019/09/16/algorithms-for-sampling-without-replacement/

edit: ooh, maybe not. The analytical probabilities of SWOR may be different here because the sampling should be with respect to each group. I retract this idea. A proper expression is probably derivable from similar reasoning to that presented in the article, though.

edit 2: On second thought, maybe this is still legitimate? The probabilities don't depend on group size, and doing this for each segment individually should be equivalent to doing all segments together, so long as you use the segmented argsort correctly.


Use a modified swor_exp from the linked article, like this:

import numpy as np

def swor_exp_segmented(total_samples, segment_indices):
    E = -np.log(np.random.uniform(0,1,size=total_samples))
    # You can implement segmented_argsort using CUB. There are probably primitives in cupy/cudf that can do this?
    return segmented_argsort(E, segment_indices)

This gives you a randomly sorted list of indices within each segment. Then you can use a gather to fetch the desired number of indices from each segment, and then use that result as a gather map of indices to fetch from the original groupings.
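A numpy-only sketch of that pipeline, emulating the segmented argsort with a lexsort over (segment id, random key); the names are illustrative and this would need to be mapped onto cupy/libcudf primitives:

import numpy as np


def swor_segmented(group_offsets, samples_per_group, rng=None):
    # group_offsets are half-open boundaries, e.g. [0, 5, 8, 12] for sizes 5, 3, 4.
    if rng is None:
        rng = np.random.default_rng()
    sizes = np.diff(group_offsets)
    segment_ids = np.repeat(np.arange(len(sizes)), sizes)
    # With equal weights the exponential keys only act as random tie-breakers,
    # so plain uniforms suffice here.
    keys = rng.uniform(size=segment_ids.size)
    # "Segmented argsort": sort by (segment, key); rows stay inside their own
    # segment but land in a random order within it.
    order = np.lexsort((keys, segment_ids))
    # Keep the first samples_per_group[i] rows of each segment as the sample.
    rank_in_segment = np.arange(order.size) - np.repeat(group_offsets[:-1], sizes)
    keep = rank_in_segment < np.repeat(samples_per_group, sizes)
    return order[keep]  # gather map into the original grouped rows

For group_offsets=[0, 5, 8, 12] and samples_per_group=[2, 3, 0] this returns two positions from the first group, all three from the second, and none from the third.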

@wence-
Contributor Author

wence- commented Mar 9, 2023

@wence- If you can do a segmented argsort, I may have a solution for sampling without replacement (SWOR). Based on https://timvieira.github.io/blog/post/2019/09/16/algorithms-for-sampling-without-replacement/

edit: ooh, maybe not. The analytical probabilities of SWOR may be different here because the sampling should be with respect to each group. I retract this idea. A proper expression is probably derivable from similar reasoning to that presented in the article, though.

edit 2: On second thought, maybe this is still legitimate? The probabilities don't depend on group size, and doing this for each segment individually should be equivalent to doing all segments together, so long as you use the segmented argsort correctly.

Use a modified swor_exp from the linked article, like this:

import numpy as np

def swor_exp_segmented(total_samples, segment_indices):
    E = -np.log(np.random.uniform(0,1,size=total_samples))
    # You can implement segmented_argsort using CUB. There are probably primitives in cupy/cudf that can do this?
    return segmented_argsort(E, segment_indices)

This gives you a randomly sorted list of indices within each segment. Then you can use a gather to fetch the desired number of indices from each segment, and then use that result as a gather map of indices to fetch from the original groupings.

Oh that's a cute trick!

@bdice
Contributor

bdice commented Mar 10, 2023

In fact, you don’t even need the “log” bit. Assuming all the sample elements have equal weight, any random permutation of the group indices will be equally likely.

@wence-
Contributor Author

wence- commented Mar 10, 2023

In fact, you don’t even need the “log” bit. Assuming all the sample elements have equal weight, any random permutation of the group indices will be equally likely.

Yeah, so that post is about fast ways of sampling from the categorical distribution (where you have specified weights for each element of the group, and all elements are unique). That will be useful (since groupby.sample with weights is in the pandas API, though I am punting on it in this PR). Here, though, we have a slightly simpler problem: a group of size $n$ can be treated as $n$ categories, each with probability $\frac{1}{n}$. So if you want a sample of size $k$, you can just take a permutation of the group and select the first $k$ entries.

Indeed, this is exactly the implementation used in the PR, for the case of groupby.sample(n=k, replace=False): permute the whole data frame, group and take the first k from each group. This could be extended to the groupby.sample(frac=0.2, replace=False) case if there's a good way to push the sample sizes into groupby.rank(...). The downside to this implementation is that the memory footprint is 3x the input dataframe (rather than only 2x when you group and then select).
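A hedged, pandas-flavoured sketch (not what this PR does) of pushing per-group sample sizes into the rank comparison for the frac case; the exact rounding rule is a detail of the sketch:

import numpy as np
import pandas as pd


def sample_frac_without_replacement(df, by, frac, random_state=None):
    # Shuffle globally, then keep rank <= round(frac * group_size) per group.
    shuffled = df.sample(frac=1, random_state=random_state).reset_index(drop=True)
    grouped = shuffled.groupby(by)[by]
    ranks = grouped.rank("first")                        # 1..group_size per group
    quota = np.round(frac * grouped.transform("size"))   # per-row sample quota
    return shuffled.loc[ranks <= quota]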

I'll noodle through a few more ideas...

wence- added 2 commits March 10, 2023 13:00
Also remove large memory footprint for sampling without replacement.
@bdice
Contributor

bdice commented Mar 10, 2023

The downside to this implementation is that the memory footprint is 3x the input dataframe (rather than only 2x when you group and then select).

It’s not so bad! You’re only dealing with integer columns of indices for gathering, not full dataframes (all columns), as far as I can tell. Usually I’d say that adding 2-3 columns of integers with the same number of rows as the dataframe is an acceptable memory cost. It is hefty for single-column dataframes but not a big deal for 10-column dataframes, as a fraction of the dataframe memory. Are there copies of the full dataframe that I’m not considering?

Contributor Author

@wence- wence- left a comment


Some notes.

wence- added 2 commits March 13, 2023 13:07
Also speed up index masking and add pointers to implementation ideas for weighted sampling.
@wence-
Contributor Author

wence- commented Mar 13, 2023

I did some basic benchmarking of this code.

import gc
import time

import cudf
import cupy as cp
import pandas as pd


def create_df(n, nunique):
    return cudf.DataFrame(
        {"a": cp.random.randint(0, nunique, size=n), "b": cp.arange(n)}
    )


def sample_n(df, n, replace):
    start = time.time()
    _ = df.groupby("a").sample(n=n, replace=replace)
    end = time.time()
    return end - start


def sample_frac(df, frac, replace):
    start = time.time()
    _ = df.groupby("a").sample(frac=frac, replace=replace)
    end = time.time()
    return end - start


def gather_results():
    data = []
    for n in [100_000, 1_000_000, 10_000_000, 100_000_000]:
        print(f"DataFrame size {n=}")
        nuniques = [10, 100, 1000, 10_000, 100_000, 1_000_000, 10_000_000, 100_000_000]
        nuniques = nuniques[: nuniques.index(n) - 1]
        for nunique in nuniques:
            df = create_df(n, nunique)
            for pandas in [False, True]:
                if pandas:
                    idf = df.to_pandas()
                else:
                    idf = df
                for replace in [False, True]:
                    for nsamp in [1, 2, 10, 50]:
                        frac = nunique * nsamp / n
                        t = sample_n(idf, nsamp, replace)
                        data.append(
                            (
                                n,
                                nunique,
                                nsamp,
                                frac,
                                replace,
                                False,
                                "pandas" if pandas else "cudf",
                                t,
                            )
                        )
                        gc.collect()
                        t = sample_frac(idf, frac, replace)
                        data.append(
                            (
                                n,
                                nunique,
                                nsamp,
                                frac,
                                replace,
                                True,
                                "pandas" if pandas else "cudf",
                                t,
                            )
                        )
                        gc.collect()
    return pd.DataFrame(
        data,
        columns=[
            "df_size",
            "unique_vals",
            "sample_size",
            "sample_frac",
            "replace",
            "use_frac",
            "backend",
            "time",
        ],
    )

Run on an A6000 GPU (cudf) and an Intel Xeon Gold 6226R CPU (pandas).

Parquet data for further analysis is attached (groupby-sample-benchmarks.zip), and I've also uploaded some faceted plots (log scale on the timing axis).

tl;dr: For a dataframe with $10^8$ rows and $10^6$ groups (each around size 100), sampling without replacement groups of size 50 takes ~0.9s, sampling with replacement takes ~1.3s (pandas 50-55s for the same operation).

Sampling with replacement is slower because, AFAICT, there isn't a fast cupy way to generate the random numbers, which takes about a second of the run. Sampling without replacement could be faster if cupy had a batched/segmented shuffle.

In contrast, if one does the same thing but only selects a single value from each group, then the cudf implementation takes between 0.25 and 0.5s.

[Faceted timing plots (log-scale time axis), one per case: replace=False/use_frac=False, replace=False/use_frac=True, replace=True/use_frac=False, replace=True/use_frac=True.]

@wence- wence- changed the title from "[RFC] Implement sketch of groupby.sample" to "[ENH] Implement groupby.sample" Mar 13, 2023
@wence-
Contributor Author

wence- commented Mar 21, 2023

Fixed up the merge conflicts, so I think this is good for a final look.

Contributor

@bdice bdice left a comment


Nice job! A few comments.

Comment on lines 128 to 131
def _():
    return grouper.sample(**kwargs)

benchmark(_)
Contributor


Does this work?

Suggested change:
- def _():
-     return grouper.sample(**kwargs)
- benchmark(_)
+ benchmark(grouper.sample, **kwargs)

Contributor Author


Oh, probably yes...

Comment on lines 189 to 192
if asc:
    c_column_order.push_back(order.ASCENDING)
else:
    c_column_order.push_back(order.DESCENDING)
Contributor


Suggested change:
- if asc:
-     c_column_order.push_back(order.ASCENDING)
- else:
-     c_column_order.push_back(order.DESCENDING)
+ c_column_order.push_back(order.ASCENDING if asc else order.DESCENDING)

Contributor Author


I guess I was just copying the previous order_by implementation, but done.

Contributor Author

@wence- wence- left a comment


Thanks, I think everything is addressed.

@wence- wence- force-pushed the wence/fea/groupby-sample branch from 12ff2cd to 558541d on March 22, 2023 17:05
@wence-
Contributor Author

wence- commented Mar 22, 2023

/merge

@rapids-bot rapids-bot bot merged commit 7456690 into rapidsai:branch-23.04 Mar 23, 2023
@wence- wence- deleted the wence/fea/groupby-sample branch March 23, 2023 17:55