Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specify meta in dask.array.map_blocks #5989

Merged
merged 11 commits into from
Oct 28, 2024
2 changes: 1 addition & 1 deletion docs/src/whatsnew/latest.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ This document explains the changes made to Iris for this release
=============

#. `@bouweandela`_ updated the ``chunktype`` of Dask arrays, so it corresponds
to the array content. (:pull:`5801`)
to the array content. (:pull:`5801`, :pull:`5989`)

#. `@rcomer`_ made the :obj:`~iris.analysis.WPERCENTILE` aggregator work with
:func:`~iris.cube.Cube.rolling_window`. (:issue:`5777`, :pull:`5825`)
Expand Down
19 changes: 14 additions & 5 deletions lib/iris/_lazy_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,9 +536,12 @@ def lazy_elementwise(lazy_array, elementwise_op):
# This makes good practical sense for unit conversions, as a Unit.convert
# call may cast to float, or not, depending on unit equality : Thus, it's
# much safer to get udunits to decide that for us.
dtype = elementwise_op(np.zeros(1, lazy_array.dtype)).dtype
meta = da.utils.meta_from_array(lazy_array)
new_meta = elementwise_op(meta)
pp-mo marked this conversation as resolved.
Show resolved Hide resolved

return da.map_blocks(elementwise_op, lazy_array, dtype=dtype)
return da.map_blocks(
elementwise_op, lazy_array, dtype=new_meta.dtype, meta=new_meta
)


def map_complete_blocks(src, func, dims, out_sizes, *args, **kwargs):
Expand Down Expand Up @@ -596,8 +599,14 @@ def map_complete_blocks(src, func, dims, out_sizes, *args, **kwargs):
for dim, size in zip(dims, out_sizes):
out_chunks[dim] = size

result = data.map_blocks(
func, *args, chunks=out_chunks, dtype=src.dtype, **kwargs
)
# Assume operation does not change dtype and meta if not specified.
if "meta" not in kwargs:
kwargs["meta"] = da.utils.meta_from_array(data)
if "dtype" in kwargs:
kwargs["meta"] = kwargs["meta"].astype(kwargs["dtype"])
else:
kwargs["dtype"] = kwargs["meta"].dtype

result = data.map_blocks(func, *args, chunks=out_chunks, **kwargs)
pp-mo marked this conversation as resolved.
Show resolved Hide resolved

return result
5 changes: 4 additions & 1 deletion lib/iris/experimental/ugrid/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ def fill_region(target, regiondata, regioninds):
# Notes on resultant calculation properties:
# 1. map_blocks is chunk-mapped, so it is parallelisable and space-saving
# 2. However, fetching less than a whole chunk is not efficient
meta = np.ma.array(
np.empty((0,) * result_array.ndim, dtype=result_array.dtype), mask=True
)
for cube in submesh_cubes:
# Lazy data array from the region cube
sub_data = cube.lazy_data()
Expand All @@ -300,7 +303,7 @@ def fill_region(target, regiondata, regioninds):
sub_data,
indarr,
dtype=result_array.dtype,
meta=np.ndarray,
meta=meta,
)

# Construct the result cube
Expand Down
13 changes: 13 additions & 0 deletions lib/iris/tests/unit/analysis/regrid/test_RectilinearRegridder.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,11 +474,24 @@ def setUp(self):
self.args = ("linear", "mask")
self.regridder = Regridder(self.cube, self.cube, *self.args)
self.lazy_cube = self.cube.copy(da.asarray(self.cube.data))
self.lazy_masked_cube = self.lazy_cube.copy(da.ma.masked_array(self.cube.data))
self.lazy_regridder = Regridder(self.lazy_cube, self.lazy_cube, *self.args)

def test_lazy_regrid(self):
    """Regridding a lazy cube stays lazy and matches the eager result."""
    result = self.lazy_regridder(self.lazy_cube)
    self.assertTrue(result.has_lazy_data())
    # An unmasked lazy input should keep a plain ndarray meta.
    # Use assertIsInstance / assertEqual rather than assertTrue(...) so
    # failures report the actual values instead of just "False is not true".
    self.assertIsInstance(da.utils.meta_from_array(result.core_data()), np.ndarray)
    expected = self.regridder(self.cube)
    self.assertEqual(result, expected)

def test_lazy_masked_regrid(self):
    """Regridding a lazy masked cube keeps laziness and a masked meta."""
    result = self.lazy_regridder(self.lazy_masked_cube)
    self.assertTrue(result.has_lazy_data())
    # A masked lazy input should propagate a masked-array meta to the result.
    # assertIsInstance gives an informative message on failure, unlike
    # assertTrue(isinstance(...)).
    self.assertIsInstance(
        da.utils.meta_from_array(result.core_data()), np.ma.MaskedArray
    )
    expected = self.regridder(self.cube)
    self.assertEqual(result, expected)

Expand Down
10 changes: 6 additions & 4 deletions lib/iris/tests/unit/analysis/test_PERCENTILE.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@ def test_default_kwargs_passed(self, mocked_mquantiles):
if self.lazy:
data = as_lazy_data(data)

