From 059d7443a1e150df1bcfd3501523d173e1878a2b Mon Sep 17 00:00:00 2001 From: alrevuelta Date: Thu, 2 Feb 2023 11:48:27 +0100 Subject: [PATCH 1/7] refactor(px): refactor px + add tests --- tests/v2/test_waku_peer_exchange.nim | 65 +++++++++++++--- waku/v2/node/peer_manager/waku_peer_store.nim | 3 + .../protocol/waku_peer_exchange/protocol.nim | 77 ++++++++----------- 3 files changed, 90 insertions(+), 55 deletions(-) diff --git a/tests/v2/test_waku_peer_exchange.nim b/tests/v2/test_waku_peer_exchange.nim index c331fb159c..5af1bb0efe 100644 --- a/tests/v2/test_waku_peer_exchange.nim +++ b/tests/v2/test_waku_peer_exchange.nim @@ -1,12 +1,13 @@ {.used.} import - std/options, + std/[options, sequtils], testutils/unittests, chronos, chronicles, stew/shims/net, libp2p/switch, + libp2p/peerId, libp2p/crypto/crypto, eth/keys, eth/p2p/discoveryv5/enr @@ -18,7 +19,8 @@ import ../../waku/v2/protocol/waku_peer_exchange/rpc, ../../waku/v2/protocol/waku_peer_exchange/rpc_codec, ../test_helpers, - ./utils + ./utils, + ./testlib/testutils # TODO: Extend test coverage @@ -30,8 +32,8 @@ procSuite "Waku Peer Exchange": enr1 = enr.Record(seqNum: 0, raw: @[]) enr2 = enr.Record(seqNum: 0, raw: @[]) - discard enr1.fromUri("enr:-JK4QPmO-sE2ELiWr8qVFs1kaY4jQZQpNaHvSPRmKiKcaDoqYRdki2c1BKSliImsxFeOD_UHnkddNL2l0XT9wlsP0WEBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIMwKqlOl3zpwnrsKRKHuWPSuFzit1Cl6IZvL2uzBRe8oN0Y3CC6mKDdWRwgiMqhXdha3UyDw") - discard enr2.fromUri("enr:-Iu4QK_T7kzAmewG92u1pr7o6St3sBqXaiIaWIsFNW53_maJEaOtGLSN2FUbm6LmVxSfb1WfC7Eyk-nFYI7Gs3SlchwBgmlkgnY0gmlwhI5d6VKJc2VjcDI1NmsxoQLPYQDvrrFdCrhqw3JuFaGD71I8PtPfk6e7TJ3pg_vFQYN0Y3CC6mKDdWRwgiMq") + check enr1.fromUri("enr:-JK4QPmO-sE2ELiWr8qVFs1kaY4jQZQpNaHvSPRmKiKcaDoqYRdki2c1BKSliImsxFeOD_UHnkddNL2l0XT9wlsP0WEBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIMwKqlOl3zpwnrsKRKHuWPSuFzit1Cl6IZvL2uzBRe8oN0Y3CC6mKDdWRwgiMqhXdha3UyDw") + check enr2.fromUri("enr:-Iu4QK_T7kzAmewG92u1pr7o6St3sBqXaiIaWIsFNW53_maJEaOtGLSN2FUbm6LmVxSfb1WfC7Eyk-nFYI7Gs3SlchwBgmlkgnY0gmlwhI5d6VKJc2VjcDI1NmsxoQLPYQDvrrFdCrhqw3JuFaGD71I8PtPfk6e7TJ3pg_vFQYN0Y3CC6mKDdWRwgiMq") let peerInfos = @[ PeerExchangePeerInfo(enr: enr1.raw), @@ -127,20 +129,63 @@ procSuite "Waku Peer Exchange": await node1.mountPeerExchange() await node3.mountPeerExchange() - await sleepAsync(3000.millis) # Give the algorithm some time to work its magic + # Give the algorithm some time to work its magic + await sleepAsync(3000.millis) asyncSpawn node1.wakuPeerExchange.runPeerExchangeDiscv5Loop() - node3.setPeerExchangePeer(node1.peerInfo.toRemotePeerInfo()) + let connOpt = await node3.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec) + check: + connOpt.isSome - ## When - discard waitFor node3.wakuPeerExchange.request(1) + # Give the algorithm some time to work its magic + await sleepAsync(2000.millis) - await sleepAsync(2000.millis) # Give the algorithm some time to work its magic + ## When + let response = await node3.wakuPeerExchange.request(1, connOpt.get()) ## Then check: + response.isOk + response.get().peerInfos.len == 1 node1.wakuDiscv5.protocol.nodesDiscovered > 0 - node3.switch.peerStore[AddressBook].contains(node2.switch.peerInfo.peerId) await allFutures([node1.stop(), node2.stop(), node3.stop()]) + + asyncTest "peer exchange request returns some discovered peers": + let + node1 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) + node2 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) + + # Start and mount peer exchange + await allFutures([node1.start(), node2.start()]) + await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) + + # Create connection + let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec) + check: + connOpt.isSome + + # Create some enr and add to peer exchange (sumilating disv5) + var enr1, enr2 = enr.Record() + check enr1.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB") + check enr2.fromUri("enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB") + + # Mock that we have discovered these enrs + node1.wakuPeerExchange.enrCache.add(enr1) + node1.wakuPeerExchange.enrCache.add(enr2) + + # Request 2 peer from px + let response = await node1.wakuPeerExchange.request(2, connOpt.get()) + let pxPeers = response.get().peerInfos + + # Check the response was ok and we got the correct enrs + check: + response.isOk + response.get().peerInfos.len == 2 + + # Since it can return duplicates test that at least one of the enrs is in the response + pxPeers.anyIt(it.enr == enr1.raw) or pxPeers.anyIt(it.enr == enr2.raw) + + +# TODO: Test setPeerExchangePeer #node3.setPeerExchangePeer(node1.peerInfo.toRemotePeerInfo()) diff --git a/waku/v2/node/peer_manager/waku_peer_store.nim b/waku/v2/node/peer_manager/waku_peer_store.nim index d8d026ad6f..460543c00e 100644 --- a/waku/v2/node/peer_manager/waku_peer_store.nim +++ b/waku/v2/node/peer_manager/waku_peer_store.nim @@ -175,5 +175,8 @@ proc getPeersByDirection*(peerStore: PeerStore, direction: PeerDirection): seq[S proc getNotConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] = return peerStore.peers.filterIt(it.connectedness != Connected) +proc getConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] = + return peerStore.peers.filterIt(it.connectedness == Connected) + proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[StoredInfo] = return peerStore.peers.filterIt(it.protos.contains(proto)) diff --git a/waku/v2/protocol/waku_peer_exchange/protocol.nim b/waku/v2/protocol/waku_peer_exchange/protocol.nim index 4c1f590c73..17b9a12df3 100644 --- a/waku/v2/protocol/waku_peer_exchange/protocol.nim +++ b/waku/v2/protocol/waku_peer_exchange/protocol.nim @@ -49,70 +49,53 @@ type WakuPeerExchange* = ref object of LPProtocol peerManager*: PeerManager wakuDiscv5: Option[WakuDiscoveryV5] - enrCache: seq[enr.Record] # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ + enrCache*: seq[enr.Record] # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ -proc sendPeerExchangeRpcToPeer(wpx: WakuPeerExchange, rpc: PeerExchangeRpc, peer: RemotePeerInfo | PeerId): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = - let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec) - if connOpt.isNone(): - return err(dialFailure) - - let connection = connOpt.get() - - await connection.writeLP(rpc.encode().buffer) +proc request*(wpx: WakuPeerExchange, numPeers: uint64, conn: Connection): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} = + let rpc = PeerExchangeRpc( + request: PeerExchangeRequest(numPeers: numPeers)) - return ok() + try: + await conn.writeLP(rpc.encode().buffer) + except CatchableError as exc: + waku_px_errors.inc(labelValues = [exc.msg]) + return err(exc.msg) -proc request(wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = - let rpc = PeerExchangeRpc( - request: PeerExchangeRequest( - numPeers: numPeers - ) - ) + let buffer = await conn.readLp(MaxRpcSize.int) + let decodedBuff = PeerExchangeRpc.decode(buffer) + if decodedBuff.isErr(): + return err("decode failed: " & $decodedBuff.error) - let res = await wpx.sendPeerExchangeRpcToPeer(rpc, peer) - if res.isErr(): - waku_px_errors.inc(labelValues = [res.error()]) - return err(res.error()) + return ok(decodedBuff.get().response) - return ok() +proc request*(wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} = + let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec) + if connOpt.isNone(): + return err(dialFailure) + return await wpx.request(numPeers, connOpt.get()) -proc request*(wpx: WakuPeerExchange, numPeers: uint64): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = +proc request*(wpx: WakuPeerExchange, numPeers: uint64): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} = let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec) if peerOpt.isNone(): waku_px_errors.inc(labelValues = [peerNotFoundFailure]) return err(peerNotFoundFailure) - return await wpx.request(numPeers, peerOpt.get()) -proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], peer: RemotePeerInfo | PeerId): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = - var peerInfos: seq[PeerExchangePeerInfo] = @[] - for e in enrs: - let pi = PeerExchangePeerInfo( - enr: e.raw - ) - peerInfos.add(pi) - +proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = let rpc = PeerExchangeRpc( response: PeerExchangeResponse( - peerInfos: peerInfos + peerInfos: enrs.mapIt(PeerExchangePeerInfo(enr: it.raw)) ) ) - let res = await wpx.sendPeerExchangeRpcToPeer(rpc, peer) - if res.isErr(): - waku_px_errors.inc(labelValues = [res.error()]) - return err(res.error()) + try: + await conn.writeLP(rpc.encode().buffer) + except CatchableError as exc: + waku_px_errors.inc(labelValues = [exc.msg]) + return err(exc.msg) return ok() -proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = - let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec) - if peerOpt.isNone(): - waku_px_errors.inc(labelValues = [peerNotFoundFailure]) - return err(peerNotFoundFailure) - - return await wpx.respond(enrs, peerOpt.get()) - proc cleanCache(wpx: WakuPeerExchange) {.gcsafe.} = wpx.enrCache.delete(0..CacheCleanWindow-1) @@ -147,6 +130,7 @@ proc getEnrsFromCache(wpx: WakuPeerExchange, numPeers: uint64): seq[enr.Record] return @[] for i in 0.. 0: waku_px_peers_received_unknown.inc(newPeers.len().int64()) debug "Connecting to newly discovered peers", count=newPeers.len() + # TODO: This should just add peers to the peerstore, not trying to connect to them await wpx.peerManager.connectToNodes(newPeers, WakuRelayCodec, source = "peer exchange") wpx.handler = handler From 8bf3832ec8b71b0fe7be248456981719f5a803a3 Mon Sep 17 00:00:00 2001 From: alrevuelta Date: Thu, 2 Feb 2023 15:43:19 +0100 Subject: [PATCH 2/7] refactor(px): add tests and handle error gracefully --- apps/wakunode2/wakunode2.nim | 11 ++-- tests/v2/test_waku_peer_exchange.nim | 52 +++++++++++++++---- .../protocol/waku_peer_exchange/protocol.nim | 46 +++++++++------- waku/v2/protocol/waku_peer_exchange/rpc.nim | 1 + 4 files changed, 76 insertions(+), 34 deletions(-) diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index 11d54b6947..292ae537a6 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -529,10 +529,13 @@ proc startNode(node: WakuNode, conf: WakuNodeConf, if conf.peerExchangeNode != "": info "Retrieving peer info via peer exchange protocol" let desiredOutDegree = node.wakuRelay.parameters.d.uint64() - try: - discard await node.wakuPeerExchange.request(desiredOutDegree) - except: - return err("failed to retrieve peer info via peer exchange protocol: " & getCurrentExceptionMsg()) + let pxPeersRes = await node.wakuPeerExchange.request(desiredOutDegree) + if pxPeersRes.isOk: + let pxPeers = pxPeersRes.get().peerInfos + echo "----pxPeers, ", pxPeers + else: + #error is tosevere? + warn "failed to retrieve peer info via peer exchange protocol" # Start keepalive, if enabled if conf.keepAlive: diff --git a/tests/v2/test_waku_peer_exchange.nim b/tests/v2/test_waku_peer_exchange.nim index 5af1bb0efe..b7e7c84c0f 100644 --- a/tests/v2/test_waku_peer_exchange.nim +++ b/tests/v2/test_waku_peer_exchange.nim @@ -26,7 +26,7 @@ import # TODO: Extend test coverage procSuite "Waku Peer Exchange": - asyncTest "encode and decode peer exchange response": + xasyncTest "encode and decode peer exchange response": ## Setup var enr1 = enr.Record(seqNum: 0, raw: @[]) @@ -69,7 +69,7 @@ procSuite "Waku Peer Exchange": resEnr1 == enr1 resEnr2 == enr2 - asyncTest "retrieve and provide peer exchange peers from discv5": + xasyncTest "retrieve and provide peer exchange peers from discv5": ## Setup (copied from test_waku_discv5.nim) let bindIp = ValidIpAddress.init("0.0.0.0") @@ -163,7 +163,7 @@ procSuite "Waku Peer Exchange": # Create connection let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec) - check: + require: connOpt.isSome # Create some enr and add to peer exchange (sumilating disv5) @@ -175,17 +175,49 @@ procSuite "Waku Peer Exchange": node1.wakuPeerExchange.enrCache.add(enr1) node1.wakuPeerExchange.enrCache.add(enr2) - # Request 2 peer from px - let response = await node1.wakuPeerExchange.request(2, connOpt.get()) - let pxPeers = response.get().peerInfos + # Request 2 peer from px. Test all request variants + let response1 = await node2.wakuPeerExchange.request(2) + let response2 = await node2.wakuPeerExchange.request(2, node1.peerInfo.toRemotePeerInfo()) + let response3 = await node2.wakuPeerExchange.request(2, connOpt.get()) + + # Check the response or dont even continue + require: + response1.isOk + response2.isOk + response3.isOk - # Check the response was ok and we got the correct enrs check: - response.isOk - response.get().peerInfos.len == 2 + response1.get().peerInfos.len == 2 + response2.get().peerInfos.len == 2 + response3.get().peerInfos.len == 2 # Since it can return duplicates test that at least one of the enrs is in the response - pxPeers.anyIt(it.enr == enr1.raw) or pxPeers.anyIt(it.enr == enr2.raw) + response1.get().peerInfos.anyIt(it.enr == enr1.raw) or response1.get().peerInfos.anyIt(it.enr == enr2.raw) + response2.get().peerInfos.anyIt(it.enr == enr1.raw) or response2.get().peerInfos.anyIt(it.enr == enr2.raw) + response3.get().peerInfos.anyIt(it.enr == enr1.raw) or response3.get().peerInfos.anyIt(it.enr == enr2.raw) + asyncTest "peer exchange request fails gracefully": + let + node1 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) + node2 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) + + # Start and mount peer exchange + await allFutures([node1.start(), node2.start()]) + await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) + + # Create connection + let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec) + require connOpt.isSome + + # Force closing the connection to simulate a failed peer + await connOpt.get().close() + + # Request 2 peer from px + let response = await node1.wakuPeerExchange.request(2, connOpt.get()) + + # Check that it failed gracefully + check: response.isErr # TODO: Test setPeerExchangePeer #node3.setPeerExchangePeer(node1.peerInfo.toRemotePeerInfo()) + +# TODO: test with dead connection or failed peers. diff --git a/waku/v2/protocol/waku_peer_exchange/protocol.nim b/waku/v2/protocol/waku_peer_exchange/protocol.nim index 17b9a12df3..e8a87c85ca 100644 --- a/waku/v2/protocol/waku_peer_exchange/protocol.nim +++ b/waku/v2/protocol/waku_peer_exchange/protocol.nim @@ -55,17 +55,17 @@ proc request*(wpx: WakuPeerExchange, numPeers: uint64, conn: Connection): Future let rpc = PeerExchangeRpc( request: PeerExchangeRequest(numPeers: numPeers)) + var buffer: seq[byte] try: await conn.writeLP(rpc.encode().buffer) + buffer = await conn.readLp(MaxRpcSize.int) except CatchableError as exc: waku_px_errors.inc(labelValues = [exc.msg]) - return err(exc.msg) + return err("write/read failed: " & $exc.msg) - let buffer = await conn.readLp(MaxRpcSize.int) let decodedBuff = PeerExchangeRpc.decode(buffer) if decodedBuff.isErr(): return err("decode failed: " & $decodedBuff.error) - return ok(decodedBuff.get().response) proc request*(wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} = @@ -145,33 +145,39 @@ proc initProtocolHandler(wpx: WakuPeerExchange) = let rpc = res.get() # handle peer exchange request + # TODO if rpc.request != PeerExchangeRequest(): trace "peer exchange request received" let enrs = wpx.getEnrsFromCache(rpc.request.numPeers) + # TODO we shouldnt discard this? discard await wpx.respond(enrs, conn) waku_px_peers_sent.inc(enrs.len().int64()) # handle peer exchange response # TODO: wondering if this should not be part of the protocol # whats done with the peers is not part of the protocol - if rpc.response != PeerExchangeResponse(): + + # TODO: This could technically allow to inject unsolicitated responses that + # were not originated by a request. Possible attack. + #if rpc.response != PeerExchangeResponse(): + # echo "---enter in response" # todo: error handling - trace "peer exchange response received" - var record: enr.Record - var remotePeerInfoList: seq[RemotePeerInfo] - waku_px_peers_received_total.inc(rpc.response.peerInfos.len().int64()) - for pi in rpc.response.peerInfos: - discard enr.fromBytes(record, pi.enr) - remotePeerInfoList.add(record.toRemotePeerInfo().get) - - let newPeers = remotePeerInfoList.filterIt( - not wpx.peerManager.switch.isConnected(it.peerId)) - - if newPeers.len() > 0: - waku_px_peers_received_unknown.inc(newPeers.len().int64()) - debug "Connecting to newly discovered peers", count=newPeers.len() - # TODO: This should just add peers to the peerstore, not trying to connect to them - await wpx.peerManager.connectToNodes(newPeers, WakuRelayCodec, source = "peer exchange") + # trace "peer exchange response received" + # var record: enr.Record + # var remotePeerInfoList: seq[RemotePeerInfo] + # waku_px_peers_received_total.inc(rpc.response.peerInfos.len().int64()) + # for pi in rpc.response.peerInfos: + # discard enr.fromBytes(record, pi.enr) + # remotePeerInfoList.add(record.toRemotePeerInfo().get) + + # let newPeers = remotePeerInfoList.filterIt( + # not wpx.peerManager.switch.isConnected(it.peerId)) + + # if newPeers.len() > 0: + # waku_px_peers_received_unknown.inc(newPeers.len().int64()) + # debug "Connecting to newly discovered peers", count=newPeers.len() + # # TODO: This should just add peers to the peerstore, not trying to connect to them + # await wpx.peerManager.connectToNodes(newPeers, WakuRelayCodec, source = "peer exchange") wpx.handler = handler wpx.codec = WakuPeerExchangeCodec diff --git a/waku/v2/protocol/waku_peer_exchange/rpc.nim b/waku/v2/protocol/waku_peer_exchange/rpc.nim index 0b248d9356..ea26cfa053 100644 --- a/waku/v2/protocol/waku_peer_exchange/rpc.nim +++ b/waku/v2/protocol/waku_peer_exchange/rpc.nim @@ -8,6 +8,7 @@ type PeerExchangeResponse* = object peerInfos*: seq[PeerExchangePeerInfo] +# TODO: is this even needed? PeerExchangeRpc* = object request*: PeerExchangeRequest response*: PeerExchangeResponse From d6cdfbd02cdd66cb1fe3c85da3e340cd34c9f9f0 Mon Sep 17 00:00:00 2001 From: alrevuelta Date: Thu, 2 Feb 2023 16:49:56 +0100 Subject: [PATCH 3/7] refactoro(px): minor improvements and cleanups --- apps/wakunode2/wakunode2.nim | 13 +++-- tests/v2/test_waku_peer_exchange.nim | 8 +--- .../protocol/waku_peer_exchange/protocol.nim | 48 ++++++------------- waku/v2/protocol/waku_peer_exchange/rpc.nim | 1 - 4 files changed, 25 insertions(+), 45 deletions(-) diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index 292ae537a6..452be715ae 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -41,6 +41,7 @@ import ../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_capacity, ../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_time, ../../waku/v2/protocol/waku_store, + ../../waku/v2/protocol/waku_relay, ../../waku/v2/protocol/waku_filter, ../../waku/v2/protocol/waku_lightpush, ../../waku/v2/protocol/waku_peer_exchange, @@ -531,11 +532,15 @@ proc startNode(node: WakuNode, conf: WakuNodeConf, let desiredOutDegree = node.wakuRelay.parameters.d.uint64() let pxPeersRes = await node.wakuPeerExchange.request(desiredOutDegree) if pxPeersRes.isOk: - let pxPeers = pxPeersRes.get().peerInfos - echo "----pxPeers, ", pxPeers + var record: enr.Record + var validPeers = 0 + for pi in pxPeersRes.get().peerInfos: + if enr.fromBytes(record, pi.enr): + node.peerManager.addPeer(record.toRemotePeerInfo().get, WakuRelayCodec) + validPeers += 1 + info "Retrieved peer info via peer exchange protocol", validPeers = validPeers else: - #error is tosevere? - warn "failed to retrieve peer info via peer exchange protocol" + warn "Failed to retrieve peer info via peer exchange protocol" # Start keepalive, if enabled if conf.keepAlive: diff --git a/tests/v2/test_waku_peer_exchange.nim b/tests/v2/test_waku_peer_exchange.nim index b7e7c84c0f..850c14001e 100644 --- a/tests/v2/test_waku_peer_exchange.nim +++ b/tests/v2/test_waku_peer_exchange.nim @@ -26,7 +26,7 @@ import # TODO: Extend test coverage procSuite "Waku Peer Exchange": - xasyncTest "encode and decode peer exchange response": + asyncTest "encode and decode peer exchange response": ## Setup var enr1 = enr.Record(seqNum: 0, raw: @[]) @@ -69,7 +69,7 @@ procSuite "Waku Peer Exchange": resEnr1 == enr1 resEnr2 == enr2 - xasyncTest "retrieve and provide peer exchange peers from discv5": + asyncTest "retrieve and provide peer exchange peers from discv5": ## Setup (copied from test_waku_discv5.nim) let bindIp = ValidIpAddress.init("0.0.0.0") @@ -217,7 +217,3 @@ procSuite "Waku Peer Exchange": # Check that it failed gracefully check: response.isErr - -# TODO: Test setPeerExchangePeer #node3.setPeerExchangePeer(node1.peerInfo.toRemotePeerInfo()) - -# TODO: test with dead connection or failed peers. diff --git a/waku/v2/protocol/waku_peer_exchange/protocol.nim b/waku/v2/protocol/waku_peer_exchange/protocol.nim index e8a87c85ca..6bd439fc5c 100644 --- a/waku/v2/protocol/waku_peer_exchange/protocol.nim +++ b/waku/v2/protocol/waku_peer_exchange/protocol.nim @@ -135,49 +135,29 @@ proc getEnrsFromCache(wpx: WakuPeerExchange, numPeers: uint64): seq[enr.Record] proc initProtocolHandler(wpx: WakuPeerExchange) = proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = - let buff = await conn.readLp(MaxRpcSize.int) + var buffer: seq[byte] + try: + buffer = await conn.readLp(MaxRpcSize.int) + except CatchableError as exc: + waku_px_errors.inc(labelValues = [exc.msg]) + return - let res = PeerExchangeRpc.decode(buff) + let res = PeerExchangeRpc.decode(buffer) if res.isErr(): waku_px_errors.inc(labelValues = [decodeRpcFailure]) return let rpc = res.get() - # handle peer exchange request - # TODO - if rpc.request != PeerExchangeRequest(): + # If we got a request (request field is not empty) + if rpc.request != default(PeerExchangeRequest): trace "peer exchange request received" let enrs = wpx.getEnrsFromCache(rpc.request.numPeers) - # TODO we shouldnt discard this? - discard await wpx.respond(enrs, conn) - waku_px_peers_sent.inc(enrs.len().int64()) - - # handle peer exchange response - # TODO: wondering if this should not be part of the protocol - # whats done with the peers is not part of the protocol - - # TODO: This could technically allow to inject unsolicitated responses that - # were not originated by a request. Possible attack. - #if rpc.response != PeerExchangeResponse(): - # echo "---enter in response" - # todo: error handling - # trace "peer exchange response received" - # var record: enr.Record - # var remotePeerInfoList: seq[RemotePeerInfo] - # waku_px_peers_received_total.inc(rpc.response.peerInfos.len().int64()) - # for pi in rpc.response.peerInfos: - # discard enr.fromBytes(record, pi.enr) - # remotePeerInfoList.add(record.toRemotePeerInfo().get) - - # let newPeers = remotePeerInfoList.filterIt( - # not wpx.peerManager.switch.isConnected(it.peerId)) - - # if newPeers.len() > 0: - # waku_px_peers_received_unknown.inc(newPeers.len().int64()) - # debug "Connecting to newly discovered peers", count=newPeers.len() - # # TODO: This should just add peers to the peerstore, not trying to connect to them - # await wpx.peerManager.connectToNodes(newPeers, WakuRelayCodec, source = "peer exchange") + let res = await wpx.respond(enrs, conn) + if res.isErr: + waku_px_errors.inc(labelValues = [res.error]) + else: + waku_px_peers_sent.inc(enrs.len().int64()) wpx.handler = handler wpx.codec = WakuPeerExchangeCodec diff --git a/waku/v2/protocol/waku_peer_exchange/rpc.nim b/waku/v2/protocol/waku_peer_exchange/rpc.nim index ea26cfa053..0b248d9356 100644 --- a/waku/v2/protocol/waku_peer_exchange/rpc.nim +++ b/waku/v2/protocol/waku_peer_exchange/rpc.nim @@ -8,7 +8,6 @@ type PeerExchangeResponse* = object peerInfos*: seq[PeerExchangePeerInfo] -# TODO: is this even needed? PeerExchangeRpc* = object request*: PeerExchangeRequest response*: PeerExchangeResponse From e5e439858cccd8c2e1c09e7e851c46d59286d933 Mon Sep 17 00:00:00 2001 From: alrevuelta Date: Tue, 7 Feb 2023 09:55:48 +0100 Subject: [PATCH 4/7] add new test case --- tests/v2/test_waku_peer_exchange.nim | 38 ++++++++++++++++++- .../protocol/waku_peer_exchange/protocol.nim | 2 +- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/tests/v2/test_waku_peer_exchange.nim b/tests/v2/test_waku_peer_exchange.nim index 850c14001e..c2d5b22b64 100644 --- a/tests/v2/test_waku_peer_exchange.nim +++ b/tests/v2/test_waku_peer_exchange.nim @@ -152,7 +152,7 @@ procSuite "Waku Peer Exchange": await allFutures([node1.stop(), node2.stop(), node3.stop()]) - asyncTest "peer exchange request returns some discovered peers": + asyncTest "peer exchange request functions returns some discovered peers": let node1 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) node2 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) @@ -196,6 +196,42 @@ procSuite "Waku Peer Exchange": response2.get().peerInfos.anyIt(it.enr == enr1.raw) or response2.get().peerInfos.anyIt(it.enr == enr2.raw) response3.get().peerInfos.anyIt(it.enr == enr1.raw) or response3.get().peerInfos.anyIt(it.enr == enr2.raw) + asyncTest "peer exchange handler works as expected": + let + node1 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) + node2 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) + + # Start and mount peer exchange + await allFutures([node1.start(), node2.start()]) + await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) + + # Mock that we have discovered these enrs + var enr1 = enr.Record() + check enr1.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB") + node1.wakuPeerExchange.enrCache.add(enr1) + + # Create connection + let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec) + require connOpt.isSome + let conn = connOpt.get() + + # Send bytes so that they directly hit the handler + let rpc = PeerExchangeRpc( + request: PeerExchangeRequest(numPeers: 1)) + + var buffer: seq[byte] + await conn.writeLP(rpc.encode().buffer) + buffer = await conn.readLp(MaxRpcSize.int) + + # Decode the response + let decodedBuff = PeerExchangeRpc.decode(buffer) + require decodedBuff.isOk + + # Check we got back the enr we mocked + check: + decodedBuff.get().response.peerInfos.len == 1 + decodedBuff.get().response.peerInfos[0].enr == enr1.raw + asyncTest "peer exchange request fails gracefully": let node1 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) diff --git a/waku/v2/protocol/waku_peer_exchange/protocol.nim b/waku/v2/protocol/waku_peer_exchange/protocol.nim index 6bd439fc5c..6e7b92f306 100644 --- a/waku/v2/protocol/waku_peer_exchange/protocol.nim +++ b/waku/v2/protocol/waku_peer_exchange/protocol.nim @@ -29,7 +29,7 @@ logScope: const # We add a 64kB safety buffer for protocol overhead. # 10x-multiplier also for safety - MaxRpcSize = 10 * MaxWakuMessageSize + 64 * 1024 # TODO what is the expected size of a PX message? As currently specified, it can contain an arbitary number of ENRs... + MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024 # TODO what is the expected size of a PX message? As currently specified, it can contain an arbitary number of ENRs... MaxCacheSize = 1000 CacheCleanWindow = 200 From d33294d5fbdd4ddd7fafbb9ad817f84e0b558cda Mon Sep 17 00:00:00 2001 From: alrevuelta Date: Wed, 8 Feb 2023 10:11:16 +0100 Subject: [PATCH 5/7] refactor(px): move logic to waku_node + test --- apps/wakunode2/wakunode2.nim | 15 ++------------- tests/v2/test_wakunode.nim | 35 +++++++++++++++++++++++++++++++++-- waku/v2/node/waku_node.nim | 19 +++++++++++++++++++ 3 files changed, 54 insertions(+), 15 deletions(-) diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index 452be715ae..7c426f022a 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -526,21 +526,10 @@ proc startNode(node: WakuNode, conf: WakuNodeConf, if conf.peerExchange: asyncSpawn runPeerExchangeDiscv5Loop(node.wakuPeerExchange) - # retrieve and connect to peer exchange peers + # retrieve px peers and add the to the peer store if conf.peerExchangeNode != "": - info "Retrieving peer info via peer exchange protocol" let desiredOutDegree = node.wakuRelay.parameters.d.uint64() - let pxPeersRes = await node.wakuPeerExchange.request(desiredOutDegree) - if pxPeersRes.isOk: - var record: enr.Record - var validPeers = 0 - for pi in pxPeersRes.get().peerInfos: - if enr.fromBytes(record, pi.enr): - node.peerManager.addPeer(record.toRemotePeerInfo().get, WakuRelayCodec) - validPeers += 1 - info "Retrieved peer info via peer exchange protocol", validPeers = validPeers - else: - warn "Failed to retrieve peer info via peer exchange protocol" + await node.fetchPeerExchangePeers(desiredOutDegree) # Start keepalive, if enabled if conf.keepAlive: diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 2b59445a23..cc08fb858c 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -1,6 +1,7 @@ {.used.} import + std/sequtils, stew/byteutils, stew/shims/net as stewNet, testutils/unittests, @@ -13,13 +14,17 @@ import libp2p/protocols/pubsub/rpc/messages, libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/gossipsub, - libp2p/nameresolving/mockresolver + libp2p/nameresolving/mockresolver, + eth/p2p/discoveryv5/enr import ../../waku/v2/node/waku_node, ../../waku/v2/node/peer_manager, ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_relay, - ../../waku/v2/utils/peers + ../../waku/v2/protocol/waku_peer_exchange, + ../../waku/v2/utils/peers, + ./testlib/testutils, + ../test_helpers procSuite "WakuNode": @@ -284,3 +289,29 @@ procSuite "WakuNode": node1MultiAddrs.contains(expectedMultiaddress1) await allFutures(node1.stop(), node2.stop()) + + asyncTest "Function fetchPeerExchangePeers succesfully exchanges px peers": + let + node1 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) + node2 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) + + # Start and mount peer exchange + await allFutures([node1.start(), node2.start()]) + await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) + + # Mock that we discovered a node (to avoid running discv5) + var enr = enr.Record() + require enr.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB") + node2.wakuPeerExchange.enrCache.add(enr) + + # Set node2 as service peer (default one) for px protocol + node1.peerManager.addServicePeer(node2.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec) + + # Request 1 peer from peer exchange protocol + await node1.fetchPeerExchangePeers(1) + + # Check that the peer ended up in the peerstore + let rpInfo = enr.toRemotePeerInfo.get() + check: + node1.peerManager.peerStore.peers.anyIt(it.peerId == rpInfo.peerId) + node1.peerManager.peerStore.peers.anyIt(it.addrs == rpInfo.addrs) diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 0a370d192d..dfd63d9da0 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -960,6 +960,25 @@ proc mountPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} = node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec)) +proc fetchPeerExchangePeers*(node: Wakunode, amount: uint64) {.async, raises: [Defect].} = + if node.wakuPeerExchange.isNil(): + error "could not get peers from px, waku peer-exchange is nil" + return + + info "Retrieving peer info via peer exchange protocol" + let pxPeersRes = await node.wakuPeerExchange.request(amount) + if pxPeersRes.isOk: + var record: enr.Record + var validPeers = 0 + for pi in pxPeersRes.get().peerInfos: + if enr.fromBytes(record, pi.enr): + # TODO: Add source: PX + node.peerManager.addPeer(record.toRemotePeerInfo().get, WakuRelayCodec) + validPeers += 1 + info "Retrieved peer info via peer exchange protocol", validPeers = validPeers + else: + warn "Failed to retrieve peer info via peer exchange protocol", error = pxPeersRes.error + # TODO: Move to application module (e.g., wakunode2.nim) proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} = if node.wakuPeerExchange.isNil(): From 1af3a0d656c309aeae2851bf0058a729a74c475f Mon Sep 17 00:00:00 2001 From: alrevuelta Date: Thu, 9 Feb 2023 09:30:08 +0100 Subject: [PATCH 6/7] Fix issue where ok enrs were parsed as nok --- waku/v2/node/waku_node.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index dfd63d9da0..ec4f9af840 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -968,9 +968,9 @@ proc fetchPeerExchangePeers*(node: Wakunode, amount: uint64) {.async, raises: [D info "Retrieving peer info via peer exchange protocol" let pxPeersRes = await node.wakuPeerExchange.request(amount) if pxPeersRes.isOk: - var record: enr.Record var validPeers = 0 for pi in pxPeersRes.get().peerInfos: + var record: enr.Record if enr.fromBytes(record, pi.enr): # TODO: Add source: PX node.peerManager.addPeer(record.toRemotePeerInfo().get, WakuRelayCodec) From 7cb54339fe20cf991a9816a62e6447b94ae13212 Mon Sep 17 00:00:00 2001 From: alrevuelta Date: Thu, 9 Feb 2023 16:14:59 +0100 Subject: [PATCH 7/7] Fix proto comments --- .../protocol/waku_peer_exchange/protocol.nim | 23 ++++++++----------- .../protocol/waku_peer_exchange/rpc_codec.nim | 3 ++- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/waku/v2/protocol/waku_peer_exchange/protocol.nim b/waku/v2/protocol/waku_peer_exchange/protocol.nim index 6e7b92f306..c78f81b03f 100644 --- a/waku/v2/protocol/waku_peer_exchange/protocol.nim +++ b/waku/v2/protocol/waku_peer_exchange/protocol.nim @@ -142,22 +142,19 @@ proc initProtocolHandler(wpx: WakuPeerExchange) = waku_px_errors.inc(labelValues = [exc.msg]) return - let res = PeerExchangeRpc.decode(buffer) - if res.isErr(): + let decBuf = PeerExchangeRpc.decode(buffer) + if decBuf.isErr(): waku_px_errors.inc(labelValues = [decodeRpcFailure]) return - let rpc = res.get() - - # If we got a request (request field is not empty) - if rpc.request != default(PeerExchangeRequest): - trace "peer exchange request received" - let enrs = wpx.getEnrsFromCache(rpc.request.numPeers) - let res = await wpx.respond(enrs, conn) - if res.isErr: - waku_px_errors.inc(labelValues = [res.error]) - else: - waku_px_peers_sent.inc(enrs.len().int64()) + let rpc = decBuf.get() + trace "peer exchange request received" + let enrs = wpx.getEnrsFromCache(rpc.request.numPeers) + let res = await wpx.respond(enrs, conn) + if res.isErr: + waku_px_errors.inc(labelValues = [res.error]) + else: + waku_px_peers_sent.inc(enrs.len().int64()) wpx.handler = handler wpx.codec = WakuPeerExchangeCodec diff --git a/waku/v2/protocol/waku_peer_exchange/rpc_codec.nim b/waku/v2/protocol/waku_peer_exchange/rpc_codec.nim index cb4925db20..f8916185b2 100644 --- a/waku/v2/protocol/waku_peer_exchange/rpc_codec.nim +++ b/waku/v2/protocol/waku_peer_exchange/rpc_codec.nim @@ -83,7 +83,8 @@ proc decode*(T: type PeerExchangeRpc, buffer: seq[byte]): ProtoResult[T] = var rpc = PeerExchangeRpc() var requestBuffer: seq[byte] - discard ?pb.getField(1, requestBuffer) + if not ?pb.getField(1, requestBuffer): + return err(ProtoError.RequiredFieldMissing) rpc.request = ?PeerExchangeRequest.decode(requestBuffer) var responseBuffer: seq[byte]