Skip to content

Commit

Permalink
autorelay: refactor relay finder and start autorelay after identify (#…
Browse files Browse the repository at this point in the history
…2120)

* Refactor relay_finder and start autorelay after identify

* Clock fork

* Remove multiple timers and use a single rate limiting chan for findNodes

* Remove clock fork

* Rename

* Use scheduledWork.nextAllowedCallToPeerSource.Add(rf.conf.minInterval)

* Fix flaky test that relied on time
  • Loading branch information
MarcoPolo authored Feb 23, 2023
1 parent 6406324 commit b74205d
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 169 deletions.
4 changes: 3 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,9 @@ func (cfg *Config) NewNode() (host.Host, error) {
ho = routed.Wrap(h, router)
}
if ar != nil {
return autorelay.NewAutoRelayHost(ho, ar), nil
arh := autorelay.NewAutoRelayHost(ho, ar)
arh.Start()
ho = arh
}
return ho, nil
}
Expand Down
5 changes: 4 additions & 1 deletion p2p/host/autorelay/autorelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) {
r.relayFinder = newRelayFinder(bhost, conf.peerSource, &conf)
bhost.AddrsFactory = r.hostAddrs

return r, nil
}

func (r *AutoRelay) Start() {
r.refCount.Add(1)
go func() {
defer r.refCount.Done()
r.background()
}()
return r, nil
}

func (r *AutoRelay) background() {
Expand Down
106 changes: 42 additions & 64 deletions p2p/host/autorelay/autorelay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package autorelay_test

import (
"context"
"os"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -97,7 +95,7 @@ func newRelay(t *testing.T) host.Host {
}
}
return false
}, 500*time.Millisecond, 10*time.Millisecond)
}, time.Second, 10*time.Millisecond)
return h
}

Expand All @@ -121,7 +119,7 @@ func TestSingleCandidate(t *testing.T) {
)
defer h.Close()

require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond)
// test that we don't add any more relays
require.Never(t, func() bool { return numRelays(h) > 1 }, 200*time.Millisecond, 50*time.Millisecond)
require.Equal(t, 1, counter, "expected the peer source callback to only have been called once")
Expand Down Expand Up @@ -179,7 +177,7 @@ func TestWaitForCandidates(t *testing.T) {
r2 := newRelay(t)
t.Cleanup(func() { r2.Close() })
peerChan <- peer.AddrInfo{ID: r2.ID(), Addrs: r2.Addrs()}
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond)
}

func TestBackoff(t *testing.T) {
Expand Down Expand Up @@ -225,15 +223,19 @@ func TestBackoff(t *testing.T) {
)
defer h.Close()

require.Eventually(t, func() bool { return reservations.Load() == 1 }, 3*time.Second, 20*time.Millisecond)
require.Eventually(t, func() bool {
return reservations.Load() == 1
}, 10*time.Second, 20*time.Millisecond, "reservations load should be 1 was %d", reservations.Load())
// make sure we don't add any relays yet
for i := 0; i < 2; i++ {
cl.Add(backoff / 3)
require.Equal(t, 1, int(reservations.Load()))
}
cl.Add(backoff / 2)
require.Eventually(t, func() bool { return reservations.Load() == 2 }, 3*time.Second, 20*time.Millisecond)
require.Less(t, int(counter.Load()), 100) // just make sure we're not busy-looping
cl.Add(backoff)
require.Eventually(t, func() bool {
return reservations.Load() == 2
}, 10*time.Second, 100*time.Millisecond, "reservations load should be 2 was %d", reservations.Load())
require.Less(t, int(counter.Load()), 300) // just make sure we're not busy-looping
require.Equal(t, 2, int(reservations.Load()))
}

Expand All @@ -252,7 +254,7 @@ func TestStaticRelays(t *testing.T) {
)
defer h.Close()

require.Eventually(t, func() bool { return numRelays(h) > 0 }, 2*time.Second, 50*time.Millisecond)
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 50*time.Millisecond)
}

func TestConnectOnDisconnect(t *testing.T) {
Expand All @@ -275,7 +277,7 @@ func TestConnectOnDisconnect(t *testing.T) {
)
defer h.Close()

require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond)
relaysInUse := usedRelays(h)
require.Len(t, relaysInUse, 1)
oldRelay := relaysInUse[0]
Expand All @@ -286,7 +288,7 @@ func TestConnectOnDisconnect(t *testing.T) {
}
}

require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond)
relaysInUse = usedRelays(h)
require.Len(t, relaysInUse, 1)
require.NotEqualf(t, oldRelay, relaysInUse[0], "old relay should not be used again")
Expand Down Expand Up @@ -332,28 +334,31 @@ func TestMaxAge(t *testing.T) {
)
defer h.Close()

require.Eventually(t, func() bool { return numRelays(h) > 0 }, 3*time.Second, 100*time.Millisecond)
require.Eventually(t, func() bool {
return numRelays(h) > 0
}, 10*time.Second, 100*time.Millisecond)
relays := usedRelays(h)
require.Len(t, relays, 1)

