Skip to content

Commit

Permalink
Issue #75 - Telemetry processing updates and cleanup
Browse files Browse the repository at this point in the history
Update telemetry processing function to expect a pickled tuple instead
of just a string. The nonserialized tuple being received before was
interfering with the raw telemetry format and resulting in processing
issues.

The gui.html.directory configuration attribute can now be passed into
the GUI plugin via the optional / additional plugin config attribute
html.directory. Moved bottle.TEMPLATE_PATH update into __init__ after
the html.directory attribute has been processed.

Dropped bottle.debug(True). If we need it for debugging we can add it
back in but it really shouldn't be there by default.

Dropped the non-plugin telemetry monitoring and data archiving stuff.
  • Loading branch information
MJJoyce committed Mar 21, 2019
1 parent 6a71566 commit 2524008
Showing 1 changed file with 11 additions and 146 deletions.
157 changes: 11 additions & 146 deletions ait/gui/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import bdb
from collections import defaultdict
import cPickle as pickle
import importlib
import json
import os
Expand Down Expand Up @@ -121,7 +122,6 @@ def remove (self, session):

Sessions = SessionStore()


_RUNNING_SCRIPT = None
_RUNNING_SEQ = None
CMD_API = ait.core.api.CmdAPI(ait.config.get('command.port', ait.DEFAULT_CMD_PORT))
Expand All @@ -147,8 +147,6 @@ class HTMLRoot:
Servers = [ ]
Greenlets = []

bottle.debug(True)
bottle.TEMPLATE_PATH.append(HTMLRoot.User)

try:
with open(os.path.join(HTMLRoot.Static, 'package.json')) as infile:
Expand All @@ -160,11 +158,19 @@ class HTMLRoot:
log.warn('Unable to determine which AIT GUI Version is running')



class AITGUIPlugin(Plugin):

def __init__(self, inputs, outputs, zmq_args=None, **kwargs):
super(AITGUIPlugin, self).__init__(inputs, outputs, zmq_args, **kwargs)

try:
HTMLRoot.User = kwargs['html']['directory']
except:
pass

bottle.TEMPLATE_PATH.append(HTMLRoot.User)

gevent.spawn(self.init)

def process(self, input_data, topic=None):
Expand All @@ -178,8 +184,8 @@ def process(self, input_data, topic=None):
self.process_log_msg(input_data)

def process_telem_msg(self, msg):
split = re.split(r'\((\d),(\'.*\')\)', msg)
Sessions.addTelemetry(int(split[1]), split[2])
msg = pickle.loads(msg)
Sessions.addTelemetry(msg[0], msg[1])

def process_log_msg(self, msg):
parsed = log.parseSyslog(msg)
Expand Down Expand Up @@ -232,45 +238,6 @@ def handle(pathname):

streams = ait.config.get('gui.telemetry')

# if streams is None:
# msg = cfg.AitConfigMissing('gui.telemetry').args[0]
# msg += ' No telemetry will be received (or displayed).'
# log.error(msg)
# else:
# nstreams = 0

# for index, s in enumerate(streams):
# param = 'gui.telemetry[%d].stream' % index
# stream = cfg.AitConfig(config=s).get('stream')

# if stream is None:
# msg = cfg.AitConfigMissing(param).args[0]
# log.warn(msg + ' Skipping stream.')
# continue

# name = stream.get('name', '<unnamed>')
# type = stream.get('type', 'raw').lower()
# tport = stream.get('port', None)

# if tport is None:
# msg = cfg.AitConfigMissing(param + '.port').args[0]
# log.warn(msg + ' Skipping stream.')
# continue

# if type == 'ccsds':
# # Servers.append( UdpCcsdsTelemetryServer(tport) )
# nstreams += 1
# else:
# defn = tlm.getDefaultDict().get(name, None)

# if defn is None:
# values = (name, param)
# msg = 'Packet name "%s" not found (%s.name).' % values
# log.warn(msg + ' Skipping stream.')
# continue

