Skip to content

Commit

Permalink
ENH: numba engine in df.apply (#54666)
Browse files Browse the repository at this point in the history
* ENH: numba engine in df.apply

* fixes

* more fixes

* try to fix

* address code review

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* go for green

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* update type

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
lithomas1 and pre-commit-ci[bot] authored Sep 11, 2023
1 parent aadd9e3 commit ce5fdf0
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 18 deletions.
2 changes: 1 addition & 1 deletion doc/source/whatsnew/v2.2.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ enhancement2

Other enhancements
^^^^^^^^^^^^^^^^^^
-
- DataFrame.apply now allows the usage of numba (via ``engine="numba"``) to JIT compile the passed function, allowing for potential speedups (:issue:`54666`)
-

.. ---------------------------------------------------------------------------
Expand Down
39 changes: 39 additions & 0 deletions pandas/core/_numba/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,45 @@
from pandas.compat._optional import import_optional_dependency


@functools.cache
def generate_apply_looper(func, nopython=True, nogil=True, parallel=False):
if TYPE_CHECKING:
import numba
else:
numba = import_optional_dependency("numba")
nb_compat_func = numba.extending.register_jitable(func)

@numba.jit(nopython=nopython, nogil=nogil, parallel=parallel)
def nb_looper(values, axis):
# Operate on the first row/col in order to get
# the output shape
if axis == 0:
first_elem = values[:, 0]
dim0 = values.shape[1]
else:
first_elem = values[0]
dim0 = values.shape[0]
res0 = nb_compat_func(first_elem)
# Use np.asarray to get shape for
# https://github.com/numba/numba/issues/4202#issuecomment-1185981507
buf_shape = (dim0,) + np.atleast_1d(np.asarray(res0)).shape
if axis == 0:
buf_shape = buf_shape[::-1]
buff = np.empty(buf_shape)

if axis == 1:
buff[0] = res0
for i in numba.prange(1, values.shape[0]):
buff[i] = nb_compat_func(values[i])
else:
buff[:, 0] = res0
for j in numba.prange(1, values.shape[1]):
buff[:, j] = nb_compat_func(values[:, j])
return buff

return nb_looper


@functools.cache
def make_looper(func, result_dtype, is_grouped_kernel, nopython, nogil, parallel):
if TYPE_CHECKING:
Expand Down
37 changes: 34 additions & 3 deletions pandas/core/apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
ABCSeries,
)

from pandas.core._numba.executor import generate_apply_looper
import pandas.core.common as com
from pandas.core.construction import ensure_wrapped_if_datetimelike

Expand Down Expand Up @@ -80,6 +81,8 @@ def frame_apply(
raw: bool = False,
result_type: str | None = None,
by_row: Literal[False, "compat"] = "compat",
engine: str = "python",
engine_kwargs: dict[str, bool] | None = None,
args=None,
kwargs=None,
) -> FrameApply:
Expand All @@ -100,6 +103,8 @@ def frame_apply(
raw=raw,
result_type=result_type,
by_row=by_row,
engine=engine,
engine_kwargs=engine_kwargs,
args=args,
kwargs=kwargs,
)
Expand Down Expand Up @@ -756,11 +761,15 @@ def __init__(
result_type: str | None,
*,
by_row: Literal[False, "compat"] = False,
engine: str = "python",
engine_kwargs: dict[str, bool] | None = None,
args,
kwargs,
) -> None:
if by_row is not False and by_row != "compat":
raise ValueError(f"by_row={by_row} not allowed")
self.engine = engine
self.engine_kwargs = engine_kwargs
super().__init__(
obj, func, raw, result_type, by_row=by_row, args=args, kwargs=kwargs
)
Expand Down Expand Up @@ -805,6 +814,12 @@ def values(self):

def apply(self) -> DataFrame | Series:
"""compute the results"""

if self.engine == "numba" and not self.raw:
raise ValueError(
"The numba engine in DataFrame.apply can only be used when raw=True"
)

# dispatch to handle list-like or dict-like
if is_list_like(self.func):
return self.apply_list_or_dict_like()
Expand Down Expand Up @@ -834,7 +849,7 @@ def apply(self) -> DataFrame | Series:

# raw
elif self.raw:
return self.apply_raw()
return self.apply_raw(engine=self.engine, engine_kwargs=self.engine_kwargs)

return self.apply_standard()