self.agg_method(data, axis=axis, percent=percent)
result = self.agg_method(data, axis=axis, percent=percent)

# Trigger calculation for lazy case.
as_concrete_data(data)
as_concrete_data(result)
for key in ["alphap", "betap"]:
self.assertEqual(mocked_mquantiles.call_args.kwargs[key], 1)

Expand All @@ -170,10 +170,12 @@ def test_chosen_kwargs_passed(self, mocked_mquantiles):
if self.lazy:
data = as_lazy_data(data)

self.agg_method(data, axis=axis, percent=percent, alphap=0.6, betap=0.5)
result = self.agg_method(
data, axis=axis, percent=percent, alphap=0.6, betap=0.5
)

# Trigger calculation for lazy case.
as_concrete_data(data)
as_concrete_data(result)
for key, val in zip(["alphap", "betap"], [0.6, 0.5]):
self.assertEqual(mocked_mquantiles.call_args.kwargs[key], val)

Expand Down
55 changes: 54 additions & 1 deletion lib/iris/tests/unit/lazy_data/test_map_complete_blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,19 @@ def create_mock_cube(array):
class Test_map_complete_blocks(tests.IrisTest):
def setUp(self):
    """Create a small test array, a mapping function, and its expected result."""
    self.array = np.arange(8).reshape(2, 4)
    self.func_result = self.array + 1

    def unsampleable_add_one(block):
        """Add one to *block*, refusing zero-size inputs.

        Rejecting empty arrays guarantees that ``map_blocks`` cannot
        'sample' this function with a size-0 array to infer the output
        meta, so the ``map_blocks`` call must be correct for any function.
        """
        if block.size == 0:
            raise ValueError
        return block + 1

    self.func = unsampleable_add_one

def test_non_lazy_input(self):
Expand Down Expand Up @@ -61,6 +73,47 @@ def test_dask_array_input(self):
self.assertTrue(is_lazy_data(result))
self.assertArrayEqual(result.compute(), self.func_result)

def test_dask_masked_array_input(self):
    """A masked dask input array keeps its mask through the mapping."""
    array = da.ma.masked_array(np.arange(2), mask=np.arange(2))
    result = map_complete_blocks(array, self.func, dims=tuple(), out_sizes=tuple())
    self.assertTrue(is_lazy_data(result))
    # The masked meta must survive, i.e. the result is still a masked array.
    # assertIsInstance reports the offending type on failure, unlike
    # assertTrue(isinstance(...)).
    self.assertIsInstance(da.utils.meta_from_array(result), np.ma.MaskedArray)
    self.assertArrayEqual(result.compute(), np.ma.masked_array([1, 2], mask=[0, 1]))

def test_dask_array_input_with_meta(self):
    """An explicit ``meta`` determines the output meta and dtype."""
    lazy_array = da.asarray(self.array, chunks=((1, 1), (4,)))
    meta = np.empty((), dtype=np.float32)

    def func(chunk):
        # As in setUp, refuse size-0 arrays so the meta cannot be inferred
        # by sampling; it must come from the ``meta`` argument.
        if chunk.size == 0:
            raise ValueError
        return (chunk + 1).astype(np.float32)

    result = map_complete_blocks(
        lazy_array, func, dims=(1,), out_sizes=(4,), meta=meta
    )
    self.assertIsInstance(da.utils.meta_from_array(result), np.ndarray)
    # Compute once (the original computed twice) and check that both the
    # lazily-declared dtype and the realised dtype honour ``meta``.
    computed = result.compute()
    self.assertEqual(result.dtype, meta.dtype)
    self.assertEqual(computed.dtype, meta.dtype)
    self.assertArrayEqual(computed, self.func_result)

def test_dask_array_input_with_dtype(self):
    """An explicit ``dtype`` is honoured while the masked meta is kept."""
    lazy_array = da.ma.masked_array(self.array, chunks=((1, 1), (4,)))
    dtype = np.float32

    def func(chunk):
        # Refuse size-0 arrays so the output dtype cannot be inferred by
        # sampling; it must come from the ``dtype`` argument.
        if chunk.size == 0:
            raise ValueError
        return (chunk + 1).astype(np.float32)

    result = map_complete_blocks(
        lazy_array, func, dims=(1,), out_sizes=(4,), dtype=dtype
    )
    # The input was masked, so the meta should still be a masked array even
    # though only ``dtype`` (not ``meta``) was supplied.
    self.assertIsInstance(da.utils.meta_from_array(result), np.ma.MaskedArray)
    # Compute once (the original computed twice) and check both dtypes.
    computed = result.compute()
    self.assertEqual(result.dtype, dtype)
    self.assertEqual(computed.dtype, dtype)
    self.assertArrayEqual(computed, self.func_result)

def test_rechunk(self):
lazy_array = da.asarray(self.array, chunks=((1, 1), (2, 2)))
cube, _ = create_mock_cube(lazy_array)
Expand Down
Loading