From ebcd06c7dc70ae7710eeea47833e05503c445027 Mon Sep 17 00:00:00 2001
From: Deepak Cherian
Date: Fri, 2 Aug 2024 13:20:34 -0600
Subject: [PATCH] fix tests

---
 flox/core.py | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/flox/core.py b/flox/core.py
index eb6ed13b..d1ac8815 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -639,7 +639,9 @@ def rechunk_for_cohorts(
     return array.rechunk({axis: newchunks})
 
 
-def rechunk_for_blockwise(array: DaskArray, axis: T_Axis, labels: np.ndarray) -> DaskArray:
+def rechunk_for_blockwise(
+    array: DaskArray, axis: T_Axis, labels: np.ndarray, *, force: bool = True
+) -> DaskArray:
     """
     Rechunks array so that group boundaries line up with chunk boundaries, allowing
     embarrassingly parallel group reductions.
@@ -672,11 +674,16 @@ def rechunk_for_blockwise(array: DaskArray, axis: T_Axis, labels: np.ndarray) ->
         return array
 
     Δn = abs(len(newchunks) - len(chunks))
-    if (Δn / len(chunks) < BLOCKWISE_RECHUNK_NUM_CHUNKS_THRESHOLD) and (
-        abs(max(newchunks) - max(chunks)) / max(chunks) < BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD
+    if force or (
+        (Δn / len(chunks) < BLOCKWISE_RECHUNK_NUM_CHUNKS_THRESHOLD)
+        and (
+            abs(max(newchunks) - max(chunks)) / max(chunks) < BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD
+        )
     ):
         # Less than 25% change in number of chunks, let's do it
         return array.rechunk({axis: newchunks})
+    else:
+        return array
 
 
 def reindex_(
@@ -2496,7 +2503,7 @@ def groupby_reduce(
     ):
         # Let's try rechunking for sorted 1D by.
         (single_axis,) = axis_
-        array = rechunk_for_blockwise(array, single_axis, by_)
+        array = rechunk_for_blockwise(array, single_axis, by_, force=False)
 
     if _is_first_last_reduction(func):
         if has_dask and nax != 1:
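
Note (not part of the patch): a minimal usage sketch of the new keyword-only `force` parameter, assuming this patch is applied to flox and that dask and numpy are installed. The array shapes and labels below are illustrative only.

# Illustrative sketch, not part of the patch. Assumes flox with this change,
# plus dask and numpy.
import dask.array as da
import numpy as np

from flox.core import rechunk_for_blockwise

# Sorted group labels along the reduced axis; chunk edges cut across groups.
labels = np.repeat(np.arange(10), 10)   # 100 elements, 10 equal groups
array = da.ones((100,), chunks=25)      # chunks of 25 do not align with groups of 10

# force=True (the default) always applies the blockwise-friendly rechunk.
aligned = rechunk_for_blockwise(array, axis=0, labels=labels, force=True)

# force=False, now passed by groupby_reduce, only rechunks when the change is
# cheap (small relative change in chunk count and in the largest chunk size);
# otherwise the original array is returned unmodified.
maybe_aligned = rechunk_for_blockwise(array, axis=0, labels=labels, force=False)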