Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(px): refactor peer exchange + tests #1527

Merged
merged 7 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions apps/wakunode2/wakunode2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import
../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_capacity,
../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_time,
../../waku/v2/protocol/waku_store,
../../waku/v2/protocol/waku_relay,
../../waku/v2/protocol/waku_filter,
../../waku/v2/protocol/waku_lightpush,
../../waku/v2/protocol/waku_peer_exchange,
Expand Down Expand Up @@ -525,14 +526,10 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,
if conf.peerExchange:
asyncSpawn runPeerExchangeDiscv5Loop(node.wakuPeerExchange)

# retrieve and connect to peer exchange peers
# retrieve px peers and add the to the peer store
if conf.peerExchangeNode != "":
info "Retrieving peer info via peer exchange protocol"
let desiredOutDegree = node.wakuRelay.parameters.d.uint64()
try:
discard await node.wakuPeerExchange.request(desiredOutDegree)
except:
return err("failed to retrieve peer info via peer exchange protocol: " & getCurrentExceptionMsg())
await node.fetchPeerExchangePeers(desiredOutDegree)

# Start keepalive, if enabled
if conf.keepAlive:
Expand Down
129 changes: 119 additions & 10 deletions tests/v2/test_waku_peer_exchange.nim
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
{.used.}

import
std/options,
std/[options, sequtils],
testutils/unittests,
chronos,
chronicles,
stew/shims/net,
libp2p/switch,
libp2p/peerId,
libp2p/crypto/crypto,
eth/keys,
eth/p2p/discoveryv5/enr
Expand All @@ -18,7 +19,8 @@ import
../../waku/v2/protocol/waku_peer_exchange/rpc,
../../waku/v2/protocol/waku_peer_exchange/rpc_codec,
../test_helpers,
./utils
./utils,
./testlib/testutils


# TODO: Extend test coverage
Expand All @@ -30,8 +32,8 @@ procSuite "Waku Peer Exchange":
enr1 = enr.Record(seqNum: 0, raw: @[])
enr2 = enr.Record(seqNum: 0, raw: @[])

discard enr1.fromUri("enr:-JK4QPmO-sE2ELiWr8qVFs1kaY4jQZQpNaHvSPRmKiKcaDoqYRdki2c1BKSliImsxFeOD_UHnkddNL2l0XT9wlsP0WEBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIMwKqlOl3zpwnrsKRKHuWPSuFzit1Cl6IZvL2uzBRe8oN0Y3CC6mKDdWRwgiMqhXdha3UyDw")
discard enr2.fromUri("enr:-Iu4QK_T7kzAmewG92u1pr7o6St3sBqXaiIaWIsFNW53_maJEaOtGLSN2FUbm6LmVxSfb1WfC7Eyk-nFYI7Gs3SlchwBgmlkgnY0gmlwhI5d6VKJc2VjcDI1NmsxoQLPYQDvrrFdCrhqw3JuFaGD71I8PtPfk6e7TJ3pg_vFQYN0Y3CC6mKDdWRwgiMq")
check enr1.fromUri("enr:-JK4QPmO-sE2ELiWr8qVFs1kaY4jQZQpNaHvSPRmKiKcaDoqYRdki2c1BKSliImsxFeOD_UHnkddNL2l0XT9wlsP0WEBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIMwKqlOl3zpwnrsKRKHuWPSuFzit1Cl6IZvL2uzBRe8oN0Y3CC6mKDdWRwgiMqhXdha3UyDw")
check enr2.fromUri("enr:-Iu4QK_T7kzAmewG92u1pr7o6St3sBqXaiIaWIsFNW53_maJEaOtGLSN2FUbm6LmVxSfb1WfC7Eyk-nFYI7Gs3SlchwBgmlkgnY0gmlwhI5d6VKJc2VjcDI1NmsxoQLPYQDvrrFdCrhqw3JuFaGD71I8PtPfk6e7TJ3pg_vFQYN0Y3CC6mKDdWRwgiMq")

let peerInfos = @[
PeerExchangePeerInfo(enr: enr1.raw),
Expand Down Expand Up @@ -127,20 +129,127 @@ procSuite "Waku Peer Exchange":
await node1.mountPeerExchange()
await node3.mountPeerExchange()

await sleepAsync(3000.millis) # Give the algorithm some time to work its magic
# Give the algorithm some time to work its magic
await sleepAsync(3000.millis)

asyncSpawn node1.wakuPeerExchange.runPeerExchangeDiscv5Loop()

node3.setPeerExchangePeer(node1.peerInfo.toRemotePeerInfo())
let connOpt = await node3.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
check:
connOpt.isSome

## When
discard waitFor node3.wakuPeerExchange.request(1)
# Give the algorithm some time to work its magic
await sleepAsync(2000.millis)

await sleepAsync(2000.millis) # Give the algorithm some time to work its magic
## When
let response = await node3.wakuPeerExchange.request(1, connOpt.get())

## Then
check:
response.isOk
response.get().peerInfos.len == 1
node1.wakuDiscv5.protocol.nodesDiscovered > 0
node3.switch.peerStore[AddressBook].contains(node2.switch.peerInfo.peerId)

await allFutures([node1.stop(), node2.stop(), node3.stop()])

asyncTest "peer exchange request functions returns some discovered peers":
let
node1 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))
node2 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))

# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])

# Create connection
let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
require:
connOpt.isSome

# Create some enr and add to peer exchange (sumilating disv5)
var enr1, enr2 = enr.Record()
check enr1.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB")
check enr2.fromUri("enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB")

# Mock that we have discovered these enrs
node1.wakuPeerExchange.enrCache.add(enr1)
node1.wakuPeerExchange.enrCache.add(enr2)

