From 497e7bc107f7bf12ff5d28a449ded927ebaabf16 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Thu, 25 Apr 2024 09:52:17 -0600 Subject: [PATCH 01/12] Fix benchmarks (#358) https://github.com/airspeed-velocity/asv/issues/1389#issuecomment-2076131185 --- asv_bench/asv.conf.json | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/asv_bench/asv.conf.json b/asv_bench/asv.conf.json index 207572fce..9178209dc 100644 --- a/asv_bench/asv.conf.json +++ b/asv_bench/asv.conf.json @@ -27,6 +27,11 @@ // "python setup.py build", // "PIP_NO_BUILD_ISOLATION=false python -mpip wheel --no-deps --no-index -w {build_cache_dir} {build_dir}" // ], + // + "build_command": [ + "python setup.py build", + "python -mpip wheel --no-deps --no-build-isolation --no-index -w {build_cache_dir} {build_dir}" + ], // List of branches to benchmark. If not provided, defaults to "master" // (for git) or "default" (for mercurial). From 13cb2297a619d867feb32b24f5b13a37fe35ff50 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Fri, 26 Apr 2024 16:47:57 -0600 Subject: [PATCH 02/12] Edits to climatology doc (#361) --- docs/source/user-stories/climatology.ipynb | 52 +++++++++++++++++----- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/docs/source/user-stories/climatology.ipynb b/docs/source/user-stories/climatology.ipynb index 30bf38b75..3cbf8b805 100644 --- a/docs/source/user-stories/climatology.ipynb +++ b/docs/source/user-stories/climatology.ipynb @@ -61,7 +61,9 @@ "source": [ "To account for Feb-29 being present in some years, we'll construct a time vector to group by as \"mmm-dd\" string.\n", "\n", - "For more options, see https://strftime.org/" + "```{seealso}\n", + "For more options, see [this great website](https://strftime.org/).\n", + "```" ] }, { @@ -80,7 +82,7 @@ "id": "6", "metadata": {}, "source": [ - "## map-reduce\n", + "## First, `method=\"map-reduce\"`\n", "\n", "The default\n", "[method=\"map-reduce\"](https://flox.readthedocs.io/en/latest/implementation.html#method-map-reduce)\n", @@ -110,7 +112,7 @@ "id": "8", "metadata": {}, "source": [ - "## Rechunking for map-reduce\n", + "### Rechunking for map-reduce\n", "\n", "We can split each chunk along the `lat`, `lon` dimensions to make sure the\n", "output chunk sizes are more reasonable\n" @@ -139,7 +141,7 @@ "But what if we didn't want to rechunk the dataset so drastically (note the 10x\n", "increase in tasks). For that let's try `method=\"cohorts\"`\n", "\n", - "## method=cohorts\n", + "## `method=\"cohorts\"`\n", "\n", "We can take advantage of patterns in the groups here \"day of year\".\n", "Specifically:\n", @@ -271,7 +273,7 @@ "id": "21", "metadata": {}, "source": [ - "And now our cohorts contain more than one group\n" + "And now our cohorts contain more than one group, *and* there is a substantial reduction in number of cohorts **162 -> 12**\n" ] }, { @@ -281,7 +283,7 @@ "metadata": {}, "outputs": [], "source": [ - "preferrd_method, new_cohorts = flox.core.find_group_cohorts(\n", + "preferred_method, new_cohorts = flox.core.find_group_cohorts(\n", " labels=codes,\n", " chunks=(rechunked.chunksizes[\"time\"],),\n", ")\n", @@ -295,13 +297,23 @@ "id": "23", "metadata": {}, "outputs": [], + "source": [ + "preferred_method" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "24", + "metadata": {}, + "outputs": [], "source": [ "new_cohorts.values()" ] }, { "cell_type": "markdown", - "id": "24", + "id": "25", "metadata": {}, "source": [ "Now the groupby reduction **looks OK** in terms of number of tasks but remember\n", @@ -311,7 +323,7 @@ { "cell_type": "code", "execution_count": null, - "id": "25", + "id": "26", "metadata": {}, "outputs": [], "source": [ @@ -320,7 +332,25 @@ }, { "cell_type": "markdown", - "id": "26", + "id": "27", + "metadata": {}, + "source": [ + "flox's heuristics will choose `\"cohorts\"` automatically!" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "28", + "metadata": {}, + "outputs": [], + "source": [ + "flox.xarray.xarray_reduce(rechunked, day, func=\"mean\")" + ] + }, + { + "cell_type": "markdown", + "id": "29", "metadata": {}, "source": [ "## How about other climatologies?\n", @@ -331,7 +361,7 @@ { "cell_type": "code", "execution_count": null, - "id": "27", + "id": "30", "metadata": {}, "outputs": [], "source": [ @@ -340,7 +370,7 @@ }, { "cell_type": "markdown", - "id": "28", + "id": "31", "metadata": {}, "source": [ "This looks great. Why?\n", From 627bf2b6818d8dd04051b13093a0d9c4f4a1df70 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Fri, 26 Apr 2024 22:48:32 -0600 Subject: [PATCH 03/12] Optimize bitmask finding for chunk size 1 and single chunk cases (#360) * Optimize bitmask finding for chunk size 1. * Fix benchmark. * bugfix * Add single chunk benchmark * Optimize single chunk case. * Add test --- asv_bench/benchmarks/cohorts.py | 38 +++++++++++++++++++++------------ flox/core.py | 32 +++++++++++++++++++-------- tests/test_core.py | 19 +++++++++++++++-- 3 files changed, 64 insertions(+), 25 deletions(-) diff --git a/asv_bench/benchmarks/cohorts.py b/asv_bench/benchmarks/cohorts.py index 7d74f8849..8cb80119e 100644 --- a/asv_bench/benchmarks/cohorts.py +++ b/asv_bench/benchmarks/cohorts.py @@ -1,3 +1,5 @@ +from functools import cached_property + import dask import numpy as np import pandas as pd @@ -11,6 +13,10 @@ class Cohorts: def setup(self, *args, **kwargs): raise NotImplementedError + @cached_property + def dask(self): + return flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis)[0].dask + def containment(self): asfloat = self.bitmask().astype(float) chunks_per_label = asfloat.sum(axis=0) @@ -43,26 +49,17 @@ def time_find_group_cohorts(self): pass def time_graph_construct(self): - flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis, method="cohorts") + flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis) def track_num_tasks(self): - result = flox.groupby_reduce( - self.array, self.by, func="sum", axis=self.axis, method="cohorts" - )[0] - return len(result.dask.to_dict()) + return len(self.dask.to_dict()) def track_num_tasks_optimized(self): - result = flox.groupby_reduce( - self.array, self.by, func="sum", axis=self.axis, method="cohorts" - )[0] - (opt,) = dask.optimize(result) - return len(opt.dask.to_dict()) + (opt,) = dask.optimize(self.dask) + return len(opt.to_dict()) def track_num_layers(self): - result = flox.groupby_reduce( - self.array, self.by, func="sum", axis=self.axis, method="cohorts" - )[0] - return len(result.dask.layers) + return len(self.dask.layers) track_num_tasks.unit = "tasks" # type: ignore[attr-defined] # Lazy track_num_tasks_optimized.unit = "tasks" # type: ignore[attr-defined] # Lazy @@ -193,6 +190,19 @@ def setup(self, *args, **kwargs): self.expected = pd.RangeIndex(self.by.max() + 1) +class SingleChunk(Cohorts): + """Single chunk along reduction axis: always blockwise.""" + + def setup(self, *args, **kwargs): + index = pd.date_range("1959-01-01", freq="D", end="1962-12-31") + self.time = pd.Series(index) + TIME = len(self.time) + self.axis = (2,) + self.array = dask.array.ones((721, 1440, TIME), chunks=(-1, -1, -1)) + self.by = codes_for_resampling(index, freq="5D") + self.expected = pd.RangeIndex(self.by.max() + 1) + + class OISST(Cohorts): def setup(self, *args, **kwargs): self.array = dask.array.ones((1, 14532), chunks=(1, 10)) diff --git a/flox/core.py b/flox/core.py index 7d82b6f44..25a3495f5 100644 --- a/flox/core.py +++ b/flox/core.py @@ -248,12 +248,22 @@ def slices_from_chunks(chunks): def _compute_label_chunk_bitmask(labels, chunks, nlabels): + def make_bitmask(rows, cols): + data = np.broadcast_to(np.array(1, dtype=np.uint8), rows.shape) + return csc_array((data, (rows, cols)), dtype=bool, shape=(nchunks, nlabels)) + assert isinstance(labels, np.ndarray) shape = tuple(sum(c) for c in chunks) nchunks = math.prod(len(c) for c in chunks) - labels = np.broadcast_to(labels, shape[-labels.ndim :]) + # Shortcut for 1D with size-1 chunks + if shape == (nchunks,): + rows_array = np.arange(nchunks) + cols_array = labels + mask = labels >= 0 + return make_bitmask(rows_array[mask], cols_array[mask]) + labels = np.broadcast_to(labels, shape[-labels.ndim :]) cols = [] # Add one to handle the -1 sentinel value label_is_present = np.zeros((nlabels + 1,), dtype=bool) @@ -272,10 +282,8 @@ def _compute_label_chunk_bitmask(labels, chunks, nlabels): label_is_present[:] = False rows_array = np.repeat(np.arange(nchunks), tuple(len(col) for col in cols)) cols_array = np.concatenate(cols) - data = np.broadcast_to(np.array(1, dtype=np.uint8), rows_array.shape) - bitmask = csc_array((data, (rows_array, cols_array)), dtype=bool, shape=(nchunks, nlabels)) - return bitmask + return make_bitmask(rows_array, cols_array) # @memoize @@ -312,6 +320,7 @@ def find_group_cohorts( labels = np.asarray(labels) shape = tuple(sum(c) for c in chunks) + nchunks = math.prod(len(c) for c in chunks) # assumes that `labels` are factorized if expected_groups is None: @@ -319,6 +328,10 @@ def find_group_cohorts( else: nlabels = expected_groups[-1] + 1 + # 1. Single chunk, blockwise always + if nchunks == 1: + return "blockwise", {(0,): list(range(nlabels))} + labels = np.broadcast_to(labels, shape[-labels.ndim :]) bitmask = _compute_label_chunk_bitmask(labels, chunks, nlabels) @@ -346,21 +359,21 @@ def invert(x) -> tuple[np.ndarray, ...]: chunks_cohorts = tlz.groupby(invert, label_chunks.keys()) - # 1. Every group is contained to one block, use blockwise here. + # 2. Every group is contained to one block, use blockwise here. if bitmask.shape[CHUNK_AXIS] == 1 or (chunks_per_label == 1).all(): logger.info("find_group_cohorts: blockwise is preferred.") return "blockwise", chunks_cohorts - # 2. Perfectly chunked so there is only a single cohort + # 3. Perfectly chunked so there is only a single cohort if len(chunks_cohorts) == 1: logger.info("Only found a single cohort. 'map-reduce' is preferred.") return "map-reduce", chunks_cohorts if merge else {} - # 3. Our dataset has chunksize one along the axis, + # 4. Our dataset has chunksize one along the axis, single_chunks = all(all(a == 1 for a in ac) for ac in chunks) - # 4. Every chunk only has a single group, but that group might extend across multiple chunks + # 5. Every chunk only has a single group, but that group might extend across multiple chunks one_group_per_chunk = (bitmask.sum(axis=LABEL_AXIS) == 1).all() - # 5. Existing cohorts don't overlap, great for time grouping with perfect chunking + # 6. Existing cohorts don't overlap, great for time grouping with perfect chunking no_overlapping_cohorts = (np.bincount(np.concatenate(tuple(chunks_cohorts.keys()))) == 1).all() if one_group_per_chunk or single_chunks or no_overlapping_cohorts: logger.info("find_group_cohorts: cohorts is preferred, chunking is perfect.") @@ -393,6 +406,7 @@ def invert(x) -> tuple[np.ndarray, ...]: sparsity, MAX_SPARSITY_FOR_COHORTS ) ) + # 7. Groups seem fairly randomly distributed, use "map-reduce". if sparsity > MAX_SPARSITY_FOR_COHORTS: if not merge: logger.info( diff --git a/tests/test_core.py b/tests/test_core.py index 890f86566..1ec2db78c 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -946,12 +946,12 @@ def test_verify_complex_cohorts(chunksize: int) -> None: @pytest.mark.parametrize("chunksize", (12,) + tuple(range(1, 13)) + (-1,)) def test_method_guessing(chunksize): # just a regression test - labels = np.tile(np.arange(1, 13), 30) + labels = np.tile(np.arange(0, 12), 30) by = dask.array.from_array(labels, chunks=chunksize) - 1 preferred_method, chunks_cohorts = find_group_cohorts(labels, by.chunks[slice(-1, None)]) if chunksize == -1: assert preferred_method == "blockwise" - assert chunks_cohorts == {(0,): list(range(1, 13))} + assert chunks_cohorts == {(0,): list(range(12))} elif chunksize in (1, 2, 3, 4, 6): assert preferred_method == "cohorts" assert len(chunks_cohorts) == 12 // chunksize @@ -960,6 +960,21 @@ def test_method_guessing(chunksize): assert chunks_cohorts == {} +@requires_dask +@pytest.mark.parametrize("ndim", [1, 2, 3]) +def test_single_chunk_method_is_blockwise(ndim): + for by_ndim in range(1, ndim + 1): + chunks = (5,) * (ndim - by_ndim) + (-1,) * by_ndim + assert len(chunks) == ndim + array = dask.array.ones(shape=(10,) * ndim, chunks=chunks) + by = np.zeros(shape=(10,) * by_ndim, dtype=int) + method, chunks_cohorts = find_group_cohorts( + by, chunks=[array.chunks[ax] for ax in range(-by.ndim, 0)] + ) + assert method == "blockwise" + assert chunks_cohorts == {(0,): [0]} + + @requires_dask @pytest.mark.parametrize( "chunk_at,expected", From efd88e1b117db5f18ee1354d2edadafc9005b1b3 Mon Sep 17 00:00:00 2001 From: Tom White Date: Mon, 29 Apr 2024 18:08:23 +0100 Subject: [PATCH 04/12] Add cubed notebook for hourly climatology example using "map-reduce" method (#356) * Add cubed notebook for hourly climatology example using "map-reduce" method * Add cubed dependencies to docs build * Use Cubed version with HTML repr fix --- ci/docs.yml | 2 + docs/source/user-stories.md | 1 + .../climatology-hourly-cubed.ipynb | 106 ++++++++++++++++++ 3 files changed, 109 insertions(+) create mode 100644 docs/source/user-stories/climatology-hourly-cubed.ipynb diff --git a/ci/docs.yml b/ci/docs.yml index 860ea27c4..1ad62512e 100644 --- a/ci/docs.yml +++ b/ci/docs.yml @@ -2,6 +2,8 @@ name: flox-doc channels: - conda-forge dependencies: + - cubed>=0.14.3 + - cubed-xarray - dask-core - pip - xarray diff --git a/docs/source/user-stories.md b/docs/source/user-stories.md index 0241e01dc..eb0cf3334 100644 --- a/docs/source/user-stories.md +++ b/docs/source/user-stories.md @@ -7,6 +7,7 @@ user-stories/overlaps.md user-stories/climatology.ipynb user-stories/climatology-hourly.ipynb + user-stories/climatology-hourly-cubed.ipynb user-stories/custom-aggregations.ipynb user-stories/nD-bins.ipynb ``` diff --git a/docs/source/user-stories/climatology-hourly-cubed.ipynb b/docs/source/user-stories/climatology-hourly-cubed.ipynb new file mode 100644 index 000000000..2277ed7f2 --- /dev/null +++ b/docs/source/user-stories/climatology-hourly-cubed.ipynb @@ -0,0 +1,106 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0", + "metadata": {}, + "source": [ + "# More climatology reductions using Cubed\n", + "\n", + "This is the Cubed equivalent of [More climatology reductions](climatology-hourly.ipynb).\n", + "\n", + "The task is to compute an hourly climatology from an hourly dataset with 744 hours in each chunk, using the \"map-reduce\" strategy." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1", + "metadata": {}, + "outputs": [], + "source": [ + "import cubed\n", + "import cubed.array_api as xp\n", + "import numpy as np\n", + "import pandas as pd\n", + "import xarray as xr\n", + "\n", + "import flox.xarray" + ] + }, + { + "cell_type": "markdown", + "id": "2", + "metadata": {}, + "source": [ + "## Create data\n", + "\n", + "Note that we use fewer lat/long points so the computation can be run locally." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3", + "metadata": {}, + "outputs": [], + "source": [ + "spec = cubed.Spec(allowed_mem=\"2GB\")\n", + "ds = xr.Dataset(\n", + " {\n", + " \"tp\": (\n", + " (\"time\", \"latitude\", \"longitude\"),\n", + " xp.ones((8760, 72, 144), chunks=(744, 5, 144), dtype=np.float32, spec=spec),\n", + " )\n", + " },\n", + " coords={\"time\": pd.date_range(\"2021-01-01\", \"2021-12-31 23:59\", freq=\"h\")},\n", + ")\n", + "ds" + ] + }, + { + "cell_type": "markdown", + "id": "4", + "metadata": {}, + "source": [ + "## Computation" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5", + "metadata": {}, + "outputs": [], + "source": [ + "hourly = flox.xarray.xarray_reduce(ds.tp, ds.time.dt.hour, func=\"mean\", reindex=True)\n", + "hourly" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6", + "metadata": {}, + "outputs": [], + "source": [ + "hourly.compute()" + ] + } + ], + "metadata": { + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} From 2439c5cb5d38906f77234ae26f207e7f21e650a3 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 1 May 2024 06:03:51 -0600 Subject: [PATCH 05/12] Bump codecov/codecov-action from 4.1.1 to 4.3.1 (#362) Bumps [codecov/codecov-action](https://github.com/codecov/codecov-action) from 4.1.1 to 4.3.1. - [Release notes](https://github.com/codecov/codecov-action/releases) - [Changelog](https://github.com/codecov/codecov-action/blob/main/CHANGELOG.md) - [Commits](https://github.com/codecov/codecov-action/compare/v4.1.1...v4.3.1) --- updated-dependencies: - dependency-name: codecov/codecov-action dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/ci-additional.yaml | 4 ++-- .github/workflows/ci.yaml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci-additional.yaml b/.github/workflows/ci-additional.yaml index c1934de16..2a7b07d4d 100644 --- a/.github/workflows/ci-additional.yaml +++ b/.github/workflows/ci-additional.yaml @@ -77,7 +77,7 @@ jobs: --ignore flox/tests \ --cov=./ --cov-report=xml - name: Upload code coverage to Codecov - uses: codecov/codecov-action@v4.1.1 + uses: codecov/codecov-action@v4.3.1 with: file: ./coverage.xml flags: unittests @@ -131,7 +131,7 @@ jobs: python -m mypy --install-types --non-interactive --cobertura-xml-report mypy_report - name: Upload mypy coverage to Codecov - uses: codecov/codecov-action@v4.1.1 + uses: codecov/codecov-action@v4.3.1 with: file: mypy_report/cobertura.xml flags: mypy diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index d02170823..12b5e9cc7 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -52,7 +52,7 @@ jobs: run: | pytest -n auto --cov=./ --cov-report=xml - name: Upload code coverage to Codecov - uses: codecov/codecov-action@v4.1.1 + uses: codecov/codecov-action@v4.3.1 with: file: ./coverage.xml flags: unittests @@ -94,7 +94,7 @@ jobs: run: | python -m pytest -n auto --cov=./ --cov-report=xml - name: Upload code coverage to Codecov - uses: codecov/codecov-action@v4.1.1 + uses: codecov/codecov-action@v4.3.1 with: file: ./coverage.xml flags: unittests From eb3c0ef7b4d89028b7e2955941b7a3970164df5f Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 1 May 2024 23:46:15 -0600 Subject: [PATCH 06/12] Manually fuse reindexing intermediates with blockwise reduction for cohorts. (#300) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Manually fuse reindexing intermediates with blockwise reduction for cohorts. ``` | Change | Before [627bf2b6]
| After [9d710529] | Ratio | Benchmark (Parameter) | |----------|----------------------------|---------------------------------------------|---------|-------------------------------------------------| | - | 3.39±0.02ms | 2.98±0.01ms | 0.88 | cohorts.PerfectMonthly.time_graph_construct | | - | 20 | 17 | 0.85 | cohorts.PerfectMonthly.track_num_layers | | - | 23.0±0.07ms | 19.0±0.1ms | 0.83 | cohorts.ERA5Google.time_graph_construct | | - | 4878 | 3978 | 0.82 | cohorts.ERA5Google.track_num_tasks | | - | 179±0.8ms | 147±0.5ms | 0.82 | cohorts.OISST.time_graph_construct | | - | 159 | 128 | 0.81 | cohorts.ERA5Google.track_num_layers | | - | 936 | 762 | 0.81 | cohorts.PerfectMonthly.track_num_tasks | | - | 1221 | 978 | 0.8 | cohorts.OISST.track_num_layers | | - | 4929 | 3834 | 0.78 | cohorts.ERA5DayOfYear.track_num_tasks | | - | 351 | 274 | 0.78 | cohorts.NWMMidwest.track_num_layers | | - | 4562 | 3468 | 0.76 | cohorts.ERA5DayOfYear.track_num_tasks_optimized | | - | 164±1ms | 118±0.4ms | 0.72 | cohorts.ERA5DayOfYear.time_graph_construct | | - | 1100 | 735 | 0.67 | cohorts.ERA5DayOfYear.track_num_layers | | - | 3930 | 2605 | 0.66 | cohorts.NWMMidwest.track_num_tasks | | - | 3715 | 2409 | 0.65 | cohorts.NWMMidwest.track_num_tasks_optimized | | - | 28952 | 18798 | 0.65 | cohorts.OISST.track_num_tasks | | - | 27010 | 16858 | 0.62 | cohorts.OISST.track_num_tasks_optimized | ``` * fix typing --- asv_bench/benchmarks/cohorts.py | 12 ++++++------ flox/core.py | 32 +++++++++++++++++++++----------- tests/test_core.py | 8 ++++++-- 3 files changed, 33 insertions(+), 19 deletions(-) diff --git a/asv_bench/benchmarks/cohorts.py b/asv_bench/benchmarks/cohorts.py index 8cb80119e..da12dfdf0 100644 --- a/asv_bench/benchmarks/cohorts.py +++ b/asv_bench/benchmarks/cohorts.py @@ -14,8 +14,8 @@ def setup(self, *args, **kwargs): raise NotImplementedError @cached_property - def dask(self): - return flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis)[0].dask + def result(self): + return flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis)[0] def containment(self): asfloat = self.bitmask().astype(float) @@ -52,14 +52,14 @@ def time_graph_construct(self): flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis) def track_num_tasks(self): - return len(self.dask.to_dict()) + return len(self.result.dask.to_dict()) def track_num_tasks_optimized(self): - (opt,) = dask.optimize(self.dask) - return len(opt.to_dict()) + (opt,) = dask.optimize(self.result) + return len(opt.dask.to_dict()) def track_num_layers(self): - return len(self.dask.layers) + return len(self.result.dask.layers) track_num_tasks.unit = "tasks" # type: ignore[attr-defined] # Lazy track_num_tasks_optimized.unit = "tasks" # type: ignore[attr-defined] # Lazy diff --git a/flox/core.py b/flox/core.py index 25a3495f5..308e05d95 100644 --- a/flox/core.py +++ b/flox/core.py @@ -17,6 +17,7 @@ Callable, Literal, TypedDict, + TypeVar, Union, overload, ) @@ -96,6 +97,7 @@ T_MethodOpt = None | Literal["map-reduce", "blockwise", "cohorts"] T_IsBins = Union[bool | Sequence[bool]] +T = TypeVar("T") IntermediateDict = dict[Union[str, Callable], Any] FinalResultsDict = dict[str, Union["DaskArray", "CubedArray", np.ndarray]] @@ -140,6 +142,10 @@ def _postprocess_numbagg(result, *, func, fill_value, size, seen_groups): return result +def identity(x: T) -> T: + return x + + def _issorted(arr: np.ndarray) -> bool: return bool((arr[:-1] <= arr[1:]).all()) @@ -1438,7 +1444,10 @@ def _normalize_indexes(array: DaskArray, flatblocks, blkshape) -> tuple: def subset_to_blocks( - array: DaskArray, flatblocks: Sequence[int], blkshape: tuple[int] | None = None + array: DaskArray, + flatblocks: Sequence[int], + blkshape: tuple[int] | None = None, + reindexer=identity, ) -> DaskArray: """ Advanced indexing of .blocks such that we always get a regular array back. @@ -1464,20 +1473,21 @@ def subset_to_blocks( index = _normalize_indexes(array, flatblocks, blkshape) if all(not isinstance(i, np.ndarray) and i == slice(None) for i in index): - return array + return dask.array.map_blocks(reindexer, array, meta=array._meta) # These rest is copied from dask.array.core.py with slight modifications index = normalize_index(index, array.numblocks) index = tuple(slice(k, k + 1) if isinstance(k, Integral) else k for k in index) - name = "blocks-" + tokenize(array, index) + name = "groupby-cohort-" + tokenize(array, index) new_keys = array._key_array[index] squeezed = tuple(np.squeeze(i) if isinstance(i, np.ndarray) else i for i in index) chunks = tuple(tuple(np.array(c)[i].tolist()) for c, i in zip(array.chunks, squeezed)) keys = itertools.product(*(range(len(c)) for c in chunks)) - layer: Graph = {(name,) + key: tuple(new_keys[key].tolist()) for key in keys} + layer: Graph = {(name,) + key: (reindexer, tuple(new_keys[key].tolist())) for key in keys} + graph = HighLevelGraph.from_collections(name, layer, dependencies=[array]) return dask.array.Array(graph, name, chunks, meta=array) @@ -1651,26 +1661,26 @@ def dask_groupby_agg( elif method == "cohorts": assert chunks_cohorts + block_shape = array.blocks.shape[-len(axis) :] + reduced_ = [] groups_ = [] for blks, cohort in chunks_cohorts.items(): - index = pd.Index(cohort) - subset = subset_to_blocks(intermediate, blks, array.blocks.shape[-len(axis) :]) - reindexed = dask.array.map_blocks( - reindex_intermediates, subset, agg, index, meta=subset._meta - ) + cohort_index = pd.Index(cohort) + reindexer = partial(reindex_intermediates, agg=agg, unique_groups=cohort_index) + reindexed = subset_to_blocks(intermediate, blks, block_shape, reindexer) # now that we have reindexed, we can set reindex=True explicitlly reduced_.append( tree_reduce( reindexed, combine=partial(combine, agg=agg, reindex=True), - aggregate=partial(aggregate, expected_groups=index, reindex=True), + aggregate=partial(aggregate, expected_groups=cohort_index, reindex=True), ) ) # This is done because pandas promotes to 64-bit types when an Index is created # So we use the index to generate the return value for consistency with "map-reduce" # This is important on windows - groups_.append(index.values) + groups_.append(cohort_index.values) reduced = dask.array.concatenate(reduced_, axis=-1) groups = (np.concatenate(groups_),) diff --git a/tests/test_core.py b/tests/test_core.py index 1ec2db78c..75c4d9ff7 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1465,14 +1465,18 @@ def test_normalize_block_indexing_2d(flatblocks, expected): @requires_dask def test_subset_block_passthrough(): + from flox.core import identity + # full slice pass through array = dask.array.ones((5,), chunks=(1,)) + expected = dask.array.map_blocks(identity, array) subset = subset_to_blocks(array, np.arange(5)) - assert subset.name == array.name + assert subset.name == expected.name array = dask.array.ones((5, 5), chunks=1) + expected = dask.array.map_blocks(identity, array) subset = subset_to_blocks(array, np.arange(25)) - assert subset.name == array.name + assert subset.name == expected.name @requires_dask From c398f4eb324395139d0ac862b1694c9155d965dd Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Thu, 2 May 2024 00:17:08 -0600 Subject: [PATCH 07/12] Use threadpool for finding labels in chunk (#327) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Use threadpool for finding labels in chunk Great when we have lots of decent size chunks, particularly the NWM county groupby: 600ms -> 400ms. ``` | Before [0cccb903] | After [38fe8a6c] | Ratio | Benchmark (Parameter) | |--------------------------------------|---------------------------------|---------|---------------------------------------------| | 3.50±0.2ms | 2.93±0.07ms | 0.84 | cohorts.PerfectMonthly.time_graph_construct | | 20.0±1ms | 9.66±1ms | 0.48 | cohorts.NWMMidwest.time_find_group_cohorts | ``` * Add threshold * Fix + comment * Fix benchmark. * Tweak threshold * Small cleanup * Comment * Try single allocation * Revert "Try single allocation" This reverts commit c6b93367e2024e60d77af24a69d177670a040dfc. * cleanup --- flox/core.py | 50 +++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 43 insertions(+), 7 deletions(-) diff --git a/flox/core.py b/flox/core.py index 308e05d95..0107b356b 100644 --- a/flox/core.py +++ b/flox/core.py @@ -8,6 +8,7 @@ import warnings from collections import namedtuple from collections.abc import Sequence +from concurrent.futures import ThreadPoolExecutor from functools import partial, reduce from itertools import product from numbers import Integral @@ -261,6 +262,7 @@ def make_bitmask(rows, cols): assert isinstance(labels, np.ndarray) shape = tuple(sum(c) for c in chunks) nchunks = math.prod(len(c) for c in chunks) + approx_chunk_size = math.prod(c[0] for c in chunks) # Shortcut for 1D with size-1 chunks if shape == (nchunks,): @@ -271,21 +273,55 @@ def make_bitmask(rows, cols): labels = np.broadcast_to(labels, shape[-labels.ndim :]) cols = [] - # Add one to handle the -1 sentinel value - label_is_present = np.zeros((nlabels + 1,), dtype=bool) ilabels = np.arange(nlabels) - for region in slices_from_chunks(chunks): + + def chunk_unique(labels, slicer, nlabels, label_is_present=None): + if label_is_present is None: + label_is_present = np.empty((nlabels + 1,), dtype=bool) + label_is_present[:] = False + subset = labels[slicer] # This is a quite fast way to find unique integers, when we know how many there are # inspired by a similar idea in numpy_groupies for first, last # instead of explicitly finding uniques, repeatedly write True to the same location - subset = labels[region] - # The reshape is not strictly necessary but is about 100ms faster on a test problem. label_is_present[subset.reshape(-1)] = True # skip the -1 sentinel by slicing # Faster than np.argwhere by a lot uniques = ilabels[label_is_present[:-1]] - cols.append(uniques) - label_is_present[:] = False + return uniques + + # TODO: refine this heuristic. + # The general idea is that with the threadpool, we repeatedly allocate memory + # for `label_is_present`. We trade that off against the parallelism across number of chunks. + # For large enough number of chunks (relative to number of labels), it makes sense to + # suffer the extra allocation in exchange for parallelism. + THRESHOLD = 2 + if nlabels < THRESHOLD * approx_chunk_size: + logger.debug( + "Using threadpool since num_labels %s < %d * chunksize %s", + nlabels, + THRESHOLD, + approx_chunk_size, + ) + with ThreadPoolExecutor() as executor: + futures = [ + executor.submit(chunk_unique, labels, slicer, nlabels) + for slicer in slices_from_chunks(chunks) + ] + cols = tuple(f.result() for f in futures) + + else: + logger.debug( + "Using serial loop since num_labels %s > %d * chunksize %s", + nlabels, + THRESHOLD, + approx_chunk_size, + ) + cols = [] + # Add one to handle the -1 sentinel value + label_is_present = np.empty((nlabels + 1,), dtype=bool) + for region in slices_from_chunks(chunks): + uniques = chunk_unique(labels, region, nlabels, label_is_present) + cols.append(uniques) rows_array = np.repeat(np.arange(nchunks), tuple(len(col) for col in cols)) cols_array = np.concatenate(cols) From 0083ab284ff4d231f5dd07be4a1503041e4bdf03 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Thu, 2 May 2024 08:43:04 -0600 Subject: [PATCH 08/12] Optimize `min_count` when `expected_groups` is not provided. (#236) * Optimize min_count for all numpy For pure numpy arrays, min_count=1 (xarray default) is the same as min_count=None, with the right fill_value. This avoids one useless pass over the data, and one useless copy. We need to always accumulate count with dask, to make sure we get the right values at the end. * Better? --- flox/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index 0107b356b..d84639bcd 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2324,6 +2324,7 @@ def groupby_reduce( nby = len(bys) by_is_dask = tuple(is_duck_dask_array(b) for b in bys) any_by_dask = any(by_is_dask) + provided_expected = expected_groups is not None if ( engine == "numbagg" @@ -2440,7 +2441,7 @@ def groupby_reduce( # The only way to do this consistently is mask out using min_count # Consider np.sum([np.nan]) = np.nan, np.nansum([np.nan]) = 0 if min_count is None: - if nax < by_.ndim or fill_value is not None: + if nax < by_.ndim or (fill_value is not None and provided_expected): min_count_: int = 1 else: min_count_ = 0 From fb180e8462dd240e27f63d31ecf19ed0558c368c Mon Sep 17 00:00:00 2001 From: Justus Magin Date: Thu, 2 May 2024 16:43:35 +0200 Subject: [PATCH 09/12] import `normalize_axis_index` from `numpy.lib` on `numpy>=2` (#364) * import `normalize_axis_index` from `numpy.lib` on `numpy>=2` * import the right thing --- flox/xrutils.py | 55 ++++++++++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/flox/xrutils.py b/flox/xrutils.py index fe9a5c858..c959b2ae9 100644 --- a/flox/xrutils.py +++ b/flox/xrutils.py @@ -8,7 +8,6 @@ import numpy as np import pandas as pd -from numpy.core.multiarray import normalize_axis_index # type: ignore[attr-defined] from packaging.version import Version try: @@ -25,6 +24,37 @@ dask_array_type = () # type: ignore[assignment, misc] +def module_available(module: str, minversion: Optional[str] = None) -> bool: + """Checks whether a module is installed without importing it. + + Use this for a lightweight check and lazy imports. + + Parameters + ---------- + module : str + Name of the module. + + Returns + ------- + available : bool + Whether the module is installed. + """ + has = importlib.util.find_spec(module) is not None + if has: + mod = importlib.import_module(module) + return Version(mod.__version__) >= Version(minversion) if minversion is not None else True + else: + return False + + +if module_available("numpy", minversion="2.0.0"): + from numpy.lib.array_utils import ( # type: ignore[import-not-found] + normalize_axis_index, + ) +else: + from numpy.core.numeric import normalize_axis_index # type: ignore[attr-defined] + + def asarray(data, xp=np): return data if is_duck_array(data) else xp.asarray(data) @@ -349,26 +379,3 @@ def nanlast(values, axis, keepdims=False): return np.expand_dims(result, axis=axis) else: return result - - -def module_available(module: str, minversion: Optional[str] = None) -> bool: - """Checks whether a module is installed without importing it. - - Use this for a lightweight check and lazy imports. - - Parameters - ---------- - module : str - Name of the module. - - Returns - ------- - available : bool - Whether the module is installed. - """ - has = importlib.util.find_spec(module) is not None - if has: - mod = importlib.import_module(module) - return Version(mod.__version__) >= Version(minversion) if minversion is not None else True - else: - return False From d68d83795da88eedb4dfa01c2691ffd67597df22 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 21 May 2024 11:48:13 -0600 Subject: [PATCH 10/12] Remove errant print statement --- asv_bench/benchmarks/cohorts.py | 1 - 1 file changed, 1 deletion(-) diff --git a/asv_bench/benchmarks/cohorts.py b/asv_bench/benchmarks/cohorts.py index da12dfdf0..f827f6e0e 100644 --- a/asv_bench/benchmarks/cohorts.py +++ b/asv_bench/benchmarks/cohorts.py @@ -21,7 +21,6 @@ def containment(self): asfloat = self.bitmask().astype(float) chunks_per_label = asfloat.sum(axis=0) containment = (asfloat.T @ asfloat) / chunks_per_label - print(containment.nnz / np.prod(containment.shape)) return containment.todense() def chunks_cohorts(self): From 7166db391c4fd4f4c7909cd6aff2143a15134c53 Mon Sep 17 00:00:00 2001 From: Tom White Date: Wed, 29 May 2024 16:15:24 +0100 Subject: [PATCH 11/12] Cubed blockwise (#357) * Initial minimal working Cubed example for "blockwise" * Update minimum cubed version that includes https://github.com/cubed-dev/cubed/pull/448 * Fix mypy errors * Update documentation with a 'blockwise' example for Cubed --- ci/environment.yml | 2 +- .../climatology-hourly-cubed.ipynb | 31 +++ flox/core.py | 182 +++++++++++------- tests/test_core.py | 34 ++++ 4 files changed, 175 insertions(+), 74 deletions(-) diff --git a/ci/environment.yml b/ci/environment.yml index 9685a90b2..f974053a7 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -6,7 +6,7 @@ dependencies: - cachey - cftime - codecov - - cubed>=0.14.2 + - cubed>=0.14.3 - dask-core - pandas - numpy>=1.22 diff --git a/docs/source/user-stories/climatology-hourly-cubed.ipynb b/docs/source/user-stories/climatology-hourly-cubed.ipynb index 2277ed7f2..be80b4f98 100644 --- a/docs/source/user-stories/climatology-hourly-cubed.ipynb +++ b/docs/source/user-stories/climatology-hourly-cubed.ipynb @@ -86,6 +86,37 @@ "source": [ "hourly.compute()" ] + }, + { + "cell_type": "markdown", + "id": "7", + "metadata": {}, + "source": [ + "## Other climatologies: resampling by month\n", + "\n", + "This uses the \"blockwise\" strategy." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8", + "metadata": {}, + "outputs": [], + "source": [ + "monthly = ds.tp.resample(time=\"ME\").sum(method=\"blockwise\")\n", + "monthly" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9", + "metadata": {}, + "outputs": [], + "source": [ + "monthly.compute()" + ] } ], "metadata": { diff --git a/flox/core.py b/flox/core.py index d84639bcd..72f098997 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1798,87 +1798,123 @@ def cubed_groupby_agg( assert isinstance(axis, Sequence) assert all(ax >= 0 for ax in axis) - inds = tuple(range(array.ndim)) + if method == "blockwise": + assert by.ndim == 1 + assert expected_groups is not None - by_input = by + def _reduction_func(a, by, axis, start_group, num_groups): + # adjust group labels to start from 0 for each chunk + by_for_chunk = by - start_group + expected_groups_for_chunk = pd.RangeIndex(num_groups) - # Unifying chunks is necessary for argreductions. - # We need to rechunk before zipping up with the index - # let's always do it anyway - if not is_chunked_array(by): - # chunk numpy arrays like the input array - chunks = tuple(array.chunks[ax] if by.shape[ax] != 1 else (1,) for ax in range(-by.ndim, 0)) + axis = (axis,) # convert integral axis to tuple - by = cubed.from_array(by, chunks=chunks, spec=array.spec) - _, (array, by) = cubed.core.unify_chunks(array, inds, by, inds[-by.ndim :]) + blockwise_method = partial( + _reduce_blockwise, + agg=agg, + axis=axis, + expected_groups=expected_groups_for_chunk, + fill_value=fill_value, + engine=engine, + sort=sort, + reindex=reindex, + ) + out = blockwise_method(a, by_for_chunk) + return out[agg.name] - # Cubed's groupby_reduction handles the generation of "intermediates", and the - # "map-reduce" combination step, so we don't have to do that here. - # Only the equivalent of "_simple_combine" is supported, there is no - # support for "_grouped_combine". - labels_are_unknown = is_chunked_array(by_input) and expected_groups is None - do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown + num_groups = len(expected_groups) + result = cubed.core.groupby.groupby_blockwise( + array, by, axis=axis, func=_reduction_func, num_groups=num_groups + ) + groups = (expected_groups.to_numpy(),) + return (result, groups) - assert do_simple_combine - assert method == "map-reduce" - assert expected_groups is not None - assert reindex is True - assert len(axis) == 1 # one axis/grouping + else: + inds = tuple(range(array.ndim)) - def _groupby_func(a, by, axis, intermediate_dtype, num_groups): - blockwise_method = partial( - _get_chunk_reduction(agg.reduction_type), - func=agg.chunk, - fill_value=agg.fill_value["intermediate"], - dtype=agg.dtype["intermediate"], - reindex=reindex, - user_dtype=agg.dtype["user"], + by_input = by + + # Unifying chunks is necessary for argreductions. + # We need to rechunk before zipping up with the index + # let's always do it anyway + if not is_chunked_array(by): + # chunk numpy arrays like the input array + chunks = tuple( + array.chunks[ax] if by.shape[ax] != 1 else (1,) for ax in range(-by.ndim, 0) + ) + + by = cubed.from_array(by, chunks=chunks, spec=array.spec) + _, (array, by) = cubed.core.unify_chunks(array, inds, by, inds[-by.ndim :]) + + # Cubed's groupby_reduction handles the generation of "intermediates", and the + # "map-reduce" combination step, so we don't have to do that here. + # Only the equivalent of "_simple_combine" is supported, there is no + # support for "_grouped_combine". + labels_are_unknown = is_chunked_array(by_input) and expected_groups is None + do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown + + assert do_simple_combine + assert method == "map-reduce" + assert expected_groups is not None + assert reindex is True + assert len(axis) == 1 # one axis/grouping + + def _groupby_func(a, by, axis, intermediate_dtype, num_groups): + blockwise_method = partial( + _get_chunk_reduction(agg.reduction_type), + func=agg.chunk, + fill_value=agg.fill_value["intermediate"], + dtype=agg.dtype["intermediate"], + reindex=reindex, + user_dtype=agg.dtype["user"], + axis=axis, + expected_groups=expected_groups, + engine=engine, + sort=sort, + ) + out = blockwise_method(a, by) + # Convert dict to one that cubed understands, dropping groups since they are + # known, and the same for every block. + return { + f"f{idx}": intermediate for idx, intermediate in enumerate(out["intermediates"]) + } + + def _groupby_combine(a, axis, dummy_axis, dtype, keepdims): + # this is similar to _simple_combine, except the dummy axis and concatenation is handled by cubed + # only combine over the dummy axis, to preserve grouping along 'axis' + dtype = dict(dtype) + out = {} + for idx, combine in enumerate(agg.simple_combine): + field = f"f{idx}" + out[field] = combine(a[field], axis=dummy_axis, keepdims=keepdims) + return out + + def _groupby_aggregate(a): + # Convert cubed dict to one that _finalize_results works with + results = {"groups": expected_groups, "intermediates": a.values()} + out = _finalize_results(results, agg, axis, expected_groups, fill_value, reindex) + return out[agg.name] + + # convert list of dtypes to a structured dtype for cubed + intermediate_dtype = [(f"f{i}", dtype) for i, dtype in enumerate(agg.dtype["intermediate"])] + dtype = agg.dtype["final"] + num_groups = len(expected_groups) + + result = cubed.core.groupby.groupby_reduction( + array, + by, + func=_groupby_func, + combine_func=_groupby_combine, + aggregate_func=_groupby_aggregate, axis=axis, - expected_groups=expected_groups, - engine=engine, - sort=sort, + intermediate_dtype=intermediate_dtype, + dtype=dtype, + num_groups=num_groups, ) - out = blockwise_method(a, by) - # Convert dict to one that cubed understands, dropping groups since they are - # known, and the same for every block. - return {f"f{idx}": intermediate for idx, intermediate in enumerate(out["intermediates"])} - - def _groupby_combine(a, axis, dummy_axis, dtype, keepdims): - # this is similar to _simple_combine, except the dummy axis and concatenation is handled by cubed - # only combine over the dummy axis, to preserve grouping along 'axis' - dtype = dict(dtype) - out = {} - for idx, combine in enumerate(agg.simple_combine): - field = f"f{idx}" - out[field] = combine(a[field], axis=dummy_axis, keepdims=keepdims) - return out - - def _groupby_aggregate(a): - # Convert cubed dict to one that _finalize_results works with - results = {"groups": expected_groups, "intermediates": a.values()} - out = _finalize_results(results, agg, axis, expected_groups, fill_value, reindex) - return out[agg.name] - - # convert list of dtypes to a structured dtype for cubed - intermediate_dtype = [(f"f{i}", dtype) for i, dtype in enumerate(agg.dtype["intermediate"])] - dtype = agg.dtype["final"] - num_groups = len(expected_groups) - - result = cubed.core.groupby.groupby_reduction( - array, - by, - func=_groupby_func, - combine_func=_groupby_combine, - aggregate_func=_groupby_aggregate, - axis=axis, - intermediate_dtype=intermediate_dtype, - dtype=dtype, - num_groups=num_groups, - ) - groups = (expected_groups.to_numpy(),) + groups = (expected_groups.to_numpy(),) - return (result, groups) + return (result, groups) def _collapse_blocks_along_axes(reduced: DaskArray, axis: T_Axes, group_chunks) -> DaskArray: @@ -2467,9 +2503,9 @@ def groupby_reduce( if method is None: method = "map-reduce" - if method != "map-reduce": + if method not in ("map-reduce", "blockwise"): raise NotImplementedError( - "Reduction for Cubed arrays is only implemented for method 'map-reduce'." + "Reduction for Cubed arrays is only implemented for methods 'map-reduce' and 'blockwise'." ) partial_agg = partial(cubed_groupby_agg, **kwargs) diff --git a/tests/test_core.py b/tests/test_core.py index 75c4d9ff7..7cedc06d4 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1205,6 +1205,40 @@ def test_group_by_datetime(engine, method): assert_equal(expected, actual) +@requires_cubed +@pytest.mark.parametrize("method", ["blockwise", "map-reduce"]) +def test_group_by_datetime_cubed(engine, method): + kwargs = dict( + func="mean", + method=method, + engine=engine, + ) + t = pd.date_range("2000-01-01", "2000-12-31", freq="D").to_series() + data = t.dt.dayofyear + cubedarray = cubed.from_array(data.values, chunks=30) + + actual, _ = groupby_reduce(cubedarray, t, **kwargs) + expected = data.to_numpy().astype(float) + assert_equal(expected, actual) + + edges = pd.date_range("1999-12-31", "2000-12-31", freq="ME").to_series().to_numpy() + actual, _ = groupby_reduce( + cubedarray, t.to_numpy(), isbin=True, expected_groups=edges, **kwargs + ) + expected = data.resample("ME").mean().to_numpy() + assert_equal(expected, actual) + + actual, _ = groupby_reduce( + cubed.array_api.broadcast_to(cubedarray, (2, 3, cubedarray.shape[-1])), + t.to_numpy(), + isbin=True, + expected_groups=edges, + **kwargs, + ) + expected = np.broadcast_to(expected, (2, 3, expected.shape[-1])) + assert_equal(expected, actual) + + def test_factorize_values_outside_bins(): # pd.factorize returns intp vals = factorize_( From 07ad8263386fbaa25f18709036cd20ec91233386 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sat, 1 Jun 2024 10:01:27 -0600 Subject: [PATCH 12/12] Bump codecov/codecov-action from 4.3.1 to 4.4.1 (#366) Bumps [codecov/codecov-action](https://github.com/codecov/codecov-action) from 4.3.1 to 4.4.1. - [Release notes](https://github.com/codecov/codecov-action/releases) - [Changelog](https://github.com/codecov/codecov-action/blob/main/CHANGELOG.md) - [Commits](https://github.com/codecov/codecov-action/compare/v4.3.1...v4.4.1) --- updated-dependencies: - dependency-name: codecov/codecov-action dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/ci-additional.yaml | 4 ++-- .github/workflows/ci.yaml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci-additional.yaml b/.github/workflows/ci-additional.yaml index 2a7b07d4d..599312a4e 100644 --- a/.github/workflows/ci-additional.yaml +++ b/.github/workflows/ci-additional.yaml @@ -77,7 +77,7 @@ jobs: --ignore flox/tests \ --cov=./ --cov-report=xml - name: Upload code coverage to Codecov - uses: codecov/codecov-action@v4.3.1 + uses: codecov/codecov-action@v4.4.1 with: file: ./coverage.xml flags: unittests @@ -131,7 +131,7 @@ jobs: python -m mypy --install-types --non-interactive --cobertura-xml-report mypy_report - name: Upload mypy coverage to Codecov - uses: codecov/codecov-action@v4.3.1 + uses: codecov/codecov-action@v4.4.1 with: file: mypy_report/cobertura.xml flags: mypy diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 12b5e9cc7..9c54fc49c 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -52,7 +52,7 @@ jobs: run: | pytest -n auto --cov=./ --cov-report=xml - name: Upload code coverage to Codecov - uses: codecov/codecov-action@v4.3.1 + uses: codecov/codecov-action@v4.4.1 with: file: ./coverage.xml flags: unittests @@ -94,7 +94,7 @@ jobs: run: | python -m pytest -n auto --cov=./ --cov-report=xml - name: Upload code coverage to Codecov - uses: codecov/codecov-action@v4.3.1 + uses: codecov/codecov-action@v4.4.1 with: file: ./coverage.xml flags: unittests