Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Type chunkmanagers #9227

Draft
wants to merge 61 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
3fdb52f
Type parallelcompat
Illviljan Jul 10, 2024
c24b7ae
Type daskmanager
Illviljan Jul 10, 2024
a932c71
Add chunks typing
Illviljan Jul 10, 2024
676f045
Update times.py
Illviljan Jul 10, 2024
6f79bdc
Update times.py
Illviljan Jul 10, 2024
3d48d44
Update _typing.py
Illviljan Jul 10, 2024
e7041e0
Update test_parallelcompat.py
Illviljan Jul 10, 2024
6f56dd8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 10, 2024
19b2674
Update indexing.py
Illviljan Jul 10, 2024
11aa840
Update core.py
Illviljan Jul 10, 2024
62635eb
Update variable.py
Illviljan Jul 10, 2024
9c996a8
Update computation.py
Illviljan Jul 10, 2024
6625874
Merge branch 'namedarray_chunkmanager' of https://github.com/Illvilja…
Illviljan Jul 10, 2024
81fcf85
Update variables.py
Illviljan Jul 10, 2024
c894c7b
Update dataset.py
Illviljan Jul 10, 2024
fe35554
Update _typing.py
Illviljan Jul 10, 2024
bc05489
Update parallelcompat.py
Illviljan Jul 10, 2024
cedff32
Update parallelcompat.py
Illviljan Jul 10, 2024
d43c0c1
Update test_coding_times.py
Illviljan Jul 10, 2024
6530440
Merge branch 'main' into namedarray_chunkmanager
Illviljan Jul 11, 2024
2f1a6ca
Merge branch 'main' into namedarray_chunkmanager
Illviljan Jul 11, 2024
d5d3e35
Merge branch 'main' into namedarray_chunkmanager
max-sixty Jul 13, 2024
116c920
Merge branch 'main' into namedarray_chunkmanager
Illviljan Aug 3, 2024
6399dab
Update _typing.py
Illviljan Aug 3, 2024
156a953
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 3, 2024
6a08740
Update times.py
Illviljan Aug 3, 2024
daba54e
Merge branch 'namedarray_chunkmanager' of https://github.com/Illvilja…
Illviljan Aug 3, 2024
eef8fc8
Update core.py
Illviljan Aug 4, 2024
db98984
Update daskmanager.py
Illviljan Aug 4, 2024
96e1a1d
Update test_coding_times.py
Illviljan Aug 4, 2024
bf83275
Update test_coding_times.py
Illviljan Aug 4, 2024
31e1895
test pd.index
Illviljan Aug 5, 2024
30f8cdd
Update test_namedarray.py
Illviljan Aug 5, 2024
6ed1ae6
Update test_namedarray.py
Illviljan Aug 5, 2024
cd05c67
Add more helpful error
Illviljan Aug 5, 2024
459378f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 5, 2024
7bc2c31
Merge branch 'main' into namedarray_chunkmanager
Illviljan Aug 6, 2024
389aca7
Merge branch 'main' into namedarray_chunkmanager
Illviljan Aug 13, 2024
254a6b3
Merge branch 'main' into namedarray_chunkmanager
Illviljan Aug 14, 2024
85f49eb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 14, 2024
7ab9fda
Merge branch 'main' into namedarray_chunkmanager
Illviljan Nov 17, 2024
1b791b7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 17, 2024
7632f3f
Update core.py
Illviljan Nov 17, 2024
ac3102e
Merge branch 'namedarray_chunkmanager' of https://github.com/Illvilja…
Illviljan Nov 17, 2024
340b70f
Update daskmanager.py
Illviljan Nov 17, 2024
52aacfe
Update parallelcompat.py
Illviljan Nov 17, 2024
ebf415d
chunkedduckarray
Illviljan Nov 17, 2024
02e7b28
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 17, 2024
e71a36a
Update datatree.py
Illviljan Nov 17, 2024
aa214e4
Merge branch 'namedarray_chunkmanager' of https://github.com/Illvilja…
Illviljan Nov 17, 2024
d7da450
Update times.py
Illviljan Nov 17, 2024
df29319
Update times.py
Illviljan Nov 17, 2024
6735cf0
Update times.py
Illviljan Nov 17, 2024
feaed2c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 17, 2024
9d0c480
Update times.py
Illviljan Nov 17, 2024
3e6b5dd
add __array___ test
Illviljan Nov 17, 2024
fd891e9
Update test_namedarray.py
Illviljan Nov 17, 2024
76d9de6
Merge branch 'namedarray_chunkmanager' of https://github.com/Illvilja…
Illviljan Nov 17, 2024
fa148fa
Merge branch 'main' into namedarray_chunkmanager
Illviljan Nov 26, 2024
82e5f58
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 26, 2024
a4b4e7e
Merge branch 'main' into namedarray_chunkmanager
Illviljan Dec 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 56 additions & 22 deletions xarray/coding/times.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from collections.abc import Callable, Hashable
from datetime import datetime, timedelta
from functools import partial
from typing import TYPE_CHECKING, Literal, Union, cast
from typing import TYPE_CHECKING, Literal, Union, cast, overload

