From 729e63f53d84f734d8fa4cf239d1db803d433ffd Mon Sep 17 00:00:00 2001 From: Darshan K <35736874+darshankabariya@users.noreply.github.com> Date: Fri, 27 Sep 2024 18:16:46 +0530 Subject: [PATCH] refactor: wrap peer store (#3051) Encapsulate peerstore with wakupeerstore --- .../diagnose_connections.nim | 6 +- apps/wakucanary/wakucanary.nim | 2 +- examples/publisher.nim | 5 +- examples/subscriber.nim | 5 +- .../requests/peer_manager_request.nim | 3 +- tests/node/test_wakunode_peer_exchange.nim | 26 +- tests/node/test_wakunode_peer_manager.nim | 128 +++++---- tests/test_peer_manager.nim | 271 +++++++++--------- tests/test_peer_store_extended.nim | 195 +++++++------ tests/test_waku_dnsdisc.nim | 12 +- waku/node/peer_manager/peer_manager.nim | 127 ++++---- waku/node/peer_manager/waku_peer_store.nim | 186 ++++++------ waku/node/waku_node.nim | 4 +- waku/waku_api/rest/admin/handlers.nim | 18 +- waku/waku_core/peers.nim | 25 +- waku/waku_filter_v2/protocol.nim | 4 +- waku/waku_peer_exchange/protocol.nim | 5 +- 17 files changed, 570 insertions(+), 452 deletions(-) diff --git a/apps/liteprotocoltester/diagnose_connections.nim b/apps/liteprotocoltester/diagnose_connections.nim index d5b2cca0ba..66718be7c7 100644 --- a/apps/liteprotocoltester/diagnose_connections.nim +++ b/apps/liteprotocoltester/diagnose_connections.nim @@ -37,9 +37,9 @@ logScope: proc logSelfPeersLoop(pm: PeerManager, interval: Duration) {.async.} = trace "Starting logSelfPeersLoop diagnosis loop" while true: - let selfLighpushPeers = pm.peerStore.getPeersByProtocol(WakuLightPushCodec) - let selfRelayPeers = pm.peerStore.getPeersByProtocol(WakuRelayCodec) - let selfFilterPeers = pm.peerStore.getPeersByProtocol(WakuFilterSubscribeCodec) + let selfLighpushPeers = pm.wakuPeerStore.getPeersByProtocol(WakuLightPushCodec) + let selfRelayPeers = pm.wakuPeerStore.getPeersByProtocol(WakuRelayCodec) + let selfFilterPeers = pm.wakuPeerStore.getPeersByProtocol(WakuFilterSubscribeCodec) let printable = catch: """*------------------------------------------------------------------------------------------* diff --git a/apps/wakucanary/wakucanary.nim b/apps/wakucanary/wakucanary.nim index 9d7f5450e1..318b40629a 100644 --- a/apps/wakucanary/wakucanary.nim +++ b/apps/wakucanary/wakucanary.nim @@ -255,7 +255,7 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} = return 1 let lp2pPeerStore = node.switch.peerStore - let conStatus = node.peerManager.peerStore[ConnectionBook][peer.peerId] + let conStatus = node.peerManager.wakuPeerStore[ConnectionBook][peer.peerId] if conf.ping: discard await pingFut diff --git a/examples/publisher.nim b/examples/publisher.nim index fa646536c9..654f406012 100644 --- a/examples/publisher.nim +++ b/examples/publisher.nim @@ -95,8 +95,9 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = # wait for a minimum of peers to be connected, otherwise messages wont be gossiped while true: - let numConnectedPeers = - node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected) + let numConnectedPeers = node.peerManager.wakuPeerStore[ConnectionBook].book + .values() + .countIt(it == Connected) if numConnectedPeers >= 6: notice "publisher is ready", connectedPeers = numConnectedPeers, required = 6 break diff --git a/examples/subscriber.nim b/examples/subscriber.nim index 2cab3a731e..0dd22f4699 100644 --- a/examples/subscriber.nim +++ b/examples/subscriber.nim @@ -93,8 +93,9 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = # wait for a minimum of peers to be connected, otherwise messages wont be gossiped while true: - let numConnectedPeers = - node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected) + let numConnectedPeers = node.peerManager.wakuPeerStore[ConnectionBook].book + .values() + .countIt(it == Connected) if numConnectedPeers >= 6: notice "subscriber is ready", connectedPeers = numConnectedPeers, required = 6 break diff --git a/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim b/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim index 439859c6f0..ae4ef95ade 100644 --- a/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim @@ -75,7 +75,8 @@ proc process*( return err(ret.error) of GET_ALL_PEER_IDS: ## returns a comma-separated string of peerIDs - let peerIDs = waku.node.peerManager.peerStore.peers().mapIt($it.peerId).join(",") + let peerIDs = + waku.node.peerManager.wakuPeerStore.peers().mapIt($it.peerId).join(",") return ok(peerIDs) of GET_PEER_IDS_BY_PROTOCOL: ## returns a comma-separated string of peerIDs that mount the given protocol diff --git a/tests/node/test_wakunode_peer_exchange.nim b/tests/node/test_wakunode_peer_exchange.nim index 49f61d2959..6da8d3fa30 100644 --- a/tests/node/test_wakunode_peer_exchange.nim +++ b/tests/node/test_wakunode_peer_exchange.nim @@ -83,7 +83,7 @@ suite "Waku Peer Exchange": # Then no peers are fetched check: - node.peerManager.peerStore.peers.len == 0 + node.peerManager.wakuPeerStore.peers.len == 0 res.error.status_code == SERVICE_UNAVAILABLE res.error.status_desc == some("PeerExchange is not mounted") @@ -98,12 +98,12 @@ suite "Waku Peer Exchange": res.error.status_desc == some("peer_not_found_failure") # Then no peers are fetched - check node.peerManager.peerStore.peers.len == 0 + check node.peerManager.wakuPeerStore.peers.len == 0 asyncTest "Node succesfully exchanges px peers with faked discv5": # Given both nodes mount peer exchange await allFutures([node.mountPeerExchange(), node2.mountPeerExchange()]) - check node.peerManager.peerStore.peers.len == 0 + check node.peerManager.wakuPeerStore.peers.len == 0 # Mock that we discovered a node (to avoid running discv5) var enr = enr.Record() @@ -124,8 +124,8 @@ suite "Waku Peer Exchange": # Check that the peer ended up in the peerstore let rpInfo = enr.toRemotePeerInfo.get() check: - node.peerManager.peerStore.peers.anyIt(it.peerId == rpInfo.peerId) - node.peerManager.peerStore.peers.anyIt(it.addrs == rpInfo.addrs) + node.peerManager.wakuPeerStore.peers.anyIt(it.peerId == rpInfo.peerId) + node.peerManager.wakuPeerStore.peers.anyIt(it.addrs == rpInfo.addrs) suite "setPeerExchangePeer": var node2 {.threadvar.}: WakuNode @@ -142,7 +142,7 @@ suite "Waku Peer Exchange": asyncTest "peer set successfully": # Given a node with peer exchange mounted await node.mountPeerExchange() - let initialPeers = node.peerManager.peerStore.peers.len + let initialPeers = node.peerManager.wakuPeerStore.peers.len # And a valid peer info let remotePeerInfo2 = node2.peerInfo.toRemotePeerInfo() @@ -152,12 +152,12 @@ suite "Waku Peer Exchange": # Then the peer is added to the peer store check: - node.peerManager.peerStore.peers.len == (initialPeers + 1) + node.peerManager.wakuPeerStore.peers.len == (initialPeers + 1) asyncTest "peer exchange not mounted": # Given a node without peer exchange mounted check node.wakuPeerExchange == nil - let initialPeers = node.peerManager.peerStore.peers.len + let initialPeers = node.peerManager.wakuPeerStore.peers.len # And a valid peer info let invalidMultiAddress = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() @@ -167,12 +167,12 @@ suite "Waku Peer Exchange": # Then no peer is added to the peer store check: - node.peerManager.peerStore.peers.len == initialPeers + node.peerManager.wakuPeerStore.peers.len == initialPeers asyncTest "peer info parse error": # Given a node with peer exchange mounted await node.mountPeerExchange() - let initialPeers = node.peerManager.peerStore.peers.len + let initialPeers = node.peerManager.wakuPeerStore.peers.len # And given a peer info with an invalid peer id var remotePeerInfo2 = node2.peerInfo.toRemotePeerInfo() @@ -183,7 +183,7 @@ suite "Waku Peer Exchange": # Then no peer is added to the peer store check: - node.peerManager.peerStore.peers.len == initialPeers + node.peerManager.wakuPeerStore.peers.len == initialPeers suite "Waku Peer Exchange with discv5": asyncTest "Node successfully exchanges px peers with real discv5": @@ -286,13 +286,13 @@ suite "Waku Peer Exchange with discv5": let requestPeers = 1 - currentPeers = node3.peerManager.peerStore.peers.len + currentPeers = node3.peerManager.wakuPeerStore.peers.len let res = await node3.fetchPeerExchangePeers(1) check res.tryGet() == 1 # Then node3 has received 1 peer from node1 check: - node3.peerManager.peerStore.peers.len == currentPeers + requestPeers + node3.peerManager.wakuPeerStore.peers.len == currentPeers + requestPeers await allFutures( [node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()] diff --git a/tests/node/test_wakunode_peer_manager.nim b/tests/node/test_wakunode_peer_manager.nim index 104baa6ef4..0fd80271be 100644 --- a/tests/node/test_wakunode_peer_manager.nim +++ b/tests/node/test_wakunode_peer_manager.nim @@ -45,9 +45,9 @@ suite "Peer Manager": var server {.threadvar.}: WakuNode - serverPeerStore {.threadvar.}: PeerStore + serverPeerStore {.threadvar.}: WakuPeerStore client {.threadvar.}: WakuNode - clientPeerStore {.threadvar.}: PeerStore + clientPeerStore {.threadvar.}: WakuPeerStore var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo @@ -64,9 +64,9 @@ suite "Peer Manager": clientKey = generateSecp256k1Key() server = newTestWakuNode(serverKey, listenIp, Port(3000)) - serverPeerStore = server.peerManager.peerStore + serverPeerStore = server.peerManager.wakuPeerStore client = newTestWakuNode(clientKey, listenIp, Port(3001)) - clientPeerStore = client.peerManager.peerStore + clientPeerStore = client.peerManager.wakuPeerStore await allFutures(server.start(), client.start()) @@ -86,8 +86,8 @@ suite "Peer Manager": # Then the server should have the client in its peer store check: clientPeerStore.peerExists(serverRemotePeerInfo.peerId) - clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected - serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected + clientPeerStore.getPeer(serverPeerId).connectedness == Connectedness.Connected + serverPeerStore.getPeer(clientPeerId).connectedness == Connectedness.Connected asyncTest "Graceful Handling of Non-Existent Peers": # Given a non existent RemotePeerInfo @@ -105,7 +105,8 @@ suite "Peer Manager": await client.connectToNodes(@[nonExistentRemotePeerInfo]) # Then the client exists in the peer store but is marked as a failed connection - let parsedRemotePeerInfo = clientPeerStore.get(nonExistentRemotePeerInfo.peerId) + let parsedRemotePeerInfo = + clientPeerStore.getPeer(nonExistentRemotePeerInfo.peerId) check: clientPeerStore.peerExists(nonExistentRemotePeerInfo.peerId) parsedRemotePeerInfo.connectedness == CannotConnect @@ -115,7 +116,7 @@ suite "Peer Manager": suite "Peer Store Pruning": asyncTest "Capacity is not exceeded": # Given the client's peer store has a capacity of 1 - clientPeerStore.capacity = 1 + clientPeerStore.setCapacity(1) # And the client connects to the server await client.connectToNodes(@[serverRemotePeerInfo]) @@ -131,7 +132,7 @@ suite "Peer Manager": asyncTest "Capacity is not exceeded but some peers are unhealthy": # Given the client's peer store has a capacity of 1 - clientPeerStore.capacity = 1 + clientPeerStore.setCapacity(1) # And the client connects to the server await client.connectToNodes(@[serverRemotePeerInfo]) @@ -139,7 +140,8 @@ suite "Peer Manager": clientPeerStore.peers().len == 1 # Given the server is marked as CannotConnect - client.peerManager.peerStore[ConnectionBook].book[serverPeerId] = CannotConnect + client.peerManager.wakuPeerStore[ConnectionBook].book[serverPeerId] = + CannotConnect # When pruning the client's store client.peerManager.prunePeerStore() @@ -150,7 +152,7 @@ suite "Peer Manager": asyncTest "Capacity is exceeded but all peers are healthy": # Given the client's peer store has a capacity of 0 - clientPeerStore.capacity = 0 + clientPeerStore.setCapacity(0) # And the client connects to the server await client.connectToNodes(@[serverRemotePeerInfo]) @@ -166,7 +168,7 @@ suite "Peer Manager": asyncTest "Failed connections": # Given the client's peer store has a capacity of 0 and maxFailedAttempts of 1 - clientPeerStore.capacity = 0 + clientPeerStore.setCapacity(0) client.peerManager.maxFailedAttempts = 1 # And the client connects to the server @@ -175,7 +177,7 @@ suite "Peer Manager": clientPeerStore.peers().len == 1 # Given the server is marked as having 1 failed connection - client.peerManager.peerStore[NumberFailedConnBook].book[serverPeerId] = 1 + client.peerManager.wakuPeerStore[NumberFailedConnBook].book[serverPeerId] = 1 # When pruning the client's store client.peerManager.prunePeerStore() @@ -186,7 +188,7 @@ suite "Peer Manager": asyncTest "Shardless": # Given the client's peer store has a capacity of 0 - clientPeerStore.capacity = 0 + clientPeerStore.setCapacity(0) # And the client connects to the server await client.connectToNodes(@[serverRemotePeerInfo]) @@ -194,7 +196,8 @@ suite "Peer Manager": clientPeerStore.peers().len == 1 # Given the server is marked as not connected - client.peerManager.peerStore[ConnectionBook].book[serverPeerId] = CannotConnect + client.peerManager.wakuPeerStore[ConnectionBook].book[serverPeerId] = + CannotConnect # When pruning the client's store client.peerManager.prunePeerStore() @@ -205,7 +208,7 @@ suite "Peer Manager": asyncTest "Higher than avg shard count": # Given the client's peer store has a capacity of 0 - clientPeerStore.capacity = 0 + clientPeerStore.setCapacity(0) # And the server's remote peer info contains the node's ENR serverRemotePeerInfo.enr = some(server.enr) @@ -217,7 +220,8 @@ suite "Peer Manager": # Given the server is marked as not connected # (There's only one shard in the ENR so avg shards will be the same as the shard count; hence it will be purged.) - client.peerManager.peerStore[ConnectionBook].book[serverPeerId] = CannotConnect + client.peerManager.wakuPeerStore[ConnectionBook].book[serverPeerId] = + CannotConnect # When pruning the client's store client.peerManager.prunePeerStore() @@ -303,7 +307,7 @@ suite "Peer Manager": # Then the stored protocols should be the default (libp2p) ones check: clientPeerStore.peerExists(serverPeerId) - clientPeerStore.get(serverPeerId).protocols == DEFAULT_PROTOCOLS + clientPeerStore.getPeer(serverPeerId).protocols == DEFAULT_PROTOCOLS asyncTest "Peer Protocol Support Verification (Before Connection)": # Given the server has mounted some Waku protocols @@ -316,7 +320,7 @@ suite "Peer Manager": # Then the stored protocols should include the Waku protocols check: clientPeerStore.peerExists(serverPeerId) - clientPeerStore.get(serverPeerId).protocols == + clientPeerStore.getPeer(serverPeerId).protocols == DEFAULT_PROTOCOLS & @[WakuRelayCodec, WakuFilterSubscribeCodec] asyncTest "Service-Specific Peer Addition": @@ -342,10 +346,10 @@ suite "Peer Manager": # Then the peer store should contain both peers with the correct protocols check: clientPeerStore.peerExists(serverPeerId) - clientPeerStore.get(serverPeerId).protocols == + clientPeerStore.getPeer(serverPeerId).protocols == DEFAULT_PROTOCOLS & @[WakuFilterSubscribeCodec] clientPeerStore.peerExists(server2PeerId) - clientPeerStore.get(server2PeerId).protocols == + clientPeerStore.getPeer(server2PeerId).protocols == DEFAULT_PROTOCOLS & @[WakuRelayCodec] # Cleanup @@ -537,16 +541,20 @@ suite "Peer Manager": # Then their connectedness should be NotConnected check: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.NotConnected - serverPeerStore.get(clientPeerId).connectedness == Connectedness.NotConnected + clientPeerStore.getPeer(serverPeerId).connectedness == + Connectedness.NotConnected + serverPeerStore.getPeer(clientPeerId).connectedness == + Connectedness.NotConnected # When connecting the client to the server await client.connectToNodes(@[serverRemotePeerInfo]) # Then both peers' connectedness should be Connected check: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected - serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected + clientPeerStore.getPeer(serverPeerId).connectedness == + Connectedness.Connected + serverPeerStore.getPeer(clientPeerId).connectedness == + Connectedness.Connected # When stopping the switches of either of the peers # (Running just one stop is enough to change the states in both peers, but I'll leave both calls as an example) @@ -555,8 +563,10 @@ suite "Peer Manager": # Then both peers are gracefully disconnected, and turned to CanConnect check: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect - serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect + clientPeerStore.getPeer(serverPeerId).connectedness == + Connectedness.CanConnect + serverPeerStore.getPeer(clientPeerId).connectedness == + Connectedness.CanConnect # When trying to connect those peers to a non-existent peer # Generate an invalid multiaddress, and patching both peerInfos with it so dialing fails @@ -572,9 +582,9 @@ suite "Peer Manager": # Then both peers should be marked as CannotConnect check: - clientPeerStore.get(serverPeerId).connectedness == + clientPeerStore.getPeer(serverPeerId).connectedness == Connectedness.CannotConnect - serverPeerStore.get(clientPeerId).connectedness == + serverPeerStore.getPeer(clientPeerId).connectedness == Connectedness.CannotConnect suite "Automatic Reconnection": @@ -585,29 +595,37 @@ suite "Peer Manager": await client.connectToNodes(@[serverRemotePeerInfo]) waitActive: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected and - serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected + clientPeerStore.getPeer(serverPeerId).connectedness == + Connectedness.Connected and + serverPeerStore.getPeer(clientPeerId).connectedness == + Connectedness.Connected await client.disconnectNode(serverRemotePeerInfo) waitActive: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect and - serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect + clientPeerStore.getPeer(serverPeerId).connectedness == + Connectedness.CanConnect and + serverPeerStore.getPeer(clientPeerId).connectedness == + Connectedness.CanConnect # When triggering the reconnection await client.peerManager.reconnectPeers(WakuRelayCodec) # Then both peers should be marked as Connected waitActive: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected and - serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected + clientPeerStore.getPeer(serverPeerId).connectedness == + Connectedness.Connected and + serverPeerStore.getPeer(clientPeerId).connectedness == + Connectedness.Connected ## Now let's do the same but with backoff period await client.disconnectNode(serverRemotePeerInfo) waitActive: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect and - serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect + clientPeerStore.getPeer(serverPeerId).connectedness == + Connectedness.CanConnect and + serverPeerStore.getPeer(clientPeerId).connectedness == + Connectedness.CanConnect # When triggering a reconnection with a backoff period let backoffPeriod = chronos.seconds(1) @@ -618,8 +636,10 @@ suite "Peer Manager": # Then both peers should be marked as Connected check: - clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected - serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected + clientPeerStore.getPeer(serverPeerId).connectedness == + Connectedness.Connected + serverPeerStore.getPeer(clientPeerId).connectedness == + Connectedness.Connected reconnectDurationWithBackoffPeriod > backoffPeriod.seconds.float suite "Handling Connections on Different Networks": @@ -694,8 +714,8 @@ suite "Persistence Check": client = newTestWakuNode( clientKey, listenIp, listenPort, peerStorage = clientPeerStorage ) - serverPeerStore = server.peerManager.peerStore - clientPeerStore = client.peerManager.peerStore + serverPeerStore = server.peerManager.wakuPeerStore + clientPeerStore = client.peerManager.wakuPeerStore await allFutures(server.start(), client.start()) @@ -711,7 +731,7 @@ suite "Persistence Check": newClient = newTestWakuNode( clientKey, listenIp, listenPort, peerStorage = newClientPeerStorage ) - newClientPeerStore = newClient.peerManager.peerStore + newClientPeerStore = newClient.peerManager.wakuPeerStore await newClient.start() @@ -736,8 +756,8 @@ suite "Persistence Check": client = newTestWakuNode( clientKey, listenIp, listenPort, peerStorage = clientPeerStorage ) - serverPeerStore = server.peerManager.peerStore - clientPeerStore = client.peerManager.peerStore + serverPeerStore = server.peerManager.wakuPeerStore + clientPeerStore = client.peerManager.wakuPeerStore await allFutures(server.start(), client.start()) @@ -756,8 +776,8 @@ suite "Persistence Check": clientKey = generateSecp256k1Key() server = newTestWakuNode(serverKey, listenIp, listenPort) client = newTestWakuNode(clientKey, listenIp, listenPort) - serverPeerStore = server.peerManager.peerStore - clientPeerStore = client.peerManager.peerStore + serverPeerStore = server.peerManager.wakuPeerStore + clientPeerStore = client.peerManager.wakuPeerStore await allFutures(server.start(), client.start()) @@ -772,13 +792,13 @@ suite "Mount Order": var client {.threadvar.}: WakuNode clientRemotePeerInfo {.threadvar.}: RemotePeerInfo - clientPeerStore {.threadvar.}: PeerStore + clientPeerStore {.threadvar.}: WakuPeerStore asyncSetup: let clientKey = generateSecp256k1Key() client = newTestWakuNode(clientKey, listenIp, listenPort) - clientPeerStore = client.peerManager.peerStore + clientPeerStore = client.peerManager.wakuPeerStore await client.start() @@ -805,7 +825,7 @@ suite "Mount Order": # Then the peer store should contain the peer with the mounted protocol check: clientPeerStore.peerExists(serverPeerId) - clientPeerStore.get(serverPeerId).protocols == + clientPeerStore.getPeer(serverPeerId).protocols == DEFAULT_PROTOCOLS & @[WakuRelayCodec] # Cleanup @@ -829,7 +849,7 @@ suite "Mount Order": # Then the peer store should contain the peer with the mounted protocol check: clientPeerStore.peerExists(serverPeerId) - clientPeerStore.get(serverPeerId).protocols == + clientPeerStore.getPeer(serverPeerId).protocols == DEFAULT_PROTOCOLS & @[WakuRelayCodec] # Cleanup @@ -853,7 +873,7 @@ suite "Mount Order": # Then the peer store should contain the peer with the mounted protocol check: clientPeerStore.peerExists(serverPeerId) - clientPeerStore.get(serverPeerId).protocols == + clientPeerStore.getPeer(serverPeerId).protocols == DEFAULT_PROTOCOLS & @[WakuRelayCodec] # Cleanup @@ -877,7 +897,7 @@ suite "Mount Order": # Then the peer store should contain the peer with the mounted protocol check: clientPeerStore.peerExists(serverPeerId) - clientPeerStore.get(serverPeerId).protocols == + clientPeerStore.getPeer(serverPeerId).protocols == DEFAULT_PROTOCOLS & @[WakuRelayCodec] # Cleanup @@ -901,7 +921,7 @@ suite "Mount Order": # Then the peer store should contain the peer but not the mounted protocol check: clientPeerStore.peerExists(serverPeerId) - clientPeerStore.get(serverPeerId).protocols == DEFAULT_PROTOCOLS + clientPeerStore.getPeer(serverPeerId).protocols == DEFAULT_PROTOCOLS # Cleanup await server.stop() @@ -924,7 +944,7 @@ suite "Mount Order": # Then the peer store should contain the peer but not the mounted protocol check: clientPeerStore.peerExists(serverPeerId) - clientPeerStore.get(serverPeerId).protocols == DEFAULT_PROTOCOLS + clientPeerStore.getPeer(serverPeerId).protocols == DEFAULT_PROTOCOLS # Cleanup await server.stop() diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index d71f186cac..31b04ebf43 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -50,10 +50,10 @@ procSuite "Peer Manager": check: connOk == true - nodes[0].peerManager.peerStore.peers().anyIt( + nodes[0].peerManager.wakuPeerStore.peers().anyIt( it.peerId == nodes[1].peerInfo.peerId ) - nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == + nodes[0].peerManager.wakuPeerStore.connectedness(nodes[1].peerInfo.peerId) == Connectedness.Connected asyncTest "dialPeer() works": @@ -80,13 +80,13 @@ procSuite "Peer Manager": # Check that node2 is being managed in node1 check: - nodes[0].peerManager.peerStore.peers().anyIt( + nodes[0].peerManager.wakuPeerStore.peers().anyIt( it.peerId == nodes[1].peerInfo.peerId ) # Check connectedness check: - nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == + nodes[0].peerManager.wakuPeerStore.connectedness(nodes[1].peerInfo.peerId) == Connectedness.Connected await allFutures(nodes.mapIt(it.stop())) @@ -141,12 +141,12 @@ procSuite "Peer Manager": # Check peers were successfully added to peer manager check: - node.peerManager.peerStore.peers().len == 2 - node.peerManager.peerStore.peers(WakuFilterSubscribeCodec).allIt( + node.peerManager.wakuPeerStore.peers().len == 2 + node.peerManager.wakuPeerStore.peers(WakuFilterSubscribeCodec).allIt( it.peerId == filterPeer.peerId and it.addrs.contains(filterLoc) and it.protocols.contains(WakuFilterSubscribeCodec) ) - node.peerManager.peerStore.peers(WakuStoreCodec).allIt( + node.peerManager.wakuPeerStore.peers(WakuStoreCodec).allIt( it.peerId == storePeer.peerId and it.addrs.contains(storeLoc) and it.protocols.contains(WakuStoreCodec) ) @@ -166,7 +166,7 @@ procSuite "Peer Manager": nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo()) check: # No information about node2's connectedness - nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == + nodes[0].peerManager.wakuPeerStore.connectedness(nodes[1].peerInfo.peerId) == NotConnected # Failed connection @@ -183,7 +183,7 @@ procSuite "Peer Manager": check: # Cannot connect to node2 - nodes[0].peerManager.peerStore.connectedness(nonExistentPeer.peerId) == + nodes[0].peerManager.wakuPeerStore.connectedness(nonExistentPeer.peerId) == CannotConnect # Successful connection @@ -194,13 +194,14 @@ procSuite "Peer Manager": check: # Currently connected to node2 - nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connected + nodes[0].peerManager.wakuPeerStore.connectedness(nodes[1].peerInfo.peerId) == + Connected # Stop node. Gracefully disconnect from all peers. await nodes[0].stop() check: # Not currently connected to node2, but had recent, successful connection. - nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == + nodes[0].peerManager.wakuPeerStore.connectedness(nodes[1].peerInfo.peerId) == CanConnect await nodes[1].stop() @@ -231,11 +232,12 @@ procSuite "Peer Manager": let conn1Ok = await nodes[0].peerManager.connectRelay(nonExistentPeer) check: # Cannot connect to node2 - nodes[0].peerManager.peerStore.connectedness(nonExistentPeer.peerId) == + nodes[0].peerManager.wakuPeerStore.connectedness(nonExistentPeer.peerId) == CannotConnect - nodes[0].peerManager.peerStore[ConnectionBook][nonExistentPeer.peerId] == + nodes[0].peerManager.wakuPeerStore[ConnectionBook][nonExistentPeer.peerId] == CannotConnect - nodes[0].peerManager.peerStore[NumberFailedConnBook][nonExistentPeer.peerId] == 1 + nodes[0].peerManager.wakuPeerStore[NumberFailedConnBook][nonExistentPeer.peerId] == + 1 # Connection attempt failed conn1Ok == false @@ -251,12 +253,14 @@ procSuite "Peer Manager": nodes[0].peerManager.canBeConnected(nodes[1].peerInfo.peerId) == true # After a successful connection, the number of failed connections is reset - nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] = 4 + nodes[0].peerManager.wakuPeerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] = + 4 let conn2Ok = await nodes[0].peerManager.connectRelay(nodes[1].peerInfo.toRemotePeerInfo()) check: conn2Ok == true - nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] == 0 + nodes[0].peerManager.wakuPeerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] == + 0 await allFutures(nodes.mapIt(it.stop())) @@ -291,7 +295,7 @@ procSuite "Peer Manager": assert is12Connected == true, "Node 1 and 2 not connected" check: - node1.peerManager.peerStore[AddressBook][remotePeerInfo2.peerId] == + node1.peerManager.wakuPeerStore[AddressBook][remotePeerInfo2.peerId] == remotePeerInfo2.addrs # wait for the peer store update @@ -299,9 +303,9 @@ procSuite "Peer Manager": check: # Currently connected to node2 - node1.peerManager.peerStore.peers().len == 1 - node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected + node1.peerManager.wakuPeerStore.peers().len == 1 + node1.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node1.peerManager.wakuPeerStore.connectedness(peerInfo2.peerId) == Connected # Simulate restart by initialising a new node using the same storage let node3 = newTestWakuNode( @@ -317,9 +321,9 @@ procSuite "Peer Manager": check: # Node2 has been loaded after "restart", but we have not yet reconnected - node3.peerManager.peerStore.peers().len == 1 - node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected + node3.peerManager.wakuPeerStore.peers().len == 1 + node3.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node3.peerManager.wakuPeerStore.connectedness(peerInfo2.peerId) == NotConnected await node3.mountRelay() @@ -329,9 +333,9 @@ procSuite "Peer Manager": check: # Reconnected to node2 after "restart" - node3.peerManager.peerStore.peers().len == 1 - node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected + node3.peerManager.wakuPeerStore.peers().len == 1 + node3.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node3.peerManager.wakuPeerStore.connectedness(peerInfo2.peerId) == Connected await allFutures([node1.stop(), node2.stop(), node3.stop()]) @@ -366,7 +370,7 @@ procSuite "Peer Manager": assert is12Connected == true, "Node 1 and 2 not connected" check: - node1.peerManager.peerStore[AddressBook][remotePeerInfo2.peerId] == + node1.peerManager.wakuPeerStore[AddressBook][remotePeerInfo2.peerId] == remotePeerInfo2.addrs # wait for the peer store update @@ -374,9 +378,9 @@ procSuite "Peer Manager": check: # Currently connected to node2 - node1.peerManager.peerStore.peers().len == 1 - node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected + node1.peerManager.wakuPeerStore.peers().len == 1 + node1.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node1.peerManager.wakuPeerStore.connectedness(peerInfo2.peerId) == Connected # Simulate restart by initialising a new node using the same storage let node3 = newTestWakuNode( @@ -392,9 +396,9 @@ procSuite "Peer Manager": check: # Node2 has been loaded after "restart", but we have not yet reconnected - node3.peerManager.peerStore.peers().len == 1 - node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected + node3.peerManager.wakuPeerStore.peers().len == 1 + node3.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node3.peerManager.wakuPeerStore.connectedness(peerInfo2.peerId) == NotConnected await node3.mountRelay() @@ -404,9 +408,9 @@ procSuite "Peer Manager": check: # Reconnected to node2 after "restart" - node3.peerManager.peerStore.peers().len == 1 - node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected + node3.peerManager.wakuPeerStore.peers().len == 1 + node3.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node3.peerManager.wakuPeerStore.connectedness(peerInfo2.peerId) == Connected await allFutures([node1.stop(), node2.stop(), node3.stop()]) @@ -494,12 +498,12 @@ procSuite "Peer Manager": (await node1.peerManager.connectRelay(peerInfo2.toRemotePeerInfo())) == true check: # Currently connected to node2 - node1.peerManager.peerStore.peers().len == 1 - node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node1.peerManager.peerStore.peers().anyIt( + node1.peerManager.wakuPeerStore.peers().len == 1 + node1.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node1.peerManager.wakuPeerStore.peers().anyIt( it.protocols.contains(node2.wakuRelay.codec) ) - node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected + node1.peerManager.wakuPeerStore.connectedness(peerInfo2.peerId) == Connected # Simulate restart by initialising a new node using the same storage let node3 = newTestWakuNode( @@ -516,20 +520,20 @@ procSuite "Peer Manager": node2.wakuRelay.codec == betaCodec node3.wakuRelay.codec == stableCodec # Node2 has been loaded after "restart", but we have not yet reconnected - node3.peerManager.peerStore.peers().len == 1 - node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node3.peerManager.peerStore.peers().anyIt(it.protocols.contains(betaCodec)) - node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected + node3.peerManager.wakuPeerStore.peers().len == 1 + node3.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node3.peerManager.wakuPeerStore.peers().anyIt(it.protocols.contains(betaCodec)) + node3.peerManager.wakuPeerStore.connectedness(peerInfo2.peerId) == NotConnected await node3.start() # This should trigger a reconnect check: # Reconnected to node2 after "restart" - node3.peerManager.peerStore.peers().len == 1 - node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node3.peerManager.peerStore.peers().anyIt(it.protocols.contains(betaCodec)) - node3.peerManager.peerStore.peers().anyIt(it.protocols.contains(stableCodec)) - node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected + node3.peerManager.wakuPeerStore.peers().len == 1 + node3.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node3.peerManager.wakuPeerStore.peers().anyIt(it.protocols.contains(betaCodec)) + node3.peerManager.wakuPeerStore.peers().anyIt(it.protocols.contains(stableCodec)) + node3.peerManager.wakuPeerStore.connectedness(peerInfo2.peerId) == Connected await allFutures([node1.stop(), node2.stop(), node3.stop()]) @@ -566,37 +570,40 @@ procSuite "Peer Manager": check: # Peerstore track all three peers - nodes[0].peerManager.peerStore.peers().len == 3 + nodes[0].peerManager.wakuPeerStore.peers().len == 3 # All peer ids are correct - nodes[0].peerManager.peerStore.peers().anyIt( + nodes[0].peerManager.wakuPeerStore.peers().anyIt( it.peerId == nodes[1].switch.peerInfo.peerId ) - nodes[0].peerManager.peerStore.peers().anyIt( + nodes[0].peerManager.wakuPeerStore.peers().anyIt( it.peerId == nodes[2].switch.peerInfo.peerId ) - nodes[0].peerManager.peerStore.peers().anyIt( + nodes[0].peerManager.wakuPeerStore.peers().anyIt( it.peerId == nodes[3].switch.peerInfo.peerId ) # All peers support the relay protocol - nodes[0].peerManager.peerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains( + nodes[0].peerManager.wakuPeerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains( WakuRelayCodec ) - nodes[0].peerManager.peerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains( + nodes[0].peerManager.wakuPeerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains( WakuRelayCodec ) - nodes[0].peerManager.peerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains( + nodes[0].peerManager.wakuPeerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains( WakuRelayCodec ) # All peers are connected - nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].switch.peerInfo.peerId] == - Connected - nodes[0].peerManager.peerStore[ConnectionBook][nodes[2].switch.peerInfo.peerId] == - Connected - nodes[0].peerManager.peerStore[ConnectionBook][nodes[3].switch.peerInfo.peerId] == - Connected + nodes[0].peerManager.wakuPeerStore[ConnectionBook][ + nodes[1].switch.peerInfo.peerId + ] == Connected + nodes[0].peerManager.wakuPeerStore[ConnectionBook][ + nodes[2].switch.peerInfo.peerId + ] == Connected + nodes[0].peerManager.wakuPeerStore[ConnectionBook][ + nodes[3].switch.peerInfo.peerId + ] == Connected await allFutures(nodes.mapIt(it.stop())) @@ -633,37 +640,40 @@ procSuite "Peer Manager": check: # Peerstore track all three peers - nodes[0].peerManager.peerStore.peers().len == 3 + nodes[0].peerManager.wakuPeerStore.peers().len == 3 # All peer ids are correct - nodes[0].peerManager.peerStore.peers().anyIt( + nodes[0].peerManager.wakuPeerStore.peers().anyIt( it.peerId == nodes[1].switch.peerInfo.peerId ) - nodes[0].peerManager.peerStore.peers().anyIt( + nodes[0].peerManager.wakuPeerStore.peers().anyIt( it.peerId == nodes[2].switch.peerInfo.peerId ) - nodes[0].peerManager.peerStore.peers().anyIt( + nodes[0].peerManager.wakuPeerStore.peers().anyIt( it.peerId == nodes[3].switch.peerInfo.peerId ) # All peers support the relay protocol - nodes[0].peerManager.peerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains( + nodes[0].peerManager.wakuPeerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains( WakuRelayCodec ) - nodes[0].peerManager.peerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains( + nodes[0].peerManager.wakuPeerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains( WakuRelayCodec ) - nodes[0].peerManager.peerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains( + nodes[0].peerManager.wakuPeerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains( WakuRelayCodec ) # All peers are connected - nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].switch.peerInfo.peerId] == - Connected - nodes[0].peerManager.peerStore[ConnectionBook][nodes[2].switch.peerInfo.peerId] == - Connected - nodes[0].peerManager.peerStore[ConnectionBook][nodes[3].switch.peerInfo.peerId] == - Connected + nodes[0].peerManager.wakuPeerStore[ConnectionBook][ + nodes[1].switch.peerInfo.peerId + ] == Connected + nodes[0].peerManager.wakuPeerStore[ConnectionBook][ + nodes[2].switch.peerInfo.peerId + ] == Connected + nodes[0].peerManager.wakuPeerStore[ConnectionBook][ + nodes[3].switch.peerInfo.peerId + ] == Connected await allFutures(nodes.mapIt(it.stop())) @@ -690,62 +700,65 @@ procSuite "Peer Manager": check: # Peerstore track all three peers - nodes[0].peerManager.peerStore.peers().len == 3 + nodes[0].peerManager.wakuPeerStore.peers().len == 3 # Inbound/Outbound number of peers match - nodes[0].peerManager.peerStore.getPeersByDirection(Inbound).len == 3 - nodes[0].peerManager.peerStore.getPeersByDirection(Outbound).len == 0 - nodes[1].peerManager.peerStore.getPeersByDirection(Inbound).len == 0 - nodes[1].peerManager.peerStore.getPeersByDirection(Outbound).len == 1 - nodes[2].peerManager.peerStore.getPeersByDirection(Inbound).len == 0 - nodes[2].peerManager.peerStore.getPeersByDirection(Outbound).len == 1 - nodes[3].peerManager.peerStore.getPeersByDirection(Inbound).len == 0 - nodes[3].peerManager.peerStore.getPeersByDirection(Outbound).len == 1 + nodes[0].peerManager.wakuPeerStore.getPeersByDirection(Inbound).len == 3 + nodes[0].peerManager.wakuPeerStore.getPeersByDirection(Outbound).len == 0 + nodes[1].peerManager.wakuPeerStore.getPeersByDirection(Inbound).len == 0 + nodes[1].peerManager.wakuPeerStore.getPeersByDirection(Outbound).len == 1 + nodes[2].peerManager.wakuPeerStore.getPeersByDirection(Inbound).len == 0 + nodes[2].peerManager.wakuPeerStore.getPeersByDirection(Outbound).len == 1 + nodes[3].peerManager.wakuPeerStore.getPeersByDirection(Inbound).len == 0 + nodes[3].peerManager.wakuPeerStore.getPeersByDirection(Outbound).len == 1 # All peer ids are correct - nodes[0].peerManager.peerStore.peers().anyIt( + nodes[0].peerManager.wakuPeerStore.peers().anyIt( it.peerId == nodes[1].switch.peerInfo.peerId ) - nodes[0].peerManager.peerStore.peers().anyIt( + nodes[0].peerManager.wakuPeerStore.peers().anyIt( it.peerId == nodes[2].switch.peerInfo.peerId ) - nodes[0].peerManager.peerStore.peers().anyIt( + nodes[0].peerManager.wakuPeerStore.peers().anyIt( it.peerId == nodes[3].switch.peerInfo.peerId ) # All peers support the relay protocol - nodes[0].peerManager.peerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains( + nodes[0].peerManager.wakuPeerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains( WakuRelayCodec ) - nodes[0].peerManager.peerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains( + nodes[0].peerManager.wakuPeerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains( WakuRelayCodec ) - nodes[0].peerManager.peerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains( + nodes[0].peerManager.wakuPeerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains( WakuRelayCodec ) # All peers are connected - nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].switch.peerInfo.peerId] == - Connected - nodes[0].peerManager.peerStore[ConnectionBook][nodes[2].switch.peerInfo.peerId] == - Connected - nodes[0].peerManager.peerStore[ConnectionBook][nodes[3].switch.peerInfo.peerId] == - Connected + nodes[0].peerManager.wakuPeerStore[ConnectionBook][ + nodes[1].switch.peerInfo.peerId + ] == Connected + nodes[0].peerManager.wakuPeerStore[ConnectionBook][ + nodes[2].switch.peerInfo.peerId + ] == Connected + nodes[0].peerManager.wakuPeerStore[ConnectionBook][ + nodes[3].switch.peerInfo.peerId + ] == Connected # All peers are Inbound in peer 0 - nodes[0].peerManager.peerStore[DirectionBook][nodes[1].switch.peerInfo.peerId] == + nodes[0].peerManager.wakuPeerStore[DirectionBook][nodes[1].switch.peerInfo.peerId] == Inbound - nodes[0].peerManager.peerStore[DirectionBook][nodes[2].switch.peerInfo.peerId] == + nodes[0].peerManager.wakuPeerStore[DirectionBook][nodes[2].switch.peerInfo.peerId] == Inbound - nodes[0].peerManager.peerStore[DirectionBook][nodes[3].switch.peerInfo.peerId] == + nodes[0].peerManager.wakuPeerStore[DirectionBook][nodes[3].switch.peerInfo.peerId] == Inbound # All peers have an Outbound connection with peer 0 - nodes[1].peerManager.peerStore[DirectionBook][nodes[0].switch.peerInfo.peerId] == + nodes[1].peerManager.wakuPeerStore[DirectionBook][nodes[0].switch.peerInfo.peerId] == Outbound - nodes[2].peerManager.peerStore[DirectionBook][nodes[0].switch.peerInfo.peerId] == + nodes[2].peerManager.wakuPeerStore[DirectionBook][nodes[0].switch.peerInfo.peerId] == Outbound - nodes[3].peerManager.peerStore[DirectionBook][nodes[0].switch.peerInfo.peerId] == + nodes[3].peerManager.wakuPeerStore[DirectionBook][nodes[0].switch.peerInfo.peerId] == Outbound await allFutures(nodes.mapIt(it.stop())) @@ -775,12 +788,12 @@ procSuite "Peer Manager": # all peers are stored in the peerstore check: - node.peerManager.peerStore.peers().anyIt(it.peerId == peers[0].peerId) - node.peerManager.peerStore.peers().anyIt(it.peerId == peers[1].peerId) - node.peerManager.peerStore.peers().anyIt(it.peerId == peers[2].peerId) + node.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peers[0].peerId) + node.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peers[1].peerId) + node.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peers[2].peerId) # but the relay peer is not - node.peerManager.peerStore.peers().anyIt(it.peerId == peers[3].peerId) == false + node.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peers[3].peerId) == false # all service peers are added to its service slot check: @@ -897,8 +910,8 @@ procSuite "Peer Manager": peers.len == 3 # Add a peer[0] to the peerstore - pm.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs - pm.peerStore[ProtoBook][peers[0].peerId] = + pm.wakuPeerStore[AddressBook][peers[0].peerId] = peers[0].addrs + pm.wakuPeerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuFilterSubscribeCodec] # When no service peers, we get one from the peerstore @@ -977,36 +990,36 @@ procSuite "Peer Manager": # Check that we have 15 peers in the peerstore check: - pm.peerStore.peers.len == 15 + pm.wakuPeerStore.peers.len == 15 # fake that some peers failed to connected - pm.peerStore[NumberFailedConnBook][peers[0].peerId] = 2 - pm.peerStore[NumberFailedConnBook][peers[1].peerId] = 2 - pm.peerStore[NumberFailedConnBook][peers[2].peerId] = 2 + pm.wakuPeerStore[NumberFailedConnBook][peers[0].peerId] = 2 + pm.wakuPeerStore[NumberFailedConnBook][peers[1].peerId] = 2 + pm.wakuPeerStore[NumberFailedConnBook][peers[2].peerId] = 2 # fake that some peers are connected - pm.peerStore[ConnectionBook][peers[5].peerId] = Connected - pm.peerStore[ConnectionBook][peers[8].peerId] = Connected - pm.peerStore[ConnectionBook][peers[10].peerId] = Connected - pm.peerStore[ConnectionBook][peers[12].peerId] = Connected + pm.wakuPeerStore[ConnectionBook][peers[5].peerId] = Connected + pm.wakuPeerStore[ConnectionBook][peers[8].peerId] = Connected + pm.wakuPeerStore[ConnectionBook][peers[10].peerId] = Connected + pm.wakuPeerStore[ConnectionBook][peers[12].peerId] = Connected # Prune the peerstore (current=15, target=5) pm.prunePeerStore() check: # ensure peerstore was pruned - pm.peerStore.peers.len == 10 + pm.wakuPeerStore.peers.len == 10 # ensure connected peers were not pruned - pm.peerStore.peers.anyIt(it.peerId == peers[5].peerId) - pm.peerStore.peers.anyIt(it.peerId == peers[8].peerId) - pm.peerStore.peers.anyIt(it.peerId == peers[10].peerId) - pm.peerStore.peers.anyIt(it.peerId == peers[12].peerId) + pm.wakuPeerStore.peers.anyIt(it.peerId == peers[5].peerId) + pm.wakuPeerStore.peers.anyIt(it.peerId == peers[8].peerId) + pm.wakuPeerStore.peers.anyIt(it.peerId == peers[10].peerId) + pm.wakuPeerStore.peers.anyIt(it.peerId == peers[12].peerId) # ensure peers that failed were the first to be pruned - not pm.peerStore.peers.anyIt(it.peerId == peers[0].peerId) - not pm.peerStore.peers.anyIt(it.peerId == peers[1].peerId) - not pm.peerStore.peers.anyIt(it.peerId == peers[2].peerId) + not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[0].peerId) + not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[1].peerId) + not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[2].peerId) asyncTest "canBeConnected() returns correct value": let pm = PeerManager.new( @@ -1033,8 +1046,8 @@ procSuite "Peer Manager": pm.canBeConnected(p1) == true # peer with ONE error that just failed - pm.peerStore[NumberFailedConnBook][p1] = 1 - pm.peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second) + pm.wakuPeerStore[NumberFailedConnBook][p1] = 1 + pm.wakuPeerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second) # we cant connect right now check: pm.canBeConnected(p1) == false @@ -1045,8 +1058,8 @@ procSuite "Peer Manager": pm.canBeConnected(p1) == true # peer with TWO errors, we can connect until 2 seconds have passed - pm.peerStore[NumberFailedConnBook][p1] = 2 - pm.peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second) + pm.wakuPeerStore[NumberFailedConnBook][p1] = 2 + pm.wakuPeerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second) # cant be connected after 1 second await sleepAsync(chronos.milliseconds(1000)) @@ -1146,6 +1159,6 @@ procSuite "Peer Manager": check: nodes[0].peerManager.ipTable["127.0.0.1"].len == 1 nodes[0].peerManager.switch.connManager.getConnections().len == 1 - nodes[0].peerManager.peerStore.peers().len == 1 + nodes[0].peerManager.wakuPeerStore.peers().len == 1 await allFutures(nodes.mapIt(it.stop())) diff --git a/tests/test_peer_store_extended.nim b/tests/test_peer_store_extended.nim index e06630364e..ef03fc69ac 100644 --- a/tests/test_peer_store_extended.nim +++ b/tests/test_peer_store_extended.nim @@ -9,7 +9,12 @@ import libp2p/multiaddress, testutils/unittests import - waku/[node/peer_manager/peer_manager, node/peer_manager/waku_peer_store, waku_node], + waku/[ + node/peer_manager/peer_manager, + node/peer_manager/waku_peer_store, + waku_node, + waku_core/peers, + ], ./testlib/wakucore suite "Extended nim-libp2p Peer Store": @@ -20,7 +25,7 @@ suite "Extended nim-libp2p Peer Store": setup: # Setup a nim-libp2p peerstore with some peers - let peerStore = PeerStore.new(nil, capacity = 50) + let peerStore = WakuPeerStore.new(nil, capacity = 50) var p1, p2, p3, p4, p5, p6: PeerId # create five peers basePeerId + [1-5] @@ -33,76 +38,100 @@ suite "Extended nim-libp2p Peer Store": # peer6 is not part of the peerstore require p6.init(basePeerId & "6") - # Peer1: Connected - peerStore[AddressBook][p1] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()] - peerStore[ProtoBook][p1] = @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"] - peerStore[KeyBook][p1] = generateEcdsaKeyPair().pubkey - peerStore[AgentBook][p1] = "nwaku" - peerStore[ProtoVersionBook][p1] = "protoVersion1" - peerStore[ConnectionBook][p1] = Connected - peerStore[DisconnectBook][p1] = 0 - peerStore[SourceBook][p1] = Discv5 - peerStore[DirectionBook][p1] = Inbound - peerStore[NumberFailedConnBook][p1] = 1 - peerStore[LastFailedConnBook][p1] = Moment.init(1001, Second) + # Peer1: Connected + peerStore.addPeer( + RemotePeerInfo.init( + peerId = p1, + addrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()], + protocols = @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"], + publicKey = generateEcdsaKeyPair().pubkey, + agent = "nwaku", + protoVersion = "protoVersion1", + connectedness = Connected, + disconnectTime = 0, + origin = Discv5, + direction = Inbound, + lastFailedConn = Moment.init(1001, Second), + numberFailedConn = 1, + ) + ) # Peer2: Connected - peerStore[AddressBook][p2] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/2").tryGet()] - peerStore[ProtoBook][p2] = @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"] - peerStore[KeyBook][p2] = generateEcdsaKeyPair().pubkey - peerStore[AgentBook][p2] = "nwaku" - peerStore[ProtoVersionBook][p2] = "protoVersion2" - peerStore[ConnectionBook][p2] = Connected - peerStore[DisconnectBook][p2] = 0 - peerStore[SourceBook][p2] = Discv5 - peerStore[DirectionBook][p2] = Inbound - peerStore[NumberFailedConnBook][p2] = 2 - peerStore[LastFailedConnBook][p2] = Moment.init(1002, Second) + peerStore.addPeer( + RemotePeerInfo.init( + peerId = p2, + addrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/2").tryGet()], + protocols = @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"], + publicKey = generateEcdsaKeyPair().pubkey, + agent = "nwaku", + protoVersion = "protoVersion2", + connectedness = Connected, + disconnectTime = 0, + origin = Discv5, + direction = Inbound, + lastFailedConn = Moment.init(1002, Second), + numberFailedConn = 2, + ) + ) # Peer3: Connected - peerStore[AddressBook][p3] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()] - peerStore[ProtoBook][p3] = - @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"] - peerStore[KeyBook][p3] = generateEcdsaKeyPair().pubkey - peerStore[AgentBook][p3] = "gowaku" - peerStore[ProtoVersionBook][p3] = "protoVersion3" - peerStore[ConnectionBook][p3] = Connected - peerStore[DisconnectBook][p3] = 0 - peerStore[SourceBook][p3] = Discv5 - peerStore[DirectionBook][p3] = Inbound - peerStore[NumberFailedConnBook][p3] = 3 - peerStore[LastFailedConnBook][p3] = Moment.init(1003, Second) + peerStore.addPeer( + RemotePeerInfo.init( + peerId = p3, + addrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()], + protocols = @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"], + publicKey = generateEcdsaKeyPair().pubkey, + agent = "gowaku", + protoVersion = "protoVersion3", + connectedness = Connected, + disconnectTime = 0, + origin = Discv5, + direction = Inbound, + lastFailedConn = Moment.init(1003, Second), + numberFailedConn = 3, + ) + ) # Peer4: Added but never connected - peerStore[AddressBook][p4] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/4").tryGet()] - # unknown: peerStore[ProtoBook][p4] - peerStore[KeyBook][p4] = generateEcdsaKeyPair().pubkey - # unknown: peerStore[AgentBook][p4] - # unknown: peerStore[ProtoVersionBook][p4] - peerStore[ConnectionBook][p4] = NotConnected - peerStore[DisconnectBook][p4] = 0 - peerStore[SourceBook][p4] = Discv5 - peerStore[DirectionBook][p4] = Inbound - peerStore[NumberFailedConnBook][p4] = 4 - peerStore[LastFailedConnBook][p4] = Moment.init(1004, Second) - - # Peer5: Connecteed in the past - peerStore[AddressBook][p5] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/5").tryGet()] - peerStore[ProtoBook][p5] = @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"] - peerStore[KeyBook][p5] = generateEcdsaKeyPair().pubkey - peerStore[AgentBook][p5] = "gowaku" - peerStore[ProtoVersionBook][p5] = "protoVersion5" - peerStore[ConnectionBook][p5] = CanConnect - peerStore[DisconnectBook][p5] = 1000 - peerStore[SourceBook][p5] = Discv5 - peerStore[DirectionBook][p5] = Outbound - peerStore[NumberFailedConnBook][p5] = 5 - peerStore[LastFailedConnBook][p5] = Moment.init(1005, Second) + peerStore.addPeer( + RemotePeerInfo.init( + peerId = p4, + addrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/4").tryGet()], + protocols = @[], + publicKey = generateEcdsaKeyPair().pubkey, + agent = "", + protoVersion = "", + connectedness = NotConnected, + disconnectTime = 0, + origin = Discv5, + direction = Inbound, + lastFailedConn = Moment.init(1004, Second), + numberFailedConn = 4, + ) + ) + + # Peer5: Connected + peerStore.addPeer( + RemotePeerInfo.init( + peerId = p5, + addrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/5").tryGet()], + protocols = @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"], + publicKey = generateEcdsaKeyPair().pubkey, + agent = "gowaku", + protoVersion = "protoVersion5", + connectedness = CanConnect, + disconnectTime = 1000, + origin = Discv5, + direction = Outbound, + lastFailedConn = Moment.init(1005, Second), + numberFailedConn = 5, + ) + ) test "get() returns the correct StoredInfo for a given PeerId": # When - let peer1 = peerStore.get(p1) - let peer6 = peerStore.get(p6) + let peer1 = peerStore.getPeer(p1) + let peer6 = peerStore.getPeer(p6) # Then check: @@ -213,7 +242,7 @@ suite "Extended nim-libp2p Peer Store": test "toRemotePeerInfo() converts a StoredInfo to a RemotePeerInfo": # Given - let peer1 = peerStore.get(p1) + let peer1 = peerStore.getPeer(p1) # Then check: @@ -278,9 +307,9 @@ suite "Extended nim-libp2p Peer Store": inPeers.len == 4 outPeers.len == 1 - test "getNotConnectedPeers()": + test "getDisconnectedPeers()": # When - let disconnedtedPeers = peerStore.getNotConnectedPeers() + let disconnedtedPeers = peerStore.getDisconnectedPeers() # Then check: @@ -291,23 +320,29 @@ suite "Extended nim-libp2p Peer Store": test "del() successfully deletes waku custom books": # Given - let peerStore = PeerStore.new(nil, capacity = 5) + let peerStore = WakuPeerStore.new(nil, capacity = 5) var p1: PeerId - require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1") - peerStore[AddressBook][p1] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()] - peerStore[ProtoBook][p1] = @["proto"] - peerStore[KeyBook][p1] = generateEcdsaKeyPair().pubkey - peerStore[AgentBook][p1] = "agent" - peerStore[ProtoVersionBook][p1] = "version" - peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second) - peerStore[NumberFailedConnBook][p1] = 1 - peerStore[ConnectionBook][p1] = Connected - peerStore[DisconnectBook][p1] = 0 - peerStore[SourceBook][p1] = Discv5 - peerStore[DirectionBook][p1] = Inbound + require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW1") + + let remotePeer = RemotePeerInfo.init( + peerId = p1, + addrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()], + protocols = @["proto"], + publicKey = generateEcdsaKeyPair().pubkey, + agent = "agent", + protoVersion = "version", + lastFailedConn = Moment.init(getTime().toUnix, Second), + numberFailedConn = 1, + connectedness = Connected, + disconnectTime = 0, + origin = Discv5, + direction = Inbound, + ) + + peerStore.addPeer(remotePeer) # When - peerStore.del(p1) + peerStore.delete(p1) # Then check: diff --git a/tests/test_waku_dnsdisc.nim b/tests/test_waku_dnsdisc.nim index 10f7cf59fe..4040bea8f2 100644 --- a/tests/test_waku_dnsdisc.nim +++ b/tests/test_waku_dnsdisc.nim @@ -94,20 +94,20 @@ suite "Waku DNS Discovery": check: # We have successfully connected to all discovered nodes - node4.peerManager.peerStore.peers().anyIt( + node4.peerManager.wakuPeerStore.peers().anyIt( it.peerId == node1.switch.peerInfo.peerId ) - node4.peerManager.peerStore.connectedness(node1.switch.peerInfo.peerId) == + node4.peerManager.wakuPeerStore.connectedness(node1.switch.peerInfo.peerId) == Connected - node4.peerManager.peerStore.peers().anyIt( + node4.peerManager.wakuPeerStore.peers().anyIt( it.peerId == node2.switch.peerInfo.peerId ) - node4.peerManager.peerStore.connectedness(node2.switch.peerInfo.peerId) == + node4.peerManager.wakuPeerStore.connectedness(node2.switch.peerInfo.peerId) == Connected - node4.peerManager.peerStore.peers().anyIt( + node4.peerManager.wakuPeerStore.peers().anyIt( it.peerId == node3.switch.peerInfo.peerId ) - node4.peerManager.peerStore.connectedness(node3.switch.peerInfo.peerId) == + node4.peerManager.wakuPeerStore.connectedness(node3.switch.peerInfo.peerId) == Connected await allFutures([node1.stop(), node2.stop(), node3.stop(), node4.stop()]) diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 87506e7a4c..0b53129277 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -73,7 +73,7 @@ const type PeerManager* = ref object of RootObj switch*: Switch - peerStore*: PeerStore + wakuPeerStore*: WakuPeerStore wakuMetadata*: WakuMetadata initialBackoffInSec*: int backoffFactor*: int @@ -125,16 +125,16 @@ proc addPeer*( trace "skipping to manage our unmanageable self" return - if pm.peerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and - pm.peerStore[KeyBook][remotePeerInfo.peerId] == remotePeerInfo.publicKey and - pm.peerStore[ENRBook][remotePeerInfo.peerId].raw.len > 0: + if pm.wakuPeerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and + pm.wakuPeerStore[KeyBook][remotePeerInfo.peerId] == remotePeerInfo.publicKey and + pm.wakuPeerStore[ENRBook][remotePeerInfo.peerId].raw.len > 0: let incomingEnr = remotePeerInfo.enr.valueOr: trace "peer already managed and incoming ENR is empty", remote_peer_id = $remotePeerInfo.peerId return - if pm.peerStore[ENRBook][remotePeerInfo.peerId].raw == incomingEnr.raw or - pm.peerStore[ENRBook][remotePeerInfo.peerId].seqNum > incomingEnr.seqNum: + if pm.wakuPeerStore[ENRBook][remotePeerInfo.peerId].raw == incomingEnr.raw or + pm.wakuPeerStore[ENRBook][remotePeerInfo.peerId].seqNum > incomingEnr.seqNum: trace "peer already managed and ENR info is already saved", remote_peer_id = $remotePeerInfo.peerId return @@ -144,21 +144,22 @@ proc addPeer*( waku_total_unique_peers.inc() - pm.peerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs - pm.peerStore[KeyBook][remotePeerInfo.peerId] = remotePeerInfo.publicKey - pm.peerStore[SourceBook][remotePeerInfo.peerId] = origin - pm.peerStore[ProtoVersionBook][remotePeerInfo.peerId] = remotePeerInfo.protoVersion - pm.peerStore[AgentBook][remotePeerInfo.peerId] = remotePeerInfo.agent + pm.wakuPeerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs + pm.wakuPeerStore[KeyBook][remotePeerInfo.peerId] = remotePeerInfo.publicKey + pm.wakuPeerStore[SourceBook][remotePeerInfo.peerId] = origin + pm.wakuPeerStore[ProtoVersionBook][remotePeerInfo.peerId] = + remotePeerInfo.protoVersion + pm.wakuPeerStore[AgentBook][remotePeerInfo.peerId] = remotePeerInfo.agent if remotePeerInfo.protocols.len > 0: - pm.peerStore[ProtoBook][remotePeerInfo.peerId] = remotePeerInfo.protocols + pm.wakuPeerStore[ProtoBook][remotePeerInfo.peerId] = remotePeerInfo.protocols if remotePeerInfo.enr.isSome(): - pm.peerStore[ENRBook][remotePeerInfo.peerId] = remotePeerInfo.enr.get() + pm.wakuPeerStore[ENRBook][remotePeerInfo.peerId] = remotePeerInfo.enr.get() # Add peer to storage. Entry will subsequently be updated with connectedness information if not pm.storage.isNil: - # Reading from the db (pm.storage) is only done on startup, hence you need to connect to all saved peers. + # Reading from the db (pm.storage) is only done on startup, hence you need to connect to all saved peers. # `remotePeerInfo.connectedness` should already be `NotConnected`, but both we reset it to `NotConnected` just in case. # This reset is also done when reading from storage, I believe, to ensure the `connectedness` state is the correct one. # So many resets are likely redudant, but I haven't verified whether this is the case or not. @@ -181,10 +182,10 @@ proc connectRelay*( if peerId == pm.switch.peerInfo.peerId: return false - if not pm.peerStore.hasPeer(peerId, WakuRelayCodec): + if not pm.wakuPeerStore.hasPeer(peerId, WakuRelayCodec): pm.addPeer(peer) - let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId] + let failedAttempts = pm.wakuPeerStore[NumberFailedConnBook][peerId] trace "Connecting to relay peer", wireAddr = peer.addrs, peerId = peerId, failedAttempts = failedAttempts @@ -208,20 +209,20 @@ proc connectRelay*( waku_peers_dials.inc(labelValues = ["successful"]) waku_node_conns_initiated.inc(labelValues = [source]) - pm.peerStore[NumberFailedConnBook][peerId] = 0 + pm.wakuPeerStore[NumberFailedConnBook][peerId] = 0 return true # Dial failed - pm.peerStore[NumberFailedConnBook][peerId] = - pm.peerStore[NumberFailedConnBook][peerId] + 1 - pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second) - pm.peerStore[ConnectionBook][peerId] = CannotConnect + pm.wakuPeerStore[NumberFailedConnBook][peerId] = + pm.wakuPeerStore[NumberFailedConnBook][peerId] + 1 + pm.wakuPeerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second) + pm.wakuPeerStore[ConnectionBook][peerId] = CannotConnect trace "Connecting relay peer failed", peerId = peerId, reason = reasonFailed, - failedAttempts = pm.peerStore[NumberFailedConnBook][peerId] + failedAttempts = pm.wakuPeerStore[NumberFailedConnBook][peerId] waku_peers_dials.inc(labelValues = [reasonFailed]) return false @@ -288,19 +289,19 @@ proc loadFromStorage(pm: PeerManager) {.gcsafe.} = version = remotePeerInfo.protoVersion # nim-libp2p books - pm.peerStore[AddressBook][peerId] = remotePeerInfo.addrs - pm.peerStore[ProtoBook][peerId] = remotePeerInfo.protocols - pm.peerStore[KeyBook][peerId] = remotePeerInfo.publicKey - pm.peerStore[AgentBook][peerId] = remotePeerInfo.agent - pm.peerStore[ProtoVersionBook][peerId] = remotePeerInfo.protoVersion + pm.wakuPeerStore[AddressBook][peerId] = remotePeerInfo.addrs + pm.wakuPeerStore[ProtoBook][peerId] = remotePeerInfo.protocols + pm.wakuPeerStore[KeyBook][peerId] = remotePeerInfo.publicKey + pm.wakuPeerStore[AgentBook][peerId] = remotePeerInfo.agent + pm.wakuPeerStore[ProtoVersionBook][peerId] = remotePeerInfo.protoVersion # custom books - pm.peerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state - pm.peerStore[DisconnectBook][peerId] = remotePeerInfo.disconnectTime - pm.peerStore[SourceBook][peerId] = remotePeerInfo.origin + pm.wakuPeerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state + pm.wakuPeerStore[DisconnectBook][peerId] = remotePeerInfo.disconnectTime + pm.wakuPeerStore[SourceBook][peerId] = remotePeerInfo.origin if remotePeerInfo.enr.isSome(): - pm.peerStore[ENRBook][peerId] = remotePeerInfo.enr.get() + pm.wakuPeerStore[ENRBook][peerId] = remotePeerInfo.enr.get() amount.inc() @@ -315,7 +316,7 @@ proc canBeConnected*(pm: PeerManager, peerId: PeerId): bool = # Returns if we can try to connect to this peer, based on past failed attempts # It uses an exponential backoff. Each connection attempt makes us # wait more before trying again. - let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId] + let failedAttempts = pm.wakuPeerStore[NumberFailedConnBook][peerId] # if it never errored, we can try to connect if failedAttempts == 0: @@ -328,7 +329,7 @@ proc canBeConnected*(pm: PeerManager, peerId: PeerId): bool = # If it errored we wait an exponential backoff from last connection # the more failed attempts, the greater the backoff since last attempt let now = Moment.init(getTime().toUnix, Second) - let lastFailed = pm.peerStore[LastFailedConnBook][peerId] + let lastFailed = pm.wakuPeerStore[LastFailedConnBook][peerId] let backoff = calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, failedAttempts) @@ -387,7 +388,7 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = break guardClauses if ( - pm.peerStore.hasPeer(peerId, WakuRelayCodec) and + pm.wakuPeerStore.hasPeer(peerId, WakuRelayCodec) and not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)) ): let myShardsString = "[ " & toSeq(pm.wakuMetadata.shards).join(", ") & " ]" @@ -401,7 +402,7 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = info "disconnecting from peer", peerId = peerId, reason = reason asyncSpawn(pm.switch.disconnect(peerId)) - pm.peerStore.delete(peerId) + pm.wakuPeerStore.delete(peerId) # called when a peer i) first connects to us ii) disconnects all connections from us proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = @@ -427,7 +428,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = for peerId in peersBehindIp[0 ..< (peersBehindIp.len - pm.colocationLimit)]: debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip asyncSpawn(pm.switch.disconnect(peerId)) - pm.peerStore.delete(peerId) + pm.wakuPeerStore.delete(peerId) of Left: direction = UnknownDirection connectedness = CanConnect @@ -442,11 +443,11 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = of Identified: debug "event identified", peerId = peerId - pm.peerStore[ConnectionBook][peerId] = connectedness - pm.peerStore[DirectionBook][peerId] = direction + pm.wakuPeerStore[ConnectionBook][peerId] = connectedness + pm.wakuPeerStore[DirectionBook][peerId] = direction if not pm.storage.isNil: - var remotePeerInfo = pm.peerStore.get(peerId) + var remotePeerInfo = pm.wakuPeerStore.getPeer(peerId) if event.kind == PeerEventKind.Left: remotePeerInfo.disconnectTime = getTime().toUnix @@ -503,7 +504,7 @@ proc new*( let pm = PeerManager( switch: switch, wakuMetadata: wakuMetadata, - peerStore: switch.peerStore, + wakuPeerStore: createWakuPeerStore(switch.peerStore), storage: storage, initialBackoffInSec: initialBackoffInSec, backoffFactor: backoffFactor, @@ -522,7 +523,7 @@ proc new*( onPeerEvent(pm, peerId, event) proc peerStoreChanged(peerId: PeerId) {.gcsafe.} = - waku_peer_store_size.set(toSeq(pm.peerStore[AddressBook].book.keys).len.int64) + waku_peer_store_size.set(toSeq(pm.wakuPeerStore[AddressBook].book.keys).len.int64) # currently disabled #pm.switch.addConnEventHandler(connHook, ConnEventKind.Connected) @@ -532,7 +533,7 @@ proc new*( pm.switch.addPeerEventHandler(peerHook, PeerEventKind.Left) # called every time the peerstore is updated - pm.peerStore[AddressBook].addHandler(peerStoreChanged) + pm.wakuPeerStore[AddressBook].addHandler(peerStoreChanged) pm.serviceSlots = initTable[string, RemotePeerInfo]() pm.ipTable = initTable[string, seq[PeerId]]() @@ -580,7 +581,7 @@ proc dialPeer*( # First add dialed peer info to peer store, if it does not exist yet.. # TODO: nim libp2p peerstore already adds them - if not pm.peerStore.hasPeer(remotePeerInfo.peerId, proto): + if not pm.wakuPeerStore.hasPeer(remotePeerInfo.peerId, proto): trace "Adding newly dialed peer to manager", peerId = $remotePeerInfo.peerId, address = $remotePeerInfo.addrs[0], proto = proto pm.addPeer(remotePeerInfo) @@ -658,7 +659,7 @@ proc reconnectPeers*( debug "Reconnecting peers", proto = proto # Proto is not persisted, we need to iterate over all peers. - for peerInfo in pm.peerStore.peers(protocolMatcher(proto)): + for peerInfo in pm.wakuPeerStore.peers(protocolMatcher(proto)): # Check that the peer can be connected if peerInfo.connectedness == CannotConnect: error "Not reconnecting to unreachable or non-existing peer", @@ -728,7 +729,7 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} = if outRelayPeers.len >= pm.outRelayPeersTarget: return - let notConnectedPeers = pm.peerStore.getNotConnectedPeers() + let notConnectedPeers = pm.wakuPeerStore.getDisconnectedPeers() var outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId)) @@ -765,11 +766,11 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} = for shard in pm.wakuMetadata.shards.items: # Filter out peer not on this shard let connectedInPeers = inPeers.filterIt( - pm.peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard)) + pm.wakuPeerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard)) ) let connectedOutPeers = outPeers.filterIt( - pm.peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard)) + pm.wakuPeerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard)) ) # Calculate the difference between current values and targets @@ -784,17 +785,17 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} = # Get all peers for this shard var connectablePeers = - pm.peerStore.getPeersByShard(uint16(pm.wakuMetadata.clusterId), uint16(shard)) + pm.wakuPeerStore.getPeersByShard(uint16(pm.wakuMetadata.clusterId), uint16(shard)) let shardCount = connectablePeers.len connectablePeers.keepItIf( - not pm.peerStore.isConnected(it.peerId) and pm.canBeConnected(it.peerId) + not pm.wakuPeerStore.isConnected(it.peerId) and pm.canBeConnected(it.peerId) ) let connectableCount = connectablePeers.len - connectablePeers.keepItIf(pm.peerStore.hasCapability(it.peerId, Relay)) + connectablePeers.keepItIf(pm.wakuPeerStore.hasCapability(it.peerId, Relay)) let relayCount = connectablePeers.len @@ -818,7 +819,7 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} = if peersToConnect.len == 0: return - let uniquePeers = toSeq(peersToConnect).mapIt(pm.peerStore.get(it)) + let uniquePeers = toSeq(peersToConnect).mapIt(pm.wakuPeerStore.getPeer(it)) # Connect to all nodes for i in countup(0, uniquePeers.len, MaxParallelDials): @@ -827,8 +828,8 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} = await pm.connectToNodes(uniquePeers[i ..< stop]) proc prunePeerStore*(pm: PeerManager) = - let numPeers = pm.peerStore[AddressBook].book.len - let capacity = pm.peerStore.capacity + let numPeers = pm.wakuPeerStore[AddressBook].book.len + let capacity = pm.wakuPeerStore.getCapacity() if numPeers <= capacity: return @@ -837,7 +838,7 @@ proc prunePeerStore*(pm: PeerManager) = var peersToPrune: HashSet[PeerId] # prune failed connections - for peerId, count in pm.peerStore[NumberFailedConnBook].book.pairs: + for peerId, count in pm.wakuPeerStore[NumberFailedConnBook].book.pairs: if count < pm.maxFailedAttempts: continue @@ -846,7 +847,7 @@ proc prunePeerStore*(pm: PeerManager) = peersToPrune.incl(peerId) - var notConnected = pm.peerStore.getNotConnectedPeers().mapIt(it.peerId) + var notConnected = pm.wakuPeerStore.getDisconnectedPeers().mapIt(it.peerId) # Always pick random non-connected peers shuffle(notConnected) @@ -855,11 +856,11 @@ proc prunePeerStore*(pm: PeerManager) = var peersByShard = initTable[uint16, seq[PeerId]]() for peer in notConnected: - if not pm.peerStore[ENRBook].contains(peer): + if not pm.wakuPeerStore[ENRBook].contains(peer): shardlessPeers.add(peer) continue - let record = pm.peerStore[ENRBook][peer] + let record = pm.wakuPeerStore[ENRBook][peer] let rec = record.toTyped().valueOr: shardlessPeers.add(peer) @@ -893,9 +894,9 @@ proc prunePeerStore*(pm: PeerManager) = peersToPrune.incl(peer) for peer in peersToPrune: - pm.peerStore.delete(peer) + pm.wakuPeerStore.delete(peer) - let afterNumPeers = pm.peerStore[AddressBook].book.len + let afterNumPeers = pm.wakuPeerStore[AddressBook].book.len trace "Finished pruning peer store", beforeNumPeers = numPeers, @@ -909,7 +910,7 @@ proc selectPeer*( trace "Selecting peer from peerstore", protocol = proto # Selects the best peer for a given protocol - var peers = pm.peerStore.getPeersByProtocol(proto) + var peers = pm.wakuPeerStore.getPeersByProtocol(proto) if shard.isSome(): peers.keepItIf((it.enr.isSome() and it.enr.get().containsShard(shard.get()))) @@ -957,7 +958,7 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} = (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) excessInConns = max(inRelayPeers.len - pm.inRelayPeersTarget, 0) - # One minus the percentage of excess connections relative to the target, limited to 100% + # One minus the percentage of excess connections relative to the target, limited to 100% # We calculate one minus this percentage because we want the factor to be inversely proportional to the number of excess peers inFactor = 1 - min(excessInConns / pm.inRelayPeersTarget, 1) # Percentage of out relay peers relative to the target @@ -974,7 +975,7 @@ proc logAndMetrics(pm: PeerManager) {.async.} = # log metrics let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) let maxConnections = pm.switch.connManager.inSema.size - let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt( + let notConnectedPeers = pm.wakuPeerStore.getDisconnectedPeers().mapIt( RemotePeerInfo.init(it.peerId, it.addrs) ) let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId)) @@ -988,7 +989,7 @@ proc logAndMetrics(pm: PeerManager) {.async.} = outsideBackoffPeers = outsideBackoffPeers.len # update prometheus metrics - for proto in pm.peerStore.getWakuProtos(): + for proto in pm.wakuPeerStore.getWakuProtos(): let (protoConnsIn, protoConnsOut) = pm.connectedPeers(proto) let (protoStreamsIn, protoStreamsOut) = pm.getNumStreams(proto) waku_connected_peers.set( diff --git a/waku/node/peer_manager/waku_peer_store.nim b/waku/node/peer_manager/waku_peer_store.nim index 09d6ebc658..8548711741 100644 --- a/waku/node/peer_manager/waku_peer_store.nim +++ b/waku/node/peer_manager/waku_peer_store.nim @@ -16,14 +16,16 @@ import export peerstore, builders type + WakuPeerStore* = ref object + peerStore: PeerStore # Keeps track of the Connectedness state of a peer ConnectionBook* = ref object of PeerBook[Connectedness] - # Last failed connection attemp timestamp + # Keeps track of the timestamp of the last failed connection attempt LastFailedConnBook* = ref object of PeerBook[Moment] - # Failed connection attempts + # Keeps track of the number of failed connection attempts NumberFailedConnBook* = ref object of PeerBook[int] # Keeps track of when peers were disconnected in Unix timestamps @@ -32,126 +34,142 @@ type # Keeps track of the origin of a peer SourceBook* = ref object of PeerBook[PeerOrigin] - # Direction + # Keeps track of the direction of a peer connection DirectionBook* = ref object of PeerBook[PeerDirection] - # ENR Book + # Keeps track of the ENR (Ethereum Node Record) of a peer ENRBook* = ref object of PeerBook[enr.Record] -################## -# Peer Store API # -################## +# Constructor +proc new*(T: type WakuPeerStore, identify: Identify, capacity = 1000): WakuPeerStore = + let peerStore = PeerStore.new(identify, capacity) + WakuPeerStore(peerStore: peerStore) -proc delete*(peerStore: PeerStore, peerId: PeerId) = - # Delete all the information of a given peer. - peerStore.del(peerId) +proc createWakuPeerStore*(peerStore: PeerStore): WakuPeerStore = + WakuPeerStore(peerStore: peerStore) + +# Core functionality +proc `[]`*(wps: WakuPeerStore, T: typedesc): T = + wps.peerStore[T] -proc get*(peerStore: PeerStore, peerId: PeerID): RemotePeerInfo = - ## Get the stored information of a given peer. +proc getPeer*(wps: WakuPeerStore, peerId: PeerId): RemotePeerInfo = RemotePeerInfo( peerId: peerId, - addrs: peerStore[AddressBook][peerId], + addrs: wps[AddressBook][peerId], enr: - if peerStore[ENRBook][peerId] != default(enr.Record): - some(peerStore[ENRBook][peerId]) + if wps[ENRBook][peerId] != default(enr.Record): + some(wps[ENRBook][peerId]) else: none(enr.Record), - protocols: peerStore[ProtoBook][peerId], - agent: peerStore[AgentBook][peerId], - protoVersion: peerStore[ProtoVersionBook][peerId], - publicKey: peerStore[KeyBook][peerId], - - # Extended custom fields - connectedness: peerStore[ConnectionBook][peerId], - disconnectTime: peerStore[DisconnectBook][peerId], - origin: peerStore[SourceBook][peerId], - direction: peerStore[DirectionBook][peerId], - lastFailedConn: peerStore[LastFailedConnBook][peerId], - numberFailedConn: peerStore[NumberFailedConnBook][peerId], + protocols: wps[ProtoBook][peerId], + agent: wps[AgentBook][peerId], + protoVersion: wps[ProtoVersionBook][peerId], + publicKey: wps[KeyBook][peerId], + connectedness: wps[ConnectionBook][peerId], + disconnectTime: wps[DisconnectBook][peerId], + origin: wps[SourceBook][peerId], + direction: wps[DirectionBook][peerId], + lastFailedConn: wps[LastFailedConnBook][peerId], + numberFailedConn: wps[NumberFailedConnBook][peerId], ) -proc getWakuProtos*(peerStore: PeerStore): seq[string] = - ## Get the waku protocols of all the stored peers. - let wakuProtocols = toSeq(peerStore[ProtoBook].book.values()) - .flatten() - .deduplicate() - .filterIt(it.startsWith("/vac/waku")) - return wakuProtocols +proc addPeer*(wps: WakuPeerStore, peer: RemotePeerInfo) = + wps[AddressBook][peer.peerId] = peer.addrs + wps[ProtoBook][peer.peerId] = peer.protocols + wps[AgentBook][peer.peerId] = peer.agent + wps[ProtoVersionBook][peer.peerId] = peer.protoVersion + wps[KeyBook][peer.peerId] = peer.publicKey + wps[ConnectionBook][peer.peerId] = peer.connectedness + wps[DisconnectBook][peer.peerId] = peer.disconnectTime + wps[SourceBook][peer.peerId] = peer.origin + wps[DirectionBook][peer.peerId] = peer.direction + wps[LastFailedConnBook][peer.peerId] = peer.lastFailedConn + wps[NumberFailedConnBook][peer.peerId] = peer.numberFailedConn + if peer.enr.isSome(): + wps[ENRBook][peer.peerId] = peer.enr.get() + +proc delete*(wps: WakuPeerStore, peerId: PeerId) = + # Delete all the information of a given peer. + wps.peerStore.del(peerId) # TODO: Rename peers() to getPeersByProtocol() -proc peers*(peerStore: PeerStore): seq[RemotePeerInfo] = - ## Get all the stored information of every peer. +proc peers*(wps: WakuPeerStore): seq[RemotePeerInfo] = let allKeys = concat( - toSeq(peerStore[AddressBook].book.keys()), - toSeq(peerStore[ProtoBook].book.keys()), - toSeq(peerStore[KeyBook].book.keys()), + toSeq(wps[AddressBook].book.keys()), + toSeq(wps[ProtoBook].book.keys()), + toSeq(wps[KeyBook].book.keys()), ) .toHashSet() - return allKeys.mapIt(peerStore.get(it)) + return allKeys.mapIt(wps.getPeer(it)) + +proc peers*(wps: WakuPeerStore, proto: string): seq[RemotePeerInfo] = + wps.peers().filterIt(it.protocols.contains(proto)) -proc peers*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] = - # Return the known info for all peers registered on the specified protocol - peerStore.peers.filterIt(it.protocols.contains(proto)) +proc peers*(wps: WakuPeerStore, protocolMatcher: Matcher): seq[RemotePeerInfo] = + wps.peers().filterIt(it.protocols.anyIt(protocolMatcher(it))) -proc peers*(peerStore: PeerStore, protocolMatcher: Matcher): seq[RemotePeerInfo] = - # Return the known info for all peers matching the provided protocolMatcher - peerStore.peers.filterIt(it.protocols.anyIt(protocolMatcher(it))) +proc connectedness*(wps: WakuPeerStore, peerId: PeerId): Connectedness = + wps[ConnectionBook].book.getOrDefault(peerId, NotConnected) -proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness = - peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected) +proc hasShard*(wps: WakuPeerStore, peerId: PeerID, cluster, shard: uint16): bool = + wps[ENRBook].book.getOrDefault(peerId).containsShard(cluster, shard) -proc hasShard*(peerStore: PeerStore, peerId: PeerID, cluster, shard: uint16): bool = - peerStore[ENRBook].book.getOrDefault(peerId).containsShard(cluster, shard) +proc hasCapability*(wps: WakuPeerStore, peerId: PeerID, cap: Capabilities): bool = + wps[ENRBook].book.getOrDefault(peerId).supportsCapability(cap) -proc hasCapability*(peerStore: PeerStore, peerId: PeerID, cap: Capabilities): bool = - peerStore[ENRBook].book.getOrDefault(peerId).supportsCapability(cap) +proc peerExists*(wps: WakuPeerStore, peerId: PeerId): bool = + wps[AddressBook].contains(peerId) -proc isConnected*(peerStore: PeerStore, peerId: PeerID): bool = +proc isConnected*(wps: WakuPeerStore, peerId: PeerID): bool = # Returns `true` if the peer is connected - peerStore.connectedness(peerId) == Connected + wps.connectedness(peerId) == Connected -proc hasPeer*(peerStore: PeerStore, peerId: PeerID, proto: string): bool = +proc hasPeer*(wps: WakuPeerStore, peerId: PeerID, proto: string): bool = # Returns `true` if peer is included in manager for the specified protocol - # TODO: What if peer does not exist in the peerStore? - peerStore.get(peerId).protocols.contains(proto) + # TODO: What if peer does not exist in the wps? + wps.getPeer(peerId).protocols.contains(proto) -proc hasPeers*(peerStore: PeerStore, proto: string): bool = +proc hasPeers*(wps: WakuPeerStore, proto: string): bool = # Returns `true` if the peerstore has any peer for the specified protocol - toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(it == proto)) + toSeq(wps[ProtoBook].book.values()).anyIt(it.anyIt(it == proto)) -proc hasPeers*(peerStore: PeerStore, protocolMatcher: Matcher): bool = +proc hasPeers*(wps: WakuPeerStore, protocolMatcher: Matcher): bool = # Returns `true` if the peerstore has any peer matching the protocolMatcher - toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(protocolMatcher(it))) + toSeq(wps[ProtoBook].book.values()).anyIt(it.anyIt(protocolMatcher(it))) + +proc getCapacity*(wps: WakuPeerStore): int = + wps.peerStore.capacity + +proc setCapacity*(wps: WakuPeerStore, capacity: int) = + wps.peerStore.capacity = capacity + +proc getWakuProtos*(wps: WakuPeerStore): seq[string] = + toSeq(wps[ProtoBook].book.values()).flatten().deduplicate().filterIt( + it.startsWith("/vac/waku") + ) proc getPeersByDirection*( - peerStore: PeerStore, direction: PeerDirection + wps: WakuPeerStore, direction: PeerDirection ): seq[RemotePeerInfo] = - return peerStore.peers.filterIt(it.direction == direction) + return wps.peers.filterIt(it.direction == direction) -proc getNotConnectedPeers*(peerStore: PeerStore): seq[RemotePeerInfo] = - return peerStore.peers.filterIt(it.connectedness != Connected) +proc getDisconnectedPeers*(wps: WakuPeerStore): seq[RemotePeerInfo] = + return wps.peers.filterIt(it.connectedness != Connected) -proc getConnectedPeers*(peerStore: PeerStore): seq[RemotePeerInfo] = - return peerStore.peers.filterIt(it.connectedness == Connected) +proc getConnectedPeers*(wps: WakuPeerStore): seq[RemotePeerInfo] = + return wps.peers.filterIt(it.connectedness == Connected) -proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] = - return peerStore.peers.filterIt(it.protocols.contains(proto)) +proc getPeersByProtocol*(wps: WakuPeerStore, proto: string): seq[RemotePeerInfo] = + return wps.peers.filterIt(it.protocols.contains(proto)) -proc getReachablePeers*(peerStore: PeerStore): seq[RemotePeerInfo] = - return peerStore.peers.filterIt( - it.connectedness == CanConnect or it.connectedness == Connected - ) - -proc getPeersByShard*( - peerStore: PeerStore, cluster, shard: uint16 -): seq[RemotePeerInfo] = - return peerStore.peers.filterIt( - it.enr.isSome() and it.enr.get().containsShard(cluster, shard) - ) +proc getReachablePeers*(wps: WakuPeerStore): seq[RemotePeerInfo] = + return + wps.peers.filterIt(it.connectedness == CanConnect or it.connectedness == Connected) -proc getPeersByCapability*( - peerStore: PeerStore, cap: Capabilities -): seq[RemotePeerInfo] = +proc getPeersByShard*(wps: WakuPeerStore, cluster, shard: uint16): seq[RemotePeerInfo] = return - peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().supportsCapability(cap)) + wps.peers.filterIt(it.enr.isSome() and it.enr.get().containsShard(cluster, shard)) + +proc getPeersByCapability*(wps: WakuPeerStore, cap: Capabilities): seq[RemotePeerInfo] = + return wps.peers.filterIt(it.enr.isSome() and it.enr.get().supportsCapability(cap)) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index ca10ca799b..9996e6c138 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -426,7 +426,7 @@ proc startRelay*(node: WakuNode) {.async.} = ## Setup relay protocol # Resume previous relay connections - if node.peerManager.peerStore.hasPeers(protocolMatcher(WakuRelayCodec)): + if node.peerManager.wakuPeerStore.hasPeers(protocolMatcher(WakuRelayCodec)): info "Found previous WakuRelay peers. Reconnecting." # Reconnect to previous relay peers. This will respect a backoff period, if necessary @@ -1247,7 +1247,7 @@ proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} = # First get a list of connected peer infos let peers = - node.peerManager.peerStore.peers().filterIt(it.connectedness == Connected) + node.peerManager.wakuPeerStore.peers().filterIt(it.connectedness == Connected) for peer in peers: try: diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim index 9570e413a4..ebdcc8ef63 100644 --- a/waku/waku_api/rest/admin/handlers.nim +++ b/waku/waku_api/rest/admin/handlers.nim @@ -42,7 +42,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = router.api(MethodGet, ROUTE_ADMIN_V1_PEERS) do() -> RestApiResponse: var peers: WakuPeers = @[] - let relayPeers = node.peerManager.peerStore.peers(WakuRelayCodec).mapIt( + let relayPeers = node.peerManager.wakuPeerStore.peers(WakuRelayCodec).mapIt( ( multiaddr: constructMultiaddrStr(it), protocol: WakuRelayCodec, @@ -52,7 +52,9 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) tuplesToWakuPeers(peers, relayPeers) - let filterV2Peers = node.peerManager.peerStore.peers(WakuFilterSubscribeCodec).mapIt( + let filterV2Peers = node.peerManager.wakuPeerStore + .peers(WakuFilterSubscribeCodec) + .mapIt( ( multiaddr: constructMultiaddrStr(it), protocol: WakuFilterSubscribeCodec, @@ -62,7 +64,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) tuplesToWakuPeers(peers, filterV2Peers) - let storePeers = node.peerManager.peerStore.peers(WakuStoreCodec).mapIt( + let storePeers = node.peerManager.wakuPeerStore.peers(WakuStoreCodec).mapIt( ( multiaddr: constructMultiaddrStr(it), protocol: WakuStoreCodec, @@ -72,7 +74,9 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) tuplesToWakuPeers(peers, storePeers) - let legacyStorePeers = node.peerManager.peerStore.peers(WakuLegacyStoreCodec).mapIt( + let legacyStorePeers = node.peerManager.wakuPeerStore + .peers(WakuLegacyStoreCodec) + .mapIt( ( multiaddr: constructMultiaddrStr(it), protocol: WakuLegacyStoreCodec, @@ -82,7 +86,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) tuplesToWakuPeers(peers, legacyStorePeers) - let lightpushPeers = node.peerManager.peerStore.peers(WakuLightPushCodec).mapIt( + let lightpushPeers = node.peerManager.wakuPeerStore.peers(WakuLightPushCodec).mapIt( ( multiaddr: constructMultiaddrStr(it), protocol: WakuLightPushCodec, @@ -92,7 +96,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) tuplesToWakuPeers(peers, lightpushPeers) - let pxPeers = node.peerManager.peerStore.peers(WakuPeerExchangeCodec).mapIt( + let pxPeers = node.peerManager.wakuPeerStore.peers(WakuPeerExchangeCodec).mapIt( ( multiaddr: constructMultiaddrStr(it), protocol: WakuPeerExchangeCodec, @@ -104,7 +108,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = if not node.wakuSync.isNil(): # Map WakuSync peers to WakuPeers and add to return list - let syncPeers = node.peerManager.peerStore.peers(WakuSyncCodec).mapIt( + let syncPeers = node.peerManager.wakuPeerStore.peers(WakuSyncCodec).mapIt( ( multiaddr: constructMultiaddrStr(it), protocol: WakuSyncCodec, diff --git a/waku/waku_core/peers.nim b/waku/waku_core/peers.nim index 07ad3bc4c7..9fff76d400 100644 --- a/waku/waku_core/peers.nim +++ b/waku/waku_core/peers.nim @@ -70,8 +70,31 @@ proc init*( addrs: seq[MultiAddress] = @[], enr: Option[enr.Record] = none(enr.Record), protocols: seq[string] = @[], + publicKey: crypto.PublicKey = crypto.PublicKey(), + agent: string = "", + protoVersion: string = "", + connectedness: Connectedness = NotConnected, + disconnectTime: int64 = 0, + origin: PeerOrigin = UnknownOrigin, + direction: PeerDirection = UnknownDirection, + lastFailedConn: Moment = Moment.init(0, Second), + numberFailedConn: int = 0, ): T = - RemotePeerInfo(peerId: peerId, addrs: addrs, enr: enr, protocols: protocols) + RemotePeerInfo( + peerId: peerId, + addrs: addrs, + enr: enr, + protocols: protocols, + publicKey: publicKey, + agent: agent, + protoVersion: protoVersion, + connectedness: connectedness, + disconnectTime: disconnectTime, + origin: origin, + direction: direction, + lastFailedConn: lastFailedConn, + numberFailedConn: numberFailedConn, + ) proc init*( T: typedesc[RemotePeerInfo], diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index 695093fe57..147df70a5b 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -154,7 +154,7 @@ proc handleSubscribeRequest*( proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} = trace "pushing message to subscribed peer", peer_id = shortLog(peer) - if not wf.peerManager.peerStore.hasPeer(peer, WakuFilterPushCodec): + if not wf.peerManager.wakuPeerStore.hasPeer(peer, WakuFilterPushCodec): # Check that peer has not been removed from peer store error "no addresses for peer", peer_id = shortLog(peer) return @@ -207,7 +207,7 @@ proc maintainSubscriptions*(wf: WakuFilter) = ## Remove subscriptions for peers that have been removed from peer store var peersToRemove: seq[PeerId] for peerId in wf.subscriptions.peersSubscribed.keys: - if not wf.peerManager.peerStore.hasPeer(peerId, WakuFilterPushCodec): + if not wf.peerManager.wakuPeerStore.hasPeer(peerId, WakuFilterPushCodec): debug "peer has been removed from peer store, removing subscription", peerId = peerId peersToRemove.add(peerId) diff --git a/waku/waku_peer_exchange/protocol.nim b/waku/waku_peer_exchange/protocol.nim index 0374e12772..9462533780 100644 --- a/waku/waku_peer_exchange/protocol.nim +++ b/waku/waku_peer_exchange/protocol.nim @@ -203,8 +203,9 @@ proc poolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool = proc populateEnrCache(wpx: WakuPeerExchange) = # share only peers that i) are reachable ii) come from discv5 iii) share cluster - let withEnr = - wpx.peerManager.peerStore.getReachablePeers().filterIt(poolFilter(wpx.cluster, it)) + let withEnr = wpx.peerManager.wakuPeerStore.getReachablePeers().filterIt( + poolFilter(wpx.cluster, it) + ) # either what we have or max cache size var newEnrCache = newSeq[enr.Record](0)