From c9b5af9e4755fb1925df2fed5e0d55993bbe4f86 Mon Sep 17 00:00:00 2001 From: Michael Joyce Date: Wed, 24 Feb 2021 16:31:16 -0800 Subject: [PATCH] Issue #299 - Update server 0MQ data passing Update server handling of data passing to use 0MQ's `[send|recv]_multipart` calls instead of `[send|recv]_string`. Utilities have been added under ait.core.server.utils for encoding / decoding messages into the proper format to simplify code throughout. Various hacks for handling the string-ified data have been removed. Resolve #299 --- ait/core/api.py | 21 ++++++-- ait/core/server/client.py | 16 +++++-- ait/core/server/plugins/data_archive.py | 2 +- ait/core/server/utils.py | 64 +++++++++++++++++++++++++ doc/source/ait.core.server.rst | 1 + doc/source/ait.core.server.utils.rst | 7 +++ 6 files changed, 103 insertions(+), 8 deletions(-) create mode 100644 ait/core/server/utils.py create mode 100644 doc/source/ait.core.server.utils.rst diff --git a/ait/core/api.py b/ait/core/api.py index 25ae92a8..08c29e66 100644 --- a/ait/core/api.py +++ b/ait/core/api.py @@ -47,6 +47,7 @@ import ait import ait.core from ait.core import cmd, gds, log, pcap, tlm, util +import ait.core.server.utils as serv_utils class APIError (Exception): """All AIT API exceptions are derived from this class""" @@ -205,8 +206,17 @@ def send (self, command, *args, **kwargs): elif self._pub_socket: values = (self._pub_topic, str(cmdobj)) log.command('Sending via publisher: %s %s' % values) - self._pub_socket.send_string('{} {}'.format(self._pub_topic, encoded)) - status = True + msg = server_utils.encode_message(self._pub_topic, encoded) + + if msg is None: + log.error( + "CmdAPI unable to encode cmd message " + f"({self._pub_topic}, {encoded}) for send" + ) + status = False + else: + self._pub_socket.send_multipart(msg) + status = True ## Only add to history file is success status is True if status: @@ -572,8 +582,11 @@ def _run(self): try: while True: gevent.sleep(0) - topic, message = self._sub.recv_string().split(' ', 1) - message = pickle.loads(eval(message)) + msg = self.sub.recv_multipart() + topic, message = serv_utils.decode_message(msg) + if topic is None or message is None: + log.error(f"{self} received invalid topic or message. Skipping") + continue if not isinstance(message, tuple): log.error( diff --git a/ait/core/server/client.py b/ait/core/server/client.py index b163e788..5dda571a 100644 --- a/ait/core/server/client.py +++ b/ait/core/server/client.py @@ -8,6 +8,7 @@ import ait.core from ait.core import log +import ait.core.server.utils as utils class ZMQClient(object): @@ -35,7 +36,12 @@ def publish(self, msg): """ Publishes input message with client name as topic. """ - self.pub.send_string('{} {}'.format(self.name, msg)) + msg = utils.encode_message(self.name, msg) + if msg is None: + log.error(f"{self} unable to encode msg {msg} for send.") + return + + self.pub.send_multipart(msg) log.debug('Published message from {}'.format(self)) def process(self, input_data, topic=None): @@ -82,7 +88,12 @@ def _run(self): try: while True: gevent.sleep(0) - topic, message = self.sub.recv_string().split(' ', 1) + msg = self.sub.recv_multipart() + topic, message = utils.decode_message(msg) + if topic is None or message is None: + log.error(f"{self} received invalid topic or message. Skipping") + continue + log.debug('{} received message from {}'.format(self, topic)) self.process(message, topic=topic) @@ -114,7 +125,6 @@ def __init__(self, self.pub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) def publish(self, msg): - msg = eval(msg) self.pub.sendto(msg, ('localhost', int(self.out_port))) log.debug('Published message from {}'.format(self)) diff --git a/ait/core/server/plugins/data_archive.py b/ait/core/server/plugins/data_archive.py index 3824a8ae..00fff11c 100644 --- a/ait/core/server/plugins/data_archive.py +++ b/ait/core/server/plugins/data_archive.py @@ -75,7 +75,7 @@ def process(self, input_data, topic=None, **kwargs): **kwargs: any args required for connected to the backend """ try: - load = pickle.loads(eval(input_data)) + load = pickle.loads(input_data) uid, pkt = int(load[0]), load[1] defn = self.packet_dict[uid] decoded = tlm.Packet(defn, data=bytearray(pkt)) diff --git a/ait/core/server/utils.py b/ait/core/server/utils.py new file mode 100644 index 00000000..58296147 --- /dev/null +++ b/ait/core/server/utils.py @@ -0,0 +1,64 @@ +# Advanced Multi-Mission Operations System (AMMOS) Instrument Toolkit (AIT) +# Bespoke Link to Instruments and Small Satellites (BLISS) +# +# Copyright 2021, by the California Institute of Technology. ALL RIGHTS +# RESERVED. United States Government Sponsorship acknowledged. Any +# commercial use must be negotiated with the Office of Technology Transfer +# at the California Institute of Technology. +# +# This software may be subject to U.S. export control laws. By accepting +# this software, the user agrees to comply with all applicable U.S. export +# laws and regulations. User has the responsibility to obtain export licenses, +# or other export authority as may be required before exporting such +# information to foreign countries or providing access to foreign persons. + + +import pickle + + +def encode_message(topic, data): + """Encode a message for sending via 0MQ + + Given a string topic name and a pickle-able data object, encode and prep + the data for sending via `send_multipart` + + Returns a list of the form: + [ + Bytes object of String (UTF-8), + Pickled data object + ] + + If encoding fails None will be returned. + + """ + try: + enc = [bytes(topic, 'utf-8'), pickle.dumps(data)] + except: + enc = None + + return enc + + +def decode_message(msg): + """Decode a message received via 0MQ + + Given a message received from `recv_multipart`, decode the components. + + Returns a tuple of the form: + ( + UTF-8 string + De-pickled data object + ) + + If decoding fails a tuple of None objects will be returned. + """ + [topic, message] = msg + + try: + tpc = topic.decode('utf-8') + msg = pickle.loads(message) + except: + tpc = None + msg = None + + return (tpc, msg) diff --git a/doc/source/ait.core.server.rst b/doc/source/ait.core.server.rst index 422275b8..9af8cd46 100644 --- a/doc/source/ait.core.server.rst +++ b/doc/source/ait.core.server.rst @@ -20,6 +20,7 @@ Submodules ait.core.server.plugin ait.core.server.server ait.core.server.stream + ait.core.server.utils Module contents --------------- diff --git a/doc/source/ait.core.server.utils.rst b/doc/source/ait.core.server.utils.rst new file mode 100644 index 00000000..5b969398 --- /dev/null +++ b/doc/source/ait.core.server.utils.rst @@ -0,0 +1,7 @@ +ait.core.server.utils module +============================ + +.. automodule:: ait.core.server.utils + :members: + :undoc-members: + :show-inheritance: