diff --git a/crt/aws-c-mqtt b/crt/aws-c-mqtt index 709047b9f..c2bc31060 160000 --- a/crt/aws-c-mqtt +++ b/crt/aws-c-mqtt @@ -1 +1 @@ -Subproject commit 709047b9fd1b1137fdfaaefe5116710191186539 +Subproject commit c2bc31060984dc3eb89b016c9ea0a525d259fc7d diff --git a/lib/browser/mqtt5.spec.ts b/lib/browser/mqtt5.spec.ts index 3bbbb7b48..5b6ab6a39 100644 --- a/lib/browser/mqtt5.spec.ts +++ b/lib/browser/mqtt5.spec.ts @@ -383,6 +383,27 @@ test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvir expect(settings.clientId).toEqual(clientId); }); +test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Negotiated settings - always rejoin session', async () => { + let clientId : string = `test-${uuid()}`; + let config : mqtt5.Mqtt5ClientConfig = createWsIotCoreClientConfig(); + config.connectProperties = { + clientId: clientId, + keepAliveIntervalSeconds: 600, + sessionExpiryIntervalSeconds: 3600, + }; + + let client: mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config); + + // @ts-ignore + await test_utils.testNegotiatedSettings(client, false); + + config.sessionBehavior = mqtt5.ClientSessionBehavior.RejoinAlways; + let forcedRejoinClient : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config); + + // @ts-ignore + await test_utils.testNegotiatedSettings(forcedRejoinClient, true); +}); + test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Sub - Pub QoS 0 - Unsub', async () => { let topic : string = `test-${uuid()}`; let testPayload : Buffer = Buffer.from("Derp", "utf-8"); diff --git a/lib/browser/mqtt5_utils.ts b/lib/browser/mqtt5_utils.ts index f11134afd..40c0ad5ff 100644 --- a/lib/browser/mqtt5_utils.ts +++ b/lib/browser/mqtt5_utils.ts @@ -119,7 +119,7 @@ export function getOrderedReconnectDelayBounds(configMin?: number, configMax?: n /** @internal */ function should_mqtt_js_use_clean_start(session_behavior? : mqtt5.ClientSessionBehavior) : boolean { - return session_behavior !== mqtt5.ClientSessionBehavior.RejoinPostSuccess; + return session_behavior !== mqtt5.ClientSessionBehavior.RejoinPostSuccess && session_behavior !== mqtt5.ClientSessionBehavior.RejoinAlways; } /** @internal */ diff --git a/lib/common/mqtt5.ts b/lib/common/mqtt5.ts index f84f7bcec..b92134c2f 100644 --- a/lib/common/mqtt5.ts +++ b/lib/common/mqtt5.ts @@ -105,6 +105,14 @@ export enum ClientSessionBehavior { * Session rejoin requires an appropriate non-zero session expiry interval in the client's CONNECT options. */ RejoinPostSuccess = 2, + + /** + * Always attempt to rejoin an existing session. Since the client does not yet support durable session persistence, + * this option is not guaranteed to be spec compliant because any unacknowledged qos1 publishes (which are + * part of the client session state) will not be present on the initial connection. Until we support + * durable session resumption, this option is technically spec-breaking, but useful. + */ + RejoinAlways = 3, } /** diff --git a/lib/native/mqtt5.spec.ts b/lib/native/mqtt5.spec.ts index 4946afb7d..267624874 100644 --- a/lib/native/mqtt5.spec.ts +++ b/lib/native/mqtt5.spec.ts @@ -483,6 +483,23 @@ test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvir expect(settings.clientId).toEqual(clientId); }); +test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Negotiated settings - always rejoin session', async () => { + let clientId : string = `test-${uuid()}`; + let config : mqtt5.Mqtt5ClientConfig = createDirectIotCoreClientConfig(); + config.connectProperties = { + clientId: clientId, + keepAliveIntervalSeconds: 600, + sessionExpiryIntervalSeconds: 3600, + }; + + let client: mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config); + await test_utils.testNegotiatedSettings(client, false); + + config.sessionBehavior = mqtt5.ClientSessionBehavior.RejoinAlways; + let forcedRejoinClient : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config); + await test_utils.testNegotiatedSettings(forcedRejoinClient, true); +}); + test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Sub - Pub QoS 0 - Unsub', async () => { let topic : string = `test-${uuid()}`; let testPayload : Buffer = Buffer.from("Derp", "utf-8"); diff --git a/test/mqtt5.ts b/test/mqtt5.ts index ea4623965..edc53c5bf 100644 --- a/test/mqtt5.ts +++ b/test/mqtt5.ts @@ -308,7 +308,7 @@ export async function testSubscribeValidationFailure(client : mqtt5.Mqtt5Client, client.close(); } -export function verifyCommonNegotiatedSettings(settings: mqtt5.NegotiatedSettings) { +export function verifyCommonNegotiatedSettings(settings: mqtt5.NegotiatedSettings, expectedRejoinedSession: boolean) { expect(settings.maximumQos).toEqual(mqtt5.QoS.AtLeastOnce); expect(settings.sessionExpiryInterval).toBeDefined(); expect(settings.receiveMaximumFromServer).toBeDefined(); @@ -318,12 +318,12 @@ export function verifyCommonNegotiatedSettings(settings: mqtt5.NegotiatedSetting expect(typeof settings.wildcardSubscriptionsAvailable === 'boolean').toBeTruthy(); expect(typeof settings.subscriptionIdentifiersAvailable === 'boolean').toBeTruthy(); expect(typeof settings.sharedSubscriptionsAvailable === 'boolean').toBeTruthy(); - expect(settings.rejoinedSession).toBeFalsy(); + expect(settings.rejoinedSession).toEqual(expectedRejoinedSession); expect(settings.clientId).toBeDefined(); expect(settings.sessionExpiryInterval).toBeDefined(); } -export async function testNegotiatedSettings(client: mqtt5.Mqtt5Client) : Promise { +export async function testNegotiatedSettings(client: mqtt5.Mqtt5Client, expectedRejoinedSession?: boolean) : Promise { let connectionSuccess = once(client, mqtt5.Mqtt5Client.CONNECTION_SUCCESS); let stopped = once(client, mqtt5.Mqtt5Client.STOPPED) @@ -338,7 +338,7 @@ export async function testNegotiatedSettings(client: mqtt5.Mqtt5Client) : Promis client.close(); - verifyCommonNegotiatedSettings(connectionSuccessEvent.settings); + verifyCommonNegotiatedSettings(connectionSuccessEvent.settings, expectedRejoinedSession ?? false); resolve(connectionSuccessEvent.settings); } catch (err) {