Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PERF: Improve performance in rolling.mean(engine="numba") #43612

Merged
merged 28 commits into from
Sep 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
5e0b2cc
Add mean kernel
mroeschke Sep 12, 2021
4f2d298
Add a shared executor function
mroeschke Sep 13, 2021
8ada0be
Add stub of a numba apply function
mroeschke Sep 13, 2021
f201dbd
Hook in numba apply to mean
mroeschke Sep 13, 2021
bf22d88
Merge remote-tracking branch 'upstream/master' into kernels/mean_kernel
mroeschke Sep 16, 2021
9ec1ef0
Fix caching tests, don't parallelize when ineffective
mroeschke Sep 16, 2021
8132622
Add whatsnew and fix caching
mroeschke Sep 16, 2021
d9b39bd
add PR number
mroeschke Sep 16, 2021
9ddf423
Make _numba private
mroeschke Sep 16, 2021
68524fd
Switch args
mroeschke Sep 16, 2021
cc786dd
Merge remote-tracking branch 'upstream/master' into kernels/mean_kernel
mroeschke Sep 17, 2021
2d19aa0
Fix typing
mroeschke Sep 17, 2021
705bb8c
Tighten docstring
mroeschke Sep 17, 2021
3622700
Keep kernels in their own directory
mroeschke Sep 17, 2021
0fa551a
Add some typing
mroeschke Sep 17, 2021
675b5a1
Add Series test cases
mroeschke Sep 18, 2021
a169423
Type column looper
mroeschke Sep 18, 2021
2842199
Add name
mroeschke Sep 18, 2021
46f1b6b
Add __all__
mroeschke Sep 19, 2021
05341e3
Merge remote-tracking branch 'upstream/master' into kernels/mean_kernel
mroeschke Sep 19, 2021
f4f59c8
Merge remote-tracking branch 'upstream/master' into kernels/mean_kernel
mroeschke Sep 20, 2021
c4bd78c
Add dtype to empty result
mroeschke Sep 20, 2021
8801cf9
Change nobs to int
mroeschke Sep 20, 2021
ce6aafb
Simplify monotonically increasing for bounds
mroeschke Sep 20, 2021
f0b38fd
Merge remote-tracking branch 'upstream/master' into kernels/mean_kernel
mroeschke Sep 22, 2021
ca0653b
Merge remote-tracking branch 'upstream/master' into kernels/mean_kernel
mroeschke Sep 22, 2021
09e1eb2
Merge remote-tracking branch 'upstream/master' into kernels/mean_kernel
mroeschke Sep 23, 2021
fd595c5
Remove unused kwargs and args
mroeschke Sep 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/source/whatsnew/v1.4.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ Performance improvements
- Performance improvement in :meth:`GroupBy.quantile` (:issue:`43469`)
- :meth:`SparseArray.min` and :meth:`SparseArray.max` no longer require converting to a dense array (:issue:`43526`)
- Performance improvement in :meth:`SparseArray.take` with ``allow_fill=False`` (:issue:`43654`)
-
- Performance improvement in :meth:`.Rolling.mean` and :meth:`.Expanding.mean` with ``engine="numba"`` (:issue:`43612`)

.. ---------------------------------------------------------------------------

Expand Down
Empty file added pandas/core/_numba/__init__.py
Empty file.
59 changes: 59 additions & 0 deletions pandas/core/_numba/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from __future__ import annotations

from typing import Callable

import numpy as np

from pandas._typing import Scalar
from pandas.compat._optional import import_optional_dependency

from pandas.core.util.numba_ import (
NUMBA_FUNC_CACHE,
get_jit_arguments,
)