Expand Down Expand Up @@ -907,7 +922,7 @@ def apply_empty_result(self):
else:
return self.obj.copy()

def apply_raw(self):
def apply_raw(self, engine="python", engine_kwargs=None):
"""apply to the values as a numpy array"""

def wrap_function(func):
Expand All @@ -925,7 +940,23 @@ def wrapper(*args, **kwargs):

return wrapper

result = np.apply_along_axis(wrap_function(self.func), self.axis, self.values)
if engine == "numba":
engine_kwargs = {} if engine_kwargs is None else engine_kwargs

# error: Argument 1 to "__call__" of "_lru_cache_wrapper" has
# incompatible type "Callable[..., Any] | str | list[Callable
# [..., Any] | str] | dict[Hashable,Callable[..., Any] | str |
# list[Callable[..., Any] | str]]"; expected "Hashable"
nb_looper = generate_apply_looper(
self.func, **engine_kwargs # type: ignore[arg-type]
)
result = nb_looper(self.values, self.axis)
# If we made the result 2-D, squeeze it back to 1-D
result = np.squeeze(result)
else:
result = np.apply_along_axis(
wrap_function(self.func), self.axis, self.values
)

# TODO: mixed type case
if result.ndim == 2:
Expand Down
33 changes: 33 additions & 0 deletions pandas/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -9925,6 +9925,8 @@ def apply(
result_type: Literal["expand", "reduce", "broadcast"] | None = None,
args=(),
by_row: Literal[False, "compat"] = "compat",
engine: Literal["python", "numba"] = "python",
engine_kwargs: dict[str, bool] | None = None,
**kwargs,
):
"""
Expand Down Expand Up @@ -9984,6 +9986,35 @@ def apply(
If False, the funcs will be passed the whole Series at once.
.. versionadded:: 2.1.0
engine : {'python', 'numba'}, default 'python'
Choose between the python (default) engine or the numba engine in apply.
The numba engine will attempt to JIT compile the passed function,
which may result in speedups for large DataFrames.
It also supports the following engine_kwargs :
- nopython (compile the function in nopython mode)
- nogil (release the GIL inside the JIT compiled function)
- parallel (try to apply the function in parallel over the DataFrame)
Note: The numba compiler only supports a subset of
valid Python/numpy operations.
Please read more about the `supported python features
<https://numba.pydata.org/numba-doc/dev/reference/pysupported.html>`_
and `supported numpy features
<https://numba.pydata.org/numba-doc/dev/reference/numpysupported.html>`_
in numba to learn what you can or cannot use in the passed function.
As of right now, the numba engine can only be used with raw=True.
.. versionadded:: 2.2.0
engine_kwargs : dict
Pass keyword arguments to the engine.
This is currently only used by the numba engine,
see the documentation for the engine argument for more information.
**kwargs
Additional keyword arguments to pass as keywords arguments to
`func`.
Expand Down Expand Up @@ -10084,6 +10115,8 @@ def apply(
raw=raw,
result_type=result_type,
by_row=by_row,
engine=engine,
engine_kwargs=engine_kwargs,
args=args,
kwargs=kwargs,
)
Expand Down
62 changes: 48 additions & 14 deletions pandas/tests/apply/test_frame_apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
from pandas.tests.frame.common import zip_frames


@pytest.fixture(params=["python", "numba"])
def engine(request):
if request.param == "numba":
pytest.importorskip("numba")
return request.param


def test_apply(float_frame):
with np.errstate(all="ignore"):
# ufunc
Expand Down Expand Up @@ -234,36 +241,42 @@ def test_apply_broadcast_series_lambda_func(int_frame_const_col):


@pytest.mark.parametrize("axis", [0, 1])
def test_apply_raw_float_frame(float_frame, axis):
def test_apply_raw_float_frame(float_frame, axis, engine):
if engine == "numba":
pytest.skip("numba can't handle when UDF returns None.")

def _assert_raw(x):
assert isinstance(x, np.ndarray)
assert x.ndim == 1

float_frame.apply(_assert_raw, axis=axis, raw=True)
float_frame.apply(_assert_raw, axis=axis, engine=engine, raw=True)


@pytest.mark.parametrize("axis", [0, 1])
def test_apply_raw_float_frame_lambda(float_frame, axis):
result = float_frame.apply(np.mean, axis=axis, raw=True)
def test_apply_raw_float_frame_lambda(float_frame, axis, engine):
result = float_frame.apply(np.mean, axis=axis, engine=engine, raw=True)
expected = float_frame.apply(lambda x: x.values.mean(), axis=axis)
tm.assert_series_equal(result, expected)


def test_apply_raw_float_frame_no_reduction(float_frame):
def test_apply_raw_float_frame_no_reduction(float_frame, engine):
# no reduction
result = float_frame.apply(lambda x: x * 2, raw=True)
result = float_frame.apply(lambda x: x * 2, engine=engine, raw=True)
expected = float_frame * 2
tm.assert_frame_equal(result, expected)


@pytest.mark.parametrize("axis", [0, 1])
def test_apply_raw_mixed_type_frame(mixed_type_frame, axis):
def test_apply_raw_mixed_type_frame(mixed_type_frame, axis, engine):
if engine == "numba":
pytest.skip("isinstance check doesn't work with numba")

def _assert_raw(x):
assert isinstance(x, np.ndarray)
assert x.ndim == 1

# Mixed dtype (GH-32423)
mixed_type_frame.apply(_assert_raw, axis=axis, raw=True)
mixed_type_frame.apply(_assert_raw, axis=axis, engine=engine, raw=True)


def test_apply_axis1(float_frame):
Expand Down Expand Up @@ -300,14 +313,20 @@ def test_apply_mixed_dtype_corner_indexing():
)
@pytest.mark.parametrize("raw", [True, False])
@pytest.mark.parametrize("axis", [0, 1])
def test_apply_empty_infer_type(ax, func, raw, axis):
def test_apply_empty_infer_type(ax, func, raw, axis, engine, request):
df = DataFrame(**{ax: ["a", "b", "c"]})

with np.errstate(all="ignore"):
test_res = func(np.array([], dtype="f8"))
is_reduction = not isinstance(test_res, np.ndarray)

result = df.apply(func, axis=axis, raw=raw)
if engine == "numba" and raw is False:
mark = pytest.mark.xfail(
reason="numba engine only supports raw=True at the moment"
)
request.node.add_marker(mark)

result = df.apply(func, axis=axis, engine=engine, raw=raw)
if is_reduction:
agg_axis = df._get_agg_axis(axis)
assert isinstance(result, Series)
Expand Down Expand Up @@ -607,8 +626,10 @@ def non_reducing_function(row):
assert names == list(df.index)


def test_apply_raw_function_runs_once():
def test_apply_raw_function_runs_once(engine):
# https://github.com/pandas-dev/pandas/issues/34506
if engine == "numba":
pytest.skip("appending to list outside of numba func is not supported")

df = DataFrame({"a": [1, 2, 3]})
values = [] # Save row values function is applied to
Expand All @@ -623,7 +644,7 @@ def non_reducing_function(row):
for func in [reducing_function, non_reducing_function]:
del values[:]

df.apply(func, raw=True, axis=1)
df.apply(func, engine=engine, raw=True, axis=1)
assert values == list(df.a.to_list())


Expand Down Expand Up @@ -1449,10 +1470,12 @@ def test_apply_no_suffix_index():
tm.assert_frame_equal(result, expected)


def test_apply_raw_returns_string():
def test_apply_raw_returns_string(engine):
# https://github.com/pandas-dev/pandas/issues/35940
if engine == "numba":
pytest.skip("No object dtype support in numba")
df = DataFrame({"A": ["aa", "bbb"]})
result = df.apply(lambda x: x[0], axis=1, raw=True)
result = df.apply(lambda x: x[0], engine=engine, axis=1, raw=True)
expected = Series(["aa", "bbb"])
tm.assert_series_equal(result, expected)

Expand Down Expand Up @@ -1632,3 +1655,14 @@ def test_agg_dist_like_and_nonunique_columns():
result = df.agg({"A": "count"})
expected = df["A"].count()
tm.assert_series_equal(result, expected)


def test_numba_unsupported():
df = DataFrame(
{"A": [None, 2, 3], "B": [1.0, np.nan, 3.0], "C": ["foo", None, "bar"]}
)
with pytest.raises(
ValueError,
match="The numba engine in DataFrame.apply can only be used when raw=True",
):
df.apply(lambda x: x, engine="numba", raw=False)

0 comments on commit ce5fdf0

Please sign in to comment.