Skip to content

Commit

Permalink
Use topic terminology from ZeroMQ (#605)
Browse files Browse the repository at this point in the history
We were using both channel and msg_type to refer to what ZeroMQ
calls topics. No need for three terms, so this changes them
all to topic.
  • Loading branch information
jamessynge authored Sep 18, 2018
1 parent 0de5e66 commit c1535f9
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 84 deletions.
6 changes: 3 additions & 3 deletions bin/pocs_shell
Original file line number Diff line number Diff line change
Expand Up @@ -455,12 +455,12 @@ Hardware names: {} (or all for all hardware)'''.format(
if self.cmd_subscriber.socket in sockets and \
sockets[self.cmd_subscriber.socket] == zmq.POLLIN:

msg_type, msg_obj = self.cmd_subscriber.receive_message(
topic, msg_obj = self.cmd_subscriber.receive_message(
flags=zmq.NOBLOCK)
print_info("{} {}".format(msg_type, msg_obj))
print_info("{} {}".format(topic, msg_obj))

# Put the message in a queue to be processed
if msg_type == 'PAWS-CMD':
if topic == 'PAWS-CMD':
try:
print_info("Command received: {}".format(
msg_obj['message']))
Expand Down
6 changes: 3 additions & 3 deletions peas/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ def disconnect(self):
reader = reader_info['reader']
reader.stop()

def send_message(self, msg, channel='environment'):
def send_message(self, msg, topic='environment'):
if self.messaging is None:
self.messaging = PanMessaging.create_publisher(6510)

self.messaging.send_message(channel, msg)
self.messaging.send_message(topic, msg)

def capture(self, store_result=True, send_message=True):
"""
Expand Down Expand Up @@ -114,7 +114,7 @@ def capture(self, store_result=True, send_message=True):
data['date'] = time_stamp
sensor_data[sensor_name] = data
if send_message:
self.send_message({'data': data}, channel='environment')
self.send_message({'data': data}, topic='environment')
except Exception as e:
self.logger.warning('Exception while reading from sensor {}: {}', sensor_name, e)

Expand Down
6 changes: 3 additions & 3 deletions peas/weather.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,11 @@ def get_wind_speed(self, n=3):
self.wind_speed = None
return self.wind_speed

def send_message(self, msg, channel='weather'):
def send_message(self, msg, topic='weather'):
if self.messaging is None:
self.messaging = PanMessaging.create_publisher(6510)

self.messaging.send_message(channel, msg)
self.messaging.send_message(topic, msg)

def capture(self, store_result=False, send_message=False, **kwargs):
""" Query the CloudWatcher """
Expand Down Expand Up @@ -660,7 +660,7 @@ def capture(self, store_result=False, send_message=False, **kwargs):
self.calculate_and_set_PWM()

if send_message:
self.send_message({'data': data}, channel='weather')
self.send_message({'data': data}, topic='weather')

if store_result:
self.db.insert_current('weather', data)
Expand Down
20 changes: 10 additions & 10 deletions pocs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,38 +167,38 @@ def status(self):
except Exception as e: # pragma: no cover
self.logger.warning("Can't get status: {}".format(e))
else:
self.send_message(status, channel='STATUS')
self.send_message(status, topic='STATUS')

return status

def say(self, msg):
""" PANOPTES Units like to talk!
Send a message. Message sent out through zmq has unit name as channel.
Send a message.
Args:
msg(str): Message to be sent
msg(str): Message to be sent to topic PANCHAT.
"""
if self.has_messaging is False:
self.logger.info('Unit says: {}', msg)
self.send_message(msg, channel='PANCHAT')
self.send_message(msg, topic='PANCHAT')

def send_message(self, msg, channel='POCS'):
def send_message(self, msg, topic='POCS'):
""" Send a message
This will use the `self._msg_publisher` to send a message
Note:
The `channel` and `msg` params are switched for convenience
The `topic` and `msg` params are switched for convenience
Arguments:
msg {str} -- Message to be sent
Keyword Arguments:
channel {str} -- Channel to send message on (default: {'POCS'})
topic {str} -- Topic to send message on (default: {'POCS'})
"""
if self.has_messaging:
self._msg_publisher.send_message(channel, msg)
self._msg_publisher.send_message(topic, msg)

def check_messages(self):
""" Check messages for the system
Expand Down Expand Up @@ -556,10 +556,10 @@ def check_message_loop(cmd_queue):
if cmd_subscriber.socket in sockets and \
sockets[cmd_subscriber.socket] == zmq.POLLIN:

msg_type, msg_obj = cmd_subscriber.receive_message(flags=zmq.NOBLOCK)
topic, msg_obj = cmd_subscriber.receive_message(flags=zmq.NOBLOCK)

# Put the message in a queue to be processed
if msg_type == 'POCS-CMD':
if topic == 'POCS-CMD':
cmd_queue.put(msg_obj)

time.sleep(1)
Expand Down
12 changes: 6 additions & 6 deletions pocs/sensors/arduino_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def __init__(self, board, serial_data, db, pub, sub):
board:
The name of the board, used as the name of the database
table/collection to write to, and the name of the messaging
channels for readings or relay commands.
topics for readings or relay commands.
serial_data:
A SerialData instance connected to the board.
db:
Expand All @@ -141,7 +141,7 @@ def __init__(self, board, serial_data, db, pub, sub):
self._logger = get_root_logger()
self._last_reading = None
self._report_next_reading = True
self._cmd_channel = "{}:commands".format(board)
self._cmd_topic = "{}:commands".format(board)
self._keep_running = True

def run(self):
Expand All @@ -160,7 +160,7 @@ def read_and_record(self):
"""Try to get the next reading and, if successful, record it.
Write the reading to the appropriate PanDB collections and
to the appropriate message channel.
to the appropriate message topic.
If there is an interruption in success in reading from the device,
we announce (log) the start and end of that situation.
Expand Down Expand Up @@ -251,10 +251,10 @@ def handle_commands(self):
"""
timeout_obj = serialutil.Timeout(1.0)
while not timeout_obj.expired():
msg_type, msg_obj = self._sub.receive_message(blocking=False)
if msg_type is None or msg_obj is None:
topic, msg_obj = self._sub.receive_message(blocking=False)
if topic is None or msg_obj is None:
break
if msg_type.lower() == self._cmd_channel:
if topic.lower() == self._cmd_topic:
try:
self.handle_command(msg_obj)
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion pocs/state/states/default/observing.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def on_enter(event_data):
Frequently check for the exposures to complete, the observation to be
interrupted, messages to be received. Periodically post to the STATUS
channel and to the debug log.
topic and to the debug log.
"""
pocs = event_data.model
pocs.say("I'm finding exoplanets!")
Expand Down
16 changes: 8 additions & 8 deletions pocs/tests/test_arduino_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ def test_basic_arduino_io(serial_handlers, memory_db, msg_publisher, msg_subscri
assert got_reading is True

# Check that the reading was sent as a message.
msg_type, msg_obj = msg_subscriber.receive_message(blocking=False)
assert msg_type == board
topic, msg_obj = msg_subscriber.receive_message(blocking=False)
assert topic == board
assert isinstance(msg_obj, dict)
assert len(msg_obj) == 3
assert isinstance(msg_obj.get('data'), dict)
Expand All @@ -186,16 +186,16 @@ def test_basic_arduino_io(serial_handlers, memory_db, msg_publisher, msg_subscri
assert stored_reading['type'] == board

# There should be no new messages because we haven't called read_and_record again.
msg_type, msg_obj = msg_subscriber.receive_message(blocking=False)
assert msg_type is None
topic, msg_obj = msg_subscriber.receive_message(blocking=False)
assert topic is None
assert msg_obj is None

# Send a command. For now, just a string to be sent.
# TODO(jamessynge): Add named based setting of relays.
# TODO(jamessynge): Add some validation of the effect of the command.
cmd_channel = board + ':commands'
assert cmd_channel == aio._cmd_channel
cmd_publisher.send_message(cmd_channel, dict(command='write_line', line='relay=on'))
cmd_topic = board + ':commands'
assert cmd_topic == aio._cmd_topic
cmd_publisher.send_message(cmd_topic, dict(command='write_line', line='relay=on'))
aio.handle_commands()

# Confirm that it checks the name of the board. If the reading contains
Expand All @@ -208,7 +208,7 @@ def test_basic_arduino_io(serial_handlers, memory_db, msg_publisher, msg_subscri
# Ask it to stop working. Just records the request in a private variable,
# but if we'd been running it in a separate process this is how we'd get it
# to shutdown cleanly; the alternative would be to kill the process.
cmd_publisher.send_message(cmd_channel, dict(command='shutdown'))
cmd_publisher.send_message(cmd_topic, dict(command='shutdown'))
assert aio._keep_running
aio.handle_commands()
assert not aio._keep_running
14 changes: 7 additions & 7 deletions pocs/tests/test_messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,27 +61,27 @@ def pub_and_sub(forwarder):

def test_send_string(pub_and_sub):
pub, sub = pub_and_sub
pub.send_message('TEST-CHANNEL', 'Hello')
msg_type, msg_obj = sub.receive_message()
pub.send_message('Test-Topic', 'Hello')
topic, msg_obj = sub.receive_message()

assert msg_type == 'TEST-CHANNEL'
assert topic == 'Test-Topic'
assert isinstance(msg_obj, dict)
assert 'message' in msg_obj
assert msg_obj['message'] == 'Hello'


def test_send_datetime(pub_and_sub):
pub, sub = pub_and_sub
pub.send_message('TEST-CHANNEL', {'date': datetime(2017, 1, 1)})
msg_type, msg_obj = sub.receive_message()
pub.send_message('Test-Topic', {'date': datetime(2017, 1, 1)})
topic, msg_obj = sub.receive_message()
assert msg_obj['date'] == '2017-01-01T00:00:00'


def test_storage_id(pub_and_sub, config, db):
id0 = db.insert_current('config', {'foo': 'bar'}, store_permanently=False)
pub, sub = pub_and_sub
pub.send_message('TEST-CHANNEL', db.get_current('config'))
msg_type, msg_obj = sub.receive_message()
pub.send_message('Test-Topic', db.get_current('config'))
topic, msg_obj = sub.receive_message()
assert '_id' in msg_obj
assert isinstance(msg_obj['_id'], str)
assert id0 == msg_obj['_id']
12 changes: 6 additions & 6 deletions pocs/tests/test_pocs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def wait_for_running(sub, max_duration=90):
"""Given a message subscriber, wait for a RUNNING message."""
timeout = Timeout(max_duration)
while not timeout.expired():
msg_type, msg_obj = sub.receive_message()
topic, msg_obj = sub.receive_message()
if msg_obj and 'RUNNING' == msg_obj.get('message'):
return True
return False
Expand All @@ -26,8 +26,8 @@ def wait_for_state(sub, state, max_duration=90):
"""Given a message subscriber, wait for the specified state."""
timeout = Timeout(max_duration)
while not timeout.expired():
msg_type, msg_obj = sub.receive_message()
if msg_type == 'STATUS' and msg_obj and msg_obj.get('state') == state:
topic, msg_obj = sub.receive_message()
if topic == 'STATUS' and msg_obj and msg_obj.get('state') == state:
return True
return False

Expand Down Expand Up @@ -216,16 +216,16 @@ def wait_for_message(sub, type=None, attr=None, value=None):
"""Wait for a message of the specified type and contents."""
assert (attr is None) == (value is None)
while True:
msg_type, msg_obj = sub.receive_message()
topic, msg_obj = sub.receive_message()
if not msg_obj:
continue
if type and msg_type != type:
if type and topic != type:
continue
if not attr or attr not in msg_obj:
continue
if value and msg_obj[attr] != value:
continue
return msg_type, msg_obj
return topic, msg_obj


def test_run_wait_until_safe(observatory):
Expand Down
Loading

0 comments on commit c1535f9

Please sign in to comment.