Expand JIT groupby test suite #13813

Merged

Changes from 6 commits
1 change: 1 addition & 0 deletions python/cudf/cudf/core/groupby/groupby.py
@@ -1223,6 +1223,7 @@ def _iterative_groupby_apply(
chunks = [
grouped_values[s:e] for s, e in zip(offsets[:-1], offsets[1:])
]
breakpoint()
chunk_results = [function(chk, *args) for chk in chunks]
return self._post_process_chunk_results(
chunk_results, group_names, group_keys, grouped_values
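
The hunk above shows the chunking step that the JIT tests exercise: sorted group offsets are turned into one chunk per group, and the user function is applied chunk by chunk. A minimal sketch of the same idea using plain pandas, with illustrative names rather than the cudf internals:

```python
import pandas as pd

# Sorted values with group boundaries at the given offsets; three
# groups of sizes 1, 2, and 3, mirroring the small fixture below.
grouped_values = pd.DataFrame({"key": [1, 2, 2, 3, 3, 3], "val": range(6)})
offsets = [0, 1, 3, 6]

# Slice between consecutive offsets to get one chunk per group, then
# apply the function to each chunk.
chunks = [grouped_values[s:e] for s, e in zip(offsets[:-1], offsets[1:])]
chunk_results = [chunk["val"].sum() for chunk in chunks]
print(chunk_results)  # [0, 3, 12]
```
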
257 changes: 197 additions & 60 deletions python/cudf/cudf/tests/test_groupby.py
@@ -381,19 +381,76 @@ def emulate(df):


@pytest.fixture(scope="module")
def groupby_jit_data():
def groupby_jit_data_small():
"""
Small dataset for testing groupby apply with jit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we write this comment with complete sentences / capitalization? It also feels like it's wrapped pretty harshly, like 50-60 chars / line? Same request for other comments below.

returns a dataframe whose keys columns define
4 groups of size 1, 2, 3, 4 as well as an additional
key column that can be used to test subgroups

useful for very basic testing of result values

"""
np.random.seed(0)
df = DataFrame()
nelem = 20
df["key1"] = np.random.randint(0, 3, nelem)
df["key2"] = np.random.randint(0, 2, nelem)
df["val1"] = np.random.random(nelem)
df["val2"] = np.random.random(nelem)
df["val3"] = np.random.randint(0, 10, nelem)
df["val4"] = np.random.randint(0, 10, nelem)
key1 = (
[1] # group of size 1
+ [2, 2] # group of size 2
+ [3, 3, 3] # group of size 3
+ [4, 4, 4, 4] # group of size 4
)
key2 = [1, 2] * 5
df["key1"] = key1
df["key2"] = key2

df["val1"] = np.random.randint(0, 10, len(key1))
df["val2"] = np.random.randint(0, 10, len(key1))

# Randomly permute the data.
df = df.sample(frac=1).reset_index(drop=True)
return df
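
A quick sanity check of the fixture's shape (a sketch, not part of the diff): each `key1` value identifies a group whose size equals the key itself, no matter how the rows are permuted.

```python
import cudf

# Rebuild the key layout used by the fixture: groups of sizes 1-4.
key1 = [1] + [2, 2] + [3, 3, 3] + [4, 4, 4, 4]
df = cudf.DataFrame({"key1": key1}).sample(frac=1)

print(df.groupby("key1").size().sort_index())
# key1 1 -> 1, 2 -> 2, 3 -> 3, 4 -> 4
```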


@pytest.fixture(scope="module")
def groupby_jit_data_large(groupby_jit_data_small):
"""
Large dataset for testing groupby apply with jit
useful for validating that block level algorithms
return the correct result for groups larger than
the maximum thread per block size
"""
max_tpb = 1024
# Bigger than one block, but not always an exact multiple of it.
factor = max_tpb + 1
df = cudf.concat([groupby_jit_data_small] * factor)

return df
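
The factor of `max_tpb + 1` scales every group past one CUDA block without making any group size an exact multiple of the block size. The resulting sizes, assuming the small fixture's groups of 1 through 4:

```python
# Group sizes after concatenating 1025 copies of the small dataset.
max_tpb = 1024
factor = max_tpb + 1
print([n * factor for n in (1, 2, 3, 4)])  # [1025, 2050, 3075, 4100]
# Each exceeds 1024 threads per block, and none divides evenly by 1024.
```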


@pytest.fixture(scope="module")
def groupby_jit_data_nans(groupby_jit_data_small):
"""
Small dataset containing nans
"""

df = groupby_jit_data_small.sort_values(["key1", "key2"])
df["val1"][::2] = np.nan
df = df.sample(frac=1).reset_index(drop=True)
return df


@pytest.fixture(scope="module")
def groupby_jit_datasets(
groupby_jit_data_small, groupby_jit_data_large, groupby_jit_data_nans
):
return {
"small": groupby_jit_data_small,
"large": groupby_jit_data_large,
"nans": groupby_jit_data_nans,
}


def run_groupby_apply_jit_test(data, func, keys, *args):
expect_groupby_obj = data.to_pandas().groupby(keys)
got_groupby_obj = data.groupby(keys)
@@ -404,15 +461,7 @@ def run_groupby_apply_jit_test(data, func, keys, *args):
assert_groupby_results_equal(cudf_jit_result, pandas_result)
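
The helper compares the JIT engine against pandas on the same function and keys. The diff collapses its middle lines; a sketch of what it does end to end, assuming `assert_groupby_results_equal` from cudf's test utilities:

```python
def run_groupby_apply_jit_test(data, func, keys, *args):
    # Group the same data once through pandas and once through cudf.
    expect_groupby_obj = data.to_pandas().groupby(keys)
    got_groupby_obj = data.groupby(keys)

    # The pandas result is the reference; cudf runs the JIT engine.
    pandas_result = expect_groupby_obj.apply(func, *args)
    cudf_jit_result = got_groupby_obj.apply(func, *args, engine="jit")

    assert_groupby_results_equal(cudf_jit_result, pandas_result)
```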


@pytest.mark.parametrize(
"dtype",
SUPPORTED_GROUPBY_NUMPY_TYPES,
ids=[str(t) for t in SUPPORTED_GROUPBY_NUMPY_TYPES],
)
@pytest.mark.parametrize(
"func", ["min", "max", "sum", "mean", "var", "std", "idxmin", "idxmax"]
)
def test_groupby_apply_jit_reductions(func, groupby_jit_data, dtype):
def groupby_apply_jit_reductions_test_inner(func, data, dtype):
# Ideally we'd just have:
# lambda group: getattr(group, func)()
# but the current kernel caching mechanism relies on pickle, which
# does not play well with lambdas or locally defined functions.
@@ -429,48 +478,92 @@ def func(df):
exec(funcstr, lcl)
func = lcl["func"]

groupby_jit_data["val1"] = groupby_jit_data["val1"].astype(dtype)
groupby_jit_data["val2"] = groupby_jit_data["val2"].astype(dtype)
data["val1"] = data["val1"].astype(dtype)
data["val2"] = data["val2"].astype(dtype)

run_groupby_apply_jit_test(groupby_jit_data, func, ["key1"])
run_groupby_apply_jit_test(data, func, ["key1"])
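
The string-plus-`exec` dance above exists only because the kernel cache keys on a picklable function; a closure or lambda capturing `func` would defeat it. The pattern in isolation, with `mean` as an illustrative reduction:

```python
import textwrap

func_name = "mean"  # illustrative; the tests parametrize this

# Build a top-level function definition as source text...
funcstr = textwrap.dedent(
    f"""
    def func(df):
        return df['val1'].{func_name}()
    """
)

# ...then execute it into a fresh namespace and pull the function out.
lcl = {}
exec(funcstr, lcl)
func = lcl["func"]
```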


@pytest.mark.parametrize("dtype", ["int32", "int64"])
def test_groupby_apply_jit_correlation(groupby_jit_data, dtype):
# Test unary reductions.
@pytest.mark.parametrize(
"dtype",
SUPPORTED_GROUPBY_NUMPY_TYPES,
ids=[str(t) for t in SUPPORTED_GROUPBY_NUMPY_TYPES],
)
@pytest.mark.parametrize(
"func", ["min", "max", "sum", "mean", "var", "std", "idxmin", "idxmax"]
)
@pytest.mark.parametrize("dataset", ["small", "large", "nans"])
def test_groupby_apply_jit_unary_reductions(
func, dtype, dataset, groupby_jit_datasets
):
dataset = groupby_jit_datasets[dataset]

groupby_jit_data["val3"] = groupby_jit_data["val3"].astype(dtype)
groupby_jit_data["val4"] = groupby_jit_data["val4"].astype(dtype)
if func in {"sum", "mean", "var", "std"} and dtype == "int32":
with pytest.xfail(
brandon-b-miller marked this conversation as resolved.
Show resolved Hide resolved
reason="https://github.com/rapidsai/cudf/issues/13873"
):
groupby_apply_jit_reductions_test_inner(func, dataset, dtype)
return
else:
groupby_apply_jit_reductions_test_inner(func, dataset, dtype)

keys = ["key1", "key2"]

def func(group):
return group["val3"].corr(group["val4"])
# Test unary reductions for special values.
def groupby_apply_jit_reductions_special_vals_inner(
func, data, dtype, special_val
):
funcstr = textwrap.dedent(
f"""
def func(df):
return df['val1'].{func}()
"""
)
lcl = {}
exec(funcstr, lcl)
func = lcl["func"]

run_groupby_apply_jit_test(groupby_jit_data, func, keys)
data["val1"] = data["val1"].astype(dtype)
data["val2"] = data["val2"].astype(dtype)
data["val1"] = special_val
data["val2"] = special_val

run_groupby_apply_jit_test(data, func, ["key1"])
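
What these special-value tests assert, in pandas terms (a sketch; the JIT engine must reproduce this behavior):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"key1": [1, 1, 2], "val1": [np.inf] * 3})
grouped = df.groupby("key1")["val1"]

print(grouped.max())   # inf for every group
print(grouped.mean())  # inf
print(grouped.std())   # NaN: (inf - inf) is undefined
```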

@pytest.mark.parametrize("dtype", ["float64"])
@pytest.mark.parametrize("func", ["min", "max", "sum", "mean", "var", "std"])
@pytest.mark.parametrize("special_val", [np.nan, np.inf, -np.inf])
def test_groupby_apply_jit_reductions_special_vals(
func, groupby_jit_data, dtype, special_val

# Test unary index reductions for special values.
def groupby_apply_jit_idx_reductions_special_vals_inner(
func, data, dtype, special_val
):
# Dynamically generate the function to avoid a pickling error;
# see groupby_apply_jit_reductions_test_inner for details.
funcstr = textwrap.dedent(
    f"""
    def func(df):
        return df['val1'].{func}()
    """
)
lcl = {}
exec(funcstr, lcl)
func = lcl["func"]

groupby_jit_data["val1"] = special_val
groupby_jit_data["val1"] = groupby_jit_data["val1"].astype(dtype)
data["val1"] = data["val1"].astype(dtype)
data["val2"] = data["val2"].astype(dtype)
data["val1"] = special_val
data["val2"] = special_val

run_groupby_apply_jit_test(groupby_jit_data, func, ["key1"])
run_groupby_apply_jit_test(data, func, ["key1"])


@pytest.mark.parametrize("dtype", ["float64"])
@pytest.mark.parametrize("func", ["min", "max", "sum", "mean", "var", "std"])
@pytest.mark.parametrize("special_val", [np.nan, np.inf, -np.inf])
@pytest.mark.parametrize("dataset", ["small", "large", "nans"])
def test_groupby_apply_jit_reductions_special_vals(
func, dtype, dataset, groupby_jit_datasets, special_val
):
dataset = groupby_jit_datasets[dataset]
groupby_apply_jit_reductions_special_vals_inner(
func, dataset, dtype, special_val
)


@pytest.mark.parametrize("dtype", ["float64"])
@@ -488,28 +581,70 @@ def func(df):
-np.inf,
],
)
@pytest.mark.parametrize("dataset", ["small", "large", "nans"])
def test_groupby_apply_jit_idx_reductions_special_vals(
func, groupby_jit_data, dtype, special_val
func, dtype, dataset, groupby_jit_datasets, special_val
):
# dynamically generate to avoid pickling error.
# see test_groupby_apply_jit_reductions for details.
funcstr = textwrap.dedent(
f"""
def func(df):
return df['val1'].{func}()
"""
dataset = groupby_jit_datasets[dataset]
groupby_apply_jit_idx_reductions_special_vals_inner(
func, dataset, dtype, special_val
)
lcl = {}
exec(funcstr, lcl)
func = lcl["func"]

groupby_jit_data["val1"] = special_val
groupby_jit_data["val1"] = groupby_jit_data["val1"].astype(dtype)

expect = groupby_jit_data.to_pandas().groupby("key1").apply(func)
got = groupby_jit_data.groupby("key1").apply(func, engine="jit")
@pytest.mark.parametrize(
"dtype",
[
"int32",
"int64",
pytest.param(
"float32",
marks=pytest.mark.xfail(
reason="https://github.com/rapidsai/cudf/issues/13839"
),
),
pytest.param(
"float64",
marks=pytest.mark.xfail(
reason="https://github.com/rapidsai/cudf/issues/13839"
),
),
],
)
@pytest.mark.parametrize(
"dataset",
[
pytest.param(
"small",
marks=[
pytest.mark.filterwarnings(
"ignore:Degrees of Freedom <= 0 for slice"
),
pytest.mark.filterwarnings(
"ignore:divide by zero encountered in divide"
),
],
),
pytest.param(
"large",
marks=pytest.mark.xfail(
reason="https://github.com/rapidsai/cudf/issues/13875"
),
),
],
)
def test_groupby_apply_jit_correlation(dataset, groupby_jit_datasets, dtype):

assert_eq(expect, got, check_dtype=False)
dataset = groupby_jit_datasets[dataset]

dataset["val1"] = dataset["val1"].astype(dtype)
dataset["val2"] = dataset["val2"].astype(dtype)

keys = ["key1"]

def func(group):
return group["val1"].corr(group["val2"])

run_groupby_apply_jit_test(dataset, func, keys)
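
The correlation kernel has to reproduce pandas' per-group Pearson correlation. A worked sketch of the expected values (column names follow the fixtures above):

```python
import pandas as pd

df = pd.DataFrame(
    {
        "key1": [1, 1, 1, 2, 2, 2],
        "val1": [1, 2, 3, 1, 2, 3],
        "val2": [2, 4, 6, 6, 4, 2],
    }
)

# val2 rises with val1 in group 1 and falls in group 2.
print(df.groupby("key1").apply(lambda g: g["val1"].corr(g["val2"])))
# key1 1 ->  1.0, key1 2 -> -1.0
```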


@pytest.mark.parametrize(
@@ -520,8 +655,8 @@ def func(df):
lambda df: df["val1"].mean() + df["val2"].std(),
],
)
def test_groupby_apply_jit_basic(func, groupby_jit_data):
run_groupby_apply_jit_test(groupby_jit_data, func, ["key1", "key2"])
def test_groupby_apply_jit_basic(func, groupby_jit_data_small):
run_groupby_apply_jit_test(groupby_jit_data_small, func, ["key1", "key2"])


def create_test_groupby_apply_jit_args_params():
@@ -540,8 +675,10 @@ def f3(df, k, L, m):
@pytest.mark.parametrize(
"func,args", create_test_groupby_apply_jit_args_params()
)
def test_groupby_apply_jit_args(func, args, groupby_jit_data):
run_groupby_apply_jit_test(groupby_jit_data, func, ["key1", "key2"], *args)
def test_groupby_apply_jit_args(func, args, groupby_jit_data_small):
run_groupby_apply_jit_test(
groupby_jit_data_small, func, ["key1", "key2"], *args
)


def test_groupby_apply_jit_block_divergence():