Skip to content

Commit

Permalink
refactor(robot-server, api): Wire up protocol engine event bubbling t…
Browse files Browse the repository at this point in the history
…o robot server (#14766)

Closes EXEC-358

Wire up PE event bubbling to the robot server for notifications as an alternative to the current polling that occurs. There are no functional changes.

PublisherNotifier is the new interface that handles event management for publishers, using a generic ChangeNotifier that is given to PE as a callback. When PE reports a change in state, the callback fires. PublisherNotifier then iterates through each callback, invoking them.

In the future, each publisher that requires access to PE state updates (eg, RunsPublisher) will add relevant callbacks during their initialization via register_publish_callbacks. Each callback will contain the conditional logic required for an MQTT publish to occur.
  • Loading branch information
mjhuff authored Apr 2, 2024
1 parent 2ec93cd commit cf93d9c
Show file tree
Hide file tree
Showing 25 changed files with 427 additions and 30 deletions.
3 changes: 3 additions & 0 deletions api/src/opentrons/protocol_engine/create_protocol_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ async def create_protocol_engine(
config: Config,
load_fixed_trash: bool = False,
deck_configuration: typing.Optional[DeckConfigurationType] = None,
notify_publishers: typing.Optional[typing.Callable[[], None]] = None,
) -> ProtocolEngine:
"""Create a ProtocolEngine instance.
Expand All @@ -28,6 +29,7 @@ async def create_protocol_engine(
config: ProtocolEngine configuration.
load_fixed_trash: Automatically load fixed trash labware in engine.
deck_configuration: The initial deck configuration the engine will be instantiated with.
notify_publishers: Notifies robot server publishers of internal state change.
"""
deck_data = DeckDataProvider(config.deck_type)
deck_definition = await deck_data.get_deck_definition()
Expand All @@ -45,6 +47,7 @@ async def create_protocol_engine(
is_door_open=hardware_api.door_state is DoorState.OPEN,
module_calibration_offsets=module_calibration_offsets,
deck_configuration=deck_configuration,
notify_publishers=notify_publishers,
)

return ProtocolEngine(state_store=state_store, hardware_api=hardware_api)
Expand Down
5 changes: 5 additions & 0 deletions api/src/opentrons/protocol_engine/state/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def __init__(
change_notifier: Optional[ChangeNotifier] = None,
module_calibration_offsets: Optional[Dict[str, ModuleOffsetData]] = None,
deck_configuration: Optional[DeckConfigurationType] = None,
notify_publishers: Optional[Callable[[], None]] = None,
) -> None:
"""Initialize a StateStore and its substores.
Expand All @@ -159,6 +160,7 @@ def __init__(
change_notifier: Internal state change notifier.
module_calibration_offsets: Module offsets to preload.
deck_configuration: The initial deck configuration the addressable area store will be instantiated with.
notify_publishers: Notifies robot server publishers of internal state change.
"""
self._command_store = CommandStore(config=config, is_door_open=is_door_open)
self._pipette_store = PipetteStore()
Expand Down Expand Up @@ -191,6 +193,7 @@ def __init__(
]
self._config = config
self._change_notifier = change_notifier or ChangeNotifier()
self._notify_robot_server = notify_publishers
self._initialize_state()

def handle_action(self, action: Action) -> None:
Expand Down Expand Up @@ -319,3 +322,5 @@ def _update_state_views(self) -> None:
self._liquid._state = next_state.liquids
self._tips._state = next_state.tips
self._change_notifier.notify()
if self._notify_robot_server is not None:
self._notify_robot_server()
4 changes: 2 additions & 2 deletions robot-server/robot_server/app_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
)

from .service.notifications import (
initialize_notification_client,
initialize_notifications,
clean_up_notification_client,
)

