From 247824d0fec36d6c4808cd2bc0160bba4282f509 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Sat, 20 Jan 2024 13:40:58 -0700 Subject: [PATCH 01/10] Use threadpool for finding labels in chunk MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Great when we have lots of decent size chunks, particularly the NWM county groupby: 600ms -> 400ms. ``` | Before [0cccb903] | After [38fe8a6c] | Ratio | Benchmark (Parameter) | |--------------------------------------|---------------------------------|---------|---------------------------------------------| | 3.50±0.2ms | 2.93±0.07ms | 0.84 | cohorts.PerfectMonthly.time_graph_construct | | 20.0±1ms | 9.66±1ms | 0.48 | cohorts.NWMMidwest.time_find_group_cohorts | ``` --- flox/core.py | 37 +++++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/flox/core.py b/flox/core.py index 094a8867c..c7455a05a 100644 --- a/flox/core.py +++ b/flox/core.py @@ -8,6 +8,7 @@ import warnings from collections import namedtuple from collections.abc import Sequence +from concurrent.futures import ThreadPoolExecutor from functools import partial, reduce from itertools import product from numbers import Integral @@ -241,27 +242,43 @@ def _compute_label_chunk_bitmask(labels, chunks, nlabels): assert isinstance(labels, np.ndarray) shape = tuple(sum(c) for c in chunks) nchunks = math.prod(len(c) for c in chunks) + approx_chunk_size = math.prod(c[0] for c in chunks) labels = np.broadcast_to(labels, shape[-labels.ndim :]) - - cols = [] - # Add one to handle the -1 sentinel value - label_is_present = np.zeros((nlabels + 1,), dtype=bool) ilabels = np.arange(nlabels) - for region in slices_from_chunks(chunks): + + def chunk_unique(labels, slicer, nlabels, label_is_present=None): + if label_is_present is None: + label_is_present = np.zeros((nlabels + 1,), dtype=bool) + subset = labels[slicer] # This is a quite fast way to find unique integers, when we know how many there are # inspired by a similar idea in numpy_groupies for first, last # instead of explicitly finding uniques, repeatedly write True to the same location - subset = labels[region] - # The reshape is not strictly necessary but is about 100ms faster on a test problem. 
label_is_present[subset.reshape(-1)] = True # skip the -1 sentinel by slicing # Faster than np.argwhere by a lot uniques = ilabels[label_is_present[:-1]] - cols.append(uniques) - label_is_present[:] = False - rows_array = np.repeat(np.arange(nchunks), tuple(len(col) for col in cols)) + return uniques + + if nlabels < approx_chunk_size: + with ThreadPoolExecutor() as executor: + futures = [ + executor.submit(chunk_unique, labels, slicer, nlabels) + for slicer in slices_from_chunks(chunks) + ] + cols = tuple(f.result() for f in futures) + + else: + cols = [] + # Add one to handle the -1 sentinel value + label_is_present = np.zeros((nlabels + 1,), dtype=bool) + for region in slices_from_chunks(chunks): + uniques = chunk_unique(labels, region, nlabels, label_is_present) + cols.append(uniques) + label_is_present[:] = False + cols_array = np.concatenate(cols) + rows_array = np.repeat(np.arange(nchunks), tuple(len(col) for col in cols)) data = np.broadcast_to(np.array(1, dtype=np.uint8), rows_array.shape) bitmask = csc_array((data, (rows_array, cols_array)), dtype=bool, shape=(nchunks, nlabels)) From 7e8e717423c5be35636fb4cadc394955c4c74180 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Sat, 20 Jan 2024 15:23:35 -0700 Subject: [PATCH 02/10] Add threshold --- flox/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index c7455a05a..cdb2b4ab8 100644 --- a/flox/core.py +++ b/flox/core.py @@ -260,7 +260,8 @@ def chunk_unique(labels, slicer, nlabels, label_is_present=None): uniques = ilabels[label_is_present[:-1]] return uniques - if nlabels < approx_chunk_size: + if nlabels < 5 * approx_chunk_size: + logger.debug("Using threadpool since %s < 5 * %s", nlabels, approx_chunk_size) with ThreadPoolExecutor() as executor: futures = [ executor.submit(chunk_unique, labels, slicer, nlabels) From 84508130e4384d7b0a3ea51424b170d11294ad00 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Sat, 20 Jan 2024 20:37:46 -0700 Subject: [PATCH 03/10] Fix + comment --- flox/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index cdb2b4ab8..2d2915207 100644 --- a/flox/core.py +++ b/flox/core.py @@ -260,7 +260,8 @@ def chunk_unique(labels, slicer, nlabels, label_is_present=None): uniques = ilabels[label_is_present[:-1]] return uniques - if nlabels < 5 * approx_chunk_size: + # TODO: needs a better heuristic + if nlabels < 2 * approx_chunk_size: logger.debug("Using threadpool since %s < 5 * %s", nlabels, approx_chunk_size) with ThreadPoolExecutor() as executor: futures = [ From f4556896e2f1f02a6f95f6875bf041c879863d04 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Thu, 25 Apr 2024 21:45:03 -0600 Subject: [PATCH 04/10] Fix benchmark. 
--- asv_bench/benchmarks/cohorts.py | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/asv_bench/benchmarks/cohorts.py b/asv_bench/benchmarks/cohorts.py index 7d74f8849..c0162ccca 100644 --- a/asv_bench/benchmarks/cohorts.py +++ b/asv_bench/benchmarks/cohorts.py @@ -1,3 +1,5 @@ +from functools import cached_property + import dask import numpy as np import pandas as pd @@ -11,6 +13,10 @@ class Cohorts: def setup(self, *args, **kwargs): raise NotImplementedError + @cached_property + def dask(self): + return flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis)[0].dask + def containment(self): asfloat = self.bitmask().astype(float) chunks_per_label = asfloat.sum(axis=0) @@ -43,26 +49,17 @@ def time_find_group_cohorts(self): pass def time_graph_construct(self): - flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis, method="cohorts") + flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis) def track_num_tasks(self): - result = flox.groupby_reduce( - self.array, self.by, func="sum", axis=self.axis, method="cohorts" - )[0] - return len(result.dask.to_dict()) + return len(self.dask.to_dict()) def track_num_tasks_optimized(self): - result = flox.groupby_reduce( - self.array, self.by, func="sum", axis=self.axis, method="cohorts" - )[0] - (opt,) = dask.optimize(result) - return len(opt.dask.to_dict()) + (opt,) = dask.optimize(self.dask) + return len(opt.to_dict()) def track_num_layers(self): - result = flox.groupby_reduce( - self.array, self.by, func="sum", axis=self.axis, method="cohorts" - )[0] - return len(result.dask.layers) + return len(self.dask.layers) track_num_tasks.unit = "tasks" # type: ignore[attr-defined] # Lazy track_num_tasks_optimized.unit = "tasks" # type: ignore[attr-defined] # Lazy From 282367797723be52edecd42913202926d36b52f6 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Thu, 25 Apr 2024 21:56:24 -0600 Subject: [PATCH 05/10] Tweak threshold --- flox/core.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/flox/core.py b/flox/core.py index f6d651213..fcecff233 100644 --- a/flox/core.py +++ b/flox/core.py @@ -271,8 +271,14 @@ def chunk_unique(labels, slicer, nlabels, label_is_present=None): return uniques # TODO: needs a better heuristic - if nlabels < 2 * approx_chunk_size: - logger.debug("Using threadpool since %s < 5 * %s", nlabels, approx_chunk_size) + THRESHOLD = 2 + if nlabels < THRESHOLD * approx_chunk_size: + logger.debug( + "Using threadpool since num_labels %s < %d * chunksize %s", + nlabels, + THRESHOLD, + approx_chunk_size, + ) with ThreadPoolExecutor() as executor: futures = [ executor.submit(chunk_unique, labels, slicer, nlabels) @@ -281,6 +287,12 @@ def chunk_unique(labels, slicer, nlabels, label_is_present=None): cols = tuple(f.result() for f in futures) else: + logger.debug( + "Using serial loop since num_labels %s > %d * chunksize %s", + nlabels, + THRESHOLD, + approx_chunk_size, + ) cols = [] # Add one to handle the -1 sentinel value label_is_present = np.zeros((nlabels + 1,), dtype=bool) From 630e0830b7326348ac6d355238329cb83100c547 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 1 May 2024 23:20:59 -0600 Subject: [PATCH 06/10] Small cleanup --- flox/core.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flox/core.py b/flox/core.py index 26c03efbe..8b2f641e7 100644 --- a/flox/core.py +++ b/flox/core.py @@ -273,7 +273,8 @@ def make_bitmask(rows, cols): def chunk_unique(labels, slicer, nlabels, 
label_is_present=None): if label_is_present is None: - label_is_present = np.zeros((nlabels + 1,), dtype=bool) + label_is_present = np.empty((nlabels + 1,), dtype=bool) + label_is_present[:] = False subset = labels[slicer] # This is a quite fast way to find unique integers, when we know how many there are # inspired by a similar idea in numpy_groupies for first, last @@ -309,11 +310,10 @@ def chunk_unique(labels, slicer, nlabels, label_is_present=None): ) cols = [] # Add one to handle the -1 sentinel value - label_is_present = np.zeros((nlabels + 1,), dtype=bool) + label_is_present = np.empty((nlabels + 1,), dtype=bool) for region in slices_from_chunks(chunks): uniques = chunk_unique(labels, region, nlabels, label_is_present) cols.append(uniques) - label_is_present[:] = False rows_array = np.repeat(np.arange(nchunks), tuple(len(col) for col in cols)) cols_array = np.concatenate(cols) From 34c33749f79d56c02e28b50b77ea35190362044f Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 1 May 2024 23:27:18 -0600 Subject: [PATCH 07/10] Comment --- flox/core.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index 8b2f641e7..485e1f4ea 100644 --- a/flox/core.py +++ b/flox/core.py @@ -285,7 +285,11 @@ def chunk_unique(labels, slicer, nlabels, label_is_present=None): uniques = ilabels[label_is_present[:-1]] return uniques - # TODO: needs a better heuristic + # TODO: refine this heuristic. + # The general idea is that with the threadpool, we repeatedly allocate memory + # for `label_is_present`. We trade that off against the parallelism across number of chunks. + # For large enough number of chunks (relative to number of labels), it makes sense to + # suffer the extra allocation in exchange for parallelism. THRESHOLD = 2 if nlabels < THRESHOLD * approx_chunk_size: logger.debug( From 668f7f80a6b7dbdb01297489c98f6792778b5791 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 1 May 2024 23:31:24 -0600 Subject: [PATCH 08/10] Try single allocation --- flox/core.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flox/core.py b/flox/core.py index 485e1f4ea..a81891789 100644 --- a/flox/core.py +++ b/flox/core.py @@ -267,8 +267,6 @@ def make_bitmask(rows, cols): labels = np.broadcast_to(labels, shape[-labels.ndim :]) cols = [] - # Add one to handle the -1 sentinel value - label_is_present = np.zeros((nlabels + 1,), dtype=bool) ilabels = np.arange(nlabels) def chunk_unique(labels, slicer, nlabels, label_is_present=None): @@ -298,10 +296,12 @@ def chunk_unique(labels, slicer, nlabels, label_is_present=None): THRESHOLD, approx_chunk_size, ) + # Add one to handle the -1 sentinel value + label_is_present = np.empty((nchunks, nlabels + 1), dtype=bool) with ThreadPoolExecutor() as executor: futures = [ - executor.submit(chunk_unique, labels, slicer, nlabels) - for slicer in slices_from_chunks(chunks) + executor.submit(chunk_unique, labels, slicer, nlabels, label_is_present[i, :]) + for i, slicer in enumerate(slices_from_chunks(chunks)) ] cols = tuple(f.result() for f in futures) From 53479af7a14f889e943f1ba0909ff7d51892bf36 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 1 May 2024 23:36:10 -0600 Subject: [PATCH 09/10] Revert "Try single allocation" This reverts commit c6b93367e2024e60d77af24a69d177670a040dfc. 
--- flox/core.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flox/core.py b/flox/core.py index a81891789..485e1f4ea 100644 --- a/flox/core.py +++ b/flox/core.py @@ -267,6 +267,8 @@ def make_bitmask(rows, cols): labels = np.broadcast_to(labels, shape[-labels.ndim :]) cols = [] + # Add one to handle the -1 sentinel value + label_is_present = np.zeros((nlabels + 1,), dtype=bool) ilabels = np.arange(nlabels) def chunk_unique(labels, slicer, nlabels, label_is_present=None): @@ -296,12 +298,10 @@ def chunk_unique(labels, slicer, nlabels, label_is_present=None): THRESHOLD, approx_chunk_size, ) - # Add one to handle the -1 sentinel value - label_is_present = np.empty((nchunks, nlabels + 1), dtype=bool) with ThreadPoolExecutor() as executor: futures = [ - executor.submit(chunk_unique, labels, slicer, nlabels, label_is_present[i, :]) - for i, slicer in enumerate(slices_from_chunks(chunks)) + executor.submit(chunk_unique, labels, slicer, nlabels) + for slicer in slices_from_chunks(chunks) ] cols = tuple(f.result() for f in futures) From ff0d8c2440a6a2fdb94cd6bb5a7a5dcf5678fb43 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 1 May 2024 23:36:23 -0600 Subject: [PATCH 10/10] cleanup --- flox/core.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/flox/core.py b/flox/core.py index 485e1f4ea..0670c5b37 100644 --- a/flox/core.py +++ b/flox/core.py @@ -267,8 +267,6 @@ def make_bitmask(rows, cols): labels = np.broadcast_to(labels, shape[-labels.ndim :]) cols = [] - # Add one to handle the -1 sentinel value - label_is_present = np.zeros((nlabels + 1,), dtype=bool) ilabels = np.arange(nlabels) def chunk_unique(labels, slicer, nlabels, label_is_present=None):
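
Taken together, the series replaces the serial per-chunk unique-label scan in `_compute_label_chunk_bitmask` with one that optionally fans out across a thread pool. Below is a minimal, self-contained sketch of that technique, not flox's actual implementation: `_slices_from_chunks`, `labels_per_chunk`, and the `threshold=2` default are illustrative stand-ins for dask's `slices_from_chunks`, the body of `_compute_label_chunk_bitmask`, and the `THRESHOLD` constant introduced in patch 05. The serial branch here also re-allocates the presence array per chunk, whereas the real code reuses a single allocation in that branch.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import product

import numpy as np


def _slices_from_chunks(chunks):
    # Minimal stand-in for dask's slices_from_chunks: yield one tuple of slices per chunk.
    starts = [np.cumsum((0,) + c[:-1]) for c in chunks]
    for idx in product(*(range(len(c)) for c in chunks)):
        yield tuple(
            slice(start[i], start[i] + c[i])
            for start, c, i in zip(starts, chunks, idx)
        )


def chunk_unique(labels, slicer, nlabels):
    # Fast "unique" for integer labels in [0, nlabels): repeatedly write True into a
    # presence array instead of sorting. The extra slot absorbs a -1 sentinel
    # (missing groups), which lands in the last position and is dropped by [:-1].
    label_is_present = np.zeros((nlabels + 1,), dtype=bool)
    label_is_present[labels[slicer].reshape(-1)] = True
    return np.flatnonzero(label_is_present[:-1])


def labels_per_chunk(labels, chunks, nlabels, threshold=2):
    # One array of present labels per chunk. When nlabels is small relative to the
    # chunk size, the repeated presence-array allocation is cheap, so parallelize
    # across chunks with a thread pool; otherwise fall back to a serial loop.
    slicers = list(_slices_from_chunks(chunks))
    approx_chunk_size = np.prod([c[0] for c in chunks])
    if nlabels < threshold * approx_chunk_size:
        with ThreadPoolExecutor() as pool:
            return list(pool.map(lambda s: chunk_unique(labels, s, nlabels), slicers))
    return [chunk_unique(labels, s, nlabels) for s in slicers]


if __name__ == "__main__":
    rng = np.random.default_rng(0)
    labels = rng.integers(0, 100, size=(120, 120))
    per_chunk = labels_per_chunk(labels, chunks=((60, 60), (40, 40, 40)), nlabels=100)
    print(len(per_chunk), per_chunk[0][:5])
```

In the real function, the per-chunk label arrays are then assembled into a sparse chunks-by-labels bitmask via `np.repeat`, `np.concatenate`, and `csc_array`, exactly as the hunks above show.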