import numpy as np
import pandas as pd
Expand All @@ -27,18 +27,24 @@
from xarray.core.pdcompat import nanosecond_precision_timestamp
from xarray.core.utils import attempt_import, emit_user_level_warning
from xarray.core.variable import Variable
from xarray.namedarray.parallelcompat import T_ChunkedArray, get_chunked_array_type
from xarray.namedarray.pycompat import is_chunked_array
from xarray.namedarray._typing import (
_chunkedarrayfunction_or_api,
chunkedduckarray,
duckarray,
)
from xarray.namedarray.parallelcompat import get_chunked_array_type
from xarray.namedarray.utils import is_duck_dask_array

try:
import cftime
except ImportError:
cftime = None

from xarray.core.types import CFCalendar, NPDatetimeUnitOptions, T_DuckArray
from xarray.core.types import CFCalendar, NPDatetimeUnitOptions

T_Name = Union[Hashable, None]
PandasTypes = Union[pd.Index, pd.DatetimeIndex]


# standard calendars recognized by cftime
_STANDARD_CALENDARS = {"standard", "gregorian", "proleptic_gregorian"}
Expand Down Expand Up @@ -716,12 +722,26 @@ def _cast_to_dtype_if_safe(num: np.ndarray, dtype: np.dtype) -> np.ndarray:
return cast_num


@overload
def encode_cf_datetime(
dates: chunkedduckarray,
units: str | None = None,
calendar: str | None = None,
dtype: np.dtype | None = None,
) -> tuple[chunkedduckarray, str, str]: ...
@overload
def encode_cf_datetime(
dates: duckarray | PandasTypes,
units: str | None = None,
calendar: str | None = None,
dtype: np.dtype | None = None,
) -> tuple[duckarray, str, str]: ...
def encode_cf_datetime(
dates: T_DuckArray, # type: ignore[misc]
dates: duckarray | PandasTypes | chunkedduckarray,
units: str | None = None,
calendar: str | None = None,
dtype: np.dtype | None = None,
) -> tuple[T_DuckArray, str, str]:
) -> tuple[duckarray | PandasTypes | chunkedduckarray, str, str]:
"""Given an array of datetime objects, returns the tuple `(num, units,
calendar)` suitable for a CF compliant time variable.

Expand All @@ -732,19 +752,19 @@ def encode_cf_datetime(
cftime.date2num
"""
dates = asarray(dates)
if is_chunked_array(dates):
if isinstance(dates, _chunkedarrayfunction_or_api):
return _lazily_encode_cf_datetime(dates, units, calendar, dtype)
else:
return _eagerly_encode_cf_datetime(dates, units, calendar, dtype)


