Skip to content

Commit

Permalink
Add engine="numbagg" (#72)
Browse files Browse the repository at this point in the history
* Add engine="numbagg"

* Fix.

* fix CI

* Add nanlen

* fix env

* Add numbagg

* Bettter numbagg benchmarks?

* Update ci/environment.yml

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* cleanup

* Error on dtype specified

* Don't shadow sum, mean, sum_of_squares

* more skip

* Fix backup npg aggregations

* xfail nanmean bool

* ignore numbagg for mypy

* Add to upstream-dev CI

* Add to optional dependencies

* Fix bool reductions

* fix mypy ignore

* reintroduce engines

* Update docstring

* Update docs.

* Support more aggregations

* More aggregations

* back to nancount

* Add any, all

* promote in nanstd too

* Add ddof in anticipation of numbagg/numbagg#138

* Add more benchmarks

* reorder benchmark table

* Fix numba compilation setup?

* More benchmarks

* Rework benchmarks

* small docstring update

* ignore asv typing

* fix type ignoring

* Guard against numbagg failures

* Use released numbagg

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Maximilian Roos <[email protected]>
  • Loading branch information
3 people authored Oct 7, 2023
1 parent 68b122e commit 9f82e19
Show file tree
Hide file tree
Showing 14 changed files with 271 additions and 32 deletions.
1 change: 1 addition & 0 deletions asv_bench/asv.conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
// followed by the pip installed packages).
//
"matrix": {
"numbagg": [""],
"numpy_groupies": [""],
"numpy": [""],
"pandas": [""],
Expand Down
101 changes: 83 additions & 18 deletions asv_bench/benchmarks/reduce.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,86 @@
import numpy as np
import numpy_groupies as npg
import pandas as pd
from asv_runner.benchmarks.mark import parameterize, skip_for_params

import flox
import flox.aggregations

from . import parameterized
N = 3000
funcs = ["sum", "nansum", "mean", "nanmean", "max", "nanmax", "var", "count", "all"]
engines = ["flox", "numpy", "numbagg"]
expected_groups = {
"None": None,
"RangeIndex": pd.RangeIndex(5),
"bins": pd.IntervalIndex.from_breaks([1, 2, 4]),
}
expected_names = tuple(expected_groups)

N = 1000
funcs = ["sum", "nansum", "mean", "nanmean", "max", "var", "nanvar", "count"]
engines = ["flox", "numpy"]
expected_groups = [None, pd.IntervalIndex.from_breaks([1, 2, 4])]
NUMBAGG_FUNCS = ["nansum", "nanmean", "nanmax", "count", "all"]

numbagg_skip = [
(func, expected_names[0], "numbagg") for func in funcs if func not in NUMBAGG_FUNCS
] + [(func, expected_names[1], "numbagg") for func in funcs if func not in NUMBAGG_FUNCS]


def setup_jit():
# pre-compile jitted funcs
labels = np.ones((N), dtype=int)
array1 = np.ones((N), dtype=float)
array2 = np.ones((N, N), dtype=float)

if "numba" in engines:
for func in funcs:
method = getattr(flox.aggregate_npg, func)
method(labels, array1, engine="numba")
if "numbagg" in engines:
for func in set(NUMBAGG_FUNCS) & set(funcs):
flox.groupby_reduce(array1, labels, func=func, engine="numbagg")
flox.groupby_reduce(array2, labels, func=func, engine="numbagg")


class ChunkReduce:
"""Time the core reduction function."""

min_run_count = 5
warmup_time = 1

def setup(self, *args, **kwargs):
# pre-compile jitted funcs
if "numba" in engines:
for func in funcs:
npg.aggregate_numba.aggregate(
np.ones((100,), dtype=int), np.ones((100,), dtype=int), func=func
)
raise NotImplementedError

@parameterized("func, engine, expected_groups", [funcs, engines, expected_groups])
def time_reduce(self, func, engine, expected_groups):
@skip_for_params(numbagg_skip)
@parameterize({"func": funcs, "expected_name": expected_names, "engine": engines})
def time_reduce(self, func, expected_name, engine):
flox.groupby_reduce(
self.array,
self.labels,
func=func,
engine=engine,
axis=self.axis,
expected_groups=expected_groups,
expected_groups=expected_groups[expected_name],
)

@parameterized("func, engine, expected_groups", [funcs, engines, expected_groups])
def peakmem_reduce(self, func, engine, expected_groups):
@parameterize({"func": ["nansum", "nanmean", "nanmax", "count"], "engine": engines})
def time_reduce_bare(self, func, engine):
flox.aggregations.generic_aggregate(
self.labels,
self.array,
axis=-1,
size=5,
func=func,
engine=engine,
fill_value=0,
)

@skip_for_params(numbagg_skip)
@parameterize({"func": funcs, "expected_name": expected_names, "engine": engines})
def peakmem_reduce(self, func, expected_name, engine):
flox.groupby_reduce(
self.array,
self.labels,
func=func,
engine=engine,
axis=self.axis,
expected_groups=expected_groups,
expected_groups=expected_groups[expected_name],
)


Expand All @@ -52,17 +89,45 @@ def setup(self, *args, **kwargs):
self.array = np.ones((N,))
self.labels = np.repeat(np.arange(5), repeats=N // 5)
self.axis = -1
if "numbagg" in args:
setup_jit()


class ChunkReduce1DUnsorted(ChunkReduce):
def setup(self, *args, **kwargs):
self.array = np.ones((N,))
self.labels = np.random.permutation(np.repeat(np.arange(5), repeats=N // 5))
self.axis = -1
setup_jit()


class ChunkReduce2D(ChunkReduce):
def setup(self, *args, **kwargs):
self.array = np.ones((N, N))
self.labels = np.repeat(np.arange(N // 5), repeats=5)
self.axis = -1
setup_jit()


class ChunkReduce2DUnsorted(ChunkReduce):
def setup(self, *args, **kwargs):
self.array = np.ones((N, N))
self.labels = np.random.permutation(np.repeat(np.arange(N // 5), repeats=5))
self.axis = -1
setup_jit()


class ChunkReduce2DAllAxes(ChunkReduce):
def setup(self, *args, **kwargs):
self.array = np.ones((N, N))
self.labels = np.repeat(np.arange(N // 5), repeats=5)
self.axis = None
setup_jit()


class ChunkReduce2DAllAxesUnsorted(ChunkReduce):
def setup(self, *args, **kwargs):
self.array = np.ones((N, N))
self.labels = np.random.permutation(np.repeat(np.arange(N // 5), repeats=5))
self.axis = None
setup_jit()
2 changes: 2 additions & 0 deletions ci/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ dependencies:
- toolz
- numba
- scipy
- pip:
- numbagg>=0.3
1 change: 1 addition & 0 deletions ci/upstream-dev-env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ dependencies:
- git+https://github.com/pandas-dev/pandas
- git+https://github.com/dask/dask
- git+https://github.com/ml31415/numpy-groupies
- git+https://github.com/numbagg/numbagg
8 changes: 5 additions & 3 deletions docs/source/engines.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@
1. `engine="numba"` wraps `numpy_groupies.aggregate_numba`. This uses `numba` kernels for the core aggregation.
1. `engine="flox"` uses the `ufunc.reduceat` method after first argsorting the array so that all group members occur sequentially. This was copied from
a [gist by Stephan Hoyer](https://gist.github.com/shoyer/f538ac78ae904c936844)
1. `engine="numbagg"` uses the reductions available in [`numbagg.grouped`](https://github.com/numbagg/numbagg/blob/main/numbagg/grouped.py)
from the [numbagg](https://github.com/numbagg/numbagg) project.

See [](arrays) for more details.

## Tradeoffs

For the common case of reducing a nD array by a 1D array of group labels (e.g. `groupby("time.month")`), `engine="flox"` *can* be faster.
For the common case of reducing a nD array by a 1D array of group labels (e.g. `groupby("time.month")`), `engine="numbagg"` is almost always faster, and `engine="flox"` *can* be faster.

The reason is that `numpy_groupies` converts all groupby problems to a 1D problem, this can involve [some overhead](https://github.com/ml31415/numpy-groupies/pull/46).
It is possible to optimize this a bit in `flox` or `numpy_groupies`, but the work has not been done yet.
The advantage of `engine="numpy"` is that it tends to work for more array types, since it appears to be more common to implement `np.bincount`, and not `np.add.reduceat`.

```{tip}
Other potential engines we could add are [`numbagg`](https://github.com/numbagg/numbagg) ([stalled PR here](https://github.com/xarray-contrib/flox/pull/72)) and [`datashader`](https://github.com/xarray-contrib/flox/issues/142).
Both use numba for high-performance aggregations. Contributions or discussion is very welcome!
One other potential engine we could add is [`datashader`](https://github.com/xarray-contrib/flox/issues/142).
Contributions or discussion is very welcome!
```
100 changes: 100 additions & 0 deletions flox/aggregate_numbagg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from functools import partial

import numbagg
import numbagg.grouped
import numpy as np


def _numbagg_wrapper(
group_idx,
array,
*,
axis=-1,
func="sum",
size=None,
fill_value=None,
dtype=None,
numbagg_func=None,
):
return numbagg_func(
array,
group_idx,
axis=axis,
num_labels=size,
# The following are unsupported
# fill_value=fill_value,
# dtype=dtype,
)


def nansum(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None):
if np.issubdtype(array.dtype, np.bool_):
array = array.astype(np.in64)
return numbagg.grouped.group_nansum(
array,
group_idx,
axis=axis,
num_labels=size,
# fill_value=fill_value,
# dtype=dtype,
)


def nanmean(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None):
if np.issubdtype(array.dtype, np.int_):
array = array.astype(np.float64)
return numbagg.grouped.group_nanmean(
array,
group_idx,
axis=axis,
num_labels=size,
# fill_value=fill_value,
# dtype=dtype,
)


def nanvar(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None, ddof=0):
assert ddof != 0
if np.issubdtype(array.dtype, np.int_):
array = array.astype(np.float64)
return numbagg.grouped.group_nanvar(
array,
group_idx,
axis=axis,
num_labels=size,
# ddof=0,
# fill_value=fill_value,
# dtype=dtype,
)


def nanstd(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None, ddof=0):
assert ddof != 0
if np.issubdtype(array.dtype, np.int_):
array = array.astype(np.float64)
return numbagg.grouped.group_nanstd(
array,
group_idx,
axis=axis,
num_labels=size,
# ddof=0,
# fill_value=fill_value,
# dtype=dtype,
)


nansum_of_squares = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nansum_of_squares)
nanlen = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nancount)
nanprod = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanprod)
nanfirst = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanfirst)
nanlast = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanlast)
# nanargmax = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmax)
# nanargmin = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanargmin)
nanmax = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanmax)
nanmin = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanmin)
any = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanany)
all = partial(_numbagg_wrapper, numbagg_func=numbagg.grouped.group_nanall)

# sum = nansum
# mean = nanmean
# sum_of_squares = nansum_of_squares
18 changes: 17 additions & 1 deletion flox/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,28 @@ def generic_aggregate(
except AttributeError:
method = get_npg_aggregation(func, engine="numpy")

elif engine == "numbagg":
from . import aggregate_numbagg

try:
if (
# numabgg hardcodes ddof=1
("var" in func or "std" in func)
and kwargs.get("ddof", 0) == 0
):
method = get_npg_aggregation(func, engine="numpy")

else:
method = getattr(aggregate_numbagg, func)
except AttributeError:
method = get_npg_aggregation(func, engine="numpy")

elif engine in ["numpy", "numba"]:
method = get_npg_aggregation(func, engine=engine)

else:
raise ValueError(
f"Expected engine to be one of ['flox', 'numpy', 'numba']. Received {engine} instead."
f"Expected engine to be one of ['flox', 'numpy', 'numba', 'numbagg']. Received {engine} instead."
)

group_idx = np.asarray(group_idx, like=array)
Expand Down
Loading

0 comments on commit 9f82e19

Please sign in to comment.