def generate_shared_aggregator(
    func: Callable[..., Scalar],
    engine_kwargs: dict[str, bool] | None,
    cache_key_str: str,
):
    """
    Build a Numba function that applies a 1D Numba kernel to each column of
    a 2D array and collects the per-column results.

    Parameters
    ----------
    func : function
        1D aggregation kernel, called once per column as
        ``func(column, start, end, min_periods)``
    engine_kwargs : dict
        dictionary of arguments to be passed into numba.jit
    cache_key_str: str
        string used, together with ``func``, to look up an already compiled
        function; of the form <caller_type>_<aggregation_type>
        e.g. rolling_mean, groupby_mean

    Returns
    -------
    Numba function

    Notes
    -----
    A freshly built function is returned without being written to
    ``NUMBA_FUNC_CACHE`` here; only the lookup happens in this function.
    """
    # The jit options are resolved first, so this call runs on a cache hit too.
    nopython, nogil, parallel = get_jit_arguments(engine_kwargs, None)

    try:
        return NUMBA_FUNC_CACHE[(func, cache_key_str)]
    except KeyError:
        pass

    numba = import_optional_dependency("numba")

    @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel)
    def column_looper(
        values: np.ndarray,
        start: np.ndarray,
        end: np.ndarray,
        min_periods: int,
    ):
        n_windows = len(start)
        n_columns = values.shape[1]
        # One output row per window, one output column per input column.
        out = np.empty((n_windows, n_columns), dtype=np.float64)
        for col in numba.prange(n_columns):
            out[:, col] = func(values[:, col], start, end, min_periods)
        return out

    return column_looper
3 changes: 3 additions & 0 deletions pandas/core/_numba/kernels/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from pandas.core._numba.kernels.mean_ import sliding_mean

__all__ = ["sliding_mean"]
119 changes: 119 additions & 0 deletions pandas/core/_numba/kernels/mean_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""
Numba 1D aggregation kernels that can be shared by
* Dataframe / Series
* groupby
* rolling / expanding

