diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index f51cfa31ac..9c55fcc703 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -88,6 +88,17 @@ type PeerManager* = ref object of RootObj started: bool shardedPeerManagement: bool # temp feature flag +#~~~~~~~~~~~~~~~~~~~# +# Helper Functions # +#~~~~~~~~~~~~~~~~~~~# + +proc calculateBackoff( + initialBackoffInSec: int, backoffFactor: int, failedAttempts: int +): timer.Duration = + if failedAttempts == 0: + return chronos.seconds(0) + return chronos.seconds(initialBackoffInSec * (backoffFactor ^ (failedAttempts - 1))) + proc protocolMatcher*(codec: string): Matcher = ## Returns a protocol matcher function for the provided codec proc match(proto: string): bool {.gcsafe.} = @@ -98,16 +109,9 @@ proc protocolMatcher*(codec: string): Matcher = return match -proc calculateBackoff( - initialBackoffInSec: int, backoffFactor: int, failedAttempts: int -): timer.Duration = - if failedAttempts == 0: - return chronos.seconds(0) - return chronos.seconds(initialBackoffInSec * (backoffFactor ^ (failedAttempts - 1))) - -#################### -# Helper functions # -#################### +#~~~~~~~~~~~~~~~~~~~~~~~~~~# +# Peer Storage Management # +#~~~~~~~~~~~~~~~~~~~~~~~~~~# proc insertOrReplace(ps: PeerStorage, remotePeerInfo: RemotePeerInfo) {.gcsafe.} = ## Insert peer entry into persistent storage, or replace existing entry with updated info @@ -167,6 +171,109 @@ proc addPeer*( pm.storage.insertOrReplace(remotePeerInfo) +proc loadFromStorage(pm: PeerManager) {.gcsafe.} = + ## Load peers from storage, if available + + trace "loading peers from storage" + + var amount = 0 + + proc onData(remotePeerInfo: RemotePeerInfo) = + let peerId = remotePeerInfo.peerId + + if pm.switch.peerInfo.peerId == peerId: + # Do not manage self + return + + trace "loading peer", + peerId = peerId, + address = remotePeerInfo.addrs, + protocols = remotePeerInfo.protocols, + agent = remotePeerInfo.agent, + version = remotePeerInfo.protoVersion + + # nim-libp2p books + pm.wakuPeerStore[AddressBook][peerId] = remotePeerInfo.addrs + pm.wakuPeerStore[ProtoBook][peerId] = remotePeerInfo.protocols + pm.wakuPeerStore[KeyBook][peerId] = remotePeerInfo.publicKey + pm.wakuPeerStore[AgentBook][peerId] = remotePeerInfo.agent + pm.wakuPeerStore[ProtoVersionBook][peerId] = remotePeerInfo.protoVersion + + # custom books + pm.wakuPeerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state + pm.wakuPeerStore[DisconnectBook][peerId] = remotePeerInfo.disconnectTime + pm.wakuPeerStore[SourceBook][peerId] = remotePeerInfo.origin + + if remotePeerInfo.enr.isSome(): + pm.wakuPeerStore[ENRBook][peerId] = remotePeerInfo.enr.get() + + amount.inc() + + pm.storage.getAll(onData).isOkOr: + warn "loading peers from storage failed", err = error + waku_peers_errors.inc(labelValues = ["storage_load_failure"]) + return + + trace "recovered peers from storage", amount = amount + +proc selectPeer*( + pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic) +): Option[RemotePeerInfo] = + trace "Selecting peer from peerstore", protocol = proto + + # Selects the best peer for a given protocol + var peers = pm.wakuPeerStore.getPeersByProtocol(proto) + + if shard.isSome(): + peers.keepItIf((it.enr.isSome() and it.enr.get().containsShard(shard.get()))) + + # No criteria for selecting a peer for WakuRelay, random one + if proto == WakuRelayCodec: + # TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned + if peers.len > 0: + trace "Got peer from peerstore", + peerId = peers[0].peerId, multi = peers[0].addrs[0], protocol = proto + return some(peers[0]) + trace "No peer found for protocol", protocol = proto + return none(RemotePeerInfo) + + # For other protocols, we select the peer that is slotted for the given protocol + pm.serviceSlots.withValue(proto, serviceSlot): + trace "Got peer from service slots", + peerId = serviceSlot[].peerId, multi = serviceSlot[].addrs[0], protocol = proto + return some(serviceSlot[]) + + # If not slotted, we select a random peer for the given protocol + if peers.len > 0: + trace "Got peer from peerstore", + peerId = peers[0].peerId, multi = peers[0].addrs[0], protocol = proto + return some(peers[0]) + trace "No peer found for protocol", protocol = proto + return none(RemotePeerInfo) + +# Adds a peer to the service slots, which is a list of peers that are slotted for a given protocol +proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) = + # Do not add relay peers + if proto == WakuRelayCodec: + warn "Can't add relay peer to service peers slots" + return + + info "Adding peer to service slots", + peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], service = proto + waku_service_peers.set(1, labelValues = [$proto, $remotePeerInfo.addrs[0]]) + + # Set peer for service slot + pm.serviceSlots[proto] = remotePeerInfo + + pm.addPeer(remotePeerInfo) + +#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~# +# Connection Lifecycle Management # +#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~# + +# require pre-connection +proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} + # Connects to a given node. Note that this function uses `connect` and # does not provide a protocol. Streams for relay (gossipsub) are created # automatically without the needing to dial. @@ -227,6 +334,53 @@ proc connectRelay*( return false +proc connectToNodes*( + pm: PeerManager, + nodes: seq[string] | seq[RemotePeerInfo], + dialTimeout = DefaultDialTimeout, + source = "api", +) {.async.} = + if nodes.len == 0: + return + + info "Dialing multiple peers", numOfPeers = nodes.len, nodes = $nodes + + var futConns: seq[Future[bool]] + var connectedPeers: seq[RemotePeerInfo] + for node in nodes: + let node = parsePeerInfo(node) + if node.isOk(): + futConns.add(pm.connectRelay(node.value)) + connectedPeers.add(node.value) + else: + error "Couldn't parse node info", error = node.error + + await allFutures(futConns) + + # Filtering successful connectedPeers based on futConns + let combined = zip(connectedPeers, futConns) + connectedPeers = combined.filterIt(it[1].read() == true).mapIt(it[0]) + + when defined(debugDiscv5): + let peerIds = connectedPeers.mapIt(it.peerId) + let origin = connectedPeers.mapIt(it.origin) + if peerIds.len > 0: + notice "established connections with found peers", + peerIds = peerIds.mapIt(shortLog(it)), origin = origin + else: + notice "could not connect to new peers", attempted = nodes.len + + info "Finished dialing multiple peers", + successfulConns = connectedPeers.len, attempted = nodes.len + + # The issue seems to be around peers not being fully connected when + # trying to subscribe. So what we do is sleep to guarantee nodes are + # fully connected. + # + # This issue was known to Dmitiry on nim-libp2p and may be resolvable + # later. + await sleepAsync(chronos.seconds(5)) + proc disconnectNode*(pm: PeerManager, peer: RemotePeerInfo) {.async.} = let peerId = peer.peerId await pm.switch.disconnect(peerId) @@ -267,50 +421,39 @@ proc dialPeer( return none(Connection) -proc loadFromStorage(pm: PeerManager) {.gcsafe.} = - ## Load peers from storage, if available - - trace "loading peers from storage" - - var amount = 0 - - proc onData(remotePeerInfo: RemotePeerInfo) = - let peerId = remotePeerInfo.peerId - - if pm.switch.peerInfo.peerId == peerId: - # Do not manage self - return - - trace "loading peer", - peerId = peerId, - address = remotePeerInfo.addrs, - protocols = remotePeerInfo.protocols, - agent = remotePeerInfo.agent, - version = remotePeerInfo.protoVersion - - # nim-libp2p books - pm.wakuPeerStore[AddressBook][peerId] = remotePeerInfo.addrs - pm.wakuPeerStore[ProtoBook][peerId] = remotePeerInfo.protocols - pm.wakuPeerStore[KeyBook][peerId] = remotePeerInfo.publicKey - pm.wakuPeerStore[AgentBook][peerId] = remotePeerInfo.agent - pm.wakuPeerStore[ProtoVersionBook][peerId] = remotePeerInfo.protoVersion - - # custom books - pm.wakuPeerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state - pm.wakuPeerStore[DisconnectBook][peerId] = remotePeerInfo.disconnectTime - pm.wakuPeerStore[SourceBook][peerId] = remotePeerInfo.origin +proc dialPeer*( + pm: PeerManager, + remotePeerInfo: RemotePeerInfo, + proto: string, + dialTimeout = DefaultDialTimeout, + source = "api", +): Future[Option[Connection]] {.async.} = + # Dial a given peer and add it to the list of known peers + # TODO: check peer validity and score before continuing. Limit number of peers to be managed. - if remotePeerInfo.enr.isSome(): - pm.wakuPeerStore[ENRBook][peerId] = remotePeerInfo.enr.get() + # First add dialed peer info to peer store, if it does not exist yet.. + # TODO: nim libp2p peerstore already adds them + if not pm.wakuPeerStore.hasPeer(remotePeerInfo.peerId, proto): + trace "Adding newly dialed peer to manager", + peerId = $remotePeerInfo.peerId, address = $remotePeerInfo.addrs[0], proto = proto + pm.addPeer(remotePeerInfo) - amount.inc() + return await pm.dialPeer( + remotePeerInfo.peerId, remotePeerInfo.addrs, proto, dialTimeout, source + ) - pm.storage.getAll(onData).isOkOr: - warn "loading peers from storage failed", err = error - waku_peers_errors.inc(labelValues = ["storage_load_failure"]) - return +proc dialPeer*( + pm: PeerManager, + peerId: PeerID, + proto: string, + dialTimeout = DefaultDialTimeout, + source = "api", +): Future[Option[Connection]] {.async.} = + # Dial an existing peer by looking up it's existing addrs in the switch's peerStore + # TODO: check peer validity and score before continuing. Limit number of peers to be managed. - trace "recovered peers from storage", amount = amount + let addrs = pm.switch.peerStore[AddressBook][peerId] + return await pm.dialPeer(peerId, addrs, proto, dialTimeout, source) proc canBeConnected*(pm: PeerManager, peerId: PeerId): bool = # Returns if we can try to connect to this peer, based on past failed attempts @@ -335,33 +478,112 @@ proc canBeConnected*(pm: PeerManager, peerId: PeerId): bool = return now >= (lastFailed + backoff) -################## -# Initialisation # -################## +proc connectedPeers*( + pm: PeerManager, protocol: string = "" +): (seq[PeerId], seq[PeerId]) = + ## Returns the peerIds of physical connections (in and out) + ## If a protocol is specified, only returns peers with at least one stream of that protocol -proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] = - if not pm.switch.connManager.getConnections().hasKey(peerId): - return none(string) + var inPeers: seq[PeerId] + var outPeers: seq[PeerId] - let conns = pm.switch.connManager.getConnections().getOrDefault(peerId) - if conns.len == 0: - return none(string) + for peerId, muxers in pm.switch.connManager.getConnections(): + for peerConn in muxers: + let streams = peerConn.getStreams() + if protocol.len == 0 or streams.anyIt(it.protocol == protocol): + if peerConn.connection.transportDir == Direction.In: + inPeers.add(peerId) + elif peerConn.connection.transportDir == Direction.Out: + outPeers.add(peerId) - let obAddr = conns[0].connection.observedAddr.valueOr: - return none(string) + return (inPeers, outPeers) - # TODO: think if circuit relay ips should be handled differently +proc connectToRelayPeers*(pm: PeerManager) {.async.} = + var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) + let totalRelayPeers = inRelayPeers.len + outRelayPeers.len + + if inRelayPeers.len > pm.inRelayPeersTarget: + await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget) + + if outRelayPeers.len >= pm.outRelayPeersTarget: + return + + let notConnectedPeers = pm.wakuPeerStore.getDisconnectedPeers() + + var outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId)) + + shuffle(outsideBackoffPeers) + + var index = 0 + var numPendingConnReqs = + min(outsideBackoffPeers.len, pm.outRelayPeersTarget - outRelayPeers.len) + ## number of outstanding connection requests + + while numPendingConnReqs > 0 and outRelayPeers.len < pm.outRelayPeersTarget: + let numPeersToConnect = min(numPendingConnReqs, MaxParallelDials) + await pm.connectToNodes(outsideBackoffPeers[index ..< (index + numPeersToConnect)]) + + (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) + + index += numPeersToConnect + numPendingConnReqs -= numPeersToConnect + +proc reconnectPeers*( + pm: PeerManager, proto: string, backoffTime: chronos.Duration = chronos.seconds(0) +) {.async.} = + ## Reconnect to peers registered for this protocol. This will update connectedness. + ## Especially useful to resume connections from persistent storage after a restart. + + debug "Reconnecting peers", proto = proto + + # Proto is not persisted, we need to iterate over all peers. + for peerInfo in pm.wakuPeerStore.peers(protocolMatcher(proto)): + # Check that the peer can be connected + if peerInfo.connectedness == CannotConnect: + error "Not reconnecting to unreachable or non-existing peer", + peerId = peerInfo.peerId + continue + + if backoffTime > ZeroDuration: + debug "Backing off before reconnect", + peerId = peerInfo.peerId, backoffTime = backoffTime + # We disconnected recently and still need to wait for a backoff period before connecting + await sleepAsync(backoffTime) + + await pm.connectToNodes(@[peerInfo]) + +proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) = + var + numStreamsIn = 0 + numStreamsOut = 0 + for peerId, muxers in pm.switch.connManager.getConnections(): + for peerConn in muxers: + for stream in peerConn.getStreams(): + if stream.protocol == protocol: + if stream.dir == Direction.In: + numStreamsIn += 1 + elif stream.dir == Direction.Out: + numStreamsOut += 1 + return (numStreamsIn, numStreamsOut) + +proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] = + if not pm.switch.connManager.getConnections().hasKey(peerId): + return none(string) + + let conns = pm.switch.connManager.getConnections().getOrDefault(peerId) + if conns.len == 0: + return none(string) + + let obAddr = conns[0].connection.observedAddr.valueOr: + return none(string) + + # TODO: think if circuit relay ips should be handled differently return some(obAddr.getHostname()) -# called when a connection i) is created or ii) is closed -proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} = - case event.kind - of ConnEventKind.Connected: - #let direction = if event.incoming: Inbound else: Outbound - discard - of ConnEventKind.Disconnected: - discard +#~~~~~~~~~~~~~~~~~# +# Event Handling # +#~~~~~~~~~~~~~~~~~# proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = let res = catch: @@ -404,25 +626,14 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = asyncSpawn(pm.switch.disconnect(peerId)) pm.wakuPeerStore.delete(peerId) -proc connectedPeers*( - pm: PeerManager, protocol: string = "" -): (seq[PeerId], seq[PeerId]) = - ## Returns the peerIds of physical connections (in and out) - ## If a protocol is specified, only returns peers with at least one stream of that protocol - - var inPeers: seq[PeerId] - var outPeers: seq[PeerId] - - for peerId, muxers in pm.switch.connManager.getConnections(): - for peerConn in muxers: - let streams = peerConn.getStreams() - if protocol.len == 0 or streams.anyIt(it.protocol == protocol): - if peerConn.connection.transportDir == Direction.In: - inPeers.add(peerId) - elif peerConn.connection.transportDir == Direction.Out: - outPeers.add(peerId) - - return (inPeers, outPeers) +# called when a connection i) is created or ii) is closed +proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} = + case event.kind + of ConnEventKind.Connected: + #let direction = if event.incoming: Inbound else: Outbound + discard + of ConnEventKind.Disconnected: + discard # called when a peer i) first connects to us ii) disconnects all connections from us proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = @@ -447,318 +658,86 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = inRelayPeersTarget = pm.inRelayPeersTarget await pm.switch.disconnect(peerId) - ## Apply max ip colocation limit - if (let ip = pm.getPeerIp(peerId); ip.isSome()): - pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId) - - # in theory this should always be one, but just in case - let peersBehindIp = pm.ipTable[ip.get] - - # pm.colocationLimit == 0 disables the ip colocation limit - if pm.colocationLimit != 0 and peersBehindIp.len > pm.colocationLimit: - for peerId in peersBehindIp[0 ..< (peersBehindIp.len - pm.colocationLimit)]: - debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip - asyncSpawn(pm.switch.disconnect(peerId)) - pm.wakuPeerStore.delete(peerId) - of Left: - direction = UnknownDirection - connectedness = CanConnect - - # note we cant access the peerId ip here as the connection was already closed - for ip, peerIds in pm.ipTable.pairs: - if peerIds.contains(peerId): - pm.ipTable[ip] = pm.ipTable[ip].filterIt(it != peerId) - if pm.ipTable[ip].len == 0: - pm.ipTable.del(ip) - break - of Identified: - debug "event identified", peerId = peerId - - pm.wakuPeerStore[ConnectionBook][peerId] = connectedness - pm.wakuPeerStore[DirectionBook][peerId] = direction - - if not pm.storage.isNil: - var remotePeerInfo = pm.wakuPeerStore.getPeer(peerId) - - if event.kind == PeerEventKind.Left: - remotePeerInfo.disconnectTime = getTime().toUnix - - pm.storage.insertOrReplace(remotePeerInfo) - -proc new*( - T: type PeerManager, - switch: Switch, - wakuMetadata: WakuMetadata = nil, - maxRelayPeers: Option[int] = none(int), - storage: PeerStorage = nil, - initialBackoffInSec = InitialBackoffInSec, - backoffFactor = BackoffFactor, - maxFailedAttempts = MaxFailedAttempts, - colocationLimit = DefaultColocationLimit, - shardedPeerManagement = false, -): PeerManager {.gcsafe.} = - let capacity = switch.peerStore.capacity - let maxConnections = switch.connManager.inSema.size - if maxConnections > capacity: - error "Max number of connections can't be greater than PeerManager capacity", - capacity = capacity, maxConnections = maxConnections - raise newException( - Defect, "Max number of connections can't be greater than PeerManager capacity" - ) - - var maxRelayPeersValue = 0 - if maxRelayPeers.isSome(): - if maxRelayPeers.get() > maxConnections: - error "Max number of relay peers can't be greater than the max amount of connections", - maxConnections = maxConnections, maxRelayPeers = maxRelayPeers.get() - raise newException( - Defect, - "Max number of relay peers can't be greater than the max amount of connections", - ) - - if maxRelayPeers.get() == maxConnections: - warn "Max number of relay peers is equal to max amount of connections, peer won't be contributing to service peers", - maxConnections = maxConnections, maxRelayPeers = maxRelayPeers.get() - maxRelayPeersValue = maxRelayPeers.get() - else: - # Leave by default 20% of connections for service peers - maxRelayPeersValue = maxConnections - (maxConnections div 5) - - # attempt to calculate max backoff to prevent potential overflows or unreasonably high values - let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts) - if backoff.weeks() > 1: - error "Max backoff time can't be over 1 week", maxBackoff = backoff - raise newException(Defect, "Max backoff time can't be over 1 week") - - let outRelayPeersTarget = maxRelayPeersValue div 3 - - let pm = PeerManager( - switch: switch, - wakuMetadata: wakuMetadata, - wakuPeerStore: createWakuPeerStore(switch.peerStore), - storage: storage, - initialBackoffInSec: initialBackoffInSec, - backoffFactor: backoffFactor, - outRelayPeersTarget: outRelayPeersTarget, - inRelayPeersTarget: maxRelayPeersValue - outRelayPeersTarget, - maxRelayPeers: maxRelayPeersValue, - maxFailedAttempts: maxFailedAttempts, - colocationLimit: colocationLimit, - shardedPeerManagement: shardedPeerManagement, - ) - - proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} = - onConnEvent(pm, peerId, event) - - proc peerHook(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.} = - onPeerEvent(pm, peerId, event) - - proc peerStoreChanged(peerId: PeerId) {.gcsafe.} = - waku_peer_store_size.set(toSeq(pm.wakuPeerStore[AddressBook].book.keys).len.int64) - - # currently disabled - #pm.switch.addConnEventHandler(connHook, ConnEventKind.Connected) - #pm.switch.addConnEventHandler(connHook, ConnEventKind.Disconnected) - - pm.switch.addPeerEventHandler(peerHook, PeerEventKind.Joined) - pm.switch.addPeerEventHandler(peerHook, PeerEventKind.Left) - - # called every time the peerstore is updated - pm.wakuPeerStore[AddressBook].addHandler(peerStoreChanged) - - pm.serviceSlots = initTable[string, RemotePeerInfo]() - pm.ipTable = initTable[string, seq[PeerId]]() - - if not storage.isNil(): - trace "found persistent peer storage" - pm.loadFromStorage() # Load previously managed peers. - else: - trace "no peer storage found" - - return pm - -##################### -# Manager interface # -##################### - -proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string) = - # Do not add relay peers - if proto == WakuRelayCodec: - warn "Can't add relay peer to service peers slots" - return - - info "Adding peer to service slots", - peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], service = proto - waku_service_peers.set(1, labelValues = [$proto, $remotePeerInfo.addrs[0]]) - - # Set peer for service slot - pm.serviceSlots[proto] = remotePeerInfo - - pm.addPeer(remotePeerInfo) - -#################### -# Dialer interface # -#################### - -proc dialPeer*( - pm: PeerManager, - remotePeerInfo: RemotePeerInfo, - proto: string, - dialTimeout = DefaultDialTimeout, - source = "api", -): Future[Option[Connection]] {.async.} = - # Dial a given peer and add it to the list of known peers - # TODO: check peer validity and score before continuing. Limit number of peers to be managed. - - # First add dialed peer info to peer store, if it does not exist yet.. - # TODO: nim libp2p peerstore already adds them - if not pm.wakuPeerStore.hasPeer(remotePeerInfo.peerId, proto): - trace "Adding newly dialed peer to manager", - peerId = $remotePeerInfo.peerId, address = $remotePeerInfo.addrs[0], proto = proto - pm.addPeer(remotePeerInfo) - - return await pm.dialPeer( - remotePeerInfo.peerId, remotePeerInfo.addrs, proto, dialTimeout, source - ) - -proc dialPeer*( - pm: PeerManager, - peerId: PeerID, - proto: string, - dialTimeout = DefaultDialTimeout, - source = "api", -): Future[Option[Connection]] {.async.} = - # Dial an existing peer by looking up it's existing addrs in the switch's peerStore - # TODO: check peer validity and score before continuing. Limit number of peers to be managed. - - let addrs = pm.switch.peerStore[AddressBook][peerId] - return await pm.dialPeer(peerId, addrs, proto, dialTimeout, source) - -proc connectToNodes*( - pm: PeerManager, - nodes: seq[string] | seq[RemotePeerInfo], - dialTimeout = DefaultDialTimeout, - source = "api", -) {.async.} = - if nodes.len == 0: - return - - info "Dialing multiple peers", numOfPeers = nodes.len, nodes = $nodes - - var futConns: seq[Future[bool]] - var connectedPeers: seq[RemotePeerInfo] - for node in nodes: - let node = parsePeerInfo(node) - if node.isOk(): - futConns.add(pm.connectRelay(node.value)) - connectedPeers.add(node.value) - else: - error "Couldn't parse node info", error = node.error - - await allFutures(futConns) - - # Filtering successful connectedPeers based on futConns - let combined = zip(connectedPeers, futConns) - connectedPeers = combined.filterIt(it[1].read() == true).mapIt(it[0]) - - when defined(debugDiscv5): - let peerIds = connectedPeers.mapIt(it.peerId) - let origin = connectedPeers.mapIt(it.origin) - if peerIds.len > 0: - notice "established connections with found peers", - peerIds = peerIds.mapIt(shortLog(it)), origin = origin - else: - notice "could not connect to new peers", attempted = nodes.len - - info "Finished dialing multiple peers", - successfulConns = connectedPeers.len, attempted = nodes.len - - # The issue seems to be around peers not being fully connected when - # trying to subscribe. So what we do is sleep to guarantee nodes are - # fully connected. - # - # This issue was known to Dmitiry on nim-libp2p and may be resolvable - # later. - await sleepAsync(chronos.seconds(5)) - -proc reconnectPeers*( - pm: PeerManager, proto: string, backoffTime: chronos.Duration = chronos.seconds(0) -) {.async.} = - ## Reconnect to peers registered for this protocol. This will update connectedness. - ## Especially useful to resume connections from persistent storage after a restart. - - debug "Reconnecting peers", proto = proto - - # Proto is not persisted, we need to iterate over all peers. - for peerInfo in pm.wakuPeerStore.peers(protocolMatcher(proto)): - # Check that the peer can be connected - if peerInfo.connectedness == CannotConnect: - error "Not reconnecting to unreachable or non-existing peer", - peerId = peerInfo.peerId - continue - - if backoffTime > ZeroDuration: - debug "Backing off before reconnect", - peerId = peerInfo.peerId, backoffTime = backoffTime - # We disconnected recently and still need to wait for a backoff period before connecting - await sleepAsync(backoffTime) - - await pm.connectToNodes(@[peerInfo]) - -proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) = - var - numStreamsIn = 0 - numStreamsOut = 0 - for peerId, muxers in pm.switch.connManager.getConnections(): - for peerConn in muxers: - for stream in peerConn.getStreams(): - if stream.protocol == protocol: - if stream.dir == Direction.In: - numStreamsIn += 1 - elif stream.dir == Direction.Out: - numStreamsOut += 1 - return (numStreamsIn, numStreamsOut) - -proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} = - if amount <= 0: - return - - let (inRelayPeers, _) = pm.connectedPeers(WakuRelayCodec) - let connsToPrune = min(amount, inRelayPeers.len) + ## Apply max ip colocation limit + if (let ip = pm.getPeerIp(peerId); ip.isSome()): + pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId) - for p in inRelayPeers[0 ..< connsToPrune]: - trace "Pruning Peer", Peer = $p - asyncSpawn(pm.switch.disconnect(p)) + # in theory this should always be one, but just in case + let peersBehindIp = pm.ipTable[ip.get] -proc connectToRelayPeers*(pm: PeerManager) {.async.} = - var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) - let totalRelayPeers = inRelayPeers.len + outRelayPeers.len + # pm.colocationLimit == 0 disables the ip colocation limit + if pm.colocationLimit != 0 and peersBehindIp.len > pm.colocationLimit: + for peerId in peersBehindIp[0 ..< (peersBehindIp.len - pm.colocationLimit)]: + debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip + asyncSpawn(pm.switch.disconnect(peerId)) + pm.wakuPeerStore.delete(peerId) + of Left: + direction = UnknownDirection + connectedness = CanConnect - if inRelayPeers.len > pm.inRelayPeersTarget: - await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget) + # note we cant access the peerId ip here as the connection was already closed + for ip, peerIds in pm.ipTable.pairs: + if peerIds.contains(peerId): + pm.ipTable[ip] = pm.ipTable[ip].filterIt(it != peerId) + if pm.ipTable[ip].len == 0: + pm.ipTable.del(ip) + break + of Identified: + debug "event identified", peerId = peerId - if outRelayPeers.len >= pm.outRelayPeersTarget: - return + pm.wakuPeerStore[ConnectionBook][peerId] = connectedness + pm.wakuPeerStore[DirectionBook][peerId] = direction - let notConnectedPeers = pm.wakuPeerStore.getDisconnectedPeers() + if not pm.storage.isNil: + var remotePeerInfo = pm.wakuPeerStore.getPeer(peerId) - var outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId)) + if event.kind == PeerEventKind.Left: + remotePeerInfo.disconnectTime = getTime().toUnix - shuffle(outsideBackoffPeers) + pm.storage.insertOrReplace(remotePeerInfo) - var index = 0 - var numPendingConnReqs = - min(outsideBackoffPeers.len, pm.outRelayPeersTarget - outRelayPeers.len) - ## number of outstanding connection requests +#~~~~~~~~~~~~~~~~~# +# Metrics Logging # +#~~~~~~~~~~~~~~~~~# - while numPendingConnReqs > 0 and outRelayPeers.len < pm.outRelayPeersTarget: - let numPeersToConnect = min(numPendingConnReqs, MaxParallelDials) - await pm.connectToNodes(outsideBackoffPeers[index ..< (index + numPeersToConnect)]) +proc logAndMetrics(pm: PeerManager) {.async.} = + heartbeat "Scheduling log and metrics run", LogAndMetricsInterval: + # log metrics + let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) + let maxConnections = pm.switch.connManager.inSema.size + let notConnectedPeers = pm.wakuPeerStore.getDisconnectedPeers().mapIt( + RemotePeerInfo.init(it.peerId, it.addrs) + ) + let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId)) + let totalConnections = pm.switch.connManager.getConnections().len - (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) + info "Relay peer connections", + inRelayConns = $inRelayPeers.len & "/" & $pm.inRelayPeersTarget, + outRelayConns = $outRelayPeers.len & "/" & $pm.outRelayPeersTarget, + totalConnections = $totalConnections & "/" & $maxConnections, + notConnectedPeers = notConnectedPeers.len, + outsideBackoffPeers = outsideBackoffPeers.len - index += numPeersToConnect - numPendingConnReqs -= numPeersToConnect + # update prometheus metrics + for proto in pm.wakuPeerStore.getWakuProtos(): + let (protoConnsIn, protoConnsOut) = pm.connectedPeers(proto) + let (protoStreamsIn, protoStreamsOut) = pm.getNumStreams(proto) + waku_connected_peers.set( + protoConnsIn.len.float64, labelValues = [$Direction.In, proto] + ) + waku_connected_peers.set( + protoConnsOut.len.float64, labelValues = [$Direction.Out, proto] + ) + waku_streams_peers.set( + protoStreamsIn.float64, labelValues = [$Direction.In, proto] + ) + waku_streams_peers.set( + protoStreamsOut.float64, labelValues = [$Direction.Out, proto] + ) + +#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~# +# Pruning and Maintenance (Stale Peers Management) # +#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~# proc manageRelayPeers*(pm: PeerManager) {.async.} = if pm.wakuMetadata.shards.len == 0: @@ -915,41 +894,6 @@ proc prunePeerStore*(pm: PeerManager) = capacity = capacity, pruned = peersToPrune.len -proc selectPeer*( - pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic) -): Option[RemotePeerInfo] = - trace "Selecting peer from peerstore", protocol = proto - - # Selects the best peer for a given protocol - var peers = pm.wakuPeerStore.getPeersByProtocol(proto) - - if shard.isSome(): - peers.keepItIf((it.enr.isSome() and it.enr.get().containsShard(shard.get()))) - - # No criteria for selecting a peer for WakuRelay, random one - if proto == WakuRelayCodec: - # TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned - if peers.len > 0: - trace "Got peer from peerstore", - peerId = peers[0].peerId, multi = peers[0].addrs[0], protocol = proto - return some(peers[0]) - trace "No peer found for protocol", protocol = proto - return none(RemotePeerInfo) - - # For other protocols, we select the peer that is slotted for the given protocol - pm.serviceSlots.withValue(proto, serviceSlot): - trace "Got peer from service slots", - peerId = serviceSlot[].peerId, multi = serviceSlot[].addrs[0], protocol = proto - return some(serviceSlot[]) - - # If not slotted, we select a random peer for the given protocol - if peers.len > 0: - trace "Got peer from peerstore", - peerId = peers[0].peerId, multi = peers[0].addrs[0], protocol = proto - return some(peers[0]) - trace "No peer found for protocol", protocol = proto - return none(RemotePeerInfo) - # Prunes peers from peerstore to remove old/stale ones proc prunePeerStoreLoop(pm: PeerManager) {.async.} = trace "Starting prune peerstore loop" @@ -981,40 +925,20 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} = # Shorten the connectivity loop interval dynamically based on percentage of peers to fill or connections to prune await sleepAsync(dynamicSleepInterval) -proc logAndMetrics(pm: PeerManager) {.async.} = - heartbeat "Scheduling log and metrics run", LogAndMetricsInterval: - # log metrics - let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) - let maxConnections = pm.switch.connManager.inSema.size - let notConnectedPeers = pm.wakuPeerStore.getDisconnectedPeers().mapIt( - RemotePeerInfo.init(it.peerId, it.addrs) - ) - let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId)) - let totalConnections = pm.switch.connManager.getConnections().len +proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} = + if amount <= 0: + return - info "Relay peer connections", - inRelayConns = $inRelayPeers.len & "/" & $pm.inRelayPeersTarget, - outRelayConns = $outRelayPeers.len & "/" & $pm.outRelayPeersTarget, - totalConnections = $totalConnections & "/" & $maxConnections, - notConnectedPeers = notConnectedPeers.len, - outsideBackoffPeers = outsideBackoffPeers.len + let (inRelayPeers, _) = pm.connectedPeers(WakuRelayCodec) + let connsToPrune = min(amount, inRelayPeers.len) - # update prometheus metrics - for proto in pm.wakuPeerStore.getWakuProtos(): - let (protoConnsIn, protoConnsOut) = pm.connectedPeers(proto) - let (protoStreamsIn, protoStreamsOut) = pm.getNumStreams(proto) - waku_connected_peers.set( - protoConnsIn.len.float64, labelValues = [$Direction.In, proto] - ) - waku_connected_peers.set( - protoConnsOut.len.float64, labelValues = [$Direction.Out, proto] - ) - waku_streams_peers.set( - protoStreamsIn.float64, labelValues = [$Direction.In, proto] - ) - waku_streams_peers.set( - protoStreamsOut.float64, labelValues = [$Direction.Out, proto] - ) + for p in inRelayPeers[0 ..< connsToPrune]: + trace "Pruning Peer", Peer = $p + asyncSpawn(pm.switch.disconnect(p)) + +#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~# +# Initialization and Constructor # +#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~# proc start*(pm: PeerManager) = pm.started = true @@ -1024,3 +948,95 @@ proc start*(pm: PeerManager) = proc stop*(pm: PeerManager) = pm.started = false + +proc new*( + T: type PeerManager, + switch: Switch, + wakuMetadata: WakuMetadata = nil, + maxRelayPeers: Option[int] = none(int), + storage: PeerStorage = nil, + initialBackoffInSec = InitialBackoffInSec, + backoffFactor = BackoffFactor, + maxFailedAttempts = MaxFailedAttempts, + colocationLimit = DefaultColocationLimit, + shardedPeerManagement = false, +): PeerManager {.gcsafe.} = + let capacity = switch.peerStore.capacity + let maxConnections = switch.connManager.inSema.size + if maxConnections > capacity: + error "Max number of connections can't be greater than PeerManager capacity", + capacity = capacity, maxConnections = maxConnections + raise newException( + Defect, "Max number of connections can't be greater than PeerManager capacity" + ) + + var maxRelayPeersValue = 0 + if maxRelayPeers.isSome(): + if maxRelayPeers.get() > maxConnections: + error "Max number of relay peers can't be greater than the max amount of connections", + maxConnections = maxConnections, maxRelayPeers = maxRelayPeers.get() + raise newException( + Defect, + "Max number of relay peers can't be greater than the max amount of connections", + ) + + if maxRelayPeers.get() == maxConnections: + warn "Max number of relay peers is equal to max amount of connections, peer won't be contributing to service peers", + maxConnections = maxConnections, maxRelayPeers = maxRelayPeers.get() + maxRelayPeersValue = maxRelayPeers.get() + else: + # Leave by default 20% of connections for service peers + maxRelayPeersValue = maxConnections - (maxConnections div 5) + + # attempt to calculate max backoff to prevent potential overflows or unreasonably high values + let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts) + if backoff.weeks() > 1: + error "Max backoff time can't be over 1 week", maxBackoff = backoff + raise newException(Defect, "Max backoff time can't be over 1 week") + + let outRelayPeersTarget = maxRelayPeersValue div 3 + + let pm = PeerManager( + switch: switch, + wakuMetadata: wakuMetadata, + wakuPeerStore: createWakuPeerStore(switch.peerStore), + storage: storage, + initialBackoffInSec: initialBackoffInSec, + backoffFactor: backoffFactor, + outRelayPeersTarget: outRelayPeersTarget, + inRelayPeersTarget: maxRelayPeersValue - outRelayPeersTarget, + maxRelayPeers: maxRelayPeersValue, + maxFailedAttempts: maxFailedAttempts, + colocationLimit: colocationLimit, + shardedPeerManagement: shardedPeerManagement, + ) + + proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} = + onConnEvent(pm, peerId, event) + + proc peerHook(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.} = + onPeerEvent(pm, peerId, event) + + proc peerStoreChanged(peerId: PeerId) {.gcsafe.} = + waku_peer_store_size.set(toSeq(pm.wakuPeerStore[AddressBook].book.keys).len.int64) + + # currently disabled + #pm.switch.addConnEventHandler(connHook, ConnEventKind.Connected) + #pm.switch.addConnEventHandler(connHook, ConnEventKind.Disconnected) + + pm.switch.addPeerEventHandler(peerHook, PeerEventKind.Joined) + pm.switch.addPeerEventHandler(peerHook, PeerEventKind.Left) + + # called every time the peerstore is updated + pm.wakuPeerStore[AddressBook].addHandler(peerStoreChanged) + + pm.serviceSlots = initTable[string, RemotePeerInfo]() + pm.ipTable = initTable[string, seq[PeerId]]() + + if not storage.isNil(): + trace "found persistent peer storage" + pm.loadFromStorage() # Load previously managed peers. + else: + trace "no peer storage found" + + return pm