waitFor := 500 * time.Millisecond
tick := 100 * time.Millisecond
if os.Getenv("CI") != "" {
// Only increase the waitFor since we are increasing the mock clock every tick.
waitFor *= 10
}
require.Eventually(t, func() bool {
// we don't know exactly when the timer is reset, just advance our timer multiple times if necessary
cl.Add(time.Second)
cl.Add(30 * time.Second)
return len(peerChans) == 0
}, waitFor, tick)
}, 10*time.Second, 100*time.Millisecond)

cl.Add(10 * time.Minute)
for _, r := range relays2 {
peerChan2 <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
}
cl.Add(11 * time.Minute)

require.Eventually(t, func() bool {
relays = usedRelays(h)
return len(relays) == 1
}, 10*time.Second, 100*time.Millisecond)

// by now the 3 relays should have been garbage collected
// And we should only be using a single relay. Lets close it.
var oldRelay peer.ID
for _, r := range relays1 {
if r.ID() == relays[0] {
Expand All @@ -369,7 +374,7 @@ func TestMaxAge(t *testing.T) {
return false
}
return relays[0] != oldRelay
}, 3*time.Second, 100*time.Millisecond)
}, 10*time.Second, 100*time.Millisecond)

require.Len(t, relays, 1)
ids := make([]peer.ID, 0, len(relays2))
Expand All @@ -379,40 +384,6 @@ func TestMaxAge(t *testing.T) {
require.Contains(t, ids, relays[0])
}

func expectDeltaInAddrUpdated(t *testing.T, addrUpdated event.Subscription, expectedDelta int) {
t.Helper()
delta := 0
for {
select {
case evAny := <-addrUpdated.Out():
ev := evAny.(event.EvtLocalAddressesUpdated)
for _, updatedAddr := range ev.Removed {
if updatedAddr.Action == event.Removed {
if _, err := updatedAddr.Address.ValueForProtocol(ma.P_CIRCUIT); err == nil {
delta--
if delta == expectedDelta {
return
}
}
}
}
for _, updatedAddr := range ev.Current {
if updatedAddr.Action == event.Added {
if _, err := updatedAddr.Address.ValueForProtocol(ma.P_CIRCUIT); err == nil {
delta++
if delta == expectedDelta {
return
}
}
}
}
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for address updated event")
}
}

}

func TestReconnectToStaticRelays(t *testing.T) {
cl := clock.NewMock()
var staticRelays []peer.AddrInfo
Expand All @@ -428,16 +399,14 @@ func TestReconnectToStaticRelays(t *testing.T) {
h := newPrivateNodeWithStaticRelays(t,
staticRelays,
autorelay.WithClock(cl),
autorelay.WithBackoff(30*time.Minute),
)

defer h.Close()

addrUpdated, err := h.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated))
require.NoError(t, err)

expectDeltaInAddrUpdated(t, addrUpdated, 1)

cl.Add(time.Minute)
require.Eventually(t, func() bool {
return numRelays(h) == 1
}, 10*time.Second, 100*time.Millisecond)

relaysInUse := usedRelays(h)
oldRelay := relaysInUse[0]
Expand All @@ -446,12 +415,18 @@ func TestReconnectToStaticRelays(t *testing.T) {
r.Network().ClosePeer(h.ID())
}
}
require.Eventually(t, func() bool {
return numRelays(h) == 0
}, 10*time.Second, 100*time.Millisecond)

cl.Add(time.Hour)
expectDeltaInAddrUpdated(t, addrUpdated, -1)
require.Eventually(t, func() bool {
return numRelays(h) == 1
}, 10*time.Second, 100*time.Millisecond)
}

func TestMinInterval(t *testing.T) {
cl := clock.NewMock()
h := newPrivateNode(t,
func(context.Context, int) <-chan peer.AddrInfo {
peerChan := make(chan peer.AddrInfo, 1)
Expand All @@ -461,14 +436,17 @@ func TestMinInterval(t *testing.T) {
peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()}
return peerChan
},
autorelay.WithClock(cl),
autorelay.WithMinCandidates(2),
autorelay.WithNumRelays(1),
autorelay.WithBootDelay(time.Hour),
autorelay.WithMinInterval(500*time.Millisecond),
)
defer h.Close()

cl.Add(500 * time.Millisecond)
// The second call to peerSource should happen after 1 second
require.Never(t, func() bool { return numRelays(h) > 0 }, 500*time.Millisecond, 100*time.Millisecond)
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 1*time.Second, 100*time.Millisecond)
cl.Add(500 * time.Millisecond)
require.Eventually(t, func() bool { return numRelays(h) > 0 }, 10*time.Second, 100*time.Millisecond)
}
4 changes: 4 additions & 0 deletions p2p/host/autorelay/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ func (h *AutoRelayHost) Close() error {
return h.Host.Close()
}

func (h *AutoRelayHost) Start() {
h.ar.Start()
}

func NewAutoRelayHost(h host.Host, ar *AutoRelay) *AutoRelayHost {
return &AutoRelayHost{Host: h, ar: ar}
}
Loading

0 comments on commit b74205d

Please sign in to comment.