Skip to content

Commit

Permalink
added custom errors module
Browse files Browse the repository at this point in the history
added logging module and removed prints
exposed context add/remove to IPCMessenger
added IPCMessenger to OT3API class
  • Loading branch information
vegano1 committed Oct 30, 2023
1 parent 73f5998 commit a2728b5
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 82 deletions.
10 changes: 10 additions & 0 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
Mapping,
Awaitable,
)

from ipc_messenger import IPCProcess, IPCMessenger, ipc_dispatcher, JSONRPCDispatcher
from opentrons.hardware_control.modules.module_calibration import (
ModuleCalibrationOffset,
)
Expand Down Expand Up @@ -231,6 +233,10 @@ def __init__(
self._backend = backend
self._loop = loop

# initialize the ipc messenger and register self context
self._ipc_messenger = IPCMessenger(IPCProcess.HARDWARE, 'localhost', 4000, ipc_dispatcher)
self._ipc_messenger.add_context('self', self)

def estop_cb(event: HardwareEvent) -> None:
self._update_estop_state(event)

Expand Down Expand Up @@ -397,6 +403,7 @@ async def build_hardware_controller(
api_instance._update_door_state(door_state)
backend.add_door_state_listener(api_instance._update_door_state)
checked_loop.create_task(backend.watch(loop=checked_loop))
checked_loop.create_task(api_instance._ipc_messenger.start())
backend.initialized = True
await api_instance.refresh_positions()
return api_instance
Expand Down Expand Up @@ -443,6 +450,7 @@ async def build_hardware_simulator(
)
backend.module_controls = module_controls
await backend.watch(api_instance.loop)
checked_loop.create_task(api_instance._ipc_messenger.start())
await api_instance.refresh_positions()
return api_instance

Expand Down Expand Up @@ -517,6 +525,7 @@ async def set_lights(
"""Control the robot lights."""
await self._backend.set_lights(button, rails)

@ipc_dispatcher.add_method(context_arg='self')
async def get_lights(self) -> Dict[str, bool]:
"""Return the current status of the robot lights."""
return await self._backend.get_lights()
Expand Down Expand Up @@ -614,6 +623,7 @@ async def cache_gripper(self, instrument_data: AttachedGripper) -> bool:
self._gripper_handler.gripper = g
return skipped

@ipc_dispatcher.add_method(context_arg='self')
def get_all_attached_instr(self) -> Dict[OT3Mount, Optional[InstrumentDict]]:
# NOTE (spp, 2023-03-07): The return type of this method indicates that
# if a particular mount has no attached instrument then it will provide a
Expand Down
4 changes: 2 additions & 2 deletions ipc-messenger/ipc_messenger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from .manager import JSONRPCResponseManager
from .messenger import IPCMessenger
from .constants import (
Process,
IPCProcess,
Destinations,
DESTINATION_PORT,
)
Expand All @@ -14,7 +14,7 @@
"JSONRPCDispatcher",
"JSONRPCResponseManager",
"IPCMessenger",
"Process",
"IPCProcess",
"Destinations",
"DESTINATION_PORT",
]
12 changes: 6 additions & 6 deletions ipc-messenger/ipc_messenger/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ def __repr__(self) -> str:
return f"<{self.__class__.__name__}: id={self._id} result={self.result}>"


class Process(Enum):
"""Processes we can talk to over ipc."""
class IPCProcess(Enum):
"""IPCProcesses we can talk to over ipc."""
HARDWARE = "hardware"
ROBOT_SERVER = "robot_server"
SYSTEM_SERVER = "system_server"


DESTINATION_PORT = {
Process.HARDWARE: 4000,
Process.ROBOT_SERVER: 4001,
Process.SYSTEM_SERVER: 4002,
IPCProcess.HARDWARE: 4000,
IPCProcess.ROBOT_SERVER: 4001,
IPCProcess.SYSTEM_SERVER: 4002,
}

Destinations = List[Process]
Destinations = List[IPCProcess]
18 changes: 12 additions & 6 deletions ipc-messenger/ipc_messenger/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
from typing import Callable, Any
from asyncio import iscoroutine

from jsonrpc.dispatcher import Dispatcher #type: ignore[import]

from .errors import InvalidCoroutineFunction

class JSONRPCDispatcher(Dispatcher): #type: ignore
def __init__(self, *args: str, **kwargs: int ) -> None:
def __init__(self, *args: str, context=None, **kwargs: int) -> None:
"""Constructor"""
super().__init__(*args, **kwargs)
self._context = context
super(JSONRPCDispatcher, self).__init__(*args, **kwargs)

def is_valid(method: Callable[[Any], Any]) -> bool:
"""Returns true if this method can be added."""
return True
def add_method(self, *args, **kwargs) -> None:
# reject non-async methods
#if not iscoroutine(f):
# raise InvalidCoroutineFunction(f.__name__)
super(JSONRPCDispatcher, self).add_method(*args, **kwargs)

ipc_dispatcher: JSONRPCDispatcher = JSONRPCDispatcher()
ipc_dispatcher: JSONRPCDispatcher = Dispatcher()
42 changes: 42 additions & 0 deletions ipc-messenger/ipc_messenger/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Errors and exceptions."""

from jsonrpc.exceptions import JSONRPCError

from typing import Optional


class JSONRPCVersionNotSupported(JSONRPCError):
""" json-rpc version is not supported error. """

CODE = -32605
MESSAGE = "Invalid json-rpc version."


class BaseException(Exception):
"""Base json-rpc exception object."""
def __init__(
self,
method_name: str,
message: Optional[str] = None
) -> None:
"""Constructor."""
self._name = str
self._message = message or ''

def __repr__(self) -> str:
"""String representation of this exception."""
return f"<{self.__class__.__name__}: method: {self._name} message: {self._message}>"


class InvalidCoroutineFunction(BaseException):
"""Error raised when the function being added to the dispatcher is not async."""
def __init__(self, method) -> None:
super().__init__(method, message="Method is not async")


class contextAlreadyRegisteredException(BaseException):
"""Error raised when try to register an already registered context."""
def __init__(self, method) -> None:
super().__init__(method, message="Context arg already registered")


79 changes: 50 additions & 29 deletions ipc-messenger/ipc_messenger/manager.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
"""This module is responsible for executing and formulating an rpc response."""

import asyncio
import json
from typing import Any, Optional, AsyncGenerator, List
from typing import Any, Optional, AsyncGenerator, List, Dict

from jsonrpc.jsonrpc2 import JSONRPC20BatchRequest, JSONRPC20BatchResponse
from jsonrpc.utils import is_invalid_params
from jsonrpc.exceptions import (
JSONRPCError,
JSONRPCInvalidRequest,
JSONRPCInvalidRequestException,
JSONRPCParseError,
Expand All @@ -18,24 +17,41 @@

from .constants import JSONRPCRequest, JSONRPCResponse
from .dispatcher import JSONRPCDispatcher

class JSONRPCVersionNotSupported(JSONRPCError):
""" json-rpc version is not supported error. """

CODE = -32605
MESSAGE = "Invalid json-rpc version."
from .errors import JSONRPCVersionNotSupported, contextAlreadyRegisteredException


# we only support json-rpc v2.0
SUPPORTED_JSON_RPC_VERSION = ["2.0"]

class JSONRPCResponseManager:
"""This class is responsible for validating json-rpc messages and executing commands."""
def __init__(self, dispatcher: JSONRPCDispatcher) -> None:
def __init__(self,
dispatcher: JSONRPCDispatcher,
context: Optional[Dict[str, Any]] = None
) -> None:
"""Constructor"""
self._dispatcher = dispatcher

async def handle(self, request_str: str, context: Optional[Any] = None) -> Optional[JSONRPCResponse]:
self._context = context or dict()

@property
def context(self) -> Dict[str, Any]:
"""Dictionary of context args to their objects."""
return self._context

def add_context(self, context_arg: str, context_obj: Any) -> Any:
"""Register a context arg and context object."""
if self._context.get(context_arg):
raise contextAlreadyRegisteredException(context_arg)
self._context[context_arg] = context_obj
return context_obj

def remove_context(self, context_arg: str) -> Optional[Any]:
"""Remove a context arg and object from the dict."""
if self._context.get(context_arg):
return self._context.pop(context_arg)
return None

async def handle(self, request_str: str) -> Optional[JSONRPCResponse]:
"""Validate that this data is a json-rpc message and execute its method, returning a JSONRPCResponse if applicable."""
try:
data = json.loads(request_str)
Expand All @@ -48,21 +64,18 @@ async def handle(self, request_str: str, context: Optional[Any] = None) -> Optio
except JSONRPCInvalidRequestException:
return JSONRPCResponse(error=JSONRPCInvalidRequest()._data)

print(request)
return await self._handle_request(request, context)
return await self._handle_request(request)

async def _handle_request(
self,
request: JSONRPCRequest,
context: Optional[Any] = None
) -> Optional[JSONRPCResponse]:
"""Execute a valid json-rpc request and return a response."""
rs = request if isinstance(request, JSONRPC20BatchRequest) \
else [request]

# lets collect our responses
responses = [resp async for resp in self._get_responses(rs, context)]
print(responses)
responses = [resp async for resp in self._get_responses(rs)]

# dont respond if this is a notification
if not responses:
Expand All @@ -75,42 +88,52 @@ async def _handle_request(
else:
return responses[0]

async def _get_responses(self, requests: List[JSONRPCRequest], context=None
) -> AsyncGenerator[JSONRPCResponse]:
async def _get_responses(
self,
requests: List[JSONRPCRequest],
) -> JSONRPCResponse:
""" Response for each single JSON-RPC Request."""

# response helper
def _make_response(
request: JSONRPCRequest,
**kwargs: int
) -> Optional[JSONRPCResponse]:
# make sure we can serialize this response
response = JSONRPCResponse(_id=request._id, **kwargs)
assert response.json
response.request = request

# only respond if this is not a notify message
return response if not request.is_notification else None

for request in requests:
if request.JSONRPC_VERSION not in SUPPORTED_JSON_RPC_VERSION:
yield _make_response(request, error=JSONRPCVersionNotSupported()._data)
return

# attempt get the method from the dispatcher
try:
method = self._dispatcher[request.method]
except KeyError:
yield _make_response(request, error=JSONRPCMethodNotFound()._data)
return

# get the kwargs if available
# get the args, kwargs, and add context object if available
args = request.args
kwargs = request.kwargs
if context is not None:
context_arg = self._dispatcher.context_arg_for_method.get(
request.method)
if context_arg:
context["request"] = request
kwargs[context_arg] = context
context_arg = self._dispatcher.context_arg_for_method.get(request.method)
context_obj = self._context.get(context_arg)
if context_arg and context_obj:
# add context to the args
args = (context_obj, *args)

# execute the command and get the response
try:
result = await method(*request.args, **kwargs)
if asyncio.iscoroutinefunction(method):
result = await method(*args, **kwargs)
else:
result = method(*args, **kwargs)
yield _make_response(request, result=result)
except JSONRPCDispatchException as e:
yield _make_response(request, error=e.error._data)
Expand All @@ -121,8 +144,6 @@ def _make_response(
"message": str(e),
}

print("API Exception: {0}".format(data))

if isinstance(e, TypeError) and is_invalid_params(
method, *request.args, **request.kwargs):
yield _make_response(
Expand Down
Loading

0 comments on commit a2728b5

Please sign in to comment.