
Commit

Merge branch 'main' into features/870-divide-kwargs
ClaudiaComito authored Apr 22, 2022
2 parents 067c36e + aaafea0 commit 9d2926b
Showing 8 changed files with 351 additions and 43 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -1,5 +1,6 @@
# Pending additions

- [#595](https://github.com/helmholtz-analytics/heat/pull/595) Distributed 1-D convolution: `ht.convolve`
- [#595](https://github.com/helmholtz-analytics/heat/pull/595) `DNDarray.get_halo` method no longer requires load-balance
- [#867](https://github.com/helmholtz-analytics/heat/pull/867) Upgraded to support torch 1.9.0
- [#876](https://github.com/helmholtz-analytics/heat/pull/876) Make examples work (Lasso and kNN)
- [#894](https://github.com/helmholtz-analytics/heat/pull/894) Change inclusion of license file
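
The two #595 entries above cover the new distributed 1-D convolution and the relaxed `DNDarray.get_halo`. As a minimal, hedged sketch of the new top-level call (array sizes and the MPI launch command are illustrative assumptions, not part of the diff):

```python
# Minimal sketch of the new ht.convolve (PR #595); assumes heat is importable
# as ht and, for a distributed signal, an MPI launch such as `mpirun -n 3 ...`.
import heat as ht

a = ht.ones(12, split=0)                # signal of length N = 12, distributed
v = ht.arange(3).astype(ht.float)       # filter of length M = 3, replicated on every rank

print(ht.convolve(a, v, mode="full"))   # length N + M - 1 = 14
print(ht.convolve(a, v, mode="same"))   # length N         = 12
print(ht.convolve(a, v, mode="valid"))  # length N - M + 1 = 10
```
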
1 change: 1 addition & 0 deletions heat/core/__init__.py
@@ -26,6 +26,7 @@
from .tiling import *
from .trigonometrics import *
from .types import *
from .signal import *
from .types import finfo, iinfo
from . import version
from .version import __version__
58 changes: 27 additions & 31 deletions heat/core/dndarray.py
@@ -330,7 +330,7 @@ def T(self):
return linalg.transpose(self, axes=None)

@property
def array_with_halos(self) -> Tuple[torch.tensor, torch.tensor]:
def array_with_halos(self) -> torch.Tensor:
"""
Fetch halos of size ``halo_size`` from neighboring ranks and save them in ``self.halo_next``/``self.halo_prev``
in case they are not already stored. If ``halo_size`` differs from the size of already stored halos,
@@ -357,21 +357,15 @@ def __prephalo(self, start, end) -> torch.Tensor:

return self.__array[ix].clone().contiguous()

def get_halo(self, halo_size) -> torch.Tensor:
def get_halo(self, halo_size: int) -> torch.Tensor:
"""
Fetch halos of size ``halo_size`` from neighboring ranks and save them in ``self.halo_next/self.halo_prev``
in case they are not already stored. If ``halo_size`` differs from the size of already stored halos,
the are overwritten.
Fetch halos of size ``halo_size`` from neighboring ranks and save them in ``self.halo_next/self.halo_prev``.
Parameters
----------
halo_size : int
Size of the halo.
"""
if not self.is_balanced():
raise RuntimeError(
"halo cannot be created for unbalanced tensors, running the .balance_() function is recommended"
)
if not isinstance(halo_size, int):
raise TypeError(
"halo_size needs to be of Python type integer, {} given".format(type(halo_size))
@@ -381,44 +375,47 @@ def get_halo(self, halo_size) -> torch.Tensor:
"halo_size needs to be a positive Python integer, {} given".format(type(halo_size))
)

if self.comm.is_distributed() and self.split is not None:
if self.is_distributed() and halo_size > 0:
# gather lshapes
lshape_map = self.create_lshape_map()
lshape_map = self.lshape_map
rank = self.comm.rank
size = self.comm.size
next_rank = rank + 1
prev_rank = rank - 1
last_rank = size - 1

# if local shape is zero and it's the last process
if self.lshape[self.split] == 0:
return # if process has no data we ignore it
populated_ranks = torch.nonzero(lshape_map[:, self.split]).squeeze().tolist()
if rank in populated_ranks:
first_rank = populated_ranks[0]
last_rank = populated_ranks[-1]
if rank != last_rank:
next_rank = populated_ranks[populated_ranks.index(rank) + 1]
if rank != first_rank:
prev_rank = populated_ranks[populated_ranks.index(rank) - 1]
else:
# if process has no data we ignore it
return

if halo_size > self.lshape[self.split]:
# if on at least one process the halo_size is larger than the local size throw ValueError
if (halo_size > self.lshape_map[:, self.split][populated_ranks]).any():
# halo_size is larger than the local size on at least one process
raise ValueError(
"halo_size {} needs to be smaller than chunck-size {} )".format(
"halo_size {} needs to be smaller than chunk-size {} )".format(
halo_size, self.lshape[self.split]
)
)

a_prev = self.__prephalo(0, halo_size)
a_next = self.__prephalo(-halo_size, None)

res_prev = None
res_next = None

req_list = list()

# only exchange data with next process if it has data
if rank != last_rank and (lshape_map[next_rank, self.split] > 0):
# exchange data with next populated process
if rank != last_rank:
self.comm.Isend(a_next, next_rank)
res_prev = torch.zeros(
a_prev.size(), dtype=a_prev.dtype, device=self.device.torch_device
)
req_list.append(self.comm.Irecv(res_prev, source=next_rank))

if rank != 0:
if rank != first_rank:
self.comm.Isend(a_prev, prev_rank)
res_next = torch.zeros(
a_next.size(), dtype=a_next.dtype, device=self.device.torch_device
@@ -432,16 +429,15 @@ def get_halo(self, halo_size) -> torch.Tensor:
self.__halo_prev = res_next
self.__ishalo = True

def __cat_halo(self) -> Tuple[torch.tensor, torch.tensor]:
def __cat_halo(self) -> torch.Tensor:
"""
Fetch halos of size ``halo_size`` from neighboring ranks and save them in ``self.halo_next``/``self.halo_prev``
in case they are not already stored. If ``halo_size`` differs from the size of already stored halos,
the are overwritten.
Return local array concatenated to halos if they are available.
"""
if not self.is_distributed():
return self.__array
return torch.cat(
[_ for _ in (self.__halo_prev, self.__array, self.__halo_next) if _ is not None],
self.split,
dim=self.split,
)

def astype(self, dtype, copy=True) -> DNDarray:
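
Since `get_halo` above no longer requires a load-balanced tensor and simply skips ranks that hold no data, a minimal usage sketch (shapes, process count and launch command are illustrative assumptions):

```python
# Sketch: halo exchange on a split DNDarray; launch under MPI, e.g. `mpirun -n 3 ...`.
import heat as ht

x = ht.ones((8, 5), split=0)    # distributed along axis 0

x.get_halo(1)                   # exchange one boundary row with the neighbouring populated ranks
local = x.array_with_halos      # local torch.Tensor with the received halo rows attached

# halo_prev / halo_next remain None on the first / last populated rank, respectively
print(x.comm.rank, local.shape, x.halo_prev is None, x.halo_next is None)
```
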
4 changes: 4 additions & 0 deletions heat/core/manipulations.py
@@ -1222,6 +1222,10 @@ def pad(
[ 0, 0, 0, 0, 0, 0, 0],
[ 0, 0, 0, 0, 0, 0, 0]]], dtype=ht.int64, device=cpu:0, split=0)
"""
# early out if pad width is 0
if pad_width == 0:
return array

if not isinstance(array, DNDarray):
raise TypeError("expected array to be a ht.DNDarray, but was {}".format(type(array)))

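The early out added above makes a zero pad width a no-op; a short, hedged sketch of the expected behaviour:

```python
# Illustrative only: with pad_width == 0 the input DNDarray is returned unchanged.
import heat as ht

x = ht.arange(6, split=0)
y = ht.pad(x, 0)
assert y.shape == x.shape
```
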
148 changes: 148 additions & 0 deletions heat/core/signal.py
@@ -0,0 +1,148 @@
"""Provides a collection of signal-processing operations"""

import torch
from typing import Union, Tuple, Sequence

from .communication import MPI
from .dndarray import DNDarray
from .types import promote_types
from .manipulations import pad
from .factories import array
import torch.nn.functional as fc

__all__ = ["convolve"]


def convolve(a: DNDarray, v: DNDarray, mode: str = "full") -> DNDarray:
"""
Returns the discrete, linear convolution of two one-dimensional `DNDarray`s.
Parameters
----------
a : DNDarray
One-dimensional signal `DNDarray` of shape (N,)
v : DNDarray
One-dimensional filter weight `DNDarray` of shape (M,).
mode : str
Can be 'full', 'valid', or 'same'. Default is 'full'.
'full':
Returns the convolution at
each point of overlap, with an output shape of (N+M-1,). At
the end-points of the convolution, the signals do not overlap
completely, and boundary effects may be seen.
'same':
Mode 'same' returns output of length 'N'. Boundary
effects are still visible. This mode is not supported for
even-sized filter weights
'valid':
Mode 'valid' returns output of length 'N-M+1'. The
convolution product is only given for points where the signals
overlap completely. Values outside the signal boundary have no
effect.
Notes
-----
Contrary to the original `numpy.convolve`, this function does not
swap the input arrays if the second one is larger than the first one.
This is because `a`, the signal, might be memory-distributed,
whereas the filter `v` is assumed to be non-distributed,
i.e. a copy of `v` will reside on each process.
Examples
--------
Note how the convolution operator flips the second array
before "sliding" the two across one another:
>>> a = ht.ones(10)
>>> v = ht.arange(3).astype(ht.float)
>>> ht.convolve(a, v, mode='full')
DNDarray([0., 1., 3., 3., 3., 3., 2.])
>>> ht.convolve(a, v, mode='same')
DNDarray([1., 3., 3., 3., 3.])
>>> ht.convolve(a, v, mode='valid')
DNDarray([3., 3., 3.])
"""
if not isinstance(a, DNDarray):
try:
a = array(a)
except TypeError:
raise TypeError("non-supported type for signal: {}".format(type(a)))
if not isinstance(v, DNDarray):
try:
v = array(v)
except TypeError:
raise TypeError("non-supported type for filter: {}".format(type(v)))
promoted_type = promote_types(a.dtype, v.dtype)
a = a.astype(promoted_type)
v = v.astype(promoted_type)

if v.is_distributed():
raise TypeError("Distributed filter weights are not supported")
if len(a.shape) != 1 or len(v.shape) != 1:
raise ValueError("Only 1-dimensional input DNDarrays are allowed")
if a.shape[0] <= v.shape[0]:
raise ValueError("Filter size must not be greater than or equal to signal size")
if mode == "same" and v.shape[0] % 2 == 0:
raise ValueError("Mode 'same' cannot be used with even-sized kernel")

# compute halo size
halo_size = v.shape[0] // 2

# pad DNDarray with zeros according to mode
if mode == "full":
pad_size = v.shape[0] - 1
gshape = v.shape[0] + a.shape[0] - 1
elif mode == "same":
pad_size = halo_size
gshape = a.shape[0]
elif mode == "valid":
pad_size = 0
gshape = a.shape[0] - v.shape[0] + 1
else:
raise ValueError("Supported modes are 'full', 'valid', 'same', got {}".format(mode))

a = pad(a, pad_size, "constant", 0)

if a.is_distributed():
if (v.shape[0] > a.lshape_map[:, 0]).any():
raise ValueError("Filter weight is larger than the local chunks of signal")
# fetch halos and store them in a.halo_next/a.halo_prev
a.get_halo(halo_size)
# apply halos to local array
signal = a.array_with_halos
else:
signal = a.larray

# make signal and filter weight 3D for Pytorch conv1d function
signal = signal.reshape(1, 1, signal.shape[0])

# flip filter for convolution as Pytorch conv1d computes correlations
weight = v.larray.flip(dims=(0,))
weight = weight.reshape(1, 1, weight.shape[0])

# cast to float if on GPU
if signal.is_cuda:
float_type = promote_types(signal.dtype, torch.float32).torch_type()
signal = signal.to(float_type)
weight = weight.to(float_type)

# apply torch convolution operator
signal_filtered = fc.conv1d(signal, weight)

# unpack 3D result into 1D
signal_filtered = signal_filtered[0, 0, :]

# if kernel shape along split axis is even we need to get rid of duplicated values
if a.comm.rank != 0 and v.shape[0] % 2 == 0:
signal_filtered = signal_filtered[1:]

return DNDarray(
signal_filtered.contiguous(),
(gshape,),
signal_filtered.dtype,
a.split,
a.device,
a.comm,
balanced=False,
).astype(a.dtype.torch_type())
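
One detail worth spelling out: `torch.nn.functional.conv1d` computes a cross-correlation, which is why the filter above is flipped before the call. A small, non-distributed PyTorch/NumPy sketch of that equivalence (purely illustrative, independent of Heat):

```python
# Flipping the kernel turns conv1d's cross-correlation into a true convolution.
import numpy as np
import torch
import torch.nn.functional as fc

a = torch.arange(10, dtype=torch.float32)      # signal, length N = 10
v = torch.tensor([0.0, 1.0, 2.0])              # filter, length M = 3

signal = a.reshape(1, 1, -1)                   # conv1d expects (batch, channels, length)
weight = v.flip(dims=(0,)).reshape(1, 1, -1)   # flip -> convolution instead of correlation

torch_result = fc.conv1d(signal, weight)[0, 0]             # length N - M + 1 = 8
numpy_result = np.convolve(a.numpy(), v.numpy(), "valid")  # reference result

assert np.allclose(torch_result.numpy(), numpy_result)
```
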
79 changes: 73 additions & 6 deletions heat/core/tests/test_dndarray.py
@@ -123,15 +123,10 @@ def test_gethalo(self):
# exception for too large halos
with self.assertRaises(ValueError):
data.get_halo(4)
# exception on non balanced tensor
with self.assertRaises(RuntimeError):
data_nobalance = ht.array(
torch.empty(((data.comm.rank + 1) * 2, 3, 4)), is_split=0, device=data.device
)
data_nobalance.get_halo(1)
# test no data on process
data_np = np.arange(2 * 12).reshape(2, 12)
data = ht.array(data_np, split=0)
print("DEBUGGING: data.lshape_map = ", data.lshape_map)
data.get_halo(1)

data_with_halos = data.array_with_halos
@@ -167,6 +162,78 @@ def test_gethalo(self):
self.assertTrue(data.halo_next is None)
self.assertEqual(data_with_halos.shape, (12, 0))

# test halo of imbalanced dndarray
if data.comm.size > 2:
# test for split=0
t_data = torch.arange(
5 * data.comm.rank, dtype=torch.float64, device=data.larray.device
).reshape(data.comm.rank, 5)
if data.comm.rank > 0:
prev_data = torch.arange(
5 * (data.comm.rank - 1), dtype=torch.float64, device=data.larray.device
).reshape(data.comm.rank - 1, 5)
if data.comm.rank < data.comm.size - 1:
next_data = torch.arange(
5 * (data.comm.rank + 1), dtype=torch.float64, device=data.larray.device
).reshape(data.comm.rank + 1, 5)
data = ht.array(t_data, is_split=0)
data.get_halo(1)
data_with_halos = data.array_with_halos
if data.comm.rank == 0:
prev_halo = None
next_halo = None
new_split_size = 0
elif data.comm.rank == 1:
prev_halo = None
next_halo = next_data[0]
new_split_size = data.larray.shape[0] + 1
elif data.comm.rank == data.comm.size - 1:
prev_halo = prev_data[-1]
next_halo = None
new_split_size = data.larray.shape[0] + 1
else:
prev_halo = prev_data[-1]
next_halo = next_data[0]
new_split_size = data.larray.shape[0] + 2
self.assertEqual(data_with_halos.shape, (new_split_size, 5))
self.assertTrue(data.halo_prev is prev_halo or (data.halo_prev == prev_halo).all())
self.assertTrue(data.halo_next is next_halo or (data.halo_next == next_halo).all())

# test for split=1
t_data = torch.arange(
5 * data.comm.rank, dtype=torch.float64, device=data.larray.device
).reshape(5, -1)
if data.comm.rank > 0:
prev_data = torch.arange(
5 * (data.comm.rank - 1), dtype=torch.float64, device=data.larray.device
).reshape(5, -1)
if data.comm.rank < data.comm.size - 1:
next_data = torch.arange(
5 * (data.comm.rank + 1), dtype=torch.float64, device=data.larray.device
).reshape(5, -1)
data = ht.array(t_data, is_split=1)
data.get_halo(1)
data_with_halos = data.array_with_halos
if data.comm.rank == 0:
prev_halo = None
next_halo = None
new_split_size = 0
elif data.comm.rank == 1:
prev_halo = None
next_halo = next_data[:, 0].unsqueeze_(1)
new_split_size = data.larray.shape[1] + 1
elif data.comm.rank == data.comm.size - 1:
prev_halo = prev_data[:, -1].unsqueeze_(1)
next_halo = None
new_split_size = data.larray.shape[1] + 1
else:
prev_halo = prev_data[:, -1].unsqueeze_(1)
next_halo = next_data[:, 0].unsqueeze_(1)
new_split_size = data.larray.shape[1] + 2
self.assertEqual(data_with_halos.shape, (5, new_split_size))
self.assertTrue(data.halo_prev is prev_halo or (data.halo_prev == prev_halo).all())
self.assertTrue(data.halo_next is next_halo or (data.halo_next == next_halo).all())

def test_larray(self):
# undistributed case
x = ht.arange(6 * 7 * 8).reshape((6, 7, 8))
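
The new imbalanced-halo assertions above only run with more than two MPI processes; a condensed, hedged sketch of the pattern they exercise (it assumes the world communicator is exposed as `ht.MPI_WORLD`, and the shapes are illustrative):

```python
# Illustrative mirror of the imbalanced-halo test; run with at least 3 MPI processes.
import torch
import heat as ht

comm = ht.MPI_WORLD                                    # assumption: world communicator handle
local = torch.arange(5.0 * comm.rank).reshape(comm.rank, 5)
x = ht.array(local, is_split=0)                        # rank 0 holds no rows -> imbalanced

x.get_halo(1)                                          # ranks without data are simply skipped
print(comm.rank, x.array_with_halos.shape)
```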
