From 30fddab27ceac46541ba6a684fabd4ac0a3a39e1 Mon Sep 17 00:00:00 2001 From: Jamey Huffnagle Date: Fri, 3 May 2024 15:52:36 -0400 Subject: [PATCH 1/2] fix(app, app-shell): block HTTP refetching until successful MQTT subscription Currently, the desktop app/ODD attempt to address the "missed update problem" in the following way: while we subscribe to a topic, we simultaneously GET whatever equivalent HTTP resource we just subscribed to. However, there's definitely a world (albeit a very small one) in which we receive the HTTP response, a server update occurs, the server publishes, and then we successfully subscribe to a topic. In this world, we've missed the update event. Solve for this by simply blocking the initial HTTP GET until we subscribe. While the subscribe handshake could theoretically take a maximum of 2 seconds (at which point we forcefully timeout the subscribe action and fallback to polling), in practice it's more like 250ms. We already handle failed connections and don't go through this handshake if we can't connect to the client to begin with, so the "wait 2 second until sub failure" scenario shouldn't realistically happen. This bug was a product of initially using retained messages during prototyping, however we removed retained messaging by MQTT launch. --- .../src/notifications/deserialize.ts | 4 ++++ app-shell-odd/src/notifications/subscribe.ts | 24 ++++++++++++------- app-shell/src/notifications/deserialize.ts | 8 +++++++ app-shell/src/notifications/subscribe.ts | 24 +++++++++++++------ .../__tests__/useNotifyService.test.ts | 3 +-- app/src/resources/useNotifyService.ts | 2 -- 6 files changed, 46 insertions(+), 19 deletions(-) diff --git a/app-shell-odd/src/notifications/deserialize.ts b/app-shell-odd/src/notifications/deserialize.ts index 01fd4bc933b..5a154ebee5f 100644 --- a/app-shell-odd/src/notifications/deserialize.ts +++ b/app-shell-odd/src/notifications/deserialize.ts @@ -31,6 +31,10 @@ export function sendDeserialized( } catch {} // Prevents shell erroring during app shutdown event. } +export function sendDeserializedRefetch(topic: NotifyTopic): void { + sendDeserialized(topic, { refetch: true }) +} + export function sendDeserializedGenericError(topic: NotifyTopic): void { sendDeserialized(topic, FAILURE_STATUSES.ECONNFAILED) } diff --git a/app-shell-odd/src/notifications/subscribe.ts b/app-shell-odd/src/notifications/subscribe.ts index 6e334cb89c9..4db9e3dbf9a 100644 --- a/app-shell-odd/src/notifications/subscribe.ts +++ b/app-shell-odd/src/notifications/subscribe.ts @@ -1,7 +1,11 @@ import mqtt from 'mqtt' import { connectionStore } from './store' -import { sendDeserialized, sendDeserializedGenericError } from './deserialize' +import { + sendDeserialized, + sendDeserializedGenericError, + sendDeserializedRefetch, +} from './deserialize' import { notifyLog } from './notifyLog' import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types' @@ -30,8 +34,8 @@ export function subscribe(topic: NotifyTopic): Promise { if (client == null) { return Promise.reject(new Error('Expected hostData, received null.')) } - - if ( + // The first time the client wants to subscribe on a robot to a particular topic. + else if ( !connectionStore.isActiveSub(topic) && !connectionStore.isPendingSub(topic) ) { @@ -44,13 +48,15 @@ export function subscribe(topic: NotifyTopic): Promise { }) ) .catch((error: Error) => notifyLog.debug(error.message)) - } else { - void waitUntilActiveOrErrored('subscription', topic).catch( - (error: Error) => { + } + // The client is either already subscribed or the subscription is currently pending. + else { + void waitUntilActiveOrErrored('subscription', topic) + .then(() => sendDeserializedRefetch(topic)) + .catch((error: Error) => { notifyLog.debug(error.message) sendDeserializedGenericError(topic) - } - ) + }) } }) .catch((error: Error) => { @@ -74,6 +80,8 @@ export function subscribe(topic: NotifyTopic): Promise { connectionStore .setSubStatus(topic, 'subscribed') .catch((error: Error) => notifyLog.debug(error.message)) + + sendDeserializedRefetch(topic) } } } diff --git a/app-shell/src/notifications/deserialize.ts b/app-shell/src/notifications/deserialize.ts index 53752b32a0f..9743f1c3e5c 100644 --- a/app-shell/src/notifications/deserialize.ts +++ b/app-shell/src/notifications/deserialize.ts @@ -33,6 +33,14 @@ export function sendDeserialized({ } catch {} // Prevents shell erroring during app shutdown event. } +export function sendDeserializedRefetch(ip: string, topic: NotifyTopic): void { + sendDeserialized({ + ip, + topic, + message: { refetch: true }, + }) +} + export function sendDeserializedGenericError( ip: string, topic: NotifyTopic diff --git a/app-shell/src/notifications/subscribe.ts b/app-shell/src/notifications/subscribe.ts index 895a010406e..54ee43d92c6 100644 --- a/app-shell/src/notifications/subscribe.ts +++ b/app-shell/src/notifications/subscribe.ts @@ -1,7 +1,11 @@ import mqtt from 'mqtt' import { connectionStore } from './store' -import { sendDeserialized, sendDeserializedGenericError } from './deserialize' +import { + sendDeserialized, + sendDeserializedGenericError, + sendDeserializedRefetch, +} from './deserialize' import { notifyLog } from './notifyLog' import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types' @@ -36,8 +40,8 @@ export function subscribe(ip: string, topic: NotifyTopic): Promise { if (client == null) { return Promise.reject(new Error('Expected hostData, received null.')) } - - if ( + // The first time the client wants to subscribe on a robot to a particular topic. + else if ( !connectionStore.isActiveSub(robotName, topic) && !connectionStore.isPendingSub(robotName, topic) ) { @@ -50,16 +54,20 @@ export function subscribe(ip: string, topic: NotifyTopic): Promise { }) ) .catch((error: Error) => notifyLog.debug(error.message)) - } else { + } + // The client is either already subscribed or the subscription is currently pending. + else { void waitUntilActiveOrErrored({ connection: 'subscription', ip, robotName, topic, - }).catch((error: Error) => { - notifyLog.debug(error.message) - sendDeserializedGenericError(ip, topic) }) + .then(() => sendDeserializedRefetch(ip, topic)) + .catch((error: Error) => { + notifyLog.debug(error.message) + sendDeserializedGenericError(ip, topic) + }) } }) .catch((error: Error) => { @@ -81,6 +89,8 @@ export function subscribe(ip: string, topic: NotifyTopic): Promise { connectionStore .setSubStatus(ip, topic, 'subscribed') .catch((error: Error) => notifyLog.debug(error.message)) + + sendDeserializedRefetch(ip, topic) } } } diff --git a/app/src/resources/__tests__/useNotifyService.test.ts b/app/src/resources/__tests__/useNotifyService.test.ts index f2bf6bb516f..11b3afcb93d 100644 --- a/app/src/resources/__tests__/useNotifyService.test.ts +++ b/app/src/resources/__tests__/useNotifyService.test.ts @@ -47,7 +47,7 @@ describe('useNotifyService', () => { vi.clearAllMocks() }) - it('should trigger an HTTP refetch and subscribe action on a successful initial mount', () => { + it('should trigger a subscribe action on a successful initial mount', () => { renderHook(() => useNotifyService({ topic: MOCK_TOPIC, @@ -55,7 +55,6 @@ describe('useNotifyService', () => { options: MOCK_OPTIONS, } as any) ) - expect(mockHTTPRefetch).toHaveBeenCalledWith('once') expect(mockDispatch).toHaveBeenCalledWith( notifySubscribeAction(MOCK_HOST_CONFIG.hostname, MOCK_TOPIC) ) diff --git a/app/src/resources/useNotifyService.ts b/app/src/resources/useNotifyService.ts index d8422ba786f..022ce0407b7 100644 --- a/app/src/resources/useNotifyService.ts +++ b/app/src/resources/useNotifyService.ts @@ -51,8 +51,6 @@ export function useNotifyService({ React.useEffect(() => { if (shouldUseNotifications) { - // Always fetch on initial mount. - setRefetch('once') appShellListener({ hostname, topic, From 2703967af57ebde27bfe6062d6aff7cd36ed87cf Mon Sep 17 00:00:00 2001 From: Jamey Huffnagle Date: Mon, 6 May 2024 11:08:41 -0400 Subject: [PATCH 2/2] fetch on initial mount to reduce client latency --- app/src/resources/__tests__/useNotifyService.test.ts | 3 ++- app/src/resources/useNotifyService.ts | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/app/src/resources/__tests__/useNotifyService.test.ts b/app/src/resources/__tests__/useNotifyService.test.ts index 11b3afcb93d..f2bf6bb516f 100644 --- a/app/src/resources/__tests__/useNotifyService.test.ts +++ b/app/src/resources/__tests__/useNotifyService.test.ts @@ -47,7 +47,7 @@ describe('useNotifyService', () => { vi.clearAllMocks() }) - it('should trigger a subscribe action on a successful initial mount', () => { + it('should trigger an HTTP refetch and subscribe action on a successful initial mount', () => { renderHook(() => useNotifyService({ topic: MOCK_TOPIC, @@ -55,6 +55,7 @@ describe('useNotifyService', () => { options: MOCK_OPTIONS, } as any) ) + expect(mockHTTPRefetch).toHaveBeenCalledWith('once') expect(mockDispatch).toHaveBeenCalledWith( notifySubscribeAction(MOCK_HOST_CONFIG.hostname, MOCK_TOPIC) ) diff --git a/app/src/resources/useNotifyService.ts b/app/src/resources/useNotifyService.ts index 022ce0407b7..619b797e527 100644 --- a/app/src/resources/useNotifyService.ts +++ b/app/src/resources/useNotifyService.ts @@ -51,6 +51,8 @@ export function useNotifyService({ React.useEffect(() => { if (shouldUseNotifications) { + // Always fetch on initial mount to keep latency as low as possible. + setRefetch('once') appShellListener({ hostname, topic,