From 0c9177c1bb64d810f8a78e8d0a5d39d3b68e800e Mon Sep 17 00:00:00 2001 From: dcherian Date: Thu, 13 Oct 2022 19:56:00 -0600 Subject: [PATCH 01/21] Fix subset_to_blocks --- flox/core.py | 54 +++++++++++++++++++++++++--------------------- tests/test_core.py | 32 +++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 25 deletions(-) diff --git a/flox/core.py b/flox/core.py index 0e2b73ac9..8fe7581e8 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1046,6 +1046,30 @@ def _reduce_blockwise( return result +def _normalize_indexes(array, flatblocks, blkshape): + unraveled = np.unravel_index(flatblocks, blkshape) + + normalized: list[Union[int, np.ndarray, slice]] = [] + for ax, idx in enumerate(unraveled): + i = _unique(idx).squeeze() + if i.ndim == 0: + normalized.append(i.item()) + else: + if np.array_equal(i, np.arange(blkshape[ax])): + normalized.append(slice(None)) + elif np.array_equal(i, np.arange(i[0], i[-1] + 1)): + normalized.append(slice(i[0], i[-1] + 1)) + else: + normalized.append(i) + full_normalized = (slice(None),) * (array.ndim - len(normalized)) + tuple(normalized) + + # has no iterables + noiter = tuple(i if not hasattr(i, "__len__") else slice(None) for i in full_normalized) + # has all iterables + alliter = {ax: i for ax, i in enumerate(full_normalized) if hasattr(i, "__len__")} + return alliter, noiter + + def subset_to_blocks( array: DaskArray, flatblocks: Sequence[int], blkshape: tuple[int] | None = None ) -> DaskArray: @@ -1065,33 +1089,13 @@ def subset_to_blocks( if blkshape is None: blkshape = array.blocks.shape - unraveled = np.unravel_index(flatblocks, blkshape) - normalized: list[Union[int, np.ndarray, slice]] = [] - for ax, idx in enumerate(unraveled): - i = _unique(idx).squeeze() - if i.ndim == 0: - normalized.append(i.item()) - else: - if np.array_equal(i, np.arange(blkshape[ax])): - normalized.append(slice(None)) - elif np.array_equal(i, np.arange(i[0], i[-1] + 1)): - normalized.append(slice(i[0], i[-1] + 1)) - else: - normalized.append(i) - full_normalized = (slice(None),) * (array.ndim - len(normalized)) + tuple(normalized) - - # has no iterables - noiter = tuple(i if not hasattr(i, "__len__") else slice(None) for i in full_normalized) - # has all iterables - alliter = { - ax: i if hasattr(i, "__len__") else slice(None) for ax, i in enumerate(full_normalized) - } + alliter, noiter = _normalize_indexes(array, flatblocks, blkshape) # apply everything but the iterables - if all(i == slice(None) for i in noiter): - return array - - subset = array.blocks[noiter] + if not all(i == slice(None) for i in noiter): + subset = array.blocks[noiter] + else: + subset = array for ax, inds in alliter.items(): if isinstance(inds, slice): diff --git a/tests/test_core.py b/tests/test_core.py index f9d412182..6ce49f190 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -12,11 +12,13 @@ from flox.core import ( _convert_expected_groups_to_index, _get_optimal_chunks_for_groups, + _normalize_indexes, factorize_, find_group_cohorts, groupby_reduce, rechunk_for_cohorts, reindex_, + subset_to_blocks, ) from . import assert_equal, engine, has_dask, raise_if_dask_computes, requires_dask @@ -1035,3 +1037,33 @@ def test_dtype(func, dtype, engine): labels = np.array(["a", "a", "c", "c", "c", "b", "b", "c", "c", "b", "b", "f"]) actual, _ = groupby_reduce(arr, labels, func=func, dtype=np.float64) assert actual.dtype == np.dtype("float64") + + +@requires_dask +def test_subset_blocks(): + array = dask.array.random.random((120,), chunks=(4,)) + + blockid = (0, 3, 6, 9, 12, 15, 18, 21, 24, 27) + subset = subset_to_blocks(array, blockid) + assert subset.blocks.shape == (len(blockid),) + + +@pytest.mark.parametrize( + "flatblocks, expected", + ( + ((0, 1, 2, 3, 4), [{}, (slice(None),)]), + ((1, 2, 3), [{}, (slice(1, 4),)]), + ((1, 3), [{-1: (1, 3)}, tuple()]), + ), +) +def test_normalize_block_indexing(flatblocks, expected): + ndim = 1 + nblocks = 5 + array = dask.array.ones((nblocks,) * ndim, chunks=(1,) * ndim) + alliter, noiter = _normalize_indexes(array, flatblocks, array.blocks.shape) + + if expected[0]: + expected[0] = {ndim - 1: v for k, v in expected[0].items() if k == -1} + + assert alliter == expected[0] + assert noiter == expected[1] From 4842e680fd7ce2b19e4a4661c6c2182b6e5786ed Mon Sep 17 00:00:00 2001 From: dcherian Date: Thu, 13 Oct 2022 21:08:42 -0600 Subject: [PATCH 02/21] Better tests --- tests/test_core.py | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/tests/test_core.py b/tests/test_core.py index 6ce49f190..ae8e8456c 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1053,7 +1053,7 @@ def test_subset_blocks(): ( ((0, 1, 2, 3, 4), [{}, (slice(None),)]), ((1, 2, 3), [{}, (slice(1, 4),)]), - ((1, 3), [{-1: (1, 3)}, tuple()]), + ((1, 3), [{-1: (1, 3)}, (slice(None),)]), ), ) def test_normalize_block_indexing(flatblocks, expected): @@ -1065,5 +1065,31 @@ def test_normalize_block_indexing(flatblocks, expected): if expected[0]: expected[0] = {ndim - 1: v for k, v in expected[0].items() if k == -1} - assert alliter == expected[0] + assert alliter.keys() == expected[0].keys() + for actual, expect in zip(alliter.values(), expected[0].values()): + np.testing.assert_array_equal(actual, expect) assert noiter == expected[1] + + +def test_subset_block_fastpath(): + # full slice pass through + array = dask.array.ones((5,), chunks=(1,)) + subset = subset_to_blocks(array, np.arange(5)) + assert subset.name == array.name + + array = dask.array.ones((5, 5), chunks=1) + subset = subset_to_blocks(array, np.arange(25)) + assert subset.name == array.name + + # two slices become one block layer + subset = subset_to_blocks(array, np.arange(10)) + assert len(subset.dask.layers) == 2 + + # no overlap in range along the two dimensions + # Add two new layers + subset = subset_to_blocks(array, [0, 7]) + assert len(subset.dask.layers) == 3 + + # one slice, one iterable + subset = subset_to_blocks(array, np.arange(7)) + assert len(subset.dask.layers) == 2 From ea666680cc68ef484e1211abcefc43b50ba00665 Mon Sep 17 00:00:00 2001 From: dcherian Date: Thu, 13 Oct 2022 21:11:10 -0600 Subject: [PATCH 03/21] Add requires_dask --- tests/test_core.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_core.py b/tests/test_core.py index ae8e8456c..15e10d534 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1048,6 +1048,7 @@ def test_subset_blocks(): assert subset.blocks.shape == (len(blockid),) +@requires_dask @pytest.mark.parametrize( "flatblocks, expected", ( @@ -1071,6 +1072,7 @@ def test_normalize_block_indexing(flatblocks, expected): assert noiter == expected[1] +@requires_dask def test_subset_block_fastpath(): # full slice pass through array = dask.array.ones((5,), chunks=(1,)) From 8ce649950f1d0eaf468a0a26152740d83e3129c6 Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 14 Oct 2022 07:06:32 -0600 Subject: [PATCH 04/21] Add comment. --- flox/core.py | 9 +++++++++ tests/test_core.py | 7 ++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index 8fe7581e8..6b92f3b72 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1047,6 +1047,15 @@ def _reduce_blockwise( def _normalize_indexes(array, flatblocks, blkshape): + """ + .blocks accessor can only accept one iterable at a time, + but can handle multiple slices. + To minimize tasks and layers, we normalize to produce slices + along as many axes as possible, and then repeatedly apply + any remaining iterables in a loop. + + TODO: move this upstream + """ unraveled = np.unravel_index(flatblocks, blkshape) normalized: list[Union[int, np.ndarray, slice]] = [] diff --git a/tests/test_core.py b/tests/test_core.py index 15e10d534..3ee192617 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1073,12 +1073,13 @@ def test_normalize_block_indexing(flatblocks, expected): @requires_dask -def test_subset_block_fastpath(): +def test_subset_block_minimizes_layers(): # full slice pass through array = dask.array.ones((5,), chunks=(1,)) subset = subset_to_blocks(array, np.arange(5)) assert subset.name == array.name + # another full slice pass through array = dask.array.ones((5, 5), chunks=1) subset = subset_to_blocks(array, np.arange(25)) assert subset.name == array.name @@ -1087,6 +1088,10 @@ def test_subset_block_fastpath(): subset = subset_to_blocks(array, np.arange(10)) assert len(subset.dask.layers) == 2 + # should become two slices and one block layer + subset = subset_to_blocks(array, np.arange(8)) + assert len(subset.dask.layers) == 2 + # no overlap in range along the two dimensions # Add two new layers subset = subset_to_blocks(array, [0, 7]) From b6b5b70917f1ab6fd9e7de883d148bdb301cd99d Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 14 Oct 2022 10:57:17 -0600 Subject: [PATCH 05/21] More optimization - Apply one iterable, if present, along with slices. --- flox/core.py | 13 ++++++++++--- tests/test_core.py | 21 ++++++++++++++++++++- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/flox/core.py b/flox/core.py index 6b92f3b72..d13368437 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1069,14 +1069,21 @@ def _normalize_indexes(array, flatblocks, blkshape): elif np.array_equal(i, np.arange(i[0], i[-1] + 1)): normalized.append(slice(i[0], i[-1] + 1)) else: - normalized.append(i) + normalized.append(list(i)) full_normalized = (slice(None),) * (array.ndim - len(normalized)) + tuple(normalized) # has no iterables - noiter = tuple(i if not hasattr(i, "__len__") else slice(None) for i in full_normalized) + noiter = list(i if not hasattr(i, "__len__") else slice(None) for i in full_normalized) # has all iterables alliter = {ax: i for ax, i in enumerate(full_normalized) if hasattr(i, "__len__")} - return alliter, noiter + + # merge in one iterable with noiter to reduce blocks layer by 1. + for k, v in alliter.items(): + if noiter[k] == slice(None): + noiter[k] = alliter.pop(k) + break + + return alliter, tuple(noiter) def subset_to_blocks( diff --git a/tests/test_core.py b/tests/test_core.py index 3ee192617..c03cb2e82 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1093,8 +1093,27 @@ def test_subset_block_minimizes_layers(): assert len(subset.dask.layers) == 2 # no overlap in range along the two dimensions - # Add two new layers + # but should be optimized to a single .blocks call + # same column + subset = subset_to_blocks(array, [0, 10]) + assert len(subset.dask.layers) == 2 + + # different rows and columns + # but optimization to slice possible subset = subset_to_blocks(array, [0, 7]) + assert len(subset.dask.layers) == 2 + + subset = subset_to_blocks(array, [0, 7, 9]) + assert len(subset.dask.layers) == 2 + + subset = subset_to_blocks(array, [0, 7, 8, 9]) + assert len(subset.dask.layers) == 2 + + subset = subset_to_blocks(array, [0, 6, 12, 14]) + assert len(subset.dask.layers) == 2 + + # no optimizations possible + subset = subset_to_blocks(array, [0, 12, 14, 19]) assert len(subset.dask.layers) == 3 # one slice, one iterable From 5b4ac42a4743507e961b7e4e6333dd286f5cdb58 Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 14 Oct 2022 11:24:29 -0600 Subject: [PATCH 06/21] More benchmarks --- asv_bench/benchmarks/cohorts.py | 34 ++++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/asv_bench/benchmarks/cohorts.py b/asv_bench/benchmarks/cohorts.py index 9062611f1..91c1e5db9 100644 --- a/asv_bench/benchmarks/cohorts.py +++ b/asv_bench/benchmarks/cohorts.py @@ -45,16 +45,36 @@ def setup(self, *args, **kwargs): self.axis = (-2, -1) -class ERA5(Cohorts): +class ERA5Dataset: """ERA5""" - def setup(self, *args, **kwargs): - time = pd.Series(pd.date_range("2016-01-01", "2018-12-31 23:59", freq="H")) - - self.by = time.dt.dayofyear.values + def __init__(self, *args, **kwargs): + self.time = pd.Series(pd.date_range("2016-01-01", "2018-12-31 23:59", freq="H")) self.axis = (-1,) + self.array = dask.array.random.random((721, 1440, len(self.time)), chunks=(-1, -1, 48)) - array = dask.array.random.random((721, 1440, len(time)), chunks=(-1, -1, 48)) + def rechunk(self): self.array = flox.core.rechunk_for_cohorts( - array, -1, self.by, force_new_chunk_at=[1], chunksize=48, ignore_old_chunks=True + self.array, -1, self.by, force_new_chunk_at=[1], chunksize=48, ignore_old_chunks=True + ) + + +class ERA5DayOfYear(ERA5Dataset, Cohorts): + def setup(self, *args, **kwargs): + super().__init__() + self.by = self.time.dt.dayofyear.values + super().rechunk() + + +class Era5MonthHour(ERA5Dataset, Cohorts): + def setup(self, *args, **kwargs): + super().__init__() + by = (self.time.dt.month.values, self.time.dt.hour.values) + ret = flox.core._factorize_multiple( + by, + expected_groups=(pd.Index(np.arange(1, 13)), pd.Index(np.arange(1, 25))), + by_is_dask=False, + reindex=False, ) + self.by = ret[0][0] + # self.rechunk() From 5e5d9510e6a31d7a37e8e71a002e3e8ade65dab9 Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 14 Oct 2022 11:50:13 -0600 Subject: [PATCH 07/21] Update test --- tests/test_core.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/test_core.py b/tests/test_core.py index c03cb2e82..a112b4554 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1048,6 +1048,26 @@ def test_subset_blocks(): assert subset.blocks.shape == (len(blockid),) +@requires_dask +@pytest.mark.parametrize( + "flatblocks, expected", + ( + ((0, 1, 2, 3, 4), (slice(None),)), + ((1, 2, 3), (slice(1, 4),)), + # gets optimized + ((1, 3), ([1, 3],)), + # gets optimized + ((0, 1, 3), ([0, 1, 3],)), + ), +) +def test_normalize_block_indexing_1d(flatblocks, expected): + nblocks = 5 + array = dask.array.ones((nblocks,), chunks=(1,)) + alliter, noiter = _normalize_indexes(array, flatblocks, array.blocks.shape) + assert alliter == {} + assert noiter == expected + + @requires_dask @pytest.mark.parametrize( "flatblocks, expected", From 6916f89ee02e3db001faaff2616dc0a7330bf743 Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 14 Oct 2022 12:21:00 -0600 Subject: [PATCH 08/21] WIP commit to fix tests --- tests/test_core.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/test_core.py b/tests/test_core.py index a112b4554..d856b7291 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1069,17 +1069,21 @@ def test_normalize_block_indexing_1d(flatblocks, expected): @requires_dask +@pytest.mark.xfail @pytest.mark.parametrize( "flatblocks, expected", ( ((0, 1, 2, 3, 4), [{}, (slice(None),)]), ((1, 2, 3), [{}, (slice(1, 4),)]), - ((1, 3), [{-1: (1, 3)}, (slice(None),)]), + # gets optimized + ((1, 3), [{}, ([1, 3],)]), + # gets optimized + ((0, 1, 3), [{}, ([0, 1, 3],)]), ), ) -def test_normalize_block_indexing(flatblocks, expected): - ndim = 1 +def test_normalize_block_indexing_2d(flatblocks, expected): nblocks = 5 + ndim = 2 array = dask.array.ones((nblocks,) * ndim, chunks=(1,) * ndim) alliter, noiter = _normalize_indexes(array, flatblocks, array.blocks.shape) From 236e37f0ae1481bcbc036c826f8ad839faa58446 Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 14 Oct 2022 13:38:26 -0600 Subject: [PATCH 09/21] Fix rechunk_to_cohorts bug --- flox/core.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index d13368437..683e16548 100644 --- a/flox/core.py +++ b/flox/core.py @@ -288,7 +288,7 @@ def rechunk_for_cohorts( divisions = [] counter = 1 for idx, lab in enumerate(labels): - if lab in force_new_chunk_at: + if lab in force_new_chunk_at or idx == 0: divisions.append(idx) counter = 1 continue @@ -305,6 +305,7 @@ def rechunk_for_cohorts( divisions.append(idx) counter = 1 continue + counter += 1 divisions.append(len(labels)) @@ -313,6 +314,9 @@ def rechunk_for_cohorts( print(labels_at_breaks[:40]) newchunks = tuple(np.diff(divisions)) + if debug: + print(divisions[:10], newchunks[:10]) + print(divisions[-10:], newchunks[-10:]) assert sum(newchunks) == len(labels) if newchunks == array.chunks[axis]: From 13624ed83a333cd4899829401db10834ce681470 Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 14 Oct 2022 13:38:41 -0600 Subject: [PATCH 10/21] Add rechunked cohorts benchmarks --- asv_bench/benchmarks/cohorts.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/asv_bench/benchmarks/cohorts.py b/asv_bench/benchmarks/cohorts.py index 91c1e5db9..28c75f949 100644 --- a/asv_bench/benchmarks/cohorts.py +++ b/asv_bench/benchmarks/cohorts.py @@ -63,10 +63,15 @@ class ERA5DayOfYear(ERA5Dataset, Cohorts): def setup(self, *args, **kwargs): super().__init__() self.by = self.time.dt.dayofyear.values + + +class ERA5DayOfYearRechunked(ERA5DayOfYear, Cohorts): + def setup(self, *args, **kwargs): + super().setup() super().rechunk() -class Era5MonthHour(ERA5Dataset, Cohorts): +class ERA5MonthHour(ERA5Dataset, Cohorts): def setup(self, *args, **kwargs): super().__init__() by = (self.time.dt.month.values, self.time.dt.hour.values) @@ -76,5 +81,11 @@ def setup(self, *args, **kwargs): by_is_dask=False, reindex=False, ) - self.by = ret[0][0] - # self.rechunk() + # Add one so the rechunk code is simpler and makes sense + self.by = ret[0][0] + 1 + + +class ERA5MonthHourRechunked(ERA5MonthHour, Cohorts): + def setup(self, *args, **kwargs): + super().setup() + super().rechunk() From 1a12676b2dbfba6816d89def863e24ad6bee3f34 Mon Sep 17 00:00:00 2001 From: dcherian Date: Fri, 14 Oct 2022 13:46:13 -0600 Subject: [PATCH 11/21] Track num tasks after optimization --- asv_bench/benchmarks/cohorts.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/asv_bench/benchmarks/cohorts.py b/asv_bench/benchmarks/cohorts.py index 28c75f949..f848a7d7b 100644 --- a/asv_bench/benchmarks/cohorts.py +++ b/asv_bench/benchmarks/cohorts.py @@ -29,6 +29,13 @@ def track_num_tasks(self): )[0] return len(result.dask.to_dict()) + def track_num_tasks_optimized(self): + result = flox.groupby_reduce( + self.array, self.by, func="sum", axis=self.axis, method="cohorts" + )[0] + (opt,) = dask.optimize(result) + return len(opt.dask.to_dict()) + track_num_tasks.unit = "tasks" From e2ea072c0cbb5e74baeb22921bc451e64f2aa73f Mon Sep 17 00:00:00 2001 From: dcherian Date: Sat, 15 Oct 2022 06:54:45 -0600 Subject: [PATCH 12/21] update 2D tests --- tests/test_core.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tests/test_core.py b/tests/test_core.py index d856b7291..0fc9fa848 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1069,16 +1069,19 @@ def test_normalize_block_indexing_1d(flatblocks, expected): @requires_dask -@pytest.mark.xfail @pytest.mark.parametrize( "flatblocks, expected", ( - ((0, 1, 2, 3, 4), [{}, (slice(None),)]), - ((1, 2, 3), [{}, (slice(1, 4),)]), + ((0, 1, 2, 3, 4), [{}, (0, slice(None))]), + ((1, 2, 3), [{}, (0, slice(1, 4))]), + # gets optimized + ((1, 3), [{}, (0, [1, 3])]), # gets optimized - ((1, 3), [{}, ([1, 3],)]), + ((0, 1, 3), [{}, (0, [0, 1, 3])]), # gets optimized - ((0, 1, 3), [{}, ([0, 1, 3],)]), + (tuple(range(10)), [{}, (slice(0, 2), slice(None))]), + ((0, 1, 3, 5, 6, 8), [{}, (slice(0, 2), [0, 1, 3])]), + ((0, 3, 4, 5, 6, 8, 24), [{1: [0, 1, 3, 4]}, ([0, 1, 4], slice(None))]), ), ) def test_normalize_block_indexing_2d(flatblocks, expected): @@ -1088,7 +1091,7 @@ def test_normalize_block_indexing_2d(flatblocks, expected): alliter, noiter = _normalize_indexes(array, flatblocks, array.blocks.shape) if expected[0]: - expected[0] = {ndim - 1: v for k, v in expected[0].items() if k == -1} + expected[0] = {(ndim - 1 if k == -1 else k): v for k, v in expected[0].items()} assert alliter.keys() == expected[0].keys() for actual, expect in zip(alliter.values(), expected[0].values()): From cccf1cfcc059ac8642f194db32bb4bf57256f70a Mon Sep 17 00:00:00 2001 From: dcherian Date: Sat, 15 Oct 2022 07:00:04 -0600 Subject: [PATCH 13/21] Add PerfectMonthly benchmark --- asv_bench/benchmarks/cohorts.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/asv_bench/benchmarks/cohorts.py b/asv_bench/benchmarks/cohorts.py index f848a7d7b..d0833a6e9 100644 --- a/asv_bench/benchmarks/cohorts.py +++ b/asv_bench/benchmarks/cohorts.py @@ -96,3 +96,19 @@ class ERA5MonthHourRechunked(ERA5MonthHour, Cohorts): def setup(self, *args, **kwargs): super().setup() super().rechunk() + + +class PerfectMonthly(Cohorts): + """Perfectly chunked for a "cohorts" monthly mean climatology""" + + def setup(self, *args, **kwargs): + self.time = pd.Series(pd.date_range("2001-01-01", "2018-12-31 23:59", freq="M")) + self.axis = (-1,) + self.array = dask.array.random.random((721, 1440, len(self.time)), chunks=(-1, -1, 4)) + self.by = self.time.dt.month.values + + +class PerfectMonthlyRechunked(PerfectMonthly): + def setup(self, *args, **kwargs): + super().setup() + super().rechunk() From 74eecd2b06d1bc6854b1e414f54a9672edb5bcb2 Mon Sep 17 00:00:00 2001 From: dcherian Date: Sat, 15 Oct 2022 19:57:35 -0600 Subject: [PATCH 14/21] Update benchmark --- asv_bench/benchmarks/cohorts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asv_bench/benchmarks/cohorts.py b/asv_bench/benchmarks/cohorts.py index d0833a6e9..c3ce1ec5d6 100644 --- a/asv_bench/benchmarks/cohorts.py +++ b/asv_bench/benchmarks/cohorts.py @@ -102,7 +102,7 @@ class PerfectMonthly(Cohorts): """Perfectly chunked for a "cohorts" monthly mean climatology""" def setup(self, *args, **kwargs): - self.time = pd.Series(pd.date_range("2001-01-01", "2018-12-31 23:59", freq="M")) + self.time = pd.Series(pd.date_range("1961-01-01", "2018-12-31 23:59", freq="M")) self.axis = (-1,) self.array = dask.array.random.random((721, 1440, len(self.time)), chunks=(-1, -1, 4)) self.by = self.time.dt.month.values From f0a814af3b1b28f2a905ee2c9546024598eaf141 Mon Sep 17 00:00:00 2001 From: dcherian Date: Sat, 15 Oct 2022 19:58:22 -0600 Subject: [PATCH 15/21] Orthogonal indexing of blocks Reduces num_tasks some more NWMMidwest --- flox/core.py | 49 ++++++++++++++++++++++++++++------------------ tests/test_core.py | 2 +- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/flox/core.py b/flox/core.py index 683e16548..04bfb4a54 100644 --- a/flox/core.py +++ b/flox/core.py @@ -6,6 +6,7 @@ import operator from collections import namedtuple from functools import partial, reduce +from numbers import Number from typing import TYPE_CHECKING, Any, Callable, Dict, Literal, Mapping, Sequence, Union import numpy as np @@ -1081,13 +1082,11 @@ def _normalize_indexes(array, flatblocks, blkshape): # has all iterables alliter = {ax: i for ax, i in enumerate(full_normalized) if hasattr(i, "__len__")} - # merge in one iterable with noiter to reduce blocks layer by 1. - for k, v in alliter.items(): - if noiter[k] == slice(None): - noiter[k] = alliter.pop(k) - break + mesh = dict(zip(alliter.keys(), np.ix_(*alliter.values()))) - return alliter, tuple(noiter) + full_tuple = tuple(i if ax not in mesh else mesh[ax] for ax, i in enumerate(noiter)) + + return full_tuple, alliter, tuple(noiter) def subset_to_blocks( @@ -1106,25 +1105,37 @@ def subset_to_blocks( ------- dask.array """ + import dask.array + from dask.array.slicing import normalize_index + from dask.base import tokenize + from dask.highlevelgraph import HighLevelGraph + if blkshape is None: blkshape = array.blocks.shape - alliter, noiter = _normalize_indexes(array, flatblocks, blkshape) + index, alliter, noiter = _normalize_indexes(array, flatblocks, blkshape) - # apply everything but the iterables - if not all(i == slice(None) for i in noiter): - subset = array.blocks[noiter] - else: - subset = array + if all(i == slice(None) for i in noiter) and not alliter: + return array - for ax, inds in alliter.items(): - if isinstance(inds, slice): - continue - idxr = [slice(None, None)] * array.ndim - idxr[ax] = inds - subset = subset.blocks[tuple(idxr)] + index = normalize_index(index, array.numblocks) + index = tuple(slice(k, k + 1) if isinstance(k, Number) else k for k in index) + + name = "blocks-" + tokenize(array, index) + + squeezed = tuple(np.squeeze(i) if isinstance(i, np.ndarray) else i for i in index) + + new_keys = array._key_array[index] + + chunks = tuple(tuple(np.array(c)[i].tolist()) for c, i in zip(array.chunks, squeezed)) + + keys = itertools.product(*(range(len(c)) for c in chunks)) + + layer = {(name,) + key: tuple(new_keys[key].tolist()) for key in keys} + + graph = HighLevelGraph.from_collections(name, layer, dependencies=[array]) - return subset + return dask.array.Array(graph, name, chunks, meta=array) def _extract_unknown_groups(reduced, group_chunks, dtype) -> tuple[DaskArray]: diff --git a/tests/test_core.py b/tests/test_core.py index 0fc9fa848..e2092bd80 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1141,7 +1141,7 @@ def test_subset_block_minimizes_layers(): # no optimizations possible subset = subset_to_blocks(array, [0, 12, 14, 19]) - assert len(subset.dask.layers) == 3 + assert len(subset.dask.layers) == 2 # one slice, one iterable subset = subset_to_blocks(array, np.arange(7)) From b0dfdb04a3cf7e65eb06d46918578e7f432fab39 Mon Sep 17 00:00:00 2001 From: dcherian Date: Sat, 15 Oct 2022 20:14:24 -0600 Subject: [PATCH 16/21] Track number of layers --- asv_bench/benchmarks/cohorts.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/asv_bench/benchmarks/cohorts.py b/asv_bench/benchmarks/cohorts.py index c3ce1ec5d6..0e09b3c2d 100644 --- a/asv_bench/benchmarks/cohorts.py +++ b/asv_bench/benchmarks/cohorts.py @@ -36,7 +36,15 @@ def track_num_tasks_optimized(self): (opt,) = dask.optimize(result) return len(opt.dask.to_dict()) + def track_num_layers(self): + result = flox.groupby_reduce( + self.array, self.by, func="sum", axis=self.axis, method="cohorts" + )[0] + return len(result.dask.layers) + track_num_tasks.unit = "tasks" + track_num_tasks_optimized.unit = "tasks" + track_num_layers.unit = "layers" class NWMMidwest(Cohorts): From 1d517606d93bd488b61a0c4e602cdd5b5fcbee04 Mon Sep 17 00:00:00 2001 From: dcherian Date: Sat, 15 Oct 2022 21:05:37 -0600 Subject: [PATCH 17/21] Fix benchmark --- asv_bench/benchmarks/cohorts.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/asv_bench/benchmarks/cohorts.py b/asv_bench/benchmarks/cohorts.py index 0e09b3c2d..2c19c881f 100644 --- a/asv_bench/benchmarks/cohorts.py +++ b/asv_bench/benchmarks/cohorts.py @@ -115,6 +115,11 @@ def setup(self, *args, **kwargs): self.array = dask.array.random.random((721, 1440, len(self.time)), chunks=(-1, -1, 4)) self.by = self.time.dt.month.values + def rechunk(self): + self.array = flox.core.rechunk_for_cohorts( + self.array, -1, self.by, force_new_chunk_at=[1], chunksize=4, ignore_old_chunks=True + ) + class PerfectMonthlyRechunked(PerfectMonthly): def setup(self, *args, **kwargs): From c71a84c4023b47976506045df7ca2cf60ebd9a19 Mon Sep 17 00:00:00 2001 From: dcherian Date: Sun, 16 Oct 2022 13:09:55 -0600 Subject: [PATCH 18/21] Rework indexing --- flox/core.py | 12 ++---- tests/__init__.py | 12 ++++++ tests/test_core.py | 97 +++++++++++++++++++--------------------------- 3 files changed, 55 insertions(+), 66 deletions(-) diff --git a/flox/core.py b/flox/core.py index 04bfb4a54..ff0780104 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1086,7 +1086,7 @@ def _normalize_indexes(array, flatblocks, blkshape): full_tuple = tuple(i if ax not in mesh else mesh[ax] for ax, i in enumerate(noiter)) - return full_tuple, alliter, tuple(noiter) + return full_tuple def subset_to_blocks( @@ -1113,26 +1113,22 @@ def subset_to_blocks( if blkshape is None: blkshape = array.blocks.shape - index, alliter, noiter = _normalize_indexes(array, flatblocks, blkshape) + index = _normalize_indexes(array, flatblocks, blkshape) - if all(i == slice(None) for i in noiter) and not alliter: + if all(not isinstance(i, np.ndarray) and i == slice(None) for i in index): return array index = normalize_index(index, array.numblocks) index = tuple(slice(k, k + 1) if isinstance(k, Number) else k for k in index) name = "blocks-" + tokenize(array, index) - - squeezed = tuple(np.squeeze(i) if isinstance(i, np.ndarray) else i for i in index) - new_keys = array._key_array[index] + squeezed = tuple(np.squeeze(i) if isinstance(i, np.ndarray) else i for i in index) chunks = tuple(tuple(np.array(c)[i].tolist()) for c, i in zip(array.chunks, squeezed)) keys = itertools.product(*(range(len(c)) for c in chunks)) - layer = {(name,) + key: tuple(new_keys[key].tolist()) for key in keys} - graph = HighLevelGraph.from_collections(name, layer, dependencies=[array]) return dask.array.Array(graph, name, chunks, meta=array) diff --git a/tests/__init__.py b/tests/__init__.py index 0cd967d11..b1a266652 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -115,6 +115,18 @@ def assert_equal(a, b, tolerance=None): np.testing.assert_allclose(a, b, equal_nan=True, **tolerance) +def assert_equal_tuple(a, b): + """assert_equal for .blocks indexing tuples""" + assert len(a) == len(b) + + for a_, b_ in zip(a, b): + assert type(a_) == type(b_) + if isinstance(a_, np.ndarray): + np.testing.assert_array_equal(a_, b_) + else: + assert a_ == b_ + + @pytest.fixture(scope="module", params=["flox", "numpy", "numba"]) def engine(request): if request.param == "numba": diff --git a/tests/test_core.py b/tests/test_core.py index e2092bd80..bfb8ef52d 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -21,7 +21,14 @@ subset_to_blocks, ) -from . import assert_equal, engine, has_dask, raise_if_dask_computes, requires_dask +from . import ( + assert_equal, + assert_equal_tuple, + engine, + has_dask, + raise_if_dask_computes, + requires_dask, +) labels = np.array([0, 0, 2, 2, 2, 1, 1, 2, 2, 1, 1, 0]) nan_labels = labels.astype(float) # copy @@ -1063,86 +1070,60 @@ def test_subset_blocks(): def test_normalize_block_indexing_1d(flatblocks, expected): nblocks = 5 array = dask.array.ones((nblocks,), chunks=(1,)) - alliter, noiter = _normalize_indexes(array, flatblocks, array.blocks.shape) - assert alliter == {} - assert noiter == expected + expected = tuple(np.array(i) if isinstance(i, list) else i for i in expected) + actual = _normalize_indexes(array, flatblocks, array.blocks.shape) + assert_equal_tuple(expected, actual) @requires_dask @pytest.mark.parametrize( "flatblocks, expected", ( - ((0, 1, 2, 3, 4), [{}, (0, slice(None))]), - ((1, 2, 3), [{}, (0, slice(1, 4))]), - # gets optimized - ((1, 3), [{}, (0, [1, 3])]), - # gets optimized - ((0, 1, 3), [{}, (0, [0, 1, 3])]), - # gets optimized - (tuple(range(10)), [{}, (slice(0, 2), slice(None))]), - ((0, 1, 3, 5, 6, 8), [{}, (slice(0, 2), [0, 1, 3])]), - ((0, 3, 4, 5, 6, 8, 24), [{1: [0, 1, 3, 4]}, ([0, 1, 4], slice(None))]), + ((0, 1, 2, 3, 4), (0, slice(None))), + ((1, 2, 3), (0, slice(1, 4))), + ((1, 3), (0, [1, 3])), + ((0, 1, 3), (0, [0, 1, 3])), + (tuple(range(10)), (slice(0, 2), slice(None))), + ((0, 1, 3, 5, 6, 8), (slice(0, 2), [0, 1, 3])), + ((0, 3, 4, 5, 6, 8, 24), np.ix_([0, 1, 4], [0, 1, 3, 4])), ), ) def test_normalize_block_indexing_2d(flatblocks, expected): nblocks = 5 ndim = 2 array = dask.array.ones((nblocks,) * ndim, chunks=(1,) * ndim) - alliter, noiter = _normalize_indexes(array, flatblocks, array.blocks.shape) - - if expected[0]: - expected[0] = {(ndim - 1 if k == -1 else k): v for k, v in expected[0].items()} - - assert alliter.keys() == expected[0].keys() - for actual, expect in zip(alliter.values(), expected[0].values()): - np.testing.assert_array_equal(actual, expect) - assert noiter == expected[1] + expected = tuple(np.array(i) if isinstance(i, list) else i for i in expected) + actual = _normalize_indexes(array, flatblocks, array.blocks.shape) + assert_equal_tuple(expected, actual) @requires_dask -def test_subset_block_minimizes_layers(): +def test_subset_block_passthrough(): # full slice pass through array = dask.array.ones((5,), chunks=(1,)) subset = subset_to_blocks(array, np.arange(5)) assert subset.name == array.name - # another full slice pass through array = dask.array.ones((5, 5), chunks=1) subset = subset_to_blocks(array, np.arange(25)) assert subset.name == array.name - # two slices become one block layer - subset = subset_to_blocks(array, np.arange(10)) - assert len(subset.dask.layers) == 2 - - # should become two slices and one block layer - subset = subset_to_blocks(array, np.arange(8)) - assert len(subset.dask.layers) == 2 - - # no overlap in range along the two dimensions - # but should be optimized to a single .blocks call - # same column - subset = subset_to_blocks(array, [0, 10]) - assert len(subset.dask.layers) == 2 - - # different rows and columns - # but optimization to slice possible - subset = subset_to_blocks(array, [0, 7]) - assert len(subset.dask.layers) == 2 - - subset = subset_to_blocks(array, [0, 7, 9]) - assert len(subset.dask.layers) == 2 - - subset = subset_to_blocks(array, [0, 7, 8, 9]) - assert len(subset.dask.layers) == 2 - subset = subset_to_blocks(array, [0, 6, 12, 14]) - assert len(subset.dask.layers) == 2 - - # no optimizations possible - subset = subset_to_blocks(array, [0, 12, 14, 19]) - assert len(subset.dask.layers) == 2 - - # one slice, one iterable - subset = subset_to_blocks(array, np.arange(7)) +@requires_dask +@pytest.mark.parametrize( + "flatblocks, expectidx", + [ + (np.arange(10), (slice(2), slice(None))), + (np.arange(8), (slice(2), slice(None))), + ([0, 10], ([0, 2], slice(1))), + ([0, 7], (slice(2), [0, 2])), + ([0, 7, 9], (slice(2), [0, 2, 4])), + ([0, 6, 12, 14], (slice(3), [0, 1, 2, 4])), + ([0, 12, 14, 19], np.ix_([0, 2, 3], [0, 2, 4])), + ], +) +def test_subset_block_2d(flatblocks, expectidx): + array = dask.array.from_array(np.arange(25).reshape((5, 5)), chunks=1) + subset = subset_to_blocks(array, flatblocks) assert len(subset.dask.layers) == 2 + assert_equal(subset, array.compute()[expectidx]) From 160257de599e45ebc09b7d0491b4a03cdff9d1fb Mon Sep 17 00:00:00 2001 From: dcherian Date: Sun, 16 Oct 2022 13:14:04 -0600 Subject: [PATCH 19/21] Fix mypy --- flox/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flox/core.py b/flox/core.py index ff0780104..c08ddf5b7 100644 --- a/flox/core.py +++ b/flox/core.py @@ -6,7 +6,7 @@ import operator from collections import namedtuple from functools import partial, reduce -from numbers import Number +from numbers import Integral from typing import TYPE_CHECKING, Any, Callable, Dict, Literal, Mapping, Sequence, Union import numpy as np @@ -1119,7 +1119,7 @@ def subset_to_blocks( return array index = normalize_index(index, array.numblocks) - index = tuple(slice(k, k + 1) if isinstance(k, Number) else k for k in index) + index = tuple(slice(k, k + 1) if isinstance(k, Integral) else k for k in index) name = "blocks-" + tokenize(array, index) new_keys = array._key_array[index] From 0ec8be034b7a8a56ac8d47c0e84e1050bd750f22 Mon Sep 17 00:00:00 2001 From: dcherian Date: Sun, 16 Oct 2022 13:17:18 -0600 Subject: [PATCH 20/21] small cleanup --- tests/test_core.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_core.py b/tests/test_core.py index bfb8ef52d..e31f11e56 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1061,9 +1061,7 @@ def test_subset_blocks(): ( ((0, 1, 2, 3, 4), (slice(None),)), ((1, 2, 3), (slice(1, 4),)), - # gets optimized ((1, 3), ([1, 3],)), - # gets optimized ((0, 1, 3), ([0, 1, 3],)), ), ) From 1782bd802c2d8ca72ed50df87307394aae832192 Mon Sep 17 00:00:00 2001 From: dcherian Date: Sun, 16 Oct 2022 13:37:30 -0600 Subject: [PATCH 21/21] small comment --- flox/core.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flox/core.py b/flox/core.py index c08ddf5b7..6bd390137 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1118,6 +1118,7 @@ def subset_to_blocks( if all(not isinstance(i, np.ndarray) and i == slice(None) for i in index): return array + # These rest is copied from dask.array.core.py with slight modifications index = normalize_index(index, array.numblocks) index = tuple(slice(k, k + 1) if isinstance(k, Integral) else k for k in index)