From 4c8b03bdc2ffa03bc5721768e5547572ef1e65a5 Mon Sep 17 00:00:00 2001 From: joseph nowak Date: Thu, 14 Nov 2024 18:51:01 -0400 Subject: [PATCH 1/4] Optimize `ffill`, `bfill` with dask when `limit` is specified (#9771) * Reduce the number of tasks when the limit parameter is set on the push function * Reduce the number of tasks when the limit parameter is set on the push function, and incorporate the method parameter for the cumreduction on the push method * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Update xarray/core/dask_array_ops.py Co-authored-by: Deepak Cherian * Use last instead of creating a custom function, and add a keepdims parameter for the last and first to make it compatible with the blelloch method * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Remove the keepdims on the last and first method and use the nanlast method directly, they already have the parameter * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Include the optimization of ffill and bfill on the whats-new.rst * Use map_overlap when the n is smaller than all the chunks * Avoid creating a numpy array to check if all the chunks are bigger than N on the push method * Updating the whats-new.rst * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Patrick Hoefler Co-authored-by: Deepak Cherian --- doc/whats-new.rst | 4 ++ xarray/core/dask_array_ops.py | 74 ++++++++++++++++++++--------- xarray/core/duck_array_ops.py | 6 ++- xarray/tests/test_duck_array_ops.py | 12 +++-- 4 files changed, 69 insertions(+), 27 deletions(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 7195edef42b..b74b0fb84de 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -29,6 +29,10 @@ New Features - Support lazy grouping by dask arrays, and allow specifying ordered groups with ``UniqueGrouper(labels=["a", "b", "c"])`` (:issue:`2852`, :issue:`757`). By `Deepak Cherian `_. +- Optimize ffill, bfill with dask when limit is specified + (:pull:`9771`). + By `Joseph Nowak `_, and + `Patrick Hoefler `. - Allow wrapping ``np.ndarray`` subclasses, e.g. ``astropy.units.Quantity`` (:issue:`9704`, :pull:`9760`). By `Sam Levang `_ and `Tien Vo `_. - Optimize :py:meth:`DataArray.polyfit` and :py:meth:`Dataset.polyfit` with dask, when used with diff --git a/xarray/core/dask_array_ops.py b/xarray/core/dask_array_ops.py index 7a20728ae2e..8bf9c68b727 100644 --- a/xarray/core/dask_array_ops.py +++ b/xarray/core/dask_array_ops.py @@ -75,7 +75,7 @@ def least_squares(lhs, rhs, rcond=None, skipna=False): return coeffs, residuals -def push(array, n, axis): +def push(array, n, axis, method="blelloch"): """ Dask-aware bottleneck.push """ @@ -83,33 +83,63 @@ def push(array, n, axis): import numpy as np from xarray.core.duck_array_ops import _push + from xarray.core.nputils import nanlast + + if n is not None and all(n <= size for size in array.chunks[axis]): + return array.map_overlap(_push, depth={axis: (n, 0)}, n=n, axis=axis) + + # TODO: Replace all this function + # once https://github.com/pydata/xarray/issues/9229 being implemented def _fill_with_last_one(a, b): - # cumreduction apply the push func over all the blocks first so, the only missing part is filling - # the missing values using the last data of the previous chunk - return np.where(~np.isnan(b), b, a) + # cumreduction apply the push func over all the blocks first so, + # the only missing part is filling the missing values using the + # last data of the previous chunk + return np.where(np.isnan(b), a, b) - if n is not None and 0 < n < array.shape[axis] - 1: - arange = da.broadcast_to( - da.arange( - array.shape[axis], chunks=array.chunks[axis], dtype=array.dtype - ).reshape( - tuple(size if i == axis else 1 for i, size in enumerate(array.shape)) - ), - array.shape, - array.chunks, - ) - valid_arange = da.where(da.notnull(array), arange, np.nan) - valid_limits = (arange - push(valid_arange, None, axis)) <= n - # omit the forward fill that violate the limit - return da.where(valid_limits, push(array, None, axis), np.nan) - - # The method parameter makes that the tests for python 3.7 fails. - return da.reductions.cumreduction( - func=_push, + def _dtype_push(a, axis, dtype=None): + # Not sure why the blelloch algorithm force to receive a dtype + return _push(a, axis=axis) + + pushed_array = da.reductions.cumreduction( + func=_dtype_push, binop=_fill_with_last_one, ident=np.nan, x=array, axis=axis, dtype=array.dtype, + method=method, + preop=nanlast, ) + + if n is not None and 0 < n < array.shape[axis] - 1: + + def _reset_cumsum(a, axis, dtype=None): + cumsum = np.cumsum(a, axis=axis) + reset_points = np.maximum.accumulate(np.where(a == 0, cumsum, 0), axis=axis) + return cumsum - reset_points + + def _last_reset_cumsum(a, axis, keepdims=None): + # Take the last cumulative sum taking into account the reset + # This is useful for blelloch method + return np.take(_reset_cumsum(a, axis=axis), axis=axis, indices=[-1]) + + def _combine_reset_cumsum(a, b): + # It is going to sum the previous result until the first + # non nan value + bitmask = np.cumprod(b != 0, axis=axis) + return np.where(bitmask, b + a, b) + + valid_positions = da.reductions.cumreduction( + func=_reset_cumsum, + binop=_combine_reset_cumsum, + ident=0, + x=da.isnan(array, dtype=int), + axis=axis, + dtype=int, + method=method, + preop=_last_reset_cumsum, + ) + pushed_array = da.where(valid_positions <= n, pushed_array, np.nan) + + return pushed_array diff --git a/xarray/core/duck_array_ops.py b/xarray/core/duck_array_ops.py index 4d01e5bc345..77e62e4c71e 100644 --- a/xarray/core/duck_array_ops.py +++ b/xarray/core/duck_array_ops.py @@ -716,6 +716,7 @@ def first(values, axis, skipna=None): return chunked_nanfirst(values, axis) else: return nputils.nanfirst(values, axis) + return take(values, 0, axis=axis) @@ -729,6 +730,7 @@ def last(values, axis, skipna=None): return chunked_nanlast(values, axis) else: return nputils.nanlast(values, axis) + return take(values, -1, axis=axis) @@ -769,14 +771,14 @@ def _push(array, n: int | None = None, axis: int = -1): return bn.push(array, limit, axis) -def push(array, n, axis): +def push(array, n, axis, method="blelloch"): if not OPTIONS["use_bottleneck"] and not OPTIONS["use_numbagg"]: raise RuntimeError( "ffill & bfill requires bottleneck or numbagg to be enabled." " Call `xr.set_options(use_bottleneck=True)` or `xr.set_options(use_numbagg=True)` to enable one." ) if is_duck_dask_array(array): - return dask_array_ops.push(array, n, axis) + return dask_array_ops.push(array, n, axis, method=method) else: return _push(array, n, axis) diff --git a/xarray/tests/test_duck_array_ops.py b/xarray/tests/test_duck_array_ops.py index f9d0141ead3..a2f5631ce1b 100644 --- a/xarray/tests/test_duck_array_ops.py +++ b/xarray/tests/test_duck_array_ops.py @@ -1008,7 +1008,8 @@ def test_least_squares(use_dask, skipna): @requires_dask @requires_bottleneck -def test_push_dask(): +@pytest.mark.parametrize("method", ["sequential", "blelloch"]) +def test_push_dask(method): import bottleneck import dask.array @@ -1018,13 +1019,18 @@ def test_push_dask(): expected = bottleneck.push(array, axis=0, n=n) for c in range(1, 11): with raise_if_dask_computes(): - actual = push(dask.array.from_array(array, chunks=c), axis=0, n=n) + actual = push( + dask.array.from_array(array, chunks=c), axis=0, n=n, method=method + ) np.testing.assert_equal(actual, expected) # some chunks of size-1 with NaN with raise_if_dask_computes(): actual = push( - dask.array.from_array(array, chunks=(1, 2, 3, 2, 2, 1, 1)), axis=0, n=n + dask.array.from_array(array, chunks=(1, 2, 3, 2, 2, 1, 1)), + axis=0, + n=n, + method=method, ) np.testing.assert_equal(actual, expected) From 864b35a12ae890af540f35faee258df8e67d1e5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kai=20M=C3=BChlbauer?= Date: Fri, 15 Nov 2024 10:28:52 +0100 Subject: [PATCH 2/4] add 'User-Agent'-header to pooch.retrieve (#9782) * add 'User-Agent'-header to pooch.retrieve * try sys.modules * Apply suggestions from code review Co-authored-by: Mathias Hauser * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * add whats-new.rst entry --------- Co-authored-by: Mathias Hauser Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- doc/whats-new.rst | 2 ++ xarray/tutorial.py | 8 +++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index b74b0fb84de..ee826e6e56f 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -64,6 +64,8 @@ Bug fixes By `Pascal Bourgault `_. - Fix CF decoding of ``grid_mapping`` to allow all possible formats, add tests (:issue:`9761`, :pull:`9765`). By `Kai Mühlbauer `_. +- Add `User-Agent` to request-headers when retrieving tutorial data (:issue:`9774`, :pull:`9782`) + By `Kai Mühlbauer `_. Documentation ~~~~~~~~~~~~~ diff --git a/xarray/tutorial.py b/xarray/tutorial.py index ccdb0d8e031..9a5d52ed285 100644 --- a/xarray/tutorial.py +++ b/xarray/tutorial.py @@ -10,6 +10,7 @@ import os import pathlib +import sys from typing import TYPE_CHECKING import numpy as np @@ -157,8 +158,13 @@ def open_dataset( url = f"{base_url}/raw/{version}/{path.name}" + headers = {"User-Agent": f"xarray {sys.modules['xarray'].__version__}"} + downloader = pooch.HTTPDownloader(headers=headers) + # retrieve the file - filepath = pooch.retrieve(url=url, known_hash=None, path=cache_dir) + filepath = pooch.retrieve( + url=url, known_hash=None, path=cache_dir, downloader=downloader + ) ds = _open_dataset(filepath, engine=engine, **kws) if not cache: ds = ds.load() From 9f459c3f4eb38a016966c6dce64bd07ec44c06ab Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Fri, 15 Nov 2024 21:19:02 +0100 Subject: [PATCH 3/4] Fix open_mfdataset for list of fsspec files (#9785) * Fix open_mfdataset for list of fsspec files * Rewrite to for loop * Fixup --- xarray/backends/common.py | 17 +++++++++-------- xarray/tests/test_backends.py | 2 ++ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/xarray/backends/common.py b/xarray/backends/common.py index c51cf3d1964..534567221b5 100644 --- a/xarray/backends/common.py +++ b/xarray/backends/common.py @@ -136,14 +136,15 @@ def _find_absolute_paths( def _normalize_path_list( lpaths: NestedSequence[str | os.PathLike], ) -> NestedSequence[str]: - return [ - ( - _normalize_path(p) - if isinstance(p, str | os.PathLike) - else _normalize_path_list(p) - ) - for p in lpaths - ] + paths = [] + for p in lpaths: + if isinstance(p, str | os.PathLike): + paths.append(_normalize_path(p)) + elif isinstance(p, list): + paths.append(_normalize_path_list(p)) # type: ignore[arg-type] + else: + paths.append(p) # type: ignore[arg-type] + return paths return _normalize_path_list(paths) diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py index 7be6eb5ed0d..c543333c61e 100644 --- a/xarray/tests/test_backends.py +++ b/xarray/tests/test_backends.py @@ -5511,6 +5511,8 @@ def test_source_encoding_always_present_with_fsspec() -> None: fs = fsspec.filesystem("file") with fs.open(tmp) as f, open_dataset(f) as ds: assert ds.encoding["source"] == tmp + with fs.open(tmp) as f, open_mfdataset([f]) as ds: + assert "foo" in ds def _assert_no_dates_out_of_range_warning(record): From d5f84dd1ef4c023cf2ea0a38866c9d9cd50487e7 Mon Sep 17 00:00:00 2001 From: Michael Niklas Date: Fri, 15 Nov 2024 22:03:23 +0100 Subject: [PATCH 4/4] Add download stats badges (#9786) --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index bd62e5f2fd4..3e95d23fc04 100644 --- a/README.md +++ b/README.md @@ -4,9 +4,11 @@ [![Code coverage](https://codecov.io/gh/pydata/xarray/branch/main/graph/badge.svg?flag=unittests)](https://codecov.io/gh/pydata/xarray) [![Docs](https://readthedocs.org/projects/xray/badge/?version=latest)](https://docs.xarray.dev/) [![Benchmarked with asv](https://img.shields.io/badge/benchmarked%20by-asv-green.svg?style=flat)](https://pandas.pydata.org/speed/xarray/) -[![Available on pypi](https://img.shields.io/pypi/v/xarray.svg)](https://pypi.python.org/pypi/xarray/) [![Formatted with black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/python/black) [![Checked with mypy](http://www.mypy-lang.org/static/mypy_badge.svg)](http://mypy-lang.org/) +[![Available on pypi](https://img.shields.io/pypi/v/xarray.svg)](https://pypi.python.org/pypi/xarray/) +[![PyPI - Downloads](https://img.shields.io/pypi/dm/xarray)](https://pypistats.org/packages/xarray) +[![Conda - Downloads](https://img.shields.io/conda/dn/anaconda/xarray?label=conda%7Cdownloads)](https://anaconda.org/anaconda/xarray) [![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.11183201.svg)](https://doi.org/10.5281/zenodo.11183201) [![Examples on binder](https://img.shields.io/badge/launch-binder-579ACA.svg?logo=)](https://mybinder.org/v2/gh/pydata/xarray/main?urlpath=lab/tree/doc/examples/weather-data.ipynb) [![Twitter](https://img.shields.io/twitter/follow/xarray_dev?style=social)](https://twitter.com/xarray_dev)