def _eagerly_encode_cf_datetime(
dates: T_DuckArray, # type: ignore[misc]
dates: duckarray | PandasTypes,
units: str | None = None,
calendar: str | None = None,
dtype: np.dtype | None = None,
allow_units_modification: bool = True,
) -> tuple[T_DuckArray, str, str]:
) -> tuple[duckarray | PandasTypes, str, str]:
dates = asarray(dates)

data_units = infer_datetime_units(dates)
Expand Down Expand Up @@ -822,23 +842,23 @@ def _eagerly_encode_cf_datetime(


def _encode_cf_datetime_within_map_blocks(
dates: T_DuckArray, # type: ignore[misc]
dates: duckarray,
units: str,
calendar: str,
dtype: np.dtype,
) -> T_DuckArray:
) -> duckarray | PandasTypes:
num, *_ = _eagerly_encode_cf_datetime(
dates, units, calendar, dtype, allow_units_modification=False
)
return num


def _lazily_encode_cf_datetime(
dates: T_ChunkedArray,
dates: chunkedduckarray,
units: str | None = None,
calendar: str | None = None,
dtype: np.dtype | None = None,
) -> tuple[T_ChunkedArray, str, str]:
) -> tuple[chunkedduckarray, str, str]:
if calendar is None:
# This will only trigger minor compute if dates is an object dtype array.
calendar = infer_calendar_name(dates)
Expand Down Expand Up @@ -871,24 +891,36 @@ def _lazily_encode_cf_datetime(
return num, units, calendar


@overload
def encode_cf_timedelta(
timedeltas: T_DuckArray, # type: ignore[misc]
timedeltas: chunkedduckarray,
units: str | None = None,
dtype: np.dtype | None = None,
) -> tuple[T_DuckArray, str]:
) -> tuple[chunkedduckarray, str]: ...
@overload
def encode_cf_timedelta(
timedeltas: duckarray | PandasTypes,
units: str | None = None,
dtype: np.dtype | None = None,
) -> tuple[duckarray, str]: ...
def encode_cf_timedelta(
timedeltas: chunkedduckarray | duckarray | PandasTypes,
units: str | None = None,
dtype: np.dtype | None = None,
) -> tuple[chunkedduckarray | duckarray, str]:
timedeltas = asarray(timedeltas)
if is_chunked_array(timedeltas):
if isinstance(timedeltas, _chunkedarrayfunction_or_api):
return _lazily_encode_cf_timedelta(timedeltas, units, dtype)
else:
return _eagerly_encode_cf_timedelta(timedeltas, units, dtype)


def _eagerly_encode_cf_timedelta(
timedeltas: T_DuckArray, # type: ignore[misc]
timedeltas: duckarray,
units: str | None = None,
dtype: np.dtype | None = None,
allow_units_modification: bool = True,
) -> tuple[T_DuckArray, str]:
) -> tuple[duckarray, str]:
data_units = infer_timedelta_units(timedeltas)

if units is None:
Expand Down Expand Up @@ -936,19 +968,21 @@ def _eagerly_encode_cf_timedelta(


def _encode_cf_timedelta_within_map_blocks(
timedeltas: T_DuckArray, # type: ignore[misc]
timedeltas: duckarray,
units: str,
dtype: np.dtype,
) -> T_DuckArray:
) -> duckarray:
num, _ = _eagerly_encode_cf_timedelta(
timedeltas, units, dtype, allow_units_modification=False
)
return num


def _lazily_encode_cf_timedelta(
timedeltas: T_ChunkedArray, units: str | None = None, dtype: np.dtype | None = None
) -> tuple[T_ChunkedArray, str]:
timedeltas: chunkedduckarray,
units: str | None = None,
dtype: np.dtype | None = None,
) -> tuple[chunkedduckarray, str]:
if units is None and dtype is None:
units = "nanoseconds"
dtype = np.dtype("int64")
Expand Down
2 changes: 1 addition & 1 deletion xarray/coding/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def lazy_elemwise_func(array, func: Callable, dtype: np.typing.DTypeLike):
if is_chunked_array(array):
chunkmanager = get_chunked_array_type(array)

