From 02c1037d457ebc21c13d3ae6c7a9b7354fa1285d Mon Sep 17 00:00:00 2001 From: Mike Cousins Date: Fri, 17 Aug 2018 17:41:05 -0400 Subject: [PATCH] feat(app,api): Add opt-in ping/pong monitoring to RPC websocket Closes #2052 --- api/opentrons/server/rpc.py | 32 ++++++++---- api/tests/opentrons/server/test_server.py | 7 +-- .../components/LostConnectionAlert/index.js | 3 +- .../__tests__/health-check.test.js | 8 ++- app/src/health-check/index.js | 2 +- app/src/robot/actions.js | 16 ++++-- app/src/robot/api-client/client.js | 18 +++++-- app/src/robot/reducer/connection.js | 10 +++- app/src/robot/selectors.js | 29 ++++++----- app/src/robot/test/api-client.test.js | 15 +++++- app/src/robot/test/connection-reducer.test.js | 9 ++-- app/src/rpc/client.js | 50 +++++++++++++++---- app/src/rpc/message-types.js | 1 + 13 files changed, 149 insertions(+), 51 deletions(-) diff --git a/api/opentrons/server/rpc.py b/api/opentrons/server/rpc.py index 065463d1ea6..197bc428828 100755 --- a/api/opentrons/server/rpc.py +++ b/api/opentrons/server/rpc.py @@ -22,6 +22,7 @@ NOTIFICATION_MESSAGE = 2 CONTROL_MESSAGE = 3 CALL_NACK_MESSAGE = 4 +PONG_MESSAGE = 5 class Server(object): @@ -147,7 +148,7 @@ def task_done(future): try: log.debug('Sending root info to {0}'.format(client_id)) await client.send_json({ - '$': {'type': CONTROL_MESSAGE}, + '$': {'type': CONTROL_MESSAGE, 'monitor': True}, 'root': self.call_and_serialize(lambda: self.root), 'type': self.call_and_serialize(lambda: type(self.root)) }) @@ -222,17 +223,23 @@ async def process(self, message): try: if message.type == aiohttp.WSMsgType.TEXT: data = json.loads(message.data) - meta = data.pop('$') - token = meta['token'] + meta = data.get('$', {}) + token = meta.get('token') + _id = data.get('id') - # If no id, or id is null/none/undefined assume - # a system call to system instance - if 'id' not in data or data['id'] is None: - data['id'] = id(self.system) + if meta.get('ping'): + return self.send_pong() + + # if id is missing from 
payload or explicitly set to null, + # use the system object + if _id is None: + _id = id(self.system) try: - _id = data.pop('id', None) - func = self.build_call(_id, **data) + func = self.build_call( + _id=_id, + name=data.get('name'), + args=data.get('args', [])) self.send_ack(token) except Exception as e: log.exception("Excption during rpc.Server.process:") @@ -302,6 +309,13 @@ def send_ack(self, token): } }) + def send_pong(self): + self.send({ + '$': { + 'type': PONG_MESSAGE + } + }) + def send(self, payload): for socket, value in self.clients.items(): task, queue = value diff --git a/api/tests/opentrons/server/test_server.py b/api/tests/opentrons/server/test_server.py index 8726c37798a..54f968311c3 100755 --- a/api/tests/opentrons/server/test_server.py +++ b/api/tests/opentrons/server/test_server.py @@ -97,7 +97,8 @@ async def test_init(session, root): 'v': {'value': 0} }, 'type': serialized_type, - '$': {'type': rpc.CONTROL_MESSAGE}} + '$': {'type': rpc.CONTROL_MESSAGE, 'monitor': True} + } assert serialized_type['v']['STATIC'] == 'static', \ 'Class attributes are serialized correctly' @@ -113,7 +114,7 @@ async def test_exception_during_call(session): 'type': rpc.CALL_NACK_MESSAGE, 'token': session.token } - assert res.pop('reason').startswith('TypeError: build_call()') + assert res.pop('reason').startswith('TypeError:') assert res == {} @@ -419,7 +420,7 @@ async def test_concurrent_and_disconnect(loop, root, session, connect): # noqa for res in results: # First message is root info - assert res.pop(0)['$'] == {'type': 3} + assert res.pop(0)['$'] == {'type': 3, 'monitor': True} expected = [] # All acks received expected.extend([ack_message(token) for token in tokens]) diff --git a/app/src/components/LostConnectionAlert/index.js b/app/src/components/LostConnectionAlert/index.js index 4f340486206..583a6e22ec3 100644 --- a/app/src/components/LostConnectionAlert/index.js +++ b/app/src/components/LostConnectionAlert/index.js @@ -57,9 +57,10 @@ function 
makeMapStateToProps () { return (state: State) => { const robot = robotSelectors.getConnectedRobot(state) + const unexpectedDisconnect = state.robot.connection.unexpectedDisconnect return { - ok: robot && getHealthOk(state, robot) + ok: robot && !unexpectedDisconnect && getHealthOk(state, robot) } } } diff --git a/app/src/health-check/__tests__/health-check.test.js b/app/src/health-check/__tests__/health-check.test.js index 3f523b72aee..d2a93a7c5e9 100644 --- a/app/src/health-check/__tests__/health-check.test.js +++ b/app/src/health-check/__tests__/health-check.test.js @@ -154,7 +154,7 @@ describe('health check', () => { test('CONNECT_RESPONSE success dispatches START_HEALTH_CHECK', () => { state.robot.connection.connectedTo = '' - invoke(robotActions.connectResponse()) + invoke(robotActions.connectResponse(null, true)) // middleware should pull `robot` from the connection request state expect(store.dispatch).toHaveBeenCalledWith( startHealthCheck(expect.objectContaining({ @@ -169,6 +169,12 @@ describe('health check', () => { expect(store.dispatch).toHaveBeenCalledTimes(0) }) + test('CONNECT_RESPONSE with pollHealth: false noops', () => { + state.robot.connection.connectedTo = '' + invoke(robotActions.connectResponse(null, false)) + expect(store.dispatch).toHaveBeenCalledTimes(0) + }) + test('DISCONNECT_RESPONSE dispatches (STOP|RESET)_HEALTH_CHECK', () => { const expectedRobot = expect.objectContaining({name}) invoke(robotActions.disconnectResponse()) diff --git a/app/src/health-check/index.js b/app/src/health-check/index.js index d39609239bf..e9a1cf61232 100644 --- a/app/src/health-check/index.js +++ b/app/src/health-check/index.js @@ -122,7 +122,7 @@ export const healthCheckMiddleware: Middleware = break case 'robot:CONNECT_RESPONSE': - if (!action.payload.error) { + if (!action.payload.error && action.payload.pollHealth) { const state = store.getState() const name = getConnectRequest(state).name const robot = getDiscovered(state).find(r => r.name === name) diff 
--git a/app/src/robot/actions.js b/app/src/robot/actions.js index e27e22bac90..417f8c714cc 100755 --- a/app/src/robot/actions.js +++ b/app/src/robot/actions.js @@ -51,7 +51,8 @@ export type ConnectAction = {| export type ConnectResponseAction = {| type: 'robot:CONNECT_RESPONSE', payload: {| - error: ?{message: string} + error: ?{message: string}, + pollHealth: ?boolean, |}, |} @@ -78,6 +79,10 @@ export type DisconnectResponseAction = {| payload: {}, |} +export type UnexpectedDisconnectAction = {| + type: 'robot:UNEXPECTED_DISCONNECT', +|} + export type ConfirmProbedAction = {| type: 'robot:CONFIRM_PROBED', payload: Mount @@ -222,6 +227,7 @@ export type Action = | DisconnectAction | DisconnectResponseAction | ClearConnectResponseAction + | UnexpectedDisconnectAction | ConfirmProbedAction | PipetteCalibrationAction | LabwareCalibrationAction @@ -253,10 +259,10 @@ export const actions = { } }, - connectResponse (error: ?Error): ConnectResponseAction { + connectResponse (error: ?Error, pollHealth: ?boolean): ConnectResponseAction { return { type: 'robot:CONNECT_RESPONSE', - payload: {error} + payload: {error, pollHealth} } }, @@ -275,6 +281,10 @@ export const actions = { } }, + unexpectedDisconnect (): UnexpectedDisconnectAction { + return {type: 'robot:UNEXPECTED_DISCONNECT'} + }, + // TODO(mc, 2018-08-10): remove addDiscovered (service: RobotService): AddDiscoveredAction { return {type: 'robot:ADD_DISCOVERED', payload: service} diff --git a/app/src/robot/api-client/client.js b/app/src/robot/api-client/client.js index d967a36b7b4..bbb2d16d571 100755 --- a/app/src/robot/api-client/client.js +++ b/app/src/robot/api-client/client.js @@ -68,6 +68,7 @@ export default function client (dispatch) { rpcClient = c rpcClient .on('notification', handleRobotNotification) + .on('close', handleUnexpectedDisconnect) .on('error', handleClientError) remote = rpcClient.remote @@ -86,21 +87,30 @@ export default function client (dispatch) { } } - dispatch(actions.connectResponse()) + // 
only poll health if RPC is not monitoring itself with ping/pong + dispatch(actions.connectResponse(null, !rpcClient.monitoring)) }) .catch((e) => dispatch(actions.connectResponse(e))) } function disconnect () { - if (rpcClient) rpcClient.close() + if (rpcClient) { + rpcClient.removeAllListeners('notification') + rpcClient.removeAllListeners('error') + rpcClient.removeAllListeners('close') + rpcClient.close() + rpcClient = null + } clearRunTimerInterval() - rpcClient = null remote = null - dispatch(actions.disconnectResponse()) } + function handleUnexpectedDisconnect () { + dispatch(actions.unexpectedDisconnect()) + } + function createSession (state, action) { const file = action.payload.file const name = file.name diff --git a/app/src/robot/reducer/connection.js b/app/src/robot/reducer/connection.js index 45a48ba2613..5a9158490b3 100644 --- a/app/src/robot/reducer/connection.js +++ b/app/src/robot/reducer/connection.js @@ -21,12 +21,14 @@ type State = { inProgress: boolean, error: ?{message: string} }, + unexpectedDisconnect: boolean, } const INITIAL_STATE: State = { connectedTo: '', connectRequest: {inProgress: false, error: null, name: ''}, - disconnectRequest: {inProgress: false, error: null} + disconnectRequest: {inProgress: false, error: null}, + unexpectedDisconnect: false } export default function connectionReducer ( @@ -50,6 +52,9 @@ export default function connectionReducer ( case 'robot:DISCONNECT_RESPONSE': return handleDisconnectResponse(state, action) + + case 'robot:UNEXPECTED_DISCONNECT': + return {...state, unexpectedDisconnect: true} } return state @@ -92,7 +97,8 @@ function handleDisconnectResponse ( return { ...state, connectedTo: '', - disconnectRequest: {error: null, inProgress: false} + disconnectRequest: {error: null, inProgress: false}, + unexpectedDisconnect: false } } diff --git a/app/src/robot/selectors.js b/app/src/robot/selectors.js index 1c272cd11e9..b11d1cd8981 100644 --- a/app/src/robot/selectors.js +++ b/app/src/robot/selectors.js 
@@ -51,7 +51,7 @@ export const getDiscovered: Selector> = // because of dependency problem in WebWorker where this selector is used state => state.discovery.robotsByName, state => connection(state).connectedTo, - (discoveredByName, connectedTo) => { + (discoveredByName, connectedTo, unexpectedDisconnect) => { const robots = Object.keys(discoveredByName) .map(name => { const robot = discoveredByName[name] @@ -94,18 +94,21 @@ export const getConnectedRobot: Selector = createSelector( (discovered) => discovered.find((r) => r.isConnected) ) -export const getConnectionStatus = createSelector( - getConnectedRobotName, - (state: State) => getConnectRequest(state).inProgress, - (state: State) => connection(state).disconnectRequest.inProgress, - (connectedTo, isConnecting, isDisconnecting): ConnectionStatus => { - if (!connectedTo && isConnecting) return 'connecting' - if (connectedTo && !isDisconnecting) return 'connected' - if (connectedTo && isDisconnecting) return 'disconnecting' - - return 'disconnected' - } -) +export const getConnectionStatus: Selector = + createSelector( + getConnectedRobotName, + state => getConnectRequest(state).inProgress, + state => connection(state).disconnectRequest.inProgress, + state => connection(state).unexpectedDisconnect, + (connectedTo, isConnecting, isDisconnecting, unexpectedDisconnect) => { + if (unexpectedDisconnect) return 'disconnected' + if (!connectedTo && isConnecting) return 'connecting' + if (connectedTo && !isDisconnecting) return 'connected' + if (connectedTo && isDisconnecting) return 'disconnecting' + + return 'disconnected' + } + ) export function getSessionLoadInProgress (state: State) { return sessionRequest(state).inProgress diff --git a/app/src/robot/test/api-client.test.js b/app/src/robot/test/api-client.test.js index d93feb95e6c..bdac4d23c42 100755 --- a/app/src/robot/test/api-client.test.js +++ b/app/src/robot/test/api-client.test.js @@ -48,6 +48,7 @@ describe('api client', () => { // rejection warnings. 
These warnings are Jest's fault for nextTick stuff // http://clarkdave.net/2016/09/node-v6-6-and-asynchronously-handled-promise-rejections/ on: jest.fn(() => rpcClient), + removeAllListeners: jest.fn(() => rpcClient), close: jest.fn(), remote: { session_manager: sessionManager, @@ -106,7 +107,7 @@ describe('api client', () => { describe('connect and disconnect', () => { test('connect RpcClient on CONNECT message', () => { - const expectedResponse = actions.connectResponse() + const expectedResponse = actions.connectResponse(null, true) expect(RpcClient).toHaveBeenCalledTimes(0) @@ -127,6 +128,18 @@ describe('api client', () => { .then(() => expect(dispatch).toHaveBeenCalledWith(expectedResponse)) }) + test('send CONNECT_RESPONSE w pollHealth: false if RPC.monitoring', () => { + const expectedResponse = actions.connectResponse(null, false) + + rpcClient.monitoring = true + + return sendConnect() + .then(() => { + expect(RpcClient).toHaveBeenCalledWith(`ws://${ROBOT_IP}:31950`) + expect(dispatch).toHaveBeenCalledWith(expectedResponse) + }) + }) + test('dispatch DISCONNECT_RESPONSE if already disconnected', () => { const expected = actions.disconnectResponse() diff --git a/app/src/robot/test/connection-reducer.test.js b/app/src/robot/test/connection-reducer.test.js index c954471896d..8c98d7807f3 100644 --- a/app/src/robot/test/connection-reducer.test.js +++ b/app/src/robot/test/connection-reducer.test.js @@ -10,7 +10,8 @@ describe('robot reducer - connection', () => { expect(getState(state)).toEqual({ connectedTo: '', connectRequest: {inProgress: false, error: null, name: ''}, - disconnectRequest: {inProgress: false, error: null} + disconnectRequest: {inProgress: false, error: null}, + unexpectedDisconnect: false }) }) @@ -124,14 +125,16 @@ describe('robot reducer - connection', () => { const state = { connection: { connectedTo: 'ot', - disconnectRequest: {inProgress: true, error: null} + disconnectRequest: {inProgress: true, error: null}, + unexpectedDisconnect: true 
} } const action = {type: 'robot:DISCONNECT_RESPONSE', payload: {}} expect(getState(reducer(state, action))).toEqual({ connectedTo: '', - disconnectRequest: {inProgress: false, error: null} + disconnectRequest: {inProgress: false, error: null}, + unexpectedDisconnect: false }) }) }) diff --git a/app/src/rpc/client.js b/app/src/rpc/client.js index 2c6fe075682..e3e122f9ab7 100644 --- a/app/src/rpc/client.js +++ b/app/src/rpc/client.js @@ -13,7 +13,8 @@ import { ACK, NACK, NOTIFICATION, - CONTROL_MESSAGE + CONTROL_MESSAGE, + PONG } from './message-types' // TODO(mc, 2017-08-29): see note about uuid above @@ -26,6 +27,10 @@ const RECEIVE_CONTROL_TIMEOUT = 10000 const CALL_ACK_TIMEOUT = 10000 // const CALL_RESULT_TIMEOUT = 240000 +// ping pong +const PING_INTERVAL_MS = 3000 +const MISSED_PING_THRESHOLD = 2 + // metadata constants const REMOTE_TARGET_OBJECT = 0 const REMOTE_TYPE_OBJECT = 1 @@ -44,6 +49,10 @@ class RpcContext extends EventEmitter { this._ws = ws this._resultTypes = new Map() this._typeObjectCache = new Map() + this._pingInterval = null + this._missedPings = 0 + + this.monitoring = false this.remote = null // default max listeners is 10, we need more than that // keeping this at a finite number just in case we get a leak later @@ -51,7 +60,7 @@ class RpcContext extends EventEmitter { ws.on('error', this._handleError.bind(this)) ws.on('message', this._handleMessage.bind(this)) - ws.on('close', this._handleClose.bind(this)) + ws.once('close', this.close.bind(this)) } callRemote (id, name, args = []) { @@ -137,11 +146,16 @@ class RpcContext extends EventEmitter { return this.callRemote(null, 'get_object_by_id', [typeId]) } - // remove all event listeners and close the websocket + // close the websocket and cleanup self close () { - this.removeAllListeners() + clearInterval(this._pingInterval) this._ws.removeAllListeners() this._ws.close() + this.eventNames() + .filter(n => n !== 'close') + .forEach(n => this.removeAllListeners(n)) + + this.emit('close') } // 
cache required metadata from call results @@ -169,23 +183,33 @@ class RpcContext extends EventEmitter { } } + _startMonitoring () { + this.monitoring = true + this._pingInterval = setInterval(this._ping.bind(this), PING_INTERVAL_MS) + } + + _ping () { + if (this._missedPings > MISSED_PING_THRESHOLD) return this.close() + + this._send({$: {ping: true}}) + this._missedPings = this._missedPings + 1 + } + + _handlePong () { + this._missedPings = 0 + } + _send (message) { // log.debug('Sending: %j', message) this._ws.send(message) } - _handleClose () { - this.emit('close') - } - _handleError (error) { this.emit('error', error) } // TODO(mc): split this method up _handleMessage (message) { - // log.debug('Received message %j', message) - const {$: meta, data} = message const type = meta.type @@ -198,6 +222,8 @@ class RpcContext extends EventEmitter { this._cacheCallResultMetadata(root) this._cacheCallResultMetadata(rootType) + if (meta.monitor) this._startMonitoring() + RemoteObject(this, root) .then((remote) => { this.remote = remote @@ -234,6 +260,10 @@ class RpcContext extends EventEmitter { break + case PONG: + this._handlePong() + break + default: break } diff --git a/app/src/rpc/message-types.js b/app/src/rpc/message-types.js index c7938ceb0aa..f4075d711ff 100644 --- a/app/src/rpc/message-types.js +++ b/app/src/rpc/message-types.js @@ -7,6 +7,7 @@ export const ACK = 1 export const NOTIFICATION = 2 export const CONTROL_MESSAGE = 3 export const NACK = 4 +export const PONG = 5 // statuses export const statuses = {