PERF: Improve performance in rolling.mean(engine="numba") #43612

Merged
merged 28 commits into from
Sep 23, 2021
Changes from 20 commits
Commits
28 commits
5e0b2cc
Add mean kernel
mroeschke Sep 12, 2021
4f2d298
Add a shared executer function
mroeschke Sep 13, 2021
8ada0be
Add stub of a numba apply function
mroeschke Sep 13, 2021
f201dbd
Hook in numba apply to mean
mroeschke Sep 13, 2021
bf22d88
Merge remote-tracking branch 'upstream/master' into kernels/mean_kernel
mroeschke Sep 16, 2021
9ec1ef0
Fix caching tests, don't parallelize when ineffective
mroeschke Sep 16, 2021
8132622
Add whatsnew and fix caching
mroeschke Sep 16, 2021
d9b39bd
add PR number
mroeschke Sep 16, 2021
9ddf423
Make _numba private
mroeschke Sep 16, 2021
68524fd
Switch args
mroeschke Sep 16, 2021
cc786dd
Merge remote-tracking branch 'upstream/master' into kernels/mean_kernel
mroeschke Sep 17, 2021
2d19aa0
Fix typing
mroeschke Sep 17, 2021
705bb8c
Tighten docstring
mroeschke Sep 17, 2021
3622700
Keep kernels in their own directory
mroeschke Sep 17, 2021
0fa551a
Add some typing
mroeschke Sep 17, 2021
675b5a1
Add Series test cases
mroeschke Sep 18, 2021
a169423
Type column looper
mroeschke Sep 18, 2021
2842199
Add name
mroeschke Sep 18, 2021
46f1b6b
Add __all__
mroeschke Sep 19, 2021
05341e3
Merge remote-tracking branch 'upstream/master' into kernels/mean_kernel
mroeschke Sep 19, 2021
f4f59c8
Merge remote-tracking branch 'upstream/master' into kernels/mean_kernel
mroeschke Sep 20, 2021
c4bd78c
Add dtype to empty result
mroeschke Sep 20, 2021
8801cf9
Change nobs to int
mroeschke Sep 20, 2021
ce6aafb
Simplify monitonically increasing for bounds
mroeschke Sep 20, 2021
f0b38fd
Merge remote-tracking branch 'upstream/master' into kernels/mean_kernel
mroeschke Sep 22, 2021
ca0653b
Merge remote-tracking branch 'upstream/master' into kernels/mean_kernel
mroeschke Sep 22, 2021
09e1eb2
Merge remote-tracking branch 'upstream/master' into kernels/mean_kernel
mroeschke Sep 23, 2021
fd595c5
Remove unused kwargs and args
mroeschke Sep 23, 2021
2 changes: 1 addition & 1 deletion doc/source/whatsnew/v1.4.0.rst
@@ -354,7 +354,7 @@ Performance improvements
- Performance improvement in indexing with a :class:`MultiIndex` indexer on another :class:`MultiIndex` (:issue:`43370`)
- Performance improvement in :meth:`GroupBy.quantile` (:issue:`43469`)
- :meth:`SparseArray.min` and :meth:`SparseArray.max` no longer require converting to a dense array (:issue:`43526`)
-
- Performance improvement in :meth:`.Rolling.mean` and :meth:`.Expanding.mean` with ``engine="numba"`` (:issue:`43612`)

.. ---------------------------------------------------------------------------

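For context, a minimal usage sketch of the user-facing code path this PR speeds up (not part of the diff; it assumes pandas with this change plus numba installed, and the first numba call pays a one-time JIT compilation cost):

import numpy as np
import pandas as pd

# Any numeric Series/DataFrame works; this data is only illustrative.
s = pd.Series(np.random.default_rng(0).standard_normal(1_000_000))

# Default Cython engine vs. the numba engine exercised by this PR.
cython_result = s.rolling(100).mean(engine="cython")
numba_result = s.rolling(100).mean(engine="numba")

# Both engines should agree to floating-point tolerance.
pd.testing.assert_series_equal(cython_result, numba_result)

The same engine="numba" keyword applies to Expanding.mean, per the whatsnew entry above.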
Empty file added pandas/core/_numba/__init__.py
Empty file.
66 changes: 66 additions & 0 deletions pandas/core/_numba/executor.py
@@ -0,0 +1,66 @@
from __future__ import annotations

from typing import (
Any,
Callable,
)

import numpy as np

from pandas._typing import Scalar
from pandas.compat._optional import import_optional_dependency

from pandas.core.util.numba_ import (
NUMBA_FUNC_CACHE,
get_jit_arguments,
)


def generate_shared_aggregator(
func: Callable[..., Scalar],
kwargs: dict[str, Any],
engine_kwargs: dict[str, bool] | None,
cache_key_str: str,
):
"""
    Generate a Numba function that loops over the columns of a 2D object and applies
    a 1D numba kernel over each column.

Parameters
----------
func : function
aggregation function to be applied to each column
kwargs : dict
**kwargs to be passed into the function. Should be unused as not
supported by Numba
engine_kwargs : dict
dictionary of arguments to be passed into numba.jit
cache_key_str: str
string to access the compiled function of the form
<caller_type>_<aggregation_type> e.g. rolling_mean, groupby_mean

Returns
-------
Numba function
"""
nopython, nogil, parallel = get_jit_arguments(engine_kwargs, kwargs)

cache_key = (func, cache_key_str)
if cache_key in NUMBA_FUNC_CACHE:
return NUMBA_FUNC_CACHE[cache_key]

numba = import_optional_dependency("numba")

@numba.jit(nopython=nopython, nogil=nogil, parallel=parallel)
def column_looper(
values: np.ndarray,
start: np.ndarray,
end: np.ndarray,
min_periods: int,
):
result = np.empty((len(start), values.shape[1]))

Member: can dtype be specified?

Member Author: Good point. Added dtype.

for i in numba.prange(values.shape[1]):
result[:, i] = func(values[:, i], start, end, min_periods)
return result

return column_looper
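Roughly how this executor is wired up, based on the _numba_apply method added later in this diff (a simplified, hypothetical sketch: it pokes at private pandas modules and hard-codes fixed trailing-window bounds purely for illustration):

import numpy as np

from pandas.core._numba import executor
from pandas.core._numba.kernels import sliding_mean

values = np.arange(20, dtype=np.float64).reshape(-1, 2)  # 2D input, one kernel call per column
window = 3
# Trailing-window bounds of the kind a window indexer would produce (int64 arrays).
end = np.arange(1, len(values) + 1, dtype=np.int64)
start = np.maximum(end - window, 0)

# Compiles a column looper around the 1D kernel (or returns the cached one).
aggregator = executor.generate_shared_aggregator(sliding_mean, {}, None, "rolling_mean")
result = aggregator(values, start, end, window)  # shape: (len(start), values.shape[1])

Note that the caller, not generate_shared_aggregator, stores the compiled function in NUMBA_FUNC_CACHE; _numba_apply does that right after the first call.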
3 changes: 3 additions & 0 deletions pandas/core/_numba/kernels/__init__.py
@@ -0,0 +1,3 @@
from pandas.core._numba.kernels.mean_ import sliding_mean

__all__ = ["sliding_mean"]
119 changes: 119 additions & 0 deletions pandas/core/_numba/kernels/mean_.py
@@ -0,0 +1,119 @@
"""
Numba 1D aggregation kernels that can be shared by
* Dataframe / Series
* groupby
* rolling / expanding

Mirrors pandas/_libs/window/aggregation.pyx
"""
from __future__ import annotations

import numba
import numpy as np


@numba.jit(nopython=True, nogil=True, parallel=False)
def is_monotonic_increasing(bounds: np.ndarray) -> bool:
n = len(bounds)
if n == 1:

Contributor: I don't understand this block. n==1 and n < 2 -> n==0?

Member Author: Good point. I was able to simplify this block.

return bounds[0] == bounds[0]

Contributor: Is this to stop single element NaN sequences from being monotonic increasing?

Member Author: I believe so, yes. This snippet was taken from translating this function, but I was able to remove this condition since we know the inputs should be int64s with no NaNs.

elif n < 2:
return True
prev = bounds[0]
for i in range(1, n):
cur = bounds[i]
if cur < prev:
return False
return True


@numba.jit(nopython=True, nogil=True, parallel=False)
def add_mean(
val: float, nobs: float, sum_x: float, neg_ct: int, compensation: float
) -> tuple[float, float, int, float]:
if not np.isnan(val):
nobs += 1
y = val - compensation
t = sum_x + y
compensation = t - sum_x - y
sum_x = t
if val < 0:
neg_ct += 1
return nobs, sum_x, neg_ct, compensation


@numba.jit(nopython=True, nogil=True, parallel=False)
def remove_mean(
val: float, nobs: float, sum_x: float, neg_ct: int, compensation: float
) -> tuple[float, float, int, float]:
if not np.isnan(val):
nobs -= 1
y = -val - compensation
t = sum_x + y
compensation = t - sum_x - y
sum_x = t
if val < 0:
neg_ct -= 1
return nobs, sum_x, neg_ct, compensation
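add_mean and remove_mean maintain the running window sum with Kahan (compensated) summation: compensation carries the rounding error from the previous update so it can be folded back into the next one. A standalone, non-numba sketch of the same idea (the kahan_add helper is hypothetical, for illustration only):

import numpy as np

def kahan_add(total: float, compensation: float, val: float) -> tuple[float, float]:
    # Same update pattern as add_mean above.
    y = val - compensation
    t = total + y
    compensation = t - total - y
    total = t
    return total, compensation

values = np.full(1_000_000, 0.1)
naive = 0.0
total, comp = 0.0, 0.0
for v in values:
    naive += v
    total, comp = kahan_add(total, comp, v)

# The compensated sum stays much closer to the expected 100_000.0 than the naive sum.
print(abs(naive - 100_000.0), abs(total - 100_000.0))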


@numba.jit(nopython=True, nogil=True, parallel=False)
def sliding_mean(
values: np.ndarray,
start: np.ndarray,
end: np.ndarray,
min_periods: int,
) -> np.ndarray:
N = len(start)
nobs = 0.0

Contributor: Could nobs ever overflow int64 or uint64?

Member Author: I suppose with sufficient observations (nobs) this could overflow, but this value should be less than or equal to the window size, so the user would also have to provide a window size that overflows u/int64.

Contributor: There is an actual bound for the maximum size of a NumPy array, np.intp. This is int64 on Windows64 and Linux, and I suspect on OSX. It seems that nobs should be an integer, which should be slightly faster than a float.

Member Author: Agreed, changed nobs to an int.

sum_x = 0.0
neg_ct = 0
compensation_add = 0.0
compensation_remove = 0.0

is_monotonic_increasing_bounds = is_monotonic_increasing(
start
) and is_monotonic_increasing(end)

output = np.empty(N, dtype=np.float64)

for i in range(N):
s = start[i]
e = end[i]
if i == 0 or not is_monotonic_increasing_bounds:
for j in range(s, e):
val = values[j]
nobs, sum_x, neg_ct, compensation_add = add_mean(
val, nobs, sum_x, neg_ct, compensation_add
)
else:
for j in range(start[i - 1], s):
val = values[j]
nobs, sum_x, neg_ct, compensation_remove = remove_mean(
val, nobs, sum_x, neg_ct, compensation_remove
)

for j in range(end[i - 1], e):
val = values[j]
nobs, sum_x, neg_ct, compensation_add = add_mean(
val, nobs, sum_x, neg_ct, compensation_add
)

if nobs >= min_periods and nobs > 0:
result = sum_x / nobs
if neg_ct == 0 and result < 0:
result = 0
elif neg_ct == nobs and result > 0:
result = 0
else:
result = np.nan

output[i] = result

if not is_monotonic_increasing_bounds:
nobs = 0.0
sum_x = 0.0
neg_ct = 0
compensation_remove = 0.0

return output
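To make the add/remove bookkeeping concrete, here is a small hypothetical driver that feeds sliding_mean trailing-window bounds directly and checks it against recomputing each window from scratch (illustration only; in pandas this kernel is reached through Rolling.mean(engine="numba"), and the import is from a private module):

import numpy as np

from pandas.core._numba.kernels import sliding_mean

values = np.array([1.0, 2.0, np.nan, 4.0, 5.0, 6.0])
window, min_periods = 3, 1
end = np.arange(1, len(values) + 1, dtype=np.int64)
start = np.maximum(end - window, 0)

result = sliding_mean(values, start, end, min_periods)

# Reference: naively recompute each window with nanmean.
expected = np.array([np.nanmean(values[s:e]) for s, e in zip(start, end)])
np.testing.assert_allclose(result, expected)

Because start and end are monotonically increasing here, only the first iteration fills a whole window; subsequent iterations just remove values[start[i - 1]:start[i]] and add values[end[i - 1]:end[i]].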
56 changes: 49 additions & 7 deletions pandas/core/window/rolling.py
@@ -49,6 +49,7 @@
)
from pandas.core.dtypes.missing import notna

from pandas.core._numba import executor
from pandas.core.algorithms import factorize
from pandas.core.apply import ResamplerWindowApply
from pandas.core.arrays import ExtensionArray
@@ -568,6 +569,46 @@ def calc(x):
else:
return self._apply_tablewise(homogeneous_func, name)

def _numba_apply(
self,
func: Callable[..., Any],
numba_cache_key_str: str,
engine_kwargs: dict[str, bool] | None = None,
numba_args: tuple[Any, ...] = (),
**kwargs,
):
window_indexer = self._get_window_indexer()
min_periods = (
self.min_periods
if self.min_periods is not None
else window_indexer.window_size
)
obj = self._create_data(self._selected_obj)
if self.axis == 1:
obj = obj.T
values = self._prep_values(obj.to_numpy())
if values.ndim == 1:
values = values.reshape(-1, 1)
start, end = window_indexer.get_window_bounds(
num_values=len(values),
min_periods=min_periods,
center=self.center,
closed=self.closed,
)
aggregator = executor.generate_shared_aggregator(
func, kwargs, engine_kwargs, numba_cache_key_str
)
result = aggregator(values, start, end, min_periods, *numba_args)
NUMBA_FUNC_CACHE[(func, numba_cache_key_str)] = aggregator
result = result.T if self.axis == 1 else result
if obj.ndim == 1:
result = result.squeeze()
out = obj._constructor(result, index=obj.index, name=obj.name)
return out
else:
out = obj._constructor(result, index=obj.index, columns=obj.columns)
return self._resolve_output(out, obj)

def aggregate(self, func, *args, **kwargs):
result = ResamplerWindowApply(self, func, args=args, kwargs=kwargs).agg()
if result is None:
@@ -1323,15 +1364,16 @@ def mean(
if maybe_use_numba(engine):
if self.method == "table":
func = generate_manual_numpy_nan_agg_with_axis(np.nanmean)
+                return self.apply(
+                    func,
+                    raw=True,
+                    engine=engine,
+                    engine_kwargs=engine_kwargs,
+                )
            else:
-                func = np.nanmean
+                from pandas.core._numba.kernels import sliding_mean
+
-            return self.apply(
-                func,
-                raw=True,
-                engine=engine,
-                engine_kwargs=engine_kwargs,
-            )
+                return self._numba_apply(sliding_mean, "rolling_mean", engine_kwargs)
window_func = window_aggregations.roll_mean
return self._apply(window_func, name="mean", **kwargs)

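One practical consequence of the (func, "rolling_mean") cache entry that _numba_apply stores in NUMBA_FUNC_CACHE: the JIT compilation cost is paid only on the first numba call in a session. A rough, hypothetical timing sketch (absolute numbers are machine- and version-dependent):

import time

import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.default_rng(1).standard_normal((1_000_000, 4)))
roll = df.rolling(50)

t0 = time.perf_counter()
roll.mean(engine="numba")   # cold: compiles the sliding_mean kernel and the column looper
t1 = time.perf_counter()
roll.mean(engine="numba")   # warm: reuses the cached compiled aggregator
t2 = time.perf_counter()
roll.mean(engine="cython")  # baseline Cython window aggregation
t3 = time.perf_counter()

print(f"numba cold {t1 - t0:.3f}s, numba warm {t2 - t1:.3f}s, cython {t3 - t2:.3f}s")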
34 changes: 21 additions & 13 deletions pandas/tests/window/test_numba.py
@@ -43,44 +43,52 @@ def f(x, *args):
)
tm.assert_series_equal(result, expected)

+    @pytest.mark.parametrize(
+        "data", [DataFrame(np.eye(5)), Series(range(5), name="foo")]
+    )
    def test_numba_vs_cython_rolling_methods(
-        self, nogil, parallel, nopython, arithmetic_numba_supported_operators
+        self, data, nogil, parallel, nopython, arithmetic_numba_supported_operators
):

method = arithmetic_numba_supported_operators

engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}

-        df = DataFrame(np.eye(5))
-        roll = df.rolling(2)
+        roll = data.rolling(2)
result = getattr(roll, method)(engine="numba", engine_kwargs=engine_kwargs)
expected = getattr(roll, method)(engine="cython")

# Check the cache
-        assert (getattr(np, f"nan{method}"), "Rolling_apply_single") in NUMBA_FUNC_CACHE
+        if method != "mean":
+            assert (
+                getattr(np, f"nan{method}"),
+                "Rolling_apply_single",
+            ) in NUMBA_FUNC_CACHE

-        tm.assert_frame_equal(result, expected)
+        tm.assert_equal(result, expected)

@pytest.mark.parametrize("data", [DataFrame(np.eye(5)), Series(range(5))])
def test_numba_vs_cython_expanding_methods(
self, nogil, parallel, nopython, arithmetic_numba_supported_operators
self, data, nogil, parallel, nopython, arithmetic_numba_supported_operators
):

method = arithmetic_numba_supported_operators

engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}

-        df = DataFrame(np.eye(5))
-        expand = df.expanding()
+        data = DataFrame(np.eye(5))
+        expand = data.expanding()
result = getattr(expand, method)(engine="numba", engine_kwargs=engine_kwargs)
expected = getattr(expand, method)(engine="cython")

# Check the cache
-        assert (
-            getattr(np, f"nan{method}"),
-            "Expanding_apply_single",
-        ) in NUMBA_FUNC_CACHE
+        if method != "mean":
+            assert (
+                getattr(np, f"nan{method}"),
+                "Expanding_apply_single",
+            ) in NUMBA_FUNC_CACHE

-        tm.assert_frame_equal(result, expected)
+        tm.assert_equal(result, expected)

@pytest.mark.parametrize("jit", [True, False])
def test_cache_apply(self, jit, nogil, parallel, nopython):