Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(api): do not cache removed modules in thread_manager #7690

Merged
merged 6 commits into from
Apr 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions api/src/opentrons/drivers/temp_deck/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,16 @@ def update_temperature(self, default=None) -> str:
updated_temperature = default or self._temperature.copy()
self._temperature.update(updated_temperature)
else:
# comment
try:
self._update_thread = Thread(
target=self._recursive_update_temperature,
args=[DEFAULT_COMMAND_RETRIES],
name='Tempdeck recursive update temperature')
self._update_thread.start()
except (TempDeckError, SerialException, SerialNoResponse) as e:
return str(e)
def _update():
try:
self._recursive_update_temperature(retries=DEFAULT_COMMAND_RETRIES)
except (OSError, TempDeckError, SerialException, SerialNoResponse):
log.exception("Failed to execute _recursive_update_temperature.")

self._update_thread = Thread(
target=_update(),
name='Tempdeck recursive update temperature')
self._update_thread.start()
return ''

@property
Expand Down
11 changes: 4 additions & 7 deletions api/src/opentrons/drivers/thermocycler/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,12 @@ def send(self, command, callback):
self._send_write_fd.write(b'c')

def close(self):
log.debug("Halting TCPoller")
self._halt_write_fd.write(b'q')

def __del__(self):
""" Clean up thread fifos"""
log.debug("Cleaning up thread fifos in TCPoller.")
try:
os.unlink(self._send_path)
except NameError:
Expand Down Expand Up @@ -376,9 +378,10 @@ async def connect(self, port: str) -> 'Thermocycler':
return self

def disconnect(self) -> 'Thermocycler':
if self.is_connected():
if self.is_connected() or self._poller:
self._poller.close() # type: ignore
self._poller.join() # type: ignore
log.debug("TC poller stopped.")
self._poller = None
return self

Expand Down Expand Up @@ -624,9 +627,3 @@ async def enter_programming_mode(self):
await asyncio.sleep(0.05)
trigger_connection.close()
self.disconnect()

