Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
SyntaxColoring committed Jan 11, 2022
1 parent 2fdc77a commit 83d1cb8
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 24 deletions.
5 changes: 4 additions & 1 deletion api/src/opentrons/protocol_engine/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ def dispatch_threadsafe(self, action: Action) -> None:
return from_thread.run_sync(self._action_dispatcher.dispatch, action)

def setup(self) -> None:
"""Run any necessary setup steps prior to plugin usage."""
"""Run any necessary setup steps prior to plugin usage.
Called when the plugin is added to a `ProtocolEngine`.
"""
...

def teardown(self) -> None:
Expand Down
107 changes: 90 additions & 17 deletions api/src/opentrons/protocol_runner/legacy_context_plugin.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
"""Customize the ProtocolEngine to monitor and control legacy (APIv2) protocols."""
from __future__ import annotations
from typing import Callable, Optional, NamedTuple

import asyncio
from typing import (
Callable,
Optional,
NamedTuple,
)

from opentrons.commands.types import CommandMessage as LegacyCommand
from opentrons.hardware_control import HardwareControlAPI
Expand All @@ -14,6 +20,7 @@
LegacyModuleLoadInfo,
)
from .legacy_command_mapper import LegacyCommandMapper
from .thread_async_queue import ThreadAsyncQueue, QueueClosed


class ContextUnsubscribe(NamedTuple):
Expand All @@ -25,6 +32,34 @@ class ContextUnsubscribe(NamedTuple):
module_broker: Callable[[], None]


# NOTES:
#
# Worker thread queues up actions at its leisure
#
# Shoveler task doesn't need to support cancellation
# because the queue of actions should be bounded and small at that point.
#
#
# legacy.plugin.teardown() called automatically at some point
#
# Is it okay for tearing down the plugin to be blocking?
#
# How do I get an async event loop task to block until either:
# * a single item can be retrieved from a queue.Queue (not an asyncio.Queue)
# * the queue is closed
# And sleep (yield) while blocking


# Before we close the ProtocolEngine, any scheduled actions from the
# legacy mapper must have made it in to that ProtocolEngine

# .finalize method

# TODO: Confirm that things get cleaned up when you cancel a protocol.

# TODO: Run to ground why this is hanging


class LegacyContextPlugin(AbstractPlugin):
"""A ProtocolEngine plugin wrapping a legacy ProtocolContext.
Expand Down Expand Up @@ -56,22 +91,31 @@ def __init__(
self._legacy_command_mapper = legacy_command_mapper or LegacyCommandMapper()
self._unsubscribe: Optional[ContextUnsubscribe] = None

self._actions_to_dispatch = ThreadAsyncQueue[pe_actions.Action]()
self._action_shoveling_task: Optional[asyncio.Task[None]] = None

def setup(self) -> None:
"""Set up subscriptions to the context's message brokers."""
"""Set up the plugin.
Called by Protocol Engine when the plugin is added to it.
* Subscribe to the legacy context's message brokers.
* Prepare a background task to pass legacy commands to Protocol Engine.
"""
context = self._protocol_context

command_unsubscribe = context.broker.subscribe(
topic="command",
handler=self._dispatch_legacy_command,
handler=self._handle_legacy_command,
)
labware_unsubscribe = context.labware_load_broker.subscribe(
callback=self._dispatch_labware_loaded
callback=self._handle_labware_loaded
)
pipette_unsubscribe = context.instrument_load_broker.subscribe(
callback=self._dispatch_instrument_loaded
callback=self._handle_instrument_loaded
)
module_unsubscribe = context.module_load_broker.subscribe(
callback=self._dispatch_module_loaded
callback=self._handle_module_loaded
)

self._unsubscribe = ContextUnsubscribe(
Expand All @@ -81,14 +125,33 @@ def setup(self) -> None:
module_broker=module_unsubscribe,
)

self._action_shoveling_task = asyncio.create_task(self._shovel_all_actions())

def teardown(self) -> None:
"""Unsubscribe from the context's message brokers."""
"""Unsubscribe from the context's message brokers.
Called by Protocol Engine when the engine stops.
"""
if self._unsubscribe:
for unsubscribe in self._unsubscribe:
unsubscribe()

self._unsubcribe = None

async def finalize(self) -> None:
"""Inform the plugin that the APIv2 script has exited.
This method will wait until the plugin is finished sending commands
into the Protocol Engine, and then return.
You must call this method *before* stopping the Protocol Engine,
so those commands make it in first.
"""
# TODO: Think through whether this can be merged into teardown().
# TODO: What if the engine is never started?
if self._action_shoveling_task is not None:
self._actions_to_dispatch.done_putting()
await self._action_shoveling_task

def handle_action(self, action: pe_actions.Action) -> None:
"""React to a ProtocolEngine action."""
if isinstance(action, pe_actions.PlayAction):
Expand All @@ -100,29 +163,39 @@ def handle_action(self, action: pe_actions.Action) -> None:
):
self._hardware_api.pause(HardwarePauseType.PAUSE)

def _dispatch_legacy_command(self, command: LegacyCommand) -> None:
def _handle_legacy_command(self, command: LegacyCommand) -> None:
pe_actions = self._legacy_command_mapper.map_command(command=command)
for action in pe_actions:
self.dispatch_threadsafe(action)
self._actions_to_dispatch.put(action)

def _dispatch_labware_loaded(
self, labware_load_info: LegacyLabwareLoadInfo
) -> None:
def _handle_labware_loaded(self, labware_load_info: LegacyLabwareLoadInfo) -> None:
pe_command = self._legacy_command_mapper.map_labware_load(
labware_load_info=labware_load_info
)
self.dispatch_threadsafe(pe_actions.UpdateCommandAction(command=pe_command))
action = pe_actions.UpdateCommandAction(command=pe_command)
self._actions_to_dispatch.put(action)

def _dispatch_instrument_loaded(
def _handle_instrument_loaded(
self, instrument_load_info: LegacyInstrumentLoadInfo
) -> None:
pe_command = self._legacy_command_mapper.map_instrument_load(
instrument_load_info=instrument_load_info
)
self.dispatch_threadsafe(pe_actions.UpdateCommandAction(command=pe_command))
action = pe_actions.UpdateCommandAction(command=pe_command)
self._actions_to_dispatch.put(action)

def _dispatch_module_loaded(self, module_load_info: LegacyModuleLoadInfo) -> None:
def _handle_module_loaded(self, module_load_info: LegacyModuleLoadInfo) -> None:
pe_command = self._legacy_command_mapper.map_module_load(
module_load_info=module_load_info
)
self.dispatch_threadsafe(pe_actions.UpdateCommandAction(command=pe_command))
action = pe_actions.UpdateCommandAction(command=pe_command)
self._actions_to_dispatch.put(action)

async def _shovel_all_actions(self) -> None:
while True:
try:
action = await self._actions_to_dispatch.get_async()
except QueueClosed:
break
else:
self.dispatch(action)
24 changes: 19 additions & 5 deletions api/src/opentrons/protocol_runner/protocol_runner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Protocol run control and management."""
import asyncio
from dataclasses import dataclass
from typing import List, Optional

Expand Down Expand Up @@ -179,6 +180,9 @@ def _load_python(self, protocol_source: ProtocolSource) -> None:
protocol=protocol,
context=context,
)
# ensure the engine is stopped gracefully once the
# protocol file stops issuing commands
self._task_queue.set_cleanup_func(self._protocol_engine.finish)

def _load_legacy(
self,
Expand All @@ -187,15 +191,25 @@ def _load_legacy(
protocol = self._legacy_file_reader.read(protocol_source)
context = self._legacy_context_creator.create(protocol)

self._protocol_engine.add_plugin(
LegacyContextPlugin(
hardware_api=self._hardware_api,
protocol_context=context,
)
legacy_plugin = LegacyContextPlugin(
hardware_api=self._hardware_api,
protocol_context=context,
)

self._protocol_engine.add_plugin(legacy_plugin)

self._task_queue.set_run_func(
func=self._legacy_executor.execute,
protocol=protocol,
context=context,
)

async def clean_up(error: Optional[Exception]) -> None:
try:
await legacy_plugin.finalize()
finally:
# ensure the engine is stopped gracefully once the
# protocol file stops issuing commands
await self._protocol_engine.finish(error)

self._task_queue.set_cleanup_func(clean_up)
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,14 @@ def test_pause_action(
decoy.verify(hardware_api.pause(PauseType.PAUSE), times=1)


def test_broker_subscribe_unsubscribe(
async def test_broker_subscribe_unsubscribe(
decoy: Decoy,
legacy_context: LegacyProtocolContext,
legacy_command_mapper: LegacyCommandMapper,
subject: LegacyContextPlugin,
) -> None:
"""It should subscribe to the brokers on setup and unsubscribe on teardown."""
print("Yes, the test is running")
main_unsubscribe: Callable[[], None] = decoy.mock()
labware_unsubscribe: Callable[[], None] = decoy.mock()
instrument_unsubscribe: Callable[[], None] = decoy.mock()
Expand All @@ -134,15 +135,20 @@ def test_broker_subscribe_unsubscribe(
legacy_context.module_load_broker.subscribe(callback=matchers.Anything())
).then_return(module_unsubscribe)

print("About to setup")
subject.setup()
print("Done setup, about to teardown")
await subject.finalize()
subject.teardown()
print("Done teardown")

decoy.verify(
main_unsubscribe(),
labware_unsubscribe(),
instrument_unsubscribe(),
module_unsubscribe(),
)
print("Done verify")


async def test_main_broker_messages(
Expand Down Expand Up @@ -184,6 +190,8 @@ async def test_main_broker_messages(

await to_thread.run_sync(handler, legacy_command)

await subject.finalize()

decoy.verify(
action_dispatcher.dispatch(pe_actions.UpdateCommandAction(engine_command))
)
Expand Down

0 comments on commit 83d1cb8

Please sign in to comment.