Skip to content

Commit

Permalink
identify: refactor sending of Identify pushes (#1984)
Browse files Browse the repository at this point in the history
* identify: cache the snapshot

* identify: refactor sending of Identify pushes

* identify: fix concurrency when sending pushes

* identify: fix timestamp handling

* identify: remove unneeded pushSemaphore

* identify: improve logging

* identify: use a sequence number instead of a timestamp

* identify: start with an empty snapshot

* identify: wait until we've actually finished setting up
  • Loading branch information
marten-seemann authored Feb 8, 2023
1 parent ad2f6d0 commit 235f25b
Show file tree
Hide file tree
Showing 11 changed files with 324 additions and 433 deletions.
3 changes: 2 additions & 1 deletion p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {

h.updateLocalIpAddr()

if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}, eventbus.Stateful); err != nil {
return nil, err
}
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil {
Expand Down Expand Up @@ -367,6 +367,7 @@ func (h *BasicHost) updateLocalIpAddr() {
func (h *BasicHost) Start() {
h.psManager.Start()
h.refCount.Add(1)
h.ids.Start()
go h.background()
}

Expand Down
21 changes: 12 additions & 9 deletions p2p/host/basic/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestHostAddrsFactory(t *testing.T) {

addrs := h.Addrs()
if len(addrs) != 1 {
t.Fatalf("expected 1 addr, got %d", len(addrs))
t.Fatalf("expected 1 addr, got %+v", addrs)
}
if !addrs[0].Equal(maddr) {
t.Fatalf("expected %s, got %s", maddr.String(), addrs[0].String())
Expand Down Expand Up @@ -245,8 +245,10 @@ func getHostPair(t *testing.T) (host.Host, host.Host) {

h1, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
h1.Start()
h2, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
h2.Start()

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down Expand Up @@ -342,14 +344,12 @@ func TestHostProtoMismatch(t *testing.T) {
}

func TestHostProtoPreknowledge(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

h1, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
defer h1.Close()

h2, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
defer h1.Close()
defer h2.Close()

conn := make(chan protocol.ID)
Expand All @@ -362,8 +362,11 @@ func TestHostProtoPreknowledge(t *testing.T) {
// Prevent pushing identify information so this test actually _uses_ the super protocol.
h1.RemoveStreamHandler(identify.IDPush)

h1.Start()
h2.Start()

h2pi := h2.Peerstore().PeerInfo(h2.ID())
require.NoError(t, h1.Connect(ctx, h2pi))
require.NoError(t, h1.Connect(context.Background(), h2pi))

// wait for identify handshake to finish completely
select {
Expand All @@ -380,12 +383,12 @@ func TestHostProtoPreknowledge(t *testing.T) {

h2.SetStreamHandler("/foo", handler)

s, err := h1.NewStream(ctx, h2.ID(), "/foo", "/bar", "/super")
s, err := h1.NewStream(context.Background(), h2.ID(), "/foo", "/bar", "/super")
require.NoError(t, err)

select {
case p := <-conn:
t.Fatal("shouldnt have gotten connection yet, we should have a lazy stream: ", p)
t.Fatal("shouldn't have gotten connection yet, we should have a lazy stream: ", p)
case <-time.After(time.Millisecond * 50):
}

Expand Down Expand Up @@ -532,7 +535,6 @@ func TestAddrChangeImmediatelyIfAddressNonEmpty(t *testing.T) {
return taddrs
}})
require.NoError(t, err)
h.Start()
defer h.Close()

sub, err := h.EventBus().Subscribe(&event.EvtLocalAddressesUpdated{})
Expand All @@ -541,6 +543,7 @@ func TestAddrChangeImmediatelyIfAddressNonEmpty(t *testing.T) {
t.Error(err)
}
defer sub.Close()
h.Start()

expected := event.EvtLocalAddressesUpdated{
Diffs: true,
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/mock/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func TestLimitedStreams(t *testing.T) {
}

wg.Wait()
if !within(time.Since(before), time.Second*2, time.Second) {
if !within(time.Since(before), time.Second*5/2, time.Second) {
t.Fatal("Expected 2ish seconds but got ", time.Since(before))
}
}
Expand Down
21 changes: 20 additions & 1 deletion p2p/protocol/holepunch/holepunch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p/p2p/transport/tcp"

"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-testing/race"
"github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -68,6 +72,8 @@ var _ identify.IDService = &mockIDService{}
func newMockIDService(t *testing.T, h host.Host) identify.IDService {
ids, err := identify.NewIDService(h)
require.NoError(t, err)
ids.Start()
t.Cleanup(func() { ids.Close() })
return &mockIDService{IDService: ids}
}

Expand Down Expand Up @@ -448,10 +454,23 @@ func makeRelayedHosts(t *testing.T, h1opt, h2opt []holepunch.Option, addHolePunc
libp2p.ResourceManager(&network.NullResourceManager{}),
)
require.NoError(t, err)

_, err = relayv1.NewRelay(relay)
require.NoError(t, err)

// make sure the relay service is started and advertised by Identify
h, err := libp2p.New(
libp2p.NoListenAddrs,
libp2p.Transport(tcp.NewTCPTransport),
libp2p.DisableRelay(),
)
require.NoError(t, err)
defer h.Close()
require.NoError(t, h.Connect(context.Background(), peer.AddrInfo{ID: relay.ID(), Addrs: relay.Addrs()}))
require.Eventually(t, func() bool {
supported, err := h.Peerstore().SupportsProtocols(relay.ID(), proto.ProtoIDv2Hop, relayv1.ProtoID)
return err == nil && len(supported) > 0
}, 3*time.Second, 100*time.Millisecond)

h2 = mkHostWithStaticAutoRelay(t, relay)
if addHolePuncher {
hps = addHolePunchService(t, h2, h2opt...)
Expand Down
Loading

0 comments on commit 235f25b

Please sign in to comment.