Skip to content

Commit

Permalink
Added basic statistics (mean/sum/var) for SparseNdarrays.
Browse files Browse the repository at this point in the history
  • Loading branch information
LTLA committed Jan 29, 2024
1 parent a7b29d2 commit d95b8e5
Show file tree
Hide file tree
Showing 4 changed files with 397 additions and 8 deletions.
240 changes: 233 additions & 7 deletions src/delayedarray/SparseNdarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
_concatenate_unmasked_ndarrays,
_concatenate_maybe_masked_ndarrays
)
from ._statistics import _find_useful_axes, _expected_sample_size, _choose_output_type, _create_offset_multipliers

__author__ = "ltla"
__copyright__ = "ltla"
Expand Down Expand Up @@ -778,6 +779,76 @@ def T(self) -> "SparseNdarray":
return _transpose_SparseNdarray(self, axes)


def sum(self, axis: Optional[Union[int, Tuple[int, ...]]] = None, dtype: Optional[numpy.dtype] = None) -> numpy.ndarray:
"""
Take the sum of values across the ``SparseNdarray``, possibly over a
given axis or set of axes.
Args:
axis:
A single integer specifying the axis to perform the calculation.
Alternatively a tuple or None, see ``numpy.sum`` for details.
dtype:
NumPy type for the output array. If None, this is automatically
chosen based on the type of the ``SparseNdarray``, see
``numpy.sum`` for details.
Returns:
A NumPy array containing the summed values. If ``axis = None``,
this will be a NumPy scalar instead.
"""
return _sum(self, axis=axis, dtype=dtype)


def mean(self, axis: Optional[Union[int, Tuple[int, ...]]] = None, dtype: Optional[numpy.dtype] = None) -> numpy.ndarray:
"""
Take the mean of values across the ``SparseNdarray``, possibly over a
given axis or set of axes.
Args:
axis:
A single integer specifying the axis to perform the calculation.
Alternatively a tuple or None, see ``numpy.mean`` for details.
dtype:
NumPy type for the output array. If None, this is automatically
chosen based on the type of the ``SparseNdarray``, see
``numpy.mean`` for details.
Returns:
A NumPy array containing the mean values. If ``axis = None``,
this will be a NumPy scalar instead.
"""
return _mean(self, axis=axis, dtype=dtype)


def var(self, axis: Optional[Union[int, Tuple[int, ...]]] = None, dtype: Optional[numpy.dtype] = None, ddof: int = 0) -> numpy.ndarray:
"""
Take the variances of values across the ``SparseNdarray``, possibly
over a given axis or set of axes.
Args:
axis:
A single integer specifying the axis to perform the calculation.
Alternatively a tuple or None, see ``numpy.mean`` for details.
dtype:
NumPy type for the output array. If None, this is automatically
chosen based on the type of the ``SparseNdarray``, see
``numpy.mean`` for details.
ddof:
Delta in the degrees of freedom to subtract from the denominator.
Typically set to 1 to obtain the sample variance.
Returns:
A NumPy array containing the variances. If ``axis = None``,
this will be a NumPy scalar instead.
"""
return _var(self, axis=axis, dtype=dtype, ddof=ddof)