Expand Down Expand Up @@ -106,7 +106,7 @@ async def on_startup() -> None:
fbl_mark_persistence_init_complete
],
)
initialize_notification_client(
await initialize_notifications(
app_state=app.state,
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""In-memory storage of ProtocolEngine instances."""
from datetime import datetime
from typing import List, NamedTuple, Optional
from typing import List, NamedTuple, Optional, Callable

from opentrons.protocol_engine.types import PostRunHardwareState
from opentrons_shared_data.robot.dev_types import RobotType
Expand Down Expand Up @@ -127,6 +127,7 @@ async def create(
run_id: str,
created_at: datetime,
labware_offsets: List[LabwareOffsetCreate],
notify_publishers: Callable[[], None],
deck_configuration: Optional[DeckConfigurationType] = [],
) -> StateSummary:
"""Create and store a ProtocolRunner and ProtocolEngine for a given Run.
Expand All @@ -135,6 +136,7 @@ async def create(
run_id: The run resource the engine is assigned to.
created_at: Run creation datetime
labware_offsets: Labware offsets to create the engine with.
notify_publishers: Utilized by the engine to notify publishers of state changes.
Returns:
The initial equipment and status summary of the engine.
Expand All @@ -154,6 +156,7 @@ async def create(
),
),
deck_configuration=deck_configuration,
notify_publishers=notify_publishers,
)

# Using LiveRunner as the runner to allow for future refactor of maintenance runs
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Manage current maintenance run data."""
from datetime import datetime
from typing import List, Optional
from typing import List, Optional, Callable

from opentrons.protocol_engine import (
EngineStatus,
Expand Down Expand Up @@ -83,13 +83,15 @@ async def create(
created_at: datetime,
labware_offsets: List[LabwareOffsetCreate],
deck_configuration: DeckConfigurationType,
notify_publishers: Callable[[], None],
) -> MaintenanceRun:
"""Create a new, current maintenance run.
Args:
run_id: Identifier to assign the new run.
created_at: Creation datetime.
labware_offsets: Labware offsets to initialize the engine with.
notify_publishers: Utilized by the engine to notify publishers of state changes.
Returns:
The run resource.
Expand All @@ -102,6 +104,7 @@ async def create(
created_at=created_at,
labware_offsets=labware_offsets,
deck_configuration=deck_configuration,
notify_publishers=notify_publishers,
)

maintenance_run_data = _build_run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
from datetime import datetime
from textwrap import dedent
from typing import Optional
from typing import Optional, Callable
from typing_extensions import Literal

from fastapi import APIRouter, Depends, status
Expand Down Expand Up @@ -39,6 +39,7 @@
get_deck_configuration_store,
)
from robot_server.deck_configuration.store import DeckConfigurationStore
from robot_server.service.notifications import get_notify_publishers

log = logging.getLogger(__name__)
base_router = APIRouter()
Expand Down Expand Up @@ -155,6 +156,7 @@ async def create_run(
deck_configuration_store: DeckConfigurationStore = Depends(
get_deck_configuration_store
),
notify_publishers: Callable[[], None] = Depends(get_notify_publishers),
) -> PydanticResponse[SimpleBody[MaintenanceRun]]:
"""Create a new maintenance run.
Expand All @@ -166,6 +168,7 @@ async def create_run(
is_ok_to_create_maintenance_run: Verify if a maintenance run may be created if a protocol run exists.
check_estop: Dependency to verify the estop is in a valid state.
deck_configuration_store: Dependency to fetch the deck configuration.
notify_publishers: Utilized by the engine to notify publishers of state changes.
"""
if not is_ok_to_create_maintenance_run:
raise ProtocolRunIsActive(
Expand All @@ -180,6 +183,7 @@ async def create_run(
created_at=created_at,
labware_offsets=offsets,
deck_configuration=deck_configuration,
notify_publishers=notify_publishers,
)

log.info(f'Created an empty run "{run_id}"".')
Expand Down
5 changes: 4 additions & 1 deletion robot-server/robot_server/runs/engine_store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""In-memory storage of ProtocolEngine instances."""
from typing import List, NamedTuple, Optional
from typing import List, NamedTuple, Optional, Callable

from opentrons.protocol_engine.types import PostRunHardwareState
from opentrons_shared_data.robot.dev_types import RobotType
Expand Down Expand Up @@ -152,6 +152,7 @@ async def create(
run_id: str,
labware_offsets: List[LabwareOffsetCreate],
deck_configuration: DeckConfigurationType,
notify_publishers: Callable[[], None],
protocol: Optional[ProtocolResource],
) -> StateSummary:
"""Create and store a ProtocolRunner and ProtocolEngine for a given Run.
Expand All @@ -160,6 +161,7 @@ async def create(
run_id: The run resource the engine is assigned to.
labware_offsets: Labware offsets to create the engine with.
protocol: The protocol to load the runner with, if any.
notify_publishers: Utilized by the engine to notify publishers of state changes.
Returns:
The initial equipment and status summary of the engine.
Expand All @@ -184,6 +186,7 @@ async def create(
),
load_fixed_trash=load_fixed_trash,
deck_configuration=deck_configuration,
notify_publishers=notify_publishers,
)

post_run_hardware_state = PostRunHardwareState.HOME_AND_STAY_ENGAGED
Expand Down
7 changes: 5 additions & 2 deletions robot-server/robot_server/runs/router/base_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
from datetime import datetime
from textwrap import dedent
from typing import Optional, Union
from typing import Optional, Union, Callable
from typing_extensions import Literal

from fastapi import APIRouter, Depends, status, Query
Expand Down Expand Up @@ -45,7 +45,7 @@
get_deck_configuration_store,
)
from robot_server.deck_configuration.store import DeckConfigurationStore

from robot_server.service.notifications import get_notify_publishers

