-
Notifications
You must be signed in to change notification settings - Fork 180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(app-shell, app-shell-odd): Refactor app to use unsubscribe flags #14633
Changes from all commits
3565396
d3d2bd1
dbb66e4
10c464e
c7d40ee
a689fcd
9b6caad
ba3da20
33da3f7
a1ad5d7
d97e9b7
303012e
7afd091
ba30343
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,26 +11,27 @@ import type { Action, Dispatch } from './types' | |
// TODO(jh, 2024-03-01): after refactoring notify connectivity and subscription logic, uncomment logs. | ||
|
||
// Manages MQTT broker connections via a connection store, establishing a connection to the broker only if a connection does not | ||
// already exist, and disconnects from the broker when the app is not subscribed to any topics for the given broker. | ||
// A redundant connection to the same broker results in the older connection forcibly closing, which we want to avoid. | ||
// already exist. A redundant connection to the same broker results in the older connection forcibly closing, which we want to avoid. | ||
// However, redundant subscriptions are permitted and result in the broker sending the retained message for that topic. | ||
// To mitigate redundant connections, the connection manager eagerly adds the host, removing the host if the connection fails. | ||
|
||
const FAILURE_STATUSES = { | ||
ECONNREFUSED: 'ECONNREFUSED', | ||
ECONNFAILED: 'ECONNFAILED', | ||
} as const | ||
|
||
const LOCALHOST: '127.0.0.1' = '127.0.0.1' | ||
|
||
interface ConnectionStore { | ||
[hostname: string]: { | ||
client: mqtt.MqttClient | null | ||
subscriptions: Record<NotifyTopic, number> | ||
subscriptions: Set<NotifyTopic> | ||
pendingSubs: Set<NotifyTopic> | ||
} | ||
} | ||
|
||
const connectionStore: ConnectionStore = {} | ||
const unreachableHosts = new Set<string>() | ||
const pendingUnsubs = new Set<NotifyTopic>() | ||
const log = createLogger('notify') | ||
// MQTT is somewhat particular about the clientId format and will connect erratically if an unexpected string is supplied. | ||
// This clientId is derived from the mqttjs library. | ||
|
@@ -66,14 +67,7 @@ export function registerNotify( | |
return subscribe({ | ||
...action.payload, | ||
browserWindow: mainWindow, | ||
hostname: '127.0.0.1', | ||
}) | ||
|
||
case 'shell:NOTIFY_UNSUBSCRIBE': | ||
return unsubscribe({ | ||
...action.payload, | ||
browserWindow: mainWindow, | ||
hostname: '127.0.0.1', | ||
hostname: LOCALHOST, | ||
}) | ||
} | ||
} | ||
|
@@ -103,8 +97,7 @@ function subscribe(notifyParams: NotifyParams): Promise<void> { | |
else if (connectionStore[hostname] == null) { | ||
connectionStore[hostname] = { | ||
client: null, | ||
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions | ||
subscriptions: { [topic]: 1 } as Record<NotifyTopic, number>, | ||
subscriptions: new Set(), | ||
pendingSubs: new Set(), | ||
} | ||
return connectAsync(`mqtt://${hostname}`) | ||
|
@@ -149,27 +142,24 @@ function subscribe(notifyParams: NotifyParams): Promise<void> { | |
return waitUntilActiveOrErrored('client') | ||
.then(() => { | ||
const { client, subscriptions, pendingSubs } = connectionStore[hostname] | ||
const activeClient = client as mqtt.Client | ||
const isNotActiveSubscription = (subscriptions[topic] ?? 0) <= 0 | ||
if (!pendingSubs.has(topic) && isNotActiveSubscription) { | ||
pendingSubs.add(topic) | ||
return new Promise<void>(() => { | ||
activeClient.subscribe(topic, subscribeOptions, subscribeCb) | ||
pendingSubs.delete(topic) | ||
}) | ||
} else { | ||
void waitUntilActiveOrErrored('subscription') | ||
.then(() => { | ||
subscriptions[topic] += 1 | ||
const activeClient = client as mqtt.MqttClient | ||
if (!subscriptions.has(topic)) { | ||
if (!pendingSubs.has(topic)) { | ||
pendingSubs.add(topic) | ||
return new Promise<void>(() => { | ||
activeClient.subscribe(topic, subscribeOptions, subscribeCb) | ||
pendingSubs.delete(topic) | ||
}) | ||
.catch(() => { | ||
} else { | ||
void waitUntilActiveOrErrored('subscription').catch(() => { | ||
sendToBrowserDeserialized({ | ||
browserWindow, | ||
hostname, | ||
topic, | ||
message: FAILURE_STATUSES.ECONNFAILED, | ||
}) | ||
}) | ||
} | ||
} | ||
}) | ||
.catch(() => { | ||
|
@@ -191,18 +181,9 @@ function subscribe(notifyParams: NotifyParams): Promise<void> { | |
topic, | ||
message: FAILURE_STATUSES.ECONNFAILED, | ||
}) | ||
setTimeout(() => { | ||
if (Object.keys(connectionStore[hostname].subscriptions).length <= 0) { | ||
connectionStore[hostname].client?.end() | ||
} | ||
}, RENDER_TIMEOUT) | ||
} else { | ||
// log.info(`Successfully subscribed on ${hostname} to topic: ${topic}`) | ||
if (subscriptions[topic] > 0) { | ||
subscriptions[topic] += 1 | ||
} else { | ||
subscriptions[topic] = 1 | ||
} | ||
subscriptions.add(topic) | ||
} | ||
} | ||
|
||
|
@@ -218,7 +199,7 @@ function subscribe(notifyParams: NotifyParams): Promise<void> { | |
const hasReceivedAck = | ||
connection === 'client' | ||
? host?.client != null | ||
: host?.subscriptions[topic] > 0 | ||
: host?.subscriptions.has(topic) | ||
if (hasReceivedAck) { | ||
clearInterval(intervalId) | ||
resolve() | ||
|
@@ -235,43 +216,42 @@ function subscribe(notifyParams: NotifyParams): Promise<void> { | |
} | ||
} | ||
|
||
// Because subscription logic is directly tied to the component lifecycle, it is possible | ||
// for a component to trigger an unsubscribe event on dismount while a new component mounts and | ||
// triggers a subscribe event. For the connection store and MQTT to reflect correct topic subscriptions, | ||
// do not unsubscribe and close connections before newly mounted components have had time to update the connection store. | ||
const RENDER_TIMEOUT = 10000 // 10 seconds | ||
function checkForUnsubscribeFlag( | ||
deserializedMessage: string | Record<string, unknown>, | ||
hostname: string, | ||
topic: NotifyTopic | ||
): void { | ||
const messageContainsUnsubFlag = | ||
typeof deserializedMessage !== 'string' && | ||
'unsubscribe' in deserializedMessage | ||
|
||
if (messageContainsUnsubFlag) void unsubscribe(hostname, topic) | ||
} | ||
|
||
function unsubscribe(notifyParams: NotifyParams): Promise<void> { | ||
const { hostname, topic } = notifyParams | ||
function unsubscribe(hostname: string, topic: NotifyTopic): Promise<void> { | ||
return new Promise<void>(() => { | ||
setTimeout(() => { | ||
if (hostname in connectionStore) { | ||
const { client } = connectionStore[hostname] | ||
const subscriptions = connectionStore[hostname]?.subscriptions | ||
const isLastSubscription = subscriptions[topic] <= 1 | ||
|
||
if (isLastSubscription) { | ||
client?.unsubscribe(topic, {}, (error, result) => { | ||
if (error != null) { | ||
// log.warn( | ||
// `Failed to unsubscribe on ${hostname} from topic: ${topic}` | ||
// ) | ||
} else { | ||
// log.info( | ||
// `Successfully unsubscribed on ${hostname} from topic: ${topic}` | ||
// ) | ||
handleDecrementSubscriptionCount(hostname, topic) | ||
} | ||
}) | ||
if (hostname in connectionStore && !pendingUnsubs.has(topic)) { | ||
pendingUnsubs.add(topic) | ||
const { client } = connectionStore[hostname] | ||
client?.unsubscribe(topic, {}, (error, result) => { | ||
if (error != null) { | ||
// log.warn( | ||
// `Failed to unsubscribe on ${hostname} from topic: ${topic}` | ||
// ) | ||
} else { | ||
subscriptions[topic] -= 1 | ||
log.debug( | ||
`Successfully unsubscribed on ${hostname} from topic: ${topic}` | ||
) | ||
const { subscriptions } = connectionStore[hostname] | ||
subscriptions.delete(topic) | ||
pendingUnsubs.delete(topic) | ||
} | ||
} else { | ||
// log.info( | ||
// `Attempted to unsubscribe from unconnected hostname: ${hostname}` | ||
// ) | ||
} | ||
}, RENDER_TIMEOUT) | ||
}) | ||
} else { | ||
// log.info( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's uncomment or delete |
||
// `Attempted to unsubscribe from unconnected hostname: ${hostname}` | ||
// ) | ||
} | ||
}) | ||
} | ||
|
||
|
@@ -310,26 +290,6 @@ function connectAsync(brokerURL: string): Promise<mqtt.Client> { | |
}) | ||
} | ||
|
||
function handleDecrementSubscriptionCount( | ||
hostname: string, | ||
topic: NotifyTopic | ||
): void { | ||
const host = connectionStore[hostname] | ||
if (host) { | ||
const { client, subscriptions } = host | ||
if (topic in subscriptions) { | ||
subscriptions[topic] -= 1 | ||
if (subscriptions[topic] <= 0) { | ||
delete subscriptions[topic] | ||
} | ||
} | ||
|
||
if (Object.keys(subscriptions).length <= 0) { | ||
client?.end() | ||
} | ||
} | ||
} | ||
|
||
interface ListenerParams { | ||
client: mqtt.MqttClient | ||
browserWindow: BrowserWindow | ||
|
@@ -344,12 +304,15 @@ function establishListeners({ | |
client.on( | ||
'message', | ||
(topic: NotifyTopic, message: Buffer, packet: mqtt.IPublishPacket) => { | ||
sendToBrowserDeserialized({ | ||
browserWindow, | ||
const deserializedMessage = deserialize(message.toString()) | ||
checkForUnsubscribeFlag(deserializedMessage, LOCALHOST, topic) | ||
|
||
browserWindow.webContents.send( | ||
'notify', | ||
hostname, | ||
topic, | ||
message: message.toString(), | ||
}) | ||
deserializedMessage | ||
) | ||
} | ||
) | ||
|
||
|
@@ -419,13 +382,7 @@ function sendToBrowserDeserialized({ | |
topic, | ||
message, | ||
}: SendToBrowserParams): void { | ||
let deserializedMessage: string | Object | ||
|
||
try { | ||
deserializedMessage = JSON.parse(message) | ||
} catch { | ||
deserializedMessage = message | ||
} | ||
const deserializedMessage = deserialize(message) | ||
|
||
// log.info('Received notification data from main via IPC', { | ||
// hostname, | ||
|
@@ -434,3 +391,14 @@ function sendToBrowserDeserialized({ | |
|
||
browserWindow.webContents.send('notify', hostname, topic, deserializedMessage) | ||
} | ||
|
||
function deserialize(message: string): string | Record<string, unknown> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a case where this will be (a) valid and (b) not json? if so can we remove that case, and make this be a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah good point |
||
let deserializedMessage: string | Record<string, unknown> | ||
|
||
try { | ||
deserializedMessage = JSON.parse(message) | ||
} catch { | ||
deserializedMessage = message | ||
} | ||
return deserializedMessage | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might want this back but definitely don't want it commented - let's uncomment or delete