From e659a4b2f3818d1f9bebf5ed4a4229a9cb2d9bae Mon Sep 17 00:00:00 2001 From: Michael Joyce Date: Mon, 16 Jul 2018 12:57:54 -0700 Subject: [PATCH] Issue #79 and #80 - Data archiving and better interrupt handling (#81) * Issue #79 and #80 - Data archiving and better interrupt handling Add optional data archiving support. Users can enable data archiving via ait.gui.enable_data_archiving() prior to calling ait.gui.wait(). By default InfluxDB will be used but an optional argument can be set when calling enable_data_archiving() to choose a different class. Update greenlet handling for monitoring and data archive tasks so that exits via Ctrl-C are cleaner. Greenlet functions are now wrapped in gevent.util.wrap_errors so that KeyboardInterrupt exceptions are returned instead of raised. This allows us to avoid stack traces being printed when killing the GUI. Greenlet tracking has been changed from individual global variables to a list of greenlet instances. Clean up code has been updated with this change as well. The ait.gui.wait function has been updated to use gevent.joinall if one or more of the optional services (data archiving or telemetry monitoring at the moment) are in use. This gives us a bit more control over how we handle greenlets that have returned and how exceptions are raised. If the optional services aren't being used we default to gevent.wait instead. * - Fix except block --- ait/gui/__init__.py | 103 +++++++++++++++++++++++++++++++------------- 1 file changed, 72 insertions(+), 31 deletions(-) diff --git a/ait/gui/__init__.py b/ait/gui/__init__.py index ef363caa..f38ecf19 100644 --- a/ait/gui/__init__.py +++ b/ait/gui/__init__.py @@ -39,6 +39,7 @@ import gevent import gevent.event +import gevent.util import gevent.lock import gevent.monkey; gevent.monkey.patch_all() import geventwebsocket @@ -47,6 +48,7 @@ from collections import defaultdict import copy import datetime +import importlib import json import os import socket @@ -62,7 +64,7 @@ import ait.core -from ait.core import api, ccsds, cfg, cmd, dmc, evr, gds, limits, log, notify, pcap, tlm +from ait.core import api, ccsds, cfg, cmd, db, dmc, evr, gds, limits, log, notify, pcap, tlm from ait.core import util @@ -89,7 +91,7 @@ class HTMLRoot: App = bottle.Bottle() Servers = [ ] -Monitor_Greenlet = None +Greenlets = [] bottle.debug(True) bottle.TEMPLATE_PATH.append(HTMLRoot.User) @@ -340,7 +342,7 @@ def cleanup(): for s in Servers: s.stop() - cleanup_monitoring() + gevent.killall(Greenlets) def startBrowser(url, name=None): @@ -367,12 +369,16 @@ def startBrowser(url, name=None): def wait(): - gevent.wait() + if len(Greenlets) > 0: + done = gevent.joinall(Greenlets, raise_error=True, count=1) + for d in done: + if issubclass(type(d.value), KeyboardInterrupt): + raise d.value + else: + gevent.wait() def enable_monitoring(): - global Monitor_Greenlet - def telem_handler(session): limit_dict = defaultdict(dict) for k, v in limits.getDefaultDict().iteritems(): @@ -383,36 +389,71 @@ def telem_handler(session): for k, v in tlm.getDefaultDict().iteritems(): packet_dict[v.uid] = v - while True: - if len(session.telemetry) > 0: - p = session.telemetry.popleft() - packet = packet_dict[p[0]] - decoded = tlm.Packet(packet, data=bytearray(p[1])) - - if packet.name in limit_dict: - for field, defn in limit_dict[packet.name].iteritems(): - v = decoded._getattr(field) - if defn.error(v): - msg = 'Field {} error out of limit at {} with value {}'.format( - field, datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%SZ"), v) - log.error(msg) - notify.trigger_notification('limit-error', msg) - elif defn.warn(v): - msg = 'Field {} warning out of limit at {} with value {}'.format( - field, datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%SZ"), v) - log.warn(msg) - notify.trigger_notification('limit-warn', msg) - - gevent.sleep(0) + log.info('Starting telemetry limit monitoring') + try: + while True: + if len(session.telemetry) > 0: + p = session.telemetry.popleft() + packet = packet_dict[p[0]] + decoded = tlm.Packet(packet, data=bytearray(p[1])) + + if packet.name in limit_dict: + for field, defn in limit_dict[packet.name].iteritems(): + v = decoded._getattr(field) + if defn.error(v): + msg = 'Field {} error out of limit with value {}'.format(field, v) + log.error(msg) + notify.trigger_notification('limit-error', msg) + raise gevent.GreenletExit() + elif defn.warn(v): + msg = 'Field {} warning out of limit with value {}'.format(field, v) + log.warn(msg) + notify.trigger_notification('limit-warn', msg) + raise gevent.GreenletExit() + + gevent.sleep(0) + finally: + log.info('Telemetry limit monitoring terminated') s = ait.gui.Sessions.create() - Monitor_Greenlet = gevent.spawn(telem_handler, s) + telem_handler = gevent.util.wrap_errors(KeyboardInterrupt, telem_handler) + Greenlets.append(gevent.spawn(telem_handler, s)) + +def enable_data_archiving(datastore='ait.core.db.InfluxDBBackend', **kwargs): + packet_dict = defaultdict(dict) + for k, v in tlm.getDefaultDict().iteritems(): + packet_dict[v.uid] = v -def cleanup_monitoring(): - global Monitor_Greenlet + try: + mod, cls = datastore.rsplit('.', 1) + dbconn = getattr(importlib.import_module(mod), cls)() + dbconn.connect(**kwargs) + except ImportError: + log.error("Could not import specified datastore {}".format(datastore)) + return + except Exception as e: + log.error("Unable to connect to InfluxDB backend. Disabling data archive ...") + return - if Monitor_Greenlet: Monitor_Greenlet.kill() + def data_archiver(session): + try: + log.info('Starting telemetry data archiving') + while True: + if len(session.telemetry) > 0: + p = session.telemetry.popleft() + packet = packet_dict[p[0]] + decoded = tlm.Packet(packet, data=bytearray(p[1])) + dbconn.insert(decoded, **kwargs) + + gevent.sleep(0) + finally: + dbconn.close() + log.info('Telemetry data archiving terminated') + + s = ait.gui.Sessions.create() + data_archiver = gevent.util.wrap_errors(KeyboardInterrupt, data_archiver) + Greenlets.append(gevent.spawn(data_archiver, s)) Sessions = SessionStore()