-
Notifications
You must be signed in to change notification settings - Fork 49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove camera creation from Observatory #612
Changes from 18 commits
85ffef8
bdd105e
7c07fc2
b9a2e1e
0088074
ea99f15
19e81df
8d5a9ac
ed5e115
e56465a
0d5efcb
46d36be
df3d577
81cc3b3
4cf2034
2e8780e
da3f59c
56a1797
d47bcfa
bc8fca3
867cb08
831c04b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,187 @@ | ||
from collections import OrderedDict | ||
import re | ||
import shutil | ||
import subprocess | ||
|
||
from pocs.utils import error | ||
from pocs.utils import load_module | ||
from pocs.utils.config import load_config | ||
|
||
from pocs.camera.camera import AbstractCamera # pragma: no flakes | ||
from pocs.camera.camera import AbstractGPhotoCamera # pragma: no flakes | ||
|
||
from pocs.utils import logger as logger_module | ||
|
||
|
||
def list_connected_cameras(): | ||
"""Detect connected cameras. | ||
|
||
Uses gphoto2 to try and detect which cameras are connected. Cameras should | ||
be known and placed in config but this is a useful utility. | ||
|
||
Returns: | ||
list: A list of the ports with detected cameras. | ||
""" | ||
|
||
gphoto2 = shutil.which('gphoto2') | ||
command = [gphoto2, '--auto-detect'] | ||
result = subprocess.check_output(command) | ||
lines = result.decode('utf-8').split('\n') | ||
|
||
ports = [] | ||
|
||
for line in lines: | ||
camera_match = re.match(r'([\w\d\s_\.]{30})\s(usb:\d{3},\d{3})', line) | ||
if camera_match: | ||
# camera_name = camera_match.group(1).strip() | ||
port = camera_match.group(2).strip() | ||
ports.append(port) | ||
|
||
return ports | ||
|
||
|
||
def create_cameras_from_config(config=None, logger=None, **kwargs): | ||
"""Create camera object(s) based on the config. | ||
|
||
Creates a camera for each camera item listed in the config. Ensures the | ||
appropriate camera module is loaded. | ||
|
||
Note: | ||
This does not actually make a connection to the camera. To do so, | ||
call 'camera.connect()' explicitly on each camera. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this is true, yet. All the existing camera classes (and associated focuser classes) call their own On a related note, those calls to |
||
|
||
Args: | ||
**kwargs (dict): Can pass a `cameras` object that overrides the info in | ||
the configuration file. Can also pass `auto_detect`(bool) to try and | ||
automatically discover the ports. | ||
|
||
Returns: | ||
OrderedDict: An ordered dictionary of created camera objects, with the | ||
camera name as key and camera instance as value. Returns an empty | ||
OrderedDict if there is no camera configuration items. | ||
AnthonyHorton marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
Raises: | ||
error.CameraNotFound: Raised if camera cannot be found at specified port or if | ||
auto_detect=True and no cameras are found. | ||
error.PanError: Description | ||
""" | ||
if not logger: | ||
logger = logger_module.get_root_logger() | ||
|
||
if not config: | ||
config = load_config(**kwargs) | ||
|
||
# Helper method to first check kwargs then config | ||
def kwargs_or_config(item, default=None): | ||
return kwargs.get(item, config.get(item, default)) | ||
|
||
cameras = OrderedDict() | ||
camera_info = kwargs_or_config('cameras') | ||
if not camera_info: | ||
logger.info('No camera information in config.') | ||
return cameras | ||
|
||
logger.debug("Camera config: {}".format(camera_info)) | ||
|
||
a_simulator = 'camera' in kwargs_or_config('simulator', default=list()) | ||
auto_detect = kwargs_or_config('auto_detect', default=False) | ||
|
||
ports = list() | ||
|
||
# Lookup the connected ports if not using a simulator | ||
if not a_simulator and auto_detect: | ||
logger.debug("Auto-detecting ports for cameras") | ||
try: | ||
ports = list_connected_cameras() | ||
except Exception as e: | ||
logger.warning(e) | ||
|
||
if len(ports) == 0: | ||
raise error.PanError( | ||
msg="No cameras detected. Use --simulator=camera for simulator.") | ||
else: | ||
logger.debug("Detected Ports: {}".format(ports)) | ||
|
||
primary_camera = None | ||
|
||
device_info = camera_info['devices'] | ||
for cam_num, device_config in enumerate(device_info): | ||
cam_name = 'Cam{:02d}'.format(cam_num) | ||
|
||
if not a_simulator: | ||
camera_model = device_config.get('model') | ||
|
||
# Assign an auto-detected port. If none are left, skip | ||
if auto_detect: | ||
try: | ||
camera_port = ports.pop() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm perplexed by this. If the config file lists two canon cameras with the prefixes of their serial numbers, how do we know the order in which to pop the ports? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is only if auto_detect is true, which means that it is ignoring the config. Serial number is set when camera object is initialized by lookup (via gphoto2) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah. Then I may have misunderstood how to auto_detect and primary work. I included the serial number prefixes and primary:True in pocs_local.yaml on PAN006. My goal is to make sure that a specific camera is treated as primary every time. And ideally we'd get a warning if a specified camera is not found, or if another camera is found. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think #95 is still true unfortunately. This function could certainly be cleaned up some. Happy to do it here if we want just to make sure it gets done soon but I imagine it will all need to be revisited anyway if we set up separate process controls for the camera. Which hopefully happens soon. :) |
||
except IndexError: | ||
logger.warning( | ||
"No ports left for {}, skipping.".format(cam_name)) | ||
continue | ||
else: | ||
try: | ||
camera_port = device_config['port'] | ||
except KeyError: | ||
raise error.CameraNotFound( | ||
msg="No port specified and auto_detect=False") | ||
|
||
camera_focuser = device_config.get('focuser', None) | ||
camera_readout = device_config.get('readout_time', 6.0) | ||
|
||
else: | ||
logger.debug('Using camera simulator.') | ||
# Set up a simulated camera with fully configured simulated | ||
# focuser | ||
camera_model = 'simulator' | ||
camera_port = '/dev/camera/simulator' | ||
camera_focuser = {'model': 'simulator', | ||
'focus_port': '/dev/ttyFAKE', | ||
'initial_position': 20000, | ||
'autofocus_range': (40, 80), | ||
'autofocus_step': (10, 20), | ||
'autofocus_seconds': 0.1, | ||
'autofocus_size': 500} | ||
camera_readout = 0.5 | ||
|
||
camera_set_point = device_config.get('set_point', None) | ||
camera_filter = device_config.get('filter_type', None) | ||
|
||
logger.debug('Creating camera: {}'.format(camera_model)) | ||
|
||
try: | ||
module = load_module('pocs.camera.{}'.format(camera_model)) | ||
logger.debug('Camera module: {}'.format(module)) | ||
except ImportError: | ||
raise error.CameraNotFound(msg=camera_model) | ||
else: | ||
# Create the camera object | ||
cam = module.Camera(name=cam_name, | ||
model=camera_model, | ||
port=camera_port, | ||
set_point=camera_set_point, | ||
filter_type=camera_filter, | ||
focuser=camera_focuser, | ||
readout_time=camera_readout) | ||
|
||
is_primary = '' | ||
if camera_info.get('primary', '') == cam.uid: | ||
primary_camera = cam | ||
is_primary = ' [Primary]' | ||
|
||
logger.debug("Camera created: {} {} {}".format( | ||
cam.name, cam.uid, is_primary)) | ||
|
||
cameras[cam_name] = cam | ||
|
||
if len(cameras) == 0: | ||
raise error.CameraNotFound( | ||
msg="No cameras available. Exiting.", exit=True) | ||
|
||
# If no camera was specified as primary use the first | ||
if primary_camera is None: | ||
primary_camera = cameras['Cam00'] | ||
|
||
logger.debug("Finished creating cameras from config") | ||
|
||
return cameras |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW, while thinking about adding a supervisor and better messaging, and also while reviewing Anthony's distributed cameras PR (which includes a nameserver), I wondered about introducing some kind of device registry. Something we can discuss in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think we will definitely want to change how the Observatory interacts with and discovers devices. Hopefully this gets us closer to doing that.