Skip to content

Commit

Permalink
Merge pull request #45 from libp2p/feat/delaystart
Browse files Browse the repository at this point in the history
Limiting autonat service responses/startup
  • Loading branch information
willscott authored Mar 10, 2020
2 parents 9ad01ee + 5817227 commit f7a9551
Show file tree
Hide file tree
Showing 2 changed files with 219 additions and 24 deletions.
119 changes: 96 additions & 23 deletions p2p/host/autonat/svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package autonat

import (
"context"
"math/rand"
"net"
"sync"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"

pb "github.com/libp2p/go-libp2p-autonat/pb"

Expand All @@ -22,38 +26,62 @@ import (
const P_CIRCUIT = 290

var (
AutoNATServiceDialTimeout = 15 * time.Second
// AutoNATServiceDialTimeout defines how long to wait for connection
// attempts before failing.
AutoNATServiceDialTimeout = 15 * time.Second
// AutoNATServiceResetInterval defines how often to reset throttling.
AutoNATServiceResetInterval = 1 * time.Minute
// AutoNATServiceResetJitter defines the amplitude of randomness in throttle
// reset timing.
AutoNATServiceResetJitter = 15 * time.Second

// AutoNATServiceThrottle defines how many times each ResetInterval a peer
// can ask for its autonat address.
AutoNATServiceThrottle = 3
// AutoNATGlobalThrottle defines how many total autonat requests this
// service will answer each ResetInterval.
AutoNATGlobalThrottle = 30
// AutoNATMaxPeerAddresses defines maximum number of addreses the autonat
// service will consider when attempting to connect to the peer.
AutoNATMaxPeerAddresses = 16
)

// AutoNATService provides NAT autodetection services to other peers
type AutoNATService struct {
ctx context.Context
h host.Host
dialer host.Host

// rate limiter
mx sync.Mutex
reqs map[peer.ID]int
mx sync.Mutex
reqs map[peer.ID]int
globalReqMax int
globalReqs int
}

// NewAutoNATService creates a new AutoNATService instance attached to a host
func NewAutoNATService(ctx context.Context, h host.Host, opts ...libp2p.Option) (*AutoNATService, error) {
func NewAutoNATService(ctx context.Context, h host.Host, forceEnabled bool, opts ...libp2p.Option) (*AutoNATService, error) {
opts = append(opts, libp2p.NoListenAddrs)
dialer, err := libp2p.New(ctx, opts...)
if err != nil {
return nil, err
}

as := &AutoNATService{
ctx: ctx,
dialer: dialer,
reqs: make(map[peer.ID]int),
ctx: ctx,
h: h,
dialer: dialer,
globalReqMax: AutoNATGlobalThrottle,
reqs: make(map[peer.ID]int),
}
h.SetStreamHandler(autonat.AutoNATProto, as.handleStream)

go as.resetRateLimiter()
if forceEnabled {
as.globalReqMax = 0
h.SetStreamHandler(autonat.AutoNATProto, as.handleStream)
go as.resetRateLimiter()
} else {
go as.enableWhenPublic()
}

return as, nil
}
Expand Down Expand Up @@ -113,13 +141,15 @@ func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Me
}
}

addrs := make([]ma.Multiaddr, 0)
addrs := make([]ma.Multiaddr, 0, AutoNATMaxPeerAddresses)
seen := make(map[string]struct{})

// add observed addr to the list of addresses to dial
var obsHost net.IP
if !as.skipDial(obsaddr) {
addrs = append(addrs, obsaddr)
seen[obsaddr.String()] = struct{}{}
obsHost, _ = manet.ToIP(obsaddr)
}

for _, maddr := range mpi.GetAddrs() {
Expand All @@ -133,6 +163,10 @@ func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Me
continue
}

if ip, err := manet.ToIP(addr); err != nil || !obsHost.Equal(ip) {
continue
}

str := addr.String()
_, ok := seen[str]
if ok {
Expand All @@ -141,6 +175,10 @@ func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Me

addrs = append(addrs, addr)
seen[str] = struct{}{}

if len(addrs) >= AutoNATMaxPeerAddresses {
break
}
}

if len(addrs) == 0 {
Expand All @@ -162,24 +200,35 @@ func (as *AutoNATService) skipDial(addr ma.Multiaddr) bool {
return true
}

// Skip dialing addresses we believe are the local node's
for _, localAddr := range as.h.Addrs() {
if localAddr.Equal(addr) {
return true
}
}

return false
}

func (as *AutoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse {
// rate limit check
as.mx.Lock()
count := as.reqs[pi.ID]
if count >= AutoNATServiceThrottle {
if count >= AutoNATServiceThrottle || (as.globalReqMax > 0 && as.globalReqs >= as.globalReqMax) {
as.mx.Unlock()
return newDialResponseError(pb.Message_E_DIAL_REFUSED, "too many dials")
}
as.reqs[pi.ID] = count + 1
as.globalReqs++
as.mx.Unlock()

ctx, cancel := context.WithTimeout(as.ctx, AutoNATServiceDialTimeout)
defer cancel()

err := as.dialer.Connect(ctx, pi)
as.dialer.Peerstore().ClearAddrs(pi.ID)

as.dialer.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)
conn, err := as.dialer.Network().DialPeer(ctx, pi.ID)
if err != nil {
log.Debugf("error dialing %s: %s", pi.ID.Pretty(), err.Error())
// wait for the context to timeout to avoid leaking timing information
Expand All @@ -188,28 +237,52 @@ func (as *AutoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse {
return newDialResponseError(pb.Message_E_DIAL_ERROR, "dial failed")
}

conns := as.dialer.Network().ConnsToPeer(pi.ID)
if len(conns) == 0 {
log.Errorf("supposedly connected to %s, but no connection to peer", pi.ID.Pretty())
return newDialResponseError(pb.Message_E_INTERNAL_ERROR, "internal service error")
}

ra := conns[0].RemoteMultiaddr()
ra := conn.RemoteMultiaddr()
as.dialer.Network().ClosePeer(pi.ID)
return newDialResponseOK(ra)
}

func (as *AutoNATService) enableWhenPublic() {
sub, _ := as.h.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{})
defer sub.Close()

running := false

for {
select {
case ev, ok := <-sub.Out():
if !ok {
return
}
state := ev.(event.EvtLocalReachabilityChanged).Reachability
if state == network.ReachabilityPublic {
as.h.SetStreamHandler(autonat.AutoNATProto, as.handleStream)
if !running {
go as.resetRateLimiter()
running = true
}
} else {
as.h.RemoveStreamHandler(autonat.AutoNATProto)
}
case <-as.ctx.Done():
return
}
}
}

func (as *AutoNATService) resetRateLimiter() {
ticker := time.NewTicker(AutoNATServiceResetInterval)
defer ticker.Stop()
timer := time.NewTimer(AutoNATServiceResetInterval)
defer timer.Stop()

for {
select {
case <-ticker.C:
case <-timer.C:
as.mx.Lock()
as.reqs = make(map[peer.ID]int)
as.globalReqs = 0
as.mx.Unlock()

jitter := rand.Float32() * float32(AutoNATServiceResetJitter)
timer.Reset(AutoNATServiceResetInterval + time.Duration(int64(jitter)))
case <-as.ctx.Done():
return
}
Expand Down
124 changes: 123 additions & 1 deletion p2p/host/autonat/svc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

autonat "github.com/libp2p/go-libp2p-autonat"
Expand All @@ -20,7 +22,7 @@ func makeAutoNATService(ctx context.Context, t *testing.T) (host.Host, *AutoNATS
t.Fatal(err)
}

as, err := NewAutoNATService(ctx, h)
as, err := NewAutoNATService(ctx, h, true)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -101,6 +103,8 @@ func TestAutoNATServiceDialRateLimiter(t *testing.T) {
AutoNATServiceThrottle = 1
save4 := manet.Private4
manet.Private4 = []*net.IPNet{}
save5 := AutoNATServiceResetJitter
AutoNATServiceResetJitter = 0 * time.Second

hs, _ := makeAutoNATService(ctx, t)
hc, ac := makeAutoNATClient(ctx, t)
Expand Down Expand Up @@ -131,4 +135,122 @@ func TestAutoNATServiceDialRateLimiter(t *testing.T) {
AutoNATServiceResetInterval = save2
AutoNATServiceThrottle = save3
manet.Private4 = save4
AutoNATServiceResetJitter = save5
}

func TestAutoNATServiceGlobalLimiter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

save1 := AutoNATServiceDialTimeout
AutoNATServiceDialTimeout = 1 * time.Second
save2 := AutoNATServiceResetInterval
AutoNATServiceResetInterval = 10 * time.Second
save3 := AutoNATServiceThrottle
AutoNATServiceThrottle = 1
save4 := manet.Private4
manet.Private4 = []*net.IPNet{}
save5 := AutoNATServiceResetJitter
AutoNATServiceResetJitter = 0 * time.Second
save6 := AutoNATGlobalThrottle
AutoNATGlobalThrottle = 5

hs, as := makeAutoNATService(ctx, t)
as.globalReqMax = 5

for i := 0; i < 5; i++ {
hc, ac := makeAutoNATClient(ctx, t)
connect(t, hs, hc)

_, err := ac.DialBack(ctx, hs.ID())
if err != nil {
t.Fatal(err)
}
}

hc, ac := makeAutoNATClient(ctx, t)
connect(t, hs, hc)
_, err := ac.DialBack(ctx, hs.ID())
if err == nil {
t.Fatal("Dial back succeeded unexpectedly!")
}

if !autonat.IsDialRefused(err) {
t.Fatal(err)
}

AutoNATServiceDialTimeout = save1
AutoNATServiceResetInterval = save2
AutoNATServiceThrottle = save3
manet.Private4 = save4
AutoNATServiceResetJitter = save5
AutoNATGlobalThrottle = save6
}

func TestAutoNATServiceRateLimitJitter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

save1 := AutoNATServiceResetInterval
AutoNATServiceResetInterval = 100 * time.Millisecond
save2 := AutoNATServiceResetJitter
AutoNATServiceResetJitter = 100 * time.Millisecond

_, svc := makeAutoNATService(ctx, t)
svc.mx.Lock()
svc.globalReqs = 1
svc.mx.Unlock()
time.Sleep(200 * time.Millisecond)

svc.mx.Lock()
defer svc.mx.Unlock()
if svc.globalReqs != 0 {
t.Fatal("reset of rate limitter occured slower than expected")
}

cancel()

AutoNATServiceResetInterval = save1
AutoNATServiceResetJitter = save2
}

func TestAutoNATServiceStartup(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

save := manet.Private4
manet.Private4 = []*net.IPNet{}

h, err := libp2p.New(ctx, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
if err != nil {
t.Fatal(err)
}

_, err = NewAutoNATService(ctx, h, false)
if err != nil {
t.Fatal(err)
}

eb, _ := h.EventBus().Emitter(new(event.EvtLocalReachabilityChanged))

hc, ac := makeAutoNATClient(ctx, t)
connect(t, h, hc)

_, err = ac.DialBack(ctx, h.ID())
if err == nil {
t.Fatal("autonat should not be started / advertising.")
}

eb.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPublic})
_, err = ac.DialBack(ctx, h.ID())
if err != nil {
t.Fatalf("autonat should be active, was %v", err)
}

eb.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPrivate})
_, err = ac.DialBack(ctx, h.ID())
if err == nil {
t.Fatal("autonat should not be started / advertising.")
}

manet.Private4 = save
}

0 comments on commit f7a9551

Please sign in to comment.