def __del__(self):
try:
self._poller.close() # type: ignore
except Exception:
log.exception('Exception while cleaning up Thermocycler:')
2 changes: 1 addition & 1 deletion api/src/opentrons/hardware_control/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1733,7 +1733,7 @@ def _unregister_modules(self,
for removed_mod in removed_modules:
self._log.info(f"Module {removed_mod.name()} detached"
f" from port {removed_mod.port}")
del removed_mod
removed_mod.cleanup()

async def register_modules(
self,
Expand Down
5 changes: 4 additions & 1 deletion api/src/opentrons/hardware_control/modules/magdeck.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,12 @@ def _disconnect(self):
if self._driver:
self._driver.disconnect(port=self._port)

def __del__(self):
def cleanup(self) -> None:
self._disconnect()

def __del__(self):
self.cleanup()

async def prep_for_update(self) -> str:
self._driver.enter_programming_mode()
new_port = await update.find_bootloader_port()
Expand Down
8 changes: 8 additions & 0 deletions api/src/opentrons/hardware_control/modules/mod_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,11 @@ def name(cls) -> str:
def bootloader(cls) -> UploadFunction:
""" Method used to upload file to this module's bootloader. """
pass

def cleanup(self) -> None:
""" Clean up the module instance.

Clean up, i.e. stop pollers, disconnect serial, etc in preparation for
object destruction.
"""
pass
6 changes: 5 additions & 1 deletion api/src/opentrons/hardware_control/modules/tempdeck.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,14 @@ async def _connect(self):
self._poller = Poller(self._driver)
self._poller.start()

def __del__(self):
def cleanup(self) -> None:
if hasattr(self, '_poller') and self._poller:
log.debug("Stopping tempdeck poller.")
self._poller.stop()

def __del__(self):
self.cleanup()

async def prep_for_update(self) -> str:
model = self._device_info and self._device_info.get('model')
if model in ('temp_deck_v1', 'temp_deck_v1.1', 'temp_deck_v2'):
Expand Down
9 changes: 9 additions & 0 deletions api/src/opentrons/hardware_control/modules/thermocycler.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,3 +347,12 @@ async def prep_for_update(self):
new_port = await update.find_bootloader_port()

return new_port or self.port

def cleanup(self) -> None:
try:
self._driver.disconnect()
except Exception:
MODULE_LOG.exception('Exception while cleaning up Thermocycler')

def __del__(self):
self.cleanup()
28 changes: 22 additions & 6 deletions api/src/opentrons/hardware_control/thread_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import asyncio
import functools
import weakref
from typing import Generic, TypeVar, Any, Optional
from .adapters import SynchronousAdapter
from .modules.mod_abc import AbstractModule
Expand Down Expand Up @@ -108,7 +109,8 @@ def __init__(self, builder, *args, **kwargs):
self._sync_managed_obj: Optional[SynchronousAdapter] = None
is_running = threading.Event()
self._is_running = is_running

self._cached_modules: weakref.WeakKeyDictionary[
AbstractModule, CallBridger[AbstractModule]] = weakref.WeakKeyDictionary()
# TODO: remove this if we switch to python 3.8
# https://docs.python.org/3/library/asyncio-subprocess.html#subprocess-and-threads # noqa
# On windows, the event loop and system interface is different and
Expand Down Expand Up @@ -175,13 +177,27 @@ def clean_up(self):
loop.call_soon_threadsafe(loop.stop)
except Exception:
pass
object.__getattribute__(self, 'wrap_module').cache_clear()
object.__setattr__(self, '_cached_modules', weakref.WeakKeyDictionary({}))
mcous marked this conversation as resolved.
Show resolved Hide resolved
object.__getattribute__(self, '_thread').join()

@functools.lru_cache(8)
def wrap_module(
self, module: AbstractModule) -> CallBridger[AbstractModule]:
return CallBridger(module, object.__getattribute__(self, '_loop'))
def wrap_module(self, module: AbstractModule) -> CallBridger[AbstractModule]:
""" Return the module object wrapped in a CallBridger and cache it.

The wrapped module objects are cached in `self._cached_modules` so they can be
re-used throughout the module object's life, as creating a wrapper is expensive.
We use a WeakKeyDictionary for caching so that module objects can be
garbage collected when modules are detached (since entries in WeakKeyDictionary
get discarded when there is no longer a strong reference to the key).
"""
wrapper_cache = object.__getattribute__(self, '_cached_modules')
this_module_wrapper = wrapper_cache.get(module)

if this_module_wrapper is None:
this_module_wrapper = CallBridger(module,
object.__getattribute__(self, '_loop'))
wrapper_cache.update({module: this_module_wrapper})

return this_module_wrapper

def __getattribute__(self, attr_name):
# hardware_control.api.API.attached_modules is the only hardware
Expand Down
42 changes: 42 additions & 0 deletions api/tests/opentrons/hardware_control/test_thread_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import pytest
import weakref
from opentrons.hardware_control.modules import ModuleAtPort
from opentrons.hardware_control.thread_manager import ThreadManagerException,\
ThreadManager
from opentrons.hardware_control.api import API


def test_build_fail_raises_exception():
Expand All @@ -12,3 +15,42 @@ def f():
raise Exception()
with pytest.raises(ThreadManagerException):
ThreadManager(f)


def test_module_cache_add_entry():
""" Test that _cached_modules updates correctly."""

mod_names = ['tempdeck']
thread_manager = ThreadManager(API.build_hardware_simulator,
attached_modules=mod_names)

# Test that module gets added to the cache
mods = thread_manager.attached_modules
wrapper_cache = thread_manager._cached_modules.copy()
assert isinstance(wrapper_cache, weakref.WeakKeyDictionary)
assert len(wrapper_cache) == 1
assert mods[0] in wrapper_cache.values()

# Test that calling attached modules doesn't add duplicate entries to cache
mods2 = thread_manager.attached_modules
wrapper_cache2 = thread_manager._cached_modules.copy()
assert len(wrapper_cache2) == 1
assert mods == mods2


async def test_module_cache_remove_entry():
"""Test that module entry gets removed from cache when module detaches. """
mod_names = ['tempdeck', 'magdeck']
thread_manager = ThreadManager(API.build_hardware_simulator,
attached_modules=mod_names)

mods_before = thread_manager.attached_modules
assert len(mods_before) == 2

await thread_manager.register_modules(
removed_mods_at_ports=[
ModuleAtPort(port='/dev/ot_module_sim_tempdeck0',
name='tempdeck')
])
mods_after = thread_manager.attached_modules
assert len(mods_after) == 1