Skip to content

Commit

Permalink
Merge pull request #1230 from libp2p/peer-connectedness
Browse files Browse the repository at this point in the history
emit the EvtPeerConnectednessChanged event
  • Loading branch information
marten-seemann authored Oct 25, 2021
2 parents db8f9c6 + e1335f6 commit ddf96ce
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 0 deletions.
5 changes: 5 additions & 0 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil {
return nil, err
}
evtPeerConnectednessChanged, err := h.eventbus.Emitter(&event.EvtPeerConnectednessChanged{})
if err != nil {
return nil, err
}
h.Network().Notify(newPeerConnectWatcher(evtPeerConnectednessChanged))

if !h.disableSignedPeerRecord {
cab, ok := peerstore.GetCertifiedAddrBook(n.Peerstore())
Expand Down
73 changes: 73 additions & 0 deletions p2p/host/basic/peer_connectedness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package basichost

import (
"sync"

"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

ma "github.com/multiformats/go-multiaddr"
)

type peerConnectWatcher struct {
emitter event.Emitter

mutex sync.Mutex
connected map[peer.ID]struct{}
}

var _ network.Notifiee = &peerConnectWatcher{}

func newPeerConnectWatcher(emitter event.Emitter) *peerConnectWatcher {
return &peerConnectWatcher{
emitter: emitter,
connected: make(map[peer.ID]struct{}),
}
}

func (w *peerConnectWatcher) Listen(network.Network, ma.Multiaddr) {}
func (w *peerConnectWatcher) ListenClose(network.Network, ma.Multiaddr) {}
func (w *peerConnectWatcher) OpenedStream(network.Network, network.Stream) {}
func (w *peerConnectWatcher) ClosedStream(network.Network, network.Stream) {}

func (w *peerConnectWatcher) Connected(n network.Network, conn network.Conn) {
p := conn.RemotePeer()
w.handleTransition(p, n.Connectedness(p))
}

func (w *peerConnectWatcher) Disconnected(n network.Network, conn network.Conn) {
p := conn.RemotePeer()
w.handleTransition(p, n.Connectedness(p))
}

func (w *peerConnectWatcher) handleTransition(p peer.ID, state network.Connectedness) {
if changed := w.checkTransition(p, state); !changed {
return
}
w.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: p,
Connectedness: state,
})
}

func (w *peerConnectWatcher) checkTransition(p peer.ID, state network.Connectedness) bool {
w.mutex.Lock()
defer w.mutex.Unlock()
switch state {
case network.Connected:
if _, ok := w.connected[p]; ok {
return false
}
w.connected[p] = struct{}{}
return true
case network.NotConnected:
if _, ok := w.connected[p]; ok {
delete(w.connected, p)
return true
}
return false
default:
return false
}
}
48 changes: 48 additions & 0 deletions p2p/host/basic/peer_connectedness_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package basichost

import (
"context"
"testing"
"time"

"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"

"github.com/stretchr/testify/require"
)

func TestPeerConnectedness(t *testing.T) {
h1, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
defer h1.Close()
h2, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)

sub1, err := h1.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
require.NoError(t, err)
defer sub1.Close()
sub2, err := h2.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
require.NoError(t, err)
defer sub2.Close()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}))
require.Equal(t, (<-sub1.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{
Peer: h2.ID(),
Connectedness: network.Connected,
})
require.Equal(t, (<-sub2.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{
Peer: h1.ID(),
Connectedness: network.Connected,
})

// now close h2. This will disconnect it from h1.
require.NoError(t, h2.Close())
require.Equal(t, (<-sub1.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{
Peer: h2.ID(),
Connectedness: network.NotConnected,
})
}

0 comments on commit ddf96ce

Please sign in to comment.