Skip to content

Commit

Permalink
Adding cli option for slewing the mount to a target. (#1279)
Browse files Browse the repository at this point in the history
* Command line target slewing for mount

* Add a basic command to slew to target.
* Does some basic checking
* Will not power down or do safety check as of now.

* * Ability to specify horizon (although it doesn't change the mount setting)
* Confirm after showing target info.
* Loop while tracking and show status (needs to be improved).

* * Better shutdown timer.

* * Ability to search for comets.

* * Add a warning.

* * Give options when shutting down
  • Loading branch information
wtgee authored Oct 15, 2024
1 parent f2e52c0 commit 6047ffb
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 26 deletions.
1 change: 1 addition & 0 deletions environment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ channels:
dependencies:
- astroplan
- astropy
- astroquery
- docopt
- fastapi
- google-cloud-firestore
Expand Down
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@ install_requires =
importlib-metadata; python_version<"3.8"
astroplan
astropy
astroquery
certifi>=2023.7.22
fastapi<0.106.0
fastapi-utils
human-readable
numpy>=1.24
panoptes-utils[config]>=0.2.40
pandas
pick
Pillow>=10.0.1
pyserial
requests>=2.31.0
Expand Down
171 changes: 145 additions & 26 deletions src/panoptes/pocs/utils/cli/mount.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,22 @@

import serial
import typer
from astropy import units as u
from astropy.coordinates import AltAz, SkyCoord
from astropy.coordinates.name_resolve import NameResolveError
from astroquery.jplhorizons import Horizons
from human_readable import time_delta as friendly_time_delta
from panoptes.utils.config.client import set_config
from panoptes.utils.rs232 import SerialData
from panoptes.utils.serial.device import get_serial_port_info
from panoptes.utils.time import CountdownTimer, current_time
from pick import pick
from rich import print
from typing_extensions import Annotated

from panoptes.pocs.mount import create_mount_from_config
from panoptes.pocs.mount.ioptron import MountInfo
from panoptes.pocs.utils.location import create_location_from_config

app = typer.Typer()

Expand All @@ -31,7 +39,7 @@ def park_mount(
if not confirm:
print('[red]Cancelled.[/red]')
return typer.Abort()

mount = create_mount_from_config()
mount.initialize()
mount.unpark()
Expand All @@ -54,19 +62,19 @@ def set_park_position(
if not confirm:
print('[red]Cancelled.[/red]')
return typer.Abort()

mount = create_mount_from_config()
mount.initialize()

# Confirm that they have previously set the home position.
if not typer.confirm('Have you previously set the home position?'):
print('Please set the home position before setting the park position by running "pocs mount search-home".')
return typer.Exit()

print(f'The mount will first park at the default position and then ask you to confirm the new park position.')
mount.unpark()
mount.park()

# Check if correct side of the pier (i.e. RA axis).
if not typer.confirm('Is the mount on the correct side of the pier?'):
# Switch the RA axis.
Expand All @@ -78,7 +86,7 @@ def set_park_position(
mount.unpark()
mount.slew_to_home(blocking=True)
mount.park()

# Check to make sure cameras are facing down (i.e. Dec axis).
if not typer.confirm('Are the cameras facing down?'):
# Switch the DEC axis.
Expand All @@ -90,7 +98,7 @@ def set_park_position(
mount.unpark()
mount.slew_to_home(blocking=True)
mount.park()

# Double-check the park position.
if not typer.confirm('Is the mount parked in the correct position?'):
# Give warning and bail out.
Expand Down Expand Up @@ -119,14 +127,101 @@ def slew_to_home(
if not confirm:
print('[red]Cancelled.[/red]')
return typer.Abort()

mount = create_mount_from_config()
mount.initialize()
mount.unpark()
mount.slew_to_home(blocking=True)
mount.disconnect()


@app.command(name='slew-to-target')
def slew_to_target(
confirm: Annotated[bool, typer.Option(
..., '--confirm',
help='Confirm slew to target.'
)] = False,
target: Annotated[str, typer.Option(
..., '--target', '-t',
prompt='The name of the target to slew the mount to.',
help='The name of the target to slew the mount to.'
)] = None,
comet: Annotated[bool, typer.Option(
..., '--comet',
help='Include if you want to search for comet named `target`'
)] = False
):
"""Slews the mount target position."""
# Get the observer location
location = create_location_from_config()
observe_horizon = location.location.get('horizon', 30 * u.deg)

coords = get_target_coords(target, location.location, is_comet=comet)
if not coords:
print(f'[red]Could not find a suitable target by name or position.[/red]')
return typer.Abort()

# Check that the target is observable.
is_observable = location.observer.target_is_up(current_time(), coords, horizon=observe_horizon)
if not is_observable:
print(f'[red]Target is not observable[/red]')
return typer.Abort()

# Get AltAz for coordinates.
alt_az = coords.transform_to(AltAz(location=location.earth_location, obstime=current_time()))
print(
f'Current position: '
f'\n\tRA/Dec: {coords.ra:5.02f} {coords.dec:+5.02f}'
f'\n\t AltAz: {alt_az.alt:5.02f} {alt_az.az:5.02f}'
)

# Show target info for observatory.
target_set_time = location.observer.target_set_time(current_time(), coords, horizon=observe_horizon, which='next')
set_delta = (target_set_time - current_time()).to_datetime()
print(f'Target will be above {observe_horizon}° for {friendly_time_delta(set_delta)}')

# If not specified on the command line, ask for confirmation.
if not confirm:
print(
'[red]¡ALERT! This command does not do any safety checking for weather, etc. '
'Please use with caution.[/red]'
)
confirm = typer.confirm('Are you sure you want to slew to the target position?')

if not confirm:
print(f'[red]Dry run, will not move the mount.[/red]')
return typer.Abort()

# Initialize the mount and slew to the target.
mount = create_mount_from_config()
mount.initialize()
mount.set_target_coordinates(coords)
mount.unpark()
mount.slew_to_target(blocking=True)

print('[green]Starting to track target, press Ctrl-C to cancel[/green]')
try:
# Show the status every 5 seconds.
timer = CountdownTimer(30)
while mount.is_tracking:
if timer.expired():
print(f"Coordinates: {mount.status['current_ra']:5.02f} {mount.status['current_dec']:+5.02f}")
timer.restart()
timer.sleep(1)
except KeyboardInterrupt:
print('[red]Tracking interrupted.[/red]')
finally:
option, index = pick(['Home', 'Park', 'Nothing'], 'What would you like to do next?')
if option == 'Home':
print("[green]Moving mount to the home position (don't forget to park!)[/green]")
mount.slew_to_home(blocking=True)
elif option == 'Park':
print('[green]Moving mount to the parking position [/green]')
mount.home_and_park(blocking=True)

mount.disconnect()


@app.command(name='search-home')
def search_for_home(
confirm: Annotated[bool, typer.Option(
Expand All @@ -143,7 +238,7 @@ def search_for_home(
if not confirm:
print('[red]Cancelled.[/red]')
return typer.Abort()

mount = create_mount_from_config()
mount.initialize()
mount.search_for_home()
Expand All @@ -162,34 +257,35 @@ def setup_mount(
if not confirm:
print('[red]Cancelled.[/red]')
return typer.Abort()

# Baudrates to check.
baudrates = [9600, 115200]

# Get all the serial ports.
ports = get_serial_port_info()

print(f'Checking on {len(ports)} ports...')

# Loop through all the ports and baudrates.
for port in ports:
if 'ttyUSB' not in port.device:
if 'usb' not in port.device.lower():
continue
for baudrate in baudrates:
print(f"Trying {port.device=} at {baudrate=}...")
device = SerialData(port=port.device, baudrate=baudrate, timeout=1)

try:
device.write(':MountInfo#')
try:
response = device.read()
except serial.SerialException:
print('\tDevice potentially being accessed by another process.')
continue

if re.match(r'\d{4}', response): # iOptron specific
mount_type = MountInfo(int(response[0:4]))
print(f'Found mount at {port.device=} at {baudrate=} with {response=}.')
print(f'It looks like an iOptron {mount_type.name}.')

# Get the mainboard and handcontroller firmware version.
device.write(':FW1#')
response = device.read()
Expand All @@ -198,20 +294,20 @@ def setup_mount(
print('Firmware:')
print(f'\tMainboard: {mainboard_fw}')
print(f'\tHandcontroller: {handcontroller_fw}')

# Get the RA and DEC firmware version.
device.write(':FW2#')
response = device.read()
ra_fw = int(response[:6])
dec_fw = int(response[6:-1])
print(f'\tRA: {ra_fw}')
print(f'\tDEC: {dec_fw}')

command_set = 'v310' if ra_fw >= 210101 and dec_fw >= 210101 else 'v250'
print(f'Suggested command set: {command_set}')

write_port = port.device

if typer.confirm('Do you want to make a udev entry?'):
print('Creating udev entry for device')
# Get info for writing udev entry.
Expand All @@ -224,22 +320,22 @@ def setup_mount(
)
if port.serial_number is not None:
udev_str += f'ATTRS{{serial}}=="{port.serial_number}", '

# The name we want it known by.
udev_str += f'SYMLINK+="ioptron"'

udev_fn = Path('91-panoptes.rules')
with udev_fn.open('w') as f:
f.write(udev_str)

write_port = '/dev/ioptron'

print(f'Wrote udev entry to [green]{udev_fn}[/green].')
print('Run the following command and then reboot for changes to take effect:')
print(f'\t[green]cat {udev_fn} | sudo tee /etc/udev/rules.d/{udev_fn}[/green]')
except Exception:
pass

# Confirm the user wants to update the config.
if typer.confirm('Do you want to update the config?'):
print('Updating config.')
Expand All @@ -249,7 +345,30 @@ def setup_mount(
set_config('mount.model', mount_type.name.lower())
set_config('mount.driver', f'panoptes.pocs.mount.ioptron.{mount_type.name.lower()}')
set_config('mount.commands_file', f'ioptron/{command_set}')

return typer.Exit()
except serial.SerialTimeoutException:
pass


def get_target_coords(target: str, location: dict, is_comet: bool = False):
"""Get the coordinates of the target. """
print(f'Looking for coordinates for {target}.')
coords = None

if is_comet:
location['lat'] = location['latitude']
location['lon'] = location['longitude']
obj = Horizons(id=target, id_type='smallbody', epochs=current_time().jd1, location=location)
eph = obj.ephemerides()
coords = SkyCoord(eph[0]['RA'], eph[0]['DEC'], unit=(u.deg, u.deg))
else:
try:
coords = SkyCoord(target)
except ValueError:
try:
coords = SkyCoord.from_name(target)
except NameResolveError:
pass

return coords

0 comments on commit 6047ffb

Please sign in to comment.