diff --git a/app-shell/src/__fixtures__/index.ts b/app-shell/src/__fixtures__/index.ts index f934b01b6f5..90f50c9a737 100644 --- a/app-shell/src/__fixtures__/index.ts +++ b/app-shell/src/__fixtures__/index.ts @@ -1 +1,2 @@ export * from './config' +export * from './robots' diff --git a/app-shell/src/__fixtures__/robots.ts b/app-shell/src/__fixtures__/robots.ts new file mode 100644 index 00000000000..183dc7d0ff3 --- /dev/null +++ b/app-shell/src/__fixtures__/robots.ts @@ -0,0 +1,123 @@ +import { HEALTH_STATUS_NOT_OK, HEALTH_STATUS_OK } from '../constants' + +export const mockLegacyHealthResponse = { + name: 'opentrons-dev', + api_version: '1.2.3', + fw_version: '4.5.6', + system_version: '7.8.9', + robot_model: 'OT-2 Standard', +} + +export const mockLegacyServerHealthResponse = { + name: 'opentrons-dev', + apiServerVersion: '1.2.3', + serialNumber: '12345', + updateServerVersion: '1.2.3', + smoothieVersion: '4.5.6', + systemVersion: '7.8.9', +} + +export const MOCK_DISCOVERY_ROBOTS = [ + { + name: 'opentrons-dev', + health: mockLegacyHealthResponse, + serverHealth: mockLegacyServerHealthResponse, + addresses: [ + { + ip: '10.14.19.50', + port: 31950, + seen: true, + healthStatus: HEALTH_STATUS_OK, + serverHealthStatus: HEALTH_STATUS_OK, + healthError: null, + serverHealthError: null, + advertisedModel: null, + }, + ], + }, + { + name: 'opentrons-dev2', + health: mockLegacyHealthResponse, + serverHealth: mockLegacyServerHealthResponse, + addresses: [ + { + ip: '10.14.19.51', + port: 31950, + seen: true, + healthStatus: HEALTH_STATUS_OK, + serverHealthStatus: HEALTH_STATUS_OK, + healthError: null, + serverHealthError: null, + advertisedModel: null, + }, + ], + }, + { + name: 'opentrons-dev3', + health: mockLegacyHealthResponse, + serverHealth: mockLegacyServerHealthResponse, + addresses: [ + { + ip: '10.14.19.52', + port: 31950, + seen: true, + healthStatus: HEALTH_STATUS_NOT_OK, + serverHealthStatus: HEALTH_STATUS_NOT_OK, + healthError: null, + serverHealthError: null, + advertisedModel: null, + }, + ], + }, + { + name: 'opentrons-dev4', + health: mockLegacyHealthResponse, + serverHealth: mockLegacyServerHealthResponse, + addresses: [ + { + ip: '10.14.19.53', + port: 31950, + seen: true, + healthStatus: HEALTH_STATUS_OK, + serverHealthStatus: HEALTH_STATUS_OK, + healthError: null, + serverHealthError: null, + advertisedModel: null, + }, + ], + }, +] + +export const MOCK_STORE_ROBOTS = [ + { + robotName: 'opentrons-dev', + ip: '10.14.19.50', + }, + { + robotName: 'opentrons-dev2', + ip: '10.14.19.51', + }, + { + robotName: 'opentrons-dev3', + ip: '10.14.19.52', + }, + { + robotName: 'opentrons-dev4', + ip: '10.14.19.53', + }, +] + +export const MOCK_HEALTHY_ROBOTS = [ + { + robotName: 'opentrons-dev', + ip: '10.14.19.50', + }, + { + robotName: 'opentrons-dev2', + ip: '10.14.19.51', + }, + { + robotName: 'opentrons-dev4', + ip: '10.14.19.53', + }, +] diff --git a/app-shell/src/__tests__/discovery.test.ts b/app-shell/src/__tests__/discovery.test.ts index bd9db44c3ee..166020c2125 100644 --- a/app-shell/src/__tests__/discovery.test.ts +++ b/app-shell/src/__tests__/discovery.test.ts @@ -27,6 +27,7 @@ vi.mock('../log', () => { }, } }) +vi.mock('../notifications') let mockGet = vi.fn(property => { return [] diff --git a/app-shell/src/config/actions.ts b/app-shell/src/config/actions.ts index ef1958044f6..eabc9b47a16 100644 --- a/app-shell/src/config/actions.ts +++ b/app-shell/src/config/actions.ts @@ -76,7 +76,6 @@ import { VALUE_UPDATED, VIEW_PROTOCOL_SOURCE_FOLDER, NOTIFY_SUBSCRIBE, - NOTIFY_UNSUBSCRIBE, ROBOT_MASS_STORAGE_DEVICE_ADDED, ROBOT_MASS_STORAGE_DEVICE_ENUMERATED, ROBOT_MASS_STORAGE_DEVICE_REMOVED, @@ -99,7 +98,6 @@ import type { AppRestartAction, NotifySubscribeAction, NotifyTopic, - NotifyUnsubscribeAction, ReloadUiAction, RobotMassStorageDeviceAdded, RobotMassStorageDeviceEnumerated, @@ -421,15 +419,3 @@ export const notifySubscribeAction = ( }, meta: { shell: true }, }) - -export const notifyUnsubscribeAction = ( - hostname: string, - topic: NotifyTopic -): NotifyUnsubscribeAction => ({ - type: NOTIFY_UNSUBSCRIBE, - payload: { - hostname, - topic, - }, - meta: { shell: true }, -}) diff --git a/app-shell/src/constants.ts b/app-shell/src/constants.ts index 66deaab5839..3e86c503c83 100644 --- a/app-shell/src/constants.ts +++ b/app-shell/src/constants.ts @@ -225,8 +225,6 @@ export const ROBOT_MASS_STORAGE_DEVICE_ENUMERATED: 'shell:ROBOT_MASS_STORAGE_DEV 'shell:ROBOT_MASS_STORAGE_DEVICE_ENUMERATED' export const NOTIFY_SUBSCRIBE: 'shell:NOTIFY_SUBSCRIBE' = 'shell:NOTIFY_SUBSCRIBE' -export const NOTIFY_UNSUBSCRIBE: 'shell:NOTIFY_UNSUBSCRIBE' = - 'shell:NOTIFY_UNSUBSCRIBE' // copy // TODO(mc, 2020-05-11): i18n @@ -247,3 +245,9 @@ export const DISCOVERY_UPDATE_LIST: DISCOVERY_UPDATE_LIST_TYPE = export const DISCOVERY_REMOVE: DISCOVERY_REMOVE_TYPE = 'discovery:REMOVE' export const CLEAR_CACHE: CLEAR_CACHE_TYPE = 'discovery:CLEAR_CACHE' +export const HEALTH_STATUS_OK: 'ok' = 'ok' +export const HEALTH_STATUS_NOT_OK: 'notOk' = 'notOk' +export const FAILURE_STATUSES = { + ECONNREFUSED: 'ECONNREFUSED', + ECONNFAILED: 'ECONNFAILED', +} as const diff --git a/app-shell/src/discovery.ts b/app-shell/src/discovery.ts index d099ef9d99b..f7e90bf0fd9 100644 --- a/app-shell/src/discovery.ts +++ b/app-shell/src/discovery.ts @@ -9,19 +9,22 @@ import { DEFAULT_PORT, } from '@opentrons/discovery-client' import { - CLEAR_CACHE, - DISCOVERY_FINISH, - DISCOVERY_REMOVE, - DISCOVERY_START, - OPENTRONS_USB, UI_INITIALIZED, USB_HTTP_REQUESTS_START, USB_HTTP_REQUESTS_STOP, -} from './constants' +} from '@opentrons/app/src/redux/shell/actions' +import { + DISCOVERY_START, + DISCOVERY_FINISH, + DISCOVERY_REMOVE, + CLEAR_CACHE, +} from '@opentrons/app/src/redux/discovery/actions' +import { OPENTRONS_USB } from '@opentrons/app/src/redux/discovery/constants' import { getFullConfig, handleConfigChange } from './config' import { createLogger } from './log' import { getSerialPortHttpAgent } from './usb' +import { handleNotificationConnectionsFor } from './notifications' import type { Address, @@ -29,7 +32,6 @@ import type { LegacyService, DiscoveryClient, } from '@opentrons/discovery-client' - import type { Action, Dispatch } from './types' import type { ConfigV1 } from '@opentrons/app/src/redux/config/schema-types' @@ -199,6 +201,7 @@ export function registerDiscovery( function handleRobots(): void { const robots = client.getRobots() + handleNotificationConnectionsFor(robots) if (!disableCache) store.set('robots', robots) diff --git a/app-shell/src/main.ts b/app-shell/src/main.ts index b198f1705bd..75f301fa718 100644 --- a/app-shell/src/main.ts +++ b/app-shell/src/main.ts @@ -19,7 +19,7 @@ import { registerProtocolStorage } from './protocol-storage' import { getConfig, getStore, getOverrides, registerConfig } from './config' import { registerUsb } from './usb' import { createUsbDeviceMonitor } from './system-info/usb-devices' -import { registerNotify, closeAllNotifyConnections } from './notify' +import { registerNotify, closeAllNotifyConnections } from './notifications' import type { BrowserWindow } from 'electron' import type { Dispatch, Logger } from './types' diff --git a/app-shell/src/notifications/__tests__/connect.test.ts b/app-shell/src/notifications/__tests__/connect.test.ts new file mode 100644 index 00000000000..12c41464353 --- /dev/null +++ b/app-shell/src/notifications/__tests__/connect.test.ts @@ -0,0 +1,115 @@ +import { vi, describe, expect, it } from 'vitest' + +import { + getHealthyRobotDataForNotifyConnections, + cleanUpUnreachableRobots, + establishConnections, + closeConnectionsForcefullyFor, +} from '../connect' +import { connectionStore } from '../store' +import { FAILURE_STATUSES } from '../../constants' +import { + MOCK_DISCOVERY_ROBOTS, + MOCK_HEALTHY_ROBOTS, + MOCK_STORE_ROBOTS, +} from '../../__fixtures__' + +vi.mock('electron-store') +vi.mock('../notifyLog', () => { + return { + createLogger: () => { + return { debug: () => null } + }, + notifyLog: { debug: vi.fn(), warn: vi.fn() }, + } +}) + +describe('getHealthyRobotDataForNotifyConnections', () => { + it('should filter a list of discovery robots, only returning robots that have a health status of ok', () => { + const healthyRobots = getHealthyRobotDataForNotifyConnections( + MOCK_DISCOVERY_ROBOTS + ) + expect(healthyRobots).toEqual(MOCK_HEALTHY_ROBOTS) + }) +}) + +describe('cleanUpUnreachableRobots', () => { + it('should close connections forcefully for unreachable robots and resolve them', async () => { + MOCK_STORE_ROBOTS.forEach(robot => { + void connectionStore + .setPendingConnection(robot.robotName) + .then(() => + connectionStore.setConnected(robot.robotName, vi.fn() as any) + ) + }) + const unreachableRobots = await cleanUpUnreachableRobots( + MOCK_HEALTHY_ROBOTS + ) + expect(unreachableRobots).toEqual(['opentrons-dev3']) + }) +}) + +describe('establishConnections', () => { + it('should not resolve any new connections if all reported robots are already in the connection store and connected', async () => { + connectionStore.clearStore() + MOCK_STORE_ROBOTS.forEach(robot => { + void connectionStore + .setPendingConnection(robot.robotName) + .then(() => + connectionStore.setConnected(robot.robotName, vi.fn() as any) + ) + }) + + const newRobots = await establishConnections(MOCK_HEALTHY_ROBOTS) + expect(newRobots).toEqual([]) + }) + + it('should not attempt to connect to a robot if it a known notification port blocked robot', async () => { + await connectionStore.setErrorStatus( + '10.14.19.51', + FAILURE_STATUSES.ECONNREFUSED + ) + connectionStore.clearStore() + + const newRobots = await establishConnections(MOCK_HEALTHY_ROBOTS) + expect(newRobots).toEqual([ + { ip: '10.14.19.50', robotName: 'opentrons-dev' }, + { ip: '10.14.19.53', robotName: 'opentrons-dev4' }, + ]) + }) + + it('should not report a robot as new if it is connecting', async () => { + connectionStore.clearStore() + MOCK_STORE_ROBOTS.forEach(robot => { + void connectionStore.setPendingConnection(robot.robotName) + }) + + const newRobots = await establishConnections(MOCK_HEALTHY_ROBOTS) + expect(newRobots).toEqual([]) + }) + + it('should create a new entry in the connection store for a new robot', async () => { + connectionStore.clearStore() + await establishConnections(MOCK_HEALTHY_ROBOTS) + console.log(connectionStore) + expect(connectionStore.getRobotNameByIP('10.14.19.50')).not.toBeNull() + }) +}) + +describe('closeConnectionsForcefullyFor', () => { + it('should return an array of promises for each closing connection and resolve after closing connections', async () => { + connectionStore.clearStore() + MOCK_STORE_ROBOTS.forEach(robot => { + void connectionStore + .setPendingConnection(robot.robotName) + .then(() => + connectionStore.setConnected(robot.robotName, vi.fn() as any) + ) + }) + const closingRobots = closeConnectionsForcefullyFor([ + 'opentrons-dev', + 'opentrons-dev2', + ]) + closingRobots.forEach(robot => expect(robot).toBeInstanceOf(Promise)) + }) +}) diff --git a/app-shell/src/notifications/__tests__/deserialize.test.ts b/app-shell/src/notifications/__tests__/deserialize.test.ts new file mode 100644 index 00000000000..9c6642d3931 --- /dev/null +++ b/app-shell/src/notifications/__tests__/deserialize.test.ts @@ -0,0 +1,33 @@ +import { describe, expect, it } from 'vitest' + +import { deserializeExpectedMessages } from '../deserialize' + +import type { NotifyResponseData } from '@opentrons/app/src/redux/shell/types' + +const MOCK_VALID_RESPONSE: NotifyResponseData = { refetchUsingHTTP: true } +const MOCK_VALID_STRING_RESPONSE = JSON.stringify(MOCK_VALID_RESPONSE) +const MOCK_INVALID_OBJECT = JSON.stringify({ test: 'MOCK_RESPONSE' }) +const MOCK_INVALID_STRING = 'MOCK_STRING' + +describe('closeConnectionsForcefullyFor', () => { + it('should resolve with the deserialized message if it is a valid notify response', async () => { + const response = await deserializeExpectedMessages( + MOCK_VALID_STRING_RESPONSE + ) + expect(response).toEqual(MOCK_VALID_RESPONSE) + }) + + it('should reject with an error if the deserialized message is not a valid notify response', async () => { + const responsePromise = deserializeExpectedMessages(MOCK_INVALID_OBJECT) + await expect(responsePromise).rejects.toThrowError( + 'Unexpected data received from notify broker: {"test":"MOCK_RESPONSE"}' + ) + }) + + it('should reject with an error if the message cannot be deserialized', async () => { + const responsePromise = deserializeExpectedMessages(MOCK_INVALID_STRING) + await expect(responsePromise).rejects.toThrowError( + 'Unexpected data received from notify broker: MOCK_STRING' + ) + }) +}) diff --git a/app-shell/src/notifications/__tests__/notifications.test.ts b/app-shell/src/notifications/__tests__/notifications.test.ts new file mode 100644 index 00000000000..5fdd521aa0b --- /dev/null +++ b/app-shell/src/notifications/__tests__/notifications.test.ts @@ -0,0 +1,65 @@ +import { vi, describe, it, expect, beforeEach } from 'vitest' + +import { registerNotify, closeAllNotifyConnections } from '..' +import { connectionStore } from '../store' +import { subscribe } from '../subscribe' +import { closeConnectionsForcefullyFor } from '../connect' + +import type { Mock } from 'vitest' + +vi.mock('electron-store') +vi.mock('../store') +vi.mock('../subscribe') +vi.mock('../connect') +vi.mock('../notifyLog', () => { + return { + createLogger: () => { + return { debug: () => null } + }, + notifyLog: { debug: vi.fn() }, + } +}) + +const MOCK_ACTION = { + type: 'shell:NOTIFY_SUBSCRIBE', + payload: { hostname: 'localhost', topic: 'ALL_TOPICS' }, + meta: { shell: true }, +} as any + +describe('registerNotify', () => { + let dispatch: Mock + let mainWindow: Mock + + beforeEach(() => { + dispatch = vi.fn() + mainWindow = vi.fn() + }) + + it('should set browser window when connectionStore has no browser window', () => { + registerNotify(dispatch, mainWindow as any)(MOCK_ACTION) + + expect(connectionStore.setBrowserWindow).toHaveBeenCalledWith(mainWindow) + }) + + it('should subscribe when action type is shell:NOTIFY_SUBSCRIBE', () => { + registerNotify(dispatch, mainWindow as any)(MOCK_ACTION) + + expect(vi.mocked(subscribe)).toHaveBeenCalledWith( + MOCK_ACTION.payload.hostname, + MOCK_ACTION.payload.topic + ) + }) +}) + +describe('closeAllNotifyConnections', () => { + it('should reject with an error when failed to close all connections within the time limit', async () => { + vi.useFakeTimers({ shouldAdvanceTime: true }) + vi.mocked(closeConnectionsForcefullyFor).mockResolvedValue([]) + const promise = closeAllNotifyConnections() + vi.advanceTimersByTime(2000) + + await expect(promise).rejects.toThrowError( + 'Failed to close all connections within the time limit.' + ) + }) +}) diff --git a/app-shell/src/notifications/connect.ts b/app-shell/src/notifications/connect.ts new file mode 100644 index 00000000000..bcaf24e6e3d --- /dev/null +++ b/app-shell/src/notifications/connect.ts @@ -0,0 +1,209 @@ +import mqtt from 'mqtt' + +import { connectionStore } from './store' +import { + sendDeserialized, + sendDeserializedGenericError, + deserializeExpectedMessages, +} from './deserialize' +import { unsubscribe } from './unsubscribe' +import { notifyLog } from './notifyLog' +import { FAILURE_STATUSES, HEALTH_STATUS_OK } from '../constants' + +import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types' +import type { DiscoveryClientRobot } from '@opentrons/discovery-client' + +// MQTT is somewhat particular about the clientId format and will connect erratically if an unexpected string is supplied. +const CLIENT_ID = 'app-' + Math.random().toString(16).slice(2, 8) // Derived from mqttjs +const connectOptions: mqtt.IClientOptions = { + clientId: CLIENT_ID, + port: 1883, + keepalive: 60, + protocolVersion: 5, + reconnectPeriod: 1000, + connectTimeout: 30 * 1000, + clean: true, + resubscribe: true, +} + +export interface RobotData { + ip: string + robotName: string +} + +// This is the discovery-client equivalent of "available" robots when viewing the Devices page in the app. +export function getHealthyRobotDataForNotifyConnections( + robots: DiscoveryClientRobot[] +): RobotData[] { + return robots.flatMap(robot => + robot.addresses + .filter(address => address.healthStatus === HEALTH_STATUS_OK) + .map(address => ({ ip: address.ip, robotName: robot.name })) + ) +} + +/** + * + * @description Remove broker connections from the connection store by forcibly disconnecting from brokers + * as robots are no longer discoverable. + */ +export function cleanUpUnreachableRobots( + healthyRobots: RobotData[] +): Promise { + return new Promise((resolve, reject) => { + const healthyRobotNames = healthyRobots.map(({ robotName }) => robotName) + const healthyRobotNamesSet = new Set(healthyRobotNames) + const unreachableRobots = connectionStore + .getAllBrokersInStore() + .filter(robotName => { + return !healthyRobotNamesSet.has(robotName) + }) + void closeConnectionsForcefullyFor(unreachableRobots) + resolve(unreachableRobots) + }) +} + +export function establishConnections( + healthyRobots: RobotData[] +): Promise { + return new Promise((resolve, reject) => { + const newConnections = healthyRobots.filter(({ ip, robotName }) => { + if (connectionStore.isConnectedToBroker(robotName)) { + return false + } else { + connectionStore.associateIPWithRobotName(ip, robotName) + // True when a robot is connecting. + if (!connectionStore.isConnectionTerminated(robotName)) { + return false + } else { + return !connectionStore.isKnownPortBlockedIP(ip) + } + } + }) + newConnections.forEach(({ ip, robotName }) => { + void connectionStore + .setPendingConnection(robotName) + .then(() => { + connectAsync(`mqtt://${ip}`) + .then(client => { + notifyLog.debug(`Successfully connected to ${robotName} on ${ip}`) + void connectionStore + .setConnected(robotName, client) + .then(() => establishListeners(client, ip, robotName)) + .catch((error: Error) => notifyLog.debug(error.message)) + }) + .catch((error: Error) => { + notifyLog.warn( + `Failed to connect to ${robotName} on ${ip} - ${error.name}: ${error.message} ` + ) + void connectionStore.setErrorStatus(ip, error.message) + }) + }) + .catch((error: Error) => notifyLog.debug(error.message)) + }) + resolve(newConnections) + }) +} + +function connectAsync(brokerURL: string): Promise { + const client = mqtt.connect(brokerURL, connectOptions) + + return new Promise((resolve, reject) => { + // Listeners added to client to trigger promise resolution + const promiseListeners: { + [key: string]: (...args: any[]) => void + } = { + connect: () => { + removePromiseListeners() + return resolve(client) + }, + // A connection error event will close the connection without a retry. + error: (error: Error | string) => { + removePromiseListeners() + const clientEndPromise = new Promise((resolve, reject) => + client.end(true, {}, () => resolve(error)) + ) + return clientEndPromise.then(() => reject(error)) + }, + end: () => promiseListeners.error(`Couldn't connect to ${brokerURL}`), + } + + function removePromiseListeners(): void { + Object.keys(promiseListeners).forEach(eventName => { + client.removeListener(eventName, promiseListeners[eventName]) + }) + } + + Object.keys(promiseListeners).forEach(eventName => { + client.on(eventName, promiseListeners[eventName]) + }) + }) +} + +function establishListeners( + client: mqtt.MqttClient, + ip: string, + robotName: string +): void { + client.on( + 'message', + (topic: NotifyTopic, message: Buffer, packet: mqtt.IPublishPacket) => { + deserializeExpectedMessages(message.toString()) + .then(deserializedMessage => { + const messageContainsUnsubFlag = 'unsubscribe' in deserializedMessage + if (messageContainsUnsubFlag) { + void unsubscribe(ip, topic).catch((error: Error) => + notifyLog.debug(error.message) + ) + } + + notifyLog.debug('Received notification data from main via IPC', { + ip, + topic, + }) + + sendDeserialized({ ip, topic, message: deserializedMessage }) + }) + .catch(error => notifyLog.debug(`${error.message}`)) + } + ) + + client.on('reconnect', () => { + notifyLog.debug(`Attempting to reconnect to ${robotName} on ${ip}`) + }) + // handles transport layer errors only + client.on('error', error => { + notifyLog.warn(`Error - ${error.name}: ${error.message}`) + sendDeserializedGenericError(ip, 'ALL_TOPICS') + client.end() + }) + + client.on('end', () => { + notifyLog.debug(`Closed connection to ${robotName} on ${ip}`) + // Marking the connection as failed with a generic error status lets the connection re-establish in the future + // and tells the browser to fall back to polling (assuming this 'end' event isn't caused by the app closing). + void connectionStore.setErrorStatus(ip, FAILURE_STATUSES.ECONNFAILED) + }) + + client.on('disconnect', packet => { + notifyLog.warn( + `Disconnected from ${robotName} on ${ip} with code ${ + packet.reasonCode ?? 'undefined' + }` + ) + sendDeserializedGenericError(ip, 'ALL_TOPICS') + }) +} + +export function closeConnectionsForcefullyFor( + robotNames: string[] +): Array> { + return robotNames.map(ip => { + const client = connectionStore.getClient(ip) + return new Promise((resolve, reject) => { + if (client != null) { + client.end(true, {}, () => resolve()) + } + }) + }) +} diff --git a/app-shell/src/notifications/deserialize.ts b/app-shell/src/notifications/deserialize.ts new file mode 100644 index 00000000000..c96d6d19203 --- /dev/null +++ b/app-shell/src/notifications/deserialize.ts @@ -0,0 +1,71 @@ +import isEqual from 'lodash/isEqual' + +import { connectionStore } from './store' + +import type { + NotifyBrokerResponses, + NotifyRefetchData, + NotifyResponseData, + NotifyTopic, + NotifyUnsubscribeData, +} from '@opentrons/app/src/redux/shell/types' +import { FAILURE_STATUSES } from '../constants' + +interface SendToBrowserParams { + ip: string + topic: NotifyTopic + message: NotifyResponseData +} + +const VALID_NOTIFY_RESPONSES: [NotifyRefetchData, NotifyUnsubscribeData] = [ + { refetchUsingHTTP: true }, + { unsubscribe: true }, +] + +export function sendDeserialized({ + ip, + topic, + message, +}: SendToBrowserParams): void { + try { + const browserWindow = connectionStore.getBrowserWindow() + browserWindow?.webContents.send('notify', ip, topic, message) + } catch {} // Prevents shell erroring during app shutdown event. +} + +export function sendDeserializedGenericError( + ip: string, + topic: NotifyTopic +): void { + sendDeserialized({ + ip, + topic, + message: FAILURE_STATUSES.ECONNFAILED, + }) +} + +export function deserializeExpectedMessages( + message: string +): Promise { + return new Promise((resolve, reject) => { + let deserializedMessage: NotifyResponseData | Record + const error = new Error( + `Unexpected data received from notify broker: ${message}` + ) + + try { + deserializedMessage = JSON.parse(message) + } catch { + reject(error) + } + + const isValidNotifyResponse = VALID_NOTIFY_RESPONSES.some(model => + isEqual(model, deserializedMessage) + ) + if (!isValidNotifyResponse) { + reject(error) + } else { + resolve(JSON.parse(message)) + } + }) +} diff --git a/app-shell/src/notifications/index.ts b/app-shell/src/notifications/index.ts new file mode 100644 index 00000000000..221addea9f6 --- /dev/null +++ b/app-shell/src/notifications/index.ts @@ -0,0 +1,61 @@ +import { connectionStore } from './store' +import { + establishConnections, + cleanUpUnreachableRobots, + getHealthyRobotDataForNotifyConnections, + closeConnectionsForcefullyFor, + RobotData, +} from './connect' +import { subscribe } from './subscribe' +import { notifyLog } from './notifyLog' + +import type { DiscoveryClientRobot } from '@opentrons/discovery-client' +import type { BrowserWindow } from 'electron' +import type { Action, Dispatch } from '../types' + +// Manages MQTT broker connections through a connection store. Broker connections are added based on health status +// reported by discovery-client and broker connectivity status reported by MQTT. Because a robot may have several IPs, +// only the first reported IP that results in a successful broker connection maintains an active connection. +// All associated IPs reference the active connection. Subscriptions are handled "lazily" - a component must +// dispatch a subscribe action before a subscription request is made to the broker. Unsubscribe requests only occur if +// the broker sends an "unsubscribe" flag. Pending subs and unsubs are used to prevent unnecessary network and broker load. + +export function registerNotify( + dispatch: Dispatch, + mainWindow: BrowserWindow +): (action: Action) => unknown { + if (connectionStore.getBrowserWindow() == null) { + connectionStore.setBrowserWindow(mainWindow) + } + + return function handleAction(action: Action) { + switch (action.type) { + case 'shell:NOTIFY_SUBSCRIBE': + return subscribe(action.payload.hostname, action.payload.topic) + } + } +} + +export function handleNotificationConnectionsFor( + robots: DiscoveryClientRobot[] +): RobotData[] { + const reachableRobots = getHealthyRobotDataForNotifyConnections(robots) + void cleanUpUnreachableRobots(reachableRobots) + void establishConnections(reachableRobots) + + return reachableRobots +} + +export function closeAllNotifyConnections(): Promise { + return new Promise((resolve, reject) => { + setTimeout(() => { + reject(Error('Failed to close all connections within the time limit.')) + }, 2000) + + notifyLog.debug('Stopping notify service connections') + const closeConnections = closeConnectionsForcefullyFor( + connectionStore.getAllBrokersInStore() + ) + Promise.all(closeConnections).then(resolve).catch(reject) + }) +} diff --git a/app-shell/src/notifications/notifyLog.ts b/app-shell/src/notifications/notifyLog.ts new file mode 100644 index 00000000000..35507fa2c2a --- /dev/null +++ b/app-shell/src/notifications/notifyLog.ts @@ -0,0 +1,3 @@ +import { createLogger } from '../log' + +export const notifyLog = createLogger('notify') diff --git a/app-shell/src/notifications/store.ts b/app-shell/src/notifications/store.ts new file mode 100644 index 00000000000..63195d62d23 --- /dev/null +++ b/app-shell/src/notifications/store.ts @@ -0,0 +1,269 @@ +/* eslint-disable @typescript-eslint/no-dynamic-delete */ +import type mqtt from 'mqtt' + +import { FAILURE_STATUSES } from '../constants' + +import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types' +import type { BrowserWindow } from 'electron' + +type FailedConnStatus = typeof FAILURE_STATUSES[keyof typeof FAILURE_STATUSES] + +interface HostData { + client: mqtt.MqttClient | null + subscriptions: Set + pendingSubs: Set + pendingUnsubs: Set + unreachableStatus: FailedConnStatus | null +} + +/** + * @description Manages the internal state of MQTT connections to various robot hosts. + */ +class ConnectionStore { + private hostsByRobotName: Record = {} + + private robotNamesByIP: Record = {} + + private browserWindow: BrowserWindow | null = null + + private readonly knownPortBlockedIPs = new Set() + + public getBrowserWindow(): BrowserWindow | null { + return this.browserWindow + } + + public getAllBrokersInStore(): string[] { + return Object.keys(this.hostsByRobotName) + } + + public getClient(ip: string): mqtt.MqttClient | null { + const hostData = this.getHostDataByIP(ip) + if (hostData != null) { + return hostData.client + } else { + return null + } + } + + /** + * @returns {FailedConnStatus} "ECONNREFUSED" is a proxy for a port block error and is only returned once + * for analytics reasons. Afterward, a generic "ECONNFAILED" is returned. + */ + public getFailedConnectionStatus(ip: string): FailedConnStatus | null { + const robotName = this.getRobotNameByIP(ip) + if (robotName != null) { + const failureStatus = this.hostsByRobotName[robotName].unreachableStatus + if (failureStatus === FAILURE_STATUSES.ECONNREFUSED) { + this.hostsByRobotName[robotName].unreachableStatus = + FAILURE_STATUSES.ECONNFAILED + } + return failureStatus + } else { + return null + } + } + + public getRobotNameByIP(ip: string): string | null { + return this.robotNamesByIP[ip] ?? null + } + + public setBrowserWindow(window: BrowserWindow): void { + this.browserWindow = window + } + + public setPendingConnection(robotName: string): Promise { + return new Promise((resolve, reject) => { + if (!this.isConnectingToBroker(robotName)) { + this.hostsByRobotName[robotName] = { + client: null, + subscriptions: new Set(), + pendingSubs: new Set(), + pendingUnsubs: new Set(), + unreachableStatus: null, + } + resolve() + } else { + reject( + new Error( + 'Cannot create a new connection while currently connecting.' + ) + ) + } + }) + } + + public setConnected( + robotName: string, + client: mqtt.MqttClient + ): Promise { + return new Promise((resolve, reject) => { + if (robotName in this.hostsByRobotName) { + if (this.hostsByRobotName[robotName].client == null) { + this.hostsByRobotName[robotName].client = client + resolve() + } else { + reject(new Error(`Connection already exists for ${robotName}`)) + } + } else { + reject(new Error('IP is not associated with a connection')) + } + }) + } + + /** + * @description Marks the host as unreachable with an error status derived from the MQTT returned error object. + */ + public setErrorStatus(ip: string, errorMessage: string): Promise { + return new Promise((resolve, reject) => { + const robotName = this.getRobotNameByIP(ip) + if (robotName != null && robotName in this.hostsByRobotName) { + if (this.hostsByRobotName[robotName].unreachableStatus == null) { + const errorStatus = errorMessage?.includes( + FAILURE_STATUSES.ECONNREFUSED + ) + ? FAILURE_STATUSES.ECONNREFUSED + : FAILURE_STATUSES.ECONNFAILED + + this.hostsByRobotName[robotName].unreachableStatus = errorStatus + if (errorStatus === FAILURE_STATUSES.ECONNREFUSED) { + this.knownPortBlockedIPs.add(ip) + } + } + resolve() + } else { + reject(new Error(`${ip} is not associated with a connection`)) + } + }) + } + + public setSubStatus( + ip: string, + topic: NotifyTopic, + status: 'pending' | 'subscribed' + ): Promise { + return new Promise((resolve, reject) => { + const robotName = this.getRobotNameByIP(ip) + if (robotName != null && robotName in this.hostsByRobotName) { + const { pendingSubs, subscriptions } = this.hostsByRobotName[robotName] + if (status === 'pending') { + pendingSubs.add(topic) + } else { + subscriptions.add(topic) + pendingSubs.delete(topic) + } + resolve() + } else { + reject(new Error('IP is not associated with a connection')) + } + }) + } + + public setUnsubStatus( + ip: string, + topic: NotifyTopic, + status: 'pending' | 'unsubscribed' + ): Promise { + return new Promise((resolve, reject) => { + const robotName = this.getRobotNameByIP(ip) + if (robotName != null && robotName in this.hostsByRobotName) { + const { pendingUnsubs, subscriptions } = this.hostsByRobotName[ + robotName + ] + if (subscriptions.has(topic)) { + if (status === 'pending') { + pendingUnsubs.add(topic) + } else { + pendingUnsubs.delete(topic) + subscriptions.delete(topic) + } + } + resolve() + } else { + reject(new Error('IP is not associated with a connection')) + } + }) + } + + public associateIPWithRobotName(ip: string, robotName: string): void { + const robotNameInStore = this.robotNamesByIP[ip] + if (robotNameInStore !== robotName) { + this.robotNamesByIP[ip] = robotName + } + } + + /** + * @description Used for testing purposes. + */ + public clearStore(): void { + this.hostsByRobotName = {} + this.robotNamesByIP = {} + this.browserWindow = null + } + + public isConnectedToBroker(robotName: string): boolean { + return robotName != null + ? this.hostsByRobotName[robotName]?.client?.connected ?? false + : false + } + + public isConnectingToBroker(robotName: string): boolean { + return ( + (this.hostsByRobotName[robotName]?.client == null ?? false) && + !this.isConnectionTerminated(robotName) + ) + } + + public isPendingSub(robotName: string, topic: NotifyTopic): boolean { + if (robotName != null && robotName in this.hostsByRobotName) { + const { pendingSubs } = this.hostsByRobotName[robotName] + return pendingSubs.has(topic) + } else { + return false + } + } + + public isActiveSub(robotName: string, topic: NotifyTopic): boolean { + if (robotName != null && robotName in this.hostsByRobotName) { + const { subscriptions } = this.hostsByRobotName[robotName] + return subscriptions.has(topic) + } else { + return false + } + } + + public isPendingUnsub(ip: string, topic: NotifyTopic): boolean { + const robotName = this.getRobotNameByIP(ip) + if (robotName != null && robotName in this.hostsByRobotName) { + const { pendingUnsubs } = this.hostsByRobotName[robotName] + return pendingUnsubs.has(topic) + } else { + return false + } + } + + /** + * @description A broker connection is terminated if it is errored or not present in the store. + */ + public isConnectionTerminated(robotName: string): boolean { + if (robotName in this.hostsByRobotName) { + return this.hostsByRobotName[robotName].unreachableStatus != null + } else { + return true + } + } + + public isKnownPortBlockedIP(ip: string): boolean { + return this.knownPortBlockedIPs.has(ip) + } + + private getHostDataByIP(ip: string): HostData | null { + if (ip in this.robotNamesByIP) { + const robotName = this.robotNamesByIP[ip] + return this.hostsByRobotName[robotName] ?? null + } else { + return null + } + } +} + +export const connectionStore = new ConnectionStore() diff --git a/app-shell/src/notifications/subscribe.ts b/app-shell/src/notifications/subscribe.ts new file mode 100644 index 00000000000..895a010406e --- /dev/null +++ b/app-shell/src/notifications/subscribe.ts @@ -0,0 +1,136 @@ +import mqtt from 'mqtt' + +import { connectionStore } from './store' +import { sendDeserialized, sendDeserializedGenericError } from './deserialize' +import { notifyLog } from './notifyLog' + +import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types' + +/** + * @property {number} qos: "Quality of Service", "at least once". Because we use React Query, which does not trigger + a render update event if duplicate data is received, we can avoid the additional overhead of guaranteeing "exactly once" delivery. + */ +const subscribeOptions: mqtt.IClientSubscribeOptions = { + qos: 1, +} + +const CHECK_CONNECTION_INTERVAL = 500 + +export function subscribe(ip: string, topic: NotifyTopic): Promise { + const robotName = connectionStore.getRobotNameByIP(ip) + + if (robotName == null || connectionStore.isConnectionTerminated(robotName)) { + const errorMessage = connectionStore.getFailedConnectionStatus(ip) + if (errorMessage != null) { + sendDeserialized({ + ip, + topic, + message: errorMessage, + }) + } + return Promise.resolve() + } else { + return waitUntilActiveOrErrored({ connection: 'client', ip, robotName }) + .then(() => { + const client = connectionStore.getClient(ip) + if (client == null) { + return Promise.reject(new Error('Expected hostData, received null.')) + } + + if ( + !connectionStore.isActiveSub(robotName, topic) && + !connectionStore.isPendingSub(robotName, topic) + ) { + connectionStore + .setSubStatus(ip, topic, 'pending') + .then( + () => + new Promise(() => { + client.subscribe(topic, subscribeOptions, subscribeCb) + }) + ) + .catch((error: Error) => notifyLog.debug(error.message)) + } else { + void waitUntilActiveOrErrored({ + connection: 'subscription', + ip, + robotName, + topic, + }).catch((error: Error) => { + notifyLog.debug(error.message) + sendDeserializedGenericError(ip, topic) + }) + } + }) + .catch((error: Error) => { + notifyLog.debug(error.message) + sendDeserializedGenericError(ip, topic) + }) + } + + function subscribeCb(error: Error, result: mqtt.ISubscriptionGrant[]): void { + if (error != null) { + sendDeserializedGenericError(ip, topic) + notifyLog.debug( + `Failed to subscribe to ${robotName} on ${ip} to topic: ${topic}` + ) + } else { + notifyLog.debug( + `Successfully subscribed to ${robotName} on ${ip} to topic: ${topic}` + ) + connectionStore + .setSubStatus(ip, topic, 'subscribed') + .catch((error: Error) => notifyLog.debug(error.message)) + } + } +} + +interface WaitUntilActiveOrErroredParams { + connection: 'client' | 'subscription' + ip: string + robotName: string + topic?: NotifyTopic +} + +// Check every 500ms for 2 seconds before failing. +function waitUntilActiveOrErrored({ + connection, + ip, + robotName, + topic, +}: WaitUntilActiveOrErroredParams): Promise { + return new Promise((resolve, reject) => { + if (connection === 'subscription') { + if (topic == null) { + reject( + new Error( + 'Must specify a topic when connection is type "subscription".' + ) + ) + } + } + + const MAX_RETRIES = 4 + let counter = 0 + const intervalId = setInterval(() => { + const hasReceivedAck = + connection === 'client' + ? connectionStore.isConnectedToBroker(robotName) + : connectionStore.isActiveSub(robotName, topic as NotifyTopic) + if (hasReceivedAck) { + clearInterval(intervalId) + resolve() + } + + counter++ + if (counter === MAX_RETRIES) { + clearInterval(intervalId) + reject( + new Error( + `Maximum number of retries exceeded for ${robotName} on ${ip}.` + ) + ) + } + }, CHECK_CONNECTION_INTERVAL) + }) +} diff --git a/app-shell/src/notifications/unsubscribe.ts b/app-shell/src/notifications/unsubscribe.ts new file mode 100644 index 00000000000..8a0f3d032cd --- /dev/null +++ b/app-shell/src/notifications/unsubscribe.ts @@ -0,0 +1,36 @@ +import { connectionStore } from './store' +import { notifyLog } from './notifyLog' + +import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types' + +export function unsubscribe(ip: string, topic: NotifyTopic): Promise { + return new Promise((resolve, reject) => { + if (!connectionStore.isPendingUnsub(ip, topic)) { + connectionStore + .setUnsubStatus(ip, topic, 'pending') + .then(() => { + const client = connectionStore.getClient(ip) + if (client == null) { + return reject(new Error('Expected hostData, received null.')) + } + + client.unsubscribe(topic, {}, (error, result) => { + const robotName = connectionStore.getRobotNameByIP(ip) + if (error != null) { + notifyLog.debug( + `Failed to unsubscribe to ${robotName} on ${ip} from topic: ${topic}` + ) + } else { + notifyLog.debug( + `Successfully unsubscribed to ${robotName} on ${ip} from topic: ${topic}` + ) + connectionStore + .setUnsubStatus(ip, topic, 'unsubscribed') + .catch((error: Error) => notifyLog.debug(error.message)) + } + }) + }) + .catch((error: Error) => notifyLog.debug(error.message)) + } + }) +} diff --git a/app-shell/src/notify.ts b/app-shell/src/notify.ts deleted file mode 100644 index 3de2281a385..00000000000 --- a/app-shell/src/notify.ts +++ /dev/null @@ -1,453 +0,0 @@ -/* eslint-disable @typescript-eslint/no-dynamic-delete */ -import mqtt from 'mqtt' -import isEqual from 'lodash/isEqual' - -import { createLogger } from './log' - -import type { BrowserWindow } from 'electron' -import type { - NotifyTopic, - NotifyResponseData, - NotifyRefetchData, - NotifyUnsubscribeData, - NotifyNetworkError, -} from '@opentrons/app/src/redux/shell/types' -import type { Action, Dispatch } from './types' - -// TODO(jh, 2024-03-01): after refactoring notify connectivity and subscription logic, uncomment logs. - -// Manages MQTT broker connections via a connection store, establishing a connection to the broker only if a connection does not -// already exist, and disconnects from the broker when the app is not subscribed to any topics for the given broker. -// A redundant connection to the same broker results in the older connection forcibly closing, which we want to avoid. -// However, redundant subscriptions are permitted and result in the broker sending the retained message for that topic. -// To mitigate redundant connections, the connection manager eagerly adds the host, removing the host if the connection fails. - -const FAILURE_STATUSES = { - ECONNREFUSED: 'ECONNREFUSED', - ECONNFAILED: 'ECONNFAILED', -} as const - -interface ConnectionStore { - [hostname: string]: { - client: mqtt.MqttClient | null - subscriptions: Record - pendingSubs: Set - } -} - -const connectionStore: ConnectionStore = {} -const unreachableHosts = new Set() -const log = createLogger('notify') -// MQTT is somewhat particular about the clientId format and will connect erratically if an unexpected string is supplied. -// This clientId is derived from the mqttjs library. -const CLIENT_ID = 'app-' + Math.random().toString(16).slice(2, 8) - -const connectOptions: mqtt.IClientOptions = { - clientId: CLIENT_ID, - port: 1883, - keepalive: 60, - protocolVersion: 5, - reconnectPeriod: 1000, - connectTimeout: 30 * 1000, - clean: true, - resubscribe: true, -} - -/** - * @property {number} qos: "Quality of Service", "at least once". Because we use React Query, which does not trigger - a render update event if duplicate data is received, we can avoid the additional overhead - to guarantee "exactly once" delivery. - */ -const subscribeOptions: mqtt.IClientSubscribeOptions = { - qos: 1, -} - -export function registerNotify( - dispatch: Dispatch, - mainWindow: BrowserWindow -): (action: Action) => unknown { - return function handleAction(action: Action) { - switch (action.type) { - case 'shell:NOTIFY_SUBSCRIBE': - return subscribe({ - ...action.payload, - browserWindow: mainWindow, - }) - - case 'shell:NOTIFY_UNSUBSCRIBE': - return unsubscribe({ - ...action.payload, - browserWindow: mainWindow, - }) - } - } -} - -const CHECK_CONNECTION_INTERVAL = 500 -let hasReportedAPortBlockEvent = false - -interface NotifyParams { - browserWindow: BrowserWindow - hostname: string - topic: NotifyTopic -} - -function subscribe(notifyParams: NotifyParams): Promise { - const { hostname, topic, browserWindow } = notifyParams - if (unreachableHosts.has(hostname)) { - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic, - message: FAILURE_STATUSES.ECONNFAILED, - }) - return Promise.resolve() - } - // true if no subscription (and therefore connection) to host exists - else if (connectionStore[hostname] == null) { - connectionStore[hostname] = { - client: null, - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - subscriptions: { [topic]: 1 } as Record, - pendingSubs: new Set(), - } - return connectAsync(`mqtt://${hostname}`) - .then(client => { - const { pendingSubs } = connectionStore[hostname] - log.info(`Successfully connected to ${hostname}`) - connectionStore[hostname].client = client - pendingSubs.add(topic) - establishListeners({ ...notifyParams, client }) - return new Promise(() => { - client.subscribe(topic, subscribeOptions, subscribeCb) - pendingSubs.delete(topic) - }) - }) - .catch((error: Error) => { - log.warn( - `Failed to connect to ${hostname} - ${error.name}: ${error.message} ` - ) - let failureMessage: NotifyNetworkError = FAILURE_STATUSES.ECONNFAILED - if (connectionStore[hostname]?.client == null) { - unreachableHosts.add(hostname) - if ( - error.message.includes(FAILURE_STATUSES.ECONNREFUSED) && - !hasReportedAPortBlockEvent - ) { - failureMessage = FAILURE_STATUSES.ECONNREFUSED - hasReportedAPortBlockEvent = true - } - } - - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic, - message: failureMessage, - }) - if (hostname in connectionStore) delete connectionStore[hostname] - }) - } - // true if the connection store has an entry for the hostname. - else { - return waitUntilActiveOrErrored('client') - .then(() => { - const { client, subscriptions, pendingSubs } = connectionStore[hostname] - const activeClient = client as mqtt.Client - const isNotActiveSubscription = (subscriptions[topic] ?? 0) <= 0 - if (!pendingSubs.has(topic) && isNotActiveSubscription) { - pendingSubs.add(topic) - return new Promise(() => { - activeClient.subscribe(topic, subscribeOptions, subscribeCb) - pendingSubs.delete(topic) - }) - } else { - void waitUntilActiveOrErrored('subscription') - .then(() => { - subscriptions[topic] += 1 - }) - .catch(() => { - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic, - message: FAILURE_STATUSES.ECONNFAILED, - }) - }) - } - }) - .catch(() => { - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic, - message: FAILURE_STATUSES.ECONNFAILED, - }) - }) - } - function subscribeCb(error: Error, result: mqtt.ISubscriptionGrant[]): void { - const { subscriptions } = connectionStore[hostname] - if (error != null) { - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic, - message: FAILURE_STATUSES.ECONNFAILED, - }) - setTimeout(() => { - if (Object.keys(connectionStore[hostname].subscriptions).length <= 0) { - connectionStore[hostname].client?.end() - } - }, RENDER_TIMEOUT) - } else { - if (subscriptions[topic] > 0) { - subscriptions[topic] += 1 - } else { - subscriptions[topic] = 1 - } - } - } - - // Check every 500ms for 2 seconds before failing. - function waitUntilActiveOrErrored( - connection: 'client' | 'subscription' - ): Promise { - return new Promise((resolve, reject) => { - const MAX_RETRIES = 4 - let counter = 0 - const intervalId = setInterval(() => { - const host = connectionStore[hostname] - const hasReceivedAck = - connection === 'client' - ? host?.client != null - : host?.subscriptions[topic] > 0 - if (hasReceivedAck) { - clearInterval(intervalId) - resolve() - } - - counter++ - if (counter === MAX_RETRIES) { - clearInterval(intervalId) - reject(new Error('Maximum subscription retries exceeded.')) - } - }, CHECK_CONNECTION_INTERVAL) - }) - } -} - -// Because subscription logic is directly tied to the component lifecycle, it is possible -// for a component to trigger an unsubscribe event on dismount while a new component mounts and -// triggers a subscribe event. For the connection store and MQTT to reflect correct topic subscriptions, -// do not unsubscribe and close connections before newly mounted components have had time to update the connection store. -const RENDER_TIMEOUT = 10000 // 10 seconds - -function unsubscribe(notifyParams: NotifyParams): Promise { - const { hostname, topic } = notifyParams - return new Promise(() => { - setTimeout(() => { - if (hostname in connectionStore) { - const { client } = connectionStore[hostname] - const subscriptions = connectionStore[hostname]?.subscriptions - const isLastSubscription = subscriptions[topic] <= 1 - - if (isLastSubscription) { - client?.unsubscribe(topic, {}, (error, result) => { - if (error == null) { - handleDecrementSubscriptionCount(hostname, topic) - } else { - log.warn(`Failed to subscribe on ${hostname} to topic: ${topic}`) - } - }) - } else { - subscriptions[topic] -= 1 - } - } - }, RENDER_TIMEOUT) - }) -} - -function connectAsync(brokerURL: string): Promise { - const client = mqtt.connect(brokerURL, connectOptions) - - return new Promise((resolve, reject) => { - // Listeners added to client to trigger promise resolution - const promiseListeners: { - [key: string]: (...args: any[]) => void - } = { - connect: () => { - removePromiseListeners() - return resolve(client) - }, - // A connection error event will close the connection without a retry. - error: (error: Error | string) => { - removePromiseListeners() - const clientEndPromise = new Promise((resolve, reject) => - client.end(true, {}, () => resolve(error)) - ) - return clientEndPromise.then(() => reject(error)) - }, - end: () => promiseListeners.error(`Couldn't connect to ${brokerURL}`), - } - - function removePromiseListeners(): void { - Object.keys(promiseListeners).forEach(eventName => { - client.removeListener(eventName, promiseListeners[eventName]) - }) - } - - Object.keys(promiseListeners).forEach(eventName => { - client.on(eventName, promiseListeners[eventName]) - }) - }) -} - -function handleDecrementSubscriptionCount( - hostname: string, - topic: NotifyTopic -): void { - const host = connectionStore[hostname] - if (host) { - const { client, subscriptions } = host - if (topic in subscriptions) { - subscriptions[topic] -= 1 - if (subscriptions[topic] <= 0) { - delete subscriptions[topic] - } - } - - if (Object.keys(subscriptions).length <= 0) { - client?.end() - } - } -} - -interface ListenerParams { - client: mqtt.MqttClient - browserWindow: BrowserWindow - hostname: string -} - -function establishListeners({ - client, - browserWindow, - hostname, -}: ListenerParams): void { - client.on( - 'message', - (topic: NotifyTopic, message: Buffer, packet: mqtt.IPublishPacket) => { - deserialize(message.toString()) - .then(deserializedMessage => { - log.debug('Received notification data from main via IPC', { - hostname, - topic, - }) - - browserWindow.webContents.send( - 'notify', - hostname, - topic, - deserializedMessage - ) - }) - .catch(error => log.debug(`${error.message}`)) - } - ) - - client.on('reconnect', () => { - log.info(`Attempting to reconnect to ${hostname}`) - }) - // handles transport layer errors only - client.on('error', error => { - log.warn(`Error - ${error.name}: ${error.message}`) - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic: 'ALL_TOPICS', - message: FAILURE_STATUSES.ECONNFAILED, - }) - client.end() - }) - - client.on('end', () => { - log.info(`Closed connection to ${hostname}`) - if (hostname in connectionStore) delete connectionStore[hostname] - }) - - client.on('disconnect', packet => { - log.warn( - `Disconnected from ${hostname} with code ${ - packet.reasonCode ?? 'undefined' - }` - ) - sendToBrowserDeserialized({ - browserWindow, - hostname, - topic: 'ALL_TOPICS', - message: FAILURE_STATUSES.ECONNFAILED, - }) - }) -} - -export function closeAllNotifyConnections(): Promise { - return new Promise((resolve, reject) => { - setTimeout(() => { - reject(Error('Failed to close all connections within the time limit.')) - }, 2000) - - log.debug('Stopping notify service connections') - const closeConnections = Object.values(connectionStore).map( - ({ client }) => { - return new Promise((resolve, reject) => { - client?.end(true, {}, () => resolve(null)) - }) - } - ) - Promise.all(closeConnections).then(resolve).catch(reject) - }) -} - -interface SendToBrowserParams { - browserWindow: BrowserWindow - hostname: string - topic: NotifyTopic - message: NotifyResponseData -} - -function sendToBrowserDeserialized({ - browserWindow, - hostname, - topic, - message, -}: SendToBrowserParams): void { - browserWindow.webContents.send('notify', hostname, topic, message) -} - -const VALID_MODELS: [NotifyRefetchData, NotifyUnsubscribeData] = [ - { refetchUsingHTTP: true }, - { unsubscribe: true }, -] - -function deserialize(message: string): Promise { - return new Promise((resolve, reject) => { - let deserializedMessage: NotifyResponseData | Record - const error = new Error( - `Unexpected data received from notify broker: ${message}` - ) - - try { - deserializedMessage = JSON.parse(message) - } catch { - reject(error) - } - - const isValidNotifyResponse = VALID_MODELS.some(model => - isEqual(model, deserializedMessage) - ) - if (!isValidNotifyResponse) { - reject(error) - } else { - resolve(JSON.parse(message)) - } - }) -}