diff --git a/.github/workflows/ci-additional.yaml b/.github/workflows/ci-additional.yaml index c1934de1..599312a4 100644 --- a/.github/workflows/ci-additional.yaml +++ b/.github/workflows/ci-additional.yaml @@ -77,7 +77,7 @@ jobs: --ignore flox/tests \ --cov=./ --cov-report=xml - name: Upload code coverage to Codecov - uses: codecov/codecov-action@v4.1.1 + uses: codecov/codecov-action@v4.4.1 with: file: ./coverage.xml flags: unittests @@ -131,7 +131,7 @@ jobs: python -m mypy --install-types --non-interactive --cobertura-xml-report mypy_report - name: Upload mypy coverage to Codecov - uses: codecov/codecov-action@v4.1.1 + uses: codecov/codecov-action@v4.4.1 with: file: mypy_report/cobertura.xml flags: mypy diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 00cbcd73..2f330846 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -64,7 +64,7 @@ jobs: run: | pytest -n auto --cov=./ --cov-report=xml - name: Upload code coverage to Codecov - uses: codecov/codecov-action@v4.1.1 + uses: codecov/codecov-action@v4.4.1 with: file: ./coverage.xml flags: unittests @@ -115,7 +115,7 @@ jobs: run: | python -m pytest -n auto --cov=./ --cov-report=xml - name: Upload code coverage to Codecov - uses: codecov/codecov-action@v4.1.1 + uses: codecov/codecov-action@v4.4.1 with: file: ./coverage.xml flags: unittests diff --git a/asv_bench/asv.conf.json b/asv_bench/asv.conf.json index 207572fc..9178209d 100644 --- a/asv_bench/asv.conf.json +++ b/asv_bench/asv.conf.json @@ -27,6 +27,11 @@ // "python setup.py build", // "PIP_NO_BUILD_ISOLATION=false python -mpip wheel --no-deps --no-index -w {build_cache_dir} {build_dir}" // ], + // + "build_command": [ + "python setup.py build", + "python -mpip wheel --no-deps --no-build-isolation --no-index -w {build_cache_dir} {build_dir}" + ], // List of branches to benchmark. If not provided, defaults to "master" // (for git) or "default" (for mercurial). 
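The cohorts benchmark refactor that follows memoizes the expensive `groupby_reduce` graph with `functools.cached_property`, so the three `track_*` metrics share one computation per benchmark instance. A minimal sketch of that pattern, with a hypothetical `Example` class that is not part of the patch:

```python
# Hypothetical example (names are illustrative): cache an expensive shared
# result once per instance and reuse it from several track_* metrics.
from functools import cached_property

import numpy as np


class Example:
    def setup(self, *args, **kwargs):
        self.data = np.arange(1_000_000)

    @cached_property
    def result(self):
        # evaluated on first access only; later accesses hit the instance cache
        return self.data.sum()

    def track_total(self):
        return self.result

    def track_mean(self):
        return self.result / self.data.size


bench = Example()
bench.setup()
assert bench.track_total() == 499_999_500_000
```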
diff --git a/asv_bench/benchmarks/cohorts.py b/asv_bench/benchmarks/cohorts.py index 7d74f884..f827f6e0 100644 --- a/asv_bench/benchmarks/cohorts.py +++ b/asv_bench/benchmarks/cohorts.py @@ -1,3 +1,5 @@ +from functools import cached_property + import dask import numpy as np import pandas as pd @@ -11,11 +13,14 @@ class Cohorts: def setup(self, *args, **kwargs): raise NotImplementedError + @cached_property + def result(self): + return flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis)[0] + def containment(self): asfloat = self.bitmask().astype(float) chunks_per_label = asfloat.sum(axis=0) containment = (asfloat.T @ asfloat) / chunks_per_label - print(containment.nnz / np.prod(containment.shape)) return containment.todense() def chunks_cohorts(self): @@ -43,26 +48,17 @@ def time_find_group_cohorts(self): pass def time_graph_construct(self): - flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis, method="cohorts") + flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis) def track_num_tasks(self): - result = flox.groupby_reduce( - self.array, self.by, func="sum", axis=self.axis, method="cohorts" - )[0] - return len(result.dask.to_dict()) + return len(self.result.dask.to_dict()) def track_num_tasks_optimized(self): - result = flox.groupby_reduce( - self.array, self.by, func="sum", axis=self.axis, method="cohorts" - )[0] - (opt,) = dask.optimize(result) + (opt,) = dask.optimize(self.result) return len(opt.dask.to_dict()) def track_num_layers(self): - result = flox.groupby_reduce( - self.array, self.by, func="sum", axis=self.axis, method="cohorts" - )[0] - return len(result.dask.layers) + return len(self.result.dask.layers) track_num_tasks.unit = "tasks" # type: ignore[attr-defined] # Lazy track_num_tasks_optimized.unit = "tasks" # type: ignore[attr-defined] # Lazy @@ -193,6 +189,19 @@ def setup(self, *args, **kwargs): self.expected = pd.RangeIndex(self.by.max() + 1) +class SingleChunk(Cohorts): + """Single chunk along reduction axis: always blockwise.""" + + def setup(self, *args, **kwargs): + index = pd.date_range("1959-01-01", freq="D", end="1962-12-31") + self.time = pd.Series(index) + TIME = len(self.time) + self.axis = (2,) + self.array = dask.array.ones((721, 1440, TIME), chunks=(-1, -1, -1)) + self.by = codes_for_resampling(index, freq="5D") + self.expected = pd.RangeIndex(self.by.max() + 1) + + class OISST(Cohorts): def setup(self, *args, **kwargs): self.array = dask.array.ones((1, 14532), chunks=(1, 10)) diff --git a/ci/docs.yml b/ci/docs.yml index 860ea27c..1ad62512 100644 --- a/ci/docs.yml +++ b/ci/docs.yml @@ -2,6 +2,8 @@ name: flox-doc channels: - conda-forge dependencies: + - cubed>=0.14.3 + - cubed-xarray - dask-core - pip - xarray diff --git a/ci/environment.yml b/ci/environment.yml index f9ca001c..5a6b09df 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -6,7 +6,7 @@ dependencies: - cachey - cftime - codecov - - cubed>=0.14.2 + - cubed>=0.14.3 - dask-core - pandas - numpy>=1.22 diff --git a/docs/source/user-stories.md b/docs/source/user-stories.md index 0241e01d..eb0cf333 100644 --- a/docs/source/user-stories.md +++ b/docs/source/user-stories.md @@ -7,6 +7,7 @@ user-stories/overlaps.md user-stories/climatology.ipynb user-stories/climatology-hourly.ipynb + user-stories/climatology-hourly-cubed.ipynb user-stories/custom-aggregations.ipynb user-stories/nD-bins.ipynb ``` diff --git a/docs/source/user-stories/climatology-hourly-cubed.ipynb b/docs/source/user-stories/climatology-hourly-cubed.ipynb new file mode 100644 index 
00000000..be80b4f9 --- /dev/null +++ b/docs/source/user-stories/climatology-hourly-cubed.ipynb @@ -0,0 +1,137 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0", + "metadata": {}, + "source": [ + "# More climatology reductions using Cubed\n", + "\n", + "This is the Cubed equivalent of [More climatology reductions](climatology-hourly.ipynb).\n", + "\n", + "The task is to compute an hourly climatology from an hourly dataset with 744 hours in each chunk, using the \"map-reduce\" strategy." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1", + "metadata": {}, + "outputs": [], + "source": [ + "import cubed\n", + "import cubed.array_api as xp\n", + "import numpy as np\n", + "import pandas as pd\n", + "import xarray as xr\n", + "\n", + "import flox.xarray" + ] + }, + { + "cell_type": "markdown", + "id": "2", + "metadata": {}, + "source": [ + "## Create data\n", + "\n", + "Note that we use fewer lat/long points so the computation can be run locally." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3", + "metadata": {}, + "outputs": [], + "source": [ + "spec = cubed.Spec(allowed_mem=\"2GB\")\n", + "ds = xr.Dataset(\n", + " {\n", + " \"tp\": (\n", + " (\"time\", \"latitude\", \"longitude\"),\n", + " xp.ones((8760, 72, 144), chunks=(744, 5, 144), dtype=np.float32, spec=spec),\n", + " )\n", + " },\n", + " coords={\"time\": pd.date_range(\"2021-01-01\", \"2021-12-31 23:59\", freq=\"h\")},\n", + ")\n", + "ds" + ] + }, + { + "cell_type": "markdown", + "id": "4", + "metadata": {}, + "source": [ + "## Computation" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5", + "metadata": {}, + "outputs": [], + "source": [ + "hourly = flox.xarray.xarray_reduce(ds.tp, ds.time.dt.hour, func=\"mean\", reindex=True)\n", + "hourly" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6", + "metadata": {}, + "outputs": [], + "source": [ + "hourly.compute()" + ] + }, + { + "cell_type": "markdown", + "id": "7", + "metadata": {}, + "source": [ + "## Other climatologies: resampling by month\n", + "\n", + "This uses the \"blockwise\" strategy." 
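Either reduction in this notebook can also be driven through the bare flox API rather than `flox.xarray`. A downsized sketch, assuming flox's cubed support and the `cubed>=0.14.3` pin from `ci/environment.yml`, with shapes shrunk so it runs on a laptop:

```python
# Group an hourly cubed array by hour-of-day with the "map-reduce" strategy.
import cubed
import cubed.array_api as xp
import numpy as np

import flox

spec = cubed.Spec(allowed_mem="2GB")
# time is the trailing axis; flox matches a 1D `by` against the last dimension
array = xp.ones((10, 8760), chunks=(10, 744), dtype=np.float32, spec=spec)
hour = np.arange(8760) % 24  # one integer label per time step

result, groups = flox.groupby_reduce(array, hour, func="mean")
print(groups)            # array([ 0,  1, ..., 23])
print(result.compute())  # all-ones input, so every mean is 1.0
```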
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8", + "metadata": {}, + "outputs": [], + "source": [ + "monthly = ds.tp.resample(time=\"ME\").sum(method=\"blockwise\")\n", + "monthly" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9", + "metadata": {}, + "outputs": [], + "source": [ + "monthly.compute()" + ] + } + ], + "metadata": { + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/docs/source/user-stories/climatology.ipynb b/docs/source/user-stories/climatology.ipynb index 30bf38b7..3cbf8b80 100644 --- a/docs/source/user-stories/climatology.ipynb +++ b/docs/source/user-stories/climatology.ipynb @@ -61,7 +61,9 @@ "source": [ "To account for Feb-29 being present in some years, we'll construct a time vector to group by as \"mmm-dd\" string.\n", "\n", - "For more options, see https://strftime.org/" + "```{seealso}\n", + "For more options, see [this great website](https://strftime.org/).\n", + "```" ] }, { @@ -80,7 +82,7 @@ "id": "6", "metadata": {}, "source": [ - "## map-reduce\n", + "## First, `method=\"map-reduce\"`\n", "\n", "The default\n", "[method=\"map-reduce\"](https://flox.readthedocs.io/en/latest/implementation.html#method-map-reduce)\n", @@ -110,7 +112,7 @@ "id": "8", "metadata": {}, "source": [ - "## Rechunking for map-reduce\n", + "### Rechunking for map-reduce\n", "\n", "We can split each chunk along the `lat`, `lon` dimensions to make sure the\n", "output chunk sizes are more reasonable\n" @@ -139,7 +141,7 @@ "But what if we didn't want to rechunk the dataset so drastically (note the 10x\n", "increase in tasks). For that let's try `method=\"cohorts\"`\n", "\n", - "## method=cohorts\n", + "## `method=\"cohorts\"`\n", "\n", "We can take advantage of patterns in the groups here \"day of year\".\n", "Specifically:\n", @@ -271,7 +273,7 @@ "id": "21", "metadata": {}, "source": [ - "And now our cohorts contain more than one group\n" + "And now our cohorts contain more than one group, *and* there is a substantial reduction in number of cohorts **162 -> 12**\n" ] }, { @@ -281,7 +283,7 @@ "metadata": {}, "outputs": [], "source": [ - "preferrd_method, new_cohorts = flox.core.find_group_cohorts(\n", + "preferred_method, new_cohorts = flox.core.find_group_cohorts(\n", " labels=codes,\n", " chunks=(rechunked.chunksizes[\"time\"],),\n", ")\n", @@ -295,13 +297,23 @@ "id": "23", "metadata": {}, "outputs": [], + "source": [ + "preferred_method" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "24", + "metadata": {}, + "outputs": [], "source": [ "new_cohorts.values()" ] }, { "cell_type": "markdown", - "id": "24", + "id": "25", "metadata": {}, "source": [ "Now the groupby reduction **looks OK** in terms of number of tasks but remember\n", @@ -311,7 +323,7 @@ { "cell_type": "code", "execution_count": null, - "id": "25", + "id": "26", "metadata": {}, "outputs": [], "source": [ @@ -320,7 +332,25 @@ }, { "cell_type": "markdown", - "id": "26", + "id": "27", + "metadata": {}, + "source": [ + "flox's heuristics will choose `\"cohorts\"` automatically!" 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "28", + "metadata": {}, + "outputs": [], + "source": [ + "flox.xarray.xarray_reduce(rechunked, day, func=\"mean\")" + ] + }, + { + "cell_type": "markdown", + "id": "29", "metadata": {}, "source": [ "## How about other climatologies?\n", @@ -331,7 +361,7 @@ { "cell_type": "code", "execution_count": null, - "id": "27", + "id": "30", "metadata": {}, "outputs": [], "source": [ @@ -340,7 +370,7 @@ }, { "cell_type": "markdown", - "id": "28", + "id": "31", "metadata": {}, "source": [ "This looks great. Why?\n", diff --git a/flox/core.py b/flox/core.py index 7946d1e7..091b39e1 100644 --- a/flox/core.py +++ b/flox/core.py @@ -8,6 +8,7 @@ import warnings from collections import namedtuple from collections.abc import Sequence +from concurrent.futures import ThreadPoolExecutor from functools import partial, reduce from itertools import product from numbers import Integral @@ -17,6 +18,7 @@ Callable, Literal, TypedDict, + TypeVar, Union, overload, ) @@ -96,6 +98,7 @@ T_MethodOpt = None | Literal["map-reduce", "blockwise", "cohorts"] T_IsBins = Union[bool | Sequence[bool]] +T = TypeVar("T") IntermediateDict = dict[Union[str, Callable], Any] FinalResultsDict = dict[str, Union["DaskArray", "CubedArray", np.ndarray]] @@ -140,6 +143,10 @@ def _postprocess_numbagg(result, *, func, fill_value, size, seen_groups): return result +def identity(x: T) -> T: + return x + + def _issorted(arr: np.ndarray) -> bool: return bool((arr[:-1] <= arr[1:]).all()) @@ -248,34 +255,77 @@ def slices_from_chunks(chunks): def _compute_label_chunk_bitmask(labels, chunks, nlabels): + def make_bitmask(rows, cols): + data = np.broadcast_to(np.array(1, dtype=np.uint8), rows.shape) + return csc_array((data, (rows, cols)), dtype=bool, shape=(nchunks, nlabels)) + assert isinstance(labels, np.ndarray) shape = tuple(sum(c) for c in chunks) nchunks = math.prod(len(c) for c in chunks) + approx_chunk_size = math.prod(c[0] for c in chunks) - labels = np.broadcast_to(labels, shape[-labels.ndim :]) + # Shortcut for 1D with size-1 chunks + if shape == (nchunks,): + rows_array = np.arange(nchunks) + cols_array = labels + mask = labels >= 0 + return make_bitmask(rows_array[mask], cols_array[mask]) + labels = np.broadcast_to(labels, shape[-labels.ndim :]) cols = [] - # Add one to handle the -1 sentinel value - label_is_present = np.zeros((nlabels + 1,), dtype=bool) ilabels = np.arange(nlabels) - for region in slices_from_chunks(chunks): + + def chunk_unique(labels, slicer, nlabels, label_is_present=None): + if label_is_present is None: + label_is_present = np.empty((nlabels + 1,), dtype=bool) + label_is_present[:] = False + subset = labels[slicer] # This is a quite fast way to find unique integers, when we know how many there are # inspired by a similar idea in numpy_groupies for first, last # instead of explicitly finding uniques, repeatedly write True to the same location - subset = labels[region] - # The reshape is not strictly necessary but is about 100ms faster on a test problem. label_is_present[subset.reshape(-1)] = True # skip the -1 sentinel by slicing # Faster than np.argwhere by a lot uniques = ilabels[label_is_present[:-1]] - cols.append(uniques) - label_is_present[:] = False + return uniques + + # TODO: refine this heuristic. + # The general idea is that with the threadpool, we repeatedly allocate memory + # for `label_is_present`. We trade that off against the parallelism across number of chunks. 
+ # For large enough number of chunks (relative to number of labels), it makes sense to + # suffer the extra allocation in exchange for parallelism. + THRESHOLD = 2 + if nlabels < THRESHOLD * approx_chunk_size: + logger.debug( + "Using threadpool since num_labels %s < %d * chunksize %s", + nlabels, + THRESHOLD, + approx_chunk_size, + ) + with ThreadPoolExecutor() as executor: + futures = [ + executor.submit(chunk_unique, labels, slicer, nlabels) + for slicer in slices_from_chunks(chunks) + ] + cols = tuple(f.result() for f in futures) + + else: + logger.debug( + "Using serial loop since num_labels %s > %d * chunksize %s", + nlabels, + THRESHOLD, + approx_chunk_size, + ) + cols = [] + # Add one to handle the -1 sentinel value + label_is_present = np.empty((nlabels + 1,), dtype=bool) + for region in slices_from_chunks(chunks): + uniques = chunk_unique(labels, region, nlabels, label_is_present) + cols.append(uniques) rows_array = np.repeat(np.arange(nchunks), tuple(len(col) for col in cols)) cols_array = np.concatenate(cols) - data = np.broadcast_to(np.array(1, dtype=np.uint8), rows_array.shape) - bitmask = csc_array((data, (rows_array, cols_array)), dtype=bool, shape=(nchunks, nlabels)) - return bitmask + return make_bitmask(rows_array, cols_array) # @memoize @@ -312,6 +362,7 @@ def find_group_cohorts( labels = np.asarray(labels) shape = tuple(sum(c) for c in chunks) + nchunks = math.prod(len(c) for c in chunks) # assumes that `labels` are factorized if expected_groups is None: @@ -319,6 +370,10 @@ def find_group_cohorts( else: nlabels = expected_groups[-1] + 1 + # 1. Single chunk, blockwise always + if nchunks == 1: + return "blockwise", {(0,): list(range(nlabels))} + labels = np.broadcast_to(labels, shape[-labels.ndim :]) bitmask = _compute_label_chunk_bitmask(labels, chunks, nlabels) @@ -346,21 +401,21 @@ def invert(x) -> tuple[np.ndarray, ...]: chunks_cohorts = tlz.groupby(invert, label_chunks.keys()) - # 1. Every group is contained to one block, use blockwise here. + # 2. Every group is contained to one block, use blockwise here. if bitmask.shape[CHUNK_AXIS] == 1 or (chunks_per_label == 1).all(): logger.debug("find_group_cohorts: blockwise is preferred.") return "blockwise", chunks_cohorts - # 2. Perfectly chunked so there is only a single cohort + # 3. Perfectly chunked so there is only a single cohort if len(chunks_cohorts) == 1: logger.debug("Only found a single cohort. 'map-reduce' is preferred.") return "map-reduce", chunks_cohorts if merge else {} - # 3. Our dataset has chunksize one along the axis, + # 4. Our dataset has chunksize one along the axis, single_chunks = all(all(a == 1 for a in ac) for ac in chunks) - # 4. Every chunk only has a single group, but that group might extend across multiple chunks + # 5. Every chunk only has a single group, but that group might extend across multiple chunks one_group_per_chunk = (bitmask.sum(axis=LABEL_AXIS) == 1).all() - # 5. Existing cohorts don't overlap, great for time grouping with perfect chunking + # 6. Existing cohorts don't overlap, great for time grouping with perfect chunking no_overlapping_cohorts = (np.bincount(np.concatenate(tuple(chunks_cohorts.keys()))) == 1).all() if one_group_per_chunk or single_chunks or no_overlapping_cohorts: logger.debug("find_group_cohorts: cohorts is preferred, chunking is perfect.") @@ -393,6 +448,7 @@ def invert(x) -> tuple[np.ndarray, ...]: sparsity, MAX_SPARSITY_FOR_COHORTS ) ) + # 7. Groups seem fairly randomly distributed, use "map-reduce". 
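To make the bitmask construction and the new threadpool path concrete, here is a self-contained sketch of the idea, using `np.unique` in place of the faster write-`True`-repeatedly trick and plain slices in place of `slices_from_chunks`:

```python
# Per-chunk unique labels found on a threadpool, then assembled into a sparse
# chunk-by-label boolean matrix, as in _compute_label_chunk_bitmask.
from concurrent.futures import ThreadPoolExecutor

import numpy as np
from scipy.sparse import csc_array

labels = np.repeat(np.arange(10), 10)                   # label i fills chunk i
slices = [slice(i, i + 10) for i in range(0, 100, 10)]  # ten "chunks" of 10
nchunks, nlabels = len(slices), 10


def chunk_unique(slicer):
    return np.unique(labels[slicer])


with ThreadPoolExecutor() as executor:
    cols = list(executor.map(chunk_unique, slices))

rows_array = np.repeat(np.arange(nchunks), [len(c) for c in cols])
cols_array = np.concatenate(cols)
data = np.broadcast_to(np.array(1, dtype=np.uint8), rows_array.shape)
bitmask = csc_array((data, (rows_array, cols_array)), dtype=bool, shape=(nchunks, nlabels))
# every label lives in exactly one chunk, so heuristic 2 would pick "blockwise"
print(bitmask.sum(axis=0))  # [1 1 1 1 1 1 1 1 1 1]
```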
if sparsity > MAX_SPARSITY_FOR_COHORTS: if not merge: logger.debug( @@ -1424,7 +1480,10 @@ def _normalize_indexes(array: DaskArray, flatblocks, blkshape) -> tuple: def subset_to_blocks( - array: DaskArray, flatblocks: Sequence[int], blkshape: tuple[int] | None = None + array: DaskArray, + flatblocks: Sequence[int], + blkshape: tuple[int] | None = None, + reindexer=identity, ) -> DaskArray: """ Advanced indexing of .blocks such that we always get a regular array back. @@ -1450,20 +1509,21 @@ def subset_to_blocks( index = _normalize_indexes(array, flatblocks, blkshape) if all(not isinstance(i, np.ndarray) and i == slice(None) for i in index): - return array + return dask.array.map_blocks(reindexer, array, meta=array._meta) # These rest is copied from dask.array.core.py with slight modifications index = normalize_index(index, array.numblocks) index = tuple(slice(k, k + 1) if isinstance(k, Integral) else k for k in index) - name = "blocks-" + tokenize(array, index) + name = "groupby-cohort-" + tokenize(array, index) new_keys = array._key_array[index] squeezed = tuple(np.squeeze(i) if isinstance(i, np.ndarray) else i for i in index) chunks = tuple(tuple(np.array(c)[i].tolist()) for c, i in zip(array.chunks, squeezed)) keys = itertools.product(*(range(len(c)) for c in chunks)) - layer: Graph = {(name,) + key: tuple(new_keys[key].tolist()) for key in keys} + layer: Graph = {(name,) + key: (reindexer, tuple(new_keys[key].tolist())) for key in keys} + graph = HighLevelGraph.from_collections(name, layer, dependencies=[array]) return dask.array.Array(graph, name, chunks, meta=array) @@ -1637,26 +1697,26 @@ def dask_groupby_agg( elif method == "cohorts": assert chunks_cohorts + block_shape = array.blocks.shape[-len(axis) :] + reduced_ = [] groups_ = [] for blks, cohort in chunks_cohorts.items(): - index = pd.Index(cohort) - subset = subset_to_blocks(intermediate, blks, array.blocks.shape[-len(axis) :]) - reindexed = dask.array.map_blocks( - reindex_intermediates, subset, agg, index, meta=subset._meta - ) + cohort_index = pd.Index(cohort) + reindexer = partial(reindex_intermediates, agg=agg, unique_groups=cohort_index) + reindexed = subset_to_blocks(intermediate, blks, block_shape, reindexer) # now that we have reindexed, we can set reindex=True explicitlly reduced_.append( tree_reduce( reindexed, combine=partial(combine, agg=agg, reindex=True), - aggregate=partial(aggregate, expected_groups=index, reindex=True), + aggregate=partial(aggregate, expected_groups=cohort_index, reindex=True), ) ) # This is done because pandas promotes to 64-bit types when an Index is created # So we use the index to generate the return value for consistency with "map-reduce" # This is important on windows - groups_.append(index.values) + groups_.append(cohort_index.values) reduced = dask.array.concatenate(reduced_, axis=-1) groups = (np.concatenate(groups_),) @@ -1738,87 +1798,123 @@ def cubed_groupby_agg( assert isinstance(axis, Sequence) assert all(ax >= 0 for ax in axis) - inds = tuple(range(array.ndim)) + if method == "blockwise": + assert by.ndim == 1 + assert expected_groups is not None - by_input = by + def _reduction_func(a, by, axis, start_group, num_groups): + # adjust group labels to start from 0 for each chunk + by_for_chunk = by - start_group + expected_groups_for_chunk = pd.RangeIndex(num_groups) - # Unifying chunks is necessary for argreductions. 
- # We need to rechunk before zipping up with the index - # let's always do it anyway - if not is_chunked_array(by): - # chunk numpy arrays like the input array - chunks = tuple(array.chunks[ax] if by.shape[ax] != 1 else (1,) for ax in range(-by.ndim, 0)) + axis = (axis,) # convert integral axis to tuple - by = cubed.from_array(by, chunks=chunks, spec=array.spec) - _, (array, by) = cubed.core.unify_chunks(array, inds, by, inds[-by.ndim :]) + blockwise_method = partial( + _reduce_blockwise, + agg=agg, + axis=axis, + expected_groups=expected_groups_for_chunk, + fill_value=fill_value, + engine=engine, + sort=sort, + reindex=reindex, + ) + out = blockwise_method(a, by_for_chunk) + return out[agg.name] - # Cubed's groupby_reduction handles the generation of "intermediates", and the - # "map-reduce" combination step, so we don't have to do that here. - # Only the equivalent of "_simple_combine" is supported, there is no - # support for "_grouped_combine". - labels_are_unknown = is_chunked_array(by_input) and expected_groups is None - do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown + num_groups = len(expected_groups) + result = cubed.core.groupby.groupby_blockwise( + array, by, axis=axis, func=_reduction_func, num_groups=num_groups + ) + groups = (expected_groups.to_numpy(),) + return (result, groups) - assert do_simple_combine - assert method == "map-reduce" - assert expected_groups is not None - assert reindex is True - assert len(axis) == 1 # one axis/grouping + else: + inds = tuple(range(array.ndim)) - def _groupby_func(a, by, axis, intermediate_dtype, num_groups): - blockwise_method = partial( - _get_chunk_reduction(agg.reduction_type), - func=agg.chunk, - fill_value=agg.fill_value["intermediate"], - dtype=agg.dtype["intermediate"], - reindex=reindex, - user_dtype=agg.dtype["user"], + by_input = by + + # Unifying chunks is necessary for argreductions. + # We need to rechunk before zipping up with the index + # let's always do it anyway + if not is_chunked_array(by): + # chunk numpy arrays like the input array + chunks = tuple( + array.chunks[ax] if by.shape[ax] != 1 else (1,) for ax in range(-by.ndim, 0) + ) + + by = cubed.from_array(by, chunks=chunks, spec=array.spec) + _, (array, by) = cubed.core.unify_chunks(array, inds, by, inds[-by.ndim :]) + + # Cubed's groupby_reduction handles the generation of "intermediates", and the + # "map-reduce" combination step, so we don't have to do that here. + # Only the equivalent of "_simple_combine" is supported, there is no + # support for "_grouped_combine". + labels_are_unknown = is_chunked_array(by_input) and expected_groups is None + do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown + + assert do_simple_combine + assert method == "map-reduce" + assert expected_groups is not None + assert reindex is True + assert len(axis) == 1 # one axis/grouping + + def _groupby_func(a, by, axis, intermediate_dtype, num_groups): + blockwise_method = partial( + _get_chunk_reduction(agg.reduction_type), + func=agg.chunk, + fill_value=agg.fill_value["intermediate"], + dtype=agg.dtype["intermediate"], + reindex=reindex, + user_dtype=agg.dtype["user"], + axis=axis, + expected_groups=expected_groups, + engine=engine, + sort=sort, + ) + out = blockwise_method(a, by) + # Convert dict to one that cubed understands, dropping groups since they are + # known, and the same for every block. 
+ return { + f"f{idx}": intermediate for idx, intermediate in enumerate(out["intermediates"]) + } + + def _groupby_combine(a, axis, dummy_axis, dtype, keepdims): + # this is similar to _simple_combine, except the dummy axis and concatenation is handled by cubed + # only combine over the dummy axis, to preserve grouping along 'axis' + dtype = dict(dtype) + out = {} + for idx, combine in enumerate(agg.simple_combine): + field = f"f{idx}" + out[field] = combine(a[field], axis=dummy_axis, keepdims=keepdims) + return out + + def _groupby_aggregate(a): + # Convert cubed dict to one that _finalize_results works with + results = {"groups": expected_groups, "intermediates": a.values()} + out = _finalize_results(results, agg, axis, expected_groups, fill_value, reindex) + return out[agg.name] + + # convert list of dtypes to a structured dtype for cubed + intermediate_dtype = [(f"f{i}", dtype) for i, dtype in enumerate(agg.dtype["intermediate"])] + dtype = agg.dtype["final"] + num_groups = len(expected_groups) + + result = cubed.core.groupby.groupby_reduction( + array, + by, + func=_groupby_func, + combine_func=_groupby_combine, + aggregate_func=_groupby_aggregate, axis=axis, - expected_groups=expected_groups, - engine=engine, - sort=sort, + intermediate_dtype=intermediate_dtype, + dtype=dtype, + num_groups=num_groups, ) - out = blockwise_method(a, by) - # Convert dict to one that cubed understands, dropping groups since they are - # known, and the same for every block. - return {f"f{idx}": intermediate for idx, intermediate in enumerate(out["intermediates"])} - - def _groupby_combine(a, axis, dummy_axis, dtype, keepdims): - # this is similar to _simple_combine, except the dummy axis and concatenation is handled by cubed - # only combine over the dummy axis, to preserve grouping along 'axis' - dtype = dict(dtype) - out = {} - for idx, combine in enumerate(agg.simple_combine): - field = f"f{idx}" - out[field] = combine(a[field], axis=dummy_axis, keepdims=keepdims) - return out - - def _groupby_aggregate(a): - # Convert cubed dict to one that _finalize_results works with - results = {"groups": expected_groups, "intermediates": a.values()} - out = _finalize_results(results, agg, axis, expected_groups, fill_value, reindex) - return out[agg.name] - - # convert list of dtypes to a structured dtype for cubed - intermediate_dtype = [(f"f{i}", dtype) for i, dtype in enumerate(agg.dtype["intermediate"])] - dtype = agg.dtype["final"] - num_groups = len(expected_groups) - - result = cubed.core.groupby.groupby_reduction( - array, - by, - func=_groupby_func, - combine_func=_groupby_combine, - aggregate_func=_groupby_aggregate, - axis=axis, - intermediate_dtype=intermediate_dtype, - dtype=dtype, - num_groups=num_groups, - ) - groups = (expected_groups.to_numpy(),) + groups = (expected_groups.to_numpy(),) - return (result, groups) + return (result, groups) def _collapse_blocks_along_axes(reduced: DaskArray, axis: T_Axes, group_chunks) -> DaskArray: @@ -2264,6 +2360,7 @@ def groupby_reduce( nby = len(bys) by_is_dask = tuple(is_duck_dask_array(b) for b in bys) any_by_dask = any(by_is_dask) + provided_expected = expected_groups is not None if ( engine == "numbagg" @@ -2380,7 +2477,7 @@ def groupby_reduce( # The only way to do this consistently is mask out using min_count # Consider np.sum([np.nan]) = np.nan, np.nansum([np.nan]) = 0 if min_count is None: - if nax < by_.ndim or fill_value is not None: + if nax < by_.ndim or (fill_value is not None and provided_expected): min_count_: int = 1 else: min_count_ = 0 @@ -2406,9 
+2503,9 @@ def groupby_reduce( if method is None: method = "map-reduce" - if method != "map-reduce": + if method not in ("map-reduce", "blockwise"): raise NotImplementedError( - "Reduction for Cubed arrays is only implemented for method 'map-reduce'." + "Reduction for Cubed arrays is only implemented for methods 'map-reduce' and 'blockwise'." ) partial_agg = partial(cubed_groupby_agg, **kwargs) diff --git a/flox/xrutils.py b/flox/xrutils.py index c6925798..915b6ee0 100644 --- a/flox/xrutils.py +++ b/flox/xrutils.py @@ -56,6 +56,37 @@ def module_available(module: str, minversion: Optional[str] = None) -> bool: dask_array_type = () # type: ignore[assignment, misc] +def module_available(module: str, minversion: Optional[str] = None) -> bool: + """Checks whether a module is installed without importing it. + + Use this for a lightweight check and lazy imports. + + Parameters + ---------- + module : str + Name of the module. + + Returns + ------- + available : bool + Whether the module is installed. + """ + has = importlib.util.find_spec(module) is not None + if has: + mod = importlib.import_module(module) + return Version(mod.__version__) >= Version(minversion) if minversion is not None else True + else: + return False + + +if module_available("numpy", minversion="2.0.0"): + from numpy.lib.array_utils import ( # type: ignore[import-not-found] + normalize_axis_index, + ) +else: + from numpy.core.numeric import normalize_axis_index # type: ignore[attr-defined] + + def asarray(data, xp=np): return data if is_duck_array(data) else xp.asarray(data) diff --git a/tests/test_core.py b/tests/test_core.py index 243aa9a6..6872a9ec 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -947,12 +947,12 @@ def test_verify_complex_cohorts(chunksize: int) -> None: @pytest.mark.parametrize("chunksize", (12,) + tuple(range(1, 13)) + (-1,)) def test_method_guessing(chunksize): # just a regression test - labels = np.tile(np.arange(1, 13), 30) + labels = np.tile(np.arange(0, 12), 30) by = dask.array.from_array(labels, chunks=chunksize) - 1 preferred_method, chunks_cohorts = find_group_cohorts(labels, by.chunks[slice(-1, None)]) if chunksize == -1: assert preferred_method == "blockwise" - assert chunks_cohorts == {(0,): list(range(1, 13))} + assert chunks_cohorts == {(0,): list(range(12))} elif chunksize in (1, 2, 3, 4, 6): assert preferred_method == "cohorts" assert len(chunks_cohorts) == 12 // chunksize @@ -961,6 +961,21 @@ def test_method_guessing(chunksize): assert chunks_cohorts == {} +@requires_dask +@pytest.mark.parametrize("ndim", [1, 2, 3]) +def test_single_chunk_method_is_blockwise(ndim): + for by_ndim in range(1, ndim + 1): + chunks = (5,) * (ndim - by_ndim) + (-1,) * by_ndim + assert len(chunks) == ndim + array = dask.array.ones(shape=(10,) * ndim, chunks=chunks) + by = np.zeros(shape=(10,) * by_ndim, dtype=int) + method, chunks_cohorts = find_group_cohorts( + by, chunks=[array.chunks[ax] for ax in range(-by.ndim, 0)] + ) + assert method == "blockwise" + assert chunks_cohorts == {(0,): [0]} + + @requires_dask @pytest.mark.parametrize( "chunk_at,expected", @@ -1195,6 +1210,40 @@ def test_group_by_datetime(engine, method): assert_equal(expected, actual) +@requires_cubed +@pytest.mark.parametrize("method", ["blockwise", "map-reduce"]) +def test_group_by_datetime_cubed(engine, method): + kwargs = dict( + func="mean", + method=method, + engine=engine, + ) + t = pd.date_range("2000-01-01", "2000-12-31", freq="D").to_series() + data = t.dt.dayofyear + cubedarray = cubed.from_array(data.values, chunks=30) + 
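The `provided_expected` guard above narrows when a user-supplied `fill_value` forces `min_count=1`: groups absent from `by` are filled only when `expected_groups` was actually provided. A small numpy illustration of that case; the outputs in the comments are my reading of flox's fill semantics:

```python
import numpy as np

import flox

array = np.array([1.0, 2.0, 3.0])
by = np.array([0, 0, 1])

result, groups = flox.groupby_reduce(
    array, by, func="sum", expected_groups=np.array([0, 1, 2]), fill_value=np.nan
)
print(groups)  # [0 1 2]
print(result)  # [3. 3. nan] -- group 2 never occurs, so it gets fill_value
```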
+ actual, _ = groupby_reduce(cubedarray, t, **kwargs) + expected = data.to_numpy().astype(float) + assert_equal(expected, actual) + + edges = pd.date_range("1999-12-31", "2000-12-31", freq="ME").to_series().to_numpy() + actual, _ = groupby_reduce( + cubedarray, t.to_numpy(), isbin=True, expected_groups=edges, **kwargs + ) + expected = data.resample("ME").mean().to_numpy() + assert_equal(expected, actual) + + actual, _ = groupby_reduce( + cubed.array_api.broadcast_to(cubedarray, (2, 3, cubedarray.shape[-1])), + t.to_numpy(), + isbin=True, + expected_groups=edges, + **kwargs, + ) + expected = np.broadcast_to(expected, (2, 3, expected.shape[-1])) + assert_equal(expected, actual) + + def test_factorize_values_outside_bins(): # pd.factorize returns intp vals = factorize_( @@ -1455,14 +1504,18 @@ def test_normalize_block_indexing_2d(flatblocks, expected): @requires_dask def test_subset_block_passthrough(): + from flox.core import identity + # full slice pass through array = dask.array.ones((5,), chunks=(1,)) + expected = dask.array.map_blocks(identity, array) subset = subset_to_blocks(array, np.arange(5)) - assert subset.name == array.name + assert subset.name == expected.name array = dask.array.ones((5, 5), chunks=1) + expected = dask.array.map_blocks(identity, array) subset = subset_to_blocks(array, np.arange(25)) - assert subset.name == array.name + assert subset.name == expected.name @requires_dask
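The passthrough test changes because `subset_to_blocks` now always routes through the `reindexer`, so even a full slice returns a `map_blocks(identity, ...)` result with a fresh (but deterministic) name. A sketch of the dask naming behavior the new assertions rely on:

```python
import dask.array


def identity(x):
    return x


array = dask.array.ones((5,), chunks=(1,))
a = dask.array.map_blocks(identity, array)
b = dask.array.map_blocks(identity, array)

assert a.name != array.name  # map_blocks always adds a new layer...
assert a.name == b.name      # ...whose name is deterministic via tokenization
```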