Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net/mock: mimic Swarm's event and notification behavior in MockNet #2287

Merged
merged 2 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions p2p/net/mock/mock_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,12 @@ func (c *conn) Close() error {
return nil
}

func (c *conn) teardown() error {
func (c *conn) teardown() {
for _, s := range c.allStreams() {
s.Reset()
}
c.net.removeConn(c)

go func() {
c.notifLk.Lock()
defer c.notifLk.Unlock()
c.net.notifyAll(func(n network.Notifiee) {
n.Disconnected(c.net, c)
})
}()
return nil
Comment on lines -95 to -102
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved for nicer logic grouping.

c.net.removeConn(c)
}

func (c *conn) addStream(s *stream) {
Expand Down
5 changes: 4 additions & 1 deletion p2p/net/mock/mock_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"

ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -108,14 +109,16 @@ func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (host.Host, error) {
}

func (mn *mocknet) AddPeerWithPeerstore(p peer.ID, ps peerstore.Peerstore) (host.Host, error) {
n, err := newPeernet(mn, p, ps)
bus := eventbus.NewBus()
n, err := newPeernet(mn, p, ps, bus)
if err != nil {
return nil, err
}

opts := &bhost.HostOpts{
NegotiationTimeout: -1,
DisableSignedPeerRecord: true,
EventBus: bus,
}

h, err := bhost.NewHost(n, opts)
Expand Down
50 changes: 35 additions & 15 deletions p2p/net/mock/mock_peernet.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@ import (
"math/rand"
"sync"

"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"

ma "github.com/multiformats/go-multiaddr"
)

// peernet implements network.Network
type peernet struct {
mocknet *mocknet // parent

peer peer.ID
ps peerstore.Peerstore
peer peer.ID
ps peerstore.Peerstore
emitter event.Emitter

// conns are actual live connections between peers.
// many conns could run over each link.
Expand All @@ -37,11 +38,17 @@ type peernet struct {
}

// newPeernet constructs a new peernet
func newPeernet(m *mocknet, p peer.ID, ps peerstore.Peerstore) (*peernet, error) {
func newPeernet(m *mocknet, p peer.ID, ps peerstore.Peerstore, bus event.Bus) (*peernet, error) {
emitter, err := bus.Emitter(&event.EvtPeerConnectednessChanged{})
if err != nil {
return nil, err
}

n := &peernet{
mocknet: m,
peer: p,
ps: ps,
emitter: emitter,

connsByPeer: map[peer.ID]map[*conn]struct{}{},
connsByLink: map[*link]map[*conn]struct{}{},
Expand All @@ -57,6 +64,7 @@ func (pn *peernet) Close() error {
for _, c := range pn.allConns() {
c.Close()
}
pn.emitter.Close()
return pn.ps.Close()
}

Expand Down Expand Up @@ -192,13 +200,16 @@ func (pn *peernet) addConn(c *conn) {
pn.notifyAll(func(n network.Notifiee) {
n.Connected(pn, c)
})

pn.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: c.remote,
Connectedness: network.Connected,
})
}

// removeConn removes a given conn
func (pn *peernet) removeConn(c *conn) {
pn.Lock()
defer pn.Unlock()

cs, found := pn.connsByLink[c.link]
if !found || len(cs) < 1 {
panic(fmt.Sprintf("attempting to remove a conn that doesnt exist %p", c.link))
Expand All @@ -210,6 +221,22 @@ func (pn *peernet) removeConn(c *conn) {
panic(fmt.Sprintf("attempting to remove a conn that doesnt exist %v", c.remote))
}
delete(cs, c)
pn.Unlock()

// notify asynchronously to mimic Swarm
// FIXME: IIRC, we wanted to make notify for Close synchronous
Comment on lines +226 to +227
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and yes, this means that ideally, we'd also get rid of the Go routine for the Disconnected notification.

#2027 (comment)

go func() {
c.notifLk.Lock()
defer c.notifLk.Unlock()
pn.notifyAll(func(n network.Notifiee) {
n.Disconnected(c.net, c)
})
}()

c.net.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: c.remote,
Connectedness: network.NotConnected,
})
}

// LocalPeer the network's LocalPeer
Expand Down Expand Up @@ -355,18 +382,11 @@ func (pn *peernet) StopNotify(f network.Notifiee) {
// notifyAll runs the notification function on all Notifiees
func (pn *peernet) notifyAll(notification func(f network.Notifiee)) {
pn.notifmu.Lock()
var wg sync.WaitGroup
// notify synchronously to mimic Swarm
for n := range pn.notifs {
// make sure we dont block
// and they dont block each other.
wg.Add(1)
go func(n network.Notifiee) {
defer wg.Done()
notification(n)
}(n)
notification(n)
}
pn.notifmu.Unlock()
wg.Wait()
}

func (pn *peernet) ResourceManager() network.ResourceManager {
Expand Down
81 changes: 81 additions & 0 deletions p2p/net/mock/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
Expand Down Expand Up @@ -600,3 +601,83 @@ func TestStreamsWithLatency(t *testing.T) {
t.Fatalf("Expected write to take ~%s (+/- %s), but took %s", latency.String(), tolerance.String(), delta.String())
}
}

func TestEventBus(t *testing.T) {
const peers = 2

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

mn, err := FullMeshLinked(peers)
if err != nil {
t.Fatal(err)
}
defer mn.Close()

sub0, err := mn.Hosts()[0].EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
t.Fatal(err)
}
defer sub0.Close()
sub1, err := mn.Hosts()[1].EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
t.Fatal(err)
}
defer sub1.Close()

id0, id1 := mn.Hosts()[0].ID(), mn.Hosts()[1].ID()

_, err = mn.ConnectPeers(id0, id1)
if err != nil {
t.Fatal(err)
}
for range make([]int, peers) {
select {
case evt := <-sub0.Out():
evnt := evt.(event.EvtPeerConnectednessChanged)
if evnt.Peer != id1 {
t.Fatal("wrong remote peer")
}
if evnt.Connectedness != network.Connected {
t.Fatal("wrong connectedness type")
}
case evt := <-sub1.Out():
evnt := evt.(event.EvtPeerConnectednessChanged)
if evnt.Peer != id0 {
t.Fatal("wrong remote peer")
}
if evnt.Connectedness != network.Connected {
t.Fatal("wrong connectedness type")
}
case <-ctx.Done():
t.Fatal("didn't get connectedness events in time")
}
}

err = mn.DisconnectPeers(id0, id1)
if err != nil {
t.Fatal(err)
}
for range make([]int, peers) {
select {
case evt := <-sub0.Out():
evnt := evt.(event.EvtPeerConnectednessChanged)
if evnt.Peer != id1 {
t.Fatal("wrong remote peer")
}
if evnt.Connectedness != network.NotConnected {
t.Fatal("wrong connectedness type")
}
case evt := <-sub1.Out():
evnt := evt.(event.EvtPeerConnectednessChanged)
if evnt.Peer != id0 {
t.Fatal("wrong remote peer")
}
if evnt.Connectedness != network.NotConnected {
t.Fatal("wrong connectedness type")
}
case <-ctx.Done():
t.Fatal("didn't get connectedness events in time")
}
}
}