From f9d11913240c2e4bf4977def799a266c0d60029f Mon Sep 17 00:00:00 2001 From: ssgier Date: Thu, 23 Feb 2023 21:22:46 +0100 Subject: [PATCH 01/11] Plug file descriptor leaks --- .../magma/compiler/channels/pypychannel.py | 4 +-- .../close_on_shutdown_smm.py | 26 +++++++++++++++++++ .../message_infrastructure/multiprocessing.py | 24 +++++++++++++---- .../channel_broker/channel_broker.py | 8 +++--- .../magma/compiler/builders/test_builder.py | 8 +++--- .../compiler/channels/test_pypychannel.py | 14 +++++----- .../magma/compiler/test_channel_builder.py | 8 +++--- tests/lava/magma/core/model/py/test_ports.py | 8 +++--- .../magma/runtime/test_runtime_service.py | 13 +++++----- 9 files changed, 82 insertions(+), 31 deletions(-) create mode 100644 src/lava/magma/runtime/message_infrastructure/close_on_shutdown_smm.py diff --git a/src/lava/magma/compiler/channels/pypychannel.py b/src/lava/magma/compiler/channels/pypychannel.py index 10ec2b0cc..4068aa2f7 100644 --- a/src/lava/magma/compiler/channels/pypychannel.py +++ b/src/lava/magma/compiler/channels/pypychannel.py @@ -1,4 +1,4 @@ -# Copyright (C) 2021-22 Intel Corporation +# Copyright (C) 2021-23 Intel Corporation # SPDX-License-Identifier: LGPL 2.1 or later # See: https://spdx.org/licenses/ @@ -345,7 +345,7 @@ def __init__( """ nbytes = self.nbytes(shape, dtype) smm = message_infrastructure.smm - shm = smm.SharedMemory(int(nbytes * size)) + shm = smm.shared_memory(int(nbytes * size)) req = Semaphore(0) ack = Semaphore(0) proto = Proto(shape=shape, dtype=dtype, nbytes=nbytes) diff --git a/src/lava/magma/runtime/message_infrastructure/close_on_shutdown_smm.py b/src/lava/magma/runtime/message_infrastructure/close_on_shutdown_smm.py new file mode 100644 index 000000000..c2f2cb4dc --- /dev/null +++ b/src/lava/magma/runtime/message_infrastructure/close_on_shutdown_smm.py @@ -0,0 +1,26 @@ +# Copyright (C) 2023 Intel Corporation +# SPDX-License-Identifier: LGPL 2.1 or later +# See: https://spdx.org/licenses/ + + +from multiprocessing.managers import SharedMemoryManager + + +class CloseOnShutdownSMM: + def __init__(self): + self._shms = [] + self._smm = SharedMemoryManager() + + def start(self): + self._smm.start() + + def shutdown(self): + for shm in self._shms: + shm.close() + self._shms.clear() + self._smm.shutdown() + + def shared_memory(self, size: int): + shm = self._smm.SharedMemory(size) + self._shms.append(shm) + return shm diff --git a/src/lava/magma/runtime/message_infrastructure/multiprocessing.py b/src/lava/magma/runtime/message_infrastructure/multiprocessing.py index b278d3714..b2d313c43 100644 --- a/src/lava/magma/runtime/message_infrastructure/multiprocessing.py +++ b/src/lava/magma/runtime/message_infrastructure/multiprocessing.py @@ -1,4 +1,4 @@ -# Copyright (C) 2021-22 Intel Corporation +# Copyright (C) 2021-23 Intel Corporation # SPDX-License-Identifier: LGPL 2.1 or later # See: https://spdx.org/licenses/ @@ -11,11 +11,14 @@ import multiprocessing as mp import os -from multiprocessing.managers import SharedMemoryManager import traceback from lava.magma.compiler.channels.interfaces import ChannelType, Channel from lava.magma.compiler.channels.pypychannel import PyPyChannel +from lava.magma.runtime.message_infrastructure.close_on_shutdown_smm import ( + CloseOnShutdownSMM, +) + try: from lava.magma.compiler.channels.cpychannel import \ CPyChannel, PyCChannel @@ -50,6 +53,7 @@ def __init__(self, *args, **kwargs): mp.Process.__init__(self, *args, **kwargs) self._pconn, self._cconn = mp.Pipe() self._exception = None + self._is_done = False def run(self): try: @@ -59,10 +63,20 @@ def run(self): tb = traceback.format_exc() self._cconn.send((e, tb)) + def join(self): + if not self._is_done: + super().join() + super().close() + if self._pconn.poll(): + self._exception = self._pconn.recv() + self._cconn.close() + self._pconn.close() + self._is_done = True + @property def exception(self): """Exception property.""" - if self._pconn.poll(): + if not self._is_done and self._pconn.poll(): self._exception = self._pconn.recv() return self._exception @@ -71,7 +85,7 @@ class MultiProcessing(MessageInfrastructureInterface): """Implements message passing using shared memory and multiprocessing""" def __init__(self): - self._smm: ty.Optional[SharedMemoryManager] = None + self._smm = None self._actors: ty.List[SystemProcess] = [] @property @@ -86,7 +100,7 @@ def smm(self): def start(self): """Starts the shared memory manager""" - self._smm = SharedMemoryManager() + self._smm = CloseOnShutdownSMM() self._smm.start() def build_actor(self, target_fn: ty.Callable, builder: ty.Union[ diff --git a/src/lava/magma/runtime/runtime_services/channel_broker/channel_broker.py b/src/lava/magma/runtime/runtime_services/channel_broker/channel_broker.py index 9b3b30833..b816a8e98 100644 --- a/src/lava/magma/runtime/runtime_services/channel_broker/channel_broker.py +++ b/src/lava/magma/runtime/runtime_services/channel_broker/channel_broker.py @@ -1,17 +1,19 @@ -# Copyright (C) 2022 Intel Corporation +# Copyright (C) 2022-23 Intel Corporation # SPDX-License-Identifier: LGPL 2.1 or later # See: https://spdx.org/licenses/ import threading from abc import ABC import logging -from multiprocessing.managers import SharedMemoryManager import numpy as np import typing as ty from lava.magma.compiler.channels.interfaces import AbstractCspPort from lava.magma.compiler.channels.pypychannel import CspSelector, PyPyChannel +from lava.magma.runtime.message_infrastructure.close_on_shutdown_smm import ( + CloseOnShutdownSMM, +) try: from nxcore.arch.base.nxboard import NxBoard @@ -99,7 +101,7 @@ def __init__(self, # Need to pill for COutPorts self.c_outports_to_poll: ty.Dict[Channel, COutPort] = {} - self.smm: SharedMemoryManager = SharedMemoryManager() + self.smm: CloseOnShutdownSMM = CloseOnShutdownSMM() self.mgmt_channel: ty.Optional[PyPyChannel] = None self.grpc_stopping_event: ty.Optional[threading.Event] = None self.port_poller: ty.Optional[threading.Thread] = None diff --git a/tests/lava/magma/compiler/builders/test_builder.py b/tests/lava/magma/compiler/builders/test_builder.py index df83ad12f..0457fcf62 100644 --- a/tests/lava/magma/compiler/builders/test_builder.py +++ b/tests/lava/magma/compiler/builders/test_builder.py @@ -1,8 +1,7 @@ -# Copyright (C) 2021-22 Intel Corporation +# Copyright (C) 2021-23 Intel Corporation # SPDX-License-Identifier: BSD-3-Clause # See: https://spdx.org/licenses/ -from multiprocessing.managers import SharedMemoryManager import typing as ty import unittest import numpy as np @@ -28,6 +27,9 @@ from lava.magma.core.process.process import AbstractProcess from lava.magma.core.process.variable import Var from lava.magma.core.resources import CPU +from lava.magma.runtime.message_infrastructure.close_on_shutdown_smm import ( + CloseOnShutdownSMM, +) class MockMessageInterface: @@ -41,7 +43,7 @@ def channel_class(self, channel_type: ChannelType) -> ty.Type: class TestChannelBuilder(unittest.TestCase): def test_channel_builder(self): """Tests Channel Builder creation""" - smm: SharedMemoryManager = SharedMemoryManager() + smm = CloseOnShutdownSMM() try: port_initializer: PortInitializer = PortInitializer( name="mock", shape=(1, 2), d_type=np.int32, diff --git a/tests/lava/magma/compiler/channels/test_pypychannel.py b/tests/lava/magma/compiler/channels/test_pypychannel.py index 7e8a6e95a..315bbb6e8 100644 --- a/tests/lava/magma/compiler/channels/test_pypychannel.py +++ b/tests/lava/magma/compiler/channels/test_pypychannel.py @@ -1,13 +1,15 @@ -# Copyright (C) 2021-22 Intel Corporation +# Copyright (C) 2021-23 Intel Corporation # SPDX-License-Identifier: BSD-3-Clause # See: https://spdx.org/licenses/ import numpy as np import unittest from multiprocessing import Process -from multiprocessing.managers import SharedMemoryManager from lava.magma.compiler.channels.pypychannel import PyPyChannel +from lava.magma.runtime.message_infrastructure.close_on_shutdown_smm import ( + CloseOnShutdownSMM +) class MockInterface: @@ -30,7 +32,7 @@ def get_channel(smm, data, size, name="test_channel") -> PyPyChannel: class TestPyPyChannelSingleProcess(unittest.TestCase): def test_send_recv_single_process(self): - smm = SharedMemoryManager() + smm = CloseOnShutdownSMM() try: smm.start() @@ -47,7 +49,7 @@ def test_send_recv_single_process(self): smm.shutdown() def test_send_recv_single_process_2d_data(self): - smm = SharedMemoryManager() + smm = CloseOnShutdownSMM() try: smm.start() @@ -64,7 +66,7 @@ def test_send_recv_single_process_2d_data(self): smm.shutdown() def test_send_recv_single_process_1d_data(self): - smm = SharedMemoryManager() + smm = CloseOnShutdownSMM() try: smm.start() @@ -127,7 +129,7 @@ def buffer(shape, dst_port, src_port): class TestPyPyChannelMultiProcess(unittest.TestCase): def test_send_recv_relay(self): - smm = SharedMemoryManager() + smm = CloseOnShutdownSMM() try: smm.start() data = np.ones((2, 2)) diff --git a/tests/lava/magma/compiler/test_channel_builder.py b/tests/lava/magma/compiler/test_channel_builder.py index 30b6490e7..a9aacfef9 100644 --- a/tests/lava/magma/compiler/test_channel_builder.py +++ b/tests/lava/magma/compiler/test_channel_builder.py @@ -1,10 +1,9 @@ -# Copyright (C) 2021-22 Intel Corporation +# Copyright (C) 2021-23 Intel Corporation # SPDX-License-Identifier: BSD-3-Clause # See: https://spdx.org/licenses/ import typing as ty import unittest -from multiprocessing.managers import SharedMemoryManager import numpy as np @@ -16,6 +15,9 @@ CspSendPort, CspRecvPort, ) +from lava.magma.runtime.message_infrastructure.close_on_shutdown_smm import ( + CloseOnShutdownSMM +) class MockMessageInterface: @@ -29,7 +31,7 @@ def channel_class(self, channel_type: ChannelType) -> ty.Type: class TestChannelBuilder(unittest.TestCase): def test_channel_builder(self): """Tests Channel Builder creation""" - smm: SharedMemoryManager = SharedMemoryManager() + smm = CloseOnShutdownSMM() try: port_initializer: PortInitializer = PortInitializer( name="mock", shape=(1, 2), d_type=np.int32, diff --git a/tests/lava/magma/core/model/py/test_ports.py b/tests/lava/magma/core/model/py/test_ports.py index 84f49a011..a2c8b26e9 100644 --- a/tests/lava/magma/core/model/py/test_ports.py +++ b/tests/lava/magma/core/model/py/test_ports.py @@ -1,10 +1,9 @@ -# Copyright (C) 2021-22 Intel Corporation +# Copyright (C) 2021-23 Intel Corporation # SPDX-License-Identifier: BSD-3-Clause # See: https://spdx.org/licenses/ import unittest import time -from multiprocessing.managers import SharedMemoryManager import numpy as np import typing as ty import functools as ft @@ -21,6 +20,9 @@ PyOutPortVectorDense, VirtualPortTransformer, IdentityTransformer) +from lava.magma.runtime.message_infrastructure.close_on_shutdown_smm import ( + CloseOnShutdownSMM +) class MockInterface: @@ -44,7 +46,7 @@ class TestPyPorts(unittest.TestCase): def probe_test_routine(self, cls): """Routine that tests probe method on one implementation of PyInPorts. """ - smm = SharedMemoryManager() + smm = CloseOnShutdownSMM() try: smm.start() diff --git a/tests/lava/magma/runtime/test_runtime_service.py b/tests/lava/magma/runtime/test_runtime_service.py index db6c237d7..4a4e48ab8 100644 --- a/tests/lava/magma/runtime/test_runtime_service.py +++ b/tests/lava/magma/runtime/test_runtime_service.py @@ -1,9 +1,8 @@ -# Copyright (C) 2021-22 Intel Corporation +# Copyright (C) 2021-23 Intel Corporation # SPDX-License-Identifier: BSD-3-Clause # See: https://spdx.org/licenses/ import unittest -from multiprocessing.managers import SharedMemoryManager import numpy as np from lava.magma.compiler.channels.pypychannel import PyPyChannel @@ -11,8 +10,10 @@ from lava.magma.core.model.py.model import AbstractPyProcessModel from lava.magma.core.process.process import AbstractProcess from lava.magma.core.sync.protocol import AbstractSyncProtocol -from lava.magma.runtime.runtime_services.runtime_service import \ - PyRuntimeService +from lava.magma.runtime.runtime_services.runtime_service import PyRuntimeService +from lava.magma.runtime.message_infrastructure.close_on_shutdown_smm import ( + CloseOnShutdownSMM, +) class MockInterface: @@ -20,7 +21,7 @@ def __init__(self, smm): self.smm = smm -def create_channel(smm: SharedMemoryManager, name: str): +def create_channel(smm: CloseOnShutdownSMM, name: str): mock = MockInterface(smm=smm) return PyPyChannel( mock, @@ -68,7 +69,7 @@ def test_runtime_service_start_run(self): pm = SimpleProcessModel(proc_params={}) sp = SimpleSyncProtocol() rs = SimplePyRuntimeService(protocol=sp) - smm = SharedMemoryManager() + smm = CloseOnShutdownSMM() smm.start() runtime_to_service = create_channel(smm, name="runtime_to_service") service_to_runtime = create_channel(smm, name="service_to_runtime") From 7efe7fb3273536e0f8bfd4f321e72521d8173447 Mon Sep 17 00:00:00 2001 From: ssgier Date: Sun, 26 Feb 2023 08:09:45 +0100 Subject: [PATCH 02/11] Plug more leaks --- .../magma/compiler/channels/pypychannel.py | 37 +++++++++++++------ .../close_on_shutdown_smm.py | 10 +++-- src/lava/magma/runtime/runtime.py | 20 +++++++--- 3 files changed, 46 insertions(+), 21 deletions(-) diff --git a/src/lava/magma/compiler/channels/pypychannel.py b/src/lava/magma/compiler/channels/pypychannel.py index 4068aa2f7..c7499b95c 100644 --- a/src/lava/magma/compiler/channels/pypychannel.py +++ b/src/lava/magma/compiler/channels/pypychannel.py @@ -29,6 +29,9 @@ class Proto: nbytes: int +_SEMAPHORE_TIMEOUT = 0.1 + + class CspSendPort(AbstractCspSendPort): """ CspSendPort is a low level send port implementation based on CSP @@ -101,11 +104,13 @@ def start(self): def _ack_callback(self): try: while not self._done: - self._ack.acquire() - not_full = self.probe() - self._semaphore.release() - if self.observer and not not_full: - self.observer() + if self._ack.acquire(timeout=_SEMAPHORE_TIMEOUT): + not_full = self.probe() + self._semaphore.release() + if self.observer and not not_full: + self.observer() + self._req = None + self._ack = None except EOFError: pass @@ -131,7 +136,10 @@ def send(self, data): self._req.release() def join(self): - self._done = True + if not self._done: + self._ack = None + self._req = None + self._done = True class CspRecvQueue(Queue): @@ -243,11 +251,13 @@ def start(self): def _req_callback(self): try: while not self._done: - self._req.acquire() - not_empty = self.probe() - self._queue.put_nowait(0) - if self.observer and not not_empty: - self.observer() + if self._req.acquire(timeout=_SEMAPHORE_TIMEOUT): + not_empty = self.probe() + self._queue.put_nowait(0) + if self.observer and not not_empty: + self.observer() + self._req = None + self._ack = None except EOFError: pass @@ -279,7 +289,10 @@ def recv(self): return result def join(self): - self._done = True + if not self._done: + self._ack = None + self._req = None + self._done = True class CspSelector: diff --git a/src/lava/magma/runtime/message_infrastructure/close_on_shutdown_smm.py b/src/lava/magma/runtime/message_infrastructure/close_on_shutdown_smm.py index c2f2cb4dc..c3ddde157 100644 --- a/src/lava/magma/runtime/message_infrastructure/close_on_shutdown_smm.py +++ b/src/lava/magma/runtime/message_infrastructure/close_on_shutdown_smm.py @@ -15,10 +15,12 @@ def start(self): self._smm.start() def shutdown(self): - for shm in self._shms: - shm.close() - self._shms.clear() - self._smm.shutdown() + if self._smm is not None: + for shm in self._shms: + shm.close() + self._shms.clear() + self._smm.shutdown() + self._smm = None def shared_memory(self, size: int): shm = self._smm.SharedMemory(size) diff --git a/src/lava/magma/runtime/runtime.py b/src/lava/magma/runtime/runtime.py index caa1ff425..b2dcab63b 100644 --- a/src/lava/magma/runtime/runtime.py +++ b/src/lava/magma/runtime/runtime.py @@ -128,6 +128,7 @@ def __init__(self, self._req_stop: bool = False self.runtime_to_service: ty.Iterable[CspSendPort] = [] self.service_to_runtime: ty.Iterable[CspRecvPort] = [] + self._open_ports = [] def __del__(self): """On destruction, terminate Runtime automatically to @@ -191,6 +192,9 @@ def _build_channels(self): self._messaging_infrastructure ) + self._open_ports.append(channel.src_port) + self._open_ports.append(channel.dst_port) + self._get_process_builder_for_process( channel_builder.src_process).set_csp_ports( [channel.src_port]) @@ -214,6 +218,10 @@ def _build_sync_channels(self): channel: Channel = sync_channel_builder.build( self._messaging_infrastructure ) + + self._open_ports.append(channel.src_port) + self._open_ports.append(channel.dst_port) + if isinstance(sync_channel_builder, RuntimeChannelBuilderMp): if isinstance(sync_channel_builder.src_process, RuntimeServiceBuilder): @@ -407,11 +415,11 @@ def stop(self): def join(self): """Join all ports and processes""" - for port in self.runtime_to_service: - port.join() - for port in self.service_to_runtime: + for port in self._open_ports: port.join() + self._open_ports.clear() + def set_var(self, var_id: int, value: np.ndarray, idx: np.ndarray = None): """Sets value of a variable with id 'var_id'.""" if self._is_running: @@ -453,7 +461,8 @@ def set_var(self, var_id: int, value: np.ndarray, idx: np.ndarray = None): buffer = buffer[idx] buffer_shape: ty.Tuple[int, ...] = buffer.shape num_items: int = np.prod(buffer_shape).item() - reshape_order = 'F' if isinstance(ev, LoihiSynapseVarModel) else 'C' + reshape_order = 'F' if isinstance( + ev, LoihiSynapseVarModel) else 'C' buffer = buffer.reshape((1, num_items), order=reshape_order) # 3. Send [NUM_ITEMS, DATA1, DATA2, ...] @@ -505,7 +514,8 @@ def get_var(self, var_id: int, idx: np.ndarray = None) -> np.ndarray: buffer[0, i] = data_port.recv()[0] # 3. Reshape result and return - reshape_order = 'F' if isinstance(ev, LoihiSynapseVarModel) else 'C' + reshape_order = 'F' if isinstance( + ev, LoihiSynapseVarModel) else 'C' buffer = buffer.reshape(ev.shape, order=reshape_order) if idx: return buffer[idx] From 46d5666cc67346273cb78063b1a98e3ad6c8bf4f Mon Sep 17 00:00:00 2001 From: ssgier Date: Sun, 26 Feb 2023 15:41:37 +0100 Subject: [PATCH 03/11] Improve code style --- .../magma/compiler/channels/pypychannel.py | 2 +- .../close_on_shutdown_smm.py | 28 ---------------- .../message_infrastructure/multiprocessing.py | 8 ++--- .../shared_memory_manager.py | 32 +++++++++++++++++++ src/lava/magma/runtime/runtime.py | 6 ++-- .../channel_broker/channel_broker.py | 6 ++-- .../magma/compiler/builders/test_builder.py | 6 ++-- .../compiler/channels/test_pypychannel.py | 12 +++---- .../magma/compiler/test_channel_builder.py | 6 ++-- tests/lava/magma/core/model/py/test_ports.py | 6 ++-- .../magma/runtime/test_runtime_service.py | 8 ++--- 11 files changed, 62 insertions(+), 58 deletions(-) delete mode 100644 src/lava/magma/runtime/message_infrastructure/close_on_shutdown_smm.py create mode 100644 src/lava/magma/runtime/message_infrastructure/shared_memory_manager.py diff --git a/src/lava/magma/compiler/channels/pypychannel.py b/src/lava/magma/compiler/channels/pypychannel.py index c7499b95c..9596f3483 100644 --- a/src/lava/magma/compiler/channels/pypychannel.py +++ b/src/lava/magma/compiler/channels/pypychannel.py @@ -358,7 +358,7 @@ def __init__( """ nbytes = self.nbytes(shape, dtype) smm = message_infrastructure.smm - shm = smm.shared_memory(int(nbytes * size)) + shm = smm.create_shared_memory(int(nbytes * size)) req = Semaphore(0) ack = Semaphore(0) proto = Proto(shape=shape, dtype=dtype, nbytes=nbytes) diff --git a/src/lava/magma/runtime/message_infrastructure/close_on_shutdown_smm.py b/src/lava/magma/runtime/message_infrastructure/close_on_shutdown_smm.py deleted file mode 100644 index c3ddde157..000000000 --- a/src/lava/magma/runtime/message_infrastructure/close_on_shutdown_smm.py +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright (C) 2023 Intel Corporation -# SPDX-License-Identifier: LGPL 2.1 or later -# See: https://spdx.org/licenses/ - - -from multiprocessing.managers import SharedMemoryManager - - -class CloseOnShutdownSMM: - def __init__(self): - self._shms = [] - self._smm = SharedMemoryManager() - - def start(self): - self._smm.start() - - def shutdown(self): - if self._smm is not None: - for shm in self._shms: - shm.close() - self._shms.clear() - self._smm.shutdown() - self._smm = None - - def shared_memory(self, size: int): - shm = self._smm.SharedMemory(size) - self._shms.append(shm) - return shm diff --git a/src/lava/magma/runtime/message_infrastructure/multiprocessing.py b/src/lava/magma/runtime/message_infrastructure/multiprocessing.py index b2d313c43..5d660114d 100644 --- a/src/lava/magma/runtime/message_infrastructure/multiprocessing.py +++ b/src/lava/magma/runtime/message_infrastructure/multiprocessing.py @@ -15,8 +15,8 @@ from lava.magma.compiler.channels.interfaces import ChannelType, Channel from lava.magma.compiler.channels.pypychannel import PyPyChannel -from lava.magma.runtime.message_infrastructure.close_on_shutdown_smm import ( - CloseOnShutdownSMM, +from lava.magma.runtime.message_infrastructure.shared_memory_manager import ( + SharedMemoryManager, ) try: @@ -85,7 +85,7 @@ class MultiProcessing(MessageInfrastructureInterface): """Implements message passing using shared memory and multiprocessing""" def __init__(self): - self._smm = None + self._smm: ty.Optional[SharedMemoryManager] = None self._actors: ty.List[SystemProcess] = [] @property @@ -100,7 +100,7 @@ def smm(self): def start(self): """Starts the shared memory manager""" - self._smm = CloseOnShutdownSMM() + self._smm = SharedMemoryManager() self._smm.start() def build_actor(self, target_fn: ty.Callable, builder: ty.Union[ diff --git a/src/lava/magma/runtime/message_infrastructure/shared_memory_manager.py b/src/lava/magma/runtime/message_infrastructure/shared_memory_manager.py new file mode 100644 index 000000000..c9acd3429 --- /dev/null +++ b/src/lava/magma/runtime/message_infrastructure/shared_memory_manager.py @@ -0,0 +1,32 @@ +# Copyright (C) 2023 Intel Corporation +# SPDX-License-Identifier: LGPL 2.1 or later +# See: https://spdx.org/licenses/ + +from multiprocessing.managers import (SharedMemoryManager as + DelegateManager + ) +from multiprocessing.shared_memory import SharedMemory +import typing as ty + + +class SharedMemoryManager: + def __init__(self): + self._shared_memory_handles: ty.List[SharedMemory] = [] + self._inner_shared_memory_manager: ty.Optional[DelegateManager] = ( + DelegateManager()) + + def start(self) -> None: + self._inner_shared_memory_manager.start() + + def shutdown(self): + if self._inner_shared_memory_manager is not None: + for shm in self._shared_memory_handles: + shm.close() + self._shared_memory_handles.clear() + self._inner_shared_memory_manager.shutdown() + self._inner_shared_memory_manager = None + + def create_shared_memory(self, size: int) -> SharedMemory: + shm = self._inner_shared_memory_manager.SharedMemory(size) + self._shared_memory_handles.append(shm) + return shm diff --git a/src/lava/magma/runtime/runtime.py b/src/lava/magma/runtime/runtime.py index b2dcab63b..d704bdec9 100644 --- a/src/lava/magma/runtime/runtime.py +++ b/src/lava/magma/runtime/runtime.py @@ -1,4 +1,4 @@ -# Copyright (C) 2021-22 Intel Corporation +# Copyright (C) 2021-23 Intel Corporation # SPDX-License-Identifier: LGPL 2.1 or later # See: https://spdx.org/licenses/ @@ -34,7 +34,7 @@ from lava.magma.compiler.builders.py_builder import PyProcessBuilder from lava.magma.compiler.builders.runtimeservice_builder import \ RuntimeServiceBuilder -from lava.magma.compiler.channels.interfaces import Channel +from lava.magma.compiler.channels.interfaces import AbstractCspPort, Channel from lava.magma.compiler.executable import Executable from lava.magma.compiler.node import NodeConfig from lava.magma.core.process.ports.ports import create_port_id @@ -128,7 +128,7 @@ def __init__(self, self._req_stop: bool = False self.runtime_to_service: ty.Iterable[CspSendPort] = [] self.service_to_runtime: ty.Iterable[CspRecvPort] = [] - self._open_ports = [] + self._open_ports: ty.List[AbstractCspPort] = [] def __del__(self): """On destruction, terminate Runtime automatically to diff --git a/src/lava/magma/runtime/runtime_services/channel_broker/channel_broker.py b/src/lava/magma/runtime/runtime_services/channel_broker/channel_broker.py index b816a8e98..11db68cd7 100644 --- a/src/lava/magma/runtime/runtime_services/channel_broker/channel_broker.py +++ b/src/lava/magma/runtime/runtime_services/channel_broker/channel_broker.py @@ -11,8 +11,8 @@ from lava.magma.compiler.channels.interfaces import AbstractCspPort from lava.magma.compiler.channels.pypychannel import CspSelector, PyPyChannel -from lava.magma.runtime.message_infrastructure.close_on_shutdown_smm import ( - CloseOnShutdownSMM, +from lava.magma.runtime.message_infrastructure.shared_memory_manager import ( + SharedMemoryManager, ) try: @@ -101,7 +101,7 @@ def __init__(self, # Need to pill for COutPorts self.c_outports_to_poll: ty.Dict[Channel, COutPort] = {} - self.smm: CloseOnShutdownSMM = CloseOnShutdownSMM() + self.smm: SharedMemoryManager = SharedMemoryManager() self.mgmt_channel: ty.Optional[PyPyChannel] = None self.grpc_stopping_event: ty.Optional[threading.Event] = None self.port_poller: ty.Optional[threading.Thread] = None diff --git a/tests/lava/magma/compiler/builders/test_builder.py b/tests/lava/magma/compiler/builders/test_builder.py index 0457fcf62..06a64b5b0 100644 --- a/tests/lava/magma/compiler/builders/test_builder.py +++ b/tests/lava/magma/compiler/builders/test_builder.py @@ -27,8 +27,8 @@ from lava.magma.core.process.process import AbstractProcess from lava.magma.core.process.variable import Var from lava.magma.core.resources import CPU -from lava.magma.runtime.message_infrastructure.close_on_shutdown_smm import ( - CloseOnShutdownSMM, +from lava.magma.runtime.message_infrastructure.shared_memory_manager import ( + SharedMemoryManager, ) @@ -43,7 +43,7 @@ def channel_class(self, channel_type: ChannelType) -> ty.Type: class TestChannelBuilder(unittest.TestCase): def test_channel_builder(self): """Tests Channel Builder creation""" - smm = CloseOnShutdownSMM() + smm = SharedMemoryManager() try: port_initializer: PortInitializer = PortInitializer( name="mock", shape=(1, 2), d_type=np.int32, diff --git a/tests/lava/magma/compiler/channels/test_pypychannel.py b/tests/lava/magma/compiler/channels/test_pypychannel.py index 315bbb6e8..62871d01a 100644 --- a/tests/lava/magma/compiler/channels/test_pypychannel.py +++ b/tests/lava/magma/compiler/channels/test_pypychannel.py @@ -7,8 +7,8 @@ from multiprocessing import Process from lava.magma.compiler.channels.pypychannel import PyPyChannel -from lava.magma.runtime.message_infrastructure.close_on_shutdown_smm import ( - CloseOnShutdownSMM +from lava.magma.runtime.message_infrastructure.shared_memory_manager import ( + SharedMemoryManager ) @@ -32,7 +32,7 @@ def get_channel(smm, data, size, name="test_channel") -> PyPyChannel: class TestPyPyChannelSingleProcess(unittest.TestCase): def test_send_recv_single_process(self): - smm = CloseOnShutdownSMM() + smm = SharedMemoryManager() try: smm.start() @@ -49,7 +49,7 @@ def test_send_recv_single_process(self): smm.shutdown() def test_send_recv_single_process_2d_data(self): - smm = CloseOnShutdownSMM() + smm = SharedMemoryManager() try: smm.start() @@ -66,7 +66,7 @@ def test_send_recv_single_process_2d_data(self): smm.shutdown() def test_send_recv_single_process_1d_data(self): - smm = CloseOnShutdownSMM() + smm = SharedMemoryManager() try: smm.start() @@ -129,7 +129,7 @@ def buffer(shape, dst_port, src_port): class TestPyPyChannelMultiProcess(unittest.TestCase): def test_send_recv_relay(self): - smm = CloseOnShutdownSMM() + smm = SharedMemoryManager() try: smm.start() data = np.ones((2, 2)) diff --git a/tests/lava/magma/compiler/test_channel_builder.py b/tests/lava/magma/compiler/test_channel_builder.py index a9aacfef9..da5142e56 100644 --- a/tests/lava/magma/compiler/test_channel_builder.py +++ b/tests/lava/magma/compiler/test_channel_builder.py @@ -15,8 +15,8 @@ CspSendPort, CspRecvPort, ) -from lava.magma.runtime.message_infrastructure.close_on_shutdown_smm import ( - CloseOnShutdownSMM +from lava.magma.runtime.message_infrastructure.shared_memory_manager import ( + SharedMemoryManager ) @@ -31,7 +31,7 @@ def channel_class(self, channel_type: ChannelType) -> ty.Type: class TestChannelBuilder(unittest.TestCase): def test_channel_builder(self): """Tests Channel Builder creation""" - smm = CloseOnShutdownSMM() + smm = SharedMemoryManager() try: port_initializer: PortInitializer = PortInitializer( name="mock", shape=(1, 2), d_type=np.int32, diff --git a/tests/lava/magma/core/model/py/test_ports.py b/tests/lava/magma/core/model/py/test_ports.py index a2c8b26e9..31bf2c325 100644 --- a/tests/lava/magma/core/model/py/test_ports.py +++ b/tests/lava/magma/core/model/py/test_ports.py @@ -20,8 +20,8 @@ PyOutPortVectorDense, VirtualPortTransformer, IdentityTransformer) -from lava.magma.runtime.message_infrastructure.close_on_shutdown_smm import ( - CloseOnShutdownSMM +from lava.magma.runtime.message_infrastructure.shared_memory_manager import ( + SharedMemoryManager ) @@ -46,7 +46,7 @@ class TestPyPorts(unittest.TestCase): def probe_test_routine(self, cls): """Routine that tests probe method on one implementation of PyInPorts. """ - smm = CloseOnShutdownSMM() + smm = SharedMemoryManager() try: smm.start() diff --git a/tests/lava/magma/runtime/test_runtime_service.py b/tests/lava/magma/runtime/test_runtime_service.py index 4a4e48ab8..77dad457e 100644 --- a/tests/lava/magma/runtime/test_runtime_service.py +++ b/tests/lava/magma/runtime/test_runtime_service.py @@ -11,8 +11,8 @@ from lava.magma.core.process.process import AbstractProcess from lava.magma.core.sync.protocol import AbstractSyncProtocol from lava.magma.runtime.runtime_services.runtime_service import PyRuntimeService -from lava.magma.runtime.message_infrastructure.close_on_shutdown_smm import ( - CloseOnShutdownSMM, +from lava.magma.runtime.message_infrastructure.shared_memory_manager import ( + SharedMemoryManager, ) @@ -21,7 +21,7 @@ def __init__(self, smm): self.smm = smm -def create_channel(smm: CloseOnShutdownSMM, name: str): +def create_channel(smm: SharedMemoryManager, name: str): mock = MockInterface(smm=smm) return PyPyChannel( mock, @@ -69,7 +69,7 @@ def test_runtime_service_start_run(self): pm = SimpleProcessModel(proc_params={}) sp = SimpleSyncProtocol() rs = SimplePyRuntimeService(protocol=sp) - smm = CloseOnShutdownSMM() + smm = SharedMemoryManager() smm.start() runtime_to_service = create_channel(smm, name="runtime_to_service") service_to_runtime = create_channel(smm, name="service_to_runtime") From 00066d3d9d8c993ba1306d0676e48c7ed8842f1e Mon Sep 17 00:00:00 2001 From: ssgier Date: Sun, 26 Feb 2023 19:28:22 +0100 Subject: [PATCH 04/11] Check higher wait time on CI run --- tests/lava/magma/core/model/py/test_ports.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/lava/magma/core/model/py/test_ports.py b/tests/lava/magma/core/model/py/test_ports.py index 31bf2c325..04da6a08b 100644 --- a/tests/lava/magma/core/model/py/test_ports.py +++ b/tests/lava/magma/core/model/py/test_ports.py @@ -82,7 +82,7 @@ def probe_test_routine(self, cls): # Send data through second PyOutPort send_py_port_2.send(data) # Sleep to let message reach the PyInPort - time.sleep(0.001) + time.sleep(0.01) # Probe PyInPort probe_value = recv_py_port.probe() From 51b89c2a43d03da374698f8f3de0e1952492e6af Mon Sep 17 00:00:00 2001 From: ssgier Date: Tue, 28 Feb 2023 17:10:41 +0100 Subject: [PATCH 05/11] Better fix for semaphore logic --- .../magma/compiler/channels/pypychannel.py | 39 +++++++++---------- tests/lava/magma/core/model/py/test_ports.py | 2 +- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/src/lava/magma/compiler/channels/pypychannel.py b/src/lava/magma/compiler/channels/pypychannel.py index 9596f3483..f52d593cf 100644 --- a/src/lava/magma/compiler/channels/pypychannel.py +++ b/src/lava/magma/compiler/channels/pypychannel.py @@ -29,9 +29,6 @@ class Proto: nbytes: int -_SEMAPHORE_TIMEOUT = 0.1 - - class CspSendPort(AbstractCspSendPort): """ CspSendPort is a low level send port implementation based on CSP @@ -103,14 +100,11 @@ def start(self): def _ack_callback(self): try: - while not self._done: - if self._ack.acquire(timeout=_SEMAPHORE_TIMEOUT): - not_full = self.probe() - self._semaphore.release() - if self.observer and not not_full: - self.observer() - self._req = None - self._ack = None + while self._ack.acquire() and not self._done: + not_full = self.probe() + self._semaphore.release() + if self.observer and not not_full: + self.observer() except EOFError: pass @@ -137,9 +131,12 @@ def send(self, data): def join(self): if not self._done: + self._done = True + if self.thread is not None: + self._ack.release() + self.thread.join() self._ack = None self._req = None - self._done = True class CspRecvQueue(Queue): @@ -250,14 +247,11 @@ def start(self): def _req_callback(self): try: - while not self._done: - if self._req.acquire(timeout=_SEMAPHORE_TIMEOUT): - not_empty = self.probe() - self._queue.put_nowait(0) - if self.observer and not not_empty: - self.observer() - self._req = None - self._ack = None + while self._req.acquire() and not self._done: + not_empty = self.probe() + self._queue.put_nowait(0) + if self.observer and not not_empty: + self.observer() except EOFError: pass @@ -290,9 +284,12 @@ def recv(self): def join(self): if not self._done: + self._done = True + if self.thread is not None: + self._req.release() + self.thread.join() self._ack = None self._req = None - self._done = True class CspSelector: diff --git a/tests/lava/magma/core/model/py/test_ports.py b/tests/lava/magma/core/model/py/test_ports.py index 04da6a08b..31bf2c325 100644 --- a/tests/lava/magma/core/model/py/test_ports.py +++ b/tests/lava/magma/core/model/py/test_ports.py @@ -82,7 +82,7 @@ def probe_test_routine(self, cls): # Send data through second PyOutPort send_py_port_2.send(data) # Sleep to let message reach the PyInPort - time.sleep(0.01) + time.sleep(0.001) # Probe PyInPort probe_value = recv_py_port.probe() From f91f4b8b1eae398096346d635c45718e978ba3d7 Mon Sep 17 00:00:00 2001 From: ssgier Date: Tue, 28 Feb 2023 17:11:07 +0100 Subject: [PATCH 06/11] Add unit test for file descriptor leakage --- tests/lava/magma/runtime/test_leakage.py | 39 ++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 tests/lava/magma/runtime/test_leakage.py diff --git a/tests/lava/magma/runtime/test_leakage.py b/tests/lava/magma/runtime/test_leakage.py new file mode 100644 index 000000000..6279f29f1 --- /dev/null +++ b/tests/lava/magma/runtime/test_leakage.py @@ -0,0 +1,39 @@ +# Copyright (C) 2023 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause +# See: https://spdx.org/licenses/ + +import unittest + +import numpy as np +import psutil + +from lava.magma.core.run_conditions import RunSteps +from lava.proc.dense.process import Dense +from lava.proc.lif.process import LIF +from lava.magma.core.run_configs import Loihi1SimCfg + + +def run_simulation() -> None: + lif1 = LIF(shape=(1,)) + dense = Dense(weights=np.eye(1)) + lif2 = LIF(shape=(1,)) + lif1.out_ports.s_out.connect(dense.in_ports.s_in) + dense.out_ports.a_out.connect(lif2.in_ports.a_in) + lif1.run(condition=RunSteps(num_steps=10), run_cfg=Loihi1SimCfg()) + lif1.stop() + + +class TestLeakage(unittest.TestCase): + def test_leakage(self): + + # initial run to make sure all components are initialized + run_simulation() + + process = psutil.Process() + num_fds_before = process.num_fds() + + # file descriptors opened by further runs should be closed + run_simulation() + + num_fds_after = process.num_fds() + self.assertEqual(num_fds_before, num_fds_after) From 9537d24b67e96f6695129dc47d77f8d429819c83 Mon Sep 17 00:00:00 2001 From: ssgier Date: Tue, 28 Feb 2023 18:45:53 +0100 Subject: [PATCH 07/11] Let threads terminate asynchronously --- src/lava/magma/compiler/channels/pypychannel.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/lava/magma/compiler/channels/pypychannel.py b/src/lava/magma/compiler/channels/pypychannel.py index f52d593cf..6054dc35a 100644 --- a/src/lava/magma/compiler/channels/pypychannel.py +++ b/src/lava/magma/compiler/channels/pypychannel.py @@ -94,13 +94,14 @@ def start(self): self.thread = Thread( target=self._ack_callback, name="{}.send".format(self._name), + args=(self._ack,), daemon=True, ) self.thread.start() - def _ack_callback(self): + def _ack_callback(self, ack): try: - while self._ack.acquire() and not self._done: + while ack.acquire() and not self._done: not_full = self.probe() self._semaphore.release() if self.observer and not not_full: @@ -134,7 +135,6 @@ def join(self): self._done = True if self.thread is not None: self._ack.release() - self.thread.join() self._ack = None self._req = None @@ -241,13 +241,14 @@ def start(self): self.thread = Thread( target=self._req_callback, name="{}.send".format(self._name), + args=(self._req,), daemon=True, ) self.thread.start() - def _req_callback(self): + def _req_callback(self, req): try: - while self._req.acquire() and not self._done: + while req.acquire() and not self._done: not_empty = self.probe() self._queue.put_nowait(0) if self.observer and not not_empty: @@ -287,7 +288,6 @@ def join(self): self._done = True if self.thread is not None: self._req.release() - self.thread.join() self._ack = None self._req = None From 976dbcb7bf9ab864fb0822269327f7d6075cd7f5 Mon Sep 17 00:00:00 2001 From: ssgier Date: Tue, 28 Feb 2023 19:12:18 +0100 Subject: [PATCH 08/11] Disable leakage test on Windows (not applicable) --- tests/lava/magma/runtime/test_leakage.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/lava/magma/runtime/test_leakage.py b/tests/lava/magma/runtime/test_leakage.py index 6279f29f1..672b7489f 100644 --- a/tests/lava/magma/runtime/test_leakage.py +++ b/tests/lava/magma/runtime/test_leakage.py @@ -3,6 +3,7 @@ # See: https://spdx.org/licenses/ import unittest +import platform import numpy as np import psutil @@ -24,8 +25,10 @@ def run_simulation() -> None: class TestLeakage(unittest.TestCase): + @unittest.skipIf( + platform.system() == "Windows", "Windows has no file descriptors" + ) def test_leakage(self): - # initial run to make sure all components are initialized run_simulation() From 34230ad676ad511b6011f57645a44069cb8a5f30 Mon Sep 17 00:00:00 2001 From: ssgier Date: Tue, 28 Feb 2023 19:23:03 +0100 Subject: [PATCH 09/11] Add more type hints --- .../runtime/message_infrastructure/shared_memory_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lava/magma/runtime/message_infrastructure/shared_memory_manager.py b/src/lava/magma/runtime/message_infrastructure/shared_memory_manager.py index c9acd3429..949b52e06 100644 --- a/src/lava/magma/runtime/message_infrastructure/shared_memory_manager.py +++ b/src/lava/magma/runtime/message_infrastructure/shared_memory_manager.py @@ -10,7 +10,7 @@ class SharedMemoryManager: - def __init__(self): + def __init__(self) -> None: self._shared_memory_handles: ty.List[SharedMemory] = [] self._inner_shared_memory_manager: ty.Optional[DelegateManager] = ( DelegateManager()) @@ -18,7 +18,7 @@ def __init__(self): def start(self) -> None: self._inner_shared_memory_manager.start() - def shutdown(self): + def shutdown(self) -> None: if self._inner_shared_memory_manager is not None: for shm in self._shared_memory_handles: shm.close() From 80f10b549a0123fab1c7813742b59baedb9d6faf Mon Sep 17 00:00:00 2001 From: ssgier Date: Tue, 28 Feb 2023 19:32:03 +0100 Subject: [PATCH 10/11] Improve naming --- .../shared_memory_manager.py | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/src/lava/magma/runtime/message_infrastructure/shared_memory_manager.py b/src/lava/magma/runtime/message_infrastructure/shared_memory_manager.py index 949b52e06..688b8f085 100644 --- a/src/lava/magma/runtime/message_infrastructure/shared_memory_manager.py +++ b/src/lava/magma/runtime/message_infrastructure/shared_memory_manager.py @@ -2,9 +2,7 @@ # SPDX-License-Identifier: LGPL 2.1 or later # See: https://spdx.org/licenses/ -from multiprocessing.managers import (SharedMemoryManager as - DelegateManager - ) +import multiprocessing.managers as managers from multiprocessing.shared_memory import SharedMemory import typing as ty @@ -12,21 +10,22 @@ class SharedMemoryManager: def __init__(self) -> None: self._shared_memory_handles: ty.List[SharedMemory] = [] - self._inner_shared_memory_manager: ty.Optional[DelegateManager] = ( - DelegateManager()) + self._manager: ty.Optional[ + managers.SharedMemoryManager + ] = managers.SharedMemoryManager() def start(self) -> None: - self._inner_shared_memory_manager.start() + self._manager.start() def shutdown(self) -> None: - if self._inner_shared_memory_manager is not None: - for shm in self._shared_memory_handles: - shm.close() + if self._manager is not None: + for handle in self._shared_memory_handles: + handle.close() self._shared_memory_handles.clear() - self._inner_shared_memory_manager.shutdown() - self._inner_shared_memory_manager = None + self._manager.shutdown() + self._manager = None def create_shared_memory(self, size: int) -> SharedMemory: - shm = self._inner_shared_memory_manager.SharedMemory(size) - self._shared_memory_handles.append(shm) - return shm + handle = self._manager.SharedMemory(size) + self._shared_memory_handles.append(handle) + return handle From 4b51dc78528eb126c028d3be32d44fffc731f35b Mon Sep 17 00:00:00 2001 From: ssgier Date: Wed, 8 Mar 2023 19:51:35 +0100 Subject: [PATCH 11/11] Remove hint about too many open files error --- README.md | 2 -- tutorials/in_depth/tutorial01_installing_lava.ipynb | 3 +-- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/README.md b/README.md index 325bab802..b1b1ef9c5 100644 --- a/README.md +++ b/README.md @@ -84,8 +84,6 @@ poetry install source .venv/bin/activate pytest -## When running tests if you see 'OSError: [Errno 24] Too many open files' -## consider setting ulimit using `ulimit -n 4096` ## See FAQ for more info: https://github.com/lava-nc/lava/wiki/Frequently-Asked-Questions-(FAQ)#install ``` diff --git a/tutorials/in_depth/tutorial01_installing_lava.ipynb b/tutorials/in_depth/tutorial01_installing_lava.ipynb index b523dc58c..afb67b042 100644 --- a/tutorials/in_depth/tutorial01_installing_lava.ipynb +++ b/tutorials/in_depth/tutorial01_installing_lava.ipynb @@ -86,8 +86,7 @@ "- source .venv/bin/activate\n", "- pip install -U pip\n", "- pip install lava-nc-\\.tar.gz\n", - "\n", - "## When running tests if you see 'OSError: [Errno 24] Too many open files' consider setting ulimit using `ulimit -n 4096`\n" + "\n" ] }, {