From c1535f9ab4d64e752998cc7e100cfe55c8f0c8c3 Mon Sep 17 00:00:00 2001 From: James Synge Date: Tue, 18 Sep 2018 10:33:59 -0400 Subject: [PATCH] Use topic terminology from ZeroMQ (#605) We were using both channel and msg_type to refer to what ZeroMQ calls topics. No need for three terms, so this changes them all to topic. --- bin/pocs_shell | 6 +-- peas/sensors.py | 6 +-- peas/weather.py | 6 +-- pocs/core.py | 20 ++++---- pocs/sensors/arduino_io.py | 12 ++--- pocs/state/states/default/observing.py | 2 +- pocs/tests/test_arduino_io.py | 16 +++--- pocs/tests/test_messaging.py | 14 +++--- pocs/tests/test_pocs.py | 12 ++--- pocs/utils/messaging.py | 67 ++++++++++++-------------- scripts/run_social_messaging.py | 2 +- 11 files changed, 79 insertions(+), 84 deletions(-) diff --git a/bin/pocs_shell b/bin/pocs_shell index 8ca89cd2e..854dba3fb 100755 --- a/bin/pocs_shell +++ b/bin/pocs_shell @@ -455,12 +455,12 @@ Hardware names: {} (or all for all hardware)'''.format( if self.cmd_subscriber.socket in sockets and \ sockets[self.cmd_subscriber.socket] == zmq.POLLIN: - msg_type, msg_obj = self.cmd_subscriber.receive_message( + topic, msg_obj = self.cmd_subscriber.receive_message( flags=zmq.NOBLOCK) - print_info("{} {}".format(msg_type, msg_obj)) + print_info("{} {}".format(topic, msg_obj)) # Put the message in a queue to be processed - if msg_type == 'PAWS-CMD': + if topic == 'PAWS-CMD': try: print_info("Command received: {}".format( msg_obj['message'])) diff --git a/peas/sensors.py b/peas/sensors.py index 3828f0134..1effbb78b 100644 --- a/peas/sensors.py +++ b/peas/sensors.py @@ -73,11 +73,11 @@ def disconnect(self): reader = reader_info['reader'] reader.stop() - def send_message(self, msg, channel='environment'): + def send_message(self, msg, topic='environment'): if self.messaging is None: self.messaging = PanMessaging.create_publisher(6510) - self.messaging.send_message(channel, msg) + self.messaging.send_message(topic, msg) def capture(self, store_result=True, send_message=True): """ @@ -114,7 +114,7 @@ def capture(self, store_result=True, send_message=True): data['date'] = time_stamp sensor_data[sensor_name] = data if send_message: - self.send_message({'data': data}, channel='environment') + self.send_message({'data': data}, topic='environment') except Exception as e: self.logger.warning('Exception while reading from sensor {}: {}', sensor_name, e) diff --git a/peas/weather.py b/peas/weather.py index 967899d59..6044cf382 100755 --- a/peas/weather.py +++ b/peas/weather.py @@ -604,11 +604,11 @@ def get_wind_speed(self, n=3): self.wind_speed = None return self.wind_speed - def send_message(self, msg, channel='weather'): + def send_message(self, msg, topic='weather'): if self.messaging is None: self.messaging = PanMessaging.create_publisher(6510) - self.messaging.send_message(channel, msg) + self.messaging.send_message(topic, msg) def capture(self, store_result=False, send_message=False, **kwargs): """ Query the CloudWatcher """ @@ -660,7 +660,7 @@ def capture(self, store_result=False, send_message=False, **kwargs): self.calculate_and_set_PWM() if send_message: - self.send_message({'data': data}, channel='weather') + self.send_message({'data': data}, topic='weather') if store_result: self.db.insert_current('weather', data) diff --git a/pocs/core.py b/pocs/core.py index ef3d831dc..c1c6fb669 100644 --- a/pocs/core.py +++ b/pocs/core.py @@ -167,38 +167,38 @@ def status(self): except Exception as e: # pragma: no cover self.logger.warning("Can't get status: {}".format(e)) else: - self.send_message(status, channel='STATUS') + self.send_message(status, topic='STATUS') return status def say(self, msg): """ PANOPTES Units like to talk! - Send a message. Message sent out through zmq has unit name as channel. + Send a message. Args: - msg(str): Message to be sent + msg(str): Message to be sent to topic PANCHAT. """ if self.has_messaging is False: self.logger.info('Unit says: {}', msg) - self.send_message(msg, channel='PANCHAT') + self.send_message(msg, topic='PANCHAT') - def send_message(self, msg, channel='POCS'): + def send_message(self, msg, topic='POCS'): """ Send a message This will use the `self._msg_publisher` to send a message Note: - The `channel` and `msg` params are switched for convenience + The `topic` and `msg` params are switched for convenience Arguments: msg {str} -- Message to be sent Keyword Arguments: - channel {str} -- Channel to send message on (default: {'POCS'}) + topic {str} -- Topic to send message on (default: {'POCS'}) """ if self.has_messaging: - self._msg_publisher.send_message(channel, msg) + self._msg_publisher.send_message(topic, msg) def check_messages(self): """ Check messages for the system @@ -556,10 +556,10 @@ def check_message_loop(cmd_queue): if cmd_subscriber.socket in sockets and \ sockets[cmd_subscriber.socket] == zmq.POLLIN: - msg_type, msg_obj = cmd_subscriber.receive_message(flags=zmq.NOBLOCK) + topic, msg_obj = cmd_subscriber.receive_message(flags=zmq.NOBLOCK) # Put the message in a queue to be processed - if msg_type == 'POCS-CMD': + if topic == 'POCS-CMD': cmd_queue.put(msg_obj) time.sleep(1) diff --git a/pocs/sensors/arduino_io.py b/pocs/sensors/arduino_io.py index e67f392fc..2ff5124d7 100644 --- a/pocs/sensors/arduino_io.py +++ b/pocs/sensors/arduino_io.py @@ -121,7 +121,7 @@ def __init__(self, board, serial_data, db, pub, sub): board: The name of the board, used as the name of the database table/collection to write to, and the name of the messaging - channels for readings or relay commands. + topics for readings or relay commands. serial_data: A SerialData instance connected to the board. db: @@ -141,7 +141,7 @@ def __init__(self, board, serial_data, db, pub, sub): self._logger = get_root_logger() self._last_reading = None self._report_next_reading = True - self._cmd_channel = "{}:commands".format(board) + self._cmd_topic = "{}:commands".format(board) self._keep_running = True def run(self): @@ -160,7 +160,7 @@ def read_and_record(self): """Try to get the next reading and, if successful, record it. Write the reading to the appropriate PanDB collections and - to the appropriate message channel. + to the appropriate message topic. If there is an interruption in success in reading from the device, we announce (log) the start and end of that situation. @@ -251,10 +251,10 @@ def handle_commands(self): """ timeout_obj = serialutil.Timeout(1.0) while not timeout_obj.expired(): - msg_type, msg_obj = self._sub.receive_message(blocking=False) - if msg_type is None or msg_obj is None: + topic, msg_obj = self._sub.receive_message(blocking=False) + if topic is None or msg_obj is None: break - if msg_type.lower() == self._cmd_channel: + if topic.lower() == self._cmd_topic: try: self.handle_command(msg_obj) except Exception as e: diff --git a/pocs/state/states/default/observing.py b/pocs/state/states/default/observing.py index 159f0c389..c33187bef 100644 --- a/pocs/state/states/default/observing.py +++ b/pocs/state/states/default/observing.py @@ -14,7 +14,7 @@ def on_enter(event_data): Frequently check for the exposures to complete, the observation to be interrupted, messages to be received. Periodically post to the STATUS - channel and to the debug log. + topic and to the debug log. """ pocs = event_data.model pocs.say("I'm finding exoplanets!") diff --git a/pocs/tests/test_arduino_io.py b/pocs/tests/test_arduino_io.py index 3cb3ae652..c56a14615 100644 --- a/pocs/tests/test_arduino_io.py +++ b/pocs/tests/test_arduino_io.py @@ -168,8 +168,8 @@ def test_basic_arduino_io(serial_handlers, memory_db, msg_publisher, msg_subscri assert got_reading is True # Check that the reading was sent as a message. - msg_type, msg_obj = msg_subscriber.receive_message(blocking=False) - assert msg_type == board + topic, msg_obj = msg_subscriber.receive_message(blocking=False) + assert topic == board assert isinstance(msg_obj, dict) assert len(msg_obj) == 3 assert isinstance(msg_obj.get('data'), dict) @@ -186,16 +186,16 @@ def test_basic_arduino_io(serial_handlers, memory_db, msg_publisher, msg_subscri assert stored_reading['type'] == board # There should be no new messages because we haven't called read_and_record again. - msg_type, msg_obj = msg_subscriber.receive_message(blocking=False) - assert msg_type is None + topic, msg_obj = msg_subscriber.receive_message(blocking=False) + assert topic is None assert msg_obj is None # Send a command. For now, just a string to be sent. # TODO(jamessynge): Add named based setting of relays. # TODO(jamessynge): Add some validation of the effect of the command. - cmd_channel = board + ':commands' - assert cmd_channel == aio._cmd_channel - cmd_publisher.send_message(cmd_channel, dict(command='write_line', line='relay=on')) + cmd_topic = board + ':commands' + assert cmd_topic == aio._cmd_topic + cmd_publisher.send_message(cmd_topic, dict(command='write_line', line='relay=on')) aio.handle_commands() # Confirm that it checks the name of the board. If the reading contains @@ -208,7 +208,7 @@ def test_basic_arduino_io(serial_handlers, memory_db, msg_publisher, msg_subscri # Ask it to stop working. Just records the request in a private variable, # but if we'd been running it in a separate process this is how we'd get it # to shutdown cleanly; the alternative would be to kill the process. - cmd_publisher.send_message(cmd_channel, dict(command='shutdown')) + cmd_publisher.send_message(cmd_topic, dict(command='shutdown')) assert aio._keep_running aio.handle_commands() assert not aio._keep_running diff --git a/pocs/tests/test_messaging.py b/pocs/tests/test_messaging.py index a82e542af..d2bbdc673 100644 --- a/pocs/tests/test_messaging.py +++ b/pocs/tests/test_messaging.py @@ -61,10 +61,10 @@ def pub_and_sub(forwarder): def test_send_string(pub_and_sub): pub, sub = pub_and_sub - pub.send_message('TEST-CHANNEL', 'Hello') - msg_type, msg_obj = sub.receive_message() + pub.send_message('Test-Topic', 'Hello') + topic, msg_obj = sub.receive_message() - assert msg_type == 'TEST-CHANNEL' + assert topic == 'Test-Topic' assert isinstance(msg_obj, dict) assert 'message' in msg_obj assert msg_obj['message'] == 'Hello' @@ -72,16 +72,16 @@ def test_send_string(pub_and_sub): def test_send_datetime(pub_and_sub): pub, sub = pub_and_sub - pub.send_message('TEST-CHANNEL', {'date': datetime(2017, 1, 1)}) - msg_type, msg_obj = sub.receive_message() + pub.send_message('Test-Topic', {'date': datetime(2017, 1, 1)}) + topic, msg_obj = sub.receive_message() assert msg_obj['date'] == '2017-01-01T00:00:00' def test_storage_id(pub_and_sub, config, db): id0 = db.insert_current('config', {'foo': 'bar'}, store_permanently=False) pub, sub = pub_and_sub - pub.send_message('TEST-CHANNEL', db.get_current('config')) - msg_type, msg_obj = sub.receive_message() + pub.send_message('Test-Topic', db.get_current('config')) + topic, msg_obj = sub.receive_message() assert '_id' in msg_obj assert isinstance(msg_obj['_id'], str) assert id0 == msg_obj['_id'] diff --git a/pocs/tests/test_pocs.py b/pocs/tests/test_pocs.py index a958a1b98..99f68213a 100644 --- a/pocs/tests/test_pocs.py +++ b/pocs/tests/test_pocs.py @@ -16,7 +16,7 @@ def wait_for_running(sub, max_duration=90): """Given a message subscriber, wait for a RUNNING message.""" timeout = Timeout(max_duration) while not timeout.expired(): - msg_type, msg_obj = sub.receive_message() + topic, msg_obj = sub.receive_message() if msg_obj and 'RUNNING' == msg_obj.get('message'): return True return False @@ -26,8 +26,8 @@ def wait_for_state(sub, state, max_duration=90): """Given a message subscriber, wait for the specified state.""" timeout = Timeout(max_duration) while not timeout.expired(): - msg_type, msg_obj = sub.receive_message() - if msg_type == 'STATUS' and msg_obj and msg_obj.get('state') == state: + topic, msg_obj = sub.receive_message() + if topic == 'STATUS' and msg_obj and msg_obj.get('state') == state: return True return False @@ -216,16 +216,16 @@ def wait_for_message(sub, type=None, attr=None, value=None): """Wait for a message of the specified type and contents.""" assert (attr is None) == (value is None) while True: - msg_type, msg_obj = sub.receive_message() + topic, msg_obj = sub.receive_message() if not msg_obj: continue - if type and msg_type != type: + if type and topic != type: continue if not attr or attr not in msg_obj: continue if value and msg_obj[attr] != value: continue - return msg_type, msg_obj + return topic, msg_obj def test_run_wait_until_safe(observatory): diff --git a/pocs/utils/messaging.py b/pocs/utils/messaging.py index 5efd3b33e..edda1ed65 100644 --- a/pocs/utils/messaging.py +++ b/pocs/utils/messaging.py @@ -32,9 +32,9 @@ class PanMessaging(object): an instance of PanMessaging, on which they can then call receive_message. - Messages are sent to channels, a name that can be used to allow - a high-level partitioning of messages. A channel name may not - include whitespace. Among the currently used channel names are: + Messages are sent to topics, a name that can be used to allow + a high-level partitioning of messages. A topic name may not + include whitespace. Among the currently used topic names are: * PANCHAT (sent from POCS.say) * PAWS-CMD (sent from PAWS websockets.py) @@ -46,25 +46,20 @@ class PanMessaging(object): * telemetry:commands (in ArduinoIO... new) * camera:commands (in ArduinoIO... new) - And some other channels are used in tests: + And some other topics are used in tests: - * TEST-CHANNEL (test_messaging.py) + * Test-Topic (test_messaging.py) * RUNNING (test_pocs.py) * POCS-CMD (test_pocs.py) - The method receive_message will return messages from all channels; - the caller must check the returned channel name to determine if + The method receive_message will return messages from all topics; + the caller must check the returned topic name to determine if the message value is of interest. Note: PAWS doesn't use PanMessaging, which will likely result in - problems as we evolve PanMessaging and the set of channels. + problems as we evolve PanMessaging and the set of topics. TODO: Figure out how to share PanMessaging with PAWS. - Note: there is some inconsistency in the code. Senders refer to - the channel of a message, but receivers refer to messages as having - a msg_type. - TODO: Make this more consistent. - The value of a message being sent may be a string (in which case it is wrapped in a dict(message=, timestamp=) or a dict, in which case it will be "scrubbed", i.e. the dict entries will be @@ -77,14 +72,14 @@ class PanMessaging(object): ZeroMQ is used to provide the underlying pub-sub support. ZeroMQ supports only a very basic message format: an array of bytes. - PanMessaging converts the provided message channel and value into + PanMessaging converts the provided message topic and value into a byte array of this format: - + """ logger = get_root_logger() - # Channel names must consist of the characters. - name_re = re.compile('[a-zA-Z][-a-zA-Z0-9_.:]*') + # Topic names must consist of the characters. + topic_name_re = re.compile('[a-zA-Z][-a-zA-Z0-9_.:]*') def __init__(self, **kwargs): """Do not call this directly.""" @@ -146,16 +141,16 @@ def create_publisher(cls, port, bind=False, connect=True): return obj @classmethod - def create_subscriber(cls, port, channel='', bind=False, connect=True): + def create_subscriber(cls, port, topic='', bind=False, connect=True): """ Create a listener Args: port (int): The port (on localhost) to bind to. - channel (str): Which topic channel to subscribe to. + topic (str): Which topic or topic prefix to subscribe to. """ obj = cls() - obj.logger.debug("Creating subscriber. Port: {} \tChannel: {}".format(port, channel)) + obj.logger.debug("Creating subscriber. Port: {} \tTopic: {}".format(port, topic)) socket = obj.context.socket(zmq.SUB) @@ -167,24 +162,24 @@ def create_subscriber(cls, port, channel='', bind=False, connect=True): elif connect: socket.connect('tcp://localhost:{}'.format(port)) - socket.setsockopt_string(zmq.SUBSCRIBE, channel) + socket.setsockopt_string(zmq.SUBSCRIBE, topic) obj.socket = socket return obj - def send_message(self, channel, message): - """ Responsible for actually sending message across a channel + def send_message(self, topic, message): + """ Responsible for actually sending message across a topic Args: - channel(str): Name of channel to send on. The name must - match name_re. + topic(str): Name of topic to send on. The name must + match topic_name_re. message: Message to be sent (a string or a dict). """ - if not isinstance(channel, str): - raise ValueError('Channel name must be a string') - elif not self.name_re.fullmatch(channel): - raise ValueError('Channel name ("{}") is not valid'.format(channel)) + if not isinstance(topic, str): + raise ValueError('Topic name must be a string') + elif not self.topic_name_re.fullmatch(topic): + raise ValueError('Topic name ("{}") is not valid'.format(topic)) if isinstance(message, str): message = { @@ -198,10 +193,10 @@ def send_message(self, channel, message): msg_object = dumps(message, skipkeys=True) - full_message = '{} {}'.format(channel, msg_object) + full_message = '{} {}'.format(topic, msg_object) - if channel == 'PANCHAT': - self.logger.info("{} {}".format(channel, message['message'])) + if topic == 'PANCHAT': + self.logger.info("{} {}".format(topic, message['message'])) # Send the message self.socket.send_string(full_message, flags=zmq.NOBLOCK) @@ -216,9 +211,9 @@ def receive_message(self, blocking=True, flags=0): flag (int, optional): Any valid recv flag, e.g. zmq.NOBLOCK Returns: - tuple(str, dict): Tuple containing the channel and a dict + tuple(str, dict): Tuple containing the topic and a dict """ - msg_type = None + topic = None msg_obj = None if not blocking: flags = flags | zmq.NOBLOCK @@ -227,13 +222,13 @@ def receive_message(self, blocking=True, flags=0): except Exception: pass else: - msg_type, msg = message.split(' ', maxsplit=1) + topic, msg = message.split(' ', maxsplit=1) try: msg_obj = loads(msg) except Exception: msg_obj = yaml.load(msg) - return msg_type, msg_obj + return topic, msg_obj def close(self): """Close the socket """ diff --git a/scripts/run_social_messaging.py b/scripts/run_social_messaging.py index 6e94eb8c7..dfb006564 100755 --- a/scripts/run_social_messaging.py +++ b/scripts/run_social_messaging.py @@ -42,7 +42,7 @@ def check_social_messages_loop(msg_port, social_twitter, social_slack): if cmd_social_subscriber.socket in sockets and \ sockets[cmd_social_subscriber.socket] == zmq.POLLIN: - msg_type, msg_obj = cmd_social_subscriber.receive_message(flags=zmq.NOBLOCK) + topic, msg_obj = cmd_social_subscriber.receive_message(flags=zmq.NOBLOCK) # Check the various social sinks if social_twitter is not None: