From 3f57c0977db994e01080cdf1a296ff333c4f635c Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Fri, 1 Dec 2023 10:18:48 -0800 Subject: [PATCH] Add shared subscriptions test (#509) --- lib/native/mqtt5.spec.ts | 8 ++++ test/mqtt5.ts | 96 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+) diff --git a/lib/native/mqtt5.spec.ts b/lib/native/mqtt5.spec.ts index 144c7a009..46d5d46f2 100644 --- a/lib/native/mqtt5.spec.ts +++ b/lib/native/mqtt5.spec.ts @@ -582,6 +582,14 @@ test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvir expect(willReceived).toEqual(true); }); +test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Shared subscriptions test', async () => { + const config : mqtt5.Mqtt5ClientConfig = createDirectIotCoreClientConfig(); + const publisher : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config); + const subscriber1 : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config); + const subscriber2 : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config); + await test_utils.doSharedSubscriptionsTest(publisher, subscriber1, subscriber2); +}); + test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Operation failure - null subscribe', async () => { await test_utils.nullSubscribeTest(new mqtt5.Mqtt5Client(createDirectIotCoreClientConfig())); }); diff --git a/test/mqtt5.ts b/test/mqtt5.ts index c9d344b71..5c227a48d 100644 --- a/test/mqtt5.ts +++ b/test/mqtt5.ts @@ -576,3 +576,99 @@ export async function doRetainTest(client1: mqtt5.Mqtt5Client, client2: mqtt5.Mq await stopped1; client1.close(); } + +export async function doSharedSubscriptionsTest(publisher: mqtt5.Mqtt5Client, subscriber1: mqtt5.Mqtt5Client, subscriber2: mqtt5.Mqtt5Client) { + const payload : Buffer = Buffer.from("share", "utf-8"); + const messagesNumber: number = 10; + const testTopic: string = `mqtt5_test${uuid()}`; + const sharedTopicfilter : string = `$share/crttest/${testTopic}`; + + const publisherConnected = once(publisher, mqtt5.Mqtt5Client.CONNECTION_SUCCESS); + const publisherStopped = once(publisher, mqtt5.Mqtt5Client.STOPPED); + + const subscriber1Connected = once(subscriber1, mqtt5.Mqtt5Client.CONNECTION_SUCCESS); + const subscriber1Stopped = once(subscriber1, mqtt5.Mqtt5Client.STOPPED); + + const subscriber2Connected = once(subscriber2, mqtt5.Mqtt5Client.CONNECTION_SUCCESS); + const subscriber2Stopped = once(subscriber2, mqtt5.Mqtt5Client.STOPPED); + + publisher.start(); + subscriber1.start(); + subscriber2.start(); + + await publisherConnected; + await subscriber1Connected; + await subscriber2Connected; + + await subscriber1.subscribe({ + subscriptions: [ + {topicFilter: sharedTopicfilter, qos: mqtt5.QoS.AtLeastOnce} + ] + }); + await subscriber2.subscribe({ + subscriptions: [ + {topicFilter: sharedTopicfilter, qos: mqtt5.QoS.AtLeastOnce} + ] + }); + + let receivedResolve : (value?: void | PromiseLike) => void; + const receivedPromise = new Promise((resolve, reject) => { + receivedResolve = resolve; + setTimeout(() => reject(new Error("Did not receive expected number of messages")), 4000); + }); + + // map: subscriberId -> receivedCount + const subscriberMessages = new Map(); + + const getOnMessageReceived = (subscriberId : string) => { + subscriberMessages.set(subscriberId, 0); + + return (eventData: mqtt5.MessageReceivedEvent) => { + const packet: mqtt5.PublishPacket = eventData.message; + + subscriberMessages.set(subscriberId, subscriberMessages.get(subscriberId) + 1); + + let messagesReceived : number = 0; + subscriberMessages.forEach(v => messagesReceived += v); + if (messagesReceived == messagesNumber) { + receivedResolve(); + } + + expect(packet.qos).toEqual(mqtt5.QoS.AtLeastOnce); + expect(packet.topicName).toEqual(testTopic); + }; + }; + + subscriber1.on('messageReceived', getOnMessageReceived("sub1")); + subscriber2.on('messageReceived', getOnMessageReceived("sub2")); + + for (let i = 0; i < messagesNumber; ++i) { + publisher.publish({ + topicName: testTopic, + qos: mqtt5.QoS.AtLeastOnce, + payload: payload + }); + } + + // Wait for receiving all published messages. + await receivedPromise; + + // Wait a little longer to check if extra messages arrive. + await new Promise(resolve => setTimeout(resolve, 1000)); + + let messagesReceived : number = 0; + subscriberMessages.forEach(v => { + messagesReceived += v; + // Each subscriber should receive a portion of messages. + expect(v).toBeGreaterThan(0); + }); + expect(messagesReceived).toEqual(messagesNumber); + + subscriber2.stop(); + subscriber1.stop(); + publisher.stop(); + + await subscriber2Stopped; + await subscriber1Stopped; + await publisherStopped; +}