Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the correct chunktype for Dask arrays #5801

Merged
merged 10 commits into from
Apr 26, 2024
3 changes: 2 additions & 1 deletion docs/src/whatsnew/latest.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ This document explains the changes made to Iris for this release
🐛 Bugs Fixed
=============

#. N/A
#. `@bouweandela`_ updated the ``chunktype`` of Dask arrays, so it corresponds
to the array content. (:pull:`5801`)


💣 Incompatible Changes
Expand Down
8 changes: 4 additions & 4 deletions lib/iris/_concatenate.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from collections import defaultdict, namedtuple
import warnings

import dask.array as da
import numpy as np

from iris._lazy_data import concatenate as concatenate_arrays
import iris.coords
import iris.cube
import iris.exceptions
Expand Down Expand Up @@ -1112,7 +1112,7 @@ def _build_cell_measures(self):
skton.signature.cell_measures_and_dims[i].coord.data
for skton in skeletons
]
data = np.concatenate(tuple(data), axis=dim)
data = concatenate_arrays(tuple(data), axis=dim)

# Generate the associated metadata.
kwargs = cube_signature.cm_metadata[i].defn._asdict()
Expand Down Expand Up @@ -1152,7 +1152,7 @@ def _build_ancillary_variables(self):
skton.signature.ancillary_variables_and_dims[i].coord.data
for skton in skeletons
]
data = np.concatenate(tuple(data), axis=dim)
data = concatenate_arrays(tuple(data), axis=dim)

# Generate the associated metadata.
kwargs = cube_signature.av_metadata[i].defn._asdict()
Expand Down Expand Up @@ -1245,7 +1245,7 @@ def _build_data(self):
skeletons = self._skeletons
data = [skeleton.data for skeleton in skeletons]

data = da.concatenate(data, self.axis)
data = concatenate_arrays(data, self.axis)

return data

Expand Down
150 changes: 120 additions & 30 deletions lib/iris/_lazy_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"""

from functools import lru_cache, wraps
from typing import Sequence

import dask
import dask.array as da
Expand Down Expand Up @@ -41,14 +42,19 @@ def is_lazy_data(data):
return result


def is_masked_data(data: np.ndarray | da.Array) -> bool:
"""Return whether the argument is a masked array."""
return isinstance(da.utils.meta_from_array(data), np.ma.MaskedArray)


def is_lazy_masked_data(data):
"""Determine whether managed data is lazy and masked.

Return True if the argument is both an Iris 'lazy' data array and the
underlying array is of masked type. Otherwise return False.

"""
return is_lazy_data(data) and ma.isMA(da.utils.meta_from_array(data))
return is_lazy_data(data) and is_masked_data(data)


@lru_cache
Expand Down Expand Up @@ -220,9 +226,7 @@ def _optimum_chunksize(
)


def as_lazy_data(
data, chunks=None, asarray=False, dims_fixed=None, dask_chunking=False
):
def as_lazy_data(data, chunks=None, asarray=False, meta=None, dims_fixed=None):
"""Convert the input array `data` to a :class:`dask.array.Array`.

