Skip to content

Commit

Permalink
chore: create basic abstraction of peerstore to wakupeerstore
Browse files Browse the repository at this point in the history
  • Loading branch information
darshankabariya committed Sep 25, 2024
1 parent f2efc36 commit b11f23c
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 64 deletions.
6 changes: 3 additions & 3 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ const

type PeerManager* = ref object of RootObj
switch*: Switch
peerStore*: PeerStore
peerStore*: WakuPeerStore
wakuMetadata*: WakuMetadata
initialBackoffInSec*: int
backoffFactor*: int
Expand Down Expand Up @@ -503,7 +503,7 @@ proc new*(
let pm = PeerManager(
switch: switch,
wakuMetadata: wakuMetadata,
peerStore: switch.peerStore,
peerStore: WakuPeerStore(store: switch.peerStore),
storage: storage,
initialBackoffInSec: initialBackoffInSec,
backoffFactor: backoffFactor,
Expand Down Expand Up @@ -828,7 +828,7 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} =

proc prunePeerStore*(pm: PeerManager) =
let numPeers = pm.peerStore[AddressBook].book.len
let capacity = pm.peerStore.capacity
let capacity = pm.peerStore.getCapacity()
if numPeers <= capacity:
return

Expand Down
118 changes: 57 additions & 61 deletions waku/node/peer_manager/waku_peer_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ import
export peerstore, builders

type
WakuPeerStore* = ref object
store*: PeerStore

# Keeps track of the Connectedness state of a peer
ConnectionBook* = ref object of PeerBook[Connectedness]

# Last failed connection attemp timestamp
# Keeps track of the timestamp of the last failed connection attempt
LastFailedConnBook* = ref object of PeerBook[Moment]

# Failed connection attempts
# Keeps track of the number of failed connection attempts
NumberFailedConnBook* = ref object of PeerBook[int]

# Keeps track of when peers were disconnected in Unix timestamps
Expand All @@ -32,126 +34,120 @@ type
# Keeps track of the origin of a peer
SourceBook* = ref object of PeerBook[PeerOrigin]

# Direction
# Keeps track of the direction of a peer connection
DirectionBook* = ref object of PeerBook[PeerDirection]

# ENR Book
# Keeps track of the ENR (Ethereum Node Record) of a peer
ENRBook* = ref object of PeerBook[enr.Record]

##################
# Peer Store API #
##################
proc getCapacity*(wps: WakuPeerStore): int =
wps.store.capacity

proc delete*(peerStore: PeerStore, peerId: PeerId) =
proc `[]`*(wps: WakuPeerStore, T: typedesc): T =
wps.store[T]

proc delete*(wps: WakuPeerStore, peerId: PeerId) =
# Delete all the information of a given peer.
peerStore.del(peerId)
wps.store.del(peerId)

proc get*(peerStore: PeerStore, peerId: PeerID): RemotePeerInfo =
## Get the stored information of a given peer.
proc get*(wps: WakuPeerStore, peerId: PeerId): RemotePeerInfo =
RemotePeerInfo(
peerId: peerId,
addrs: peerStore[AddressBook][peerId],
addrs: wps.store[AddressBook][peerId],
enr:
if peerStore[ENRBook][peerId] != default(enr.Record):
some(peerStore[ENRBook][peerId])
if wps.store[ENRBook].book.hasKey(peerId):
some(wps.store[ENRBook][peerId])
else:
none(enr.Record),
protocols: peerStore[ProtoBook][peerId],
agent: peerStore[AgentBook][peerId],
protoVersion: peerStore[ProtoVersionBook][peerId],
publicKey: peerStore[KeyBook][peerId],

# Extended custom fields
connectedness: peerStore[ConnectionBook][peerId],
disconnectTime: peerStore[DisconnectBook][peerId],
origin: peerStore[SourceBook][peerId],
direction: peerStore[DirectionBook][peerId],
lastFailedConn: peerStore[LastFailedConnBook][peerId],
numberFailedConn: peerStore[NumberFailedConnBook][peerId],
none(enr.Record)
,
protocols: wps.store[ProtoBook][peerId],
agent: wps.store[AgentBook][peerId],
protoVersion: wps.store[ProtoVersionBook][peerId],
publicKey: wps.store[KeyBook][peerId],
connectedness: wps.store[ConnectionBook][peerId],
disconnectTime: wps.store[DisconnectBook][peerId],
origin: wps.store[SourceBook][peerId],
direction: wps.store[DirectionBook][peerId],
lastFailedConn: wps.store[LastFailedConnBook][peerId],
numberFailedConn: wps.store[NumberFailedConnBook][peerId],
)

proc getWakuProtos*(peerStore: PeerStore): seq[string] =
## Get the waku protocols of all the stored peers.
let wakuProtocols = toSeq(peerStore[ProtoBook].book.values())
.flatten()
.deduplicate()
.filterIt(it.startsWith("/vac/waku"))
return wakuProtocols
proc getWakuProtos*(wps: WakuPeerStore): seq[string] =
toSeq(wps.store[ProtoBook].book.values()).flatten().deduplicate().filterIt(
it.startsWith("/vac/waku")
)

# TODO: Rename peers() to getPeersByProtocol()
proc peers*(peerStore: PeerStore): seq[RemotePeerInfo] =
## Get all the stored information of every peer.
proc peers*(wps: WakuPeerStore): seq[RemotePeerInfo] =
let allKeys = concat(
toSeq(peerStore[AddressBook].book.keys()),
toSeq(peerStore[ProtoBook].book.keys()),
toSeq(peerStore[KeyBook].book.keys()),
toSeq(wps.store[AddressBook].book.keys()),
toSeq(wps.store[ProtoBook].book.keys()),
toSeq(wps.store[KeyBook].book.keys()),
)
.toHashSet()

return allKeys.mapIt(peerStore.get(it))
return allKeys.mapIt(wps.get(it))

proc peers*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] =
# Return the known info for all peers registered on the specified protocol
peerStore.peers.filterIt(it.protocols.contains(proto))
proc peers*(wps: WakuPeerStore, proto: string): seq[RemotePeerInfo] =
wps.peers().filterIt(it.protocols.contains(proto))

