Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically select GroupBy.apply algorithm based on if the UDF is jittable #13113

Merged
4 changes: 4 additions & 0 deletions python/cudf/cudf/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ def _dtypes(self):
zip(self._data.names, (col.dtype for col in self._data.columns))
)

@property
def _has_nulls(self):
return any(col.has_nulls() for col in self._data.values())

def serialize(self):
header = {
"type-serialized": pickle.dumps(type(self)),
Expand Down
28 changes: 20 additions & 8 deletions python/cudf/cudf/core/groupby/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from cudf.core.column_accessor import ColumnAccessor
from cudf.core.mixins import Reducible, Scannable
from cudf.core.multiindex import MultiIndex
from cudf.core.udf.groupby_utils import jit_groupby_apply
from cudf.core.udf.groupby_utils import _can_be_jitted, jit_groupby_apply
from cudf.utils.utils import GetAttrGetItemMixin, _cudf_nvtx_annotate


Expand Down Expand Up @@ -1161,11 +1161,8 @@ def _jit_groupby_apply(
self, function, group_names, offsets, group_keys, grouped_values, *args
):
# Nulls are not yet supported
for colname in self.grouping.values._data.keys():
if self.obj._data[colname].has_nulls():
raise ValueError(
"Nulls not yet supported with groupby JIT engine"
)
if self.grouping._obj._has_nulls:
raise ValueError("Nulls not yet supported with groupby JIT engine")

chunk_results = jit_groupby_apply(
offsets, grouped_values, function, *args
Expand Down Expand Up @@ -1242,7 +1239,7 @@ def _post_process_chunk_results(
return result

@_cudf_nvtx_annotate
def apply(self, function, *args, engine="cudf"):
def apply(self, function, *args, engine="auto"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docstring needs to be updated to discuss new "auto" option.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some docs here.

"""Apply a python transformation function over the grouped chunk.

Parameters
Expand All @@ -1252,7 +1249,7 @@ def apply(self, function, *args, engine="cudf"):
on the grouped chunk.
args : tuple
Optional positional arguments to pass to the function.
engine: {'cudf', 'jit'}, default 'cudf'
engine: {'cudf', 'jit'}, default 'auto'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my knowledge: Is there a performance cost to the fallback? i.e. Does the JIT attempt have measurable overhead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it does have measurable overhead. One way of measuring it is with

import cudf
df = cudf.DataFrame({
    'a':[0,1,1],
    'b':[1,2,3]
})

def func(grp):
    # binops can't be jitted without refcounting
    return grp + grp

grouped = df.groupby('a')

import cProfile
cProfile.run('grouped.apply(func)', sort='cumtime')

For this I get

        1    0.000    0.000    0.213    0.213 groupby_utils.py:207(_can_be_jitted)

Meaning it's quite impactful. However if this becomes a problem users should be able to obtain the old behavior by just passing engine='cudf'.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Full disclosure, we anticipated this and I was OK with it. I think the tradeoff is generally worthwhile. If we think it isn't then I think we'd just stop this work and remove 'auto' altogether.

Also I just noticed that 'auto' is not listed in the set of valid engine arguments here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify — your snippet above is saying the JIT attempt costs ~200ms? That sounds right to me. I would think a cache could also be used here if needed to prevent multiple failed attempts from paying the overhead for the same function.

I am supportive of this change, because when it does pay off, it’s a big win. Just want to make sure we’re putting in the appropriate amount of engineering effort to mitigate the downside risk.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I just noticed that 'auto' is not listed in the set of valid engine arguments here.

Fixed this.

wence- marked this conversation as resolved.
Show resolved Hide resolved
Selects the GroupBy.apply implementation. Use `jit` to
select the numba JIT pipeline. Only certain operations are allowed
within the function when using this option: min, max, sum, mean, var,
Expand All @@ -1261,6 +1258,11 @@ def apply(self, function, *args, engine="cudf"):
`df['x'] * 2` is not yet allowed.
For more information, see the `cuDF guide to user defined functions
<https://docs.rapids.ai/api/cudf/stable/user_guide/guide-to-udfs.html>`__.
Use `cudf` to select the iterative groupby apply algorithm which aims
to provide maximum flexibility at the expense of performance.
The default value `auto` will attempt to use the numba JIT pipeline
where possible and will fall back to the iterative algorithm if
necessary.

Examples
--------
Expand Down Expand Up @@ -1334,10 +1336,20 @@ def mult(df):
1 2 1
2 3 1
"""

if self.obj.empty:
return self.obj
if not callable(function):
raise TypeError(f"type {type(function)} is not callable")
group_names, offsets, group_keys, grouped_values = self._grouped()

if engine == "auto":
if (not grouped_values._has_nulls) and _can_be_jitted(
bdice marked this conversation as resolved.
Show resolved Hide resolved
grouped_values, function, args
):
engine = "jit"
else:
engine = "cudf"
if engine == "jit":
result = self._jit_groupby_apply(
function,
Expand Down
24 changes: 24 additions & 0 deletions python/cudf/cudf/core/udf/groupby_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import cupy as cp
import numpy as np
from numba import cuda, types
from numba.core.errors import TypingError
from numba.cuda.cudadrv.devices import get_context
from numba.np import numpy_support

Expand Down Expand Up @@ -201,3 +202,26 @@ def jit_groupby_apply(offsets, grouped_values, function, *args):
specialized[ngroups, tpb](*launch_args)

return output


def _can_be_jitted(frame, func, args):
"""
Determine if this UDF is supported through the JIT engine
by attempting to compile just the function to PTX using the
target set of types
"""
np_field_types = np.dtype(
list(
_supported_dtypes_from_frame(
frame, supported_types=SUPPORTED_GROUPBY_NUMPY_TYPES
).items()
)
)
dataframe_group_type = _get_frame_groupby_type(
np_field_types, frame.index.dtype
)
try:
_get_udf_return_type(dataframe_group_type, func, args)
brandon-b-miller marked this conversation as resolved.
Show resolved Hide resolved
return True
except TypingError:
return False