Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(app,api): Add opt-in ping/pong monitoring to RPC websocket #2083

Merged
merged 1 commit into from
Aug 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions api/opentrons/server/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
NOTIFICATION_MESSAGE = 2
CONTROL_MESSAGE = 3
CALL_NACK_MESSAGE = 4
PONG_MESSAGE = 5


class Server(object):
Expand Down Expand Up @@ -147,7 +148,7 @@ def task_done(future):
try:
log.debug('Sending root info to {0}'.format(client_id))
await client.send_json({
'$': {'type': CONTROL_MESSAGE},
'$': {'type': CONTROL_MESSAGE, 'monitor': True},
'root': self.call_and_serialize(lambda: self.root),
'type': self.call_and_serialize(lambda: type(self.root))
})
Expand Down Expand Up @@ -222,17 +223,23 @@ async def process(self, message):
try:
if message.type == aiohttp.WSMsgType.TEXT:
data = json.loads(message.data)
meta = data.pop('$')
token = meta['token']
meta = data.get('$', {})
token = meta.get('token')
_id = data.get('id')

# If no id, or id is null/none/undefined assume
# a system call to system instance
if 'id' not in data or data['id'] is None:
data['id'] = id(self.system)
if meta.get('ping'):
return self.send_pong()

# if id is missing from payload or explicitely set to null,
# use the system object
if _id is None:
_id = id(self.system)

try:
_id = data.pop('id', None)
func = self.build_call(_id, **data)
func = self.build_call(
_id=_id,
name=data.get('name'),
args=data.get('args', []))
self.send_ack(token)
except Exception as e:
log.exception("Excption during rpc.Server.process:")
Expand Down Expand Up @@ -302,6 +309,13 @@ def send_ack(self, token):
}
})

def send_pong(self):
self.send({
'$': {
'type': PONG_MESSAGE
}
})

def send(self, payload):
for socket, value in self.clients.items():
task, queue = value
Expand Down
7 changes: 4 additions & 3 deletions api/tests/opentrons/server/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ async def test_init(session, root):
'v': {'value': 0}
},
'type': serialized_type,
'$': {'type': rpc.CONTROL_MESSAGE}}
'$': {'type': rpc.CONTROL_MESSAGE, 'monitor': True}
}

assert serialized_type['v']['STATIC'] == 'static', \
'Class attributes are serialized correctly'
Expand All @@ -113,7 +114,7 @@ async def test_exception_during_call(session):
'type': rpc.CALL_NACK_MESSAGE,
'token': session.token
}
assert res.pop('reason').startswith('TypeError: build_call()')
assert res.pop('reason').startswith('TypeError:')
assert res == {}


Expand Down Expand Up @@ -419,7 +420,7 @@ async def test_concurrent_and_disconnect(loop, root, session, connect): # noqa

