Issue #79 and #80 - Data archiving and better interrupt handling (#81)
* Issue #79 and #80 - Data archiving and better interrupt handling

Add optional data archiving support. Users can enable data archiving via
ait.gui.enable_data_archiving() prior to calling ait.gui.wait(). By
default InfluxDB is used, but an optional argument can be passed to
enable_data_archiving() to choose a different datastore class.
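
A minimal usage sketch (the alternate datastore path in the comment is
hypothetical; any keyword arguments given are forwarded to the chosen
backend):

    import ait.gui

    # Archive telemetry with the default InfluxDB backend
    ait.gui.enable_data_archiving()

    # Or name a different backend class by its dotted path, e.g.
    # ait.gui.enable_data_archiving(datastore='mypkg.db.SQLiteBackend')

    ait.gui.wait()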

Update greenlet handling for the monitoring and data archiving tasks so
that exits via Ctrl-C are cleaner. Greenlet functions are now wrapped in
gevent.util.wrap_errors so that KeyboardInterrupt exceptions are
returned instead of raised, which keeps stack traces from being printed
when the GUI is killed.
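
A standalone sketch of the wrap_errors pattern (not AIT code):

    import gevent
    import gevent.util

    def worker():
        raise KeyboardInterrupt()

    # wrap_errors returns the listed exception types from the call rather
    # than raising them, so the greenlet finishes with the exception as
    # its value instead of dying with a traceback.
    g = gevent.spawn(gevent.util.wrap_errors(KeyboardInterrupt, worker))
    g.join()
    assert isinstance(g.value, KeyboardInterrupt)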

Greenlet tracking has been changed from individual global variables to
a single list of greenlet instances, and the cleanup code has been
updated to match.

The ait.gui.wait function has been updated to use gevent.joinall when
one or more of the optional services (currently data archiving or
telemetry monitoring) are in use. This gives us more control over how
finished greenlets are handled and how exceptions are raised. If no
optional services are in use, we fall back to gevent.wait instead.
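
A standalone sketch of the joinall-based flow (not AIT code; the
service function is a stand-in for monitoring or archiving):

    import gevent
    import gevent.util

    def service():
        gevent.sleep(0.1)
        raise KeyboardInterrupt()

    greenlets = [gevent.spawn(gevent.util.wrap_errors(KeyboardInterrupt, service))]

    # With count=1, joinall returns as soon as any greenlet finishes;
    # raise_error=True re-raises exceptions from greenlets that actually
    # failed (wrapped KeyboardInterrupts come back as values instead).
    done = gevent.joinall(greenlets, raise_error=True, count=1)
    for d in done:
        if isinstance(d.value, KeyboardInterrupt):
            raise d.value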

* - Fix except block
MJJoyce authored and lorsposto committed Jul 16, 2018
1 parent de220aa commit e659a4b
Showing 1 changed file with 72 additions and 31 deletions.
103 changes: 72 additions & 31 deletions ait/gui/__init__.py
@@ -39,6 +39,7 @@

import gevent
import gevent.event
import gevent.util
import gevent.lock
import gevent.monkey; gevent.monkey.patch_all()
import geventwebsocket
@@ -47,6 +48,7 @@
from collections import defaultdict
import copy
import datetime
import importlib
import json
import os
import socket
@@ -62,7 +64,7 @@

import ait.core

from ait.core import api, ccsds, cfg, cmd, dmc, evr, gds, limits, log, notify, pcap, tlm
from ait.core import api, ccsds, cfg, cmd, db, dmc, evr, gds, limits, log, notify, pcap, tlm
from ait.core import util


@@ -89,7 +91,7 @@ class HTMLRoot:

App = bottle.Bottle()
Servers = [ ]
Monitor_Greenlet = None
Greenlets = []

bottle.debug(True)
bottle.TEMPLATE_PATH.append(HTMLRoot.User)
@@ -340,7 +342,7 @@ def cleanup():
    for s in Servers:
        s.stop()

    cleanup_monitoring()
    gevent.killall(Greenlets)


def startBrowser(url, name=None):
@@ -367,12 +369,16 @@ def startBrowser(url, name=None):


def wait():
    gevent.wait()
    if len(Greenlets) > 0:
        done = gevent.joinall(Greenlets, raise_error=True, count=1)
        for d in done:
            if issubclass(type(d.value), KeyboardInterrupt):
                raise d.value
    else:
        gevent.wait()


def enable_monitoring():
    global Monitor_Greenlet

    def telem_handler(session):
        limit_dict = defaultdict(dict)
        for k, v in limits.getDefaultDict().iteritems():
@@ -383,36 +389,71 @@ def telem_handler(session):
        for k, v in tlm.getDefaultDict().iteritems():
            packet_dict[v.uid] = v

        while True:
            if len(session.telemetry) > 0:
                p = session.telemetry.popleft()
                packet = packet_dict[p[0]]
                decoded = tlm.Packet(packet, data=bytearray(p[1]))

                if packet.name in limit_dict:
                    for field, defn in limit_dict[packet.name].iteritems():
                        v = decoded._getattr(field)
                        if defn.error(v):
                            msg = 'Field {} error out of limit at {} with value {}'.format(
                                field, datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%SZ"), v)
                            log.error(msg)
                            notify.trigger_notification('limit-error', msg)
                        elif defn.warn(v):
                            msg = 'Field {} warning out of limit at {} with value {}'.format(
                                field, datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%SZ"), v)
                            log.warn(msg)
                            notify.trigger_notification('limit-warn', msg)

            gevent.sleep(0)
        log.info('Starting telemetry limit monitoring')
        try:
            while True:
                if len(session.telemetry) > 0:
                    p = session.telemetry.popleft()
                    packet = packet_dict[p[0]]
                    decoded = tlm.Packet(packet, data=bytearray(p[1]))

                    if packet.name in limit_dict:
                        for field, defn in limit_dict[packet.name].iteritems():
                            v = decoded._getattr(field)
                            if defn.error(v):
                                msg = 'Field {} error out of limit with value {}'.format(field, v)
                                log.error(msg)
                                notify.trigger_notification('limit-error', msg)
                                raise gevent.GreenletExit()
                            elif defn.warn(v):
                                msg = 'Field {} warning out of limit with value {}'.format(field, v)
                                log.warn(msg)
                                notify.trigger_notification('limit-warn', msg)
                                raise gevent.GreenletExit()

                gevent.sleep(0)
        finally:
            log.info('Telemetry limit monitoring terminated')

    s = ait.gui.Sessions.create()
    Monitor_Greenlet = gevent.spawn(telem_handler, s)
    telem_handler = gevent.util.wrap_errors(KeyboardInterrupt, telem_handler)
    Greenlets.append(gevent.spawn(telem_handler, s))


def enable_data_archiving(datastore='ait.core.db.InfluxDBBackend', **kwargs):
    packet_dict = defaultdict(dict)
    for k, v in tlm.getDefaultDict().iteritems():
        packet_dict[v.uid] = v

def cleanup_monitoring():
    global Monitor_Greenlet
    try:
        mod, cls = datastore.rsplit('.', 1)
        dbconn = getattr(importlib.import_module(mod), cls)()
        dbconn.connect(**kwargs)
    except ImportError:
        log.error("Could not import specified datastore {}".format(datastore))
        return
    except Exception as e:
        log.error("Unable to connect to InfluxDB backend. Disabling data archive ...")
        return

    if Monitor_Greenlet: Monitor_Greenlet.kill()
    def data_archiver(session):
        try:
            log.info('Starting telemetry data archiving')
            while True:
                if len(session.telemetry) > 0:
                    p = session.telemetry.popleft()
                    packet = packet_dict[p[0]]
                    decoded = tlm.Packet(packet, data=bytearray(p[1]))
                    dbconn.insert(decoded, **kwargs)

                gevent.sleep(0)
        finally:
            dbconn.close()
            log.info('Telemetry data archiving terminated')

    s = ait.gui.Sessions.create()
    data_archiver = gevent.util.wrap_errors(KeyboardInterrupt, data_archiver)
    Greenlets.append(gevent.spawn(data_archiver, s))


Sessions = SessionStore()
