Skip to content

Commit

Permalink
Attempt to reduce some test flakiness (#594)
Browse files Browse the repository at this point in the history
* Update several MQTT will tests to be more forgiving of potential service race conditions
* Remove shared subscription distribution assertion because it's not valid


Co-authored-by: Bret Ambrose <[email protected]>
  • Loading branch information
bretambrose and Bret Ambrose authored Jan 6, 2025
1 parent 2eee416 commit aa5caa9
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 8 deletions.
21 changes: 17 additions & 4 deletions lib/common/mqtt.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,26 @@ test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt311_is_valid_iot_cred())('MQT
const onMessage = once(connectionWaitingForWill, 'message');
await connectionWaitingForWill.subscribe(willTopic, QoS.AtLeastOnce);

// pause for a couple of seconds to try and minimize chance for a service-side race
await new Promise(resolve => setTimeout(resolve, 2000));

// The third connection that will cause the first one to be disconnected because it has the same client ID.
const connectionDuplicate = await makeConnection(undefined, client_id);
const onConnectDuplicate = once(connectionDuplicate, 'connect');

const onDisconnectDuplicate = once(connectionDuplicate, 'disconnect');
await connectionDuplicate.connect()
const connectDuplicateResult = (await onConnectDuplicate)[0];
expect(connectDuplicateResult).toBeFalsy(); /* session present */

// Rarely, IoT Core disconnects the new connection and not the existing one, so retry in that case
let continueConnecting = true;
while (continueConnecting) {
try {
const onConnectDuplicate = once(connectionDuplicate, 'connect');
await connectionDuplicate.connect();
await onConnectDuplicate;
continueConnecting = false;
} catch (err) {
await new Promise(resolve => setTimeout(resolve, 1000));
}
}

// The second connection should receive Will message after the first connection was kicked out.
const messageReceivedArgs = (await onMessage);
Expand Down
2 changes: 1 addition & 1 deletion lib/native/mqtt5.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvir
payload: testPayload
});

await setTimeout(()=>{}, 2000);
await new Promise(resolve => setTimeout(resolve, 2000));

statistics = client.getOperationalStatistics();
expect(statistics.incompleteOperationCount).toBeLessThanOrEqual(0);
Expand Down
7 changes: 4 additions & 3 deletions test/mqtt5.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ export async function subPubUnsubTest(client: mqtt5.Mqtt5Client, qos: mqtt5.QoS,
payload: testPayload
});

await setTimeout(()=>{}, 2000);
await new Promise(resolve => setTimeout(resolve, 2000));

client.stop();
await stopped;
Expand Down Expand Up @@ -434,6 +434,9 @@ export async function willTest(publisher: mqtt5.Mqtt5Client, subscriber: mqtt5.M
throw new CrtError("doh");
}

// pause to minimize eventual consistency race condition possibility
await new Promise(resolve => setTimeout(resolve, 1000));

publisher.stop({
reasonCode: mqtt5.DisconnectReasonCode.DisconnectWithWillMessage
});
Expand Down Expand Up @@ -659,8 +662,6 @@ export async function doSharedSubscriptionsTest(publisher: mqtt5.Mqtt5Client, su
let messagesReceived : number = 0;
subscriberMessages.forEach(v => {
messagesReceived += v;
// Each subscriber should receive a portion of messages.
expect(v).toBeGreaterThan(0);
});
expect(messagesReceived).toEqual(messagesNumber);

Expand Down

0 comments on commit aa5caa9

Please sign in to comment.