for res in results:
# First message is root info
assert res.pop(0)['$'] == {'type': 3}
assert res.pop(0)['$'] == {'type': 3, 'monitor': True}
expected = []
# All acks received
expected.extend([ack_message(token) for token in tokens])
Expand Down
3 changes: 2 additions & 1 deletion app/src/components/LostConnectionAlert/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ function makeMapStateToProps () {

return (state: State) => {
const robot = robotSelectors.getConnectedRobot(state)
const unexpectedDisconnect = state.robot.connection.unexpectedDisconnect

return {
ok: robot && getHealthOk(state, robot)
ok: robot && !unexpectedDisconnect && getHealthOk(state, robot)
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion app/src/health-check/__tests__/health-check.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ describe('health check', () => {

test('CONNECT_RESPONSE success dispatches START_HEALTH_CHECK', () => {
state.robot.connection.connectedTo = ''
invoke(robotActions.connectResponse())
invoke(robotActions.connectResponse(null, true))
// middleware should pull `robot` from the connection request state
expect(store.dispatch).toHaveBeenCalledWith(
startHealthCheck(expect.objectContaining({
Expand All @@ -169,6 +169,12 @@ describe('health check', () => {
expect(store.dispatch).toHaveBeenCalledTimes(0)
})

test('CONNECT_RESPONSE with pollHealth: false noops', () => {
state.robot.connection.connectedTo = ''
invoke(robotActions.connectResponse(null, false))
expect(store.dispatch).toHaveBeenCalledTimes(0)
})

test('DISCONNECT_RESPONSE dispatches (STOP|RESET)_HEALTH_CHECK', () => {
const expectedRobot = expect.objectContaining({name})
invoke(robotActions.disconnectResponse())
Expand Down
2 changes: 1 addition & 1 deletion app/src/health-check/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ export const healthCheckMiddleware: Middleware =
break

case 'robot:CONNECT_RESPONSE':
if (!action.payload.error) {
if (!action.payload.error && action.payload.pollHealth) {
const state = store.getState()
const name = getConnectRequest(state).name
const robot = getDiscovered(state).find(r => r.name === name)
Expand Down
16 changes: 13 additions & 3 deletions app/src/robot/actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ export type ConnectAction = {|
export type ConnectResponseAction = {|
type: 'robot:CONNECT_RESPONSE',
payload: {|
error: ?{message: string}
error: ?{message: string},
pollHealth: ?boolean,
|},
|}

Expand All @@ -78,6 +79,10 @@ export type DisconnectResponseAction = {|
payload: {},
|}

export type UnexpectedDisconnectAction = {|
type: 'robot:UNEXPECTED_DISCONNECT',
|}

export type ConfirmProbedAction = {|
type: 'robot:CONFIRM_PROBED',
payload: Mount
Expand Down Expand Up @@ -222,6 +227,7 @@ export type Action =
| DisconnectAction
| DisconnectResponseAction
| ClearConnectResponseAction
| UnexpectedDisconnectAction
| ConfirmProbedAction
| PipetteCalibrationAction
| LabwareCalibrationAction
Expand Down Expand Up @@ -253,10 +259,10 @@ export const actions = {
}
},

connectResponse (error: ?Error): ConnectResponseAction {
connectResponse (error: ?Error, pollHealth: ?boolean): ConnectResponseAction {
return {
type: 'robot:CONNECT_RESPONSE',
payload: {error}
payload: {error, pollHealth}
}
},

Expand All @@ -275,6 +281,10 @@ export const actions = {
}
},

unexpectedDisconnect (): UnexpectedDisconnectAction {
return {type: 'robot:UNEXPECTED_DISCONNECT'}
},

// TODO(mc, 2018-08-10): remove
addDiscovered (service: RobotService): AddDiscoveredAction {
return {type: 'robot:ADD_DISCOVERED', payload: service}
Expand Down
18 changes: 14 additions & 4 deletions app/src/robot/api-client/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export default function client (dispatch) {
rpcClient = c
rpcClient
.on('notification', handleRobotNotification)
.on('close', handleUnexpectedDisconnect)
.on('error', handleClientError)

remote = rpcClient.remote
Expand All @@ -86,21 +87,30 @@ export default function client (dispatch) {
}
}

dispatch(actions.connectResponse())
// only poll health if RPC is not monitoring itself with ping/pong
dispatch(actions.connectResponse(null, !rpcClient.monitoring))
})
.catch((e) => dispatch(actions.connectResponse(e)))
}

function disconnect () {
if (rpcClient) rpcClient.close()
if (rpcClient) {
rpcClient.removeAllListeners('notification')
rpcClient.removeAllListeners('error')
rpcClient.removeAllListeners('close')
rpcClient.close()
rpcClient = null
}

clearRunTimerInterval()
rpcClient = null
remote = null

dispatch(actions.disconnectResponse())
}

function handleUnexpectedDisconnect () {
dispatch(actions.unexpectedDisconnect())
}

function createSession (state, action) {
const file = action.payload.file
const name = file.name
Expand Down
10 changes: 8 additions & 2 deletions app/src/robot/reducer/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ type State = {
inProgress: boolean,
error: ?{message: string}
},
unexpectedDisconnect: boolean,
}

const INITIAL_STATE: State = {
connectedTo: '',
connectRequest: {inProgress: false, error: null, name: ''},
disconnectRequest: {inProgress: false, error: null}
disconnectRequest: {inProgress: false, error: null},
unexpectedDisconnect: false
}

export default function connectionReducer (
Expand All @@ -50,6 +52,9 @@ export default function connectionReducer (

case 'robot:DISCONNECT_RESPONSE':
return handleDisconnectResponse(state, action)

case 'robot:UNEXPECTED_DISCONNECT':
return {...state, unexpectedDisconnect: true}
}

return state
Expand Down Expand Up @@ -92,7 +97,8 @@ function handleDisconnectResponse (
return {
...state,
connectedTo: '',
disconnectRequest: {error: null, inProgress: false}
disconnectRequest: {error: null, inProgress: false},
unexpectedDisconnect: false
}
}

Expand Down
29 changes: 16 additions & 13 deletions app/src/robot/selectors.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export const getDiscovered: Selector<State, void, Array<Robot>> =
// because of dependency problem in WebWorker where this selector is used
state => state.discovery.robotsByName,
state => connection(state).connectedTo,
(discoveredByName, connectedTo) => {
(discoveredByName, connectedTo, unexpectedDisconnect) => {
const robots = Object.keys(discoveredByName)
.map(name => {
const robot = discoveredByName[name]
Expand Down Expand Up @@ -94,18 +94,21 @@ export const getConnectedRobot: Selector<State, void, ?Robot> = createSelector(
(discovered) => discovered.find((r) => r.isConnected)
)

export const getConnectionStatus = createSelector(
getConnectedRobotName,
(state: State) => getConnectRequest(state).inProgress,
(state: State) => connection(state).disconnectRequest.inProgress,
(connectedTo, isConnecting, isDisconnecting): ConnectionStatus => {
if (!connectedTo && isConnecting) return 'connecting'
if (connectedTo && !isDisconnecting) return 'connected'
if (connectedTo && isDisconnecting) return 'disconnecting'

return 'disconnected'
}
)
export const getConnectionStatus: Selector<State, void, ConnectionStatus> =
createSelector(
getConnectedRobotName,
state => getConnectRequest(state).inProgress,
state => connection(state).disconnectRequest.inProgress,
state => connection(state).unexpectedDisconnect,
(connectedTo, isConnecting, isDisconnecting, unexpectedDisconnect) => {
if (unexpectedDisconnect) return 'disconnected'
if (!connectedTo && isConnecting) return 'connecting'
if (connectedTo && !isDisconnecting) return 'connected'
if (connectedTo && isDisconnecting) return 'disconnecting'

return 'disconnected'
}
)

export function getSessionLoadInProgress (state: State) {
return sessionRequest(state).inProgress
Expand Down
15 changes: 14 additions & 1 deletion app/src/robot/test/api-client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ describe('api client', () => {
// rejection warnings. These warnings are Jest's fault for nextTick stuff
// http://clarkdave.net/2016/09/node-v6-6-and-asynchronously-handled-promise-rejections/
on: jest.fn(() => rpcClient),
removeAllListeners: jest.fn(() => rpcClient),
close: jest.fn(),
remote: {
session_manager: sessionManager,
Expand Down Expand Up @@ -106,7 +107,7 @@ describe('api client', () => {

describe('connect and disconnect', () => {
test('connect RpcClient on CONNECT message', () => {
const expectedResponse = actions.connectResponse()
const expectedResponse = actions.connectResponse(null, true)

expect(RpcClient).toHaveBeenCalledTimes(0)

Expand All @@ -127,6 +128,18 @@ describe('api client', () => {
.then(() => expect(dispatch).toHaveBeenCalledWith(expectedResponse))
})

test('send CONNECT_RESPONSE w pollHealth: false if RPC.monitoring', () => {
const expectedResponse = actions.connectResponse(null, false)

rpcClient.monitoring = true

return sendConnect()
.then(() => {
expect(RpcClient).toHaveBeenCalledWith(`ws://${ROBOT_IP}:31950`)
expect(dispatch).toHaveBeenCalledWith(expectedResponse)
})
})

test('dispatch DISCONNECT_RESPONSE if already disconnected', () => {
const expected = actions.disconnectResponse()

Expand Down
9 changes: 6 additions & 3 deletions app/src/robot/test/connection-reducer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ describe('robot reducer - connection', () => {
expect(getState(state)).toEqual({
connectedTo: '',
connectRequest: {inProgress: false, error: null, name: ''},
disconnectRequest: {inProgress: false, error: null}
disconnectRequest: {inProgress: false, error: null},
unexpectedDisconnect: false
})
})

Expand Down Expand Up @@ -124,14 +125,16 @@ describe('robot reducer - connection', () => {
const state = {
connection: {
connectedTo: 'ot',
disconnectRequest: {inProgress: true, error: null}
disconnectRequest: {inProgress: true, error: null},
unexpectedDisconnect: true
}
}
const action = {type: 'robot:DISCONNECT_RESPONSE', payload: {}}

expect(getState(reducer(state, action))).toEqual({
connectedTo: '',
disconnectRequest: {inProgress: false, error: null}
disconnectRequest: {inProgress: false, error: null},
unexpectedDisconnect: false
})
})
})
Loading