Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(2491): Fix metadata protocol disconnecting light nodes #2533

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions tests/node/peer_manager/test_peer_manager.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import
chronicles,
std/[options, tables, strutils],
stew/shims/net,
chronos,
testutils/unittests

import
../../../waku/[node/waku_node, waku_core],
../../waku_lightpush/[lightpush_utils],
../../testlib/[wakucore, wakunode, futures, testasync],
../../../../waku/node/peer_manager/peer_manager

suite "Peer Manager":
suite "onPeerMetadata":
var
listenPort {.threadvar.}: Port
listenAddress {.threadvar.}: IpAddress
serverKey {.threadvar.}: PrivateKey
clientKey {.threadvar.}: PrivateKey
clusterId {.threadvar.}: uint64
shardTopic0 {.threadvar.}: string
shardTopic1 {.threadvar.}: string

asyncSetup:
listenPort = Port(0)
listenAddress = ValidIpAddress.init("0.0.0.0")
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
clusterId = 1
shardTopic0 = "/waku/2/rs/" & $clusterId & "/0"
shardTopic1 = "/waku/2/rs/" & $clusterId & "/1"

asyncTest "light client is not disconnected":
# Given two nodes with the same shardId
let
server =
newTestWakuNode(serverKey, listenAddress, listenPort, topics = @[shardTopic0])
client =
newTestWakuNode(clientKey, listenAddress, listenPort, topics = @[shardTopic1])

# And both mount metadata and filter
discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic
discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic
await client.mountFilterClient()
await server.mountFilter()

# And both nodes are started
waitFor allFutures(server.start(), client.start())
await sleepAsync(FUTURE_TIMEOUT)

# And the nodes are connected
let serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo()
await client.connectToNodes(@[serverRemotePeerInfo])
await sleepAsync(FUTURE_TIMEOUT)

# When making an operation that triggers onPeerMetadata
discard await client.filterSubscribe(
some("/waku/2/default-waku/proto"), "waku/lightpush/1", serverRemotePeerInfo
)
await sleepAsync(FUTURE_TIMEOUT)

check:
server.switch.isConnected(client.switch.peerInfo.toRemotePeerInfo().peerId)
client.switch.isConnected(server.switch.peerInfo.toRemotePeerInfo().peerId)

asyncTest "relay with same shardId is not disconnected":
# Given two nodes with the same shardId
let
server =
newTestWakuNode(serverKey, listenAddress, listenPort, topics = @[shardTopic0])
client =
newTestWakuNode(clientKey, listenAddress, listenPort, topics = @[shardTopic0])

# And both mount metadata and relay
discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic
discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic
await client.mountRelay()
await server.mountRelay()

# And both nodes are started
waitFor allFutures(server.start(), client.start())
await sleepAsync(FUTURE_TIMEOUT)

# And the nodes are connected
let serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo()
await client.connectToNodes(@[serverRemotePeerInfo])
await sleepAsync(FUTURE_TIMEOUT)

# When making an operation that triggers onPeerMetadata
client.subscribe((kind: SubscriptionKind.PubsubSub, topic: "newTopic"))
await sleepAsync(FUTURE_TIMEOUT)

check:
server.switch.isConnected(client.switch.peerInfo.toRemotePeerInfo().peerId)
client.switch.isConnected(server.switch.peerInfo.toRemotePeerInfo().peerId)

asyncTest "relay with different shardId is disconnected":
# Given two nodes with different shardIds
let
server =
newTestWakuNode(serverKey, listenAddress, listenPort, topics = @[shardTopic0])
client =
newTestWakuNode(clientKey, listenAddress, listenPort, topics = @[shardTopic1])

# And both mount metadata and relay
discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic
discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic
await client.mountRelay()
await server.mountRelay()

# And both nodes are started
waitFor allFutures(server.start(), client.start())
await sleepAsync(FUTURE_TIMEOUT)

# And the nodes are connected
let serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo()
await client.connectToNodes(@[serverRemotePeerInfo])
await sleepAsync(FUTURE_TIMEOUT)

# When making an operation that triggers onPeerMetadata
client.subscribe((kind: SubscriptionKind.PubsubSub, topic: "newTopic"))
await sleepAsync(FUTURE_TIMEOUT)

check:
not server.switch.isConnected(client.switch.peerInfo.toRemotePeerInfo().peerId)
not client.switch.isConnected(server.switch.peerInfo.toRemotePeerInfo().peerId)
9 changes: 7 additions & 2 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import
metrics,
libp2p/multistream,
libp2p/muxers/muxer,
libp2p/nameresolving/nameresolver
libp2p/nameresolving/nameresolver,
libp2p/peerstore

import
../../common/nimchronos,
../../common/enr,
Expand Down Expand Up @@ -369,7 +371,10 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
$clusterId
break guardClauses

if not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)):
if (
pm.peerStore.hasPeer(peerId, WakuRelayCodec) and
not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it))
):
reason = "no shards in common"
break guardClauses

Expand Down
2 changes: 2 additions & 0 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,8 @@ proc filterSubscribe*(
contentTopics = contentTopics,
peer = remotePeer.peerId

when (contentTopics is ContentTopic):
let contentTopics = @[contentTopics]
let subRes = await node.wakuFilterClient.subscribe(
remotePeer, pubsubTopic.get(), contentTopics
)
Expand Down
Loading