Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plug file descriptor leaks #643

Merged
merged 12 commits into from
Mar 13, 2023
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ poetry install
source .venv/bin/activate
pytest

## When running tests if you see 'OSError: [Errno 24] Too many open files'
## consider setting ulimit using `ulimit -n 4096`
## See FAQ for more info: https://github.com/lava-nc/lava/wiki/Frequently-Asked-Questions-(FAQ)#install
```

Expand Down
30 changes: 20 additions & 10 deletions src/lava/magma/compiler/channels/pypychannel.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2021-22 Intel Corporation
# Copyright (C) 2021-23 Intel Corporation
# SPDX-License-Identifier: LGPL 2.1 or later
# See: https://spdx.org/licenses/

Expand Down Expand Up @@ -94,14 +94,14 @@ def start(self):
self.thread = Thread(
target=self._ack_callback,
name="{}.send".format(self._name),
args=(self._ack,),
daemon=True,
)
self.thread.start()

def _ack_callback(self):
def _ack_callback(self, ack):
try:
while not self._done:
self._ack.acquire()
while ack.acquire() and not self._done:
not_full = self.probe()
self._semaphore.release()
if self.observer and not not_full:
Expand Down Expand Up @@ -131,7 +131,12 @@ def send(self, data):
self._req.release()

def join(self):
self._done = True
if not self._done:
self._done = True
if self.thread is not None:
self._ack.release()
self._ack = None
self._req = None


class CspRecvQueue(Queue):
Expand Down Expand Up @@ -236,14 +241,14 @@ def start(self):
self.thread = Thread(
target=self._req_callback,
name="{}.send".format(self._name),
args=(self._req,),
daemon=True,
)
self.thread.start()

def _req_callback(self):
def _req_callback(self, req):
try:
while not self._done:
self._req.acquire()
while req.acquire() and not self._done:
not_empty = self.probe()
self._queue.put_nowait(0)
if self.observer and not not_empty:
Expand Down Expand Up @@ -279,7 +284,12 @@ def recv(self):
return result

def join(self):
self._done = True
if not self._done:
self._done = True
if self.thread is not None:
self._req.release()
self._ack = None
self._req = None


class CspSelector:
Expand Down Expand Up @@ -345,7 +355,7 @@ def __init__(
"""
nbytes = self.nbytes(shape, dtype)
smm = message_infrastructure.smm
shm = smm.SharedMemory(int(nbytes * size))
shm = smm.create_shared_memory(int(nbytes * size))
req = Semaphore(0)
ack = Semaphore(0)
proto = Proto(shape=shape, dtype=dtype, nbytes=nbytes)
Expand Down
20 changes: 17 additions & 3 deletions src/lava/magma/runtime/message_infrastructure/multiprocessing.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2021-22 Intel Corporation
# Copyright (C) 2021-23 Intel Corporation
# SPDX-License-Identifier: LGPL 2.1 or later
# See: https://spdx.org/licenses/

Expand All @@ -11,11 +11,14 @@

import multiprocessing as mp
import os
from multiprocessing.managers import SharedMemoryManager
import traceback

from lava.magma.compiler.channels.interfaces import ChannelType, Channel
from lava.magma.compiler.channels.pypychannel import PyPyChannel
from lava.magma.runtime.message_infrastructure.shared_memory_manager import (
SharedMemoryManager,
)

try:
from lava.magma.compiler.channels.cpychannel import \
CPyChannel, PyCChannel
Expand Down Expand Up @@ -50,6 +53,7 @@ def __init__(self, *args, **kwargs):
mp.Process.__init__(self, *args, **kwargs)
self._pconn, self._cconn = mp.Pipe()
self._exception = None
self._is_done = False

def run(self):
try:
Expand All @@ -59,10 +63,20 @@ def run(self):
tb = traceback.format_exc()
self._cconn.send((e, tb))

def join(self):
if not self._is_done:
super().join()
super().close()
if self._pconn.poll():
self._exception = self._pconn.recv()
self._cconn.close()
self._pconn.close()
self._is_done = True

@property
def exception(self):
"""Exception property."""
if self._pconn.poll():
if not self._is_done and self._pconn.poll():
self._exception = self._pconn.recv()
return self._exception

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright (C) 2023 Intel Corporation
# SPDX-License-Identifier: LGPL 2.1 or later
# See: https://spdx.org/licenses/

import multiprocessing.managers as managers
from multiprocessing.shared_memory import SharedMemory
import typing as ty


class SharedMemoryManager:
def __init__(self) -> None:
self._shared_memory_handles: ty.List[SharedMemory] = []
self._manager: ty.Optional[
managers.SharedMemoryManager
] = managers.SharedMemoryManager()

def start(self) -> None:
self._manager.start()

def shutdown(self) -> None:
if self._manager is not None:
for handle in self._shared_memory_handles:
handle.close()
self._shared_memory_handles.clear()
self._manager.shutdown()
self._manager = None

def create_shared_memory(self, size: int) -> SharedMemory:
handle = self._manager.SharedMemory(size)
self._shared_memory_handles.append(handle)
return handle
24 changes: 17 additions & 7 deletions src/lava/magma/runtime/runtime.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2021-22 Intel Corporation
# Copyright (C) 2021-23 Intel Corporation
# SPDX-License-Identifier: LGPL 2.1 or later
# See: https://spdx.org/licenses/

Expand Down Expand Up @@ -34,7 +34,7 @@
from lava.magma.compiler.builders.py_builder import PyProcessBuilder
from lava.magma.compiler.builders.runtimeservice_builder import \
RuntimeServiceBuilder
from lava.magma.compiler.channels.interfaces import Channel
from lava.magma.compiler.channels.interfaces import AbstractCspPort, Channel
from lava.magma.compiler.executable import Executable
from lava.magma.compiler.node import NodeConfig
from lava.magma.core.process.ports.ports import create_port_id
Expand Down Expand Up @@ -128,6 +128,7 @@ def __init__(self,
self._req_stop: bool = False
self.runtime_to_service: ty.Iterable[CspSendPort] = []
self.service_to_runtime: ty.Iterable[CspRecvPort] = []
self._open_ports: ty.List[AbstractCspPort] = []

def __del__(self):
"""On destruction, terminate Runtime automatically to
Expand Down Expand Up @@ -191,6 +192,9 @@ def _build_channels(self):
self._messaging_infrastructure
)

self._open_ports.append(channel.src_port)
self._open_ports.append(channel.dst_port)

self._get_process_builder_for_process(
channel_builder.src_process).set_csp_ports(
[channel.src_port])
Expand All @@ -214,6 +218,10 @@ def _build_sync_channels(self):
channel: Channel = sync_channel_builder.build(
self._messaging_infrastructure
)

self._open_ports.append(channel.src_port)
self._open_ports.append(channel.dst_port)

if isinstance(sync_channel_builder, RuntimeChannelBuilderMp):
if isinstance(sync_channel_builder.src_process,
RuntimeServiceBuilder):
Expand Down Expand Up @@ -407,11 +415,11 @@ def stop(self):

def join(self):
"""Join all ports and processes"""
for port in self.runtime_to_service:
joyeshmishra marked this conversation as resolved.
Show resolved Hide resolved
port.join()
for port in self.service_to_runtime:
for port in self._open_ports:
port.join()

self._open_ports.clear()

def set_var(self, var_id: int, value: np.ndarray, idx: np.ndarray = None):
"""Sets value of a variable with id 'var_id'."""
if self._is_running:
Expand Down Expand Up @@ -453,7 +461,8 @@ def set_var(self, var_id: int, value: np.ndarray, idx: np.ndarray = None):
buffer = buffer[idx]
buffer_shape: ty.Tuple[int, ...] = buffer.shape
num_items: int = np.prod(buffer_shape).item()
reshape_order = 'F' if isinstance(ev, LoihiSynapseVarModel) else 'C'
reshape_order = 'F' if isinstance(
ev, LoihiSynapseVarModel) else 'C'
buffer = buffer.reshape((1, num_items), order=reshape_order)

# 3. Send [NUM_ITEMS, DATA1, DATA2, ...]
Expand Down Expand Up @@ -505,7 +514,8 @@ def get_var(self, var_id: int, idx: np.ndarray = None) -> np.ndarray:
buffer[0, i] = data_port.recv()[0]

# 3. Reshape result and return
reshape_order = 'F' if isinstance(ev, LoihiSynapseVarModel) else 'C'
reshape_order = 'F' if isinstance(
ev, LoihiSynapseVarModel) else 'C'
buffer = buffer.reshape(ev.shape, order=reshape_order)
if idx:
return buffer[idx]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
# Copyright (C) 2022 Intel Corporation
# Copyright (C) 2022-23 Intel Corporation
# SPDX-License-Identifier: LGPL 2.1 or later
# See: https://spdx.org/licenses/

import threading
from abc import ABC
import logging
from multiprocessing.managers import SharedMemoryManager

import numpy as np
import typing as ty

from lava.magma.compiler.channels.interfaces import AbstractCspPort
from lava.magma.compiler.channels.pypychannel import CspSelector, PyPyChannel
from lava.magma.runtime.message_infrastructure.shared_memory_manager import (
SharedMemoryManager,
)

try:
from nxcore.arch.base.nxboard import NxBoard
Expand Down
8 changes: 5 additions & 3 deletions tests/lava/magma/compiler/builders/test_builder.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
# Copyright (C) 2021-22 Intel Corporation
# Copyright (C) 2021-23 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
# See: https://spdx.org/licenses/

from multiprocessing.managers import SharedMemoryManager
import typing as ty
import unittest
import numpy as np
Expand All @@ -28,6 +27,9 @@
from lava.magma.core.process.process import AbstractProcess
from lava.magma.core.process.variable import Var
from lava.magma.core.resources import CPU
from lava.magma.runtime.message_infrastructure.shared_memory_manager import (
SharedMemoryManager,
)


class MockMessageInterface:
Expand All @@ -41,7 +43,7 @@ def channel_class(self, channel_type: ChannelType) -> ty.Type:
class TestChannelBuilder(unittest.TestCase):
def test_channel_builder(self):
"""Tests Channel Builder creation"""
smm: SharedMemoryManager = SharedMemoryManager()
smm = SharedMemoryManager()
try:
port_initializer: PortInitializer = PortInitializer(
name="mock", shape=(1, 2), d_type=np.int32,
Expand Down
6 changes: 4 additions & 2 deletions tests/lava/magma/compiler/channels/test_pypychannel.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# Copyright (C) 2021-22 Intel Corporation
# Copyright (C) 2021-23 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
# See: https://spdx.org/licenses/

import numpy as np
import unittest
from multiprocessing import Process
from multiprocessing.managers import SharedMemoryManager

from lava.magma.compiler.channels.pypychannel import PyPyChannel
from lava.magma.runtime.message_infrastructure.shared_memory_manager import (
SharedMemoryManager
)


class MockInterface:
Expand Down
8 changes: 5 additions & 3 deletions tests/lava/magma/compiler/test_channel_builder.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
# Copyright (C) 2021-22 Intel Corporation
# Copyright (C) 2021-23 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
# See: https://spdx.org/licenses/

import typing as ty
import unittest
from multiprocessing.managers import SharedMemoryManager

import numpy as np

Expand All @@ -16,6 +15,9 @@
CspSendPort,
CspRecvPort,
)
from lava.magma.runtime.message_infrastructure.shared_memory_manager import (
SharedMemoryManager
)


class MockMessageInterface:
Expand All @@ -29,7 +31,7 @@ def channel_class(self, channel_type: ChannelType) -> ty.Type:
class TestChannelBuilder(unittest.TestCase):
def test_channel_builder(self):
"""Tests Channel Builder creation"""
smm: SharedMemoryManager = SharedMemoryManager()
smm = SharedMemoryManager()
try:
port_initializer: PortInitializer = PortInitializer(
name="mock", shape=(1, 2), d_type=np.int32,
Expand Down
6 changes: 4 additions & 2 deletions tests/lava/magma/core/model/py/test_ports.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
# Copyright (C) 2021-22 Intel Corporation
# Copyright (C) 2021-23 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
# See: https://spdx.org/licenses/

import unittest
import time
from multiprocessing.managers import SharedMemoryManager
import numpy as np
import typing as ty
import functools as ft
Expand All @@ -21,6 +20,9 @@
PyOutPortVectorDense,
VirtualPortTransformer,
IdentityTransformer)
from lava.magma.runtime.message_infrastructure.shared_memory_manager import (
SharedMemoryManager
)


class MockInterface:
Expand Down
Loading