From 8b456188762bbb76f8717bf0430b33379a696f6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20Cabeza=20Romero?= Date: Thu, 14 Mar 2024 16:05:40 +0100 Subject: [PATCH 1/5] Fix metadata protocol disconnecting light nodes. --- tests/node/peer_manager/test_peer_manager.nim | 127 ++++++++++++++++++ waku/node/peer_manager/peer_manager.nim | 5 +- waku/node/waku_node.nim | 1 + 3 files changed, 132 insertions(+), 1 deletion(-) create mode 100644 tests/node/peer_manager/test_peer_manager.nim diff --git a/tests/node/peer_manager/test_peer_manager.nim b/tests/node/peer_manager/test_peer_manager.nim new file mode 100644 index 0000000000..983a3ac3fe --- /dev/null +++ b/tests/node/peer_manager/test_peer_manager.nim @@ -0,0 +1,127 @@ +import + chronicles, + std/[options, tables, strutils], + stew/shims/net, + chronos, + testutils/unittests + +import + ../../../waku/[node/waku_node, waku_core], + ../../waku_lightpush/[lightpush_utils], + ../../testlib/[wakucore, wakunode, futures, testasync], + ../../../../waku/node/peer_manager/peer_manager + +suite "Peer Manager": + suite "onPeerMetadata": + var + listenPort {.threadvar.}: Port + listenAddress {.threadvar.}: IpAddress + serverKey {.threadvar.}: PrivateKey + clientKey {.threadvar.}: PrivateKey + clusterId {.threadvar.}: uint64 + shardTopic0 {.threadvar.}: string + shardTopic1 {.threadvar.}: string + + asyncSetup: + listenPort = Port(0) + listenAddress = ValidIpAddress.init("0.0.0.0") + serverKey = generateSecp256k1Key() + clientKey = generateSecp256k1Key() + clusterId = 1 + shardTopic0 = "/waku/2/rs/" & $clusterId & "/0" + shardTopic1 = "/waku/2/rs/" & $clusterId & "/1" + + asyncTest "light client is not disconnected": + # Given two nodes with the same shardId + let + server = + newTestWakuNode(serverKey, listenAddress, listenPort, topics = @[shardTopic0]) + client = + newTestWakuNode(clientKey, listenAddress, listenPort, topics = @[shardTopic1]) + + # And both mount metadata and filter + discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic + discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic + await client.mountFilterClient() + await server.mountFilter() + + # And both nodes are started + waitFor allFutures(server.start(), client.start()) + await sleepAsync(FUTURE_TIMEOUT) + + # And the nodes are connected + let serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() + await client.connectToNodes(@[serverRemotePeerInfo]) + await sleepAsync(FUTURE_TIMEOUT) + + # When making an operation that triggers onPeerMetadata + discard await client.filterSubscribe( + some("/waku/2/default-waku/proto"), "waku/lightpush/1", serverRemotePeerInfo + ) + await sleepAsync(FUTURE_TIMEOUT) + + check: + server.switch.isConnected(client.switch.peerInfo.toRemotePeerInfo().peerId) + client.switch.isConnected(server.switch.peerInfo.toRemotePeerInfo().peerId) + + asyncTest "relay with same shardId is not disconnected": + # Given two nodes with the same shardId + let + server = + newTestWakuNode(serverKey, listenAddress, listenPort, topics = @[shardTopic0]) + client = + newTestWakuNode(clientKey, listenAddress, listenPort, topics = @[shardTopic0]) + + # And both mount metadata and relay + discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic + discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic + await client.mountRelay() + await server.mountRelay() + + # And both nodes are started + waitFor allFutures(server.start(), client.start()) + await sleepAsync(FUTURE_TIMEOUT) + + # And the nodes are connected + let serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() + await client.connectToNodes(@[serverRemotePeerInfo]) + await sleepAsync(FUTURE_TIMEOUT) + + # When making an operation that triggers onPeerMetadata + client.subscribe((kind: SubscriptionKind.PubsubSub, topic: "newTopic")) + await sleepAsync(FUTURE_TIMEOUT) + + check: + server.switch.isConnected(client.switch.peerInfo.toRemotePeerInfo().peerId) + client.switch.isConnected(server.switch.peerInfo.toRemotePeerInfo().peerId) + + asyncTest "relay with different shardId is disconnected": + # Given two nodes with different shardIds + let + server = + newTestWakuNode(serverKey, listenAddress, listenPort, topics = @[shardTopic0]) + client = + newTestWakuNode(clientKey, listenAddress, listenPort, topics = @[shardTopic1]) + + # And both mount metadata and relay + discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic + discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic + await client.mountRelay() + await server.mountRelay() + + # And both nodes are started + waitFor allFutures(server.start(), client.start()) + await sleepAsync(FUTURE_TIMEOUT) + + # And the nodes are connected + let serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() + await client.connectToNodes(@[serverRemotePeerInfo]) + await sleepAsync(FUTURE_TIMEOUT) + + # When making an operation that triggers onPeerMetadata + client.subscribe((kind: SubscriptionKind.PubsubSub, topic: "newTopic")) + await sleepAsync(FUTURE_TIMEOUT) + + check: + not server.switch.isConnected(client.switch.peerInfo.toRemotePeerInfo().peerId) + not client.switch.isConnected(server.switch.peerInfo.toRemotePeerInfo().peerId) diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 65df136972..4f20d5f1b1 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -369,7 +369,10 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = $clusterId break guardClauses - if not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)): + if ( + pm.peerStore.hasProtocol(peerId, WakuRelayCodec) and + not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)) + ): reason = "no shards in common" break guardClauses diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index dbd985ddaf..c72176be90 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -605,6 +605,7 @@ proc filterSubscribe*( contentTopics = contentTopics, peer = remotePeer.peerId + let contentTopics = contentTopics.mapIt($it) let subRes = await node.wakuFilterClient.subscribe( remotePeer, pubsubTopic.get(), contentTopics ) From de42c57e44d1745fb9cbb9078c49c280c6f912b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20Cabeza=20Romero?= Date: Thu, 14 Mar 2024 16:13:53 +0100 Subject: [PATCH 2/5] Add missing import. --- waku/node/peer_manager/peer_manager.nim | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 4f20d5f1b1..f17117d4fd 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -10,7 +10,9 @@ import metrics, libp2p/multistream, libp2p/muxers/muxer, - libp2p/nameresolving/nameresolver + libp2p/nameresolving/nameresolver, + libp2p/peerstore + import ../../common/nimchronos, ../../common/enr, From fda87ad027cbcb12faff7e2b5cb3cd4df2e7ae21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20Cabeza=20Romero?= Date: Thu, 14 Mar 2024 17:45:30 +0100 Subject: [PATCH 3/5] Add peerHasProtocol method. --- waku/node/peer_manager/peer_manager.nim | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index f17117d4fd..df100baf35 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -342,6 +342,11 @@ proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} = of ConnEventKind.Disconnected: discard +proc peerHasProtocol( + peerManager: PeerManager, peerId: PeerId, protocol: string +): bool {.inline, raises: [].} = + return peerManager.peerStore[ProtoBook][peerId].contains(protocol) + proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = # To prevent metadata protocol from breaking prev nodes, by now we only # disconnect if the clusterid is specified. @@ -372,7 +377,7 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = break guardClauses if ( - pm.peerStore.hasProtocol(peerId, WakuRelayCodec) and + pm.peerHasProtocol(peerId, WakuRelayCodec) and not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)) ): reason = "no shards in common" From eb1ef5c664451b50ae565a533180c8650b8d09ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20Cabeza=20Romero?= Date: Fri, 15 Mar 2024 16:33:30 +0100 Subject: [PATCH 4/5] Remove unneeded peerHasProtocol and fix string to seq conversion. --- waku/node/peer_manager/peer_manager.nim | 7 +------ waku/node/waku_node.nim | 3 ++- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index df100baf35..c2e245a0e5 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -342,11 +342,6 @@ proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} = of ConnEventKind.Disconnected: discard -proc peerHasProtocol( - peerManager: PeerManager, peerId: PeerId, protocol: string -): bool {.inline, raises: [].} = - return peerManager.peerStore[ProtoBook][peerId].contains(protocol) - proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = # To prevent metadata protocol from breaking prev nodes, by now we only # disconnect if the clusterid is specified. @@ -377,7 +372,7 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = break guardClauses if ( - pm.peerHasProtocol(peerId, WakuRelayCodec) and + pm.peerStore.hasPeer(peerId, WakuRelayCodec) and not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)) ): reason = "no shards in common" diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index c72176be90..434bd13b5c 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -605,7 +605,8 @@ proc filterSubscribe*( contentTopics = contentTopics, peer = remotePeer.peerId - let contentTopics = contentTopics.mapIt($it) + when (contentTopics is ContentTopic): + let contentTopics = @[contentTopics] let subRes = await node.wakuFilterClient.subscribe( remotePeer, pubsubTopic.get(), contentTopics ) From a6227a7233f3943fa40092d4f42d052c333d69f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20Cabeza=20Romero?= Date: Tue, 19 Mar 2024 15:09:24 +0100 Subject: [PATCH 5/5] Styling --- tests/node/peer_manager/test_peer_manager.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/node/peer_manager/test_peer_manager.nim b/tests/node/peer_manager/test_peer_manager.nim index 983a3ac3fe..f9e8f9a29e 100644 --- a/tests/node/peer_manager/test_peer_manager.nim +++ b/tests/node/peer_manager/test_peer_manager.nim @@ -56,8 +56,8 @@ suite "Peer Manager": # When making an operation that triggers onPeerMetadata discard await client.filterSubscribe( - some("/waku/2/default-waku/proto"), "waku/lightpush/1", serverRemotePeerInfo - ) + some("/waku/2/default-waku/proto"), "waku/lightpush/1", serverRemotePeerInfo + ) await sleepAsync(FUTURE_TIMEOUT) check: