From b952c16927e04a2aed5c4d69692a01e717d3db65 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 12 Feb 2020 17:04:29 -0800 Subject: [PATCH 01/13] rate limiting and selectivity of autonat svc * limits addresses for a peer (at most 4 chosen) - fix #39 * clears addresses before dialing back - fix #38 * global rate limit of 30 responses per (1 - 1.25 min) - fix #36 * only dial back on the source IP - fix #32 --- p2p/host/autonat/svc.go | 59 ++++++++++++++++++++++++++++++------ p2p/host/autonat/svc_test.go | 30 ++++++++++++++++++ 2 files changed, 80 insertions(+), 9 deletions(-) diff --git a/p2p/host/autonat/svc.go b/p2p/host/autonat/svc.go index a685f3d36d..98af2941da 100644 --- a/p2p/host/autonat/svc.go +++ b/p2p/host/autonat/svc.go @@ -1,7 +1,10 @@ package autonat import ( + "bytes" "context" + "math/rand" + "net" "sync" "time" @@ -24,8 +27,11 @@ const P_CIRCUIT = 290 var ( AutoNATServiceDialTimeout = 15 * time.Second AutoNATServiceResetInterval = 1 * time.Minute + AutoNATServiceResetJitter = 15 * time.Second - AutoNATServiceThrottle = 3 + AutoNATServiceThrottle = 3 + AutoNATGlobalThrottle = 30 + AutoNATMaxPeerAddresses = 4 ) // AutoNATService provides NAT autodetection services to other peers @@ -34,8 +40,9 @@ type AutoNATService struct { dialer host.Host // rate limiter - mx sync.Mutex - reqs map[peer.ID]int + mx sync.Mutex + reqs map[peer.ID]int + globalReqs int } // NewAutoNATService creates a new AutoNATService instance attached to a host @@ -96,6 +103,21 @@ func (as *AutoNATService) handleStream(s network.Stream) { } } +// Optimistically extract the net.IP host from a multiaddress. +func addrToIP(addr ma.Multiaddr) net.IP { + if v4, err := addr.ValueForProtocol(ma.P_IP4); err == nil { + if c, err := ma.NewComponent(ma.ProtocolWithCode(ma.P_IP4).Name, v4); err == nil { + return net.IP(c.RawValue()) + } + } + if v6, err := addr.ValueForProtocol(ma.P_IP6); err == nil { + if c, err := ma.NewComponent(ma.ProtocolWithCode(ma.P_IP6).Name, v6); err == nil { + return net.IP(c.RawValue()) + } + } + return nil +} + func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Message_PeerInfo) *pb.Message_DialResponse { if mpi == nil { return newDialResponseError(pb.Message_E_BAD_REQUEST, "missing peer info") @@ -113,13 +135,15 @@ func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Me } } - addrs := make([]ma.Multiaddr, 0) + addrs := make([]ma.Multiaddr, 0, AutoNATMaxPeerAddresses) seen := make(map[string]struct{}) // add observed addr to the list of addresses to dial + var obsHost net.IP if !as.skipDial(obsaddr) { addrs = append(addrs, obsaddr) seen[obsaddr.String()] = struct{}{} + obsHost = addrToIP(obsaddr) } for _, maddr := range mpi.GetAddrs() { @@ -129,10 +153,22 @@ func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Me continue } + if len(addrs) >= AutoNATMaxPeerAddresses { + continue + } + if as.skipDial(addr) { continue } + if err != nil { + log.Debugf("Unexpected public, non-IP multiaddr: %s", err) + continue + } + if !bytes.Equal(obsHost, addrToIP(addr)) { + continue + } + str := addr.String() _, ok := seen[str] if ok { @@ -169,16 +205,19 @@ func (as *AutoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse { // rate limit check as.mx.Lock() count := as.reqs[pi.ID] - if count >= AutoNATServiceThrottle { + if count >= AutoNATServiceThrottle || as.globalReqs >= AutoNATGlobalThrottle { as.mx.Unlock() return newDialResponseError(pb.Message_E_DIAL_REFUSED, "too many dials") } as.reqs[pi.ID] = count + 1 + as.globalReqs++ as.mx.Unlock() ctx, cancel := context.WithTimeout(as.ctx, AutoNATServiceDialTimeout) defer cancel() + as.dialer.Peerstore().ClearAddrs(pi.ID) + err := as.dialer.Connect(ctx, pi) if err != nil { log.Debugf("error dialing %s: %s", pi.ID.Pretty(), err.Error()) @@ -200,16 +239,18 @@ func (as *AutoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse { } func (as *AutoNATService) resetRateLimiter() { - ticker := time.NewTicker(AutoNATServiceResetInterval) - defer ticker.Stop() + timer := time.NewTimer(AutoNATServiceResetInterval) + defer timer.Stop() for { select { - case <-ticker.C: + case <-timer.C: as.mx.Lock() as.reqs = make(map[peer.ID]int) + as.globalReqs = 0 as.mx.Unlock() - + jitter := rand.Float32() * float32(AutoNATServiceResetJitter) + timer.Reset(AutoNATServiceResetInterval + time.Duration(int64(jitter))) case <-as.ctx.Done(): return } diff --git a/p2p/host/autonat/svc_test.go b/p2p/host/autonat/svc_test.go index 9ef9ec0197..f2aa91a6b3 100644 --- a/p2p/host/autonat/svc_test.go +++ b/p2p/host/autonat/svc_test.go @@ -11,6 +11,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" autonat "github.com/libp2p/go-libp2p-autonat" + ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr-net" ) @@ -101,6 +102,8 @@ func TestAutoNATServiceDialRateLimiter(t *testing.T) { AutoNATServiceThrottle = 1 save4 := manet.Private4 manet.Private4 = []*net.IPNet{} + save5 := AutoNATServiceResetJitter + AutoNATServiceResetJitter = 0 * time.Second hs, _ := makeAutoNATService(ctx, t) hc, ac := makeAutoNATClient(ctx, t) @@ -131,4 +134,31 @@ func TestAutoNATServiceDialRateLimiter(t *testing.T) { AutoNATServiceResetInterval = save2 AutoNATServiceThrottle = save3 manet.Private4 = save4 + AutoNATServiceResetJitter = save5 +} + +func TestAddrToIP(t *testing.T) { + addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") + if !addrToIP(addr).Equal(net.IPv4(127, 0, 0, 1)) { + t.Fatal("addrToIP of ipv4 localhost incorrect!") + } + + addr, _ = ma.NewMultiaddr("/ip4/192.168.0.1/tcp/6") + if !addrToIP(addr).Equal(net.IPv4(192, 168, 0, 1)) { + t.Fatal("addrToIP of ipv4 incorrect!") + } + + addr, _ = ma.NewMultiaddr("/ip6/::ffff:127.0.0.1/tcp/111") + if !addrToIP(addr).Equal(net.ParseIP("::ffff:127.0.0.1")) { + t.Fatal("addrToIP of ipv6 incorrect!") + } + addr, _ = ma.NewMultiaddr("/ip6zone/eth0/ip6/fe80::1") + if !addrToIP(addr).Equal(net.ParseIP("fe80::1")) { + t.Fatal("addrToIP of ip6zone incorrect!") + } + + addr, _ = ma.NewMultiaddr("/unix/a/b/c/d") + if addrToIP(addr) != nil { + t.Fatal("invalid addrToIP populates") + } } From 10b0a942b847c0797acf674d5c481e288ccf4530 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Fri, 21 Feb 2020 13:54:33 -0800 Subject: [PATCH 02/13] cleaner addrToIP implementation --- p2p/host/autonat/svc.go | 45 +++++++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/p2p/host/autonat/svc.go b/p2p/host/autonat/svc.go index 98af2941da..141f4ee776 100644 --- a/p2p/host/autonat/svc.go +++ b/p2p/host/autonat/svc.go @@ -3,8 +3,10 @@ package autonat import ( "bytes" "context" + "fmt" "math/rand" "net" + "strings" "sync" "time" @@ -31,7 +33,7 @@ var ( AutoNATServiceThrottle = 3 AutoNATGlobalThrottle = 30 - AutoNATMaxPeerAddresses = 4 + AutoNATMaxPeerAddresses = 16 ) // AutoNATService provides NAT autodetection services to other peers @@ -105,17 +107,30 @@ func (as *AutoNATService) handleStream(s network.Stream) { // Optimistically extract the net.IP host from a multiaddress. func addrToIP(addr ma.Multiaddr) net.IP { - if v4, err := addr.ValueForProtocol(ma.P_IP4); err == nil { - if c, err := ma.NewComponent(ma.ProtocolWithCode(ma.P_IP4).Name, v4); err == nil { - return net.IP(c.RawValue()) - } + n, ip, err := manet.DialArgs(addr) + if err != nil { + return nil } - if v6, err := addr.ValueForProtocol(ma.P_IP6); err == nil { - if c, err := ma.NewComponent(ma.ProtocolWithCode(ma.P_IP6).Name, v6); err == nil { - return net.IP(c.RawValue()) + + // if no port: + if n == "ip" || n == "ip4" || n == "ip6" { + // Strip v6 zone if it's there. + if strings.Contains(ip, "%") { + ip = ip[:strings.Index(ip, "%")] } + return net.ParseIP(ip) } - return nil + + ip, _, err = net.SplitHostPort(ip) + if err != nil { + fmt.Printf("failed to split: %v", err) + return nil + } + // Strip v6 zone if it's there. + if strings.Contains(ip, "%") { + ip = ip[:strings.Index(ip, "%")] + } + return net.ParseIP(ip) } func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Message_PeerInfo) *pb.Message_DialResponse { @@ -153,18 +168,10 @@ func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Me continue } - if len(addrs) >= AutoNATMaxPeerAddresses { - continue - } - if as.skipDial(addr) { continue } - if err != nil { - log.Debugf("Unexpected public, non-IP multiaddr: %s", err) - continue - } if !bytes.Equal(obsHost, addrToIP(addr)) { continue } @@ -177,6 +184,10 @@ func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Me addrs = append(addrs, addr) seen[str] = struct{}{} + + if len(addrs) >= AutoNATMaxPeerAddresses { + break + } } if len(addrs) == 0 { From 426d729c513205073e4fdc71c6d2dd5f87fa65b2 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Tue, 25 Feb 2020 10:24:47 -0800 Subject: [PATCH 03/13] cleanup addrToIP logic --- p2p/host/autonat/svc.go | 28 +++++++++++----------------- p2p/host/autonat/svc_test.go | 10 +++++----- 2 files changed, 16 insertions(+), 22 deletions(-) diff --git a/p2p/host/autonat/svc.go b/p2p/host/autonat/svc.go index 141f4ee776..1073e19022 100644 --- a/p2p/host/autonat/svc.go +++ b/p2p/host/autonat/svc.go @@ -3,7 +3,7 @@ package autonat import ( "bytes" "context" - "fmt" + "errors" "math/rand" "net" "strings" @@ -106,31 +106,25 @@ func (as *AutoNATService) handleStream(s network.Stream) { } // Optimistically extract the net.IP host from a multiaddress. -func addrToIP(addr ma.Multiaddr) net.IP { +func addrToIP(addr ma.Multiaddr) (net.IP, error) { n, ip, err := manet.DialArgs(addr) if err != nil { - return nil + return nil, err } - // if no port: - if n == "ip" || n == "ip4" || n == "ip6" { - // Strip v6 zone if it's there. - if strings.Contains(ip, "%") { - ip = ip[:strings.Index(ip, "%")] - } - return net.ParseIP(ip) + if strings.HasPrefix(n, "tcp") || strings.HasPrefix(n, "udp") { + ip, _, err = net.SplitHostPort(ip) + } else if !strings.HasPrefix(n, "ip") { + return nil, errors.New("non-ip multiaddr") } - - ip, _, err = net.SplitHostPort(ip) if err != nil { - fmt.Printf("failed to split: %v", err) - return nil + return nil, err } // Strip v6 zone if it's there. if strings.Contains(ip, "%") { ip = ip[:strings.Index(ip, "%")] } - return net.ParseIP(ip) + return net.ParseIP(ip), nil } func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Message_PeerInfo) *pb.Message_DialResponse { @@ -158,7 +152,7 @@ func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Me if !as.skipDial(obsaddr) { addrs = append(addrs, obsaddr) seen[obsaddr.String()] = struct{}{} - obsHost = addrToIP(obsaddr) + obsHost, _ = addrToIP(obsaddr) } for _, maddr := range mpi.GetAddrs() { @@ -172,7 +166,7 @@ func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Me continue } - if !bytes.Equal(obsHost, addrToIP(addr)) { + if ip, err := addrToIP(addr); err != nil || !bytes.Equal(obsHost, ip) { continue } diff --git a/p2p/host/autonat/svc_test.go b/p2p/host/autonat/svc_test.go index f2aa91a6b3..9be52fbf61 100644 --- a/p2p/host/autonat/svc_test.go +++ b/p2p/host/autonat/svc_test.go @@ -139,26 +139,26 @@ func TestAutoNATServiceDialRateLimiter(t *testing.T) { func TestAddrToIP(t *testing.T) { addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") - if !addrToIP(addr).Equal(net.IPv4(127, 0, 0, 1)) { + if ip, err := addrToIP(addr); err != nil || !ip.Equal(net.IPv4(127, 0, 0, 1)) { t.Fatal("addrToIP of ipv4 localhost incorrect!") } addr, _ = ma.NewMultiaddr("/ip4/192.168.0.1/tcp/6") - if !addrToIP(addr).Equal(net.IPv4(192, 168, 0, 1)) { + if ip, err := addrToIP(addr); err != nil || !ip.Equal(net.IPv4(192, 168, 0, 1)) { t.Fatal("addrToIP of ipv4 incorrect!") } addr, _ = ma.NewMultiaddr("/ip6/::ffff:127.0.0.1/tcp/111") - if !addrToIP(addr).Equal(net.ParseIP("::ffff:127.0.0.1")) { + if ip, err := addrToIP(addr); err != nil || !ip.Equal(net.ParseIP("::ffff:127.0.0.1")) { t.Fatal("addrToIP of ipv6 incorrect!") } addr, _ = ma.NewMultiaddr("/ip6zone/eth0/ip6/fe80::1") - if !addrToIP(addr).Equal(net.ParseIP("fe80::1")) { + if ip, err := addrToIP(addr); err != nil || !ip.Equal(net.ParseIP("fe80::1")) { t.Fatal("addrToIP of ip6zone incorrect!") } addr, _ = ma.NewMultiaddr("/unix/a/b/c/d") - if addrToIP(addr) != nil { + if _, err := addrToIP(addr); err == nil { t.Fatal("invalid addrToIP populates") } } From 9dcbc44a26bc79e0d6f78dd8a150946d055d5db3 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Tue, 25 Feb 2020 12:24:33 -0800 Subject: [PATCH 04/13] limit autonat-svc to LocalRoutabilityPublic nodes per #43 --- p2p/host/autonat/svc.go | 17 +++++++++++++++-- p2p/host/autonat/svc_test.go | 3 +++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/p2p/host/autonat/svc.go b/p2p/host/autonat/svc.go index a685f3d36d..c1f9e53ccd 100644 --- a/p2p/host/autonat/svc.go +++ b/p2p/host/autonat/svc.go @@ -6,6 +6,7 @@ import ( "time" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/helpers" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" @@ -51,9 +52,21 @@ func NewAutoNATService(ctx context.Context, h host.Host, opts ...libp2p.Option) dialer: dialer, reqs: make(map[peer.ID]int), } - h.SetStreamHandler(autonat.AutoNATProto, as.handleStream) - go as.resetRateLimiter() + s, err := h.EventBus().Subscribe(&event.EvtLocalRoutabilityPublic{}) + if err != nil { + return nil, err + } + + go func() { + defer s.Close() + select { + case <-s.Out(): + h.SetStreamHandler(autonat.AutoNATProto, as.handleStream) + go as.resetRateLimiter() + case <-ctx.Done(): + } + }() return as, nil } diff --git a/p2p/host/autonat/svc_test.go b/p2p/host/autonat/svc_test.go index 9ef9ec0197..eec48f07c2 100644 --- a/p2p/host/autonat/svc_test.go +++ b/p2p/host/autonat/svc_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" @@ -24,6 +25,8 @@ func makeAutoNATService(ctx context.Context, t *testing.T) (host.Host, *AutoNATS if err != nil { t.Fatal(err) } + emitPublic, _ := h.EventBus().Emitter(new(event.EvtLocalRoutabilityPublic)) + emitPublic.Emit(event.EvtLocalRoutabilityPublic{}) return h, as } From 44e9abed05726d212064bd4b61d62b7e6f2aa17b Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 26 Feb 2020 12:27:44 -0800 Subject: [PATCH 05/13] address review comments use ip.Equal for comparison add test on jitter simplify addrToIP --- p2p/host/autonat/svc.go | 30 +++++++++++++----------------- p2p/host/autonat/svc_test.go | 24 ++++++++++++++++++++---- 2 files changed, 33 insertions(+), 21 deletions(-) diff --git a/p2p/host/autonat/svc.go b/p2p/host/autonat/svc.go index 1073e19022..1bf37683c7 100644 --- a/p2p/host/autonat/svc.go +++ b/p2p/host/autonat/svc.go @@ -1,12 +1,10 @@ package autonat import ( - "bytes" "context" - "errors" + "fmt" "math/rand" "net" - "strings" "sync" "time" @@ -106,25 +104,23 @@ func (as *AutoNATService) handleStream(s network.Stream) { } // Optimistically extract the net.IP host from a multiaddress. +// TODO: use upstream manet.ToIP func addrToIP(addr ma.Multiaddr) (net.IP, error) { - n, ip, err := manet.DialArgs(addr) + n, err := manet.ToNetAddr(addr) if err != nil { return nil, err } - if strings.HasPrefix(n, "tcp") || strings.HasPrefix(n, "udp") { - ip, _, err = net.SplitHostPort(ip) - } else if !strings.HasPrefix(n, "ip") { - return nil, errors.New("non-ip multiaddr") + switch netAddr := n.(type) { + case *net.UDPAddr: + return netAddr.IP, nil + case *net.TCPAddr: + return netAddr.IP, nil + case *net.IPAddr: + return netAddr.IP, nil + default: + return nil, fmt.Errorf("non IP Multiaddr: %T", netAddr) } - if err != nil { - return nil, err - } - // Strip v6 zone if it's there. - if strings.Contains(ip, "%") { - ip = ip[:strings.Index(ip, "%")] - } - return net.ParseIP(ip), nil } func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Message_PeerInfo) *pb.Message_DialResponse { @@ -166,7 +162,7 @@ func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Me continue } - if ip, err := addrToIP(addr); err != nil || !bytes.Equal(obsHost, ip) { + if ip, err := addrToIP(addr); err != nil || !obsHost.Equal(ip) { continue } diff --git a/p2p/host/autonat/svc_test.go b/p2p/host/autonat/svc_test.go index 9be52fbf61..b93e6e28ac 100644 --- a/p2p/host/autonat/svc_test.go +++ b/p2p/host/autonat/svc_test.go @@ -137,6 +137,26 @@ func TestAutoNATServiceDialRateLimiter(t *testing.T) { AutoNATServiceResetJitter = save5 } +func TestAutoNATServiceRateLimitJitter(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + save1 := AutoNATServiceResetInterval + AutoNATServiceResetInterval = 100 * time.Millisecond + save2 := AutoNATServiceResetJitter + AutoNATServiceResetJitter = 100 * time.Millisecond + + _, svc := makeAutoNATService(ctx, t) + svc.globalReqs = 1 + time.Sleep(200 * time.Millisecond) + if svc.globalReqs != 0 { + t.Fatal("reset of rate limitter occured slower than expected") + } + + AutoNATServiceResetInterval = save1 + AutoNATServiceResetJitter = save2 +} + func TestAddrToIP(t *testing.T) { addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") if ip, err := addrToIP(addr); err != nil || !ip.Equal(net.IPv4(127, 0, 0, 1)) { @@ -148,10 +168,6 @@ func TestAddrToIP(t *testing.T) { t.Fatal("addrToIP of ipv4 incorrect!") } - addr, _ = ma.NewMultiaddr("/ip6/::ffff:127.0.0.1/tcp/111") - if ip, err := addrToIP(addr); err != nil || !ip.Equal(net.ParseIP("::ffff:127.0.0.1")) { - t.Fatal("addrToIP of ipv6 incorrect!") - } addr, _ = ma.NewMultiaddr("/ip6zone/eth0/ip6/fe80::1") if ip, err := addrToIP(addr); err != nil || !ip.Equal(net.ParseIP("fe80::1")) { t.Fatal("addrToIP of ip6zone incorrect!") From 370d2efb54c338ce39c211fcdf1ec1d22a2d8391 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 26 Feb 2020 16:39:17 -0800 Subject: [PATCH 06/13] Add option for forcing service startup --- p2p/host/autonat/svc.go | 15 +++++++++------ p2p/host/autonat/svc_test.go | 5 +---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/p2p/host/autonat/svc.go b/p2p/host/autonat/svc.go index c1f9e53ccd..8c1191e3c3 100644 --- a/p2p/host/autonat/svc.go +++ b/p2p/host/autonat/svc.go @@ -40,7 +40,7 @@ type AutoNATService struct { } // NewAutoNATService creates a new AutoNATService instance attached to a host -func NewAutoNATService(ctx context.Context, h host.Host, opts ...libp2p.Option) (*AutoNATService, error) { +func NewAutoNATService(ctx context.Context, h host.Host, forceEnabled bool, opts ...libp2p.Option) (*AutoNATService, error) { opts = append(opts, libp2p.NoListenAddrs) dialer, err := libp2p.New(ctx, opts...) if err != nil { @@ -60,12 +60,15 @@ func NewAutoNATService(ctx context.Context, h host.Host, opts ...libp2p.Option) go func() { defer s.Close() - select { - case <-s.Out(): - h.SetStreamHandler(autonat.AutoNATProto, as.handleStream) - go as.resetRateLimiter() - case <-ctx.Done(): + if !forceEnabled { + select { + case <-ctx.Done(): + return + case <-s.Out(): + } } + h.SetStreamHandler(autonat.AutoNATProto, as.handleStream) + go as.resetRateLimiter() }() return as, nil diff --git a/p2p/host/autonat/svc_test.go b/p2p/host/autonat/svc_test.go index eec48f07c2..b666e7ef03 100644 --- a/p2p/host/autonat/svc_test.go +++ b/p2p/host/autonat/svc_test.go @@ -7,7 +7,6 @@ import ( "time" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" @@ -21,12 +20,10 @@ func makeAutoNATService(ctx context.Context, t *testing.T) (host.Host, *AutoNATS t.Fatal(err) } - as, err := NewAutoNATService(ctx, h) + as, err := NewAutoNATService(ctx, h, true) if err != nil { t.Fatal(err) } - emitPublic, _ := h.EventBus().Emitter(new(event.EvtLocalRoutabilityPublic)) - emitPublic.Emit(event.EvtLocalRoutabilityPublic{}) return h, as } From 3a21c6dea31ead2c24df6a21e9867e8cc387d06e Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 26 Feb 2020 17:29:10 -0800 Subject: [PATCH 07/13] skip locally held addresses fix #44 --- p2p/host/autonat/svc.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/p2p/host/autonat/svc.go b/p2p/host/autonat/svc.go index 8c1191e3c3..6835e4d486 100644 --- a/p2p/host/autonat/svc.go +++ b/p2p/host/autonat/svc.go @@ -32,6 +32,7 @@ var ( // AutoNATService provides NAT autodetection services to other peers type AutoNATService struct { ctx context.Context + h host.Host dialer host.Host // rate limiter @@ -49,6 +50,7 @@ func NewAutoNATService(ctx context.Context, h host.Host, forceEnabled bool, opts as := &AutoNATService{ ctx: ctx, + h: h, dialer: dialer, reqs: make(map[peer.ID]int), } @@ -178,6 +180,13 @@ func (as *AutoNATService) skipDial(addr ma.Multiaddr) bool { return true } + // Skip dialing addresses we believe are the local node's + for _, localAddr := range as.h.Addrs() { + if localAddr.Equal(addr) { + return true + } + } + return false } From 3069cfec014b6d761b4a7255bd4e5df9465ca06f Mon Sep 17 00:00:00 2001 From: Will Scott Date: Tue, 3 Mar 2020 09:56:36 -0800 Subject: [PATCH 08/13] unadvertise autonat service when private. remove global request limit when forced on. --- p2p/host/autonat/svc.go | 62 ++++++++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 23 deletions(-) diff --git a/p2p/host/autonat/svc.go b/p2p/host/autonat/svc.go index 669c0eff11..3beda641e5 100644 --- a/p2p/host/autonat/svc.go +++ b/p2p/host/autonat/svc.go @@ -42,9 +42,10 @@ type AutoNATService struct { dialer host.Host // rate limiter - mx sync.Mutex - reqs map[peer.ID]int - globalReqs int + mx sync.Mutex + reqs map[peer.ID]int + globalReqMax int + globalReqs int } // NewAutoNATService creates a new AutoNATService instance attached to a host @@ -56,29 +57,20 @@ func NewAutoNATService(ctx context.Context, h host.Host, forceEnabled bool, opts } as := &AutoNATService{ - ctx: ctx, - h: h, - dialer: dialer, - reqs: make(map[peer.ID]int), + ctx: ctx, + h: h, + dialer: dialer, + globalReqMax: AutoNATGlobalThrottle, + reqs: make(map[peer.ID]int), } - s, err := h.EventBus().Subscribe(&event.EvtLocalRoutabilityPublic{}) - if err != nil { - return nil, err - } - - go func() { - defer s.Close() - if !forceEnabled { - select { - case <-ctx.Done(): - return - case <-s.Out(): - } - } + if forceEnabled { + as.globalReqMax = 0 h.SetStreamHandler(autonat.AutoNATProto, as.handleStream) go as.resetRateLimiter() - }() + } else { + go as.enableWhenPublic() + } return as, nil } @@ -231,7 +223,7 @@ func (as *AutoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse { // rate limit check as.mx.Lock() count := as.reqs[pi.ID] - if count >= AutoNATServiceThrottle || as.globalReqs >= AutoNATGlobalThrottle { + if count >= AutoNATServiceThrottle || (as.globalReqMax > 0 && as.globalReqs >= as.globalReqMax) { as.mx.Unlock() return newDialResponseError(pb.Message_E_DIAL_REFUSED, "too many dials") } @@ -264,6 +256,30 @@ func (as *AutoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse { return newDialResponseOK(ra) } +func (as *AutoNATService) enableWhenPublic() { + pubSub, _ := as.h.EventBus().Subscribe(&event.EvtLocalRoutabilityPublic{}) + priSub, _ := as.h.EventBus().Subscribe(&event.EvtLocalRoutabilityPrivate{}) + defer pubSub.Close() + defer priSub.Close() + + running := false + + for { + select { + case <-pubSub.Out(): + as.h.SetStreamHandler(autonat.AutoNATProto, as.handleStream) + if !running { + go as.resetRateLimiter() + running = true + } + case <-priSub.Out(): + as.h.RemoveStreamHandler(autonat.AutoNATProto) + case <-as.ctx.Done(): + return + } + } +} + func (as *AutoNATService) resetRateLimiter() { timer := time.NewTimer(AutoNATServiceResetInterval) defer timer.Stop() From b7f53da72e340c9ba4743ffab345e7e46edfde8e Mon Sep 17 00:00:00 2001 From: Will Scott Date: Tue, 3 Mar 2020 18:20:55 -0800 Subject: [PATCH 09/13] force net connection in autonat svc --- p2p/host/autonat/svc.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/p2p/host/autonat/svc.go b/p2p/host/autonat/svc.go index 3beda641e5..46fd301d08 100644 --- a/p2p/host/autonat/svc.go +++ b/p2p/host/autonat/svc.go @@ -14,6 +14,7 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" pb "github.com/libp2p/go-libp2p-autonat/pb" @@ -236,7 +237,8 @@ func (as *AutoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse { as.dialer.Peerstore().ClearAddrs(pi.ID) - err := as.dialer.Connect(ctx, pi) + as.dialer.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL) + conn, err := as.dialer.Network().DialPeer(ctx, pi.ID) if err != nil { log.Debugf("error dialing %s: %s", pi.ID.Pretty(), err.Error()) // wait for the context to timeout to avoid leaking timing information @@ -245,13 +247,7 @@ func (as *AutoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse { return newDialResponseError(pb.Message_E_DIAL_ERROR, "dial failed") } - conns := as.dialer.Network().ConnsToPeer(pi.ID) - if len(conns) == 0 { - log.Errorf("supposedly connected to %s, but no connection to peer", pi.ID.Pretty()) - return newDialResponseError(pb.Message_E_INTERNAL_ERROR, "internal service error") - } - - ra := conns[0].RemoteMultiaddr() + ra := conn.RemoteMultiaddr() as.dialer.Network().ClosePeer(pi.ID) return newDialResponseOK(ra) } From edaee9d512bc8cd66a32822b36b3768f725cdbf4 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Thu, 5 Mar 2020 00:15:56 -0800 Subject: [PATCH 10/13] update to single event type --- p2p/host/autonat/svc.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/p2p/host/autonat/svc.go b/p2p/host/autonat/svc.go index 46fd301d08..2475f03cb9 100644 --- a/p2p/host/autonat/svc.go +++ b/p2p/host/autonat/svc.go @@ -253,23 +253,27 @@ func (as *AutoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse { } func (as *AutoNATService) enableWhenPublic() { - pubSub, _ := as.h.EventBus().Subscribe(&event.EvtLocalRoutabilityPublic{}) - priSub, _ := as.h.EventBus().Subscribe(&event.EvtLocalRoutabilityPrivate{}) - defer pubSub.Close() - defer priSub.Close() + sub, _ := as.h.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}) + defer sub.Close() running := false for { select { - case <-pubSub.Out(): - as.h.SetStreamHandler(autonat.AutoNATProto, as.handleStream) - if !running { - go as.resetRateLimiter() - running = true + case ev, ok := <-sub.Out(): + if !ok { + return + } + state := ev.(event.EvtLocalReachabilityChanged).Reachability + if state == network.ReachabilityPublic { + as.h.SetStreamHandler(autonat.AutoNATProto, as.handleStream) + if !running { + go as.resetRateLimiter() + running = true + } + } else { + as.h.RemoveStreamHandler(autonat.AutoNATProto) } - case <-priSub.Out(): - as.h.RemoveStreamHandler(autonat.AutoNATProto) case <-as.ctx.Done(): return } From 41ac79c8352801dace673dc8479fdd5bee39ebd0 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Fri, 6 Mar 2020 11:27:34 -0800 Subject: [PATCH 11/13] mitigate race in test --- p2p/host/autonat/svc_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/p2p/host/autonat/svc_test.go b/p2p/host/autonat/svc_test.go index 979854d430..a36340913c 100644 --- a/p2p/host/autonat/svc_test.go +++ b/p2p/host/autonat/svc_test.go @@ -139,7 +139,6 @@ func TestAutoNATServiceDialRateLimiter(t *testing.T) { func TestAutoNATServiceRateLimitJitter(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - defer cancel() save1 := AutoNATServiceResetInterval AutoNATServiceResetInterval = 100 * time.Millisecond @@ -147,12 +146,19 @@ func TestAutoNATServiceRateLimitJitter(t *testing.T) { AutoNATServiceResetJitter = 100 * time.Millisecond _, svc := makeAutoNATService(ctx, t) + svc.mx.Lock() svc.globalReqs = 1 + svc.mx.Unlock() time.Sleep(200 * time.Millisecond) + + svc.mx.Lock() + defer svc.mx.Unlock() if svc.globalReqs != 0 { t.Fatal("reset of rate limitter occured slower than expected") } + cancel() + AutoNATServiceResetInterval = save1 AutoNATServiceResetJitter = save2 } From 43e6a96bdcd1b09ed6adf488a368c0f76f20367d Mon Sep 17 00:00:00 2001 From: Will Scott Date: Mon, 9 Mar 2020 16:25:55 -0700 Subject: [PATCH 12/13] additional tests --- p2p/host/autonat/svc.go | 21 ++++++-- p2p/host/autonat/svc_test.go | 93 ++++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 5 deletions(-) diff --git a/p2p/host/autonat/svc.go b/p2p/host/autonat/svc.go index 2475f03cb9..3f857b3058 100644 --- a/p2p/host/autonat/svc.go +++ b/p2p/host/autonat/svc.go @@ -27,12 +27,23 @@ import ( const P_CIRCUIT = 290 var ( - AutoNATServiceDialTimeout = 15 * time.Second + // AutoNATServiceDialTimeout defines how long to wait for connection + // attempts before failing. + AutoNATServiceDialTimeout = 15 * time.Second + // AutoNATServiceResetInterval defines how often to reset throttling. AutoNATServiceResetInterval = 1 * time.Minute - AutoNATServiceResetJitter = 15 * time.Second - - AutoNATServiceThrottle = 3 - AutoNATGlobalThrottle = 30 + // AutoNATServiceResetJitter defines the amplitude of randomness in throttle + // reset timing. + AutoNATServiceResetJitter = 15 * time.Second + + // AutoNATServiceThrottle defines how many times each ResetInterval a peer + // can ask for its autonat address. + AutoNATServiceThrottle = 3 + // AutoNATGlobalThrottle defines how many total autonat requests this + // service will answer each ResetInterval. + AutoNATGlobalThrottle = 30 + // AutoNATMaxPeerAddresses defines maximum number of addreses the autonat + // service will consider when attempting to connect to the peer. AutoNATMaxPeerAddresses = 16 ) diff --git a/p2p/host/autonat/svc_test.go b/p2p/host/autonat/svc_test.go index a36340913c..9a19a7350c 100644 --- a/p2p/host/autonat/svc_test.go +++ b/p2p/host/autonat/svc_test.go @@ -7,7 +7,9 @@ import ( "time" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" autonat "github.com/libp2p/go-libp2p-autonat" @@ -137,6 +139,55 @@ func TestAutoNATServiceDialRateLimiter(t *testing.T) { AutoNATServiceResetJitter = save5 } +func TestAutoNATServiceGlobalLimiter(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + save1 := AutoNATServiceDialTimeout + AutoNATServiceDialTimeout = 1 * time.Second + save2 := AutoNATServiceResetInterval + AutoNATServiceResetInterval = 10 * time.Second + save3 := AutoNATServiceThrottle + AutoNATServiceThrottle = 1 + save4 := manet.Private4 + manet.Private4 = []*net.IPNet{} + save5 := AutoNATServiceResetJitter + AutoNATServiceResetJitter = 0 * time.Second + save6 := AutoNATGlobalThrottle + AutoNATGlobalThrottle = 5 + + hs, as := makeAutoNATService(ctx, t) + as.globalReqMax = 5 + + for i := 0; i < 5; i++ { + hc, ac := makeAutoNATClient(ctx, t) + connect(t, hs, hc) + + _, err := ac.DialBack(ctx, hs.ID()) + if err != nil { + t.Fatal(err) + } + } + + hc, ac := makeAutoNATClient(ctx, t) + connect(t, hs, hc) + _, err := ac.DialBack(ctx, hs.ID()) + if err == nil { + t.Fatal("Dial back succeeded unexpectedly!") + } + + if !autonat.IsDialRefused(err) { + t.Fatal(err) + } + + AutoNATServiceDialTimeout = save1 + AutoNATServiceResetInterval = save2 + AutoNATServiceThrottle = save3 + manet.Private4 = save4 + AutoNATServiceResetJitter = save5 + AutoNATGlobalThrottle = save6 +} + func TestAutoNATServiceRateLimitJitter(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -163,6 +214,48 @@ func TestAutoNATServiceRateLimitJitter(t *testing.T) { AutoNATServiceResetJitter = save2 } +func TestAutoNATServiceStartup(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + save := manet.Private4 + manet.Private4 = []*net.IPNet{} + + h, err := libp2p.New(ctx, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) + if err != nil { + t.Fatal(err) + } + + _, err = NewAutoNATService(ctx, h, false) + if err != nil { + t.Fatal(err) + } + + eb, _ := h.EventBus().Emitter(new(event.EvtLocalReachabilityChanged)) + + hc, ac := makeAutoNATClient(ctx, t) + connect(t, h, hc) + + _, err = ac.DialBack(ctx, h.ID()) + if err == nil { + t.Fatal("autonat should not be started / advertising.") + } + + eb.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPublic}) + _, err = ac.DialBack(ctx, h.ID()) + if err != nil { + t.Fatalf("autonat should be active, was %v", err) + } + + eb.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPrivate}) + _, err = ac.DialBack(ctx, h.ID()) + if err == nil { + t.Fatal("autonat should not be started / advertising.") + } + + manet.Private4 = save +} + func TestAddrToIP(t *testing.T) { addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") if ip, err := addrToIP(addr); err != nil || !ip.Equal(net.IPv4(127, 0, 0, 1)) { From 58172272ae26801a425a3045f249b596714d2300 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Tue, 10 Mar 2020 11:47:56 -0700 Subject: [PATCH 13/13] bump to new go-multiaddr-net --- p2p/host/autonat/svc.go | 25 ++----------------------- p2p/host/autonat/svc_test.go | 23 ----------------------- 2 files changed, 2 insertions(+), 46 deletions(-) diff --git a/p2p/host/autonat/svc.go b/p2p/host/autonat/svc.go index 3f857b3058..c492b1bd8d 100644 --- a/p2p/host/autonat/svc.go +++ b/p2p/host/autonat/svc.go @@ -2,7 +2,6 @@ package autonat import ( "context" - "fmt" "math/rand" "net" "sync" @@ -125,26 +124,6 @@ func (as *AutoNATService) handleStream(s network.Stream) { } } -// Optimistically extract the net.IP host from a multiaddress. -// TODO: use upstream manet.ToIP -func addrToIP(addr ma.Multiaddr) (net.IP, error) { - n, err := manet.ToNetAddr(addr) - if err != nil { - return nil, err - } - - switch netAddr := n.(type) { - case *net.UDPAddr: - return netAddr.IP, nil - case *net.TCPAddr: - return netAddr.IP, nil - case *net.IPAddr: - return netAddr.IP, nil - default: - return nil, fmt.Errorf("non IP Multiaddr: %T", netAddr) - } -} - func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Message_PeerInfo) *pb.Message_DialResponse { if mpi == nil { return newDialResponseError(pb.Message_E_BAD_REQUEST, "missing peer info") @@ -170,7 +149,7 @@ func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Me if !as.skipDial(obsaddr) { addrs = append(addrs, obsaddr) seen[obsaddr.String()] = struct{}{} - obsHost, _ = addrToIP(obsaddr) + obsHost, _ = manet.ToIP(obsaddr) } for _, maddr := range mpi.GetAddrs() { @@ -184,7 +163,7 @@ func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Me continue } - if ip, err := addrToIP(addr); err != nil || !obsHost.Equal(ip) { + if ip, err := manet.ToIP(addr); err != nil || !obsHost.Equal(ip) { continue } diff --git a/p2p/host/autonat/svc_test.go b/p2p/host/autonat/svc_test.go index 9a19a7350c..5e845c737f 100644 --- a/p2p/host/autonat/svc_test.go +++ b/p2p/host/autonat/svc_test.go @@ -13,7 +13,6 @@ import ( "github.com/libp2p/go-libp2p-core/peer" autonat "github.com/libp2p/go-libp2p-autonat" - ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr-net" ) @@ -255,25 +254,3 @@ func TestAutoNATServiceStartup(t *testing.T) { manet.Private4 = save } - -func TestAddrToIP(t *testing.T) { - addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0") - if ip, err := addrToIP(addr); err != nil || !ip.Equal(net.IPv4(127, 0, 0, 1)) { - t.Fatal("addrToIP of ipv4 localhost incorrect!") - } - - addr, _ = ma.NewMultiaddr("/ip4/192.168.0.1/tcp/6") - if ip, err := addrToIP(addr); err != nil || !ip.Equal(net.IPv4(192, 168, 0, 1)) { - t.Fatal("addrToIP of ipv4 incorrect!") - } - - addr, _ = ma.NewMultiaddr("/ip6zone/eth0/ip6/fe80::1") - if ip, err := addrToIP(addr); err != nil || !ip.Equal(net.ParseIP("fe80::1")) { - t.Fatal("addrToIP of ip6zone incorrect!") - } - - addr, _ = ma.NewMultiaddr("/unix/a/b/c/d") - if _, err := addrToIP(addr); err == nil { - t.Fatal("invalid addrToIP populates") - } -}