Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: public rendezvous procedure #1203

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions libp2p/protocols/rendezvous.nim
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,6 @@ type
salt: string
defaultDT: Moment
registerDeletionLoop: Future[void]
#registerEvent: AsyncEvent # TODO: to raise during the heartbeat
# + make the heartbeat sleep duration "smarter"
diegomrsantos marked this conversation as resolved.
Show resolved Hide resolved
sema: AsyncSemaphore
peers: seq[PeerId]
cookiesSaved: Table[PeerId, Table[string, seq[byte]]]
Expand Down Expand Up @@ -402,7 +400,6 @@ proc save(
)
)
rdv.namespaces[nsSalted].add(rdv.registered.high)
# rdv.registerEvent.fire()
except KeyError:
doAssert false, "Should have key"

Expand Down Expand Up @@ -509,8 +506,13 @@ proc advertisePeer(rdv: RendezVous, peer: PeerId, msg: seq[byte]) {.async.} =
discard await advertiseWrap().withTimeout(5.seconds)

proc advertise*(
rdv: RendezVous, ns: string, ttl: Duration, peers: seq[PeerId]
rdv: RendezVous, ns: string, ttl: Duration = rdv.minDuration, peers: seq[PeerId]
) {.async.} =
## The advertise async procedure sends a registration for a namespace
## to a sequence of peers. It encodes and sends a signed peer record
## along with a time-to-live value. The registrations are sent
## concurrently to all specified peers.
##
if ns.len notin 1 .. 255:
raise newException(RendezVousError, "Invalid namespace")

Expand All @@ -533,12 +535,15 @@ proc advertise*(

await allFutures(futs)

method advertise*(
proc advertise*(
rdv: RendezVous, ns: string, ttl: Duration = rdv.minDuration
) {.async, base.} =
) {.async.} =
await rdv.advertise(ns, ttl, rdv.peers)

proc requestLocally*(rdv: RendezVous, ns: string): seq[PeerRecord] =
## This procedure returns all the peers already registered on the
## given namespace.
##
let
nsSalted = ns & rdv.salt
n = Moment.now()
Expand All @@ -553,18 +558,21 @@ proc requestLocally*(rdv: RendezVous, ns: string): seq[PeerRecord] =
@[]

proc request*(
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int, peers: seq[PeerId]
rdv: RendezVous, ns: string, peerLimit: uint64 = DiscoverLimit, peers: seq[PeerId]
): Future[seq[PeerRecord]] {.async.} =
## This async procedure discovers and returns peers for a given namespace
## by sending requests and processing responses. It limits the number of
## peer records retrieved based on the provided limit.
##
var
s: Table[PeerId, (PeerRecord, Register)]
limit: uint64
limit = peerLimit
d = Discover(ns: ns)

if l <= 0 or l > DiscoverLimit.int:
if limit > DiscoverLimit:
raise newException(RendezVousError, "Invalid limit")
if ns.len notin 0 .. 255:
raise newException(RendezVousError, "Invalid namespace")
limit = l.uint64
proc requestPeer(peer: PeerId) {.async.} =
let conn = await rdv.switch.dial(peer, RendezVousCodec)
defer:
Expand Down Expand Up @@ -632,9 +640,9 @@ proc request*(
return toSeq(s.values()).mapIt(it[0])

proc request*(
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int
rdv: RendezVous, ns: string, limit: uint64 = DiscoverLimit
): Future[seq[PeerRecord]] {.async.} =
await rdv.request(ns, l, rdv.peers)
await rdv.request(ns, limit, rdv.peers)

proc unsubscribeLocally*(rdv: RendezVous, ns: string) =
let nsSalted = ns & rdv.salt
Expand All @@ -646,6 +654,10 @@ proc unsubscribeLocally*(rdv: RendezVous, ns: string) =
return

proc unsubscribe*(rdv: RendezVous, ns: string, peerIds: seq[PeerId]) {.async.} =
## The async unsubscribe procedure removes peers from a namespace by
## sending an "Unregister" message to each connected peer. The operation
## is bounded by a timeout for unsubscribing from all peers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this timeout?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

##
if ns.len notin 1 .. 255:
raise newException(RendezVousError, "Invalid namespace")

Expand Down Expand Up @@ -708,7 +720,6 @@ proc new*(
salt: string.fromBytes(generateBytes(rng[], 8)),
registered: initOffsettedSeq[RegisteredData](1),
defaultDT: Moment.now() - 1.days,
#registerEvent: newAsyncEvent(),
sema: newAsyncSemaphore(SemaphoreDefaultSize),
minDuration: minDuration,
maxDuration: maxDuration,
Expand Down
2 changes: 0 additions & 2 deletions tests/testrendezvous.nim
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,6 @@ suite "RendezVous":
switch = createSwitch(rdv)
expect RendezVousError:
discard await rdv.request("A".repeat(300))
expect RendezVousError:
discard await rdv.request("A", -1)
expect RendezVousError:
discard await rdv.request("A", 3000)
expect RendezVousError:
Expand Down
Loading