# nstreams += 1

if streams and nstreams == 0:
msg = 'No valid telemetry stream configurations found.'
msg += ' No telemetry will be received (or displayed).'
Expand Down Expand Up @@ -328,108 +295,6 @@ def wait(self):
else:
gevent.wait()

def enable_monitoring(self):
def telem_handler(session):
limit_dict = defaultdict(dict)
for k, v in limits.getDefaultDict().iteritems():
packet, field = k.split('.')
limit_dict[packet][field] = v

packet_dict = defaultdict(dict)
for k, v in tlm.getDefaultDict().iteritems():
packet_dict[v.uid] = v

notif_thrshld = ait.config.get('notifications.options.threshold', 1)
notif_freq = ait.config.get('notifications.options.frequency', float('inf'))

log.info('Starting telemetry limit monitoring')
try:
limit_trip_repeats = {}
while True:
if len(session.telemetry) > 0:
p = session.telemetry.popleft()
packet = packet_dict[p[0]]
decoded = tlm.Packet(packet, data=bytearray(p[1]))

if packet.name in limit_dict:
for field, defn in limit_dict[packet.name].iteritems():
v = decoded._getattr(field)

if packet.name not in limit_trip_repeats.keys():
limit_trip_repeats[packet.name] = {}

if field not in limit_trip_repeats[packet.name].keys():
limit_trip_repeats[packet.name][field] = 0

if defn.error(v):
msg = 'Field {} error out of limit with value {}'.format(field, v)
log.error(msg)

limit_trip_repeats[packet.name][field] += 1
repeats = limit_trip_repeats[packet.name][field]

if (repeats == notif_thrshld or
(repeats > notif_thrshld and
(repeats - notif_thrshld) % notif_freq == 0)):
notify.trigger_notification('limit-error', msg)

elif defn.warn(v):
msg = 'Field {} warning out of limit with value {}'.format(field, v)
log.warn(msg)

limit_trip_repeats[packet.name][field] += 1
repeats = limit_trip_repeats[packet.name][field]

if (repeats == notif_thrshld or
(repeats > notif_thrshld and
(repeats - notif_thrshld) % notif_freq == 0)):
notify.trigger_notification('limit-warn', msg)

else:
limit_trip_repeats[packet.name][field] = 0

gevent.sleep(0)
finally:
log.info('Telemetry limit monitoring terminated')

s = ait.gui.Sessions.create()
telem_handler = gevent.util.wrap_errors(KeyboardInterrupt, telem_handler)
Greenlets.append(gevent.spawn(telem_handler, s))

def enable_data_archiving(self, datastore='ait.core.db.InfluxDBBackend', **kwargs):
packet_dict = defaultdict(dict)
for k, v in tlm.getDefaultDict().iteritems():
packet_dict[v.uid] = v

try:
mod, cls = datastore.rsplit('.', 1)
dbconn = getattr(importlib.import_module(mod), cls)()
dbconn.connect(**kwargs)
except ImportError:
log.error("Could not import specified datastore {}".format(datastore))
return
except Exception as e:
log.error("Unable to connect to InfluxDB backend. Disabling data archive ...")
return

def data_archiver(session):
try:
log.info('Starting telemetry data archiving')
while True:
if len(session.telemetry) > 0:
p = session.telemetry.popleft()
packet = packet_dict[p[0]]
decoded = tlm.Packet(packet, data=bytearray(p[1]))
dbconn.insert(decoded, **kwargs)

gevent.sleep(0)
finally:
dbconn.close()
log.info('Telemetry data archiving terminated')

s = ait.gui.Sessions.create()
data_archiver = gevent.util.wrap_errors(KeyboardInterrupt, data_archiver)
Greenlets.append(gevent.spawn(data_archiver, s))

def send(self, command, *args, **kwargs):
"""Creates, validates, and sends the given command as a UDP
Expand Down

0 comments on commit 2524008

Please sign in to comment.