log = logging.getLogger(__name__)
base_router = APIRouter()
Expand Down Expand Up @@ -144,6 +144,7 @@ async def create_run(
deck_configuration_store: DeckConfigurationStore = Depends(
get_deck_configuration_store
),
notify_publishers: Callable[[], None] = Depends(get_notify_publishers),
) -> PydanticResponse[SimpleBody[Union[Run, BadRun]]]:
"""Create a new run.
Expand All @@ -157,6 +158,7 @@ async def create_run(
the new run.
check_estop: Dependency to verify the estop is in a valid state.
deck_configuration_store: Dependency to fetch the deck configuration.
notify_publishers: Utilized by the engine to notify publishers of state changes.
"""
protocol_id = request_body.data.protocolId if request_body is not None else None
offsets = request_body.data.labwareOffsets if request_body is not None else []
Expand Down Expand Up @@ -184,6 +186,7 @@ async def create_run(
labware_offsets=offsets,
deck_configuration=deck_configuration,
protocol=protocol_resource,
notify_publishers=notify_publishers,
)
except EngineConflictError as e:
raise RunAlreadyActive(detail=str(e)).as_error(status.HTTP_409_CONFLICT) from e
Expand Down
5 changes: 4 additions & 1 deletion robot-server/robot_server/runs/run_data_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Manage current and historical run data."""
from datetime import datetime
from typing import List, Optional, Union
from typing import List, Optional, Callable, Union

from opentrons_shared_data.labware.labware_definition import LabwareDefinition
from opentrons_shared_data.errors.exceptions import InvalidStoredData, EnumeratedError
Expand Down Expand Up @@ -142,6 +142,7 @@ async def create(
created_at: datetime,
labware_offsets: List[LabwareOffsetCreate],
deck_configuration: DeckConfigurationType,
notify_publishers: Callable[[], None],
protocol: Optional[ProtocolResource],
) -> Union[Run, BadRun]:
"""Create a new, current run.
Expand All @@ -150,6 +151,7 @@ async def create(
run_id: Identifier to assign the new run.
created_at: Creation datetime.
labware_offsets: Labware offsets to initialize the engine with.
notify_publishers: Utilized by the engine to notify publishers of state changes.
Returns:
The run resource.
Expand All @@ -171,6 +173,7 @@ async def create(
labware_offsets=labware_offsets,
deck_configuration=deck_configuration,
protocol=protocol,
notify_publishers=notify_publishers,
)
run_resource = self._run_store.insert(
run_id=run_id,
Expand Down
12 changes: 10 additions & 2 deletions robot-server/robot_server/service/notifications/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
"""Notification service creation and management."""
from .initialize_notifications import initialize_notifications

from .notification_client import (
NotificationClient,
get_notification_client,
initialize_notification_client,
clean_up_notification_client,
)
from .publisher_notifier import PublisherNotifier, get_notify_publishers
from .publishers import (
MaintenanceRunsPublisher,
RunsPublisher,
get_maintenance_runs_publisher,
get_runs_publisher,
)
from .change_notifier import ChangeNotifier

__all__ = [
# main export
Expand All @@ -18,10 +22,14 @@
"MaintenanceRunsPublisher",
"RunsPublisher",
# initialization and teardown
"initialize_notification_client",
"initialize_notifications",
"clean_up_notification_client",
# for use by FastAPI
"get_notification_client",
"get_notify_publishers",
"get_maintenance_runs_publisher",
"get_runs_publisher",
# for testing
"PublisherNotifier",
"ChangeNotifier",
]
23 changes: 23 additions & 0 deletions robot-server/robot_server/service/notifications/change_notifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Simple state change notification interface."""
import asyncio


class ChangeNotifier:
"""An interface to emit or subscribe to state change notifications."""

def __init__(self) -> None:
"""Initialize the ChangeNotifier with an internal Event."""
self._event = asyncio.Event()

def notify(self) -> None:
"""Notify all `waiters` of a change."""
self._event.set()

async def wait(self) -> None:
"""Wait until the next change notification."""
self._event.clear()
await self._event.wait()

def clear(self) -> None:
"""Reset the internal event flag."""
self._event.clear()
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""Utilities for initializing the notification service."""
from server_utils.fastapi_utils.app_state import AppState

from .notification_client import initialize_notification_client
from .publisher_notifier import initialize_publisher_notifier


async def initialize_notifications(app_state: AppState) -> None:
"""Initialize the notification system for the given app state."""
initialize_notification_client(app_state)
await initialize_publisher_notifier(app_state)
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""An interface for managing interactions with the notification broker and relevant lifecycle utilities."""
import random
import logging
import paho.mqtt.client as mqtt
Expand Down Expand Up @@ -208,7 +209,5 @@ def get_notification_client(
app_state: AppState = Depends(get_app_state),
) -> Optional[NotificationClient]:
"""Intended to be used by endpoint functions as a FastAPI dependency."""
notification_client: Optional[
NotificationClient
] = _notification_client_accessor.get_from(app_state)
notification_client = _notification_client_accessor.get_from(app_state)
return notification_client
Loading

0 comments on commit cf93d9c

Please sign in to comment.