Skip to content

Commit

Permalink
Ensure that channel names don't have spaces (#593)
Browse files Browse the repository at this point in the history
Throws an exception if a channel name doesn't match
a regular expression, one which doesn't include spaces.
Fixes issue #205.

Greatly expand the class level documentation in PanMessaging.

Enforce that the value of a message is either a string
or a dict (that was already effectively required, but
now the failure message will be clear).
  • Loading branch information
jamessynge authored Sep 16, 2018
1 parent ea0c282 commit 3b3bfc6
Showing 1 changed file with 86 additions and 15 deletions.
101 changes: 86 additions & 15 deletions pocs/utils/messaging.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import re
import zmq

import yaml
Expand All @@ -14,14 +15,79 @@


class PanMessaging(object):

"""Messaging class for PANOPTES project. Creates a new ZMQ
context that can be shared across parent application.
"""Provides messaging services within a PANOPTES robotic telescope.
Supports broadcasting messages from publishers (e.g. a POCS or
ArduinoIO class instance) to subscribers (also typically class
instances). The publishers and subscribers may be in the same
process, or in separate processes. The messages all go through
a message forwarder; this is a process which listens for messages
from all publishers on one TCP port and forwards each message to
all subscribers that are connected to a second TCP port.
Do not create PanMessaging instances directly. Publishers should
call PanMessaging.create_publisher to create an instance of
PanMessaging, on which they can then call send_message.
Subscribers should call PanMessaging.create_subscriber to create
an instance of PanMessaging, on which they can then call
receive_message.
Messages are sent to channels, a name that can be used to allow
a high-level partitioning of messages. A channel name may not
include whitespace. Among the currently used channel names are:
* PANCHAT (sent from POCS.say)
* PAWS-CMD (sent from PAWS websockets.py)
* POCS (sent by class POCS)
* POCS-CMD (sent by class POCS)
* STATUS (sent by class POCS)
* weather (from peas/sensors.py)
* environment (from peas/sensors.py)
* telemetry:commands (in ArduinoIO... new)
* camera:commands (in ArduinoIO... new)
And some other channels are used in tests:
* TEST-CHANNEL (test_messaging.py)
* RUNNING (test_pocs.py)
* POCS-CMD (test_pocs.py)
The method receive_message will return messages from all channels;
the caller must check the returned channel name to determine if
the message value is of interest.
Note: PAWS doesn't use PanMessaging, which will likely result in
problems as we evolve PanMessaging and the set of channels.
TODO: Figure out how to share PanMessaging with PAWS.
Note: there is some inconsistency in the code. Senders refer to
the channel of a message, but receivers refer to messages as having
a msg_type.
TODO: Make this more consistent.
The value of a message being sent may be a string (in which case it
is wrapped in a dict(message=<value>, timestamp=<now>) or a dict,
in which case it will be "scrubbed", i.e. the dict entries will be
modified as necessary to so that the dict can be serialized using
json.dumps.
TODO Pick an encoding of strings (e.g. UTF-8) so that non-ASCII
strings may be sent and received without corruption of the data
or exceptions being thrown.
ZeroMQ is used to provide the underlying pub-sub support. ZeroMQ
supports only a very basic message format: an array of bytes.
PanMessaging converts the provided message channel and value into
a byte array of this format:
<channel-name><space><serialized-value>
"""
logger = get_root_logger()

# Channel names must consist of the characters.
name_re = re.compile('[a-zA-Z][-a-zA-Z0-9_.:]*')

def __init__(self, **kwargs):
"""Do not call this directly."""
# Create a new context
self.context = zmq.Context()
self.socket = None
Expand Down Expand Up @@ -111,20 +177,24 @@ def send_message(self, channel, message):
""" Responsible for actually sending message across a channel
Args:
channel(str): Name of channel to send on.
message(str): Message to be sent.
channel(str): Name of channel to send on. The name must
match name_re.
message: Message to be sent (a string or a dict).
"""
assert channel > '', self.logger.warning("Cannot send blank channel")
if not isinstance(channel, str):
raise ValueError('Channel name must be a string')
elif not self.name_re.fullmatch(channel):
raise ValueError('Channel name ("{}") is not valid'.format(channel))

if isinstance(message, str):
message = {
'message': message,
'timestamp': current_time().isot.replace(
'T',
' ').split('.')[0]}
else:
'timestamp': current_time(pretty=True),
}
elif isinstance(message, dict):
message = self.scrub_message(message)
else:
raise ValueError('Message value must be a string or dict')

msg_object = dumps(message, skipkeys=True)

Expand Down Expand Up @@ -171,6 +241,7 @@ def close(self):
self.context.term()

def scrub_message(self, message):
result = {}

for k, v in message.items():
if isinstance(v, dict):
Expand All @@ -188,13 +259,13 @@ def scrub_message(self, message):
if isinstance(v, Time):
v = str(v.isot).split('.')[0].replace('T', ' ')

# Hmmmm
# Hmmmm. What is going on here? We need some documentation.
if k.endswith('_time'):
v = str(v).split(' ')[-1]

if isinstance(v, float):
v = round(v, 3)

message[k] = v
result[k] = v

return message
return result

0 comments on commit 3b3bfc6

Please sign in to comment.