diff --git a/api/src/opentrons/protocol_engine/create_protocol_engine.py b/api/src/opentrons/protocol_engine/create_protocol_engine.py index 39268f28bc7..ab91b5fabaa 100644 --- a/api/src/opentrons/protocol_engine/create_protocol_engine.py +++ b/api/src/opentrons/protocol_engine/create_protocol_engine.py @@ -20,6 +20,7 @@ async def create_protocol_engine( config: Config, load_fixed_trash: bool = False, deck_configuration: typing.Optional[DeckConfigurationType] = None, + notify_publishers: typing.Optional[typing.Callable[[], None]] = None, ) -> ProtocolEngine: """Create a ProtocolEngine instance. @@ -28,6 +29,7 @@ async def create_protocol_engine( config: ProtocolEngine configuration. load_fixed_trash: Automatically load fixed trash labware in engine. deck_configuration: The initial deck configuration the engine will be instantiated with. + notify_publishers: Notifies robot server publishers of internal state change. """ deck_data = DeckDataProvider(config.deck_type) deck_definition = await deck_data.get_deck_definition() @@ -45,6 +47,7 @@ async def create_protocol_engine( is_door_open=hardware_api.door_state is DoorState.OPEN, module_calibration_offsets=module_calibration_offsets, deck_configuration=deck_configuration, + notify_publishers=notify_publishers, ) return ProtocolEngine(state_store=state_store, hardware_api=hardware_api) diff --git a/api/src/opentrons/protocol_engine/state/state.py b/api/src/opentrons/protocol_engine/state/state.py index a34f016deab..a472b574e6f 100644 --- a/api/src/opentrons/protocol_engine/state/state.py +++ b/api/src/opentrons/protocol_engine/state/state.py @@ -146,6 +146,7 @@ def __init__( change_notifier: Optional[ChangeNotifier] = None, module_calibration_offsets: Optional[Dict[str, ModuleOffsetData]] = None, deck_configuration: Optional[DeckConfigurationType] = None, + notify_publishers: Optional[Callable[[], None]] = None, ) -> None: """Initialize a StateStore and its substores. @@ -159,6 +160,7 @@ def __init__( change_notifier: Internal state change notifier. module_calibration_offsets: Module offsets to preload. deck_configuration: The initial deck configuration the addressable area store will be instantiated with. + notify_publishers: Notifies robot server publishers of internal state change. """ self._command_store = CommandStore(config=config, is_door_open=is_door_open) self._pipette_store = PipetteStore() @@ -191,6 +193,7 @@ def __init__( ] self._config = config self._change_notifier = change_notifier or ChangeNotifier() + self._notify_robot_server = notify_publishers self._initialize_state() def handle_action(self, action: Action) -> None: @@ -319,3 +322,5 @@ def _update_state_views(self) -> None: self._liquid._state = next_state.liquids self._tips._state = next_state.tips self._change_notifier.notify() + if self._notify_robot_server is not None: + self._notify_robot_server() diff --git a/robot-server/robot_server/app_setup.py b/robot-server/robot_server/app_setup.py index 80fda961119..04147753906 100644 --- a/robot-server/robot_server/app_setup.py +++ b/robot-server/robot_server/app_setup.py @@ -36,7 +36,7 @@ ) from .service.notifications import ( - initialize_notification_client, + initialize_notifications, clean_up_notification_client, ) @@ -106,7 +106,7 @@ async def on_startup() -> None: fbl_mark_persistence_init_complete ], ) - initialize_notification_client( + await initialize_notifications( app_state=app.state, ) diff --git a/robot-server/robot_server/maintenance_runs/maintenance_engine_store.py b/robot-server/robot_server/maintenance_runs/maintenance_engine_store.py index 8e42cbf2cae..3b60f38f533 100644 --- a/robot-server/robot_server/maintenance_runs/maintenance_engine_store.py +++ b/robot-server/robot_server/maintenance_runs/maintenance_engine_store.py @@ -1,6 +1,6 @@ """In-memory storage of ProtocolEngine instances.""" from datetime import datetime -from typing import List, NamedTuple, Optional +from typing import List, NamedTuple, Optional, Callable from opentrons.protocol_engine.types import PostRunHardwareState from opentrons_shared_data.robot.dev_types import RobotType @@ -127,6 +127,7 @@ async def create( run_id: str, created_at: datetime, labware_offsets: List[LabwareOffsetCreate], + notify_publishers: Callable[[], None], deck_configuration: Optional[DeckConfigurationType] = [], ) -> StateSummary: """Create and store a ProtocolRunner and ProtocolEngine for a given Run. @@ -135,6 +136,7 @@ async def create( run_id: The run resource the engine is assigned to. created_at: Run creation datetime labware_offsets: Labware offsets to create the engine with. + notify_publishers: Utilized by the engine to notify publishers of state changes. Returns: The initial equipment and status summary of the engine. @@ -154,6 +156,7 @@ async def create( ), ), deck_configuration=deck_configuration, + notify_publishers=notify_publishers, ) # Using LiveRunner as the runner to allow for future refactor of maintenance runs diff --git a/robot-server/robot_server/maintenance_runs/maintenance_run_data_manager.py b/robot-server/robot_server/maintenance_runs/maintenance_run_data_manager.py index 9857c50a200..084a7552a3a 100644 --- a/robot-server/robot_server/maintenance_runs/maintenance_run_data_manager.py +++ b/robot-server/robot_server/maintenance_runs/maintenance_run_data_manager.py @@ -1,6 +1,6 @@ """Manage current maintenance run data.""" from datetime import datetime -from typing import List, Optional +from typing import List, Optional, Callable from opentrons.protocol_engine import ( EngineStatus, @@ -83,6 +83,7 @@ async def create( created_at: datetime, labware_offsets: List[LabwareOffsetCreate], deck_configuration: DeckConfigurationType, + notify_publishers: Callable[[], None], ) -> MaintenanceRun: """Create a new, current maintenance run. @@ -90,6 +91,7 @@ async def create( run_id: Identifier to assign the new run. created_at: Creation datetime. labware_offsets: Labware offsets to initialize the engine with. + notify_publishers: Utilized by the engine to notify publishers of state changes. Returns: The run resource. @@ -102,6 +104,7 @@ async def create( created_at=created_at, labware_offsets=labware_offsets, deck_configuration=deck_configuration, + notify_publishers=notify_publishers, ) maintenance_run_data = _build_run( diff --git a/robot-server/robot_server/maintenance_runs/router/base_router.py b/robot-server/robot_server/maintenance_runs/router/base_router.py index d2eb71a5798..c115d46509f 100644 --- a/robot-server/robot_server/maintenance_runs/router/base_router.py +++ b/robot-server/robot_server/maintenance_runs/router/base_router.py @@ -5,7 +5,7 @@ import logging from datetime import datetime from textwrap import dedent -from typing import Optional +from typing import Optional, Callable from typing_extensions import Literal from fastapi import APIRouter, Depends, status @@ -39,6 +39,7 @@ get_deck_configuration_store, ) from robot_server.deck_configuration.store import DeckConfigurationStore +from robot_server.service.notifications import get_notify_publishers log = logging.getLogger(__name__) base_router = APIRouter() @@ -155,6 +156,7 @@ async def create_run( deck_configuration_store: DeckConfigurationStore = Depends( get_deck_configuration_store ), + notify_publishers: Callable[[], None] = Depends(get_notify_publishers), ) -> PydanticResponse[SimpleBody[MaintenanceRun]]: """Create a new maintenance run. @@ -166,6 +168,7 @@ async def create_run( is_ok_to_create_maintenance_run: Verify if a maintenance run may be created if a protocol run exists. check_estop: Dependency to verify the estop is in a valid state. deck_configuration_store: Dependency to fetch the deck configuration. + notify_publishers: Utilized by the engine to notify publishers of state changes. """ if not is_ok_to_create_maintenance_run: raise ProtocolRunIsActive( @@ -180,6 +183,7 @@ async def create_run( created_at=created_at, labware_offsets=offsets, deck_configuration=deck_configuration, + notify_publishers=notify_publishers, ) log.info(f'Created an empty run "{run_id}"".') diff --git a/robot-server/robot_server/runs/engine_store.py b/robot-server/robot_server/runs/engine_store.py index aa5b26d4a77..673ff5549f3 100644 --- a/robot-server/robot_server/runs/engine_store.py +++ b/robot-server/robot_server/runs/engine_store.py @@ -1,5 +1,5 @@ """In-memory storage of ProtocolEngine instances.""" -from typing import List, NamedTuple, Optional +from typing import List, NamedTuple, Optional, Callable from opentrons.protocol_engine.types import PostRunHardwareState from opentrons_shared_data.robot.dev_types import RobotType @@ -152,6 +152,7 @@ async def create( run_id: str, labware_offsets: List[LabwareOffsetCreate], deck_configuration: DeckConfigurationType, + notify_publishers: Callable[[], None], protocol: Optional[ProtocolResource], ) -> StateSummary: """Create and store a ProtocolRunner and ProtocolEngine for a given Run. @@ -160,6 +161,7 @@ async def create( run_id: The run resource the engine is assigned to. labware_offsets: Labware offsets to create the engine with. protocol: The protocol to load the runner with, if any. + notify_publishers: Utilized by the engine to notify publishers of state changes. Returns: The initial equipment and status summary of the engine. @@ -184,6 +186,7 @@ async def create( ), load_fixed_trash=load_fixed_trash, deck_configuration=deck_configuration, + notify_publishers=notify_publishers, ) post_run_hardware_state = PostRunHardwareState.HOME_AND_STAY_ENGAGED diff --git a/robot-server/robot_server/runs/router/base_router.py b/robot-server/robot_server/runs/router/base_router.py index fc7b3f223e3..e1e62fdf0d4 100644 --- a/robot-server/robot_server/runs/router/base_router.py +++ b/robot-server/robot_server/runs/router/base_router.py @@ -5,7 +5,7 @@ import logging from datetime import datetime from textwrap import dedent -from typing import Optional, Union +from typing import Optional, Union, Callable from typing_extensions import Literal from fastapi import APIRouter, Depends, status, Query @@ -45,7 +45,7 @@ get_deck_configuration_store, ) from robot_server.deck_configuration.store import DeckConfigurationStore - +from robot_server.service.notifications import get_notify_publishers log = logging.getLogger(__name__) base_router = APIRouter() @@ -144,6 +144,7 @@ async def create_run( deck_configuration_store: DeckConfigurationStore = Depends( get_deck_configuration_store ), + notify_publishers: Callable[[], None] = Depends(get_notify_publishers), ) -> PydanticResponse[SimpleBody[Union[Run, BadRun]]]: """Create a new run. @@ -157,6 +158,7 @@ async def create_run( the new run. check_estop: Dependency to verify the estop is in a valid state. deck_configuration_store: Dependency to fetch the deck configuration. + notify_publishers: Utilized by the engine to notify publishers of state changes. """ protocol_id = request_body.data.protocolId if request_body is not None else None offsets = request_body.data.labwareOffsets if request_body is not None else [] @@ -184,6 +186,7 @@ async def create_run( labware_offsets=offsets, deck_configuration=deck_configuration, protocol=protocol_resource, + notify_publishers=notify_publishers, ) except EngineConflictError as e: raise RunAlreadyActive(detail=str(e)).as_error(status.HTTP_409_CONFLICT) from e diff --git a/robot-server/robot_server/runs/run_data_manager.py b/robot-server/robot_server/runs/run_data_manager.py index 92c7d5e12b5..f0fc28dca37 100644 --- a/robot-server/robot_server/runs/run_data_manager.py +++ b/robot-server/robot_server/runs/run_data_manager.py @@ -1,6 +1,6 @@ """Manage current and historical run data.""" from datetime import datetime -from typing import List, Optional, Union +from typing import List, Optional, Callable, Union from opentrons_shared_data.labware.labware_definition import LabwareDefinition from opentrons_shared_data.errors.exceptions import InvalidStoredData, EnumeratedError @@ -142,6 +142,7 @@ async def create( created_at: datetime, labware_offsets: List[LabwareOffsetCreate], deck_configuration: DeckConfigurationType, + notify_publishers: Callable[[], None], protocol: Optional[ProtocolResource], ) -> Union[Run, BadRun]: """Create a new, current run. @@ -150,6 +151,7 @@ async def create( run_id: Identifier to assign the new run. created_at: Creation datetime. labware_offsets: Labware offsets to initialize the engine with. + notify_publishers: Utilized by the engine to notify publishers of state changes. Returns: The run resource. @@ -171,6 +173,7 @@ async def create( labware_offsets=labware_offsets, deck_configuration=deck_configuration, protocol=protocol, + notify_publishers=notify_publishers, ) run_resource = self._run_store.insert( run_id=run_id, diff --git a/robot-server/robot_server/service/notifications/__init__.py b/robot-server/robot_server/service/notifications/__init__.py index 202c7fc71f1..7a71a61298d 100644 --- a/robot-server/robot_server/service/notifications/__init__.py +++ b/robot-server/robot_server/service/notifications/__init__.py @@ -1,15 +1,19 @@ +"""Notification service creation and management.""" +from .initialize_notifications import initialize_notifications + from .notification_client import ( NotificationClient, get_notification_client, - initialize_notification_client, clean_up_notification_client, ) +from .publisher_notifier import PublisherNotifier, get_notify_publishers from .publishers import ( MaintenanceRunsPublisher, RunsPublisher, get_maintenance_runs_publisher, get_runs_publisher, ) +from .change_notifier import ChangeNotifier __all__ = [ # main export @@ -18,10 +22,14 @@ "MaintenanceRunsPublisher", "RunsPublisher", # initialization and teardown - "initialize_notification_client", + "initialize_notifications", "clean_up_notification_client", # for use by FastAPI "get_notification_client", + "get_notify_publishers", "get_maintenance_runs_publisher", "get_runs_publisher", + # for testing + "PublisherNotifier", + "ChangeNotifier", ] diff --git a/robot-server/robot_server/service/notifications/change_notifier.py b/robot-server/robot_server/service/notifications/change_notifier.py new file mode 100644 index 00000000000..60c36c420af --- /dev/null +++ b/robot-server/robot_server/service/notifications/change_notifier.py @@ -0,0 +1,23 @@ +"""Simple state change notification interface.""" +import asyncio + + +class ChangeNotifier: + """An interface to emit or subscribe to state change notifications.""" + + def __init__(self) -> None: + """Initialize the ChangeNotifier with an internal Event.""" + self._event = asyncio.Event() + + def notify(self) -> None: + """Notify all `waiters` of a change.""" + self._event.set() + + async def wait(self) -> None: + """Wait until the next change notification.""" + self._event.clear() + await self._event.wait() + + def clear(self) -> None: + """Reset the internal event flag.""" + self._event.clear() diff --git a/robot-server/robot_server/service/notifications/initialize_notifications.py b/robot-server/robot_server/service/notifications/initialize_notifications.py new file mode 100644 index 00000000000..d5569d09eff --- /dev/null +++ b/robot-server/robot_server/service/notifications/initialize_notifications.py @@ -0,0 +1,11 @@ +"""Utilities for initializing the notification service.""" +from server_utils.fastapi_utils.app_state import AppState + +from .notification_client import initialize_notification_client +from .publisher_notifier import initialize_publisher_notifier + + +async def initialize_notifications(app_state: AppState) -> None: + """Initialize the notification system for the given app state.""" + initialize_notification_client(app_state) + await initialize_publisher_notifier(app_state) diff --git a/robot-server/robot_server/service/notifications/notification_client.py b/robot-server/robot_server/service/notifications/notification_client.py index 568d161cf53..6b51eba9cc9 100644 --- a/robot-server/robot_server/service/notifications/notification_client.py +++ b/robot-server/robot_server/service/notifications/notification_client.py @@ -1,3 +1,4 @@ +"""An interface for managing interactions with the notification broker and relevant lifecycle utilities.""" import random import logging import paho.mqtt.client as mqtt @@ -208,7 +209,5 @@ def get_notification_client( app_state: AppState = Depends(get_app_state), ) -> Optional[NotificationClient]: """Intended to be used by endpoint functions as a FastAPI dependency.""" - notification_client: Optional[ - NotificationClient - ] = _notification_client_accessor.get_from(app_state) + notification_client = _notification_client_accessor.get_from(app_state) return notification_client diff --git a/robot-server/robot_server/service/notifications/publisher_notifier.py b/robot-server/robot_server/service/notifications/publisher_notifier.py new file mode 100644 index 00000000000..d1769ac4379 --- /dev/null +++ b/robot-server/robot_server/service/notifications/publisher_notifier.py @@ -0,0 +1,81 @@ +"""Provides an interface for alerting notification publishers to events and related lifecycle utilities.""" +import asyncio +from fastapi import Depends +from typing import Optional, Callable, List, Awaitable + +from server_utils.fastapi_utils.app_state import ( + AppState, + AppStateAccessor, + get_app_state, +) + +from .change_notifier import ChangeNotifier + + +class PublisherNotifier: + """An interface that invokes notification callbacks whenever a generic notify event occurs.""" + + def __init__( + self, + change_notifier: Optional[ChangeNotifier] = None, + ): + self._change_notifier = change_notifier or ChangeNotifier() + self._pe_notifier: Optional[asyncio.Task[None]] = None + self._callbacks: List[Callable[[], Awaitable[None]]] = [] + + def register_publish_callbacks( + self, callbacks: List[Callable[[], Awaitable[None]]] + ): + """Extend the list of callbacks with a given list of callbacks.""" + self._callbacks.extend(callbacks) + + async def _initialize(self) -> None: + """Initializes an instance of PublisherNotifier. This method should only be called once.""" + self._pe_notifier = asyncio.create_task(self._wait_for_event()) + + def _notify_publishers(self) -> None: + """A generic notifier, alerting all `waiters` of a change.""" + self._change_notifier.notify() + + async def _wait_for_event(self) -> None: + """Indefinitely wait for an event to occur, then invoke each callback.""" + while True: + await self._change_notifier.wait() + for callback in self._callbacks: + await callback() + + +_publisher_notifier_accessor: AppStateAccessor[PublisherNotifier] = AppStateAccessor[ + PublisherNotifier +]("publisher_notifier") + + +def get_publisher_notifier( + app_state: AppState = Depends(get_app_state), +) -> PublisherNotifier: + """Intended for use by various publishers only.""" + publisher_notifier = _publisher_notifier_accessor.get_from(app_state) + assert publisher_notifier is not None + + return publisher_notifier + + +def get_notify_publishers( + app_state: AppState = Depends(get_app_state), +) -> Callable[[], None]: + """Provides access to the callback used to notify publishers of changes.""" + publisher_notifier = _publisher_notifier_accessor.get_from(app_state) + assert isinstance(publisher_notifier, PublisherNotifier) + + return publisher_notifier._notify_publishers + + +async def initialize_publisher_notifier(app_state: AppState) -> None: + """Create a new `NotificationClient` and store it on `app_state`. + + Intended to be called just once, when the server starts up. + """ + publisher_notifier: PublisherNotifier = PublisherNotifier() + _publisher_notifier_accessor.set_on(app_state, publisher_notifier) + + await publisher_notifier._initialize() diff --git a/robot-server/robot_server/service/notifications/publishers/__init__.py b/robot-server/robot_server/service/notifications/publishers/__init__.py index 1dcdc43d4a9..59a30e7a135 100644 --- a/robot-server/robot_server/service/notifications/publishers/__init__.py +++ b/robot-server/robot_server/service/notifications/publishers/__init__.py @@ -1,3 +1,8 @@ +"""Publisher creation and management. + +A unique publisher is responsible for each router's related set of endpoints. The publisher conditionally determines +whether a relevant event has occurred, and if true, it publishes an appropriate message to the robot's message broker. +""" from .maintenance_runs_publisher import ( MaintenanceRunsPublisher, get_maintenance_runs_publisher, diff --git a/robot-server/robot_server/service/notifications/topics.py b/robot-server/robot_server/service/notifications/topics.py index 9e3d5fe0ea4..34f2fd0eea1 100644 --- a/robot-server/robot_server/service/notifications/topics.py +++ b/robot-server/robot_server/service/notifications/topics.py @@ -1,3 +1,4 @@ +"""Notification topics.""" from enum import Enum diff --git a/robot-server/tests/maintenance_runs/router/test_base_router.py b/robot-server/tests/maintenance_runs/router/test_base_router.py index 4e2b8b399e5..2f61afcac48 100644 --- a/robot-server/tests/maintenance_runs/router/test_base_router.py +++ b/robot-server/tests/maintenance_runs/router/test_base_router.py @@ -36,6 +36,11 @@ from robot_server.deck_configuration.store import DeckConfigurationStore +def mock_notify_publishers() -> None: + """A mock notify_publishers.""" + return None + + @pytest.fixture def labware_offset_create() -> LabwareOffsetCreate: """Get a labware offset create request value object.""" @@ -79,6 +84,7 @@ async def test_create_run( created_at=run_created_at, labware_offsets=[labware_offset_create], deck_configuration=[], + notify_publishers=mock_notify_publishers, ) ).then_return(expected_response) @@ -91,6 +97,7 @@ async def test_create_run( created_at=run_created_at, is_ok_to_create_maintenance_run=True, deck_configuration_store=mock_deck_configuration_store, + notify_publishers=mock_notify_publishers, ) assert result.content.data == expected_response diff --git a/robot-server/tests/maintenance_runs/test_engine_store.py b/robot-server/tests/maintenance_runs/test_engine_store.py index d0a3ccfc1c8..15855ab48d1 100644 --- a/robot-server/tests/maintenance_runs/test_engine_store.py +++ b/robot-server/tests/maintenance_runs/test_engine_store.py @@ -24,6 +24,11 @@ ) +def mock_notify_publishers() -> None: + """A mock notify_publishers.""" + return None + + @pytest.fixture def subject(decoy: Decoy) -> MaintenanceEngineStore: """Get a MaintenanceEngineStore test subject.""" @@ -42,7 +47,10 @@ def subject(decoy: Decoy) -> MaintenanceEngineStore: async def test_create_engine(subject: MaintenanceEngineStore) -> None: """It should create an engine for a run.""" result = await subject.create( - run_id="run-id", labware_offsets=[], created_at=datetime(2023, 1, 1) + run_id="run-id", + labware_offsets=[], + created_at=datetime(2023, 1, 1), + notify_publishers=mock_notify_publishers, ) assert subject.current_run_id == "run-id" @@ -67,7 +75,10 @@ async def test_create_engine_uses_robot_and_deck_type( ) await subject.create( - run_id="run-id", labware_offsets=[], created_at=datetime(2023, 4, 1) + run_id="run-id", + labware_offsets=[], + created_at=datetime(2023, 4, 1), + notify_publishers=mock_notify_publishers, ) assert subject.engine.state_view.config.robot_type == robot_type @@ -88,6 +99,7 @@ async def test_create_engine_with_labware_offsets( run_id="run-id", labware_offsets=[labware_offset], created_at=datetime(2023, 1, 1), + notify_publishers=mock_notify_publishers, ) assert result.labwareOffsets == [ @@ -104,7 +116,10 @@ async def test_create_engine_with_labware_offsets( async def test_clear_engine(subject: MaintenanceEngineStore) -> None: """It should clear a stored engine entry.""" await subject.create( - run_id="run-id", labware_offsets=[], created_at=datetime(2023, 5, 1) + run_id="run-id", + labware_offsets=[], + created_at=datetime(2023, 5, 1), + notify_publishers=mock_notify_publishers, ) await subject.runner.run(deck_configuration=[]) result = await subject.clear() @@ -124,7 +139,10 @@ async def test_clear_engine_not_stopped_or_idle( ) -> None: """It should raise a conflict if the engine is not stopped.""" await subject.create( - run_id="run-id", labware_offsets=[], created_at=datetime(2023, 6, 1) + run_id="run-id", + labware_offsets=[], + created_at=datetime(2023, 6, 1), + notify_publishers=mock_notify_publishers, ) subject.runner.play() @@ -135,7 +153,10 @@ async def test_clear_engine_not_stopped_or_idle( async def test_clear_idle_engine(subject: MaintenanceEngineStore) -> None: """It should successfully clear engine if idle (not started).""" await subject.create( - run_id="run-id", labware_offsets=[], created_at=datetime(2023, 7, 1) + run_id="run-id", + labware_offsets=[], + created_at=datetime(2023, 7, 1), + notify_publishers=mock_notify_publishers, ) assert subject.engine is not None assert subject.runner is not None diff --git a/robot-server/tests/maintenance_runs/test_run_data_manager.py b/robot-server/tests/maintenance_runs/test_run_data_manager.py index f0e63809d68..0046b3098db 100644 --- a/robot-server/tests/maintenance_runs/test_run_data_manager.py +++ b/robot-server/tests/maintenance_runs/test_run_data_manager.py @@ -35,6 +35,11 @@ from opentrons.protocol_engine import Liquid +def mock_notify_publishers() -> None: + """A mock notify_publishers.""" + return None + + @pytest.fixture def mock_maintenance_engine_store(decoy: Decoy) -> MaintenanceEngineStore: """Get a mock MaintenanceEngineStore.""" @@ -104,6 +109,7 @@ async def test_create( labware_offsets=[], created_at=created_at, deck_configuration=[], + notify_publishers=mock_notify_publishers, ) ).then_return(engine_state_summary) decoy.when(mock_maintenance_engine_store.current_run_created_at).then_return( @@ -114,6 +120,7 @@ async def test_create( created_at=created_at, labware_offsets=[], deck_configuration=[], + notify_publishers=mock_notify_publishers, ) assert result == MaintenanceRun( @@ -153,6 +160,7 @@ async def test_create_with_options( labware_offsets=[labware_offset], created_at=created_at, deck_configuration=[], + notify_publishers=mock_notify_publishers, ) ).then_return(engine_state_summary) decoy.when(mock_maintenance_engine_store.current_run_created_at).then_return( @@ -164,6 +172,7 @@ async def test_create_with_options( created_at=created_at, labware_offsets=[labware_offset], deck_configuration=[], + notify_publishers=mock_notify_publishers, ) assert result == MaintenanceRun( @@ -196,6 +205,7 @@ async def test_create_engine_error( labware_offsets=[], created_at=created_at, deck_configuration=[], + notify_publishers=mock_notify_publishers, ) ).then_raise(EngineConflictError("oh no")) decoy.when(mock_maintenance_engine_store.current_run_created_at).then_return( @@ -208,6 +218,7 @@ async def test_create_engine_error( created_at=created_at, labware_offsets=[], deck_configuration=[], + notify_publishers=mock_notify_publishers, ) diff --git a/robot-server/tests/runs/router/test_base_router.py b/robot-server/tests/runs/router/test_base_router.py index 1fd754f224a..5c772e14be7 100644 --- a/robot-server/tests/runs/router/test_base_router.py +++ b/robot-server/tests/runs/router/test_base_router.py @@ -42,6 +42,11 @@ from robot_server.deck_configuration.store import DeckConfigurationStore +def mock_notify_publishers() -> None: + """A mock notify_publishers.""" + return None + + @pytest.fixture def labware_offset_create() -> LabwareOffsetCreate: """Get a labware offset create request value object.""" @@ -87,6 +92,7 @@ async def test_create_run( labware_offsets=[labware_offset_create], deck_configuration=[], protocol=None, + notify_publishers=mock_notify_publishers, ) ).then_return(expected_response) @@ -99,6 +105,7 @@ async def test_create_run( created_at=run_created_at, run_auto_deleter=mock_run_auto_deleter, deck_configuration_store=mock_deck_configuration_store, + notify_publishers=mock_notify_publishers, ) assert result.content.data == expected_response @@ -162,6 +169,7 @@ async def test_create_protocol_run( labware_offsets=[], deck_configuration=[], protocol=protocol_resource, + notify_publishers=mock_notify_publishers, ) ).then_return(expected_response) @@ -173,6 +181,7 @@ async def test_create_protocol_run( created_at=run_created_at, run_auto_deleter=mock_run_auto_deleter, deck_configuration_store=mock_deck_configuration_store, + notify_publishers=mock_notify_publishers, ) assert result.content.data == expected_response @@ -223,6 +232,7 @@ async def test_create_run_conflict( labware_offsets=[], deck_configuration=[], protocol=None, + notify_publishers=mock_notify_publishers, ) ).then_raise(EngineConflictError("oh no")) @@ -234,6 +244,7 @@ async def test_create_run_conflict( run_data_manager=mock_run_data_manager, run_auto_deleter=mock_run_auto_deleter, deck_configuration_store=mock_deck_configuration_store, + notify_publishers=mock_notify_publishers, ) assert exc_info.value.status_code == 409 diff --git a/robot-server/tests/runs/test_engine_store.py b/robot-server/tests/runs/test_engine_store.py index 1bf74632139..7a1f79b903a 100644 --- a/robot-server/tests/runs/test_engine_store.py +++ b/robot-server/tests/runs/test_engine_store.py @@ -27,6 +27,11 @@ ) +def mock_notify_publishers() -> None: + """A mock notify_publishers.""" + return None + + @pytest.fixture def subject(decoy: Decoy, hardware_api: HardwareControlAPI) -> EngineStore: """Get a EngineStore test subject.""" @@ -51,7 +56,11 @@ async def json_protocol_source(tmp_path: Path) -> ProtocolSource: async def test_create_engine(subject: EngineStore) -> None: """It should create an engine for a run.""" result = await subject.create( - run_id="run-id", labware_offsets=[], protocol=None, deck_configuration=[] + run_id="run-id", + labware_offsets=[], + protocol=None, + deck_configuration=[], + notify_publishers=mock_notify_publishers, ) assert subject.current_run_id == "run-id" @@ -82,6 +91,7 @@ async def test_create_engine_with_protocol( labware_offsets=[], deck_configuration=[], protocol=protocol, + notify_publishers=mock_notify_publishers, ) assert subject.current_run_id == "run-id" assert isinstance(result, StateSummary) @@ -103,7 +113,11 @@ async def test_create_engine_uses_robot_type( ) await subject.create( - run_id="run-id", labware_offsets=[], deck_configuration=[], protocol=None + run_id="run-id", + labware_offsets=[], + deck_configuration=[], + protocol=None, + notify_publishers=mock_notify_publishers, ) assert subject.engine.state_view.config.robot_type == robot_type @@ -122,6 +136,7 @@ async def test_create_engine_with_labware_offsets(subject: EngineStore) -> None: labware_offsets=[labware_offset], deck_configuration=[], protocol=None, + notify_publishers=mock_notify_publishers, ) assert result.labwareOffsets == [ @@ -138,12 +153,20 @@ async def test_create_engine_with_labware_offsets(subject: EngineStore) -> None: async def test_archives_state_if_engine_already_exists(subject: EngineStore) -> None: """It should not create more than one engine / runner pair.""" await subject.create( - run_id="run-id-1", labware_offsets=[], deck_configuration=[], protocol=None + run_id="run-id-1", + labware_offsets=[], + deck_configuration=[], + protocol=None, + notify_publishers=mock_notify_publishers, ) with pytest.raises(EngineConflictError): await subject.create( - run_id="run-id-2", labware_offsets=[], deck_configuration=[], protocol=None + run_id="run-id-2", + labware_offsets=[], + deck_configuration=[], + protocol=None, + notify_publishers=mock_notify_publishers, ) assert subject.current_run_id == "run-id-1" @@ -152,7 +175,11 @@ async def test_archives_state_if_engine_already_exists(subject: EngineStore) -> async def test_clear_engine(subject: EngineStore) -> None: """It should clear a stored engine entry.""" await subject.create( - run_id="run-id", labware_offsets=[], deck_configuration=[], protocol=None + run_id="run-id", + labware_offsets=[], + deck_configuration=[], + protocol=None, + notify_publishers=mock_notify_publishers, ) await subject.runner.run(deck_configuration=[]) result = await subject.clear() @@ -172,7 +199,11 @@ async def test_clear_engine_not_stopped_or_idle( ) -> None: """It should raise a conflict if the engine is not stopped.""" await subject.create( - run_id="run-id", labware_offsets=[], deck_configuration=[], protocol=None + run_id="run-id", + labware_offsets=[], + deck_configuration=[], + protocol=None, + notify_publishers=mock_notify_publishers, ) subject.runner.play(deck_configuration=[]) @@ -183,7 +214,11 @@ async def test_clear_engine_not_stopped_or_idle( async def test_clear_idle_engine(subject: EngineStore) -> None: """It should successfully clear engine if idle (not started).""" await subject.create( - run_id="run-id", labware_offsets=[], deck_configuration=[], protocol=None + run_id="run-id", + labware_offsets=[], + deck_configuration=[], + protocol=None, + notify_publishers=mock_notify_publishers, ) assert subject.engine is not None assert subject.runner is not None @@ -216,7 +251,9 @@ async def test_get_default_engine_robot_type( # should pass in some sort of actual, valid HardwareAPI instead of a mock hardware_api = decoy.mock(cls=API) subject = EngineStore( - hardware_api=hardware_api, robot_type=robot_type, deck_type=deck_type + hardware_api=hardware_api, + robot_type=robot_type, + deck_type=deck_type, ) result = await subject.get_default_engine() @@ -227,7 +264,11 @@ async def test_get_default_engine_robot_type( async def test_get_default_engine_current_unstarted(subject: EngineStore) -> None: """It should allow a default engine if another engine current but unstarted.""" await subject.create( - run_id="run-id", labware_offsets=[], deck_configuration=[], protocol=None + run_id="run-id", + labware_offsets=[], + deck_configuration=[], + protocol=None, + notify_publishers=mock_notify_publishers, ) result = await subject.get_default_engine() @@ -237,7 +278,11 @@ async def test_get_default_engine_current_unstarted(subject: EngineStore) -> Non async def test_get_default_engine_conflict(subject: EngineStore) -> None: """It should not allow a default engine if another engine is executing commands.""" await subject.create( - run_id="run-id", labware_offsets=[], deck_configuration=[], protocol=None + run_id="run-id", + labware_offsets=[], + deck_configuration=[], + protocol=None, + notify_publishers=mock_notify_publishers, ) subject.engine.play() @@ -248,7 +293,11 @@ async def test_get_default_engine_conflict(subject: EngineStore) -> None: async def test_get_default_engine_run_stopped(subject: EngineStore) -> None: """It allow a default engine if another engine is terminal.""" await subject.create( - run_id="run-id", labware_offsets=[], deck_configuration=[], protocol=None + run_id="run-id", + labware_offsets=[], + deck_configuration=[], + protocol=None, + notify_publishers=mock_notify_publishers, ) await subject.engine.finish() diff --git a/robot-server/tests/runs/test_run_data_manager.py b/robot-server/tests/runs/test_run_data_manager.py index 92152eb3940..bac302e3065 100644 --- a/robot-server/tests/runs/test_run_data_manager.py +++ b/robot-server/tests/runs/test_run_data_manager.py @@ -40,6 +40,11 @@ from opentrons_shared_data.errors.exceptions import InvalidStoredData +def mock_notify_publishers() -> None: + """A mock notify_publishers.""" + return None + + @pytest.fixture def mock_engine_store(decoy: Decoy) -> EngineStore: """Get a mock EngineStore.""" @@ -138,6 +143,7 @@ async def test_create( labware_offsets=[], protocol=None, deck_configuration=[], + notify_publishers=mock_notify_publishers, ) ).then_return(engine_state_summary) decoy.when( @@ -154,6 +160,7 @@ async def test_create( labware_offsets=[], protocol=None, deck_configuration=[], + notify_publishers=mock_notify_publishers, ) assert result == Run( @@ -203,6 +210,7 @@ async def test_create_with_options( labware_offsets=[labware_offset], protocol=protocol, deck_configuration=[], + notify_publishers=mock_notify_publishers, ) ).then_return(engine_state_summary) @@ -220,6 +228,7 @@ async def test_create_with_options( labware_offsets=[labware_offset], protocol=protocol, deck_configuration=[], + notify_publishers=mock_notify_publishers, ) assert result == Run( @@ -254,6 +263,7 @@ async def test_create_engine_error( labware_offsets=[], protocol=None, deck_configuration=[], + notify_publishers=mock_notify_publishers, ) ).then_raise(EngineConflictError("oh no")) @@ -264,6 +274,7 @@ async def test_create_engine_error( labware_offsets=[], protocol=None, deck_configuration=[], + notify_publishers=mock_notify_publishers, ) decoy.verify( @@ -640,6 +651,7 @@ async def test_create_archives_existing( labware_offsets=[], protocol=None, deck_configuration=[], + notify_publishers=mock_notify_publishers, ) ).then_return(engine_state_summary) @@ -657,6 +669,7 @@ async def test_create_archives_existing( labware_offsets=[], protocol=None, deck_configuration=[], + notify_publishers=mock_notify_publishers, ) decoy.verify( diff --git a/robot-server/tests/service/notifications/__init__.py b/robot-server/tests/service/notifications/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/robot-server/tests/service/notifications/test_change_notifier.py b/robot-server/tests/service/notifications/test_change_notifier.py new file mode 100644 index 00000000000..4967e6d254e --- /dev/null +++ b/robot-server/tests/service/notifications/test_change_notifier.py @@ -0,0 +1,56 @@ +"""Tests for the ChangeNotifier interface.""" +import asyncio +import pytest +from opentrons.protocol_engine.state.change_notifier import ChangeNotifier + + +async def test_single_subscriber() -> None: + """Test that a single subscriber can wait for a notification.""" + subject = ChangeNotifier() + result = asyncio.create_task(subject.wait()) + + # ensure that the wait actually waits by delaying and + # checking that the task has not resolved + await asyncio.sleep(0.1) + assert result.done() is False + + asyncio.get_running_loop().call_soon(subject.notify) + + await result + + +@pytest.mark.parametrize("_test_repetition", range(10)) +async def test_multiple_subscribers(_test_repetition: int) -> None: + """Test that multiple subscribers can wait for a notification. + + This test checks that the subscribers are awoken in the order they + subscribed. This may or may not be guarenteed according to the + implementations of both ChangeNotifier and the event loop. + This test functions as a canary, given that our code may relies + on this ordering for determinism. + + This test runs multiple times to check for flakyness. + """ + subject = ChangeNotifier() + results = [] + + async def _do_task_1() -> None: + await subject.wait() + results.append(1) + + async def _do_task_2() -> None: + await subject.wait() + results.append(2) + + async def _do_task_3() -> None: + await subject.wait() + results.append(3) + + task_1 = asyncio.create_task(_do_task_1()) + task_2 = asyncio.create_task(_do_task_2()) + task_3 = asyncio.create_task(_do_task_3()) + + asyncio.get_running_loop().call_soon(subject.notify) + await asyncio.gather(task_1, task_2, task_3) + + assert results == [1, 2, 3] diff --git a/robot-server/tests/service/notifications/test_publisher_notifier.py b/robot-server/tests/service/notifications/test_publisher_notifier.py new file mode 100644 index 00000000000..125cfdd1806 --- /dev/null +++ b/robot-server/tests/service/notifications/test_publisher_notifier.py @@ -0,0 +1,74 @@ +import asyncio +from unittest.mock import Mock, MagicMock + +from robot_server.service.notifications import ( + PublisherNotifier, + ChangeNotifier, +) + + +async def test_initialize() -> None: + """It should create a new task.""" + publisher_notifier = PublisherNotifier() + + await publisher_notifier._initialize() + + assert asyncio.get_running_loop() + + +def test_notify_publishers() -> None: + """Invoke the change notifier's notify method.""" + change_notifier = MagicMock() + publisher_notifier = PublisherNotifier(change_notifier) + + publisher_notifier._notify_publishers() + + change_notifier.notify.assert_called_once() + + +def test_register_publish_callbacks() -> None: + """It should extend the list of callbacks within a given list of callbacks.""" + publisher_notifier = PublisherNotifier() + callback1 = Mock() + callback2 = Mock() + + publisher_notifier.register_publish_callbacks([callback1, callback2]) + + assert len(publisher_notifier._callbacks) == 2 + assert publisher_notifier._callbacks[0] == callback1 + assert publisher_notifier._callbacks[1] == callback2 + + +async def test_wait_for_event() -> None: + """It should wait for an event to occur, then invoke each callback.""" + change_notifier = ChangeNotifier() + publisher_notifier = PublisherNotifier(change_notifier) + + callback_called = False + callback_2_called = False + + async def callback() -> None: + """Mock callback.""" + nonlocal callback_called + callback_called = True + + async def callback_2() -> None: + """Mock callback.""" + nonlocal callback_2_called + callback_2_called = True + + publisher_notifier.register_publish_callbacks([callback, callback_2]) + + async def trigger_callbacks() -> None: + """Mock trigger for callbacks.""" + await asyncio.sleep(0.1) + change_notifier.notify() + + task = asyncio.create_task(publisher_notifier._initialize()) + + await asyncio.gather(trigger_callbacks(), task) + + assert callback_called + assert callback_2_called + + task.cancel()