Skip to content

Commit

Permalink
Revert "replace stale peer management with timestemp approch"
Browse files Browse the repository at this point in the history
This reverts commit 731fe6c.
  • Loading branch information
darshankabariya committed Sep 25, 2024
1 parent 0da0d94 commit f2efc36
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 31 deletions.
48 changes: 23 additions & 25 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ const
# TODO: Make configurable
DefaultDialTimeout* = chronos.seconds(10)

# Max attempts before removing the peer
MaxFailedAttempts = 5

# Time to wait before attempting to dial again is calculated as:
# initialBackoffInSec*(backoffFactor^(failedAttempts-1))
# 120s, 480s, 1920, 7680s
Expand All @@ -68,14 +71,13 @@ const
# Max peers that we allow from the same IP
DefaultColocationLimit* = 5

Threshold = chronos.hours(2)

type PeerManager* = ref object of RootObj
switch*: Switch
peerStore*: PeerStore
wakuMetadata*: WakuMetadata
initialBackoffInSec*: int
backoffFactor*: int
maxFailedAttempts*: int
storage*: PeerStorage
serviceSlots*: Table[string, RemotePeerInfo]
maxRelayPeers*: int
Expand Down Expand Up @@ -182,8 +184,9 @@ proc connectRelay*(
if not pm.peerStore.hasPeer(peerId, WakuRelayCodec):
pm.addPeer(peer)

let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
trace "Connecting to relay peer",
wireAddr = peer.addrs, peerId = peerId
wireAddr = peer.addrs, peerId = peerId, failedAttempts = failedAttempts

var deadline = sleepAsync(dialTimeout)
let workfut = pm.switch.connect(peerId, peer.addrs)
Expand All @@ -205,21 +208,20 @@ proc connectRelay*(
waku_peers_dials.inc(labelValues = ["successful"])
waku_node_conns_initiated.inc(labelValues = [source])

if pm.peerStore[FirstFailedConnBook].contains(peerId):
discard pm.peerStore[FirstFailedConnBook].del(peerId)

pm.peerStore[NumberFailedConnBook][peerId] = 0

return true

# Dial failed
pm.peerStore[NumberFailedConnBook][peerId] =
pm.peerStore[NumberFailedConnBook][peerId] + 1
pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)
pm.peerStore[ConnectionBook][peerId] = CannotConnect
if not pm.peerStore[FirstFailedConnBook].contains(peerId):
pm.peerStore[FirstFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)

trace "Connecting relay peer failed",
peerId = peerId,
reason = reasonFailed

reason = reasonFailed,
failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
waku_peers_dials.inc(labelValues = [reasonFailed])

return false
Expand Down Expand Up @@ -313,25 +315,22 @@ proc canBeConnected*(pm: PeerManager, peerId: PeerId): bool =
# Returns if we can try to connect to this peer, based on past failed attempts
# It uses an exponential backoff. Each connection attempt makes us
# wait more before trying again.
let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]

# if it never errored, we can try to connect
if not pm.peerStore[FirstFailedConnBook].contains(peerId):
if failedAttempts == 0:
return true

# if it's break threshold then do not reconnect
let
disconnectTime = pm.peerStore[FirstFailedConnBook][peerId]
currentTime = Moment.init(getTime().toUnix, Second)

if (currentTime - disconnectTime) > Threshold:

# if there are too many failed attempts, do not reconnect
if failedAttempts >= pm.maxFailedAttempts:
return false

# If it errored we wait an exponential backoff from last connection
# the more failed attempts, the greater the backoff since last attempt
let now = Moment.init(getTime().toUnix, Second)
let lastFailed = pm.peerStore[LastFailedConnBook][peerId]
let backoff =
calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, 5)
calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, failedAttempts)

return now >= (lastFailed + backoff)

Expand Down Expand Up @@ -462,6 +461,7 @@ proc new*(
storage: PeerStorage = nil,
initialBackoffInSec = InitialBackoffInSec,
backoffFactor = BackoffFactor,
maxFailedAttempts = MaxFailedAttempts,
colocationLimit = DefaultColocationLimit,
shardedPeerManagement = false,
): PeerManager {.gcsafe.} =
Expand Down Expand Up @@ -493,7 +493,7 @@ proc new*(
maxRelayPeersValue = maxConnections - (maxConnections div 5)

# attempt to calculate max backoff to prevent potential overflows or unreasonably high values
let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, 5)
let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts)
if backoff.weeks() > 1:
error "Max backoff time can't be over 1 week", maxBackoff = backoff
raise newException(Defect, "Max backoff time can't be over 1 week")
Expand All @@ -510,6 +510,7 @@ proc new*(
outRelayPeersTarget: outRelayPeersTarget,
inRelayPeersTarget: maxRelayPeersValue - outRelayPeersTarget,
maxRelayPeers: maxRelayPeersValue,
maxFailedAttempts: maxFailedAttempts,
colocationLimit: colocationLimit,
shardedPeerManagement: shardedPeerManagement,
)
Expand Down Expand Up @@ -836,11 +837,8 @@ proc prunePeerStore*(pm: PeerManager) =
var peersToPrune: HashSet[PeerId]

# prune failed connections
for peerId in pm.peerStore[FirstFailedConnBook].book.keys:
let
disconnectTime = pm.peerStore[FirstFailedConnBook][peerId]
currentTime = Moment.init(getTime().toUnix, Second)
if (currentTime - disconnectTime) < Threshold:
for peerId, count in pm.peerStore[NumberFailedConnBook].book.pairs:
if count < pm.maxFailedAttempts:
continue

if peersToPrune.len >= pruningCount:
Expand Down
6 changes: 3 additions & 3 deletions waku/node/peer_manager/waku_peer_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ type
# Last failed connection attemp timestamp
LastFailedConnBook* = ref object of PeerBook[Moment]

# First failed connection attemp timestamp
FirstFailedConnBook* = ref object of PeerBook[Moment]
# Failed connection attempts
NumberFailedConnBook* = ref object of PeerBook[int]

# Keeps track of when peers were disconnected in Unix timestamps
DisconnectBook* = ref object of PeerBook[int64]
Expand Down Expand Up @@ -67,7 +67,7 @@ proc get*(peerStore: PeerStore, peerId: PeerID): RemotePeerInfo =
origin: peerStore[SourceBook][peerId],
direction: peerStore[DirectionBook][peerId],
lastFailedConn: peerStore[LastFailedConnBook][peerId],
firstFailedConn: peerStore[FirstFailedConnBook][peerId],
numberFailedConn: peerStore[NumberFailedConnBook][peerId],
)

proc getWakuProtos*(peerStore: PeerStore): seq[string] =
Expand Down
5 changes: 2 additions & 3 deletions waku/waku_core/peers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ type RemotePeerInfo* = ref object
disconnectTime*: int64
origin*: PeerOrigin
direction*: PeerDirection
firstFailedConn*: Moment
lastFailedConn*: Moment

numberFailedConn*: int

func `$`*(remotePeerInfo: RemotePeerInfo): string =
$remotePeerInfo.peerId

Expand Down

0 comments on commit f2efc36

Please sign in to comment.