Parameters
Expand All @@ -232,17 +236,18 @@ def as_lazy_data(
This will be converted to a :class:`dask.array.Array`.
chunks : list of int, optional
If present, a source chunk shape, e.g. for a chunked netcdf variable.
If set to "auto", Iris chunking optimisation will be bypassed, and dask's
default chunking will be used instead.
asarray : bool, default=False
If True, then chunks will be converted to instances of `ndarray`.
Set to False (default) to pass passed chunks through unchanged.
meta : numpy.ndarray, optional
Empty ndarray created with same NumPy backend, ndim and dtype as the
Dask Array being created.
dims_fixed : list of bool, optional
If set, a list of values equal in length to 'chunks' or data.ndim.
'True' values indicate a dimension which can not be changed, i.e. the
result for that index must equal the value in 'chunks' or data.shape.
dask_chunking : bool, default=False
pp-mo marked this conversation as resolved.
Show resolved Hide resolved
If True, Iris chunking optimisation will be bypassed, and dask's default
chunking will be used instead. Including a value for chunks while dask_chunking
is set to True will result in a failure.

Returns
-------
Expand All @@ -258,13 +263,16 @@ def as_lazy_data(
but reduced by a factor if that exceeds the dask default chunksize.

"""
if dask_chunking:
if chunks is not None:
raise ValueError(
f"Dask chunking chosen, but chunks already assigned value {chunks}"
)
lazy_params = {"asarray": asarray, "meta": np.ndarray}
else:
if isinstance(data, ma.core.MaskedConstant):
data = ma.masked_array(data.data, mask=data.mask)

if meta is None and not isinstance(data, (np.ndarray, da.Array)):
raise ValueError(
"For performance reasons, `meta` cannot be `None` if `data` is "
"anything other than a Numpy or Dask array."
)

if chunks != "auto":
if chunks is None:
# No existing chunks : Make a chunk the shape of the entire input array
# (but we will subdivide it if too big).
Expand All @@ -281,15 +289,9 @@ def as_lazy_data(
dtype=data.dtype,
dims_fixed=dims_fixed,
)
lazy_params = {
"chunks": chunks,
"asarray": asarray,
"meta": np.ndarray,
}
if isinstance(data, ma.core.MaskedConstant):
data = ma.masked_array(data.data, mask=data.mask)

if not is_lazy_data(data):
data = da.from_array(data, **lazy_params)
data = da.from_array(data, chunks=chunks, asarray=asarray, meta=meta)
return data


Expand Down Expand Up @@ -351,31 +353,119 @@ def as_concrete_data(data):
return data


def multidim_lazy_stack(stack):
def _combine(
arrays: Sequence[da.Array | np.ndarray],
operation: str,
**kwargs,
) -> da.Array | np.ndarray:
"""Combine multiple arrays into a single array.

bouweandela marked this conversation as resolved.
Show resolved Hide resolved
Provides enhanced versions of :func:`~dask.array.concatenate` or :func:`~dask.array.stack`,
which ensure that all computed results are masked-array, if the combined .meta is masked.

Parameters
----------
arrays :
The arrays to combine.
operation :
The combination operation to apply.
**kwargs :
Any keyword arguments to pass to the combination operation.

"""
lazy = any(is_lazy_data(a) for a in arrays)
masked = any(is_masked_data(a) for a in arrays)

array_module = np
if masked:
if lazy:
# Avoid inconsistent array type when slicing resulting array
arrays = tuple(
a if is_lazy_masked_data(a) else da.ma.masked_array(a) for a in arrays
)
else:
# Avoid dropping the masks
array_module = np.ma

func = getattr(array_module, operation)
return func(arrays, **kwargs)


def concatenate(
arrays: Sequence[da.Array | np.ndarray],
axis: int = 0,
) -> da.Array | np.ndarray:
"""Concatenate a sequence of arrays along a new axis.

bouweandela marked this conversation as resolved.
Show resolved Hide resolved
Improves on the regular :func:`dask.array.concatenate` by always respecting a masked
``.meta``, as described for :func:`_combine`.

Parameters
----------
arrays :
The arrays must have the same shape, except in the dimension
corresponding to `axis` (the first, by default).
axis :
Dimension along which to align all of the arrays. If axis is None,
arrays are flattened before use.

Returns
-------
The concatenated array.

"""
return _combine(arrays, operation="concatenate", axis=axis)


def stack(
arrays: Sequence[da.Array | np.ndarray],
axis: int = 0,
) -> da.Array | np.ndarray:
"""Stack a sequence of arrays along a new axis.

bouweandela marked this conversation as resolved.
Show resolved Hide resolved
Improves on the regular :func:`dask.array.stack` by always respecting a masked
``.meta``, as described for :func:`_combine`.

Parameters
----------
arrays :
The arrays must have the same shape.
axis :
Dimension along which to align all of the arrays.

Returns
-------
The stacked array.

"""
return _combine(arrays, operation="stack", axis=axis)


def multidim_lazy_stack(arr):
"""Recursively build a multidimensional stacked dask array.

This is needed because :meth:`dask.array.Array.stack` only accepts a
1-dimensional list.

Parameters
----------
stack :
arr :
An ndarray of :class:`dask.array.Array`.

Returns
-------
The input array converted to a lazy :class:`dask.array.Array`.

"""
if stack.ndim == 0:
if arr.ndim == 0:
# A 0-d array cannot be stacked.
result = stack.item()
elif stack.ndim == 1:
result = arr.item()
elif arr.ndim == 1:
# Another base case : simple 1-d goes direct in dask.
result = da.stack(list(stack))
result = stack(list(arr))
else:
# Recurse because dask.stack does not do multi-dimensional.
result = da.stack([multidim_lazy_stack(subarray) for subarray in stack])
result = stack([multidim_lazy_stack(subarray) for subarray in arr])
return result


Expand Down
3 changes: 2 additions & 1 deletion lib/iris/aux_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import dask.array as da
import numpy as np

from iris._lazy_data import concatenate
from iris.common import CFVariableMixin, CoordMetadata, metadata_manager_factory
import iris.coords
from iris.warnings import IrisIgnoringBoundsWarning
Expand Down Expand Up @@ -1076,7 +1077,7 @@ def _derive(self, sigma, eta, depth, depth_c, zlev, nsigma, coord_dims_func):
result_rest_levs = zlev[z_slices_rest] * ones_full_result[z_slices_rest]

# Combine nsigma and 'rest' levels for the final result.
result = da.concatenate([result_nsigma_levs, result_rest_levs], axis=z_dim)
result = concatenate([result_nsigma_levs, result_rest_levs], axis=z_dim)
return result

def make_coord(self, coord_dims_func):
Expand Down
15 changes: 3 additions & 12 deletions lib/iris/cube.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import zlib

from cf_units import Unit
import dask.array as da
import numpy as np
import numpy.ma as ma

Expand Down Expand Up @@ -3134,12 +3133,7 @@ def make_chunk(key):
result = chunks[0]
else:
chunk_data = [chunk.core_data() for chunk in chunks]
if self.has_lazy_data():
func = da.concatenate
else:
module = ma if ma.isMaskedArray(self.data) else np
func = module.concatenate
data = func(chunk_data, dim)
data = _lazy.concatenate(chunk_data, axis=dim)
result = iris.cube.Cube(data)
result.metadata = deepcopy(self.metadata)

Expand Down Expand Up @@ -4432,13 +4426,10 @@ def aggregated_by(self, coords, aggregator, climatological=False, **kwargs):

# Choose appropriate data and functions for data aggregation.
if aggregator.lazy_func is not None and self.has_lazy_data():
stack = da.stack
input_data = self.lazy_data()
agg_method = aggregator.lazy_aggregate
else:
input_data = self.data
# Note numpy.stack does not preserve masks.
stack = ma.stack if ma.isMaskedArray(input_data) else np.stack
agg_method = aggregator.aggregate

# Create data and weights slices.
Expand Down Expand Up @@ -4475,11 +4466,11 @@ def aggregated_by(self, coords, aggregator, climatological=False, **kwargs):
# before combining the different slices.
if return_weights:
result, weights_result = list(zip(*result))
aggregateby_weights = stack(weights_result, axis=dimension_to_groupby)
aggregateby_weights = _lazy.stack(weights_result, axis=dimension_to_groupby)
else:
aggregateby_weights = None

aggregateby_data = stack(result, axis=dimension_to_groupby)
aggregateby_data = _lazy.stack(result, axis=dimension_to_groupby)
# Ensure plain ndarray is output if plain ndarray was input.
if ma.isMaskedArray(aggregateby_data) and not ma.isMaskedArray(input_data):
aggregateby_data = ma.getdata(aggregateby_data)
Expand Down
4 changes: 4 additions & 0 deletions lib/iris/fileformats/netcdf/_thread_safe_nc.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,10 @@ def ndim(self):
# noqa: D102
return len(self.shape)

@property
def dask_meta(self):
return np.ma.array(np.empty((0,) * self.ndim, dtype=self.dtype), mask=True)

def __getitem__(self, keys):
# Using a DatasetWrapper causes problems with invalid ID's and the
# netCDF4 library, presumably because __getitem__ gets called so many
Expand Down
7 changes: 5 additions & 2 deletions lib/iris/fileformats/netcdf/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def _get_cf_var_data(cf_var, filename):
# Get the chunking specified for the variable : this is either a shape, or
# maybe the string "contiguous".
if CHUNK_CONTROL.mode is ChunkControl.Modes.AS_DASK:
result = as_lazy_data(proxy, chunks=None, dask_chunking=True)
result = as_lazy_data(proxy, meta=proxy.dask_meta, chunks="auto")
else:
chunks = cf_var.cf_data.chunking()
if chunks is None:
Expand Down Expand Up @@ -285,7 +285,10 @@ def _get_cf_var_data(cf_var, filename):
if dims_fixed is None:
dims_fixed = [dims_fixed]
result = as_lazy_data(
proxy, chunks=chunks, dims_fixed=tuple(dims_fixed)
proxy,
meta=proxy.dask_meta,
chunks=chunks,
dims_fixed=tuple(dims_fixed),
)
return result

Expand Down
Loading
Loading