From ff76bee8f470e7279ff4c801cedcb21528b13f48 Mon Sep 17 00:00:00 2001 From: DarshanBPatel Date: Wed, 26 Jun 2024 16:16:00 +0530 Subject: [PATCH 1/9] chore: add unit test for duplicate message push --- tests/wakunode_rest/test_rest_filter.nim | 145 +++++++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index e3893ec081..3f4f12e275 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -1,6 +1,7 @@ {.used.} import + std/[os, times], stew/byteutils, stew/shims/net, testutils/unittests, @@ -315,3 +316,147 @@ suite "Waku v2 Rest API - Filter V2": messages == @[testMessage] await restFilterTest.shutdown() + + asyncTest "duplicate message push to filter subscriber": + # setup filter service and client node + let restFilterTest = await RestFilterTest.init() + let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId + restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) + + let requestBody = FilterSubscribeRequest( + requestId: "1001", + contentFilters: @[DefaultContentTopic], + pubsubTopic: some(DefaultPubsubTopic), + ) + let response = await restFilterTest.client.filterPostSubscriptions(requestBody) + + # subscribe fiter service + let subscribedPeer = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers( + DefaultPubsubTopic, DefaultContentTopic + ) + + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + response.data.requestId == "1001" + subscribedPeer.len() == 1 + + # ping subscriber node + restFilterTest.messageCache.pubsubSubscribe(DefaultPubsubTopic) + + let pingResponse = await restFilterTest.client.filterSubscriberPing("1002") + + check: + pingResponse.status == 200 + pingResponse.data.requestId == "1002" + pingResponse.data.statusDesc == "OK" + + # first - message push from service node to subscriber client + let testMessage = WakuMessage( + payload: "TEST-PAYLOAD-MUST-RECEIVE".toBytes(), + contentTopic: DefaultContentTopic, + timestamp: int64(2022), + meta: "test-meta".toBytes(), + ) + + let postMsgResponse1 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1( + DefaultPubsubTopic, toRelayWakuMessage(testMessage) + ) + + # check messages received client side or not + let messages1 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic) + + check: + postMsgResponse1.status == 200 + $postMsgResponse1.contentType == $MIMETYPE_TEXT + postMsgResponse1.data == "OK" + len(messages1.data) == 1 + + # second - message push from service node to subscriber client + let postMsgResponse2 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1( + DefaultPubsubTopic, toRelayWakuMessage(testMessage) + ) + + # check message received client side or not + let messages2 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic) + + check: + postMsgResponse2.status == 200 + $postMsgResponse2.contentType == $MIMETYPE_TEXT + postMsgResponse2.data == "OK" + len(messages2.data) == 0 + + await restFilterTest.shutdown() + + asyncTest "duplicate message push to filter subscriber ( sleep in between )": + # setup filter service and client node + let restFilterTest = await RestFilterTest.init() + let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId + restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) + + let requestBody = FilterSubscribeRequest( + requestId: "1001", + contentFilters: @[DefaultContentTopic], + pubsubTopic: some(DefaultPubsubTopic), + ) + let response = await restFilterTest.client.filterPostSubscriptions(requestBody) + + # subscribe fiter service + let subscribedPeer = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers( + DefaultPubsubTopic, DefaultContentTopic + ) + + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + response.data.requestId == "1001" + subscribedPeer.len() == 1 + + # ping subscriber node + restFilterTest.messageCache.pubsubSubscribe(DefaultPubsubTopic) + + let pingResponse = await restFilterTest.client.filterSubscriberPing("1002") + + check: + pingResponse.status == 200 + pingResponse.data.requestId == "1002" + pingResponse.data.statusDesc == "OK" + + # first - message push from service node to subscriber client + let testMessage = WakuMessage( + payload: "TEST-PAYLOAD-MUST-RECEIVE".toBytes(), + contentTopic: DefaultContentTopic, + timestamp: int64(2022), + meta: "test-meta".toBytes(), + ) + + let postMsgResponse1 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1( + DefaultPubsubTopic, toRelayWakuMessage(testMessage) + ) + + # check messages received client side or not + let messages1 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic) + + check: + postMsgResponse1.status == 200 + $postMsgResponse1.contentType == $MIMETYPE_TEXT + postMsgResponse1.data == "OK" + len(messages1.data) == 1 + + # Pause execution for 2 minutes to test TimeCache functionality of service node + sleep(120000) + + # second - message push from service node to subscriber client + let postMsgResponse2 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1( + DefaultPubsubTopic, toRelayWakuMessage(testMessage) + ) + + # check message received client side or not + let messages2 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic) + + check: + postMsgResponse2.status == 200 + $postMsgResponse2.contentType == $MIMETYPE_TEXT + postMsgResponse2.data == "OK" + len(messages2.data) == 1 + await restFilterTest.shutdown() From 1ab08edeb15f6b482f3eddcfd2a6d7c5d5d882dc Mon Sep 17 00:00:00 2001 From: DarshanBPatel Date: Fri, 28 Jun 2024 02:45:31 +0530 Subject: [PATCH 2/9] chore: speedup the unit test and create better variable --- tests/wakunode_rest/test_rest_filter.nim | 4 ++-- waku/node/waku_node.nim | 4 +++- waku/waku_filter_v2/protocol.nim | 4 ++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index 3f4f12e275..228db00a4a 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -53,7 +53,7 @@ proc init(T: type RestFilterTest): Future[T] {.async.} = await allFutures(testSetup.serviceNode.start(), testSetup.subscriberNode.start()) await testSetup.serviceNode.mountRelay() - await testSetup.serviceNode.mountFilter() + await testSetup.serviceNode.mountFilter(cacheTTL = 2.minutes) await testSetup.subscriberNode.mountFilterClient() testSetup.subscriberNode.peerManager.addServicePeer( @@ -444,7 +444,7 @@ suite "Waku v2 Rest API - Filter V2": len(messages1.data) == 1 # Pause execution for 2 minutes to test TimeCache functionality of service node - sleep(120000) + sleep(1) # second - message push from service node to subscriber client let postMsgResponse2 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1( diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 8d1ff8d790..3ce88cf788 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -437,12 +437,14 @@ proc mountFilter*( filter_subscriptions.DefaultSubscriptionTimeToLiveSec, maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers, maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer, + cacheTTL: Duration = 2.minutes, ) {.async: (raises: []).} = ## Mounting filter v2 protocol info "mounting filter protocol" node.wakuFilter = WakuFilter.new( - node.peerManager, subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer + node.peerManager, subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer, + cacheTTL, ) if node.started: diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index 5d5b0a6e46..eec1a4c5d5 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -298,14 +298,14 @@ proc new*( subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec, maxFilterPeers: uint32 = MaxFilterPeers, maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer, - timeout: Duration = 2.minutes, + cacheTTL: Duration = 2.minutes, ): T = let wf = WakuFilter( subscriptions: FilterSubscriptions.init( subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer ), peerManager: peerManager, - messageCache: init(TimedCache[string], timeout), + messageCache: init(TimedCache[string], cacheTTL), ) wf.initProtocolHandler() From dd3ea7882c86cc9c0a5d678948b5a5e9856c416f Mon Sep 17 00:00:00 2001 From: DarshanBPatel Date: Fri, 28 Jun 2024 02:52:28 +0530 Subject: [PATCH 3/9] chore: update cacheTTL to 1.seconds --- tests/wakunode_rest/test_rest_filter.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index 228db00a4a..6e47264f80 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -53,7 +53,7 @@ proc init(T: type RestFilterTest): Future[T] {.async.} = await allFutures(testSetup.serviceNode.start(), testSetup.subscriberNode.start()) await testSetup.serviceNode.mountRelay() - await testSetup.serviceNode.mountFilter(cacheTTL = 2.minutes) + await testSetup.serviceNode.mountFilter(cacheTTL = 1.seconds) await testSetup.subscriberNode.mountFilterClient() testSetup.subscriberNode.peerManager.addServicePeer( From 9dfa3632feb07d014694e3231215a0a37c4b5262 Mon Sep 17 00:00:00 2001 From: DarshanBPatel Date: Fri, 28 Jun 2024 03:14:18 +0530 Subject: [PATCH 4/9] chore: better naming converntion --- tests/wakunode_rest/test_rest_filter.nim | 2 +- waku/node/waku_node.nim | 4 ++-- waku/waku_filter_v2/protocol.nim | 4 ++-- waku/waku_filter_v2/subscriptions.nim | 1 + 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index 6e47264f80..ebc3065002 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -53,7 +53,7 @@ proc init(T: type RestFilterTest): Future[T] {.async.} = await allFutures(testSetup.serviceNode.start(), testSetup.subscriberNode.start()) await testSetup.serviceNode.mountRelay() - await testSetup.serviceNode.mountFilter(cacheTTL = 1.seconds) + await testSetup.serviceNode.mountFilter(messageCacheTTL = 1.seconds) await testSetup.subscriberNode.mountFilterClient() testSetup.subscriberNode.peerManager.addServicePeer( diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 3ce88cf788..1b3e1e79b0 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -437,14 +437,14 @@ proc mountFilter*( filter_subscriptions.DefaultSubscriptionTimeToLiveSec, maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers, maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer, - cacheTTL: Duration = 2.minutes, + messageCacheTTL: Duration = filter_subscriptions.MessageCacheTTL, ) {.async: (raises: []).} = ## Mounting filter v2 protocol info "mounting filter protocol" node.wakuFilter = WakuFilter.new( node.peerManager, subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer, - cacheTTL, + messageCacheTTL, ) if node.started: diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index eec1a4c5d5..c7f5cfb9c2 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -298,14 +298,14 @@ proc new*( subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec, maxFilterPeers: uint32 = MaxFilterPeers, maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer, - cacheTTL: Duration = 2.minutes, + messageCacheTTL: Duration = MessageCacheTTL, ): T = let wf = WakuFilter( subscriptions: FilterSubscriptions.init( subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer ), peerManager: peerManager, - messageCache: init(TimedCache[string], cacheTTL), + messageCache: init(TimedCache[string], messageCacheTTL), ) wf.initProtocolHandler() diff --git a/waku/waku_filter_v2/subscriptions.nim b/waku/waku_filter_v2/subscriptions.nim index 37348025d6..5d03ff9ac9 100644 --- a/waku/waku_filter_v2/subscriptions.nim +++ b/waku/waku_filter_v2/subscriptions.nim @@ -13,6 +13,7 @@ const MaxFilterPeers* = 1000 MaxFilterCriteriaPerPeer* = 1000 DefaultSubscriptionTimeToLiveSec* = 5.minutes + MessageCacheTTL* = 2.minutes type # a single filter criterion is fully defined by a pubsub topic and content topic From 8a68bd47b0fb94bb32db0550c1f714d4c737ec5a Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Fri, 28 Jun 2024 01:47:17 +0200 Subject: [PATCH 5/9] Fix test compilation bug due wrong import --- tests/wakunode_rest/test_rest_filter.nim | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index ebc3065002..413dd49146 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -1,13 +1,14 @@ {.used.} import - std/[os, times], + std/[os], stew/byteutils, stew/shims/net, testutils/unittests, presto, presto/client as presto_client, - libp2p/crypto/crypto + libp2p/crypto/crypto, + chronos/timer import ../../waku/waku_api/message_cache, ../../waku/waku_core, From 5b067abe803ddee57ecc090d82a9038c9bec9e8e Mon Sep 17 00:00:00 2001 From: DarshanBPatel Date: Fri, 28 Jun 2024 11:40:39 +0530 Subject: [PATCH 6/9] chore: import chronos timer instead of os --- tests/wakunode_rest/test_rest_filter.nim | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index 413dd49146..18755ee09e 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -1,14 +1,14 @@ {.used.} import - std/[os], + std/os, + chronos/timer, stew/byteutils, stew/shims/net, testutils/unittests, presto, presto/client as presto_client, libp2p/crypto/crypto, - chronos/timer import ../../waku/waku_api/message_cache, ../../waku/waku_core, @@ -445,7 +445,7 @@ suite "Waku v2 Rest API - Filter V2": len(messages1.data) == 1 # Pause execution for 2 minutes to test TimeCache functionality of service node - sleep(1) + sleep(2) # second - message push from service node to subscriber client let postMsgResponse2 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1( From 4fbf5a6747f29f1fede2e3bf03c1017e18492718 Mon Sep 17 00:00:00 2001 From: DarshanBPatel Date: Fri, 28 Jun 2024 12:15:48 +0530 Subject: [PATCH 7/9] chore: resolve identation issue --- tests/wakunode_rest/test_rest_filter.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index 18755ee09e..2bb93aa3b8 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -8,7 +8,7 @@ import testutils/unittests, presto, presto/client as presto_client, - libp2p/crypto/crypto, + libp2p/crypto/crypto import ../../waku/waku_api/message_cache, ../../waku/waku_core, From aada94bf28869b51be32051711e1b6fc702f2c8f Mon Sep 17 00:00:00 2001 From: DarshanBPatel Date: Fri, 28 Jun 2024 13:20:03 +0530 Subject: [PATCH 8/9] chore: resolve identation issue --- tests/wakunode_rest/test_rest_filter.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index 2bb93aa3b8..8894f137ec 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -445,7 +445,7 @@ suite "Waku v2 Rest API - Filter V2": len(messages1.data) == 1 # Pause execution for 2 minutes to test TimeCache functionality of service node - sleep(2) + sleep(1000) # second - message push from service node to subscriber client let postMsgResponse2 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1( From 9785d5566851e085dad559b513b16dae00935719 Mon Sep 17 00:00:00 2001 From: DarshanBPatel Date: Fri, 28 Jun 2024 16:14:15 +0530 Subject: [PATCH 9/9] chore: update to async sleep --- tests/wakunode_rest/test_rest_filter.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index 8894f137ec..1127a9cc98 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -444,8 +444,8 @@ suite "Waku v2 Rest API - Filter V2": postMsgResponse1.data == "OK" len(messages1.data) == 1 - # Pause execution for 2 minutes to test TimeCache functionality of service node - sleep(1000) + # Pause execution for 1 seconds to test TimeCache functionality of service node + await sleepAsync(1.seconds) # second - message push from service node to subscriber client let postMsgResponse2 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1(