# Request 2 peer from px. Test all request variants
let response1 = await node2.wakuPeerExchange.request(2)
let response2 = await node2.wakuPeerExchange.request(2, node1.peerInfo.toRemotePeerInfo())
let response3 = await node2.wakuPeerExchange.request(2, connOpt.get())

# Check the response or dont even continue
require:
response1.isOk
response2.isOk
response3.isOk

check:
response1.get().peerInfos.len == 2
response2.get().peerInfos.len == 2
response3.get().peerInfos.len == 2

# Since it can return duplicates test that at least one of the enrs is in the response
response1.get().peerInfos.anyIt(it.enr == enr1.raw) or response1.get().peerInfos.anyIt(it.enr == enr2.raw)
response2.get().peerInfos.anyIt(it.enr == enr1.raw) or response2.get().peerInfos.anyIt(it.enr == enr2.raw)
response3.get().peerInfos.anyIt(it.enr == enr1.raw) or response3.get().peerInfos.anyIt(it.enr == enr2.raw)

asyncTest "peer exchange handler works as expected":
let
node1 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))
node2 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))

# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])

# Mock that we have discovered these enrs
var enr1 = enr.Record()
check enr1.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB")
node1.wakuPeerExchange.enrCache.add(enr1)

# Create connection
let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
require connOpt.isSome
let conn = connOpt.get()

# Send bytes so that they directly hit the handler
let rpc = PeerExchangeRpc(
request: PeerExchangeRequest(numPeers: 1))

var buffer: seq[byte]
await conn.writeLP(rpc.encode().buffer)
buffer = await conn.readLp(MaxRpcSize.int)

# Decode the response
let decodedBuff = PeerExchangeRpc.decode(buffer)
require decodedBuff.isOk

# Check we got back the enr we mocked
check:
decodedBuff.get().response.peerInfos.len == 1
decodedBuff.get().response.peerInfos[0].enr == enr1.raw

asyncTest "peer exchange request fails gracefully":
let
node1 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))
node2 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))

# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])

# Create connection
let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
require connOpt.isSome

# Force closing the connection to simulate a failed peer
await connOpt.get().close()

# Request 2 peer from px
let response = await node1.wakuPeerExchange.request(2, connOpt.get())

# Check that it failed gracefully
check: response.isErr
35 changes: 33 additions & 2 deletions tests/v2/test_wakunode.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{.used.}

import
std/sequtils,
stew/byteutils,
stew/shims/net as stewNet,
testutils/unittests,
Expand All @@ -13,13 +14,17 @@ import
libp2p/protocols/pubsub/rpc/messages,
libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/gossipsub,
libp2p/nameresolving/mockresolver
libp2p/nameresolving/mockresolver,
eth/p2p/discoveryv5/enr
import
../../waku/v2/node/waku_node,
../../waku/v2/node/peer_manager,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_relay,
../../waku/v2/utils/peers
../../waku/v2/protocol/waku_peer_exchange,
../../waku/v2/utils/peers,
./testlib/testutils,
../test_helpers


procSuite "WakuNode":
Expand Down Expand Up @@ -284,3 +289,29 @@ procSuite "WakuNode":
node1MultiAddrs.contains(expectedMultiaddress1)

await allFutures(node1.stop(), node2.stop())

asyncTest "Function fetchPeerExchangePeers succesfully exchanges px peers":
let
node1 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))
node2 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))

# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])

# Mock that we discovered a node (to avoid running discv5)
var enr = enr.Record()
require enr.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB")
node2.wakuPeerExchange.enrCache.add(enr)

# Set node2 as service peer (default one) for px protocol
node1.peerManager.addServicePeer(node2.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)

# Request 1 peer from peer exchange protocol
await node1.fetchPeerExchangePeers(1)

# Check that the peer ended up in the peerstore
let rpInfo = enr.toRemotePeerInfo.get()
check:
node1.peerManager.peerStore.peers.anyIt(it.peerId == rpInfo.peerId)
node1.peerManager.peerStore.peers.anyIt(it.addrs == rpInfo.addrs)
3 changes: 3 additions & 0 deletions waku/v2/node/peer_manager/waku_peer_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -175,5 +175,8 @@ proc getPeersByDirection*(peerStore: PeerStore, direction: PeerDirection): seq[S
proc getNotConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] =
return peerStore.peers.filterIt(it.connectedness != Connected)

proc getConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] =
return peerStore.peers.filterIt(it.connectedness == Connected)

proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[StoredInfo] =
return peerStore.peers.filterIt(it.protos.contains(proto))
19 changes: 19 additions & 0 deletions waku/v2/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,25 @@ proc mountPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} =

node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec))

proc fetchPeerExchangePeers*(node: Wakunode, amount: uint64) {.async, raises: [Defect].} =
if node.wakuPeerExchange.isNil():
error "could not get peers from px, waku peer-exchange is nil"
return

info "Retrieving peer info via peer exchange protocol"
let pxPeersRes = await node.wakuPeerExchange.request(amount)
if pxPeersRes.isOk:
var validPeers = 0
for pi in pxPeersRes.get().peerInfos:
var record: enr.Record
if enr.fromBytes(record, pi.enr):
# TODO: Add source: PX
node.peerManager.addPeer(record.toRemotePeerInfo().get, WakuRelayCodec)
validPeers += 1
info "Retrieved peer info via peer exchange protocol", validPeers = validPeers
else:
warn "Failed to retrieve peer info via peer exchange protocol", error = pxPeersRes.error

# TODO: Move to application module (e.g., wakunode2.nim)
proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} =
if node.wakuPeerExchange.isNil():
Expand Down
Loading