Skip to content

Commit

Permalink
Add shared subscriptions test (#509)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfod authored Dec 1, 2023
1 parent a91e55a commit 3f57c09
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 0 deletions.
8 changes: 8 additions & 0 deletions lib/native/mqtt5.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,14 @@ test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvir
expect(willReceived).toEqual(true);
});

test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Shared subscriptions test', async () => {
const config : mqtt5.Mqtt5ClientConfig = createDirectIotCoreClientConfig();
const publisher : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config);
const subscriber1 : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config);
const subscriber2 : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config);
await test_utils.doSharedSubscriptionsTest(publisher, subscriber1, subscriber2);
});

test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Operation failure - null subscribe', async () => {
await test_utils.nullSubscribeTest(new mqtt5.Mqtt5Client(createDirectIotCoreClientConfig()));
});
Expand Down
96 changes: 96 additions & 0 deletions test/mqtt5.ts
Original file line number Diff line number Diff line change
Expand Up @@ -576,3 +576,99 @@ export async function doRetainTest(client1: mqtt5.Mqtt5Client, client2: mqtt5.Mq
await stopped1;
client1.close();
}

export async function doSharedSubscriptionsTest(publisher: mqtt5.Mqtt5Client, subscriber1: mqtt5.Mqtt5Client, subscriber2: mqtt5.Mqtt5Client) {
const payload : Buffer = Buffer.from("share", "utf-8");
const messagesNumber: number = 10;
const testTopic: string = `mqtt5_test${uuid()}`;
const sharedTopicfilter : string = `$share/crttest/${testTopic}`;

const publisherConnected = once(publisher, mqtt5.Mqtt5Client.CONNECTION_SUCCESS);
const publisherStopped = once(publisher, mqtt5.Mqtt5Client.STOPPED);

const subscriber1Connected = once(subscriber1, mqtt5.Mqtt5Client.CONNECTION_SUCCESS);
const subscriber1Stopped = once(subscriber1, mqtt5.Mqtt5Client.STOPPED);

const subscriber2Connected = once(subscriber2, mqtt5.Mqtt5Client.CONNECTION_SUCCESS);
const subscriber2Stopped = once(subscriber2, mqtt5.Mqtt5Client.STOPPED);

publisher.start();
subscriber1.start();
subscriber2.start();

await publisherConnected;
await subscriber1Connected;
await subscriber2Connected;

await subscriber1.subscribe({
subscriptions: [
{topicFilter: sharedTopicfilter, qos: mqtt5.QoS.AtLeastOnce}
]
});
await subscriber2.subscribe({
subscriptions: [
{topicFilter: sharedTopicfilter, qos: mqtt5.QoS.AtLeastOnce}
]
});

let receivedResolve : (value?: void | PromiseLike<void>) => void;
const receivedPromise = new Promise<void>((resolve, reject) => {
receivedResolve = resolve;
setTimeout(() => reject(new Error("Did not receive expected number of messages")), 4000);
});

// map: subscriberId -> receivedCount
const subscriberMessages = new Map();

const getOnMessageReceived = (subscriberId : string) => {
subscriberMessages.set(subscriberId, 0);

return (eventData: mqtt5.MessageReceivedEvent) => {
const packet: mqtt5.PublishPacket = eventData.message;

subscriberMessages.set(subscriberId, subscriberMessages.get(subscriberId) + 1);

let messagesReceived : number = 0;
subscriberMessages.forEach(v => messagesReceived += v);
if (messagesReceived == messagesNumber) {
receivedResolve();
}

expect(packet.qos).toEqual(mqtt5.QoS.AtLeastOnce);
expect(packet.topicName).toEqual(testTopic);
};
};

subscriber1.on('messageReceived', getOnMessageReceived("sub1"));
subscriber2.on('messageReceived', getOnMessageReceived("sub2"));

for (let i = 0; i < messagesNumber; ++i) {
publisher.publish({
topicName: testTopic,
qos: mqtt5.QoS.AtLeastOnce,
payload: payload
});
}

// Wait for receiving all published messages.
await receivedPromise;

// Wait a little longer to check if extra messages arrive.
await new Promise(resolve => setTimeout(resolve, 1000));

let messagesReceived : number = 0;
subscriberMessages.forEach(v => {
messagesReceived += v;
// Each subscriber should receive a portion of messages.
expect(v).toBeGreaterThan(0);
});
expect(messagesReceived).toEqual(messagesNumber);

subscriber2.stop();
subscriber1.stop();
publisher.stop();

await subscriber2Stopped;
await subscriber1Stopped;
await publisherStopped;
}

0 comments on commit 3f57c09

Please sign in to comment.