Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use threads instead of processes in test_pocs.py #468

Merged
merged 2 commits into from
Feb 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 50 additions & 33 deletions pocs/tests/test_pocs.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,37 @@
import os
import pytest
import time

from multiprocessing import Process
import threading

from astropy import units as u

from pocs import hardware
from pocs.core import POCS
from pocs.observatory import Observatory
from pocs.utils import Timeout
from pocs.utils.messaging import PanMessaging


def wait_for_running(sub, max_duration=90):
"""Given a message subscriber, wait for a RUNNING message."""
timeout = Timeout(max_duration)
while not timeout.expired():
msg_type, msg_obj = sub.receive_message()
if msg_obj and 'RUNNING' == msg_obj.get('message'):
return True
return False


def wait_for_state(sub, state, max_duration=90):
"""Given a message subscriber, wait for the specified state."""
timeout = Timeout(max_duration)
while not timeout.expired():
msg_type, msg_obj = sub.receive_message()
if msg_type == 'STATUS' and msg_obj and msg_obj.get('state') == state:
return True
return False


@pytest.fixture(scope='function')
def observatory(config, db_type):
observatory = Observatory(
Expand Down Expand Up @@ -199,7 +219,9 @@ def test_run_wait_until_safe(observatory):
observatory.db.clear_current('weather')

def start_pocs():
observatory.config['simulator'] = ['camera', 'mount', 'night']
observatory.logger.info('start_pocs ENTER')
# Remove weather simulator, else it would always be safe.
observatory.config['simulator'] = hardware.get_all_names(without=['weather'])

pocs = POCS(observatory,
messaging=True, safe_delay=5)
Expand All @@ -220,32 +242,28 @@ def start_pocs():
pocs.run(run_once=True, exit_when_done=True)
assert pocs.is_weather_safe() is True
pocs.power_down()
observatory.logger.info('start_pocs EXIT')

pub = PanMessaging.create_publisher(6500)
sub = PanMessaging.create_subscriber(6511)

pocs_process = Process(target=start_pocs)
pocs_process.start()
pocs_thread = threading.Thread(target=start_pocs)
pocs_thread.start()

# Wait for the running message
while True:
msg_type, msg_obj = sub.receive_message()
if msg_obj is None:
continue
try:
# Wait for the RUNNING message,
assert wait_for_running(sub)

if msg_obj.get('message', '') == 'RUNNING':
time.sleep(2)
# Insert a dummy weather record to break wait
observatory.db.insert_current('weather', {'safe': True})
time.sleep(2)
# Insert a dummy weather record to break wait
observatory.db.insert_current('weather', {'safe': True})

if msg_type == 'STATUS':
current_state = msg_obj.get('state', {})
if current_state == 'pointing':
pub.send_message('POCS-CMD', 'shutdown')
break
assert wait_for_state(sub, 'scheduling')
finally:
pub.send_message('POCS-CMD', 'shutdown')
pocs_thread.join(timeout=30)

pocs_process.join()
assert pocs_process.is_alive() is False
assert pocs_thread.is_alive() is False


def test_unsafe_park(pocs):
Expand Down Expand Up @@ -336,6 +354,7 @@ def test_run_complete(pocs):

def test_run_power_down_interrupt(observatory):
def start_pocs():
observatory.logger.info('start_pocs ENTER')
pocs = POCS(observatory, messaging=True)
pocs.initialize()
pocs.observatory.scheduler.clear_available_observations()
Expand All @@ -349,23 +368,21 @@ def start_pocs():
pocs.logger.info('Starting observatory run')
pocs.run()
pocs.power_down()
observatory.logger.info('start_pocs EXIT')

pocs_process = Process(target=start_pocs)
pocs_process.start()
pocs_thread = threading.Thread(target=start_pocs)
pocs_thread.start()

pub = PanMessaging.create_publisher(6500)
sub = PanMessaging.create_subscriber(6511)

while True:
msg_type, msg_obj = sub.receive_message()
if msg_type == 'STATUS':
current_state = msg_obj.get('state', {})
if current_state == 'pointing':
pub.send_message('POCS-CMD', 'shutdown')
break

pocs_process.join()
assert pocs_process.is_alive() is False
try:
assert wait_for_state(sub, 'scheduling')
finally:
pub.send_message('POCS-CMD', 'shutdown')
pocs_thread.join(timeout=30)

assert pocs_thread.is_alive() is False


def test_pocs_park_to_ready_with_observations(pocs):
Expand Down
38 changes: 37 additions & 1 deletion pocs/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import re
import shutil
import subprocess

import time

from astropy import units as u
from astropy.coordinates import AltAz
Expand Down Expand Up @@ -48,6 +48,42 @@ def flatten_time(t):
return t.isot.replace('-', '').replace(':', '').split('.')[0]


# This is a streamlined variant of PySerial's serialutil.Timeout.
class Timeout(object):
"""Simple timer object for tracking whether a time duration has elapsed.

Attribute `is_non_blocking` is true IFF the duration is zero.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand where non-blocking case. Where would you be using a timeout=0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, timeout==0 means that it will always be considered to have expired.

"""

def __init__(self, duration):
"""Initialize a timeout with given duration (seconds)."""
assert duration >= 0
self.is_non_blocking = (duration == 0)
self.duration = duration
self.restart()

def expired(self):
"""Return a boolean, telling if the timeout has expired."""
return self.time_left() <= 0

def time_left(self):
"""Return how many seconds are left until the timeout expires."""
if self.is_non_blocking:
return 0
else:
delta = self.target_time - time.monotonic()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I missing something about where these come from? Should this class inherit from serialutil.Timeout?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to just copy their code and eliminate things we don't need. time.monotonic() is like time.time(), but guarantees that it won't go backwards.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I didn't realize target_time first gets set in the restart method so didn't see where it was coming from. Makes sense now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although maybe just start is a better name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restart because init calls it, so that it is started at creation. If too confusing, I could remove restart, so you have to create a new instance each time, which is definitely the common way to use it.

if delta > self.duration:
# clock jumped, recalculate
self.restart()
return self.duration
else:
return max(0, delta)

def restart(self):
"""Restart the timed duration."""
self.target_time = time.monotonic() + self.duration


def listify(obj):
""" Given an object, return a list

Expand Down