Skip to content

Commit

Permalink
Start/stop protocols (#730)
Browse files Browse the repository at this point in the history
Starting/stopping a switch now starts/stops all protocols mounted on that switch
  • Loading branch information
lchenut authored Jun 30, 2022
1 parent e6440c4 commit 0ece5ea
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 360 deletions.
8 changes: 7 additions & 1 deletion libp2p/multistream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

{.push raises: [Defect].}

import std/[strutils]
import std/[strutils, sequtils]
import chronos, chronicles, stew/byteutils
import stream/connection,
protocols/protocol
Expand Down Expand Up @@ -209,3 +209,9 @@ proc addHandler*(m: MultistreamSelect,
m.handlers.add(HandlerHolder(protos: @[codec],
protocol: protocol,
match: matcher))

proc start*(m: MultistreamSelect) {.async.} =
await allFutures(m.handlers.mapIt(it.protocol.start()))

proc stop*(m: MultistreamSelect) {.async.} =
await allFutures(m.handlers.mapIt(it.protocol.stop()))
4 changes: 4 additions & 0 deletions libp2p/protocols/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ type
LPProtocol* = ref object of RootObj
codecs*: seq[string]
handler*: LPProtoHandler ## this handler gets invoked by the protocol negotiator
started*: bool

method init*(p: LPProtocol) {.base, gcsafe.} = discard
method start*(p: LPProtocol) {.async, base.} = p.started = true
method stop*(p: LPProtocol) {.async, base.} = p.started = false


func codec*(p: LPProtocol): string =
assert(p.codecs.len > 0, "Codecs sequence was empty!")
Expand Down
34 changes: 21 additions & 13 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -562,22 +562,28 @@ method publish*(g: GossipSub,

return peers.len

proc maintainDirectPeer(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} =
let peer = g.peers.getOrDefault(id)
if isNil(peer):
trace "Attempting to dial a direct peer", peer = id
try:
await g.switch.connect(id, addrs)
# populate the peer after it's connected
discard g.getOrCreatePeer(id, g.codecs)
except CancelledError as exc:
trace "Direct peer dial canceled"
raise exc
except CatchableError as exc:
debug "Direct peer error dialing", msg = exc.msg

proc addDirectPeer*(g: GossipSub, id: PeerId, addrs: seq[MultiAddress]) {.async.} =
g.parameters.directPeers[id] = addrs
await g.maintainDirectPeer(id, addrs)

proc maintainDirectPeers(g: GossipSub) {.async.} =
heartbeat "GossipSub DirectPeers", 1.minutes:
for id, addrs in g.parameters.directPeers:
let peer = g.peers.getOrDefault(id)
if isNil(peer):
trace "Attempting to dial a direct peer", peer = id
try:
# dial, internally connection will be stored
let _ = await g.switch.dial(id, addrs, g.codecs)
# populate the peer after it's connected
discard g.getOrCreatePeer(id, g.codecs)
except CancelledError as exc:
trace "Direct peer dial canceled"
raise exc
except CatchableError as exc:
debug "Direct peer error dialing", msg = exc.msg
await g.addDirectPeer(id, addrs)

method start*(g: GossipSub) {.async.} =
trace "gossipsub start"
Expand All @@ -589,9 +595,11 @@ method start*(g: GossipSub) {.async.} =
g.heartbeatFut = g.heartbeat()
g.scoringHeartbeatFut = g.scoringHeartbeat()
g.directPeersLoop = g.maintainDirectPeers()
g.started = true

method stop*(g: GossipSub) {.async.} =
trace "gossipsub stop"
g.started = false
if g.heartbeatFut.isNil:
warn "Stopping gossipsub without starting it"
return
Expand Down
8 changes: 0 additions & 8 deletions libp2p/protocols/pubsub/pubsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -488,14 +488,6 @@ method initPubSub*(p: PubSub)
if p.msgIdProvider == nil:
p.msgIdProvider = defaultMsgIdProvider

method start*(p: PubSub) {.async, base.} =
## start pubsub
discard

method stop*(p: PubSub) {.async, base.} =
## stopt pubsub
discard

method addValidator*(p: PubSub,
topic: varargs[string],
hook: ValidatorHandler) {.base.} =
Expand Down
11 changes: 11 additions & 0 deletions libp2p/switch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type
dialer*: Dial
peerStore*: PeerStore
nameResolver*: NameResolver
started: bool

proc addConnEventHandler*(s: Switch,
handler: ConnEventHandler,
Expand Down Expand Up @@ -144,6 +145,9 @@ proc mount*[T: LPProtocol](s: Switch, proto: T, matcher: Matcher = nil)
raise newException(LPError,
"Protocol has to define a codec string")

if s.started and not proto.started:
raise newException(LPError, "Protocol not started")

s.ms.addHandler(proto.codecs, proto, matcher)
s.peerInfo.protocols.add(proto.codec)

Expand Down Expand Up @@ -216,6 +220,7 @@ proc accept(s: Switch, transport: Transport) {.async.} = # noraises
proc stop*(s: Switch) {.async.} =
trace "Stopping switch"

s.started = false
# close and cleanup all connections
await s.connManager.close()

Expand All @@ -239,6 +244,8 @@ proc stop*(s: Switch) {.async.} =
if not a.finished:
a.cancel()

await s.ms.stop()

trace "Switch stopped"

proc start*(s: Switch) {.async, gcsafe.} =
Expand Down Expand Up @@ -272,6 +279,10 @@ proc start*(s: Switch) {.async, gcsafe.} =

s.peerInfo.update()

await s.ms.start()

s.started = true

debug "Started libp2p node", peer = s.peerInfo

proc newSwitch*(peerInfo: PeerInfo,
Expand Down
75 changes: 0 additions & 75 deletions tests/pubsub/testfloodsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,6 @@ suite "FloodSub":
nodes[1].switch.start(),
)

# start pubsub
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))

await subscribeNodes(nodes)

nodes[1].subscribe("foobar", handler)
Expand All @@ -74,11 +67,6 @@ suite "FloodSub":
nodes[1].switch.stop()
)

await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)

