From 2fb099caf73465d487c52183987c358ca98da2db Mon Sep 17 00:00:00 2001 From: Jamey Huffnagle Date: Wed, 20 Mar 2024 17:16:25 -0400 Subject: [PATCH] feedback --- app-shell/src/notifications/connect.ts | 35 ++-- app-shell/src/notifications/index.ts | 2 +- app-shell/src/notifications/store.ts | 177 +++++++++------------ app-shell/src/notifications/subscribe.ts | 4 +- app-shell/src/notifications/unsubscribe.ts | 8 +- 5 files changed, 94 insertions(+), 132 deletions(-) diff --git a/app-shell/src/notifications/connect.ts b/app-shell/src/notifications/connect.ts index bbe5893efa5..9347c289461 100644 --- a/app-shell/src/notifications/connect.ts +++ b/app-shell/src/notifications/connect.ts @@ -51,12 +51,12 @@ export function cleanUpUnreachableRobots( healthyRobots: RobotData[] ): Promise { return new Promise(() => { - const healthyRobotIPs = healthyRobots.map(({ ip }) => ip) - const healthyRobotIPsSet = new Set(healthyRobotIPs) + const healthyRobotNames = healthyRobots.map(({ robotName }) => robotName) + const healthyRobotNamesSet = new Set(healthyRobotNames) const unreachableRobots = connectionStore - .getReachableHosts() - .filter(hostname => { - return !healthyRobotIPsSet.has(hostname) + .getAllBrokersInStore() + .filter(robotName => { + return !healthyRobotNamesSet.has(robotName) }) void closeConnectionsForcefullyFor(unreachableRobots) }) @@ -67,31 +67,20 @@ export function establishConnections( ): Promise { return new Promise(() => { const newConnections = healthyRobots.filter(({ ip, robotName }) => { - if (connectionStore.isConnectedToBroker(ip)) { + if (connectionStore.isConnectedToBroker(robotName)) { return false - } else if (connectionStore.isAssociatedWithExistingHostData(robotName)) { - if (!connectionStore.isAssociatedBrokerErrored(robotName)) { - // If not connected, wait for another poll of discovery-client to resolve the current connection. - if (connectionStore.isAssociatedBrokerConnected(robotName)) { - void connectionStore.associateIPWithExistingHostData(ip, robotName) - } + } else { + connectionStore.associateIPWithRobotName(ip, robotName) + if (!connectionStore.isConnectionTerminated(robotName)) { return false } else { - // Mark this IP as a new potential broker connection to check if the broker is reachable on this IP. - if (!connectionStore.isKnownPortBlockedIP(ip)) { - void connectionStore.deleteAllAssociatedIPsGivenRobotName(robotName) - return true - } else { - return false - } + return !connectionStore.isKnownPortBlockedIP(ip) } - } else { - return !connectionStore.isKnownPortBlockedIP(ip) } }) newConnections.forEach(({ ip, robotName }) => { void connectionStore - .setPendingConnection(ip, robotName) + .setPendingConnection(robotName) .then(() => { connectAsync(`mqtt://${ip}`) .then(client => { @@ -188,6 +177,8 @@ function establishListeners( client.on('end', () => { notifyLog.debug(`Closed connection to ${robotName} on ${ip}`) + // Marking the connection as failed with a generic error status lets the connection re-establish in the future + // and tells the browser to fall back to polling (assuming this 'end' event isn't caused by the app closing). void connectionStore.setErrorStatus(ip, FAILURE_STATUSES.ECONNFAILED) }) diff --git a/app-shell/src/notifications/index.ts b/app-shell/src/notifications/index.ts index a281e02f7b2..221addea9f6 100644 --- a/app-shell/src/notifications/index.ts +++ b/app-shell/src/notifications/index.ts @@ -54,7 +54,7 @@ export function closeAllNotifyConnections(): Promise { notifyLog.debug('Stopping notify service connections') const closeConnections = closeConnectionsForcefullyFor( - connectionStore.getReachableHosts() + connectionStore.getAllBrokersInStore() ) Promise.all(closeConnections).then(resolve).catch(reject) }) diff --git a/app-shell/src/notifications/store.ts b/app-shell/src/notifications/store.ts index 29c8e7733cb..78d4b85fd58 100644 --- a/app-shell/src/notifications/store.ts +++ b/app-shell/src/notifications/store.ts @@ -1,6 +1,5 @@ /* eslint-disable @typescript-eslint/no-dynamic-delete */ import type mqtt from 'mqtt' -import head from 'lodash/head' import { FAILURE_STATUSES } from '../constants' @@ -22,7 +21,9 @@ interface HostData { * Manages the internal state of MQTT connections to various robot hosts. */ class ConnectionStore { - private hosts: Record = {} + private hostsByRobotName: Record = {} + + private robotNamesByIP: Record = {} private browserWindow: BrowserWindow | null = null @@ -32,13 +33,14 @@ class ConnectionStore { return this.browserWindow } - public getReachableHosts(): string[] { - return Object.keys(this.hosts) + public getAllBrokersInStore(): string[] { + return Object.keys(this.hostsByRobotName) } public getClient(ip: string): mqtt.MqttClient | null { - if (ip in this.hosts) { - return this.hosts[ip].client + const hostData = this.getHostDataByIP(ip) + if (hostData != null) { + return hostData.client } else { return null } @@ -50,10 +52,12 @@ class ConnectionStore { * for analytics reasons. Afterward, a generic "ECONNFAILED" is returned. */ public getFailedConnectionStatus(ip: string): FailedConnStatus | null { - if (ip in this.hosts) { - const failureStatus = this.hosts[ip].unreachableStatus + const robotName = this.getRobotNameByIP(ip) + if (robotName != null) { + const failureStatus = this.hostsByRobotName[robotName].unreachableStatus if (failureStatus === FAILURE_STATUSES.ECONNREFUSED) { - this.hosts[ip].unreachableStatus = FAILURE_STATUSES.ECONNFAILED + this.hostsByRobotName[robotName].unreachableStatus = + FAILURE_STATUSES.ECONNFAILED } return failureStatus } else { @@ -61,26 +65,18 @@ class ConnectionStore { } } - public getAssociatedIPsFromRobotName(robotName: string): string[] { - return Object.keys(this.hosts).filter( - ip => this.hosts[ip].robotName === robotName - ) - } - - public getRobotNameFromIP(ip: string): string | null { - if (ip in this.hosts) { - return this.hosts[ip].robotName - } else return null + public getRobotNameByIP(ip: string): string | null { + return this.robotNamesByIP[ip] ?? null } public setBrowserWindow(window: BrowserWindow): void { this.browserWindow = window } - public setPendingConnection(ip: string, robotName: string): Promise { + public setPendingConnection(robotName: string): Promise { return new Promise((resolve, reject) => { - if (!this.isAssociatedBrokerConnecting(robotName)) { - this.hosts[ip] = { + if (!this.isConnectingToBroker(robotName)) { + this.hostsByRobotName[robotName] = { robotName, client: null, subscriptions: new Set(), @@ -92,21 +88,24 @@ class ConnectionStore { } else { reject( new Error( - 'Cannot create a new connection while connecting on an associated IP.' + 'Cannot create a new connection while currently connecting.' ) ) } }) } - public setConnected(ip: string, client: mqtt.MqttClient): Promise { + public setConnected( + robotName: string, + client: mqtt.MqttClient + ): Promise { return new Promise((resolve, reject) => { - if (ip in this.hosts) { - if (this.hosts[ip].client == null) { - this.hosts[ip].client = client + if (robotName in this.hostsByRobotName) { + if (this.hostsByRobotName[robotName].client == null) { + this.hostsByRobotName[robotName].client = client resolve() } else { - reject(new Error(`Connection already exists for ${ip}`)) + reject(new Error(`Connection already exists for ${robotName}`)) } } else { reject(new Error('IP is not associated with a connection')) @@ -121,20 +120,21 @@ class ConnectionStore { */ public setErrorStatus(ip: string, errorMessage: string): Promise { return new Promise((resolve, reject) => { - if (ip in this.hosts) { - if (this.hosts[ip].unreachableStatus == null) { + const robotName = this.getRobotNameByIP(ip) + if (robotName != null && robotName in this.hostsByRobotName) { + if (this.hostsByRobotName[robotName].unreachableStatus == null) { const errorStatus = errorMessage?.includes( FAILURE_STATUSES.ECONNREFUSED ) ? FAILURE_STATUSES.ECONNREFUSED : FAILURE_STATUSES.ECONNFAILED - this.hosts[ip].unreachableStatus = errorStatus + this.hostsByRobotName[ip].unreachableStatus = errorStatus if (errorStatus === FAILURE_STATUSES.ECONNREFUSED) { this.knownPortBlockedIPs.add(ip) } - resolve() } + resolve() } else { reject(new Error(`${ip} is not associated with a connection`)) } @@ -147,8 +147,9 @@ class ConnectionStore { status: 'pending' | 'subscribed' ): Promise { return new Promise((resolve, reject) => { - if (ip in this.hosts) { - const { pendingSubs, subscriptions } = this.hosts[ip] + const robotName = this.getRobotNameByIP(ip) + if (robotName != null && robotName in this.hostsByRobotName) { + const { pendingSubs, subscriptions } = this.hostsByRobotName[robotName] if (status === 'pending') { pendingSubs.add(topic) } else { @@ -162,14 +163,17 @@ class ConnectionStore { }) } - public setUnubStatus( + public setUnsubStatus( ip: string, topic: NotifyTopic, status: 'pending' | 'unsubscribed' ): Promise { return new Promise((resolve, reject) => { - if (ip in this.hosts) { - const { pendingUnsubs, subscriptions } = this.hosts[ip] + const robotName = this.getRobotNameByIP(ip) + if (robotName != null && robotName in this.hostsByRobotName) { + const { pendingUnsubs, subscriptions } = this.hostsByRobotName[ + robotName + ] if (subscriptions.has(topic)) { if (status === 'pending') { pendingUnsubs.add(topic) @@ -185,72 +189,30 @@ class ConnectionStore { }) } - /** - * - * @description Creates a new hosts entry for a given IP with HostData that is a reference to an existing - * IP's HostData. This occurs when two IPs reported by discovery-client actually reference the same broker. - */ - public associateIPWithExistingHostData( - ip: string, - robotName: string - ): Promise { - return new Promise((resolve, reject) => { - const associatedHost = Object.values(this.hosts).find( - hostData => hostData.robotName === robotName - ) - if (associatedHost != null) { - this.hosts[ip] = associatedHost - resolve() - } else { - reject(new Error('No associated IP found.')) - } - }) - } - - public deleteAllAssociatedIPsGivenRobotName( - robotName: string - ): Promise { - return new Promise((resolve, reject) => { - const associatedHosts = this.getAssociatedIPsFromRobotName(robotName) - associatedHosts.forEach(hostname => { - delete this.hosts[hostname] - }) - resolve() - }) - } - - public isAssociatedWithExistingHostData(robotName: string): boolean { - return this.getAssociatedIPsFromRobotName(robotName).length > 0 - } - - public isAssociatedBrokerErrored(robotName: string): boolean { - const associatedRobots = this.getAssociatedIPsFromRobotName(robotName) - return this.isBrokerErrored(head(associatedRobots) as string) - } - - public isAssociatedBrokerConnected(robotName: string): boolean { - const associatedIPs = this.getAssociatedIPsFromRobotName(robotName) - return this.isConnectedToBroker(head(associatedIPs) as string) - } - - public isAssociatedBrokerConnecting(robotName: string): boolean { - const associatedIPs = this.getAssociatedIPsFromRobotName(robotName) - return this.isConnectingToBroker(head(associatedIPs) as string) + public associateIPWithRobotName(ip: string, robotName: string): void { + const robotNameInStore = this.robotNamesByIP[ip] + if (robotNameInStore !== robotName) { + this.robotNamesByIP[ip] = robotName + } } - public isConnectedToBroker(ip: string): boolean { - return this.hosts[ip]?.client?.connected ?? false + public isConnectedToBroker(robotName: string): boolean { + return robotName != null + ? this.hostsByRobotName[robotName]?.client?.connected ?? false + : false } - public isConnectingToBroker(ip: string): boolean { + public isConnectingToBroker(robotName: string): boolean { return ( - (this.hosts[ip]?.client == null ?? false) && !this.isBrokerErrored(ip) + (this.hostsByRobotName[robotName]?.client == null ?? false) && + !this.isConnectionTerminated(robotName) ) } public isPendingSub(ip: string, topic: NotifyTopic): boolean { - if (ip in this.hosts) { - const { pendingSubs } = this.hosts[ip] + const robotName = this.getRobotNameByIP(ip) + if (robotName != null && robotName in this.hostsByRobotName) { + const { pendingSubs } = this.hostsByRobotName[robotName] return pendingSubs.has(topic) } else { return false @@ -258,8 +220,9 @@ class ConnectionStore { } public isActiveSub(ip: string, topic: NotifyTopic): boolean { - if (ip in this.hosts) { - const { subscriptions } = this.hosts[ip] + const robotName = this.getRobotNameByIP(ip) + if (robotName != null && robotName in this.hostsByRobotName) { + const { subscriptions } = this.hostsByRobotName[robotName] return subscriptions.has(topic) } else { return false @@ -267,8 +230,9 @@ class ConnectionStore { } public isPendingUnsub(ip: string, topic: NotifyTopic): boolean { - if (ip in this.hosts) { - const { pendingUnsubs } = this.hosts[ip] + const robotName = this.getRobotNameByIP(ip) + if (robotName != null && robotName in this.hostsByRobotName) { + const { pendingUnsubs } = this.hostsByRobotName[robotName] return pendingUnsubs.has(topic) } else { return false @@ -277,11 +241,11 @@ class ConnectionStore { /** * - * @description Reachable refers to whether the broker connection has returned an error. + * @description A broker connection is terminated if it is errored or not present in the store. */ - public isBrokerErrored(ip: string): boolean { - if (ip in this.hosts) { - return this.hosts[ip].unreachableStatus != null + public isConnectionTerminated(robotName: string): boolean { + if (robotName in this.hostsByRobotName) { + return this.hostsByRobotName[robotName].unreachableStatus != null } else { return true } @@ -291,8 +255,13 @@ class ConnectionStore { return this.knownPortBlockedIPs.has(ip) } - public isIPInStore(ip: string): boolean { - return ip in this.hosts + private getHostDataByIP(ip: string): HostData | null { + if (ip in this.robotNamesByIP) { + const robotName = this.robotNamesByIP[ip] + return this.hostsByRobotName[robotName] ?? null + } else { + return null + } } } diff --git a/app-shell/src/notifications/subscribe.ts b/app-shell/src/notifications/subscribe.ts index 5ccb755ea77..c9123414fb2 100644 --- a/app-shell/src/notifications/subscribe.ts +++ b/app-shell/src/notifications/subscribe.ts @@ -17,9 +17,9 @@ const subscribeOptions: mqtt.IClientSubscribeOptions = { const CHECK_CONNECTION_INTERVAL = 500 export function subscribe(ip: string, topic: NotifyTopic): Promise { - const robotName = connectionStore.getRobotNameFromIP(ip) + const robotName = connectionStore.getRobotNameByIP(ip) - if (connectionStore.isBrokerErrored(ip)) { + if (connectionStore.isConnectionTerminated(ip)) { const errorMessage = connectionStore.getFailedConnectionStatus(ip) if (errorMessage != null) { sendDeserialized({ diff --git a/app-shell/src/notifications/unsubscribe.ts b/app-shell/src/notifications/unsubscribe.ts index 258324ed405..8a0f3d032cd 100644 --- a/app-shell/src/notifications/unsubscribe.ts +++ b/app-shell/src/notifications/unsubscribe.ts @@ -7,7 +7,7 @@ export function unsubscribe(ip: string, topic: NotifyTopic): Promise { return new Promise((resolve, reject) => { if (!connectionStore.isPendingUnsub(ip, topic)) { connectionStore - .setUnubStatus(ip, topic, 'pending') + .setUnsubStatus(ip, topic, 'pending') .then(() => { const client = connectionStore.getClient(ip) if (client == null) { @@ -15,7 +15,7 @@ export function unsubscribe(ip: string, topic: NotifyTopic): Promise { } client.unsubscribe(topic, {}, (error, result) => { - const robotName = connectionStore.getRobotNameFromIP(ip) + const robotName = connectionStore.getRobotNameByIP(ip) if (error != null) { notifyLog.debug( `Failed to unsubscribe to ${robotName} on ${ip} from topic: ${topic}` @@ -24,7 +24,9 @@ export function unsubscribe(ip: string, topic: NotifyTopic): Promise { notifyLog.debug( `Successfully unsubscribed to ${robotName} on ${ip} from topic: ${topic}` ) - connectionStore.setUnubStatus(ip, topic, 'unsubscribed') + connectionStore + .setUnsubStatus(ip, topic, 'unsubscribed') + .catch((error: Error) => notifyLog.debug(error.message)) } }) })