From 1ba616651c85b8a28099b466d811ab8974de5842 Mon Sep 17 00:00:00 2001 From: Jamey Huffnagle Date: Mon, 25 Mar 2024 11:54:29 -0400 Subject: [PATCH] refactor(app-shell-odd): Utilize robot-server unsubscribe flags (#14724) Closes EXEC-319 This is the app-shell-odd equivalent of the app-shell refactor, #14648. It's similar to the app-shell logic, but significantly simpler, since we don't have to manage multiple robots, worry about localhost port blocking, and multiple IPs per robot. The real change lies in the initial connect and final disconnect on app shutdown. Otherwise, the changes are primarily in the ConnectionStore. Because the app no longer utilizes unsubscribe actions in any capacity, we can safely remove those references. --- app-shell-odd/src/actions.ts | 14 - app-shell-odd/src/constants.ts | 7 +- app-shell-odd/src/main.ts | 10 +- app-shell-odd/src/notifications/connect.ts | 121 +++++ .../src/notifications/deserialize.ts | 62 +++ app-shell-odd/src/notifications/index.ts | 65 +++ app-shell-odd/src/notifications/notifyLog.ts | 3 + app-shell-odd/src/notifications/store.ts | 128 +++++ app-shell-odd/src/notifications/subscribe.ts | 120 +++++ .../src/notifications/unsubscribe.ts | 36 ++ app-shell-odd/src/notify.ts | 455 ------------------ app-shell/src/notifications/store.ts | 1 - app/src/redux/shell/__tests__/actions.test.ts | 16 +- app/src/redux/shell/actions.ts | 15 - app/src/redux/shell/types.ts | 12 - .../__tests__/useNotifyService.test.ts | 20 +- app/src/resources/useNotifyService.ts | 9 +- 17 files changed, 554 insertions(+), 540 deletions(-) create mode 100644 app-shell-odd/src/notifications/connect.ts create mode 100644 app-shell-odd/src/notifications/deserialize.ts create mode 100644 app-shell-odd/src/notifications/index.ts create mode 100644 app-shell-odd/src/notifications/notifyLog.ts create mode 100644 app-shell-odd/src/notifications/store.ts create mode 100644 app-shell-odd/src/notifications/subscribe.ts create mode 100644 app-shell-odd/src/notifications/unsubscribe.ts delete mode 100644 app-shell-odd/src/notify.ts diff --git a/app-shell-odd/src/actions.ts b/app-shell-odd/src/actions.ts index 92bef0b73f4..d1427d8468d 100644 --- a/app-shell-odd/src/actions.ts +++ b/app-shell-odd/src/actions.ts @@ -76,7 +76,6 @@ import { VALUE_UPDATED, VIEW_PROTOCOL_SOURCE_FOLDER, NOTIFY_SUBSCRIBE, - NOTIFY_UNSUBSCRIBE, ROBOT_MASS_STORAGE_DEVICE_ADDED, ROBOT_MASS_STORAGE_DEVICE_ENUMERATED, ROBOT_MASS_STORAGE_DEVICE_REMOVED, @@ -105,7 +104,6 @@ import type { AppRestartAction, NotifySubscribeAction, NotifyTopic, - NotifyUnsubscribeAction, ReloadUiAction, RobotMassStorageDeviceAdded, RobotMassStorageDeviceEnumerated, @@ -428,18 +426,6 @@ export const notifySubscribeAction = ( meta: { shell: true }, }) -export const notifyUnsubscribeAction = ( - hostname: string, - topic: NotifyTopic -): NotifyUnsubscribeAction => ({ - type: NOTIFY_UNSUBSCRIBE, - payload: { - hostname, - topic, - }, - meta: { shell: true }, -}) - export function startDiscovery( timeout: number | null = null ): StartDiscoveryAction { diff --git a/app-shell-odd/src/constants.ts b/app-shell-odd/src/constants.ts index c76f302c130..788fdf70cd7 100644 --- a/app-shell-odd/src/constants.ts +++ b/app-shell-odd/src/constants.ts @@ -225,8 +225,6 @@ export const ROBOT_MASS_STORAGE_DEVICE_ENUMERATED: 'shell:ROBOT_MASS_STORAGE_DEV 'shell:ROBOT_MASS_STORAGE_DEVICE_ENUMERATED' export const NOTIFY_SUBSCRIBE: 'shell:NOTIFY_SUBSCRIBE' = 'shell:NOTIFY_SUBSCRIBE' -export const NOTIFY_UNSUBSCRIBE: 'shell:NOTIFY_UNSUBSCRIBE' = - 'shell:NOTIFY_UNSUBSCRIBE' // copy // TODO(mc, 2020-05-11): i18n @@ -252,3 +250,8 @@ export const HTTP_API_VERSION: 3 = 3 export const SEND_READY_STATUS: 'shell:SEND_READY_STATUS' = 'shell:SEND_READY_STATUS' + +export const FAILURE_STATUSES = { + ECONNREFUSED: 'ECONNREFUSED', + ECONNFAILED: 'ECONNFAILED', +} as const diff --git a/app-shell-odd/src/main.ts b/app-shell-odd/src/main.ts index f536f56f96c..eaea1768078 100644 --- a/app-shell-odd/src/main.ts +++ b/app-shell-odd/src/main.ts @@ -23,7 +23,11 @@ import { } from './config' import systemd from './systemd' import { watchForMassStorage } from './usb' -import { registerNotify, closeAllNotifyConnections } from './notify' +import { + registerNotify, + establishBrokerConnection, + closeBrokerConnection, +} from './notifications' import type { BrowserWindow } from 'electron' import type { Dispatch, Logger } from './types' @@ -58,7 +62,7 @@ if (config.devtools) app.once('ready', installDevtools) app.once('window-all-closed', () => { log.debug('all windows closed, quitting the app') - closeAllNotifyConnections() + closeBrokerConnection() .then(() => { app.quit() }) @@ -96,7 +100,7 @@ function startUp(): void { mainWindow = createUi(dispatch) rendererLogger = createRendererLogger() - + void establishBrokerConnection() mainWindow.once('closed', () => (mainWindow = null)) log.info('Fetching latest software version') diff --git a/app-shell-odd/src/notifications/connect.ts b/app-shell-odd/src/notifications/connect.ts new file mode 100644 index 00000000000..67df09de466 --- /dev/null +++ b/app-shell-odd/src/notifications/connect.ts @@ -0,0 +1,121 @@ +import mqtt from 'mqtt' + +import { connectionStore } from './store' +import { + sendDeserialized, + sendDeserializedGenericError, + deserializeExpectedMessages, +} from './deserialize' +import { unsubscribe } from './unsubscribe' +import { notifyLog } from './notifyLog' + +import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types' + +// MQTT is somewhat particular about the clientId format and will connect erratically if an unexpected string is supplied. +const CLIENT_ID = 'odd-' + Math.random().toString(16).slice(2, 8) // Derived from mqttjs +const connectOptions: mqtt.IClientOptions = { + clientId: CLIENT_ID, + port: 1883, + keepalive: 60, + protocolVersion: 5, + reconnectPeriod: 1000, + connectTimeout: 30 * 1000, + clean: true, + resubscribe: true, +} + +export function connectAsync(brokerURL: string): Promise { + const client = mqtt.connect(brokerURL, connectOptions) + + return new Promise((resolve, reject) => { + // Listeners added to client to trigger promise resolution + const promiseListeners: { + [key: string]: (...args: any[]) => void + } = { + connect: () => { + removePromiseListeners() + return resolve(client) + }, + // A connection error event will close the connection without a retry. + error: (error: Error | string) => { + removePromiseListeners() + const clientEndPromise = new Promise((resolve, reject) => + client.end(true, {}, () => resolve(error)) + ) + return clientEndPromise.then(() => reject(error)) + }, + end: () => promiseListeners.error(`Couldn't connect to ${brokerURL}`), + } + + function removePromiseListeners(): void { + Object.keys(promiseListeners).forEach(eventName => { + client.removeListener(eventName, promiseListeners[eventName]) + }) + } + + Object.keys(promiseListeners).forEach(eventName => { + client.on(eventName, promiseListeners[eventName]) + }) + }) +} + +export function establishListeners(): void { + const client = connectionStore.client as mqtt.MqttClient + const { ip, robotName } = connectionStore + + client.on( + 'message', + (topic: NotifyTopic, message: Buffer, packet: mqtt.IPublishPacket) => { + deserializeExpectedMessages(message.toString()) + .then(deserializedMessage => { + const messageContainsUnsubFlag = 'unsubscribe' in deserializedMessage + if (messageContainsUnsubFlag) { + void unsubscribe(topic).catch((error: Error) => + notifyLog.debug(error.message) + ) + } + + notifyLog.debug('Received notification data from main via IPC', { + ip, + topic, + }) + + sendDeserialized(topic, deserializedMessage) + }) + .catch(error => notifyLog.debug(`${error.message}`)) + } + ) + + client.on('reconnect', () => { + notifyLog.debug(`Attempting to reconnect to ${robotName} on ${ip}`) + }) + // handles transport layer errors only + client.on('error', error => { + notifyLog.warn(`Error - ${error.name}: ${error.message}`) + sendDeserializedGenericError('ALL_TOPICS') + client.end() + }) + + client.on('end', () => { + notifyLog.debug(`Closed connection to ${robotName} on ${ip}`) + // Marking the connection as failed with a generic error status lets the connection re-establish in the future + // and tells the browser to fall back to polling (assuming this 'end' event isn't caused by the app closing). + void connectionStore.setErrorStatus() + }) + + client.on('disconnect', packet => { + notifyLog.warn( + `Disconnected from ${robotName} on ${ip} with code ${ + packet.reasonCode ?? 'undefined' + }` + ) + sendDeserializedGenericError('ALL_TOPICS') + }) +} + +export function closeConnectionForcefully(): Promise { + const { client } = connectionStore + return new Promise((resolve, reject) => + client?.end(true, {}, () => resolve()) + ) +} diff --git a/app-shell-odd/src/notifications/deserialize.ts b/app-shell-odd/src/notifications/deserialize.ts new file mode 100644 index 00000000000..4539bc97faa --- /dev/null +++ b/app-shell-odd/src/notifications/deserialize.ts @@ -0,0 +1,62 @@ +import isEqual from 'lodash/isEqual' + +import { connectionStore } from './store' + +import type { + NotifyBrokerResponses, + NotifyRefetchData, + NotifyResponseData, + NotifyTopic, + NotifyUnsubscribeData, +} from '@opentrons/app/src/redux/shell/types' +import { FAILURE_STATUSES } from '../constants' + +const VALID_NOTIFY_RESPONSES: [NotifyRefetchData, NotifyUnsubscribeData] = [ + { refetchUsingHTTP: true }, + { unsubscribe: true }, +] + +export function sendDeserialized( + topic: NotifyTopic, + message: NotifyResponseData +): void { + try { + const browserWindow = connectionStore.getBrowserWindow() + browserWindow?.webContents.send( + 'notify', + connectionStore.ip, + topic, + message + ) + } catch {} // Prevents shell erroring during app shutdown event. +} + +export function sendDeserializedGenericError(topic: NotifyTopic): void { + sendDeserialized(topic, FAILURE_STATUSES.ECONNFAILED) +} + +export function deserializeExpectedMessages( + message: string +): Promise { + return new Promise((resolve, reject) => { + let deserializedMessage: NotifyResponseData | Record + const error = new Error( + `Unexpected data received from notify broker: ${message}` + ) + + try { + deserializedMessage = JSON.parse(message) + } catch { + reject(error) + } + + const isValidNotifyResponse = VALID_NOTIFY_RESPONSES.some(model => + isEqual(model, deserializedMessage) + ) + if (!isValidNotifyResponse) { + reject(error) + } else { + resolve(JSON.parse(message)) + } + }) +} diff --git a/app-shell-odd/src/notifications/index.ts b/app-shell-odd/src/notifications/index.ts new file mode 100644 index 00000000000..cce5758de72 --- /dev/null +++ b/app-shell-odd/src/notifications/index.ts @@ -0,0 +1,65 @@ +import { connectionStore } from './store' +import { + connectAsync, + establishListeners, + closeConnectionForcefully, +} from './connect' +import { subscribe } from './subscribe' +import { notifyLog } from './notifyLog' + +import type { BrowserWindow } from 'electron' +import type { Action, Dispatch } from '../types' + +// Manages the MQTT broker connection through a connection store. Subscriptions are handled "lazily" - a component must +// dispatch a subscribe action before a subscription request is made to the broker. Unsubscribe requests only occur if +// the broker sends an "unsubscribe" flag. Pending subs and unsubs are used to prevent unnecessary network and broker load. + +export function registerNotify( + dispatch: Dispatch, + mainWindow: BrowserWindow +): (action: Action) => unknown { + // Because of the ODD's start sequence, the browser window will always be defined before relevant actions occur. + if (connectionStore.getBrowserWindow() == null) { + connectionStore.setBrowserWindow(mainWindow) + } + + return function handleAction(action: Action) { + switch (action.type) { + case 'shell:NOTIFY_SUBSCRIBE': + return subscribe(action.payload.topic) + } + } +} + +export function establishBrokerConnection(): Promise { + const { ip, robotName } = connectionStore + + return connectAsync(`mqtt://${connectionStore.ip}`) + .then(client => { + notifyLog.debug(`Successfully connected to ${robotName} on ${ip}`) + void connectionStore + .setConnected(client) + .then(() => establishListeners()) + .catch((error: Error) => notifyLog.debug(error.message)) + }) + .catch((error: Error) => { + notifyLog.warn( + `Failed to connect to ${robotName} on ${ip} - ${error.name}: ${error.message} ` + ) + void connectionStore.setErrorStatus() + }) +} + +export function closeBrokerConnection(): Promise { + return new Promise((resolve, reject) => { + setTimeout(() => { + reject(Error('Failed to close the connection within the time limit.')) + }, 2000) + + notifyLog.debug( + `Stopping notify service connection for host ${connectionStore.robotName}` + ) + const closeConnection = closeConnectionForcefully() + closeConnection.then(resolve).catch(reject) + }) +} diff --git a/app-shell-odd/src/notifications/notifyLog.ts b/app-shell-odd/src/notifications/notifyLog.ts new file mode 100644 index 00000000000..35507fa2c2a --- /dev/null +++ b/app-shell-odd/src/notifications/notifyLog.ts @@ -0,0 +1,3 @@ +import { createLogger } from '../log' + +export const notifyLog = createLogger('notify') diff --git a/app-shell-odd/src/notifications/store.ts b/app-shell-odd/src/notifications/store.ts new file mode 100644 index 00000000000..9553fba3af4 --- /dev/null +++ b/app-shell-odd/src/notifications/store.ts @@ -0,0 +1,128 @@ +import type mqtt from 'mqtt' + +import { FAILURE_STATUSES } from '../constants' + +import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types' +import type { BrowserWindow } from 'electron' + +type FailedConnStatus = typeof FAILURE_STATUSES[keyof typeof FAILURE_STATUSES] + +/** + * @description Manages the internal state of MQTT connections to various robot hosts. + */ +class ConnectionStore { + public readonly ip = '127.0.0.1' + + public readonly robotName = 'LOCALHOST' + + public client: mqtt.MqttClient | null = null + + private readonly subscriptions: Set = new Set() + + private readonly pendingSubs: Set = new Set() + + private readonly pendingUnsubs: Set = new Set() + + private unreachableStatus: FailedConnStatus | null = null + + private browserWindow: BrowserWindow | null = null + + public getBrowserWindow(): BrowserWindow | null { + return this.browserWindow + } + + /** + * @returns {FailedConnStatus} "ECONNREFUSED" is a proxy for a port block error and is only returned once + * for analytics reasons. Afterward, a generic "ECONNFAILED" is returned. + */ + public getFailedConnectionStatus(): FailedConnStatus | null { + const failureStatus = this.unreachableStatus + if (failureStatus === FAILURE_STATUSES.ECONNREFUSED) { + this.unreachableStatus = FAILURE_STATUSES.ECONNFAILED + } + return failureStatus + } + + public setBrowserWindow(window: BrowserWindow): void { + this.browserWindow = window + } + + public setConnected(client: mqtt.MqttClient): Promise { + return new Promise((resolve, reject) => { + if (this.client == null) { + this.client = client + resolve() + } else { + reject(new Error(`Connection already exists for ${this.robotName}`)) + } + }) + } + + /** + * @description Marks the host as unreachable. Don't report ECONNREFUSED, since while this is a good enough proxy + * for port block events, it's not perfect, and a port block event can never actually occur on the ODD. + */ + public setErrorStatus(): Promise { + return new Promise((resolve, reject) => { + this.unreachableStatus = FAILURE_STATUSES.ECONNFAILED + resolve() + }) + } + + public setSubStatus( + topic: NotifyTopic, + status: 'pending' | 'subscribed' + ): Promise { + return new Promise((resolve, reject) => { + if (status === 'pending') { + this.pendingSubs.add(topic) + } else { + this.subscriptions.add(topic) + this.pendingSubs.delete(topic) + } + resolve() + }) + } + + public setUnsubStatus( + topic: NotifyTopic, + status: 'pending' | 'unsubscribed' + ): Promise { + return new Promise((resolve, reject) => { + if (this.subscriptions.has(topic)) { + if (status === 'pending') { + this.pendingUnsubs.add(topic) + } else { + this.pendingUnsubs.delete(topic) + this.subscriptions.delete(topic) + } + } + resolve() + }) + } + + public isConnectedToBroker(): boolean { + return this.client?.connected ?? false + } + + public isPendingSub(topic: NotifyTopic): boolean { + return this.pendingSubs.has(topic) + } + + public isActiveSub(topic: NotifyTopic): boolean { + return this.subscriptions.has(topic) + } + + public isPendingUnsub(topic: NotifyTopic): boolean { + return this.pendingUnsubs.has(topic) + } + + /** + * @description A broker connection is terminated if it is errored or not present in the store. + */ + public isConnectionTerminated(): boolean { + return this.unreachableStatus != null + } +} + +export const connectionStore = new ConnectionStore() diff --git a/app-shell-odd/src/notifications/subscribe.ts b/app-shell-odd/src/notifications/subscribe.ts new file mode 100644 index 00000000000..6e334cb89c9 --- /dev/null +++ b/app-shell-odd/src/notifications/subscribe.ts @@ -0,0 +1,120 @@ +import mqtt from 'mqtt' + +import { connectionStore } from './store' +import { sendDeserialized, sendDeserializedGenericError } from './deserialize' +import { notifyLog } from './notifyLog' + +import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types' + +/** + * @property {number} qos: "Quality of Service", "at least once". Because we use React Query, which does not trigger + a render update event if duplicate data is received, we can avoid the additional overhead of guaranteeing "exactly once" delivery. + */ +const subscribeOptions: mqtt.IClientSubscribeOptions = { + qos: 1, +} + +const CHECK_CONNECTION_INTERVAL = 500 + +export function subscribe(topic: NotifyTopic): Promise { + if (connectionStore.isConnectionTerminated()) { + const errorMessage = connectionStore.getFailedConnectionStatus() + if (errorMessage != null) { + sendDeserialized(topic, errorMessage) + } + return Promise.resolve() + } else { + return waitUntilActiveOrErrored('client') + .then(() => { + const { client } = connectionStore + if (client == null) { + return Promise.reject(new Error('Expected hostData, received null.')) + } + + if ( + !connectionStore.isActiveSub(topic) && + !connectionStore.isPendingSub(topic) + ) { + connectionStore + .setSubStatus(topic, 'pending') + .then( + () => + new Promise(() => { + client.subscribe(topic, subscribeOptions, subscribeCb) + }) + ) + .catch((error: Error) => notifyLog.debug(error.message)) + } else { + void waitUntilActiveOrErrored('subscription', topic).catch( + (error: Error) => { + notifyLog.debug(error.message) + sendDeserializedGenericError(topic) + } + ) + } + }) + .catch((error: Error) => { + notifyLog.debug(error.message) + sendDeserializedGenericError(topic) + }) + } + + function subscribeCb(error: Error, result: mqtt.ISubscriptionGrant[]): void { + const { robotName, ip } = connectionStore + + if (error != null) { + sendDeserializedGenericError(topic) + notifyLog.debug( + `Failed to subscribe to ${robotName} on ${ip} to topic: ${topic}` + ) + } else { + notifyLog.debug( + `Successfully subscribed to ${robotName} on ${ip} to topic: ${topic}` + ) + connectionStore + .setSubStatus(topic, 'subscribed') + .catch((error: Error) => notifyLog.debug(error.message)) + } + } +} + +// Check every 500ms for 2 seconds before failing. +function waitUntilActiveOrErrored( + connection: 'client' | 'subscription', + topic?: NotifyTopic +): Promise { + return new Promise((resolve, reject) => { + if (connection === 'subscription') { + if (topic == null) { + reject( + new Error( + 'Must specify a topic when connection is type "subscription".' + ) + ) + } + } + + const MAX_RETRIES = 4 + let counter = 0 + const intervalId = setInterval(() => { + const hasReceivedAck = + connection === 'client' + ? connectionStore.isConnectedToBroker() + : connectionStore.isActiveSub(topic as NotifyTopic) + if (hasReceivedAck) { + clearInterval(intervalId) + resolve() + } + + counter++ + if (counter === MAX_RETRIES) { + clearInterval(intervalId) + reject( + new Error( + `Maximum number of retries exceeded for ${connectionStore.robotName} on ${connectionStore.ip}.` + ) + ) + } + }, CHECK_CONNECTION_INTERVAL) + }) +} diff --git a/app-shell-odd/src/notifications/unsubscribe.ts b/app-shell-odd/src/notifications/unsubscribe.ts new file mode 100644 index 00000000000..da9d0935ed2 --- /dev/null +++ b/app-shell-odd/src/notifications/unsubscribe.ts @@ -0,0 +1,36 @@ +import { connectionStore } from './store' +import { notifyLog } from './notifyLog' + +import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types' + +export function unsubscribe(topic: NotifyTopic): Promise { + return new Promise((resolve, reject) => { + if (!connectionStore.isPendingUnsub(topic)) { + connectionStore + .setUnsubStatus(topic, 'pending') + .then(() => { + const { client } = connectionStore + if (client == null) { + return reject(new Error('Expected hostData, received null.')) + } + + client.unsubscribe(topic, {}, (error, result) => { + const { robotName, ip } = connectionStore + if (error != null) { + notifyLog.debug( + `Failed to unsubscribe to ${robotName} on ${ip} from topic: ${topic}` + ) + } else { + notifyLog.debug( + `Successfully unsubscribed to ${robotName} on ${ip} from topic: ${topic}` + ) + connectionStore + .setUnsubStatus(topic, 'unsubscribed') + .catch((error: Error) => notifyLog.debug(error.message)) + } + }) + }) + .catch((error: Error) => notifyLog.debug(error.message)) + } + }) +} diff --git a/app-shell-odd/src/notify.ts b/app-shell-odd/src/notify.ts deleted file mode 100644 index f88280369a0..00000000000 --- a/app-shell-odd/src/notify.ts +++ /dev/null @@ -1,455 +0,0 @@ -/* eslint-disable @typescript-eslint/no-dynamic-delete */ -import mqtt from 'mqtt' -import isEqual from 'lodash/isEqual' - -import { createLogger } from './log' - -import type { BrowserWindow } from 'electron' -import type { - NotifyTopic, - NotifyResponseData, - NotifyRefetchData, - NotifyUnsubscribeData, - NotifyNetworkError, -} from '@opentrons/app/src/redux/shell/types' -import type { Action, Dispatch } from './types' - -// TODO(jh, 2024-03-01): after refactoring notify connectivity and subscription logic, uncomment logs. - -// Manages MQTT broker connections via a connection store, establishing a connection to the broker only if a connection does not -// already exist, and disconnects from the broker when the app is not subscribed to any topics for the given broker. -// A redundant connection to the same broker results in the older connection forcibly closing, which we want to avoid. -// However, redundant subscriptions are permitted and result in the broker sending the retained message for that topic. -// To mitigate redundant connections, the connection manager eagerly adds the host, removing the host if the connection fails. - -const FAILURE_STATUSES = { - ECONNREFUSED: 'ECONNREFUSED', - ECONNFAILED: 'ECONNFAILED', -} as const - -interface ConnectionStore { - [hostname: string]: { - client: mqtt.MqttClient | null - subscriptions: Record - pendingSubs: Set - } -} - -const connectionStore: ConnectionStore = {} -const unreachableHosts = new Set() -const log = createLogger('notify') -// MQTT is somewhat particular about the clientId format and will connect erratically if an unexpected string is supplied. -// This clientId is derived from the mqttjs library. -const CLIENT_ID = 'odd-' + Math.random().toString(16).slice(2, 8) - -const connectOptions: mqtt.IClientOptions = { - clientId: CLIENT_ID, - port: 1883, - keepalive: 60, - protocolVersion: 5, - reconnectPeriod: 1000, - connectTimeout: 30 * 1000, - clean: true, - resubscribe: true, -} - -/** - * @property {number} qos: "Quality of Service", "at least once". Because we use React Query, which does not trigger - a render update event if duplicate data is received, we can avoid the additional overhead - to guarantee "exactly once" delivery. - */ -const subscribeOptions: mqtt.IClientSubscribeOptions = { - qos: 1, -} - -export function registerNotify( - dispatch: Dispatch, - mainWindow: BrowserWindow -): (action: Action) => unknown { - return function handleAction(action: Action) { - switch (action.type) { - case 'shell:NOTIFY_SUBSCRIBE': - return subscribe({ - ...action.payload, - browserWindow: mainWindow, - hostname: '127.0.0.1', - }) - - case 'shell:NOTIFY_UNSUBSCRIBE': - return unsubscribe({ - ...action.payload, - browserWindow: mainWindow, - hostname: '127.0.0.1', - }) - } - } -} - -const CHECK_CONNECTION_INTERVAL = 500 -let hasReportedAPortBlockEvent = false - -interface NotifyParams { - browserWindow: BrowserWindow - hostname: string - topic: NotifyTopic -} - -function subscribe(notifyParams: NotifyParams): Promise { - const { hostname, topic, browserWindow } = notifyParams - if (unreachableHosts.has(hostname)) { - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic, - message: FAILURE_STATUSES.ECONNFAILED, - }) - return Promise.resolve() - } - // true if no subscription (and therefore connection) to host exists - else if (connectionStore[hostname] == null) { - connectionStore[hostname] = { - client: null, - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - subscriptions: { [topic]: 1 } as Record, - pendingSubs: new Set(), - } - return connectAsync(`mqtt://${hostname}`) - .then(client => { - const { pendingSubs } = connectionStore[hostname] - log.info(`Successfully connected to ${hostname}`) - connectionStore[hostname].client = client - pendingSubs.add(topic) - establishListeners({ ...notifyParams, client }) - return new Promise(() => { - client.subscribe(topic, subscribeOptions, subscribeCb) - pendingSubs.delete(topic) - }) - }) - .catch((error: Error) => { - log.warn( - `Failed to connect to ${hostname} - ${error.name}: ${error.message} ` - ) - let failureMessage: NotifyNetworkError = FAILURE_STATUSES.ECONNFAILED - if (connectionStore[hostname]?.client == null) { - unreachableHosts.add(hostname) - if ( - error.message.includes(FAILURE_STATUSES.ECONNREFUSED) && - !hasReportedAPortBlockEvent - ) { - failureMessage = FAILURE_STATUSES.ECONNREFUSED - hasReportedAPortBlockEvent = true - } - } - - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic, - message: failureMessage, - }) - if (hostname in connectionStore) delete connectionStore[hostname] - }) - } - // true if the connection store has an entry for the hostname. - else { - return waitUntilActiveOrErrored('client') - .then(() => { - const { client, subscriptions, pendingSubs } = connectionStore[hostname] - const activeClient = client as mqtt.Client - const isNotActiveSubscription = (subscriptions[topic] ?? 0) <= 0 - if (!pendingSubs.has(topic) && isNotActiveSubscription) { - pendingSubs.add(topic) - return new Promise(() => { - activeClient.subscribe(topic, subscribeOptions, subscribeCb) - pendingSubs.delete(topic) - }) - } else { - void waitUntilActiveOrErrored('subscription') - .then(() => { - subscriptions[topic] += 1 - }) - .catch(() => { - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic, - message: FAILURE_STATUSES.ECONNFAILED, - }) - }) - } - }) - .catch(() => { - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic, - message: FAILURE_STATUSES.ECONNFAILED, - }) - }) - } - function subscribeCb(error: Error, result: mqtt.ISubscriptionGrant[]): void { - const { subscriptions } = connectionStore[hostname] - if (error != null) { - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic, - message: FAILURE_STATUSES.ECONNFAILED, - }) - setTimeout(() => { - if (Object.keys(connectionStore[hostname].subscriptions).length <= 0) { - connectionStore[hostname].client?.end() - } - }, RENDER_TIMEOUT) - } else { - if (subscriptions[topic] > 0) { - subscriptions[topic] += 1 - } else { - subscriptions[topic] = 1 - } - } - } - - // Check every 500ms for 2 seconds before failing. - function waitUntilActiveOrErrored( - connection: 'client' | 'subscription' - ): Promise { - return new Promise((resolve, reject) => { - const MAX_RETRIES = 4 - let counter = 0 - const intervalId = setInterval(() => { - const host = connectionStore[hostname] - const hasReceivedAck = - connection === 'client' - ? host?.client != null - : host?.subscriptions[topic] > 0 - if (hasReceivedAck) { - clearInterval(intervalId) - resolve() - } - - counter++ - if (counter === MAX_RETRIES) { - clearInterval(intervalId) - reject(new Error('Maximum subscription retries exceeded.')) - } - }, CHECK_CONNECTION_INTERVAL) - }) - } -} - -// Because subscription logic is directly tied to the component lifecycle, it is possible -// for a component to trigger an unsubscribe event on dismount while a new component mounts and -// triggers a subscribe event. For the connection store and MQTT to reflect correct topic subscriptions, -// do not unsubscribe and close connections before newly mounted components have had time to update the connection store. -const RENDER_TIMEOUT = 10000 // 10 seconds - -function unsubscribe(notifyParams: NotifyParams): Promise { - const { hostname, topic } = notifyParams - return new Promise(() => { - setTimeout(() => { - if (hostname in connectionStore) { - const { client } = connectionStore[hostname] - const subscriptions = connectionStore[hostname]?.subscriptions - const isLastSubscription = subscriptions[topic] <= 1 - - if (isLastSubscription) { - client?.unsubscribe(topic, {}, (error, result) => { - if (error == null) { - handleDecrementSubscriptionCount(hostname, topic) - } else { - log.warn(`Failed to subscribe on ${hostname} to topic: ${topic}`) - } - }) - } else { - subscriptions[topic] -= 1 - } - } - }, RENDER_TIMEOUT) - }) -} - -function connectAsync(brokerURL: string): Promise { - const client = mqtt.connect(brokerURL, connectOptions) - - return new Promise((resolve, reject) => { - // Listeners added to client to trigger promise resolution - const promiseListeners: { - [key: string]: (...args: any[]) => void - } = { - connect: () => { - removePromiseListeners() - return resolve(client) - }, - // A connection error event will close the connection without a retry. - error: (error: Error | string) => { - removePromiseListeners() - const clientEndPromise = new Promise((resolve, reject) => - client.end(true, {}, () => resolve(error)) - ) - return clientEndPromise.then(() => reject(error)) - }, - end: () => promiseListeners.error(`Couldn't connect to ${brokerURL}`), - } - - function removePromiseListeners(): void { - Object.keys(promiseListeners).forEach(eventName => { - client.removeListener(eventName, promiseListeners[eventName]) - }) - } - - Object.keys(promiseListeners).forEach(eventName => { - client.on(eventName, promiseListeners[eventName]) - }) - }) -} - -function handleDecrementSubscriptionCount( - hostname: string, - topic: NotifyTopic -): void { - const host = connectionStore[hostname] - if (host) { - const { client, subscriptions } = host - if (topic in subscriptions) { - subscriptions[topic] -= 1 - if (subscriptions[topic] <= 0) { - delete subscriptions[topic] - } - } - - if (Object.keys(subscriptions).length <= 0) { - client?.end() - } - } -} - -interface ListenerParams { - client: mqtt.MqttClient - browserWindow: BrowserWindow - hostname: string -} - -function establishListeners({ - client, - browserWindow, - hostname, -}: ListenerParams): void { - client.on( - 'message', - (topic: NotifyTopic, message: Buffer, packet: mqtt.IPublishPacket) => { - deserialize(message.toString()) - .then(deserializedMessage => { - log.debug('Received notification data from main via IPC', { - hostname, - topic, - }) - - browserWindow.webContents.send( - 'notify', - hostname, - topic, - deserializedMessage - ) - }) - .catch(error => log.debug(`${error.message}`)) - } - ) - - client.on('reconnect', () => { - log.info(`Attempting to reconnect to ${hostname}`) - }) - // handles transport layer errors only - client.on('error', error => { - log.warn(`Error - ${error.name}: ${error.message}`) - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic: 'ALL_TOPICS', - message: FAILURE_STATUSES.ECONNFAILED, - }) - client.end() - }) - - client.on('end', () => { - log.info(`Closed connection to ${hostname}`) - if (hostname in connectionStore) delete connectionStore[hostname] - }) - - client.on('disconnect', packet => { - log.warn( - `Disconnected from ${hostname} with code ${ - packet.reasonCode ?? 'undefined' - }` - ) - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic: 'ALL_TOPICS', - message: FAILURE_STATUSES.ECONNFAILED, - }) - }) -} - -export function closeAllNotifyConnections(): Promise { - return new Promise((resolve, reject) => { - setTimeout(() => { - reject(Error('Failed to close all connections within the time limit.')) - }, 2000) - - log.debug('Stopping notify service connections') - const closeConnections = Object.values(connectionStore).map( - ({ client }) => { - return new Promise((resolve, reject) => { - client?.end(true, {}, () => resolve(null)) - }) - } - ) - Promise.all(closeConnections).then(resolve).catch(reject) - }) -} - -interface SendToBrowserParams { - browserWindow: BrowserWindow - hostname: string - topic: NotifyTopic - message: NotifyResponseData -} - -function sendToBrowserDeserialized({ - browserWindow, - hostname, - topic, - message, -}: SendToBrowserParams): void { - browserWindow.webContents.send('notify', hostname, topic, message) -} - -const VALID_MODELS: [NotifyRefetchData, NotifyUnsubscribeData] = [ - { refetchUsingHTTP: true }, - { unsubscribe: true }, -] - -function deserialize(message: string): Promise { - return new Promise((resolve, reject) => { - let deserializedMessage: NotifyResponseData | Record - const error = new Error( - `Unexpected data received from notify broker: ${message}` - ) - - try { - deserializedMessage = JSON.parse(message) - } catch { - reject(error) - } - - const isValidNotifyResponse = VALID_MODELS.some(model => - isEqual(model, deserializedMessage) - ) - if (!isValidNotifyResponse) { - reject(error) - } else { - resolve(JSON.parse(message)) - } - }) -} diff --git a/app-shell/src/notifications/store.ts b/app-shell/src/notifications/store.ts index 63195d62d23..9968080258e 100644 --- a/app-shell/src/notifications/store.ts +++ b/app-shell/src/notifications/store.ts @@ -1,4 +1,3 @@ -/* eslint-disable @typescript-eslint/no-dynamic-delete */ import type mqtt from 'mqtt' import { FAILURE_STATUSES } from '../constants' diff --git a/app/src/redux/shell/__tests__/actions.test.ts b/app/src/redux/shell/__tests__/actions.test.ts index 7edf16f1b64..127e64503e0 100644 --- a/app/src/redux/shell/__tests__/actions.test.ts +++ b/app/src/redux/shell/__tests__/actions.test.ts @@ -1,10 +1,6 @@ import { describe, it, expect } from 'vitest' -import { - uiInitialized, - notifySubscribeAction, - notifyUnsubscribeAction, -} from '../actions' +import { uiInitialized, notifySubscribeAction } from '../actions' import type { NotifyTopic } from '../types' @@ -28,14 +24,4 @@ describe('shell actions', () => { meta: { shell: true }, }) }) - it('should be able to create an UNSUBSCRIBE action', () => { - expect(notifyUnsubscribeAction(MOCK_HOSTNAME, MOCK_TOPIC)).toEqual({ - type: 'shell:NOTIFY_UNSUBSCRIBE', - payload: { - hostname: MOCK_HOSTNAME, - topic: MOCK_TOPIC, - }, - meta: { shell: true }, - }) - }) }) diff --git a/app/src/redux/shell/actions.ts b/app/src/redux/shell/actions.ts index cf910b5c67e..7922eebef4c 100644 --- a/app/src/redux/shell/actions.ts +++ b/app/src/redux/shell/actions.ts @@ -9,7 +9,6 @@ import type { RobotMassStorageDeviceEnumerated, RobotMassStorageDeviceRemoved, NotifySubscribeAction, - NotifyUnsubscribeAction, NotifyTopic, } from './types' @@ -31,8 +30,6 @@ export const ROBOT_MASS_STORAGE_DEVICE_ENUMERATED: 'shell:ROBOT_MASS_STORAGE_DEV 'shell:ROBOT_MASS_STORAGE_DEVICE_ENUMERATED' export const NOTIFY_SUBSCRIBE: 'shell:NOTIFY_SUBSCRIBE' = 'shell:NOTIFY_SUBSCRIBE' -export const NOTIFY_UNSUBSCRIBE: 'shell:NOTIFY_UNSUBSCRIBE' = - 'shell:NOTIFY_UNSUBSCRIBE' export const uiInitialized = (): UiInitializedAction => ({ type: UI_INITIALIZED, @@ -124,15 +121,3 @@ export const notifySubscribeAction = ( }, meta: { shell: true }, }) - -export const notifyUnsubscribeAction = ( - hostname: string, - topic: NotifyTopic -): NotifyUnsubscribeAction => ({ - type: NOTIFY_UNSUBSCRIBE, - payload: { - hostname, - topic, - }, - meta: { shell: true }, -}) diff --git a/app/src/redux/shell/types.ts b/app/src/redux/shell/types.ts index 81e85b7c628..b051ec43cbc 100644 --- a/app/src/redux/shell/types.ts +++ b/app/src/redux/shell/types.ts @@ -142,8 +142,6 @@ export type NotifyTopic = | 'robot-server/runs' | `robot-server/runs/${string}` -export type NotifyAction = 'subscribe' | 'unsubscribe' - export interface NotifySubscribeAction { type: 'shell:NOTIFY_SUBSCRIBE' payload: { @@ -153,15 +151,6 @@ export interface NotifySubscribeAction { meta: { shell: true } } -export interface NotifyUnsubscribeAction { - type: 'shell:NOTIFY_UNSUBSCRIBE' - payload: { - hostname: string - topic: NotifyTopic - } - meta: { shell: true } -} - export type ShellAction = | UiInitializedAction | ShellUpdateAction @@ -175,4 +164,3 @@ export type ShellAction = | RobotMassStorageDeviceEnumerated | RobotMassStorageDeviceRemoved | NotifySubscribeAction - | NotifyUnsubscribeAction diff --git a/app/src/resources/__tests__/useNotifyService.test.ts b/app/src/resources/__tests__/useNotifyService.test.ts index 8363691d61c..02fa1c36d68 100644 --- a/app/src/resources/__tests__/useNotifyService.test.ts +++ b/app/src/resources/__tests__/useNotifyService.test.ts @@ -7,10 +7,7 @@ import { useHost } from '@opentrons/react-api-client' import { useNotifyService } from '../useNotifyService' import { appShellListener } from '../../redux/shell/remote' import { useTrackEvent } from '../../redux/analytics' -import { - notifySubscribeAction, - notifyUnsubscribeAction, -} from '../../redux/shell' +import { notifySubscribeAction } from '../../redux/shell' import { useIsFlex } from '../../organisms/Devices/hooks/useIsFlex' import type { Mock } from 'vitest' @@ -44,6 +41,7 @@ describe('useNotifyService', () => { vi.mocked(useDispatch).mockReturnValue(mockDispatch) vi.mocked(useHost).mockReturnValue(MOCK_HOST_CONFIG) vi.mocked(useIsFlex).mockReturnValue(true) + vi.mocked(appShellListener).mockClear() }) afterEach(() => { @@ -69,20 +67,6 @@ describe('useNotifyService', () => { expect(mockAppShellListener).toHaveBeenCalled() }) - it('should trigger an unsubscribe action on dismount', () => { - const { unmount } = renderHook(() => - useNotifyService({ - topic: MOCK_TOPIC, - setRefetchUsingHTTP: mockHTTPRefetch, - options: MOCK_OPTIONS, - } as any) - ) - unmount() - expect(mockDispatch).toHaveBeenCalledWith( - notifyUnsubscribeAction(MOCK_HOST_CONFIG.hostname, MOCK_TOPIC) - ) - }) - it('should not subscribe to notifications if forceHttpPolling is true', () => { renderHook(() => useNotifyService({ diff --git a/app/src/resources/useNotifyService.ts b/app/src/resources/useNotifyService.ts index 63c887fb9b5..f6cfaefa2b8 100644 --- a/app/src/resources/useNotifyService.ts +++ b/app/src/resources/useNotifyService.ts @@ -5,7 +5,7 @@ import { useDispatch } from 'react-redux' import { useHost } from '@opentrons/react-api-client' import { appShellListener } from '../redux/shell/remote' -import { notifySubscribeAction, notifyUnsubscribeAction } from '../redux/shell' +import { notifySubscribeAction } from '../redux/shell' import { useTrackEvent, ANALYTICS_NOTIFICATION_PORT_BLOCK_ERROR, @@ -62,13 +62,12 @@ export function useNotifyService({ }) dispatch(notifySubscribeAction(hostname, topic)) hasUsedNotifyService.current = true - } else setRefetchUsingHTTP('always') + } else { + setRefetchUsingHTTP('always') + } return () => { if (hasUsedNotifyService.current) { - if (hostname != null) { - dispatch(notifyUnsubscribeAction(hostname, topic)) - } appShellListener({ hostname: hostname as string, topic,