Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that channel names don't have spaces #593

Merged
merged 3 commits into from
Sep 16, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 86 additions & 15 deletions pocs/utils/messaging.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import re
import zmq

import yaml
Expand All @@ -14,14 +15,79 @@


class PanMessaging(object):

"""Messaging class for PANOPTES project. Creates a new ZMQ
context that can be shared across parent application.

"""Provides messaging services within a PANOPTES robotic telescope.

Supports broadcasting messages from publishers (e.g. a POCS or
ArduinoIO class instance) to subscribers (also typically class
instances). The publishers and subscribers may be in the same
process, or in separate processes. The messages all go through
a message forwarder; this is a process which listens for messages
from all publishers on one TCP port and forwards each message to
all subscribers that are connected to a second TCP port.

Do not create PanMessaging instances directly. Publishers should
call PanMessaging.create_publisher to create an instance of
PanMessaging, on which they can then call send_message.
Subscribers should call PanMessaging.create_subscriber to create
an instance of PanMessaging, on which they can then call
receive_message.

Messages are sent to channels, a name that can be used to allow
a high-level partitioning of messages. A channel name may not
include whitespace. Among the currently used channel names are:

* PANCHAT (sent from POCS.say)
jamessynge marked this conversation as resolved.
Show resolved Hide resolved
* PAWS-CMD (sent from PAWS websockets.py)
* POCS (sent by class POCS)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't recall what this is used for.

* POCS-CMD (sent by class POCS)
* STATUS (sent by class POCS)
* weather (from peas/sensors.py)
* environment (from peas/sensors.py)
* telemetry:commands (in ArduinoIO... new)
* camera:commands (in ArduinoIO... new)

And some other channels are used in tests:

* TEST-CHANNEL (test_messaging.py)
* RUNNING (test_pocs.py)
* POCS-CMD (test_pocs.py)

The method receive_message will return messages from all channels;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this is true, the intended usage is to pass the channel to the create_subcriber and then no filtering is needed by the user. This should be the recommended behaviour.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFTER this PR, we should consider using the name topic instead of channel or msg_type, which is the term used by zeromq. It'll make it easier for developers that are trying to understand this subsystem.

the caller must check the returned channel name to determine if
the message value is of interest.

Note: PAWS doesn't use PanMessaging, which will likely result in
problems as we evolve PanMessaging and the set of channels.
TODO: Figure out how to share PanMessaging with PAWS.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the answer is that we will be changing PAWS.


Note: there is some inconsistency in the code. Senders refer to
the channel of a message, but receivers refer to messages as having
a msg_type.
TODO: Make this more consistent.

The value of a message being sent may be a string (in which case it
is wrapped in a dict(message=<value>, timestamp=<now>) or a dict,
in which case it will be "scrubbed", i.e. the dict entries will be
modified as necessary to so that the dict can be serialized using
json.dumps.

TODO Pick an encoding of strings (e.g. UTF-8) so that non-ASCII
strings may be sent and received without corruption of the data
or exceptions being thrown.

ZeroMQ is used to provide the underlying pub-sub support. ZeroMQ
supports only a very basic message format: an array of bytes.
PanMessaging converts the provided message channel and value into
a byte array of this format:
<channel-name><space><serialized-value>
"""
logger = get_root_logger()

# Channel names must consist of the characters.
name_re = re.compile('[a-zA-Z][-a-zA-Z0-9_.:]*')

def __init__(self, **kwargs):
"""Do not call this directly."""
# Create a new context
self.context = zmq.Context()
self.socket = None
Expand Down Expand Up @@ -111,20 +177,24 @@ def send_message(self, channel, message):
""" Responsible for actually sending message across a channel

Args:
channel(str): Name of channel to send on.
message(str): Message to be sent.

channel(str): Name of channel to send on. The name must
match name_re.
message: Message to be sent (a string or a dict).
"""
assert channel > '', self.logger.warning("Cannot send blank channel")
if not isinstance(channel, str):
raise ValueError('Channel name must be a string')
elif not self.name_re.fullmatch(channel):
raise ValueError('Channel name ("{}") is not valid'.format(channel))

if isinstance(message, str):
message = {
'message': message,
'timestamp': current_time().isot.replace(
'T',
' ').split('.')[0]}
else:
'timestamp': current_time(pretty=True),
}
elif isinstance(message, dict):
message = self.scrub_message(message)
else:
raise ValueError('Message value must be a string or dict')

msg_object = dumps(message, skipkeys=True)

Expand Down Expand Up @@ -171,6 +241,7 @@ def close(self):
self.context.term()

def scrub_message(self, message):
result = {}

for k, v in message.items():
if isinstance(v, dict):
Expand All @@ -188,13 +259,13 @@ def scrub_message(self, message):
if isinstance(v, Time):
v = str(v.isot).split('.')[0].replace('T', ' ')

# Hmmmm
# Hmmmm. What is going on here? We need some documentation.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup. I can't remember what case this was handling. I'll try to find out.

if k.endswith('_time'):
v = str(v).split(' ')[-1]

if isinstance(v, float):
v = round(v, 3)

message[k] = v
result[k] = v

return message
return result