Skip to content

Commit

Permalink
Merge pull request #1459 from libp2p/merge-reuseport
Browse files Browse the repository at this point in the history
move go-reuseport-transport here
  • Loading branch information
marten-seemann authored May 20, 2022
2 parents 39fd9a0 + 492ee92 commit b121193
Show file tree
Hide file tree
Showing 12 changed files with 784 additions and 4 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ require (
github.com/libp2p/go-msgio v0.2.0
github.com/libp2p/go-netroute v0.2.0
github.com/libp2p/go-reuseport v0.1.0
github.com/libp2p/go-reuseport-transport v0.1.0
github.com/libp2p/go-stream-muxer-multistream v0.4.0
github.com/libp2p/go-yamux/v3 v3.1.1
github.com/libp2p/zeroconf/v2 v2.1.1
Expand Down
113 changes: 113 additions & 0 deletions p2p/net/reuseport/dial.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package reuseport

import (
"context"
"net"

"github.com/libp2p/go-reuseport"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

type dialer interface {
Dial(network, addr string) (net.Conn, error)
DialContext(ctx context.Context, network, addr string) (net.Conn, error)
}

// Dial dials the given multiaddr, reusing ports we're currently listening on if
// possible.
//
// Dial attempts to be smart about choosing the source port. For example, If
// we're dialing a loopback address and we're listening on one or more loopback
// ports, Dial will randomly choose one of the loopback ports and addresses and
// reuse it.
func (t *Transport) Dial(raddr ma.Multiaddr) (manet.Conn, error) {
return t.DialContext(context.Background(), raddr)
}

// DialContext is like Dial but takes a context.
func (t *Transport) DialContext(ctx context.Context, raddr ma.Multiaddr) (manet.Conn, error) {
network, addr, err := manet.DialArgs(raddr)
if err != nil {
return nil, err
}
var d dialer
switch network {
case "tcp4":
d = t.v4.getDialer(network)
case "tcp6":
d = t.v6.getDialer(network)
default:
return nil, ErrWrongProto
}
conn, err := d.DialContext(ctx, network, addr)
if err != nil {
return nil, err
}
maconn, err := manet.WrapNetConn(conn)
if err != nil {
conn.Close()
return nil, err
}
return maconn, nil
}

func (n *network) getDialer(network string) dialer {
n.mu.RLock()
d := n.dialer
n.mu.RUnlock()
if d == nil {
n.mu.Lock()
defer n.mu.Unlock()

if n.dialer == nil {
n.dialer = n.makeDialer(network)
}
d = n.dialer
}
return d
}

func (n *network) makeDialer(network string) dialer {
if !reuseport.Available() {
log.Debug("reuseport not available")
return &net.Dialer{}
}

var unspec net.IP
switch network {
case "tcp4":
unspec = net.IPv4zero
case "tcp6":
unspec = net.IPv6unspecified
default:
panic("invalid network: must be either tcp4 or tcp6")
}

// How many ports are we listening on.
var port = 0
for l := range n.listeners {
newPort := l.Addr().(*net.TCPAddr).Port
switch {
case newPort == 0: // Any port, ignore (really, we shouldn't get this case...).
case port == 0: // Haven't selected a port yet, choose this one.
port = newPort
case newPort == port: // Same as the selected port, continue...
default: // Multiple ports, use the multi dialer
return newMultiDialer(unspec, n.listeners)
}
}

// None.
if port == 0 {
return &net.Dialer{}
}

// One. Always dial from the single port we're listening on.
laddr := &net.TCPAddr{
IP: unspec,
Port: port,
}

return (*singleDialer)(laddr)
}
80 changes: 80 additions & 0 deletions p2p/net/reuseport/listen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package reuseport

import (
"net"

"github.com/libp2p/go-reuseport"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

type listener struct {
manet.Listener
network *network
}

func (l *listener) Close() error {
l.network.mu.Lock()
delete(l.network.listeners, l)
l.network.dialer = nil
l.network.mu.Unlock()
return l.Listener.Close()
}

// Listen listens on the given multiaddr.
//
// If reuseport is supported, it will be enabled for this listener and future
// dials from this transport may reuse the port.
//
// Note: You can listen on the same multiaddr as many times as you want
// (although only *one* listener will end up handling the inbound connection).
func (t *Transport) Listen(laddr ma.Multiaddr) (manet.Listener, error) {
nw, naddr, err := manet.DialArgs(laddr)
if err != nil {
return nil, err
}
var n *network
switch nw {
case "tcp4":
n = &t.v4
case "tcp6":
n = &t.v6
default:
return nil, ErrWrongProto
}

if !reuseport.Available() {
return manet.Listen(laddr)
}
nl, err := reuseport.Listen(nw, naddr)
if err != nil {
return manet.Listen(laddr)
}

if _, ok := nl.Addr().(*net.TCPAddr); !ok {
nl.Close()
return nil, ErrWrongProto
}

malist, err := manet.WrapNetListener(nl)
if err != nil {
nl.Close()
return nil, err
}

list := &listener{
Listener: malist,
network: n,
}

n.mu.Lock()
defer n.mu.Unlock()

if n.listeners == nil {
n.listeners = make(map[*listener]struct{})
}
n.listeners[list] = struct{}{}
n.dialer = nil

return list, nil
}
90 changes: 90 additions & 0 deletions p2p/net/reuseport/multidialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package reuseport

import (
"context"
"fmt"
"math/rand"
"net"

"github.com/libp2p/go-netroute"
)

type multiDialer struct {
listeningAddresses []*net.TCPAddr
loopback []*net.TCPAddr
unspecified []*net.TCPAddr
fallback net.TCPAddr
}

func (d *multiDialer) Dial(network, addr string) (net.Conn, error) {
return d.DialContext(context.Background(), network, addr)
}

func randAddr(addrs []*net.TCPAddr) *net.TCPAddr {
if len(addrs) > 0 {
return addrs[rand.Intn(len(addrs))]
}
return nil
}

// DialContext dials a target addr.
// Dialing preference is
// * If there is a listener on the local interface the OS expects to use to route towards addr, use that.
// * If there is a listener on a loopback address, addr is loopback, use that.
// * If there is a listener on an undefined address (0.0.0.0 or ::), use that.
// * Use the fallback IP specified during construction, with a port that's already being listened on, if one exists.
func (d *multiDialer) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
tcpAddr, err := net.ResolveTCPAddr(network, addr)
if err != nil {
return nil, err
}
ip := tcpAddr.IP
if !ip.IsLoopback() && !ip.IsGlobalUnicast() {
return nil, fmt.Errorf("undialable IP: %s", ip)
}

if router, err := netroute.New(); err == nil {
if _, _, preferredSrc, err := router.Route(ip); err == nil {
for _, optAddr := range d.listeningAddresses {
if optAddr.IP.Equal(preferredSrc) {
return reuseDial(ctx, optAddr, network, addr)
}
}
}
}

if ip.IsLoopback() && len(d.loopback) > 0 {
return reuseDial(ctx, randAddr(d.loopback), network, addr)
}
if len(d.unspecified) == 0 {
return reuseDial(ctx, &d.fallback, network, addr)
}

return reuseDial(ctx, randAddr(d.unspecified), network, addr)
}

func newMultiDialer(unspec net.IP, listeners map[*listener]struct{}) (m dialer) {
addrs := make([]*net.TCPAddr, 0)
loopback := make([]*net.TCPAddr, 0)
unspecified := make([]*net.TCPAddr, 0)
existingPort := 0

for l := range listeners {
addr := l.Addr().(*net.TCPAddr)
addrs = append(addrs, addr)
if addr.IP.IsLoopback() {
loopback = append(loopback, addr)
} else if addr.IP.IsGlobalUnicast() && existingPort == 0 {
existingPort = addr.Port
} else if addr.IP.IsUnspecified() {
unspecified = append(unspecified, addr)
}
}
m = &multiDialer{
listeningAddresses: addrs,
loopback: loopback,
unspecified: unspecified,
fallback: net.TCPAddr{IP: unspec, Port: existingPort},
}
return
}
35 changes: 35 additions & 0 deletions p2p/net/reuseport/reuseport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package reuseport

import (
"context"
"net"

"github.com/libp2p/go-reuseport"
)

var fallbackDialer net.Dialer

// Dials using reuseport and then redials normally if that fails.
func reuseDial(ctx context.Context, laddr *net.TCPAddr, network, raddr string) (con net.Conn, err error) {
if laddr == nil {
return fallbackDialer.DialContext(ctx, network, raddr)
}

d := net.Dialer{
LocalAddr: laddr,
Control: reuseport.Control,
}

con, err = d.DialContext(ctx, network, raddr)
if err == nil {
return con, nil
}

if reuseErrShouldRetry(err) && ctx.Err() == nil {
// We could have an existing socket open or we could have one
// stuck in TIME-WAIT.
log.Debugf("failed to reuse port, will try again with a random port: %s", err)
con, err = fallbackDialer.DialContext(ctx, network, raddr)
}
return con, err
}
44 changes: 44 additions & 0 deletions p2p/net/reuseport/reuseport_plan9.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package reuseport

import (
"net"
"os"
)

const (
EADDRINUSE = "address in use"
ECONNREFUSED = "connection refused"
)

// reuseErrShouldRetry diagnoses whether to retry after a reuse error.
// if we failed to bind, we should retry. if bind worked and this is a
// real dial error (remote end didnt answer) then we should not retry.
func reuseErrShouldRetry(err error) bool {
if err == nil {
return false // hey, it worked! no need to retry.
}

// if it's a network timeout error, it's a legitimate failure.
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
return false
}

e, ok := err.(*net.OpError)
if !ok {
return true
}

e1, ok := e.Err.(*os.PathError)
if !ok {
return true
}

switch e1.Err.Error() {
case EADDRINUSE:
return true
case ECONNREFUSED:
return false
default:
return true // optimistically default to retry.
}
}
Loading

0 comments on commit b121193

Please sign in to comment.