Merge branch 'main' into hypothesis
* main:
  Bump codecov/codecov-action from 4.3.1 to 4.4.1 (#366)
  Cubed blockwise (#357)
  Remove errant print statement
  import `normalize_axis_index` from `numpy.lib` on `numpy>=2` (#364)
  Optimize `min_count` when `expected_groups` is not provided. (#236)
  Use threadpool for finding labels in chunk (#327)
  Manually fuse reindexing intermediates with blockwise reduction for cohorts. (#300)
  Bump codecov/codecov-action from 4.1.1 to 4.3.1 (#362)
  Add cubed notebook for hourly climatology example using "map-reduce" method (#356)
  Optimize bitmask finding for chunk size 1 and single chunk cases (#360)
  Edits to climatology doc (#361)
  Fix benchmarks (#358)
dcherian committed Jun 30, 2024
2 parents 240587f + 07ad826 commit 4259634
Showing 12 changed files with 500 additions and 135 deletions.
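Several of the merged commits (#300, #327, #360) optimize flox's "cohorts" strategy, which the updated benchmarks below exercise through `flox.groupby_reduce`. For context, a minimal sketch of that call on a chunked dask array (the shapes and monthly labels are illustrative, not taken from the benchmarks):

import dask.array
import numpy as np

import flox

# One year of daily values, chunked along time in 30-day blocks.
array = dask.array.ones((10, 365), chunks=(10, 30))
# Integer group labels (month of year) aligned with the reduction axis.
by = np.repeat(np.arange(12), 31)[:365]

# "cohorts" detects sets of chunks that share labels and reduces each
# cohort together, producing fewer tasks than plain "map-reduce".
result, groups = flox.groupby_reduce(array, by, func="sum", axis=-1, method="cohorts")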
4 changes: 2 additions & 2 deletions .github/workflows/ci-additional.yaml
@@ -77,7 +77,7 @@ jobs:
--ignore flox/tests \
--cov=./ --cov-report=xml
- name: Upload code coverage to Codecov
-        uses: codecov/codecov-action@v4.1.1
+        uses: codecov/codecov-action@v4.4.1
with:
file: ./coverage.xml
flags: unittests
@@ -131,7 +131,7 @@ jobs:
python -m mypy --install-types --non-interactive --cobertura-xml-report mypy_report
- name: Upload mypy coverage to Codecov
-        uses: codecov/codecov-action@v4.1.1
+        uses: codecov/codecov-action@v4.4.1
with:
file: mypy_report/cobertura.xml
flags: mypy
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
@@ -64,7 +64,7 @@ jobs:
run: |
pytest -n auto --cov=./ --cov-report=xml
- name: Upload code coverage to Codecov
-        uses: codecov/codecov-action@v4.1.1
+        uses: codecov/codecov-action@v4.4.1
with:
file: ./coverage.xml
flags: unittests
@@ -115,7 +115,7 @@ jobs:
run: |
python -m pytest -n auto --cov=./ --cov-report=xml
- name: Upload code coverage to Codecov
-        uses: codecov/codecov-action@v4.1.1
+        uses: codecov/codecov-action@v4.4.1
with:
file: ./coverage.xml
flags: unittests
5 changes: 5 additions & 0 deletions asv_bench/asv.conf.json
@@ -27,6 +27,11 @@
// "python setup.py build",
// "PIP_NO_BUILD_ISOLATION=false python -mpip wheel --no-deps --no-index -w {build_cache_dir} {build_dir}"
// ],
+    //
+    "build_command": [
+        "python setup.py build",
+        "python -mpip wheel --no-deps --no-build-isolation --no-index -w {build_cache_dir} {build_dir}"
+    ],

// List of branches to benchmark. If not provided, defaults to "master"
// (for git) or "default" (for mercurial).
37 changes: 23 additions & 14 deletions asv_bench/benchmarks/cohorts.py
@@ -1,3 +1,5 @@
+from functools import cached_property

import dask
import numpy as np
import pandas as pd
@@ -11,11 +13,14 @@ class Cohorts:
def setup(self, *args, **kwargs):
raise NotImplementedError

+    @cached_property
+    def result(self):
+        return flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis)[0]

def containment(self):
asfloat = self.bitmask().astype(float)
chunks_per_label = asfloat.sum(axis=0)
containment = (asfloat.T @ asfloat) / chunks_per_label
-        print(containment.nnz / np.prod(containment.shape))
return containment.todense()

def chunks_cohorts(self):
@@ -43,26 +48,17 @@ def time_find_group_cohorts(self):
pass

def time_graph_construct(self):
-        flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis, method="cohorts")
+        flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis)

def track_num_tasks(self):
-        result = flox.groupby_reduce(
-            self.array, self.by, func="sum", axis=self.axis, method="cohorts"
-        )[0]
-        return len(result.dask.to_dict())
+        return len(self.result.dask.to_dict())

def track_num_tasks_optimized(self):
-        result = flox.groupby_reduce(
-            self.array, self.by, func="sum", axis=self.axis, method="cohorts"
-        )[0]
-        (opt,) = dask.optimize(result)
+        (opt,) = dask.optimize(self.result)
return len(opt.dask.to_dict())

def track_num_layers(self):
-        result = flox.groupby_reduce(
-            self.array, self.by, func="sum", axis=self.axis, method="cohorts"
-        )[0]
-        return len(result.dask.layers)
+        return len(self.result.dask.layers)

track_num_tasks.unit = "tasks" # type: ignore[attr-defined] # Lazy
track_num_tasks_optimized.unit = "tasks" # type: ignore[attr-defined] # Lazy
@@ -193,6 +189,19 @@ def setup(self, *args, **kwargs):
self.expected = pd.RangeIndex(self.by.max() + 1)


+class SingleChunk(Cohorts):
+    """Single chunk along reduction axis: always blockwise."""
+
+    def setup(self, *args, **kwargs):
+        index = pd.date_range("1959-01-01", freq="D", end="1962-12-31")
+        self.time = pd.Series(index)
+        TIME = len(self.time)
+        self.axis = (2,)
+        self.array = dask.array.ones((721, 1440, TIME), chunks=(-1, -1, -1))
+        self.by = codes_for_resampling(index, freq="5D")
+        self.expected = pd.RangeIndex(self.by.max() + 1)

class OISST(Cohorts):
def setup(self, *args, **kwargs):
self.array = dask.array.ones((1, 14532), chunks=(1, 10))
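The benchmark refactor above hinges on `functools.cached_property`: the groupby result is computed once per benchmark instance and shared by the `track_num_tasks`, `track_num_tasks_optimized`, and `track_num_layers` methods instead of being rebuilt in each. A minimal sketch of the pattern (the `Expensive` class is hypothetical):

from functools import cached_property

class Expensive:
    @cached_property
    def result(self):
        # Runs only on first access; the value is then stored in the
        # instance's __dict__ and reused for every later access.
        print("computing...")
        return sum(range(10_000))

obj = Expensive()
obj.result  # prints "computing..." and caches the value
obj.result  # served from the cache; no recomputation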
2 changes: 2 additions & 0 deletions ci/docs.yml
@@ -2,6 +2,8 @@ name: flox-doc
channels:
- conda-forge
dependencies:
+  - cubed>=0.14.3
+  - cubed-xarray
- dask-core
- pip
- xarray
2 changes: 1 addition & 1 deletion ci/environment.yml
@@ -6,7 +6,7 @@ dependencies:
- cachey
- cftime
- codecov
-  - cubed>=0.14.2
+  - cubed>=0.14.3
- dask-core
- pandas
- numpy>=1.22
1 change: 1 addition & 0 deletions docs/source/user-stories.md
@@ -7,6 +7,7 @@
user-stories/overlaps.md
user-stories/climatology.ipynb
user-stories/climatology-hourly.ipynb
+user-stories/climatology-hourly-cubed.ipynb
user-stories/custom-aggregations.ipynb
user-stories/nD-bins.ipynb
```
137 changes: 137 additions & 0 deletions docs/source/user-stories/climatology-hourly-cubed.ipynb
@@ -0,0 +1,137 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "0",
"metadata": {},
"source": [
"# More climatology reductions using Cubed\n",
"\n",
"This is the Cubed equivalent of [More climatology reductions](climatology-hourly.ipynb).\n",
"\n",
"The task is to compute an hourly climatology from an hourly dataset with 744 hours in each chunk, using the \"map-reduce\" strategy."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1",
"metadata": {},
"outputs": [],
"source": [
"import cubed\n",
"import cubed.array_api as xp\n",
"import numpy as np\n",
"import pandas as pd\n",
"import xarray as xr\n",
"\n",
"import flox.xarray"
]
},
{
"cell_type": "markdown",
"id": "2",
"metadata": {},
"source": [
"## Create data\n",
"\n",
"Note that we use fewer lat/long points so the computation can be run locally."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3",
"metadata": {},
"outputs": [],
"source": [
"spec = cubed.Spec(allowed_mem=\"2GB\")\n",
"ds = xr.Dataset(\n",
" {\n",
" \"tp\": (\n",
" (\"time\", \"latitude\", \"longitude\"),\n",
" xp.ones((8760, 72, 144), chunks=(744, 5, 144), dtype=np.float32, spec=spec),\n",
" )\n",
" },\n",
" coords={\"time\": pd.date_range(\"2021-01-01\", \"2021-12-31 23:59\", freq=\"h\")},\n",
")\n",
"ds"
]
},
{
"cell_type": "markdown",
"id": "4",
"metadata": {},
"source": [
"## Computation"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5",
"metadata": {},
"outputs": [],
"source": [
"hourly = flox.xarray.xarray_reduce(ds.tp, ds.time.dt.hour, func=\"mean\", reindex=True)\n",
"hourly"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6",
"metadata": {},
"outputs": [],
"source": [
"hourly.compute()"
]
},
{
"cell_type": "markdown",
"id": "7",
"metadata": {},
"source": [
"## Other climatologies: resampling by month\n",
"\n",
"This uses the \"blockwise\" strategy."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8",
"metadata": {},
"outputs": [],
"source": [
"monthly = ds.tp.resample(time=\"ME\").sum(method=\"blockwise\")\n",
"monthly"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9",
"metadata": {},
"outputs": [],
"source": [
"monthly.compute()"
]
}
],
"metadata": {
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3"
}
},
"nbformat": 4,
"nbformat_minor": 5
}