Skip to content

Commit

Permalink
Issue #107 - Changed playback to use existing socket instead of forci…
Browse files Browse the repository at this point in the history
…ng connections to open and close
  • Loading branch information
seanlu99 committed Jul 23, 2019
1 parent 2e944b3 commit f560e82
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 116 deletions.
159 changes: 66 additions & 93 deletions ait/gui/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,50 @@ def remove (self, session):
del self[session.id]


class Playback(object):
"""Playback
A Playback manages the state for the playback component.
playback.dbconn: connection to database
playback.query: time query map of {timestamp: list of (uid, data)} from database
playback.on: true if gui is currently in playback mode
"""

def __init__(self):
"""Creates a new Playback"""
self._db_connect()
self.query = {}
self.on = False

def _db_connect(self):
"""Connect to database"""

# Get datastore from config
plugins = ait.config.get('server.plugins')
datastore = ''
other_args = {}
for i in range(len(plugins)):
if plugins[i]['plugin']['name'] == 'ait.core.server.plugin.DataArchive':
datastore = plugins[i]['plugin']['datastore']
other_args = copy.deepcopy(plugins[i]['plugin'])
other_args.pop('name')
other_args.pop('inputs', None)
other_args.pop('outputs', None)
other_args.pop('datastore', None)
break
mod, cls = datastore.rsplit('.', 1)

# Connect to database
self.dbconn = getattr(importlib.import_module(mod), cls)()
self.dbconn.connect(**other_args)

def reset(self):
"""Reset fields"""
self.query.clear()
self.on = False


Sessions = SessionStore()
playback = Playback()

_RUNNING_SCRIPT = None
_RUNNING_SEQ = None
Expand Down Expand Up @@ -162,6 +205,7 @@ class HTMLRoot:


class AITGUIPlugin(Plugin):
global playback

def __init__(self, inputs, outputs, zmq_args=None, **kwargs):
super(AITGUIPlugin, self).__init__(inputs, outputs, zmq_args, **kwargs)
Expand Down Expand Up @@ -209,7 +253,8 @@ def process(self, input_data, topic=None):

def process_telem_msg(self, msg):
msg = pickle.loads(msg)
Sessions.addTelemetry(msg[0], msg[1])
if playback.on == False:
Sessions.addTelemetry(msg[0], msg[1])

def process_log_msg(self, msg):
parsed = log.parseSyslog(msg)
Expand Down Expand Up @@ -625,27 +670,22 @@ def handle():
pass


# Global variable for realtime socket
global realtime_wsock


@App.route('/tlm/realtime')
def handle():
"""Return telemetry packets in realtime to client"""
global realtime_wsock
with Sessions.current() as session:
# A null-byte pad ensures wsock is treated as binary.
pad = bytearray(1)
realtime_wsock = bottle.request.environ.get('wsgi.websocket')
wsock = bottle.request.environ.get('wsgi.websocket')

if not realtime_wsock:
if not wsock:
bottle.abort(400, 'Expected WebSocket request.')

try:
while not realtime_wsock.closed:
while not wsock.closed:
try:
uid, data = session.telemetry.popleft(timeout=30)
realtime_wsock.send(pad + struct.pack('>I', uid) + data)
wsock.send(pad + struct.pack('>I', uid) + data)
except IndexError:
# If no telemetry has been received by the GUI
# server after timeout seconds, "probe" the client
Expand All @@ -656,8 +696,8 @@ def handle():
# data is ignored by AIT GUI client-side
# Javascript code.

if not realtime_wsock.closed:
realtime_wsock.send(pad + struct.pack('>I', 0))
if not wsock.closed:
wsock.send(pad + struct.pack('>I', 0))
except geventwebsocket.WebSocketError:
pass

Expand Down Expand Up @@ -993,61 +1033,6 @@ def handle():
PromptResponse = json.loads(bottle.request.body.read())


class Playback(object):
"""Playback
A Playback manages the state for the playback component.
playback.dbconn: connection to database
playback.query: time query map of {timestamp: data} from database
playback.queue: deque of timestamps to be sent to the socket
playback.wsock: connection to socket
"""

def __init__(self):
"""Creates a new Playback"""
self._db_connect()
self.query = {}
self.queue = api.GeventDeque()
self.wsock = None

def _db_connect(self):
"""Connect to database"""

# Get datastore from config
plugins = ait.config.get('server.plugins')
datastore = ''
other_args = {}
for i in range(len(plugins)):
if plugins[i]['plugin']['name'] == 'ait.core.server.plugin.DataArchive':
datastore = plugins[i]['plugin']['datastore']
other_args = copy.deepcopy(plugins[i]['plugin'])
other_args.pop('name')
other_args.pop('inputs', None)
other_args.pop('outputs', None)
other_args.pop('datastore', None)
break
mod, cls = datastore.rsplit('.', 1)