return chunkmanager.map_blocks(func, array, dtype=dtype) # type: ignore[arg-type]
return chunkmanager.map_blocks(func, array, dtype=dtype)
else:
return _ElementwiseFunctionArray(array, func, dtype)

Expand Down
2 changes: 2 additions & 0 deletions xarray/core/computation.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from xarray.core.types import Dims, T_DataArray
from xarray.core.utils import is_dict_like, is_scalar, parse_dims_as_set, result_name
from xarray.core.variable import Variable
from xarray.namedarray._typing import chunkedduckarray
from xarray.namedarray.parallelcompat import get_chunked_array_type
from xarray.namedarray.pycompat import is_chunked_array
from xarray.util.deprecation_helpers import deprecate_dims
Expand Down Expand Up @@ -793,6 +794,7 @@ def apply_variable_ufunc(
)

def func(*arrays):
res: chunkedduckarray | tuple[chunkedduckarray, ...]
res = chunkmanager.apply_gufunc(
numpy_func,
signature.to_gufunc_string(exclude_dims),
Expand Down
4 changes: 3 additions & 1 deletion xarray/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
)
from xarray.core.weighted import DatasetWeighted
from xarray.groupers import Grouper, Resampler
from xarray.namedarray._typing import chunkedduckarray, duckarray
from xarray.namedarray.parallelcompat import ChunkManagerEntrypoint


Expand Down Expand Up @@ -897,7 +898,7 @@ def load(self, **kwargs) -> Self:
chunkmanager = get_chunked_array_type(*lazy_data.values())

# evaluate all the chunked arrays simultaneously
evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
evaluated_data: tuple[duckarray[Any, Any], ...] = chunkmanager.compute(
*lazy_data.values(), **kwargs
)

Expand Down Expand Up @@ -1082,6 +1083,7 @@ def _persist_inplace(self, **kwargs) -> Self:
chunkmanager = get_chunked_array_type(*lazy_data.values())

# evaluate all the dask arrays simultaneously
evaluated_data: tuple[chunkedduckarray, ...]
evaluated_data = chunkmanager.persist(*lazy_data.values(), **kwargs)

for k, data in zip(lazy_data, evaluated_data, strict=False):
Expand Down
7 changes: 4 additions & 3 deletions xarray/core/datatree.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
T_ChunksFreq,
ZarrWriteModes,
)
from xarray.namedarray._typing import chunkedduckarray, duckarray
from xarray.namedarray.parallelcompat import ChunkManagerEntrypoint

# """
Expand Down Expand Up @@ -1954,9 +1955,8 @@ def load(self, **kwargs) -> Self:
chunkmanager = get_chunked_array_type(*flat_lazy_data.values())

# evaluate all the chunked arrays simultaneously
evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
*flat_lazy_data.values(), **kwargs
)
evaluated_data: tuple[duckarray[Any, Any], ...]
evaluated_data = chunkmanager.compute(*flat_lazy_data.values(), **kwargs)

