Skip to content

Commit

Permalink
Switch to the new peer notify mechanism
Browse files Browse the repository at this point in the history
1. Only listen for peers added and identify events.
2. Remove the old "Limited" check. Peers only show up as "Connected" if
they have non-limited connections.
3. Don't bother listening for new connections directly and/or
connectivity changes. We'll get a new identify event per new connection
regardless.

fixes #546
  • Loading branch information
Stebalien committed Jul 11, 2024
1 parent 8e498e9 commit 0b97df5
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 78 deletions.
75 changes: 0 additions & 75 deletions notify.go

This file was deleted.

91 changes: 91 additions & 0 deletions peer_notify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package pubsub

import (
"context"

"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)

func (ps *PubSub) watchForNewPeers(ctx context.Context) {
// We don't bother subscribing to "connectivity" events because we always run identify after
// every new connection.
sub, err := ps.host.EventBus().Subscribe([]interface{}{
&event.EvtPeerIdentificationCompleted{},
&event.EvtPeerProtocolsUpdated{},
})
if err != nil {
log.Errorf("failed to subscribe to peer identification events: %v", err)
return
}
defer sub.Close()

ps.newPeersPrioLk.RLock()
ps.newPeersMx.Lock()
for _, pid := range ps.host.Network().Peers() {
if ps.host.Network().Connectedness(pid) != network.Connected {
continue
}
ps.newPeersPend[pid] = struct{}{}
}
ps.newPeersMx.Unlock()
ps.newPeersPrioLk.RUnlock()

select {
case ps.newPeers <- struct{}{}:
default:
}

supportedProtocols := make(map[protocol.ID]struct{})
for _, proto := range ps.rt.Protocols() {
supportedProtocols[proto] = struct{}{}
}

for ctx.Err() == nil {
var ev any
select {
case <-ctx.Done():
return
case ev = <-sub.Out():
}

var protos []protocol.ID
var peer peer.ID
switch ev := ev.(type) {
case event.EvtPeerIdentificationCompleted:
peer = ev.Peer
protos = ev.Protocols
case event.EvtPeerProtocolsUpdated:
peer = ev.Peer
protos = ev.Added
default:
continue
}

// We don't bother checking connectivity (connected and non-"limited") here because
// we'll check when actually handling the new peer.

for _, p := range protos {
if _, ok := supportedProtocols[p]; ok {
ps.notifyNewPeer(peer)
break
}
}
}

}

func (ps *PubSub) notifyNewPeer(peer peer.ID) {
ps.newPeersPrioLk.RLock()
ps.newPeersMx.Lock()
ps.newPeersPend[peer] = struct{}{}
ps.newPeersMx.Unlock()
ps.newPeersPrioLk.RUnlock()

select {
case ps.newPeers <- struct{}{}:
default:
}
}
6 changes: 3 additions & 3 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,14 +327,12 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
h.SetStreamHandler(id, ps.handleNewStream)
}
}
h.Network().Notify((*PubSubNotif)(ps))
go ps.watchForNewPeers(ctx)

ps.val.Start(ps)

go ps.processLoop(ctx)

(*PubSubNotif)(ps).Initialize()

return ps, nil
}

Expand Down Expand Up @@ -687,6 +685,8 @@ func (p *PubSub) handlePendingPeers() {
p.newPeersPrioLk.Unlock()

for pid := range newPeers {
// Make sure we have a non-limited connection. We do this late because we may have
// disconnected in the meantime.
if p.host.Network().Connectedness(pid) != network.Connected {
continue
}
Expand Down

0 comments on commit 0b97df5

Please sign in to comment.