# Connect to database
self.dbconn = getattr(importlib.import_module(mod), cls)()
self.dbconn.connect(**other_args)

def wsock_connect(self):
"""Connect to socket"""
self.wsock = bottle.request.environ.get('wsgi.websocket')
if not self.wsock:
bottle.abort(400, 'Expected WebSocket request.')

def reset(self):
"""Reset fields"""
self.query.clear()
self.queue.clear()
playback.wsock.closed = True


# Global playback variable
playback = Playback()


@App.route('/playback/range', method='GET')
def handle():
"""Return a JSON array of [packet_name, start_time, end_time] to represent the time range
Expand Down Expand Up @@ -1109,48 +1094,36 @@ def handle():
field_names = []
for i in range(len(fields)):
field_names.append(fields[i].name)
# Put query into a map of {timestamp: data}
# Put query into a map of {timestamp: list of (uid, data)}
for i in range(len(points)):
# Round time down to nearest 0.1 second
timestamp = str(points[i]['time'][:21] + 'Z')
data = bytearray(1) + struct.pack('>I', uid)
data = ''
for j in range(len(field_names)):
data += struct.pack('>H', points[i][field_names[j]])
if playback.query.has_key(timestamp):
playback.query[timestamp].append(data)
playback.query[timestamp].append((uid, data))
else:
playback.query[timestamp] = [data]
playback.query[timestamp] = [(uid, data)]


@App.route('/playback/send', method='POST')
@App.route('/playback/on', method='PUT')
def handle():
"""Send timestamp to be put into playback queue if in database"""
"""Indicate that playback is on"""
global playback
timestamp = bottle.request.forms.get('timestamp')
if playback.query.has_key(timestamp):
playback.queue.append(timestamp)
playback.on = True


@App.route('/playback/playback')
@App.route('/playback/send', method='POST')
def handle():
"""Return historical telemetry packets as if it were realtime to client"""
"""Send timestamp to be put into playback queue if in database"""
global playback
global realtime_wsock

# Close realtime socket
realtime_wsock.closed = True
# Open playback socket
playback.wsock_connect()
timestamp = bottle.request.forms.get('timestamp')

# Send data from queue to socket
try:
while not playback.wsock.closed:
timestamp = playback.queue.popleft()
data_list = playback.query[timestamp]
for i in range(len(data_list)):
playback.wsock.send(data_list[i])
except geventwebsocket.WebSocketError:
pass
if playback.query.has_key(timestamp):
query_list = playback.query[timestamp]
for i in range(len(query_list)):
Sessions.addTelemetry(query_list[i][0], query_list[i][1])


@App.route('/playback/abort', method='PUT')
Expand Down
2 changes: 1 addition & 1 deletion ait/gui/static/index.css
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
@import "./css/ait/gui/Command.css";
@import "./css/ait/gui/Field.css";
@import "./css/ait/gui/Messages.css";
@import "css/ait/gui/Playback.css";
@import "./css/ait/gui/Playback.css";
@import "./css/ait/gui/Plot.css";
@import "./css/ait/gui/Query.css";
@import "./css/ait/gui/Script.css";
Expand Down
26 changes: 4 additions & 22 deletions ait/gui/static/js/ait/gui/Playback.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,17 +160,10 @@ const Playback = {
// Run when play button clicked for first time
if (this._first_click) {
// Emit event that playback is on
ait.events.emit('ait:playback:on')

// Start endpoint on backend to playback historical packets received
ait.tlm = {dict: {}}
ait.tlm.promise = m.request({ url: '/tlm/dict' })
ait.tlm.promise.then((dict) => {
const proto = location.protocol === 'https:' ? 'wss' : 'ws'
const url = proto + '://' + location.host + '/playback/playback'

ait.tlm.dict = TelemetryDictionary.parse(dict)
ait.tlm.stream = new TelemetryStream(url, ait.tlm.dict)
ait.events.emit('ait:playback:on')
m.request({
url: '/playback/on',
method: 'PUT'
})

this._first_click = false
Expand Down Expand Up @@ -216,17 +209,6 @@ const Playback = {
url: '/playback/abort',
method: 'PUT'
})

// Restart endpoints on backend to play realtime packets received
ait.tlm = {dict: {}}
ait.tlm.promise = m.request({url: '/tlm/dict'})
ait.tlm.promise.then((dict) => {
const proto = location.protocol === 'https:' ? 'wss' : 'ws'
const url = proto + '://' + location.host + '/tlm/realtime'

ait.tlm.dict = TelemetryDictionary.parse(dict)
ait.tlm.stream = new TelemetryStream(url, ait.tlm.dict)
})
}
},
}, 'Abort')
Expand Down

0 comments on commit f560e82

Please sign in to comment.