await allFuturesThrowing(nodesFut.concat())

asyncTest "FloodSub basic publish/subscribe B -> A":
Expand All @@ -96,12 +84,6 @@ suite "FloodSub":
nodes[1].switch.start(),
)

# start pubsubcon
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))

await subscribeNodes(nodes)

Expand All @@ -117,11 +99,6 @@ suite "FloodSub":
nodes[1].switch.stop()
)

await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)

await allFuturesThrowing(nodesFut)

asyncTest "FloodSub validation should succeed":
Expand All @@ -139,13 +116,6 @@ suite "FloodSub":
nodes[1].switch.start(),
)

# start pubsubcon
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))

await subscribeNodes(nodes)

nodes[1].subscribe("foobar", handler)
Expand All @@ -168,11 +138,6 @@ suite "FloodSub":
nodes[1].switch.stop()
)

await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)

await allFuturesThrowing(nodesFut)

asyncTest "FloodSub validation should fail":
Expand All @@ -188,13 +153,6 @@ suite "FloodSub":
nodes[1].switch.start(),
)

# start pubsubcon
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))

await subscribeNodes(nodes)
nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
Expand All @@ -214,11 +172,6 @@ suite "FloodSub":
nodes[1].switch.stop()
)

await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)

await allFuturesThrowing(nodesFut)

asyncTest "FloodSub validation one fails and one succeeds":
Expand All @@ -236,13 +189,6 @@ suite "FloodSub":
nodes[1].switch.start(),
)

# start pubsubcon
await allFuturesThrowing(
allFinished(
nodes[0].start(),
nodes[1].start(),
))

await subscribeNodes(nodes)
nodes[1].subscribe("foo", handler)
await waitSub(nodes[0], nodes[1], "foo")
Expand All @@ -266,11 +212,6 @@ suite "FloodSub":
nodes[1].switch.stop()
)

await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop()
)

await allFuturesThrowing(nodesFut)

asyncTest "FloodSub multiple peers, no self trigger":
Expand All @@ -296,7 +237,6 @@ suite "FloodSub":
nodes = generateNodes(runs, triggerSelf = false)
nodesFut = nodes.mapIt(it.switch.start())

await allFuturesThrowing(nodes.mapIt(it.start()))
await subscribeNodes(nodes)

for i in 0..<runs:
Expand All @@ -318,7 +258,6 @@ suite "FloodSub":
await allFuturesThrowing(
nodes.mapIt(
allFutures(
it.stop(),
it.switch.stop())))

await allFuturesThrowing(nodesFut)
Expand Down Expand Up @@ -346,7 +285,6 @@ suite "FloodSub":
nodes = generateNodes(runs, triggerSelf = true)
nodesFut = nodes.mapIt(it.switch.start())

await allFuturesThrowing(nodes.mapIt(it.start()))
await subscribeNodes(nodes)

for i in 0..<runs:
Expand Down Expand Up @@ -379,7 +317,6 @@ suite "FloodSub":
await allFuturesThrowing(
nodes.mapIt(
allFutures(
it.stop(),
it.switch.stop())))

await allFuturesThrowing(nodesFut)
Expand All @@ -400,13 +337,6 @@ suite "FloodSub":
smallNode[0].switch.start(),
)

# start pubsubcon
await allFuturesThrowing(
allFinished(
bigNode[0].start(),
smallNode[0].start(),
))

await subscribeNodes(bigNode & smallNode)
bigNode[0].subscribe("foo", handler)
smallNode[0].subscribe("foo", handler)
Expand All @@ -431,9 +361,4 @@ suite "FloodSub":
bigNode[0].switch.stop()
)

await allFuturesThrowing(
smallNode[0].stop(),
bigNode[0].stop()
)

await allFuturesThrowing(nodesFut)
Loading

0 comments on commit 0ece5ea

Please sign in to comment.