Skip to content

Commit

Permalink
refactor(peer-exchange): move peer management to waku_node module
Browse files Browse the repository at this point in the history
  • Loading branch information
Lorenzo Delgado committed Nov 1, 2022
1 parent cd73029 commit 3aa3856
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 62 deletions.
2 changes: 1 addition & 1 deletion apps/wakunode2/wakunode2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
# waku peer exchange setup
if (conf.peerExchangeNode != "") or (conf.peerExchange):
try:
await mountWakuPeerExchange(node)
await mountPeerExchange(node)
except:
return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg())

Expand Down
6 changes: 3 additions & 3 deletions tests/v2/test_waku_peer_exchange.nim
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,14 @@ procSuite "Waku Peer Exchange":
await allFutures([node1.startDiscv5(), node2.startDiscv5()])

# Mount peer exchange
await node1.mountWakuPeerExchange()
await node3.mountWakuPeerExchange()
await node1.mountPeerExchange()
await node3.mountPeerExchange()

await sleepAsync(3000.millis) # Give the algorithm some time to work its magic

asyncSpawn node1.wakuPeerExchange.runPeerExchangeDiscv5Loop()

node3.wakuPeerExchange.setPeer(node1.switch.peerInfo.toRemotePeerInfo())
node3.setPeerExchangePeer(node1.peerInfo.toRemotePeerInfo())

## When
discard waitFor node3.wakuPeerExchange.request(1)
Expand Down
11 changes: 7 additions & 4 deletions waku/v2/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ declarePublicGauge waku_node_filters, "number of content filter subscriptions"
declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"]
declarePublicGauge waku_lightpush_peers, "number of lightpush peers"
declarePublicGauge waku_store_peers, "number of store peers"
declarePublicGauge waku_px_peers, "number of peers (in the node's peerManager) supporting the peer exchange protocol"


logScope:
Expand Down Expand Up @@ -697,20 +698,21 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMe

## Waku peer-exchange

proc mountWakuPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} =
proc mountPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} =
info "mounting waku peer exchange"

var discv5Opt: Option[WakuDiscoveryV5]
if not node.wakuDiscV5.isNil():
discv5Opt = some(node.wakuDiscV5)
node.wakuPeerExchange = WakuPeerExchange.init(node.peerManager, discv5Opt)

node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager, discv5Opt)

if node.started:
# Node has started already. Let's start Waku peer exchange too.
await node.wakuPeerExchange.start()

node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec))

# TODO: Move to application module (e.g., wakunode2.nim)
proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} =
if node.wakuPeerExchange.isNil():
error "could not set peer, waku peer-exchange is nil"
Expand All @@ -720,7 +722,8 @@ proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises:

let remotePeer = when peer is string: parseRemotePeerInfo(peer)
else: peer
node.wakuPeerExchange.setPeer(remotePeer)
node.peerManager.addPeer(remotePeer, WakuPeerExchangeCodec)
waku_px_peers.inc()


## Other protocols
Expand Down
7 changes: 2 additions & 5 deletions waku/v2/protocol/waku_peer_exchange.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
import
./waku_peer_exchange/rpc,
./waku_peer_exchange/rpc_codec,
./waku_peer_exchange/protocol,
./waku_peer_exchange/client

./waku_peer_exchange/protocol
export
rpc,
rpc_codec,
protocol,
client
protocol
9 changes: 0 additions & 9 deletions waku/v2/protocol/waku_peer_exchange/client.nim

This file was deleted.

72 changes: 32 additions & 40 deletions waku/v2/protocol/waku_peer_exchange/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import
./rpc_codec


declarePublicGauge waku_px_peers, "number of peers (in the node's peerManager) supporting the peer exchange protocol"
declarePublicGauge waku_px_peers_received_total, "number of ENRs received via peer exchange"
declarePublicGauge waku_px_peers_received_unknown, "number of previously unknown ENRs received via peer exchange"
declarePublicGauge waku_px_peers_sent, "number of ENRs sent to peer exchange requesters"
Expand Down Expand Up @@ -107,56 +106,55 @@ proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], peer: RemotePeerInfo

return ok()

proc respond*(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
if peerOpt.isNone():
waku_px_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)

return await wpx.respond(enrs, peerOpt.get())

proc cleanCache(px: WakuPeerExchange) {.gcsafe.} =
px.enrCache.delete(0, CacheCleanWindow-1)
proc cleanCache(wpx: WakuPeerExchange) {.gcsafe.} =
wpx.enrCache.delete(0..CacheCleanWindow-1)

proc runPeerExchangeDiscv5Loop*(px: WakuPeerExchange) {.async, gcsafe.} =
proc runPeerExchangeDiscv5Loop*(wpx: WakuPeerExchange) {.async, gcsafe.} =
## Runs a discv5 loop adding new peers to the px peer cache
if px.wakuDiscv5.isNone():
if wpx.wakuDiscv5.isNone():
warn "Trying to run discovery v5 (for PX) while it's disabled"
return

