Skip to content

Commit

Permalink
Adds support for monitoring a port to observe if it is blocked (#755)
Browse files Browse the repository at this point in the history
* Adds support for monitoring a port to observe if it is blocked

* Fix lint issues

* Redesigned Watchdog to use Multiprocessing Manager; Invoke only 2 Event Monitors and use 2 queues for watching events; Configs are piped in via compiler now

* Incorporate Codacy Suggestions

* Fix lint comments

* Fix failing unit tests to add the watchdog builder

* Code review comments
  • Loading branch information
joyeshmishra authored Jul 31, 2023
1 parent d12c267 commit 9b98a04
Show file tree
Hide file tree
Showing 10 changed files with 446 additions and 43 deletions.
97 changes: 91 additions & 6 deletions src/lava/magma/compiler/builders/channel_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import typing as ty
from dataclasses import dataclass
from multiprocessing import Queue

from lava.magma.compiler.builders.interfaces import \
AbstractChannelBuilder, \
Expand All @@ -17,14 +18,58 @@
from lava.magma.compiler.utils import PortInitializer
from lava.magma.runtime.message_infrastructure \
.message_infrastructure_interface import (MessageInfrastructureInterface)
from lava.magma.compiler.channels.watchdog import WatchdogManager, Watchdog

if ty.TYPE_CHECKING:
from lava.magma.core.process.process import AbstractProcess
from lava.magma.runtime.runtime import Runtime

# Four event queues, indexed the way create_watchdogs() consumes them:
# [0] source send, [1] source join, [2] destination recv, [3] destination join.
Queues = ty.Tuple[Queue, Queue, Queue, Queue]
# Four watchdogs in the order create_watchdogs() returns them:
# (source send, source join, destination recv, destination join).
Watchdogs = ty.Tuple[Watchdog, Watchdog, Watchdog, Watchdog]
# Pair of port initializers: [0] is used for the source-side watchdogs,
# [1] for the destination-side watchdogs.
PortInitializers = ty.Tuple[PortInitializer, PortInitializer]


class WatchdogEnabledMixin:
    """Mixin adding watchdog creation helpers to channel builders.

    The host class must expose ``src_process`` and ``dst_process``
    attributes; :meth:`create_watchdogs` reads both of them.
    """

    @staticmethod
    def watch(watchdog_manager: WatchdogManager,
              queue: Queue,
              process: "AbstractProcess",
              pi: PortInitializer,
              method_type: str) -> Watchdog:
        """Request a single watchdog from the manager.

        The watchdog's channel name is built as
        ``"<class name of process>.<pi.name>"``.

        Parameters
        ----------
        watchdog_manager : WatchdogManager
            Manager whose ``create_watchdog`` is invoked.
        queue : Queue
            Queue forwarded unchanged to ``create_watchdog``.
        process : AbstractProcess
            Object whose class name forms the first part of the channel name.
        pi : PortInitializer
            Port initializer whose ``name`` forms the second part.
        method_type : str
            Label forwarded unchanged to ``create_watchdog``.

        Returns
        -------
        Watchdog
            Whatever ``watchdog_manager.create_watchdog`` returns.
        """
        owner_cls_name: str = type(process).__name__
        channel_name: str = f"{owner_cls_name}.{pi.name}"
        return watchdog_manager.create_watchdog(queue=queue,
                                                channel_name=channel_name,
                                                method_type=method_type)

    def create_watchdogs(self,
                         watchdog_manager: WatchdogManager,
                         queues: Queues,
                         port_initializers: PortInitializers) -> Watchdogs:
        """Create the four watchdogs guarding both ends of a channel.

        Parameters
        ----------
        watchdog_manager : WatchdogManager
            Manager used to create every watchdog.
        queues : Queues
            Queues indexed as (source send, source join,
            destination recv, destination join).
        port_initializers : PortInitializers
            Index 0 is used for the source side, index 1 for the
            destination side.

        Returns
        -------
        Watchdogs
            Tuple ``(src_send, src_join, dst_recv, dst_join)``.
        """
        # One row per watchdog, in creation order:
        # (index into queues, attribute holding the owning process,
        #  index into port_initializers, method type label).
        plan = (
            (0, "src_process", 0, "send"),
            (1, "src_process", 0, "join"),
            (2, "dst_process", 1, "recv"),
            (3, "dst_process", 1, "join"),
        )
        return tuple(
            self.watch(watchdog_manager,
                       queues[queue_idx],
                       getattr(self, owner_attr),
                       port_initializers[pi_idx],
                       method_type)
            for queue_idx, owner_attr, pi_idx, method_type in plan
        )


@dataclass
class ChannelBuilderMp(AbstractChannelBuilder):
class ChannelBuilderMp(AbstractChannelBuilder, WatchdogEnabledMixin):
"""A ChannelBuilder assuming Python multi-processing is used as messaging
and multi processing backbone.
"""
Expand All @@ -36,13 +81,15 @@ class ChannelBuilderMp(AbstractChannelBuilder):
dst_port_initializer: PortInitializer

def build(
self, messaging_infrastructure: MessageInfrastructureInterface
self, messaging_infrastructure: MessageInfrastructureInterface,
watchdog_manager: WatchdogManager
) -> Channel:
"""Given the message passing framework builds a channel
Parameters
----------
messaging_infrastructure : MessageInfrastructureInterface
watchdog_manager: WatchdogManager
Returns
-------
Expand All @@ -57,18 +104,30 @@ def build(
channel_class = messaging_infrastructure.channel_class(
channel_type=self.channel_type
)

# Watchdogs
sq = watchdog_manager.sq
queues = (sq, sq, sq, sq)
port_initializers = (self.src_port_initializer,
self.dst_port_initializer)
(src_send_watchdog, src_join_watchdog,
dst_recv_watchdog, dst_join_watchdog) = \
self.create_watchdogs(watchdog_manager, queues, port_initializers)

return channel_class(
messaging_infrastructure,
self.src_port_initializer.name,
self.dst_port_initializer.name,
self.src_port_initializer.shape,
self.src_port_initializer.d_type,
self.src_port_initializer.size,
src_send_watchdog, src_join_watchdog,
dst_recv_watchdog, dst_join_watchdog
)


@dataclass
class ServiceChannelBuilderMp(AbstractChannelBuilder):
class ServiceChannelBuilderMp(AbstractChannelBuilder, WatchdogEnabledMixin):
"""A RuntimeServiceChannelBuilder assuming Python multi-processing is used
as messaging and multi processing backbone.
"""
Expand All @@ -81,13 +140,15 @@ class ServiceChannelBuilderMp(AbstractChannelBuilder):
port_initializer: PortInitializer

def build(
self, messaging_infrastructure: MessageInfrastructureInterface
self, messaging_infrastructure: MessageInfrastructureInterface,
watchdog_manager: WatchdogManager
) -> Channel:
"""Given the message passing framework builds a channel
Parameters
----------
messaging_infrastructure : MessageInfrastructureInterface
watchdog_manager: WatchdogManager
Returns
-------
Expand All @@ -103,6 +164,15 @@ def build(
channel_type=self.channel_type
)

# Watchdogs
lq, sq = watchdog_manager.lq, watchdog_manager.sq
queues = (sq, sq, lq, sq)
port_initializers = (self.port_initializer,
self.port_initializer)
(src_send_watchdog, src_join_watchdog,
dst_recv_watchdog, dst_join_watchdog) = \
self.create_watchdogs(watchdog_manager, queues, port_initializers)

channel_name: str = self.port_initializer.name
return channel_class(
messaging_infrastructure,
Expand All @@ -111,11 +181,13 @@ def build(
self.port_initializer.shape,
self.port_initializer.d_type,
self.port_initializer.size,
src_send_watchdog, src_join_watchdog,
dst_recv_watchdog, dst_join_watchdog
)


@dataclass
class RuntimeChannelBuilderMp(AbstractChannelBuilder):
class RuntimeChannelBuilderMp(AbstractChannelBuilder, WatchdogEnabledMixin):
"""A RuntimeChannelBuilder assuming Python multi-processing is
used as messaging and multi processing backbone.
"""
Expand All @@ -126,13 +198,15 @@ class RuntimeChannelBuilderMp(AbstractChannelBuilder):
port_initializer: PortInitializer

def build(
self, messaging_infrastructure: MessageInfrastructureInterface
self, messaging_infrastructure: MessageInfrastructureInterface,
watchdog_manager: WatchdogManager
) -> Channel:
"""Given the message passing framework builds a channel
Parameters
----------
messaging_infrastructure : MessageInfrastructureInterface
watchdog_manager: WatchdogManager
Returns
-------
Expand All @@ -148,6 +222,15 @@ def build(
channel_type=self.channel_type
)

# Watchdogs
lq, sq = watchdog_manager.lq, watchdog_manager.sq
queues = (sq, sq, lq, sq)
port_initializers = (self.port_initializer,
self.port_initializer)
(src_send_watchdog, src_join_watchdog,
dst_recv_watchdog, dst_join_watchdog) = \
self.create_watchdogs(watchdog_manager, queues, port_initializers)

channel_name: str = self.port_initializer.name
return channel_class(
messaging_infrastructure,
Expand All @@ -156,6 +239,8 @@ def build(
self.port_initializer.shape,
self.port_initializer.d_type,
self.port_initializer.size,
src_send_watchdog, src_join_watchdog,
dst_recv_watchdog, dst_join_watchdog
)


Expand Down
79 changes: 50 additions & 29 deletions src/lava/magma/compiler/channels/pypychannel.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from time import time
from scipy.sparse import csr_matrix
from lava.utils.sparse import find
from lava.magma.compiler.channels.watchdog import Watchdog, NoOPWatchdog


import numpy as np
Expand Down Expand Up @@ -38,7 +39,8 @@ class CspSendPort(AbstractCspSendPort):
semantics. It can be understood as the input port of a CSP channel.
"""

def __init__(self, name, shm, proto, size, req, ack):
def __init__(self, name, shm, proto, size, req, ack,
io_watchdog: Watchdog, join_watchdog: Watchdog):
"""Instantiates CspSendPort object and class attributes
Parameters
Expand All @@ -65,6 +67,9 @@ def __init__(self, name, shm, proto, size, req, ack):
self.observer: ty.Optional[ty.Callable[[], ty.Any]] = None
self.thread = None

self._io_watchdog: Watchdog = io_watchdog
self._join_watchdog: Watchdog = join_watchdog

@property
def name(self) -> str:
return self._name
Expand Down Expand Up @@ -126,24 +131,26 @@ def send(self, data):
"""
Send data on the channel. May block if the channel is already full.
"""
if data.shape != self._shape:
raise AssertionError(f"{data.shape=} {self._shape=} Mismatch")
with self._io_watchdog:
if data.shape != self._shape:
raise AssertionError(f"{data.shape=} {self._shape=} Mismatch")

if isinstance(data, csr_matrix):
data = find(data, explicit_zeros=True)[2]
if isinstance(data, csr_matrix):
data = find(data, explicit_zeros=True)[2]

self._semaphore.acquire()
self._array[self._idx][:] = data[:]
self._idx = (self._idx + 1) % self._size
self._req.release()
self._semaphore.acquire()
self._array[self._idx][:] = data[:]
self._idx = (self._idx + 1) % self._size
self._req.release()

def join(self):
if not self._done:
self._done = True
if self.thread is not None:
self._ack.release()
self._ack = None
self._req = None
with self._join_watchdog:
if not self._done:
self._done = True
if self.thread is not None:
self._ack.release()
self._ack = None
self._req = None


class CspRecvQueue(Queue):
Expand Down Expand Up @@ -186,7 +193,8 @@ class CspRecvPort(AbstractCspRecvPort):
semantics. It can be understood as the output port of a CSP channel.
"""

def __init__(self, name, shm, proto, size, req, ack):
def __init__(self, name, shm, proto, size, req, ack,
io_watchdog, join_watchdog):
"""Instantiates CspRecvPort object and class attributes
Parameters
Expand All @@ -213,6 +221,9 @@ def __init__(self, name, shm, proto, size, req, ack):
self.observer: ty.Optional[ty.Callable[[], ty.Any]] = None
self.thread = None

self._io_watchdog = io_watchdog
self._join_watchdog = join_watchdog

@property
def name(self) -> str:
return self._name
Expand Down Expand Up @@ -283,19 +294,21 @@ def recv(self):
"""
Receive from the channel. Blocks if there is no data on the channel.
"""
self._queue.get()
result = self._array[self._idx].copy()
self._idx = (self._idx + 1) % self._size
self._ack.release()
return result
with self._io_watchdog:
self._queue.get()
result = self._array[self._idx].copy()
self._idx = (self._idx + 1) % self._size
self._ack.release()
return result

def join(self):
if not self._done:
self._done = True
if self.thread is not None:
self._req.release()
self._ack = None
self._req = None
with self._join_watchdog:
if not self._done:
self._done = True
if self.thread is not None:
self._req.release()
self._ack = None
self._req = None


class CspSelector:
Expand Down Expand Up @@ -351,6 +364,10 @@ def __init__(
shape,
dtype,
size,
src_send_watchdog=NoOPWatchdog(None),
src_join_watchdog=NoOPWatchdog(None),
dst_recv_watchdog=NoOPWatchdog(None),
dst_join_watchdog=NoOPWatchdog(None)
):
"""Instantiates PyPyChannel object and class attributes
Expand All @@ -369,8 +386,12 @@ def __init__(
req = Semaphore(0)
ack = Semaphore(0)
proto = Proto(shape=shape, dtype=dtype, nbytes=nbytes)
self._src_port = CspSendPort(src_name, shm, proto, size, req, ack)
self._dst_port = CspRecvPort(dst_name, shm, proto, size, req, ack)
self._src_port = CspSendPort(src_name, shm, proto, size, req, ack,
src_send_watchdog,
src_join_watchdog)
self._dst_port = CspRecvPort(dst_name, shm, proto, size, req, ack,
dst_recv_watchdog,
dst_join_watchdog)

def nbytes(self, shape, dtype):
return np.prod(shape) * np.dtype(dtype).itemsize
Expand Down
Loading

0 comments on commit 9b98a04

Please sign in to comment.