Mirrors pandas/_libs/window/aggregation.pyx
"""
from __future__ import annotations

import numba
import numpy as np


@numba.jit(nopython=True, nogil=True, parallel=False)
def is_monotonic_increasing(bounds: np.ndarray) -> bool:
    """
    Check that no element of ``bounds`` is smaller than its predecessor.

    The check is non-strict (equal neighbours are accepted), and an array
    with fewer than two elements is reported as monotonic.
    """
    # The range is empty when len(bounds) < 2, so short inputs fall through.
    for idx in range(1, len(bounds)):
        if bounds[idx] < bounds[idx - 1]:
            return False
    return True


@numba.jit(nopython=True, nogil=True, parallel=False)
def add_mean(
    val: float, nobs: int, sum_x: float, neg_ct: int, compensation: float
) -> tuple[int, float, int, float]:
    """
    Fold ``val`` into the running state of a mean computation.

    NaN values leave the state untouched. Otherwise the observation count is
    incremented, ``val`` is added to ``sum_x`` using a compensated
    (Kahan-style) update, and ``neg_ct`` is incremented when ``val`` is
    negative.

    Returns
    -------
    tuple
        Updated ``(nobs, sum_x, neg_ct, compensation)``.
    """
    if np.isnan(val):
        return nobs, sum_x, neg_ct, compensation

    # Apply the error carried over from the previous addition, then record
    # the rounding error introduced by this one.
    corrected = val - compensation
    updated_sum = sum_x + corrected
    compensation = updated_sum - sum_x - corrected
    if val < 0:
        neg_ct += 1
    return nobs + 1, updated_sum, neg_ct, compensation


@numba.jit(nopython=True, nogil=True, parallel=False)
def remove_mean(
    val: float, nobs: int, sum_x: float, neg_ct: int, compensation: float
) -> tuple[int, float, int, float]:
    """
    Take ``val`` back out of the running state of a mean computation.

    NaN values leave the state untouched. Otherwise the observation count is
    decremented, ``val`` is subtracted from ``sum_x`` using a compensated
    (Kahan-style) update, and ``neg_ct`` is decremented when ``val`` is
    negative.

    Returns
    -------
    tuple
        Updated ``(nobs, sum_x, neg_ct, compensation)``.
    """
    if np.isnan(val):
        return nobs, sum_x, neg_ct, compensation

    # Subtraction is expressed as a compensated addition of -val.
    corrected = -val - compensation
    updated_sum = sum_x + corrected
    compensation = updated_sum - sum_x - corrected
    if val < 0:
        neg_ct -= 1
    return nobs - 1, updated_sum, neg_ct, compensation


@numba.jit(nopython=True, nogil=True, parallel=False)
def sliding_mean(
    values: np.ndarray,
    start: np.ndarray,
    end: np.ndarray,
    min_periods: int,
) -> np.ndarray:
    """
    Compute the mean of ``values`` over each window ``[start[i], end[i])``.

    Parameters
    ----------
    values : np.ndarray
        1D array of observations; NaN entries are skipped.
    start : np.ndarray
        Inclusive start position of each window.
    end : np.ndarray
        Exclusive end position of each window.
    min_periods : int
        Minimum number of non-NaN observations a window needs to produce a
        value; windows with fewer (or with none at all) yield NaN.

    Returns
    -------
    np.ndarray
        float64 array with one entry per window.

    Notes
    -----
    When both ``start`` and ``end`` are monotonically increasing, the running
    sum is updated incrementally by removing the values that left the window
    and adding the values that entered it. Otherwise every window is summed
    from scratch.
    """
    N = len(start)
    nobs = 0
    sum_x = 0.0
    neg_ct = 0
    compensation_add = 0.0
    compensation_remove = 0.0

    is_monotonic_increasing_bounds = is_monotonic_increasing(
        start
    ) and is_monotonic_increasing(end)

    output = np.empty(N, dtype=np.float64)

    for i in range(N):
        s = start[i]
        e = end[i]
        if i == 0 or not is_monotonic_increasing_bounds:
            # Fresh window: accumulate every value in [s, e).
            for j in range(s, e):
                val = values[j]
                nobs, sum_x, neg_ct, compensation_add = add_mean(
                    val, nobs, sum_x, neg_ct, compensation_add
                )
        else:
            # Incremental update: drop values that fell off the left edge...
            for j in range(start[i - 1], s):
                val = values[j]
                nobs, sum_x, neg_ct, compensation_remove = remove_mean(
                    val, nobs, sum_x, neg_ct, compensation_remove
                )

            # ...and take in values that entered at the right edge.
            for j in range(end[i - 1], e):
                val = values[j]
                nobs, sum_x, neg_ct, compensation_add = add_mean(
                    val, nobs, sum_x, neg_ct, compensation_add
                )

        if nobs >= min_periods and nobs > 0:
            result = sum_x / nobs
            # Clamp results whose sign contradicts the inputs: with no
            # negative inputs the mean cannot be negative, and with only
            # negative inputs it cannot be positive.
            if neg_ct == 0 and result < 0:
                result = 0.0
            elif neg_ct == nobs and result > 0:
                result = 0.0
        else:
            result = np.nan

        output[i] = result

        if not is_monotonic_increasing_bounds:
            # The next window is summed from scratch, so all running state
            # must be cleared -- including the addition compensation, which
            # would otherwise carry this window's rounding error into an
            # unrelated sum.
            nobs = 0
            sum_x = 0.0
            neg_ct = 0
            compensation_add = 0.0
            compensation_remove = 0.0

    return output
54 changes: 47 additions & 7 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
)
from pandas.core.dtypes.missing import notna

from pandas.core._numba import executor
from pandas.core.algorithms import factorize
from pandas.core.apply import ResamplerWindowApply
from pandas.core.arrays import ExtensionArray
Expand Down Expand Up @@ -576,6 +577,44 @@ def calc(x):
else:
return self._apply_tablewise(homogeneous_func, name)

def _numba_apply(
    self,
    func: Callable[..., Any],
    numba_cache_key_str: str,
    engine_kwargs: dict[str, bool] | None = None,
):
    """
    Apply a shared Numba kernel to the selected data and rebuild the result.

    Parameters
    ----------
    func : function
        1D kernel handed to ``executor.generate_shared_aggregator``.
    numba_cache_key_str : str
        Second element of the ``NUMBA_FUNC_CACHE`` key under which the
        compiled aggregator is stored.
    engine_kwargs : dict or None, default None
        Forwarded to ``executor.generate_shared_aggregator``.

    Returns
    -------
    Object built with ``obj._constructor``: returned directly for 1D input,
    passed through ``self._resolve_output`` otherwise.
    """
    indexer = self._get_window_indexer()
    min_periods = self.min_periods
    if min_periods is None:
        # Fall back to the window size when no minimum was requested.
        min_periods = indexer.window_size

    obj = self._create_data(self._selected_obj)
    along_columns = self.axis == 1
    if along_columns:
        obj = obj.T

    values = self._prep_values(obj.to_numpy())
    if values.ndim == 1:
        # The aggregator loops over columns, so 1D input becomes one column.
        values = values.reshape(-1, 1)

    start, end = indexer.get_window_bounds(
        num_values=len(values),
        min_periods=min_periods,
        center=self.center,
        closed=self.closed,
    )
    aggregator = executor.generate_shared_aggregator(
        func, engine_kwargs, numba_cache_key_str
    )
    result = aggregator(values, start, end, min_periods)
    # Stored only after a successful call.
    NUMBA_FUNC_CACHE[(func, numba_cache_key_str)] = aggregator

    if along_columns:
        # NOTE(review): result is transposed back here, but obj is still the
        # transposed object whose index/columns label it below -- confirm the
        # labels line up for non-square input.
        result = result.T

    if obj.ndim == 1:
        return obj._constructor(result.squeeze(), index=obj.index, name=obj.name)

    out = obj._constructor(result, index=obj.index, columns=obj.columns)
    return self._resolve_output(out, obj)
jreback marked this conversation as resolved.
Show resolved Hide resolved

def aggregate(self, func, *args, **kwargs):
result = ResamplerWindowApply(self, func, args=args, kwargs=kwargs).agg()
if result is None:
Expand Down Expand Up @@ -1331,15 +1370,16 @@ def mean(
if maybe_use_numba(engine):
if self.method == "table":
func = generate_manual_numpy_nan_agg_with_axis(np.nanmean)
return self.apply(
func,
raw=True,
engine=engine,
engine_kwargs=engine_kwargs,
)
else:
func = np.nanmean
from pandas.core._numba.kernels import sliding_mean

return self.apply(
func,
raw=True,
engine=engine,
engine_kwargs=engine_kwargs,
)
return self._numba_apply(sliding_mean, "rolling_mean", engine_kwargs)
window_func = window_aggregations.roll_mean
return self._apply(window_func, name="mean", **kwargs)

Expand Down
34 changes: 21 additions & 13 deletions pandas/tests/window/test_numba.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,44 +43,52 @@ def f(x, *args):
)
tm.assert_series_equal(result, expected)

@pytest.mark.parametrize(
"data", [DataFrame(np.eye(5)), Series(range(5), name="foo")]
)
def test_numba_vs_cython_rolling_methods(
self, nogil, parallel, nopython, arithmetic_numba_supported_operators
self, data, nogil, parallel, nopython, arithmetic_numba_supported_operators
):

method = arithmetic_numba_supported_operators

engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}

df = DataFrame(np.eye(5))
roll = df.rolling(2)
roll = data.rolling(2)
result = getattr(roll, method)(engine="numba", engine_kwargs=engine_kwargs)
expected = getattr(roll, method)(engine="cython")

# Check the cache
assert (getattr(np, f"nan{method}"), "Rolling_apply_single") in NUMBA_FUNC_CACHE
if method != "mean":
assert (
getattr(np, f"nan{method}"),
"Rolling_apply_single",
) in NUMBA_FUNC_CACHE

tm.assert_frame_equal(result, expected)
tm.assert_equal(result, expected)

@pytest.mark.parametrize("data", [DataFrame(np.eye(5)), Series(range(5))])
def test_numba_vs_cython_expanding_methods(
self, nogil, parallel, nopython, arithmetic_numba_supported_operators
self, data, nogil, parallel, nopython, arithmetic_numba_supported_operators
):

method = arithmetic_numba_supported_operators

engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}

df = DataFrame(np.eye(5))
expand = df.expanding()
data = DataFrame(np.eye(5))
expand = data.expanding()
result = getattr(expand, method)(engine="numba", engine_kwargs=engine_kwargs)
expected = getattr(expand, method)(engine="cython")

# Check the cache
assert (
getattr(np, f"nan{method}"),
"Expanding_apply_single",
) in NUMBA_FUNC_CACHE
if method != "mean":
assert (
getattr(np, f"nan{method}"),
"Expanding_apply_single",
) in NUMBA_FUNC_CACHE

tm.assert_frame_equal(result, expected)
tm.assert_equal(result, expected)

@pytest.mark.parametrize("jit", [True, False])
def test_cache_apply(self, jit, nogil, parallel, nopython):
Expand Down