Skip to content

Commit

Permalink
Merge pull request #322 from NASA-AMMOS/issue-299
Browse files Browse the repository at this point in the history
Issue #299 - Update server 0MQ data passing
  • Loading branch information
MJJoyce authored Mar 10, 2021
2 parents e0d4bee + c9b5af9 commit 57d26d1
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 8 deletions.
21 changes: 17 additions & 4 deletions ait/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import ait
import ait.core
from ait.core import cmd, gds, log, pcap, tlm, util
import ait.core.server.utils as serv_utils

class APIError (Exception):
"""All AIT API exceptions are derived from this class"""
Expand Down Expand Up @@ -205,8 +206,17 @@ def send (self, command, *args, **kwargs):
elif self._pub_socket:
values = (self._pub_topic, str(cmdobj))
log.command('Sending via publisher: %s %s' % values)
self._pub_socket.send_string('{} {}'.format(self._pub_topic, encoded))
status = True
msg = server_utils.encode_message(self._pub_topic, encoded)

if msg is None:
log.error(
"CmdAPI unable to encode cmd message "
f"({self._pub_topic}, {encoded}) for send"
)
status = False
else:
self._pub_socket.send_multipart(msg)
status = True

## Only add to history file is success status is True
if status:
Expand Down Expand Up @@ -572,8 +582,11 @@ def _run(self):
try:
while True:
gevent.sleep(0)
topic, message = self._sub.recv_string().split(' ', 1)
message = pickle.loads(eval(message))
msg = self.sub.recv_multipart()
topic, message = serv_utils.decode_message(msg)
if topic is None or message is None:
log.error(f"{self} received invalid topic or message. Skipping")
continue

if not isinstance(message, tuple):
log.error(
Expand Down
16 changes: 13 additions & 3 deletions ait/core/server/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import ait.core
from ait.core import log
import ait.core.server.utils as utils


class ZMQClient(object):
Expand Down Expand Up @@ -35,7 +36,12 @@ def publish(self, msg):
"""
Publishes input message with client name as topic.
"""
self.pub.send_string('{} {}'.format(self.name, msg))
msg = utils.encode_message(self.name, msg)
if msg is None:
log.error(f"{self} unable to encode msg {msg} for send.")
return

self.pub.send_multipart(msg)
log.debug('Published message from {}'.format(self))

def process(self, input_data, topic=None):
Expand Down Expand Up @@ -82,7 +88,12 @@ def _run(self):
try:
while True:
gevent.sleep(0)
topic, message = self.sub.recv_string().split(' ', 1)
msg = self.sub.recv_multipart()
topic, message = utils.decode_message(msg)
if topic is None or message is None:
log.error(f"{self} received invalid topic or message. Skipping")
continue

log.debug('{} received message from {}'.format(self, topic))
self.process(message, topic=topic)

Expand Down Expand Up @@ -114,7 +125,6 @@ def __init__(self,
self.pub = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

def publish(self, msg):
msg = eval(msg)
self.pub.sendto(msg, ('localhost', int(self.out_port)))
log.debug('Published message from {}'.format(self))

Expand Down
2 changes: 1 addition & 1 deletion ait/core/server/plugins/data_archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def process(self, input_data, topic=None, **kwargs):
**kwargs: any args required for connected to the backend
"""
try:
load = pickle.loads(eval(input_data))
load = pickle.loads(input_data)
uid, pkt = int(load[0]), load[1]
defn = self.packet_dict[uid]
decoded = tlm.Packet(defn, data=bytearray(pkt))
Expand Down
64 changes: 64 additions & 0 deletions ait/core/server/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Advanced Multi-Mission Operations System (AMMOS) Instrument Toolkit (AIT)
# Bespoke Link to Instruments and Small Satellites (BLISS)
#
# Copyright 2021, by the California Institute of Technology. ALL RIGHTS
# RESERVED. United States Government Sponsorship acknowledged. Any
# commercial use must be negotiated with the Office of Technology Transfer
# at the California Institute of Technology.
#
# This software may be subject to U.S. export control laws. By accepting
# this software, the user agrees to comply with all applicable U.S. export
# laws and regulations. User has the responsibility to obtain export licenses,
# or other export authority as may be required before exporting such
# information to foreign countries or providing access to foreign persons.


import pickle


def encode_message(topic, data):
"""Encode a message for sending via 0MQ
Given a string topic name and a pickle-able data object, encode and prep
the data for sending via `send_multipart`
Returns a list of the form:
[
Bytes object of String (UTF-8),
Pickled data object
]
If encoding fails None will be returned.
"""
try:
enc = [bytes(topic, 'utf-8'), pickle.dumps(data)]
except:
enc = None

return enc


def decode_message(msg):
"""Decode a message received via 0MQ
Given a message received from `recv_multipart`, decode the components.
Returns a tuple of the form:
(
UTF-8 string
De-pickled data object
)
If decoding fails a tuple of None objects will be returned.
"""
[topic, message] = msg

try:
tpc = topic.decode('utf-8')
msg = pickle.loads(message)
except:
tpc = None
msg = None

return (tpc, msg)
1 change: 1 addition & 0 deletions doc/source/ait.core.server.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Submodules
ait.core.server.plugin
ait.core.server.server
ait.core.server.stream
ait.core.server.utils

Module contents
---------------
Expand Down
7 changes: 7 additions & 0 deletions doc/source/ait.core.server.utils.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
ait.core.server.utils module
============================

.. automodule:: ait.core.server.utils
:members:
:undoc-members:
:show-inheritance:

0 comments on commit 57d26d1

Please sign in to comment.