Skip to content

Commit

Permalink
fix(app, app-shell, app-shell-odd): block initial HTTP request until …
Browse files Browse the repository at this point in the history
…successful MQTT subscription (#15094)

Closes EXEC-429

Currently, the desktop app/ODD attempt to address the "missed update problem" in the following way: while we subscribe to a topic, we simultaneously GET whatever equivalent HTTP resource we just subscribed to. However, there's definitely a world (albeit a very small one) in which we receive the HTTP response, a server update occurs, the server publishes, and then we successfully subscribe to a topic. In this world, we've missed the update event.

Solve for this by simply refetching right after the client subscribe ACKs. We still keep the initial fetch on mount to keep latency low.
  • Loading branch information
mjhuff authored May 7, 2024
1 parent d9aa18f commit 80cfe7e
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 16 deletions.
4 changes: 4 additions & 0 deletions app-shell-odd/src/notifications/deserialize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ export function sendDeserialized(
} catch {} // Prevents shell erroring during app shutdown event.
}

export function sendDeserializedRefetch(topic: NotifyTopic): void {
sendDeserialized(topic, { refetch: true })
}

export function sendDeserializedGenericError(topic: NotifyTopic): void {
sendDeserialized(topic, FAILURE_STATUSES.ECONNFAILED)
}
Expand Down
24 changes: 16 additions & 8 deletions app-shell-odd/src/notifications/subscribe.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import mqtt from 'mqtt'

import { connectionStore } from './store'
import { sendDeserialized, sendDeserializedGenericError } from './deserialize'
import {
sendDeserialized,
sendDeserializedGenericError,
sendDeserializedRefetch,
} from './deserialize'
import { notifyLog } from './notifyLog'

import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types'
Expand Down Expand Up @@ -30,8 +34,8 @@ export function subscribe(topic: NotifyTopic): Promise<void> {
if (client == null) {
return Promise.reject(new Error('Expected hostData, received null.'))
}

if (
// The first time the client wants to subscribe on a robot to a particular topic.
else if (
!connectionStore.isActiveSub(topic) &&
!connectionStore.isPendingSub(topic)
) {
Expand All @@ -44,13 +48,15 @@ export function subscribe(topic: NotifyTopic): Promise<void> {
})
)
.catch((error: Error) => notifyLog.debug(error.message))
} else {
void waitUntilActiveOrErrored('subscription', topic).catch(
(error: Error) => {
}
// The client is either already subscribed or the subscription is currently pending.
else {
void waitUntilActiveOrErrored('subscription', topic)
.then(() => sendDeserializedRefetch(topic))
.catch((error: Error) => {
notifyLog.debug(error.message)
sendDeserializedGenericError(topic)
}
)
})
}
})
.catch((error: Error) => {
Expand All @@ -74,6 +80,8 @@ export function subscribe(topic: NotifyTopic): Promise<void> {
connectionStore
.setSubStatus(topic, 'subscribed')
.catch((error: Error) => notifyLog.debug(error.message))

sendDeserializedRefetch(topic)
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions app-shell/src/notifications/deserialize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ export function sendDeserialized({
} catch {} // Prevents shell erroring during app shutdown event.
}

export function sendDeserializedRefetch(ip: string, topic: NotifyTopic): void {
sendDeserialized({
ip,
topic,
message: { refetch: true },
})
}

export function sendDeserializedGenericError(
ip: string,
topic: NotifyTopic
Expand Down
24 changes: 17 additions & 7 deletions app-shell/src/notifications/subscribe.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import mqtt from 'mqtt'

import { connectionStore } from './store'
import { sendDeserialized, sendDeserializedGenericError } from './deserialize'
import {
sendDeserialized,
sendDeserializedGenericError,
sendDeserializedRefetch,
} from './deserialize'
import { notifyLog } from './notifyLog'

import type { NotifyTopic } from '@opentrons/app/src/redux/shell/types'
Expand Down Expand Up @@ -36,8 +40,8 @@ export function subscribe(ip: string, topic: NotifyTopic): Promise<void> {
if (client == null) {
return Promise.reject(new Error('Expected hostData, received null.'))
}

if (
// The first time the client wants to subscribe on a robot to a particular topic.
else if (
!connectionStore.isActiveSub(robotName, topic) &&
!connectionStore.isPendingSub(robotName, topic)
) {
Expand All @@ -50,16 +54,20 @@ export function subscribe(ip: string, topic: NotifyTopic): Promise<void> {
})
)
.catch((error: Error) => notifyLog.debug(error.message))
} else {
}
// The client is either already subscribed or the subscription is currently pending.
else {
void waitUntilActiveOrErrored({
connection: 'subscription',
ip,
robotName,
topic,
}).catch((error: Error) => {
notifyLog.debug(error.message)
sendDeserializedGenericError(ip, topic)
})
.then(() => sendDeserializedRefetch(ip, topic))
.catch((error: Error) => {
notifyLog.debug(error.message)
sendDeserializedGenericError(ip, topic)
})
}
})
.catch((error: Error) => {
Expand All @@ -81,6 +89,8 @@ export function subscribe(ip: string, topic: NotifyTopic): Promise<void> {
connectionStore
.setSubStatus(ip, topic, 'subscribed')
.catch((error: Error) => notifyLog.debug(error.message))

sendDeserializedRefetch(ip, topic)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion app/src/resources/useNotifyService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export function useNotifyService<TData, TError = Error>({

React.useEffect(() => {
if (shouldUseNotifications) {
// Always fetch on initial mount.
// Always fetch on initial mount to keep latency as low as possible.
setRefetch('once')
appShellListener({
hostname,
Expand Down

0 comments on commit 80cfe7e

Please sign in to comment.