Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(app-shell): Migrate desktop app notify connectivity to discovery-client #14648

Merged
merged 39 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
421958b
update constants
mjhuff Mar 12, 2024
9b07898
redo notify to support new connectivity and unsubscribes
mjhuff Mar 12, 2024
c519ee1
add the callback to discovery
mjhuff Mar 12, 2024
4704772
clean up unsubscribe
mjhuff Mar 13, 2024
f0de47d
try/catch the webcontents send to prevent an error on app close
mjhuff Mar 13, 2024
417b402
we don't need to pass in browserWindow now
mjhuff Mar 13, 2024
2fa5cd8
denest and consolidate everything
mjhuff Mar 13, 2024
c5e2ee2
let's try a new structure here
mjhuff Mar 13, 2024
2b10635
everything in its own file
mjhuff Mar 13, 2024
02d9d77
clean up index.ts
mjhuff Mar 13, 2024
4504ad4
clean up deserialize.ts
mjhuff Mar 13, 2024
1f63b4a
clean up connect.ts
mjhuff Mar 13, 2024
fd1c44d
clean up subscribe
mjhuff Mar 13, 2024
780e0c1
random small cleanup
mjhuff Mar 13, 2024
b14bd61
perhaps closures are the poor man's class
mjhuff Mar 14, 2024
ce0c2b4
pycharm fun
mjhuff Mar 14, 2024
3d5635b
spruce up deserialize
mjhuff Mar 14, 2024
da27bb1
spruce up index
mjhuff Mar 14, 2024
466d03d
spruce up connect
mjhuff Mar 15, 2024
d0ca3da
spruce ub subscribe
mjhuff Mar 15, 2024
fc73504
spruce up unsubscribe
mjhuff Mar 15, 2024
9a00811
some cleanup
mjhuff Mar 15, 2024
75939dc
log.ts -> notifyLog.ts
mjhuff Mar 15, 2024
f403610
promise
mjhuff Mar 15, 2024
9b2cb25
random cleanup
mjhuff Mar 15, 2024
785744c
DRY up the sendToBrowser logic
mjhuff Mar 15, 2024
cb7f8d1
use the top level name property from discovery-client to prevent redu…
mjhuff Mar 18, 2024
107efcd
disconnect from hosts as they become unreachable
mjhuff Mar 18, 2024
dcd45a7
cleanup
mjhuff Mar 18, 2024
5731094
properly handle associated host connection & disconnection
mjhuff Mar 18, 2024
954288f
we mean ip, not hostname
mjhuff Mar 18, 2024
1d6800e
P R O M I S I F Y (and add errors)
mjhuff Mar 18, 2024
264ee27
fix a couple booleans
mjhuff Mar 18, 2024
e2f5485
let's add robotName to error messages
mjhuff Mar 18, 2024
dcd0b5b
retry on other IPs if current is port blocked
mjhuff Mar 18, 2024
c25f0ce
cleanup connectivity
mjhuff Mar 19, 2024
2fb099c
feedback
mjhuff Mar 20, 2024
5313b98
a few random fixes
mjhuff Mar 21, 2024
d18d2f8
some tests
mjhuff Mar 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app-shell/src/__tests__/discovery.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ vi.mock('../log', () => {
},
}
})
vi.mock('../notifications')

