Skip to content

Commit

Permalink
Improvements to histogramdd (for handling inputs that are sequences-o…
Browse files Browse the repository at this point in the history
…f-arrays). (dask#7634)

This PR improves `dask.array.histogramdd` such that multi-sequence input does not have to be stacked, transposed, and potentially rechunked.

The original implementation of `histogramdd` was focused on columnar/rectangular input, with each column of the input array representing one coordinate of the input to be histogrammed. If the input was a sequence of 1D arrays then we stacked the 1D arrays and transposed that stack to convert to the 2D/rectangular/columnar format we were mainly supporting. This stacking/fusing and transposing is an unnecessary step.

The rectangular input implementation is still there, but now we handle the sequence-of-1D inputs without converting to a 2D array. This optimization is important for supporting `numpy.histogram2d` as `dask.array.histogram2d` (subsequent PR).

Along with supporting sequence-of-1D input, this PR also does some general cleanup.
  • Loading branch information
douglasdavis authored Jun 23, 2021
1 parent 38f8de4 commit 0f2ba09
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 67 deletions.
191 changes: 137 additions & 54 deletions dask/array/routines.py
Original file line number Diff line number Diff line change
Expand Up @@ -978,12 +978,12 @@ def histogram(a, bins=None, range=None, normed=False, weights=None, density=None
return n, bins


def _block_histogramdd(sample, bins, range=None, weights=None):
def _block_histogramdd_rect(sample, bins, range, weights):
"""Call numpy.histogramdd for a blocked/chunked calculation.
Slurps the result into an additional outer axis via [np.newaxis].
This new axis will be used to stack chunked calls of the numpy
function and add them together later.
Slurps the result into an additional outer axis; this new axis
will be used to stack chunked calls of the numpy function and add
them together later.
Returns
-------
Expand All @@ -994,6 +994,28 @@ def _block_histogramdd(sample, bins, range=None, weights=None):
return np.histogramdd(sample, bins, range=range, weights=weights)[0:1]


def _block_histogramdd_multiarg(*args):
"""Call numpy.histogramdd for a multi argument blocked/chunked calculation.
Slurps the result into an additional outer axis; this new axis
will be used to stack chunked calls of the numpy function and add
them together later.
The last three arguments _must be_ (bins, range, weights).
The difference between this function and
_block_histogramdd_rect is that here we expect the sample
to be composed of multiple arguments (multiple 1D arrays, each one
representing a coordinate), while _block_histogramdd_rect
expects a single rectangular (2D array where columns are
coordinates) sample.
"""
bins, range, weights = args[-3:]
sample = args[:-3]
return np.histogramdd(sample, bins=bins, range=range, weights=weights)[0:1]


def histogramdd(sample, bins, range=None, normed=None, weights=None, density=None):
"""Blocked variant of :func:`numpy.histogramdd`.
Expand All @@ -1003,10 +1025,10 @@ def histogramdd(sample, bins, range=None, normed=None, weights=None, density=Non
compatible with this function. If weights are used, they must be
chunked along the 0th axis identically to the input sample.
A proper example setup for a three dimensional histogram, where
the sample shape is ``(8, 3)`` and weights are shape ``(8,)``,
sample chunks would be ``((4, 4), (3,))`` and the weights chunks
would be ``((4, 4),)`` a table of the structure:
An example setup for a three dimensional histogram, where the
sample shape is ``(8, 3)`` and weights are shape ``(8,)``, sample
chunks would be ``((4, 4), (3,))`` and the weights chunks would be
``((4, 4),)`` a table of the structure:
+-------+-----------------------+-----------+
| | sample (8 x 3) | weights |
Expand Down Expand Up @@ -1042,6 +1064,10 @@ def histogramdd(sample, bins, range=None, normed=None, weights=None, density=Non
from ``dask.dataframe``, i.e. :class:`dask.dataframe.Series` or
:class:`dask.dataframe.DataFrame`.
The function is also compatible with `x`, `y`, and `z` being
individual 1D arrays with equal chunking. In that case, the data
should be passed as a tuple: ``histogramdd((x, y, z), ...)``
Parameters
----------
sample : dask.array.Array (N, D) or sequence of dask.array.Array
Expand All @@ -1053,9 +1079,7 @@ def histogramdd(sample, bins, range=None, normed=None, weights=None, density=Non
* When a (N, D) dask Array, each row is an entry in the sample
(coordinate in D dimensional space).
* When a sequence of dask Arrays, each element in the sequence
is the array of values for a single coordinate. This type of
input will be automatically rechunked along the column axis
if necessary. This may induce a runtime increase.
is the array of values for a single coordinate.
bins : sequence of arrays describing bin edges, int, or sequence of ints
The bin specification.
Expand Down Expand Up @@ -1126,8 +1150,31 @@ def histogramdd(sample, bins, range=None, normed=None, weights=None, density=Non
>>> np.isclose(h.sum().compute(), w.sum().compute())
True
"""
Using a sequence of 1D arrays as the input:
>>> x = da.array([2, 4, 2, 4, 2, 4])
>>> y = da.array([2, 2, 4, 4, 2, 4])
>>> z = da.array([4, 2, 4, 2, 4, 2])
>>> bins = ([0, 3, 6],) * 3
>>> h, edges = da.histogramdd((x, y, z), bins)
>>> h
dask.array<sum-aggregate, shape=(2, 2, 2), dtype=float64, chunksize=(2, 2, 2), chunktype=numpy.ndarray>
>>> edges[0]
dask.array<array, shape=(3,), dtype=int64, chunksize=(3,), chunktype=numpy.ndarray>
>>> h.compute()
array([[[0., 2.],
[0., 1.]],
<BLANKLINE>
[[1., 0.],
[2., 0.]]])
>>> edges[0].compute()
array([0, 3, 6])
>>> edges[1].compute()
array([0, 3, 6])
>>> edges[2].compute()
array([0, 3, 6])
"""
# logic used in numpy.histogramdd to handle normed/density.
if normed is None:
if density is None:
Expand Down Expand Up @@ -1157,16 +1204,27 @@ def histogramdd(sample, bins, range=None, normed=None, weights=None, density=Non

# N == total number of samples
# D == total number of dimensions
try:
# Recommended input ND-array
N, D = sample.shape
except (AttributeError, ValueError):
# If we have a sequence of 1D arrays
sample = atleast_2d(sample).T
N, D = sample.shape
# rechunk if necessary
if sample.chunksize[1] != D:
sample = sample.rechunk((sample.chunksize[0], D))
if hasattr(sample, "shape"):
if len(sample.shape) != 2:
raise ValueError("Single array input to histogramdd should be columnar")
else:
_, D = sample.shape
n_chunks = sample.numblocks[0]
rectangular_sample = True
# Require data to be chunked along the first axis only.
if sample.shape[1:] != sample.chunksize[1:]:
raise ValueError("Input array can only be chunked along the 0th axis.")
elif isinstance(sample, (tuple, list)):
rectangular_sample = False
D = len(sample)
n_chunks = sample[0].numblocks[0]
for i in _range(1, D):
if sample[i].chunks != sample[0].chunks:
raise ValueError("All coordinate arrays must be chunked identically.")
else:
raise ValueError(
"Incompatible sample. Must be a 2D array or a sequence of 1D arrays."
)

# Require only Array or Delayed objects for bins, range, and weights.
for argname, val in [("bins", bins), ("range", range), ("weights", weights)]:
Expand All @@ -1176,16 +1234,18 @@ def histogramdd(sample, bins, range=None, normed=None, weights=None, density=Non
"for `histogramdd`. For argument `{}`, got: {!r}".format(argname, val)
)

# Require data to be chunked along the first axis only.
if sample.shape[1:] != sample.chunksize[1:]:
raise ValueError("Input array can only be chunked along the 0th axis")

# Require that the chunking of the sample and weights are compatible.
if weights is not None and weights.chunks[0] != sample.chunks[0]:
raise ValueError(
"Input array and weights must have the same shape "
"and chunk structure along the first dimension."
)
if weights is not None:
if rectangular_sample and weights.chunks[0] != sample.chunks[0]:
raise ValueError(
"Input array and weights must have the same shape "
"and chunk structure along the first dimension."
)
elif not rectangular_sample and weights.numblocks != n_chunks:
raise ValueError(
"Input arrays and weights must have the same shape "
"and chunk structure."
)

# if bins is a list, tuple, then make sure the length is the same
# as the number dimensions.
Expand All @@ -1205,50 +1265,73 @@ def histogramdd(sample, bins, range=None, normed=None, weights=None, density=Non
if not all(len(r) == 2 for r in range):
raise ValueError("range argument should be a sequence of pairs")

# we will return the edges to mimic the NumPy API (we also use the
# edges later as a way to calculate the total number of bins).
# If bins is a single int, create a tuple of len `D` containing `bins`.
if isinstance(bins, int):
bins = (bins,) * D

# we will return the edges to mimic the NumPy API (we also use the
# edges later as a way to calculate the total number of bins).
if all(isinstance(b, int) for b in bins) and all(len(r) == 2 for r in range):
edges = [np.linspace(r[0], r[1], b + 1) for b, r in zip(bins, range)]
else:
edges = [np.asarray(b) for b in bins]

# With dsk below, we will construct a (D + 1) dimensional array
# stacked for each chunk. For example, if the histogram is going
# to be 3 dimensions, this creates a stack of cubes (1 cube for
# each sample chunk) that will be collapsed into a final cube (the
# result).
if rectangular_sample:
deps = (sample,)
else:
deps = tuple(sample)

if weights is not None:
w_keys = flatten(weights.__dask_keys__())
deps += (weights,)
dtype = weights.dtype
else:
w_keys = (None,) * n_chunks
dtype = np.histogramdd([])[0].dtype

# This tuple of zeros represents the chunk index along the columns
# (we only allow chunking along the rows).
column_zeros = tuple(0 for _ in _range(D))

if weights is None:
# With dsk below, we will construct a (D + 1) dimensional array
# stacked for each chunk. For example, if the histogram is going
# to be 3 dimensions, this creates a stack of cubes (1 cube for
# each sample chunk) that will be collapsed into a final cube (the
# result). Depending on the input data, we can do this in two ways
#
# 1. The rectangular case: when the sample is a single 2D array
# where each column in the sample represents a coordinate of
# the sample).
#
# 2. The sequence-of-arrays case, when the sample is a tuple or
# list of arrays, with each array in that sequence representing
# the entirety of one coordinate of the complete sample.

if rectangular_sample:
sample_keys = flatten(sample.__dask_keys__())
dsk = {
(name, i, *column_zeros): (_block_histogramdd, k, bins, range)
for i, k in enumerate(flatten(sample.__dask_keys__()))
(name, i, *column_zeros): (_block_histogramdd_rect, k, bins, range, w)
for i, (k, w) in enumerate(zip(sample_keys, w_keys))
}
dtype = np.histogramdd([])[0].dtype
else:
a_keys = flatten(sample.__dask_keys__())
w_keys = flatten(weights.__dask_keys__())
sample_keys = [
list(flatten(sample[i].__dask_keys__())) for i in _range(len(sample))
]
fused_on_chunk_keys = [
tuple(sample_keys[j][i] for j in _range(D)) for i in _range(n_chunks)
]
dsk = {
(name, i, *column_zeros): (_block_histogramdd, k, bins, range, w)
for i, (k, w) in enumerate(zip(a_keys, w_keys))
(name, i, *column_zeros): (
_block_histogramdd_multiarg,
*(*k, bins, range, w),
)
for i, (k, w) in enumerate(zip(fused_on_chunk_keys, w_keys))
}
dtype = weights.dtype

deps = (sample,)
if weights is not None:
deps += (weights,)
graph = HighLevelGraph.from_collections(name, dsk, dependencies=deps)

nchunks = len(list(flatten(sample.__dask_keys__())))
all_nbins = tuple((b.size - 1,) for b in edges)
stacked_chunks = ((1,) * nchunks, *all_nbins)
stacked_chunks = ((1,) * n_chunks, *all_nbins)
mapped = Array(graph, name, stacked_chunks, dtype=dtype)

# Finally, sum over chunks providing to get the final D
# dimensional result array.
n = mapped.sum(axis=0)
Expand Down
Loading

0 comments on commit 0f2ba09

Please sign in to comment.