for (path, var_name), data in zip(
flat_lazy_data, evaluated_data, strict=False
Expand Down Expand Up @@ -2018,6 +2018,7 @@ def _persist_inplace(self, **kwargs) -> Self:
chunkmanager = get_chunked_array_type(*flat_lazy_data.values())

# evaluate all the dask arrays simultaneously
evaluated_data: tuple[chunkedduckarray[Any, Any], ...]
evaluated_data = chunkmanager.persist(*flat_lazy_data.values(), **kwargs)

for (path, var_name), data in zip(
Expand Down
2 changes: 1 addition & 1 deletion xarray/core/indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1329,7 +1329,7 @@ def _arrayize_vectorized_indexer(


def _chunked_array_with_chunks_hint(
array, chunks, chunkmanager: ChunkManagerEntrypoint[Any]
array, chunks, chunkmanager: ChunkManagerEntrypoint
):
"""Create a chunked array using the chunks hint for dimensions of size > 1."""

Expand Down
2 changes: 1 addition & 1 deletion xarray/core/variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -2600,7 +2600,7 @@ def chunk( # type: ignore[override]
name: str | None = None,
lock: bool | None = None,
inline_array: bool | None = None,
chunked_array_type: str | ChunkManagerEntrypoint[Any] | None = None,
chunked_array_type: str | ChunkManagerEntrypoint | None = None,
from_array_kwargs: Any = None,
**chunks_kwargs: Any,
) -> Self:
Expand Down
23 changes: 19 additions & 4 deletions xarray/namedarray/_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,16 @@ def dtype(self) -> _DType_co: ...
_Axes = tuple[_Axis, ...]
_AxisLike = Union[_Axis, _Axes]

_Chunks = tuple[_Shape, ...]
_NormalizedChunks = tuple[tuple[int, ...], ...]
_Chunk = tuple[int, ...]
_Chunks = tuple[_Chunk, ...]
_ChunksLike = Union[
int, Literal["auto"], None, _Chunk, _Chunks
] # TODO: Literal["auto"]
_ChunksType = TypeVar("_ChunksType", bound=_Chunks)

# FYI in some cases we don't allow `None`, which this doesn't take account of.
# # FYI the `str` is for a size string, e.g. "16MB", supported by dask.
T_ChunkDim: TypeAlias = str | int | Literal["auto"] | None | tuple[int, ...]
T_ChunkDim: TypeAlias = str | int | Literal["auto"] | None | _Chunk
# We allow the tuple form of this (though arguably we could transition to named dims only)
T_Chunks: TypeAlias = T_ChunkDim | Mapping[Any, T_ChunkDim]

Expand Down Expand Up @@ -238,7 +243,7 @@ def chunks(self) -> _Chunks: ...

@runtime_checkable
class _chunkedarrayfunction(
_arrayfunction[_ShapeType_co, _DType_co], Protocol[_ShapeType_co, _DType_co]
_arrayfunction[_ShapeType, _DType_co], Protocol[_ShapeType, _DType_co]
):
"""
Chunked duck array supporting NEP 18.
Expand All @@ -249,6 +254,11 @@ class _chunkedarrayfunction(
@property
def chunks(self) -> _Chunks: ...

def rechunk(
self,
chunks: _ChunksLike,
) -> _chunkedarrayfunction[_ShapeType, _DType_co]: ...


@runtime_checkable
class _chunkedarrayapi(
Expand All @@ -263,6 +273,11 @@ class _chunkedarrayapi(
@property
def chunks(self) -> _Chunks: ...

def rechunk(
self,
chunks: _ChunksLike,
) -> _chunkedarrayapi[_ShapeType_co, _DType_co]: ...


# NamedArray can most likely use both __array_function__ and __array_namespace__:
_chunkedarrayfunction_or_api = (_chunkedarrayfunction, _chunkedarrayapi)
Expand Down
5 changes: 3 additions & 2 deletions xarray/namedarray/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,8 +756,8 @@ def sizes(self) -> dict[_Dim, _IntOrUnknown]:

def chunk(
self,
chunks: T_Chunks = {}, # noqa: B006 # even though it's unsafe, it is being used intentionally here (#4667)
chunked_array_type: str | ChunkManagerEntrypoint[Any] | None = None,
chunks: T_Chunks = {}, # noqa: B006 # even though it's unsafe, it is being used intentionally here (#4667)
chunked_array_type: str | ChunkManagerEntrypoint | None = None,
from_array_kwargs: Any = None,
**chunks_kwargs: Any,
) -> Self:
Expand Down Expand Up @@ -831,6 +831,7 @@ def chunk(
chunkmanager = guess_chunkmanager(chunked_array_type)

data_old = self._data
data_chunked: _chunkedarray[Any, _DType_co]
if chunkmanager.is_chunked_array(data_old):
data_chunked = chunkmanager.rechunk(data_old, chunks) # type: ignore[arg-type]
else:
Expand Down
Loading
Loading