-
Notifications
You must be signed in to change notification settings - Fork 179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
API: add module support to hardware control #2423
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
import asyncio | ||
import logging | ||
import os | ||
import re | ||
from typing import List, Optional, Tuple | ||
|
||
from .mod_abc import AbstractModule | ||
# Must import tempdeck and magdeck (and other modules going forward) so they | ||
# actually create the subclasses | ||
from . import update, tempdeck, magdeck # noqa(W0611) | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
class UnsupportedModuleError(Exception): | ||
pass | ||
|
||
|
||
class AbsentModuleError(Exception): | ||
pass | ||
|
||
|
||
# mypy isn’t quite expressive enough to handle what we’re doing here, which | ||
# is get all the class objects that are subclasses of an abstract module | ||
# (strike 1) and call a classmethod on them (strike 2) and actually store | ||
# the class objects (strike 3). So, type: ignore | ||
MODULE_TYPES = {cls.name(): cls | ||
for cls in AbstractModule.__subclasses__()} # type: ignore | ||
|
||
|
||
def build(port: str, which: str, simulate: bool) -> AbstractModule: | ||
return MODULE_TYPES[which].build(port, simulate) | ||
|
||
|
||
def discover() -> List[Tuple[str, str]]: | ||
""" Scan for connected modules and instantiate handler classes | ||
""" | ||
if os.environ.get('RUNNING_ON_PI') and os.path.isdir('/dev/modules'): | ||
devices = os.listdir('/dev/modules') | ||
else: | ||
devices = [] | ||
|
||
discovered_modules = [] | ||
|
||
module_port_regex = re.compile('|'.join(MODULE_TYPES.keys()), re.I) | ||
for port in devices: | ||
match = module_port_regex.search(port) | ||
if match: | ||
name = match.group().lower() | ||
if name not in MODULE_TYPES: | ||
log.warning("Unexpected module connected: {} on {}" | ||
.format(name, port)) | ||
continue | ||
absolute_port = '/dev/modules/{}'.format(port) | ||
discovered_modules.append((absolute_port, name)) | ||
log.info('Discovered modules: {}'.format(discovered_modules)) | ||
|
||
return discovered_modules | ||
|
||
|
||
class UpdateError(RuntimeError): | ||
def __init__(self, msg): | ||
self.msg = msg | ||
|
||
|
||
async def update_firmware( | ||
module: AbstractModule, | ||
firmware_file: str, | ||
loop: Optional[asyncio.AbstractEventLoop]) -> AbstractModule: | ||
""" Update a module. | ||
|
||
If the update succeeds, an Module instance will be returned. | ||
|
||
Otherwise, raises an UpdateError with the reason for the failure. | ||
""" | ||
simulated = module.is_simulated | ||
cls = type(module) | ||
old_port = module.port | ||
flash_port = await module.prep_for_update() | ||
del module | ||
after_port, results = await update.update_firmware(flash_port, | ||
firmware_file, | ||
loop) | ||
await asyncio.sleep(1.0) | ||
new_port = after_port or old_port | ||
if not results[0]: | ||
raise UpdateError(results[1]) | ||
return cls.build(new_port, simulated) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
from opentrons.drivers.mag_deck import MagDeck as MagDeckDriver | ||
from . import update, mod_abc | ||
|
||
LABWARE_ENGAGE_HEIGHT = {'biorad-hardshell-96-PCR': 18} # mm | ||
MAX_ENGAGE_HEIGHT = 45 # mm from home position | ||
|
||
|
||
class MissingDevicePortError(Exception): | ||
pass | ||
|
||
|
||
class SimulatingDriver: | ||
def __init__(self): | ||
self._port = None | ||
|
||
def probe_plate(self): | ||
pass | ||
|
||
def home(self): | ||
pass | ||
|
||
def move(self, location): | ||
pass | ||
|
||
def get_device_info(self): | ||
return {'serial': 'dummySerial', | ||
'model': 'dummyModel', | ||
'version': 'dummyVersion'} | ||
|
||
def connect(self, port): | ||
pass | ||
|
||
def disconnect(self): | ||
pass | ||
|
||
def enter_programming_mode(self): | ||
pass | ||
|
||
|
||
class MagDeck(mod_abc.AbstractModule): | ||
""" | ||
Under development. API subject to change | ||
""" | ||
@classmethod | ||
def build(cls, port, simulating=False): | ||
mod = cls(port, simulating) | ||
mod._connect() | ||
return mod | ||
|
||
@classmethod | ||
def name(cls) -> str: | ||
return 'magdeck' | ||
|
||
def __init__(self, port, simulating): | ||
self._engaged = False | ||
self._port = port | ||
if simulating: | ||
self._driver = SimulatingDriver() | ||
else: | ||
self._driver = MagDeckDriver() | ||
self._device_info = None | ||
|
||
def calibrate(self): | ||
""" | ||
Calibration involves probing for top plate to get the plate height | ||
""" | ||
self._driver.probe_plate() | ||
# return if successful or not? | ||
self._engaged = False | ||
|
||
def engage(self, height): | ||
""" | ||
Move the magnet to a specific height, in mm from home position | ||
""" | ||
if height > MAX_ENGAGE_HEIGHT or height < 0: | ||
raise ValueError('Invalid engage height. Should be 0 to {}'.format( | ||
MAX_ENGAGE_HEIGHT)) | ||
self._driver.move(height) | ||
self._engaged = True | ||
|
||
def disengage(self): | ||
""" | ||
Home the magnet | ||
""" | ||
self._driver.home() | ||
self._engaged = False | ||
|
||
@property | ||
def device_info(self): | ||
""" | ||
Returns a dict: | ||
{ 'serial': 'abc123', 'model': '8675309', 'version': '9001' } | ||
""" | ||
return self._device_info | ||
|
||
@property | ||
def status(self): | ||
return 'engaged' if self._engaged else 'disengaged' | ||
|
||
@property | ||
def live_data(self): | ||
return { | ||
'status': self.status, | ||
'data': {} | ||
} | ||
|
||
@property | ||
def port(self): | ||
return self._port | ||
|
||
@property | ||
def is_simulated(self): | ||
return isinstance(self._driver, SimulatingDriver) | ||
|
||
# Internal Methods | ||
|
||
def _connect(self): | ||
""" | ||
Connect to the serial port | ||
""" | ||
self._driver.connect(self._port) | ||
self._device_info = self._driver.get_device_info() | ||
|
||
def _disconnect(self): | ||
""" | ||
Disconnect from the serial port | ||
""" | ||
if self._driver: | ||
self._driver.disconnect() | ||
|
||
def __del__(self): | ||
self._disconnect() | ||
|
||
async def prep_for_update(self) -> str: | ||
new_port = await update.enter_bootloader(self._driver, | ||
self.device_info['model']) | ||
return new_port or self.port |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't this only return the output of modules.discover()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the code's a little annoyingly multilayered here. The module object caching code (and therefore module object creation code) wants to live in
hardware_control/__init__.py:API
, and the actual work of discovering modules lives inmodules/__init__.py
and shouldn't be used if we're simulating, so the backend really only either a) returns the preconfigured list of modules (assimulator.py
does) or just returns the output ofmodules.discover()
(as this does).