From b11f23cca6dc7b3f438e543bb2c55eeb856e8b6c Mon Sep 17 00:00:00 2001 From: DarshanBPatel Date: Wed, 25 Sep 2024 20:58:39 +0530 Subject: [PATCH] chore: create basic abstraction of peerstore to wakupeerstore --- waku/node/peer_manager/peer_manager.nim | 6 +- waku/node/peer_manager/waku_peer_store.nim | 118 ++++++++++----------- 2 files changed, 60 insertions(+), 64 deletions(-) diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 87506e7a4c..03d514d8d5 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -73,7 +73,7 @@ const type PeerManager* = ref object of RootObj switch*: Switch - peerStore*: PeerStore + peerStore*: WakuPeerStore wakuMetadata*: WakuMetadata initialBackoffInSec*: int backoffFactor*: int @@ -503,7 +503,7 @@ proc new*( let pm = PeerManager( switch: switch, wakuMetadata: wakuMetadata, - peerStore: switch.peerStore, + peerStore: WakuPeerStore(store: switch.peerStore), storage: storage, initialBackoffInSec: initialBackoffInSec, backoffFactor: backoffFactor, @@ -828,7 +828,7 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} = proc prunePeerStore*(pm: PeerManager) = let numPeers = pm.peerStore[AddressBook].book.len - let capacity = pm.peerStore.capacity + let capacity = pm.peerStore.getCapacity() if numPeers <= capacity: return diff --git a/waku/node/peer_manager/waku_peer_store.nim b/waku/node/peer_manager/waku_peer_store.nim index 09d6ebc658..d9af0d6e6b 100644 --- a/waku/node/peer_manager/waku_peer_store.nim +++ b/waku/node/peer_manager/waku_peer_store.nim @@ -16,14 +16,16 @@ import export peerstore, builders type + WakuPeerStore* = ref object + store*: PeerStore # Keeps track of the Connectedness state of a peer ConnectionBook* = ref object of PeerBook[Connectedness] - # Last failed connection attemp timestamp + # Keeps track of the timestamp of the last failed connection attempt LastFailedConnBook* = ref object of PeerBook[Moment] - # Failed connection attempts + # Keeps track of the number of failed connection attempts NumberFailedConnBook* = ref object of PeerBook[int] # Keeps track of when peers were disconnected in Unix timestamps @@ -32,126 +34,120 @@ type # Keeps track of the origin of a peer SourceBook* = ref object of PeerBook[PeerOrigin] - # Direction + # Keeps track of the direction of a peer connection DirectionBook* = ref object of PeerBook[PeerDirection] - # ENR Book + # Keeps track of the ENR (Ethereum Node Record) of a peer ENRBook* = ref object of PeerBook[enr.Record] -################## -# Peer Store API # -################## +proc getCapacity*(wps: WakuPeerStore): int = + wps.store.capacity -proc delete*(peerStore: PeerStore, peerId: PeerId) = +proc `[]`*(wps: WakuPeerStore, T: typedesc): T = + wps.store[T] + +proc delete*(wps: WakuPeerStore, peerId: PeerId) = # Delete all the information of a given peer. - peerStore.del(peerId) + wps.store.del(peerId) -proc get*(peerStore: PeerStore, peerId: PeerID): RemotePeerInfo = - ## Get the stored information of a given peer. +proc get*(wps: WakuPeerStore, peerId: PeerId): RemotePeerInfo = RemotePeerInfo( peerId: peerId, - addrs: peerStore[AddressBook][peerId], + addrs: wps.store[AddressBook][peerId], enr: - if peerStore[ENRBook][peerId] != default(enr.Record): - some(peerStore[ENRBook][peerId]) + if wps.store[ENRBook].book.hasKey(peerId): + some(wps.store[ENRBook][peerId]) else: - none(enr.Record), - protocols: peerStore[ProtoBook][peerId], - agent: peerStore[AgentBook][peerId], - protoVersion: peerStore[ProtoVersionBook][peerId], - publicKey: peerStore[KeyBook][peerId], - - # Extended custom fields - connectedness: peerStore[ConnectionBook][peerId], - disconnectTime: peerStore[DisconnectBook][peerId], - origin: peerStore[SourceBook][peerId], - direction: peerStore[DirectionBook][peerId], - lastFailedConn: peerStore[LastFailedConnBook][peerId], - numberFailedConn: peerStore[NumberFailedConnBook][peerId], + none(enr.Record) + , + protocols: wps.store[ProtoBook][peerId], + agent: wps.store[AgentBook][peerId], + protoVersion: wps.store[ProtoVersionBook][peerId], + publicKey: wps.store[KeyBook][peerId], + connectedness: wps.store[ConnectionBook][peerId], + disconnectTime: wps.store[DisconnectBook][peerId], + origin: wps.store[SourceBook][peerId], + direction: wps.store[DirectionBook][peerId], + lastFailedConn: wps.store[LastFailedConnBook][peerId], + numberFailedConn: wps.store[NumberFailedConnBook][peerId], ) -proc getWakuProtos*(peerStore: PeerStore): seq[string] = - ## Get the waku protocols of all the stored peers. - let wakuProtocols = toSeq(peerStore[ProtoBook].book.values()) - .flatten() - .deduplicate() - .filterIt(it.startsWith("/vac/waku")) - return wakuProtocols +proc getWakuProtos*(wps: WakuPeerStore): seq[string] = + toSeq(wps.store[ProtoBook].book.values()).flatten().deduplicate().filterIt( + it.startsWith("/vac/waku") + ) # TODO: Rename peers() to getPeersByProtocol() -proc peers*(peerStore: PeerStore): seq[RemotePeerInfo] = - ## Get all the stored information of every peer. +proc peers*(wps: WakuPeerStore): seq[RemotePeerInfo] = let allKeys = concat( - toSeq(peerStore[AddressBook].book.keys()), - toSeq(peerStore[ProtoBook].book.keys()), - toSeq(peerStore[KeyBook].book.keys()), + toSeq(wps.store[AddressBook].book.keys()), + toSeq(wps.store[ProtoBook].book.keys()), + toSeq(wps.store[KeyBook].book.keys()), ) .toHashSet() - return allKeys.mapIt(peerStore.get(it)) + return allKeys.mapIt(wps.get(it)) -proc peers*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] = - # Return the known info for all peers registered on the specified protocol - peerStore.peers.filterIt(it.protocols.contains(proto)) +proc peers*(wps: WakuPeerStore, proto: string): seq[RemotePeerInfo] = + wps.peers().filterIt(it.protocols.contains(proto)) -proc peers*(peerStore: PeerStore, protocolMatcher: Matcher): seq[RemotePeerInfo] = - # Return the known info for all peers matching the provided protocolMatcher - peerStore.peers.filterIt(it.protocols.anyIt(protocolMatcher(it))) +proc peers*(wps: WakuPeerStore, protocolMatcher: Matcher): seq[RemotePeerInfo] = + wps.peers().filterIt(it.protocols.anyIt(protocolMatcher(it))) -proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness = - peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected) +proc connectedness*(wps: WakuPeerStore, peerId: PeerId): Connectedness = + wps.store[ConnectionBook].book.getOrDefault(peerId, NotConnected) -proc hasShard*(peerStore: PeerStore, peerId: PeerID, cluster, shard: uint16): bool = +proc hasShard*(peerStore: WakuPeerStore, peerId: PeerID, cluster, shard: uint16): bool = peerStore[ENRBook].book.getOrDefault(peerId).containsShard(cluster, shard) -proc hasCapability*(peerStore: PeerStore, peerId: PeerID, cap: Capabilities): bool = +proc hasCapability*(peerStore: WakuPeerStore, peerId: PeerID, cap: Capabilities): bool = peerStore[ENRBook].book.getOrDefault(peerId).supportsCapability(cap) -proc isConnected*(peerStore: PeerStore, peerId: PeerID): bool = +proc isConnected*(peerStore: WakuPeerStore, peerId: PeerID): bool = # Returns `true` if the peer is connected peerStore.connectedness(peerId) == Connected -proc hasPeer*(peerStore: PeerStore, peerId: PeerID, proto: string): bool = +proc hasPeer*(peerStore: WakuPeerStore, peerId: PeerID, proto: string): bool = # Returns `true` if peer is included in manager for the specified protocol # TODO: What if peer does not exist in the peerStore? peerStore.get(peerId).protocols.contains(proto) -proc hasPeers*(peerStore: PeerStore, proto: string): bool = +proc hasPeers*(peerStore: WakuPeerStore, proto: string): bool = # Returns `true` if the peerstore has any peer for the specified protocol toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(it == proto)) -proc hasPeers*(peerStore: PeerStore, protocolMatcher: Matcher): bool = +proc hasPeers*(peerStore: WakuPeerStore, protocolMatcher: Matcher): bool = # Returns `true` if the peerstore has any peer matching the protocolMatcher toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(protocolMatcher(it))) proc getPeersByDirection*( - peerStore: PeerStore, direction: PeerDirection + peerStore: WakuPeerStore, direction: PeerDirection ): seq[RemotePeerInfo] = return peerStore.peers.filterIt(it.direction == direction) -proc getNotConnectedPeers*(peerStore: PeerStore): seq[RemotePeerInfo] = +proc getNotConnectedPeers*(peerStore: WakuPeerStore): seq[RemotePeerInfo] = return peerStore.peers.filterIt(it.connectedness != Connected) -proc getConnectedPeers*(peerStore: PeerStore): seq[RemotePeerInfo] = +proc getConnectedPeers*(peerStore: WakuPeerStore): seq[RemotePeerInfo] = return peerStore.peers.filterIt(it.connectedness == Connected) -proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] = +proc getPeersByProtocol*(peerStore: WakuPeerStore, proto: string): seq[RemotePeerInfo] = return peerStore.peers.filterIt(it.protocols.contains(proto)) -proc getReachablePeers*(peerStore: PeerStore): seq[RemotePeerInfo] = +proc getReachablePeers*(peerStore: WakuPeerStore): seq[RemotePeerInfo] = return peerStore.peers.filterIt( it.connectedness == CanConnect or it.connectedness == Connected ) proc getPeersByShard*( - peerStore: PeerStore, cluster, shard: uint16 + peerStore: WakuPeerStore, cluster, shard: uint16 ): seq[RemotePeerInfo] = return peerStore.peers.filterIt( it.enr.isSome() and it.enr.get().containsShard(cluster, shard) ) proc getPeersByCapability*( - peerStore: PeerStore, cap: Capabilities + peerStore: WakuPeerStore, cap: Capabilities ): seq[RemotePeerInfo] = return peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().supportsCapability(cap))