info "Starting peer exchange discovery v5 loop"

while px.wakuDiscv5.get().listening:
while wpx.wakuDiscv5.get().listening:
trace "Running px discv5 discovery loop"
let discoveredPeers = await px.wakuDiscv5.get().findRandomPeers()
let discoveredPeers = await wpx.wakuDiscv5.get().findRandomPeers()
info "Discovered px peers via discv5", count=discoveredPeers.get().len()
if discoveredPeers.isOk:
if discoveredPeers.isOk():
for dp in discoveredPeers.get():
if dp.enr.isSome() and not px.enrCache.contains(dp.enr.get()):
px.enrCache.add(dp.enr.get())
if dp.enr.isSome() and not wpx.enrCache.contains(dp.enr.get()):
wpx.enrCache.add(dp.enr.get())

if px.enrCache.len() >= MaxCacheSize:
px.cleanCache()
if wpx.enrCache.len() >= MaxCacheSize:
wpx.cleanCache()

## This loop "competes" with the loop in wakunode2
## For the purpose of collecting px peers, 30 sec intervals should be enough
await sleepAsync(30.seconds)

proc getEnrsFromCache(px: WakuPeerExchange, numPeers: uint64): seq[enr.Record] {.gcsafe.} =
proc getEnrsFromCache(wpx: WakuPeerExchange, numPeers: uint64): seq[enr.Record] {.gcsafe.} =
randomize()
if px.enrCache.len() == 0:
if wpx.enrCache.len() == 0:
debug "peer exchange ENR cache is empty"
return @[]
for i in 0..<min(numPeers, px.enrCache.len().uint64()):
let ri = rand(0..<px.enrCache.len())
result.add(px.enrCache[ri])

proc initProtocolHandler*(px: WakuPeerExchange) =
for i in 0..<min(numPeers, wpx.enrCache.len().uint64()):
let ri = rand(0..<wpx.enrCache.len())
result.add(wpx.enrCache[ri])

proc initProtocolHandler(wpx: WakuPeerExchange) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let message = await conn.readLp(MaxRpcSize.int)
let buff = await conn.readLp(MaxRpcSize.int)

let res = PeerExchangeRpc.init(message)
let res = PeerExchangeRpc.init(buff)
if res.isErr():
waku_px_errors.inc(labelValues = [decodeRpcFailure])
return
Expand All @@ -166,8 +164,8 @@ proc initProtocolHandler*(px: WakuPeerExchange) =
# handle peer exchange request
if rpc.request != PeerExchangeRequest():
trace "peer exchange request received"
let enrs = px.getEnrsFromCache(rpc.request.numPeers)
discard await px.respond(enrs, conn.peerId)
let enrs = wpx.getEnrsFromCache(rpc.request.numPeers)
discard await wpx.respond(enrs, conn.peerId)
waku_px_peers_sent.inc(enrs.len().int64())

# handle peer exchange response
Expand All @@ -182,28 +180,22 @@ proc initProtocolHandler*(px: WakuPeerExchange) =
remotePeerInfoList.add(record.toRemotePeerInfo().get)

let newPeers = remotePeerInfoList.filterIt(
not px.peerManager.switch.isConnected(it.peerId))
not wpx.peerManager.switch.isConnected(it.peerId))

if newPeers.len() > 0:
waku_px_peers_received_unknown.inc(newPeers.len().int64())
debug "Connecting to newly discovered peers", count=newPeers.len()
await px.peerManager.connectToNodes(newPeers, WakuRelayCodec, source = "peer exchange")
await wpx.peerManager.connectToNodes(newPeers, WakuRelayCodec, source = "peer exchange")

px.handler = handler
px.codec = WakuPeerExchangeCodec
wpx.handler = handler
wpx.codec = WakuPeerExchangeCodec

proc init*(T: type WakuPeerExchange,
peerManager: PeerManager,
wakuDiscv5: Option[WakuDiscoveryV5] = none(WakuDiscoveryV5)
): T =
let px = WakuPeerExchange(
proc new*(T: type WakuPeerExchange,
peerManager: PeerManager,
wakuDiscv5: Option[WakuDiscoveryV5] = none(WakuDiscoveryV5)): T =
let wpx = WakuPeerExchange(
peerManager: peerManager,
wakuDiscv5: wakuDiscv5
)
px.initProtocolHandler()
return px

proc setPeer*(wpx: WakuPeerExchange, peer: RemotePeerInfo) =
wpx.peerManager.addPeer(peer, WakuPeerExchangeCodec)
waku_px_peers.inc()

wpx.initProtocolHandler()
return wpx

0 comments on commit 3aa3856

Please sign in to comment.