let mockGet = vi.fn(property => {
return []
Expand Down
14 changes: 0 additions & 14 deletions app-shell/src/config/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ import {
VALUE_UPDATED,
VIEW_PROTOCOL_SOURCE_FOLDER,
NOTIFY_SUBSCRIBE,
NOTIFY_UNSUBSCRIBE,
ROBOT_MASS_STORAGE_DEVICE_ADDED,
ROBOT_MASS_STORAGE_DEVICE_ENUMERATED,
ROBOT_MASS_STORAGE_DEVICE_REMOVED,
Expand All @@ -99,7 +98,6 @@ import type {
AppRestartAction,
NotifySubscribeAction,
NotifyTopic,
NotifyUnsubscribeAction,
ReloadUiAction,
RobotMassStorageDeviceAdded,
RobotMassStorageDeviceEnumerated,
Expand Down Expand Up @@ -421,15 +419,3 @@ export const notifySubscribeAction = (
},
meta: { shell: true },
})

export const notifyUnsubscribeAction = (
hostname: string,
topic: NotifyTopic
): NotifyUnsubscribeAction => ({
type: NOTIFY_UNSUBSCRIBE,
payload: {
hostname,
topic,
},
meta: { shell: true },
})
7 changes: 5 additions & 2 deletions app-shell/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,6 @@ export const ROBOT_MASS_STORAGE_DEVICE_ENUMERATED: 'shell:ROBOT_MASS_STORAGE_DEV
'shell:ROBOT_MASS_STORAGE_DEVICE_ENUMERATED'
export const NOTIFY_SUBSCRIBE: 'shell:NOTIFY_SUBSCRIBE' =
'shell:NOTIFY_SUBSCRIBE'
export const NOTIFY_UNSUBSCRIBE: 'shell:NOTIFY_UNSUBSCRIBE' =
'shell:NOTIFY_UNSUBSCRIBE'

// copy
// TODO(mc, 2020-05-11): i18n
Expand All @@ -247,3 +245,8 @@ export const DISCOVERY_UPDATE_LIST: DISCOVERY_UPDATE_LIST_TYPE =
export const DISCOVERY_REMOVE: DISCOVERY_REMOVE_TYPE = 'discovery:REMOVE'

export const CLEAR_CACHE: CLEAR_CACHE_TYPE = 'discovery:CLEAR_CACHE'
export const HEALTH_STATUS_OK: 'ok' = 'ok'
export const FAILURE_STATUSES = {
ECONNREFUSED: 'ECONNREFUSED',
ECONNFAILED: 'ECONNFAILED',
} as const
17 changes: 10 additions & 7 deletions app-shell/src/discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,29 @@ import {
DEFAULT_PORT,
} from '@opentrons/discovery-client'
import {
CLEAR_CACHE,
DISCOVERY_FINISH,
DISCOVERY_REMOVE,
DISCOVERY_START,
OPENTRONS_USB,
UI_INITIALIZED,
USB_HTTP_REQUESTS_START,
USB_HTTP_REQUESTS_STOP,
} from './constants'
} from '@opentrons/app/src/redux/shell/actions'
import {
DISCOVERY_START,
DISCOVERY_FINISH,
DISCOVERY_REMOVE,
CLEAR_CACHE,
} from '@opentrons/app/src/redux/discovery/actions'
import { OPENTRONS_USB } from '@opentrons/app/src/redux/discovery/constants'

import { getFullConfig, handleConfigChange } from './config'
import { createLogger } from './log'
import { getSerialPortHttpAgent } from './usb'
import { handleNotificationConnectionsFor } from './notifications'

import type {
Address,
DiscoveryClientRobot,
LegacyService,
DiscoveryClient,
} from '@opentrons/discovery-client'

import type { Action, Dispatch } from './types'
import type { ConfigV1 } from '@opentrons/app/src/redux/config/schema-types'

Expand Down Expand Up @@ -199,6 +201,7 @@ export function registerDiscovery(

function handleRobots(): void {
const robots = client.getRobots()
handleNotificationConnectionsFor(robots)

if (!disableCache) store.set('robots', robots)

Expand Down
2 changes: 1 addition & 1 deletion app-shell/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { registerProtocolStorage } from './protocol-storage'
import { getConfig, getStore, getOverrides, registerConfig } from './config'
import { registerUsb } from './usb'
import { createUsbDeviceMonitor } from './system-info/usb-devices'
import { registerNotify, closeAllNotifyConnections } from './notify'
import { registerNotify, closeAllNotifyConnections } from './notifications'

import type { BrowserWindow } from 'electron'
import type { Dispatch, Logger } from './types'
Expand Down
215 changes: 215 additions & 0 deletions app-shell/src/notifications/connect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
import mqtt from 'mqtt'

import { connectionStore } from './store'
import {
sendDeserialized,
sendDeserializedGenericError,
deserializeExpectedMessages,
} from './deserialize'
import { unsubscribe } from './unsubscribe'
import { notifyLog } from './notifyLog'
import { FAILURE_STATUSES, HEALTH_STATUS_OK } from '../constants'

import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types'
import type { DiscoveryClientRobot } from '@opentrons/discovery-client'

// MQTT is somewhat particular about the clientId format and will connect erratically if an unexpected string is supplied.
const CLIENT_ID = 'app-' + Math.random().toString(16).slice(2, 8) // Derived from mqttjs
const connectOptions: mqtt.IClientOptions = {
clientId: CLIENT_ID,
port: 1883,
keepalive: 60,
protocolVersion: 5,
reconnectPeriod: 1000,
connectTimeout: 30 * 1000,
clean: true,
resubscribe: true,
}

export interface RobotData {
ip: string
robotName: string
}

// This is the discovery-client equivalent of "available" robots when viewing the Devices page in the app.
export function getHealthyRobotDataForNotifyConnections(
robots: DiscoveryClientRobot[]
): RobotData[] {
return robots.flatMap(robot =>
robot.addresses
.filter(address => address.healthStatus === HEALTH_STATUS_OK)
.map(address => ({ ip: address.ip, robotName: robot.name }))
)
}

/**
*
* @description Remove broker connections from the connection store by forcibly disconnecting from brokers
* as robots are no longer discoverable.
*/
export function cleanUpUnreachableRobots(
healthyRobots: RobotData[]
): Promise<void> {
return new Promise(() => {
const healthyRobotIPs = healthyRobots.map(({ ip }) => ip)
const healthyRobotIPsSet = new Set(healthyRobotIPs)
const unreachableRobots = connectionStore
.getReachableHosts()
.filter(hostname => {
return !healthyRobotIPsSet.has(hostname)
})
void closeConnectionsForcefullyFor(unreachableRobots)
})
}

export function establishConnections(
healthyRobots: RobotData[]
): Promise<void> {
return new Promise(() => {
const newConnections = healthyRobots.filter(({ ip, robotName }) => {
if (connectionStore.isConnectedToBroker(ip)) {
mjhuff marked this conversation as resolved.
Show resolved Hide resolved
return false
} else if (connectionStore.isAssociatedWithExistingHostData(robotName)) {
if (!connectionStore.isAssociatedBrokerErrored(robotName)) {
// If not connected, wait for another poll of discovery-client to resolve the current connection.
if (connectionStore.isAssociatedBrokerConnected(robotName)) {
void connectionStore.associateIPWithExistingHostData(ip, robotName)
}
return false
} else {
// Mark this IP as a new potential broker connection to check if the broker is reachable on this IP.
if (!connectionStore.isKnownPortBlockedIP(ip)) {
void connectionStore.deleteAllAssociatedIPsGivenRobotName(robotName)
return true
} else {
return false
}
}
} else {
return !connectionStore.isKnownPortBlockedIP(ip)
}
})
newConnections.forEach(({ ip, robotName }) => {
void connectionStore
.setPendingConnection(ip, robotName)
.then(() => {
connectAsync(`mqtt://${ip}`)
.then(client => {
notifyLog.debug(`Successfully connected to ${robotName} on ${ip}`)
void connectionStore
.setConnected(ip, client)
.then(() => establishListeners(client, ip, robotName))
.catch((error: Error) => notifyLog.debug(error.message))
})
.catch((error: Error) => {
notifyLog.warn(
`Failed to connect to ${robotName} on ${ip} - ${error.name}: ${error.message} `
)
void connectionStore.setErrorStatus(ip, error.message)
})
})
.catch((error: Error) => notifyLog.debug(error.message))
})
})
}

function connectAsync(brokerURL: string): Promise<mqtt.Client> {
const client = mqtt.connect(brokerURL, connectOptions)

return new Promise((resolve, reject) => {
// Listeners added to client to trigger promise resolution
const promiseListeners: {
[key: string]: (...args: any[]) => void
} = {
connect: () => {
removePromiseListeners()
return resolve(client)
},
// A connection error event will close the connection without a retry.
error: (error: Error | string) => {
removePromiseListeners()
const clientEndPromise = new Promise((resolve, reject) =>
client.end(true, {}, () => resolve(error))
)
return clientEndPromise.then(() => reject(error))
},
end: () => promiseListeners.error(`Couldn't connect to ${brokerURL}`),
}

function removePromiseListeners(): void {
Object.keys(promiseListeners).forEach(eventName => {
client.removeListener(eventName, promiseListeners[eventName])
})
}

Object.keys(promiseListeners).forEach(eventName => {
client.on(eventName, promiseListeners[eventName])
})
})
}

function establishListeners(
client: mqtt.MqttClient,
ip: string,
robotName: string
): void {
client.on(
'message',
(topic: NotifyTopic, message: Buffer, packet: mqtt.IPublishPacket) => {
deserializeExpectedMessages(message.toString())
.then(deserializedMessage => {
const messageContainsUnsubFlag = 'unsubscribe' in deserializedMessage
if (messageContainsUnsubFlag) {
void unsubscribe(ip, topic).catch((error: Error) =>
notifyLog.debug(error.message)
)
}

notifyLog.debug('Received notification data from main via IPC', {
ip,
topic,
})

sendDeserialized({ ip, topic, message: deserializedMessage })
})
.catch(error => notifyLog.debug(`${error.message}`))
}
)

client.on('reconnect', () => {
notifyLog.debug(`Attempting to reconnect to ${robotName} on ${ip}`)
})
// handles transport layer errors only
client.on('error', error => {
notifyLog.warn(`Error - ${error.name}: ${error.message}`)
sendDeserializedGenericError(ip, 'ALL_TOPICS')
client.end()
})

client.on('end', () => {
notifyLog.debug(`Closed connection to ${robotName} on ${ip}`)
void connectionStore.setErrorStatus(ip, FAILURE_STATUSES.ECONNFAILED)
})

client.on('disconnect', packet => {
notifyLog.warn(
`Disconnected from ${robotName} on ${ip} with code ${
packet.reasonCode ?? 'undefined'
}`
)
sendDeserializedGenericError(ip, 'ALL_TOPICS')
})
}

export function closeConnectionsForcefullyFor(
hosts: string[]
): Array<Promise<void>> {
return hosts.map(ip => {
const client = connectionStore.getClient(ip)
return new Promise<void>((resolve, reject) => {
if (client != null) {
client.end(true, {}, () => resolve())
}
})
})
}
71 changes: 71 additions & 0 deletions app-shell/src/notifications/deserialize.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import isEqual from 'lodash/isEqual'

import { connectionStore } from './store'

import type {
NotifyBrokerResponses,
NotifyRefetchData,
NotifyResponseData,
NotifyTopic,
NotifyUnsubscribeData,
} from '@opentrons/app/src/redux/shell/types'
import { FAILURE_STATUSES } from '../constants'

interface SendToBrowserParams {
ip: string
topic: NotifyTopic
message: NotifyResponseData
}

const VALID_NOTIFY_RESPONSES: [NotifyRefetchData, NotifyUnsubscribeData] = [
{ refetchUsingHTTP: true },
{ unsubscribe: true },
]

export function sendDeserialized({
ip,
topic,
message,
}: SendToBrowserParams): void {
try {
const browserWindow = connectionStore.getBrowserWindow()
browserWindow?.webContents.send('notify', ip, topic, message)
} catch {} // Prevents shell erroring during app shutdown event.
}

export function sendDeserializedGenericError(
ip: string,
topic: NotifyTopic
): void {
sendDeserialized({
ip,
topic,
message: FAILURE_STATUSES.ECONNFAILED,
})
}

export function deserializeExpectedMessages(
message: string
): Promise<NotifyBrokerResponses> {
return new Promise((resolve, reject) => {
let deserializedMessage: NotifyResponseData | Record<string, unknown>
const error = new Error(
`Unexpected data received from notify broker: ${message}`
)

try {
deserializedMessage = JSON.parse(message)
} catch {
reject(error)
}

const isValidNotifyResponse = VALID_NOTIFY_RESPONSES.some(model =>
isEqual(model, deserializedMessage)
)
if (!isValidNotifyResponse) {
reject(error)
} else {
resolve(JSON.parse(message))
}
})
}
Loading
Loading