# Other stuff
def __copy__(self) -> "SparseNdarray":
"""
Expand Down Expand Up @@ -1008,19 +1079,14 @@ def _extract_dense_array_from_SparseNdarray(x: SparseNdarray, subset: Tuple[Sequ
output = numpy.zeros((*reversed(idims),), dtype=x._dtype)

if x._contents is not None:
# Wrapping it in a masked array so that masked values are respected.
output = numpy.ma.MaskedArray(output, mask=False)

if x._is_masked:
output = numpy.ma.MaskedArray(output, mask=False)
ndim = len(x._shape)
if ndim > 1:
_recursive_extract_dense_array(x._contents, subset, subset_summary=subset_summary, output=output, dim=ndim-1)
else:
_extract_sparse_vector_to_dense(x._contents[0], x._contents[1], subset_summary=subset_summary, output=output)

# Stripping out the mask if there were, in fact, no masked values.
if isinstance(output.mask, bool) and not output.mask:
output = output.data

return output.T


Expand Down Expand Up @@ -1614,3 +1680,163 @@ def _concatenate_SparseNdarrays(xs: List[SparseNdarray], along: int):
new_contents = _concatenate_sparse_vectors(outidx, outval, payload)

return SparseNdarray(shape=(*new_shape,), contents=new_contents, dtype=output_dtype, index_dtype=output_index_dtype, is_masked=output_is_masked, check=False)


#########################################################
#########################################################


_ReductionPayload = namedtuple("_ReductionPayload", [ "multipliers", "operation" ])


def _reduce_sparse_vector(idx: numpy.ndarray, val: numpy.ndarray, payload: _ReductionPayload, offset: int = 0):
for j, ix in enumerate(idx):
extra = payload.multipliers[0] * ix
payload.operation(offset + extra, val[j])
return


def _recursive_reduce_SparseNdarray(contents, payload: _ReductionPayload, dim: int, offset: int = 0):
mult = payload.multipliers[dim]
if dim == 1:
for i, con in enumerate(contents):
if con is not None:
_reduce_sparse_vector(con[0], con[1], payload, offset = offset + mult * i)
else:
for i, con in enumerate(contents):
if con is not None:
_recursive_reduce_SparseNdarray(con, payload, dim = dim - 1, offset = offset + mult * i)
return


def _reduce_SparseNdarray(x: SparseNdarray, axes: List[int], operation: Callable):
if x.contents is not None:
multipliers = _create_offset_multipliers(x.shape, axes)
payload = _ReductionPayload(operation=operation, multipliers=multipliers)
ndim = len(x.shape)
if ndim == 1:
_reduce_sparse_vector(x.contents[0], x.contents[1], payload)
else:
_recursive_reduce_SparseNdarray(x.contents, payload, dim=ndim - 1)
return


def _allocate_output_array(shape: Tuple[int, ...], axes: List[int], dtype: numpy.dtype) -> numpy.ndarray:
# Either returning a scalar or not.
if len(axes) == 0:
return numpy.zeros(1, dtype=dtype)
else:
shape = [shape[i] for i in axes]
return numpy.zeros((*shape,), dtype=dtype, order="F")


def _sum(x: SparseNdarray, axis: Optional[Union[int, Tuple[int, ...]]], dtype: Optional[numpy.dtype]) -> numpy.ndarray:
axes = _find_useful_axes(len(x.shape), axis)
if dtype is None:
dtype = _choose_output_type(x.dtype, preserve_integer = True)
output = _allocate_output_array(x.shape, axes, dtype)
buffer = output.ravel(order="F")

if x._is_masked:
masked = numpy.zeros(output.shape, dtype=numpy.uint, order="F")
mask_buffer = masked.ravel(order="F")
def op(offset, value):
if value is not numpy.ma.masked:
buffer[offset] += value
else:
mask_buffer[offset] += 1
_reduce_SparseNdarray(x, axes, op)
size = _expected_sample_size(x.shape, axes)
output = numpy.ma.MaskedArray(output, mask=(masked == size))
else:
def op(offset, value):
buffer[offset] += value
_reduce_SparseNdarray(x, axes, op)

if len(axes) == 0:
return output[0]
else:
return output


def _mean(x: SparseNdarray, axis: Optional[Union[int, Tuple[int, ...]]], dtype: Optional[numpy.dtype]) -> numpy.ndarray:
axes = _find_useful_axes(len(x.shape), axis)
if dtype is None:
dtype = _choose_output_type(x.dtype, preserve_integer = False)
output = _allocate_output_array(x.shape, axes, dtype)
buffer = output.ravel(order="F")
size = _expected_sample_size(x.shape, axes)

if x._is_masked:
masked = numpy.zeros(output.shape, dtype=numpy.uint, order="F")
mask_buffer = masked.ravel(order="F")
def op(offset, value):
if value is not numpy.ma.masked:
buffer[offset] += value
else:
mask_buffer[offset] += 1
_reduce_SparseNdarray(x, axes, op)
denom = size - masked
output = numpy.ma.MaskedArray(output, mask=(denom==0))
else:
def op(offset, value):
buffer[offset] += value
_reduce_SparseNdarray(x, axes, op)
denom = size

output /= denom
if len(axes) == 0:
return output[0]
else:
return output


def _var(x: SparseNdarray, axis: Optional[Union[int, Tuple[int, ...]]], dtype: Optional[numpy.dtype], ddof: int) -> numpy.ndarray:
axes = _find_useful_axes(len(x.shape), axis)
if dtype is None:
dtype = _choose_output_type(x.dtype, preserve_integer = False)
size = _expected_sample_size(x.shape, axes)

# Using Welford's online algorithm.
sumsq = _allocate_output_array(x.shape, axes, dtype)
sumsq_buffer = sumsq.ravel(order="F")
means = numpy.zeros(sumsq.shape, dtype=dtype, order="F")
means_buffer = means.ravel(order="F")
counts = numpy.zeros(sumsq.shape, dtype=numpy.int64, order="F")
counts_buffer = counts.ravel(order="F")

def raw_op(offset, value):
counts_buffer[offset] += 1
delta = value - means_buffer[offset]
means_buffer[offset] += delta / counts_buffer[offset]
delta_2 = value - means_buffer[offset]
sumsq_buffer[offset] += delta * delta_2

if x._is_masked:
masked = numpy.zeros(sumsq.shape, dtype=numpy.int64, order="F")
mask_buffer = masked.ravel(order="F")
def op(offset, value):
if value is not numpy.ma.masked:
raw_op(offset, value)
else:
mask_buffer[offset] += 1
_reduce_SparseNdarray(x, axes, op)
actual_size = size - masked
denom = actual_size - ddof
num_zero = actual_size - counts
sumsq = numpy.ma.MaskedArray(sumsq, mask = (denom <= 0))
else:
_reduce_SparseNdarray(x, axes, raw_op)
actual_size = size
denom = max(0, size - ddof)
num_zero = size - counts

old_means = means.copy()
means *= counts / actual_size
sumsq += num_zero * (old_means * means)

sumsq /= denom
if len(axes) == 0:
return sumsq[0]
else:
return sumsq
61 changes: 61 additions & 0 deletions src/delayedarray/_statistics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from typing import List, Tuple
import numpy


def _find_useful_axes(ndim, axis) -> List[int]:
output = []
if axis is not None:
if isinstance(axis, int):
if axis < 0:
axis = ndim + axis
for i in range(ndim):
if i != axis:
output.append(i)
else:
used = set()
for a in axis:
if a < 0:
a = ndim + a
used.add(a)
for i in range(ndim):
if i not in used:
output.append(i)
return output


def _expected_sample_size(shape: Tuple[int, ...], axes: List[int]) -> int:
size = 1
j = 0
for i, d in enumerate(shape):
if j == len(axes) or i < axes[j]:
size *= d
else:
j += 1
return size


def _choose_output_type(dtype: numpy.dtype, preserve_integer: bool) -> numpy.dtype:
# Mimic numpy.sum's method for choosing the type.
if numpy.issubdtype(dtype, numpy.integer):
if preserve_integer:
xinfo = numpy.iinfo(dtype)
if xinfo.kind == "i":
pinfo = numpy.iinfo(numpy.int_)
if xinfo.bits < pinfo.bits:
dtype = numpy.dtype(numpy.int_)
else:
pinfo = numpy.iinfo(numpy.uint)
if xinfo.bits < pinfo.bits:
dtype = numpy.dtype(numpy.uint)
else:
dtype = numpy.dtype("float64")
return dtype


def _create_offset_multipliers(shape: Tuple[int, ...], axes: List[int]) -> List[int]:
multipliers = [0] * len(shape)
sofar = 1
for a in axes:
multipliers[a] = sofar
sofar *= shape[a]
return multipliers
Loading

0 comments on commit d95b8e5

Please sign in to comment.