diff --git a/environment.yaml b/environment.yaml index 9dd8c4225..fdacde534 100644 --- a/environment.yaml +++ b/environment.yaml @@ -3,6 +3,7 @@ channels: dependencies: - astroplan - astropy + - astroquery - docopt - fastapi - google-cloud-firestore diff --git a/setup.cfg b/setup.cfg index 6f19fc9e5..ea76acfae 100644 --- a/setup.cfg +++ b/setup.cfg @@ -36,12 +36,15 @@ install_requires = importlib-metadata; python_version<"3.8" astroplan astropy + astroquery certifi>=2023.7.22 fastapi<0.106.0 fastapi-utils + human-readable numpy>=1.24 panoptes-utils[config]>=0.2.40 pandas + pick Pillow>=10.0.1 pyserial requests>=2.31.0 diff --git a/src/panoptes/pocs/utils/cli/mount.py b/src/panoptes/pocs/utils/cli/mount.py index c9f0bf825..8e9085c2f 100644 --- a/src/panoptes/pocs/utils/cli/mount.py +++ b/src/panoptes/pocs/utils/cli/mount.py @@ -3,14 +3,22 @@ import serial import typer +from astropy import units as u +from astropy.coordinates import AltAz, SkyCoord +from astropy.coordinates.name_resolve import NameResolveError +from astroquery.jplhorizons import Horizons +from human_readable import time_delta as friendly_time_delta from panoptes.utils.config.client import set_config from panoptes.utils.rs232 import SerialData from panoptes.utils.serial.device import get_serial_port_info +from panoptes.utils.time import CountdownTimer, current_time +from pick import pick from rich import print from typing_extensions import Annotated from panoptes.pocs.mount import create_mount_from_config from panoptes.pocs.mount.ioptron import MountInfo +from panoptes.pocs.utils.location import create_location_from_config app = typer.Typer() @@ -31,7 +39,7 @@ def park_mount( if not confirm: print('[red]Cancelled.[/red]') return typer.Abort() - + mount = create_mount_from_config() mount.initialize() mount.unpark() @@ -54,19 +62,19 @@ def set_park_position( if not confirm: print('[red]Cancelled.[/red]') return typer.Abort() - + mount = create_mount_from_config() mount.initialize() - + # Confirm that they have previously set the home position. if not typer.confirm('Have you previously set the home position?'): print('Please set the home position before setting the park position by running "pocs mount search-home".') return typer.Exit() - + print(f'The mount will first park at the default position and then ask you to confirm the new park position.') mount.unpark() mount.park() - + # Check if correct side of the pier (i.e. RA axis). if not typer.confirm('Is the mount on the correct side of the pier?'): # Switch the RA axis. @@ -78,7 +86,7 @@ def set_park_position( mount.unpark() mount.slew_to_home(blocking=True) mount.park() - + # Check to make sure cameras are facing down (i.e. Dec axis). if not typer.confirm('Are the cameras facing down?'): # Switch the DEC axis. @@ -90,7 +98,7 @@ def set_park_position( mount.unpark() mount.slew_to_home(blocking=True) mount.park() - + # Double-check the park position. if not typer.confirm('Is the mount parked in the correct position?'): # Give warning and bail out. @@ -119,7 +127,7 @@ def slew_to_home( if not confirm: print('[red]Cancelled.[/red]') return typer.Abort() - + mount = create_mount_from_config() mount.initialize() mount.unpark() @@ -127,6 +135,93 @@ def slew_to_home( mount.disconnect() +@app.command(name='slew-to-target') +def slew_to_target( + confirm: Annotated[bool, typer.Option( + ..., '--confirm', + help='Confirm slew to target.' + )] = False, + target: Annotated[str, typer.Option( + ..., '--target', '-t', + prompt='The name of the target to slew the mount to.', + help='The name of the target to slew the mount to.' + )] = None, + comet: Annotated[bool, typer.Option( + ..., '--comet', + help='Include if you want to search for comet named `target`' + )] = False +): + """Slews the mount target position.""" + # Get the observer location + location = create_location_from_config() + observe_horizon = location.location.get('horizon', 30 * u.deg) + + coords = get_target_coords(target, location.location, is_comet=comet) + if not coords: + print(f'[red]Could not find a suitable target by name or position.[/red]') + return typer.Abort() + + # Check that the target is observable. + is_observable = location.observer.target_is_up(current_time(), coords, horizon=observe_horizon) + if not is_observable: + print(f'[red]Target is not observable[/red]') + return typer.Abort() + + # Get AltAz for coordinates. + alt_az = coords.transform_to(AltAz(location=location.earth_location, obstime=current_time())) + print( + f'Current position: ' + f'\n\tRA/Dec: {coords.ra:5.02f} {coords.dec:+5.02f}' + f'\n\t AltAz: {alt_az.alt:5.02f} {alt_az.az:5.02f}' + ) + + # Show target info for observatory. + target_set_time = location.observer.target_set_time(current_time(), coords, horizon=observe_horizon, which='next') + set_delta = (target_set_time - current_time()).to_datetime() + print(f'Target will be above {observe_horizon}° for {friendly_time_delta(set_delta)}') + + # If not specified on the command line, ask for confirmation. + if not confirm: + print( + '[red]¡ALERT! This command does not do any safety checking for weather, etc. ' + 'Please use with caution.[/red]' + ) + confirm = typer.confirm('Are you sure you want to slew to the target position?') + + if not confirm: + print(f'[red]Dry run, will not move the mount.[/red]') + return typer.Abort() + + # Initialize the mount and slew to the target. + mount = create_mount_from_config() + mount.initialize() + mount.set_target_coordinates(coords) + mount.unpark() + mount.slew_to_target(blocking=True) + + print('[green]Starting to track target, press Ctrl-C to cancel[/green]') + try: + # Show the status every 5 seconds. + timer = CountdownTimer(30) + while mount.is_tracking: + if timer.expired(): + print(f"Coordinates: {mount.status['current_ra']:5.02f} {mount.status['current_dec']:+5.02f}") + timer.restart() + timer.sleep(1) + except KeyboardInterrupt: + print('[red]Tracking interrupted.[/red]') + finally: + option, index = pick(['Home', 'Park', 'Nothing'], 'What would you like to do next?') + if option == 'Home': + print("[green]Moving mount to the home position (don't forget to park!)[/green]") + mount.slew_to_home(blocking=True) + elif option == 'Park': + print('[green]Moving mount to the parking position [/green]') + mount.home_and_park(blocking=True) + + mount.disconnect() + + @app.command(name='search-home') def search_for_home( confirm: Annotated[bool, typer.Option( @@ -143,7 +238,7 @@ def search_for_home( if not confirm: print('[red]Cancelled.[/red]') return typer.Abort() - + mount = create_mount_from_config() mount.initialize() mount.search_for_home() @@ -162,21 +257,22 @@ def setup_mount( if not confirm: print('[red]Cancelled.[/red]') return typer.Abort() - + # Baudrates to check. baudrates = [9600, 115200] - + # Get all the serial ports. ports = get_serial_port_info() - + print(f'Checking on {len(ports)} ports...') + # Loop through all the ports and baudrates. for port in ports: - if 'ttyUSB' not in port.device: + if 'usb' not in port.device.lower(): continue for baudrate in baudrates: print(f"Trying {port.device=} at {baudrate=}...") device = SerialData(port=port.device, baudrate=baudrate, timeout=1) - + try: device.write(':MountInfo#') try: @@ -184,12 +280,12 @@ def setup_mount( except serial.SerialException: print('\tDevice potentially being accessed by another process.') continue - + if re.match(r'\d{4}', response): # iOptron specific mount_type = MountInfo(int(response[0:4])) print(f'Found mount at {port.device=} at {baudrate=} with {response=}.') print(f'It looks like an iOptron {mount_type.name}.') - + # Get the mainboard and handcontroller firmware version. device.write(':FW1#') response = device.read() @@ -198,7 +294,7 @@ def setup_mount( print('Firmware:') print(f'\tMainboard: {mainboard_fw}') print(f'\tHandcontroller: {handcontroller_fw}') - + # Get the RA and DEC firmware version. device.write(':FW2#') response = device.read() @@ -206,12 +302,12 @@ def setup_mount( dec_fw = int(response[6:-1]) print(f'\tRA: {ra_fw}') print(f'\tDEC: {dec_fw}') - + command_set = 'v310' if ra_fw >= 210101 and dec_fw >= 210101 else 'v250' print(f'Suggested command set: {command_set}') - + write_port = port.device - + if typer.confirm('Do you want to make a udev entry?'): print('Creating udev entry for device') # Get info for writing udev entry. @@ -224,22 +320,22 @@ def setup_mount( ) if port.serial_number is not None: udev_str += f'ATTRS{{serial}}=="{port.serial_number}", ' - + # The name we want it known by. udev_str += f'SYMLINK+="ioptron"' - + udev_fn = Path('91-panoptes.rules') with udev_fn.open('w') as f: f.write(udev_str) - + write_port = '/dev/ioptron' - + print(f'Wrote udev entry to [green]{udev_fn}[/green].') print('Run the following command and then reboot for changes to take effect:') print(f'\t[green]cat {udev_fn} | sudo tee /etc/udev/rules.d/{udev_fn}[/green]') except Exception: pass - + # Confirm the user wants to update the config. if typer.confirm('Do you want to update the config?'): print('Updating config.') @@ -249,7 +345,30 @@ def setup_mount( set_config('mount.model', mount_type.name.lower()) set_config('mount.driver', f'panoptes.pocs.mount.ioptron.{mount_type.name.lower()}') set_config('mount.commands_file', f'ioptron/{command_set}') - + return typer.Exit() except serial.SerialTimeoutException: pass + + +def get_target_coords(target: str, location: dict, is_comet: bool = False): + """Get the coordinates of the target. """ + print(f'Looking for coordinates for {target}.') + coords = None + + if is_comet: + location['lat'] = location['latitude'] + location['lon'] = location['longitude'] + obj = Horizons(id=target, id_type='smallbody', epochs=current_time().jd1, location=location) + eph = obj.ephemerides() + coords = SkyCoord(eph[0]['RA'], eph[0]['DEC'], unit=(u.deg, u.deg)) + else: + try: + coords = SkyCoord(target) + except ValueError: + try: + coords = SkyCoord.from_name(target) + except NameResolveError: + pass + + return coords