proc peers*(peerStore: PeerStore, protocolMatcher: Matcher): seq[RemotePeerInfo] =
# Return the known info for all peers matching the provided protocolMatcher
peerStore.peers.filterIt(it.protocols.anyIt(protocolMatcher(it)))
proc peers*(wps: WakuPeerStore, protocolMatcher: Matcher): seq[RemotePeerInfo] =
wps.peers().filterIt(it.protocols.anyIt(protocolMatcher(it)))

proc connectedness*(peerStore: PeerStore, peerId: PeerID): Connectedness =
peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)
proc connectedness*(wps: WakuPeerStore, peerId: PeerId): Connectedness =
wps.store[ConnectionBook].book.getOrDefault(peerId, NotConnected)

proc hasShard*(peerStore: PeerStore, peerId: PeerID, cluster, shard: uint16): bool =
proc hasShard*(peerStore: WakuPeerStore, peerId: PeerID, cluster, shard: uint16): bool =
peerStore[ENRBook].book.getOrDefault(peerId).containsShard(cluster, shard)

proc hasCapability*(peerStore: PeerStore, peerId: PeerID, cap: Capabilities): bool =
proc hasCapability*(peerStore: WakuPeerStore, peerId: PeerID, cap: Capabilities): bool =
peerStore[ENRBook].book.getOrDefault(peerId).supportsCapability(cap)

proc isConnected*(peerStore: PeerStore, peerId: PeerID): bool =
proc isConnected*(peerStore: WakuPeerStore, peerId: PeerID): bool =
# Returns `true` if the peer is connected
peerStore.connectedness(peerId) == Connected

proc hasPeer*(peerStore: PeerStore, peerId: PeerID, proto: string): bool =
proc hasPeer*(peerStore: WakuPeerStore, peerId: PeerID, proto: string): bool =
# Returns `true` if peer is included in manager for the specified protocol
# TODO: What if peer does not exist in the peerStore?
peerStore.get(peerId).protocols.contains(proto)

proc hasPeers*(peerStore: PeerStore, proto: string): bool =
proc hasPeers*(peerStore: WakuPeerStore, proto: string): bool =
# Returns `true` if the peerstore has any peer for the specified protocol
toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(it == proto))

proc hasPeers*(peerStore: PeerStore, protocolMatcher: Matcher): bool =
proc hasPeers*(peerStore: WakuPeerStore, protocolMatcher: Matcher): bool =
# Returns `true` if the peerstore has any peer matching the protocolMatcher
toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(protocolMatcher(it)))

proc getPeersByDirection*(
peerStore: PeerStore, direction: PeerDirection
peerStore: WakuPeerStore, direction: PeerDirection
): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(it.direction == direction)

proc getNotConnectedPeers*(peerStore: PeerStore): seq[RemotePeerInfo] =
proc getNotConnectedPeers*(peerStore: WakuPeerStore): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(it.connectedness != Connected)

proc getConnectedPeers*(peerStore: PeerStore): seq[RemotePeerInfo] =
proc getConnectedPeers*(peerStore: WakuPeerStore): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(it.connectedness == Connected)

proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] =
proc getPeersByProtocol*(peerStore: WakuPeerStore, proto: string): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(it.protocols.contains(proto))

proc getReachablePeers*(peerStore: PeerStore): seq[RemotePeerInfo] =
proc getReachablePeers*(peerStore: WakuPeerStore): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(
it.connectedness == CanConnect or it.connectedness == Connected
)

proc getPeersByShard*(
peerStore: PeerStore, cluster, shard: uint16
peerStore: WakuPeerStore, cluster, shard: uint16
): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(
it.enr.isSome() and it.enr.get().containsShard(cluster, shard)
)

proc getPeersByCapability*(
peerStore: PeerStore, cap: Capabilities
peerStore: WakuPeerStore, cap: Capabilities
): seq[RemotePeerInfo] =
return
peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().supportsCapability(cap))

0 comments on commit b11f23c

Please sign in to comment.