Skip to content

Commit

Permalink
feat(tcpreuse): add options for sharing TCP listeners amongst TCP, WS…
Browse files Browse the repository at this point in the history
… and WSS transports (#2984)

Allows the same socket to be shared amongst TCP,WS,WSS transports.

---------

Co-authored-by: sukun <[email protected]>
Co-authored-by: Marco Munizaga <[email protected]>
  • Loading branch information
3 people authored Nov 4, 2024
1 parent 362e583 commit 5a47a90
Show file tree
Hide file tree
Showing 32 changed files with 1,597 additions and 106 deletions.
9 changes: 9 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
"github.com/libp2p/go-libp2p/p2p/transport/tcpreuse"
libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc"
"github.com/prometheus/client_golang/prometheus"

Expand Down Expand Up @@ -145,6 +146,8 @@ type Config struct {
CustomIPv6BlackHoleSuccessCounter bool

UserFxOptions []fx.Option

ShareTCPListener bool
}

func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) {
Expand Down Expand Up @@ -289,6 +292,12 @@ func (cfg *Config) addTransports() ([]fx.Option, error) {
fx.Provide(func() connmgr.ConnectionGater { return cfg.ConnectionGater }),
fx.Provide(func() pnet.PSK { return cfg.PSK }),
fx.Provide(func() network.ResourceManager { return cfg.ResourceManager }),
fx.Provide(func(gater connmgr.ConnectionGater, rcmgr network.ResourceManager) *tcpreuse.ConnMgr {
if !cfg.ShareTCPListener {
return nil
}
return tcpreuse.NewConnMgr(tcpreuse.EnvReuseportVal, gater, rcmgr)
}),
fx.Provide(func(cm *quicreuse.ConnManager, sw *swarm.Swarm) libp2pwebrtc.ListenUDPFn {
hasQuicAddrPortFor := func(network string, laddr *net.UDPAddr) bool {
quicAddrPorts := map[string]struct{}{}
Expand Down
26 changes: 25 additions & 1 deletion libp2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestTransportConstructor(t *testing.T) {
_ connmgr.ConnectionGater,
upgrader transport.Upgrader,
) transport.Transport {
tpt, err := tcp.NewTCPTransport(upgrader, nil)
tpt, err := tcp.NewTCPTransport(upgrader, nil, nil)
require.NoError(t, err)
return tpt
}
Expand Down Expand Up @@ -751,3 +751,27 @@ func getTLSConf(t *testing.T, ip net.IP, start, end time.Time) *tls.Config {
}},
}
}

func TestSharedTCPAddr(t *testing.T) {
h, err := New(
ShareTCPListener(),
Transport(tcp.NewTCPTransport),
Transport(websocket.New),
ListenAddrStrings("/ip4/0.0.0.0/tcp/8888"),
ListenAddrStrings("/ip4/0.0.0.0/tcp/8888/ws"),
)
require.NoError(t, err)
sawTCP := false
sawWS := false
for _, addr := range h.Addrs() {
if strings.HasSuffix(addr.String(), "/tcp/8888") {
sawTCP = true
}
if strings.HasSuffix(addr.String(), "/tcp/8888/ws") {
sawWS = true
}
}
require.True(t, sawTCP)
require.True(t, sawWS)
h.Close()
}
12 changes: 12 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,3 +643,15 @@ func WithFxOption(opts ...fx.Option) Option {
return nil
}
}

// ShareTCPListener shares the same listen address between TCP and Websocket
// transports. This lets both transports use the same TCP port.
//
// Currently this behavior is Opt-in. In a future release this will be the
// default, and this option will be removed.
func ShareTCPListener() Option {
return func(cfg *Config) error {
cfg.ShareTCPListener = true
return nil
}
}
2 changes: 1 addition & 1 deletion p2p/net/swarm/dial_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func makeSwarmWithNoListenAddrs(t *testing.T, opts ...Option) *Swarm {
upgrader := makeUpgrader(t, s)
var tcpOpts []tcp.Option
tcpOpts = append(tcpOpts, tcp.DisableReuseport())
tcpTransport, err := tcp.NewTCPTransport(upgrader, nil, tcpOpts...)
tcpTransport, err := tcp.NewTCPTransport(upgrader, nil, nil, tcpOpts...)
require.NoError(t, err)
if err := s.AddTransport(tcpTransport); err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm_addr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestDialAddressSelection(t *testing.T) {
s, err := swarm.NewSwarm("local", nil, eventbus.NewBus())
require.NoError(t, err)

tcpTr, err := tcp.NewTCPTransport(nil, nil)
tcpTr, err := tcp.NewTCPTransport(nil, nil, nil)
require.NoError(t, err)
require.NoError(t, s.AddTransport(tcpTr))
reuse, err := quicreuse.NewConnManager(quic.StatelessResetKey{}, quic.TokenGeneratorKey{})
Expand Down
8 changes: 4 additions & 4 deletions p2p/net/swarm/swarm_dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestAddrsForDial(t *testing.T) {
ps.AddPrivKey(id, priv)
t.Cleanup(func() { ps.Close() })

tpt, err := websocket.New(nil, &network.NullResourceManager{})
tpt, err := websocket.New(nil, &network.NullResourceManager{}, nil)
require.NoError(t, err)
s, err := NewSwarm(id, ps, eventbus.NewBus(), WithMultiaddrResolver(ResolverFromMaDNS{resolver}))
require.NoError(t, err)
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestDedupAddrsForDial(t *testing.T) {
require.NoError(t, err)
defer s.Close()

tpt, err := tcp.NewTCPTransport(nil, &network.NullResourceManager{})
tpt, err := tcp.NewTCPTransport(nil, &network.NullResourceManager{}, nil)
require.NoError(t, err)
err = s.AddTransport(tpt)
require.NoError(t, err)
Expand Down Expand Up @@ -134,7 +134,7 @@ func newTestSwarmWithResolver(t *testing.T, resolver *madns.Resolver) *Swarm {
})

// Add a tcp transport so that we know we can dial a tcp multiaddr and we don't filter it out.
tpt, err := tcp.NewTCPTransport(nil, &network.NullResourceManager{})
tpt, err := tcp.NewTCPTransport(nil, &network.NullResourceManager{}, nil)
require.NoError(t, err)
err = s.AddTransport(tpt)
require.NoError(t, err)
Expand All @@ -151,7 +151,7 @@ func newTestSwarmWithResolver(t *testing.T, resolver *madns.Resolver) *Swarm {
err = s.AddTransport(wtTpt)
require.NoError(t, err)

wsTpt, err := websocket.New(nil, &network.NullResourceManager{})
wsTpt, err := websocket.New(nil, &network.NullResourceManager{}, nil)
require.NoError(t, err)
err = s.AddTransport(wsTpt)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func GenSwarm(t testing.TB, opts ...Option) *swarm.Swarm {
if cfg.disableReuseport {
tcpOpts = append(tcpOpts, tcp.DisableReuseport())
}
tcpTransport, err := tcp.NewTCPTransport(upgrader, nil, tcpOpts...)
tcpTransport, err := tcp.NewTCPTransport(upgrader, nil, nil, tcpOpts...)
require.NoError(t, err)
if err := s.AddTransport(tcpTransport); err != nil {
t.Fatal(err)
Expand Down
38 changes: 24 additions & 14 deletions p2p/net/upgrader/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,23 +84,33 @@ func (l *listener) handleIncoming() {
}
catcher.Reset()

// gate the connection if applicable
if l.upgrader.connGater != nil && !l.upgrader.connGater.InterceptAccept(maconn) {
log.Debugf("gater blocked incoming connection on local addr %s from %s",
maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
if err := maconn.Close(); err != nil {
log.Warnf("failed to close incoming connection rejected by gater: %s", err)
}
continue
// Check if we already have a connection scope. See the comment in tcpreuse/listener.go for an explanation.
var connScope network.ConnManagementScope
if sc, ok := maconn.(interface {
Scope() network.ConnManagementScope
}); ok {
connScope = sc.Scope()
}
if connScope == nil {
// gate the connection if applicable
if l.upgrader.connGater != nil && !l.upgrader.connGater.InterceptAccept(maconn) {
log.Debugf("gater blocked incoming connection on local addr %s from %s",
maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
if err := maconn.Close(); err != nil {
log.Warnf("failed to close incoming connection rejected by gater: %s", err)
}
continue
}

connScope, err := l.rcmgr.OpenConnection(network.DirInbound, true, maconn.RemoteMultiaddr())
if err != nil {
log.Debugw("resource manager blocked accept of new connection", "error", err)
if err := maconn.Close(); err != nil {
log.Warnf("failed to incoming connection rejected by resource manager: %s", err)
var err error
connScope, err = l.rcmgr.OpenConnection(network.DirInbound, true, maconn.RemoteMultiaddr())
if err != nil {
log.Debugw("resource manager blocked accept of new connection", "error", err)
if err := maconn.Close(); err != nil {
log.Warnf("failed to open incoming connection. Rejected by resource manager: %s", err)
}
continue
}
continue
}

// The go routine below calls Release when the context is
Expand Down
2 changes: 1 addition & 1 deletion p2p/protocol/circuitv2/relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func getNetHosts(t *testing.T, ctx context.Context, n int) (hosts []host.Host, u
upgrader := swarmt.GenUpgrader(t, netw, nil)
upgraders = append(upgraders, upgrader)

tpt, err := tcp.NewTCPTransport(upgrader, nil)
tpt, err := tcp.NewTCPTransport(upgrader, nil, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
25 changes: 24 additions & 1 deletion p2p/test/transport/gating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package transport_integration

import (
"context"
"encoding/binary"
"net/netip"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -30,6 +32,23 @@ func stripCertHash(addr ma.Multiaddr) ma.Multiaddr {
return addr
}

func addrPort(addr ma.Multiaddr) netip.AddrPort {
a := netip.Addr{}
p := uint16(0)
ma.ForEach(addr, func(c ma.Component) bool {
if c.Protocol().Code == ma.P_IP4 || c.Protocol().Code == ma.P_IP6 {
a, _ = netip.AddrFromSlice(c.RawValue())
return false
}
if c.Protocol().Code == ma.P_UDP || c.Protocol().Code == ma.P_TCP {
p = binary.BigEndian.Uint16(c.RawValue())
return true
}
return false
})
return netip.AddrPortFrom(a, p)
}

func TestInterceptPeerDial(t *testing.T) {
if race.WithRace() {
t.Skip("The upgrader spawns a new Go routine, which leads to race conditions when using GoMock.")
Expand Down Expand Up @@ -173,10 +192,14 @@ func TestInterceptAccept(t *testing.T) {
// remove the certhash component from WebTransport addresses
require.Equal(t, stripCertHash(h2.Addrs()[0]), addrs.LocalMultiaddr())
}).AnyTimes()
} else if strings.Contains(tc.Name, "WebSocket-Shared") {
connGater.EXPECT().InterceptAccept(gomock.Any()).Do(func(addrs network.ConnMultiaddrs) {
require.Equal(t, addrPort(h2.Addrs()[0]), addrPort(addrs.LocalMultiaddr()))
})
} else {
connGater.EXPECT().InterceptAccept(gomock.Any()).Do(func(addrs network.ConnMultiaddrs) {
// remove the certhash component from WebTransport addresses
require.Equal(t, stripCertHash(h2.Addrs()[0]), addrs.LocalMultiaddr())
require.Equal(t, stripCertHash(h2.Addrs()[0]), addrs.LocalMultiaddr(), "%s\n%s", h2.Addrs()[0], addrs.LocalMultiaddr())
})
}

Expand Down
32 changes: 32 additions & 0 deletions p2p/test/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,38 @@ var transportsToTest = []TransportTestCase{
return h
},
},
{
Name: "TCP-Shared / TLS / Yamux",
HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
libp2pOpts := transformOpts(opts)
libp2pOpts = append(libp2pOpts, libp2p.ShareTCPListener())
libp2pOpts = append(libp2pOpts, libp2p.Security(tls.ID, tls.New))
libp2pOpts = append(libp2pOpts, libp2p.Muxer(yamux.ID, yamux.DefaultTransport))
if opts.NoListen {
libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
} else {
libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
}
h, err := libp2p.New(libp2pOpts...)
require.NoError(t, err)
return h
},
},
{
Name: "WebSocket-Shared",
HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
libp2pOpts := transformOpts(opts)
libp2pOpts = append(libp2pOpts, libp2p.ShareTCPListener())
if opts.NoListen {
libp2pOpts = append(libp2pOpts, libp2p.NoListenAddrs)
} else {
libp2pOpts = append(libp2pOpts, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0/ws"))
}
h, err := libp2p.New(libp2pOpts...)
require.NoError(t, err)
return h
},
},
{
Name: "WebSocket",
HostGenerator: func(t *testing.T, opts TransportTestCaseOpts) host.Host {
Expand Down
54 changes: 54 additions & 0 deletions p2p/transport/tcp/metrics_unix_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// go:build: unix

package tcp

import (
"testing"

tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
"github.com/libp2p/go-libp2p/p2p/transport/tcpreuse"
ttransport "github.com/libp2p/go-libp2p/p2p/transport/testsuite"

"github.com/stretchr/testify/require"
)

func TestTcpTransportCollectsMetricsWithSharedTcpSocket(t *testing.T) {

peerA, ia := makeInsecureMuxer(t)
_, ib := makeInsecureMuxer(t)

sharedTCPSocketA := tcpreuse.NewConnMgr(false, nil, nil)
sharedTCPSocketB := tcpreuse.NewConnMgr(false, nil, nil)

ua, err := tptu.New(ia, muxers, nil, nil, nil)
require.NoError(t, err)
ta, err := NewTCPTransport(ua, nil, sharedTCPSocketA, WithMetrics())
require.NoError(t, err)
ub, err := tptu.New(ib, muxers, nil, nil, nil)
require.NoError(t, err)
tb, err := NewTCPTransport(ub, nil, sharedTCPSocketB, WithMetrics())
require.NoError(t, err)

zero := "/ip4/127.0.0.1/tcp/0"

// Not running any test that needs more than 1 conn because the testsuite
// opens multiple conns via multiple listeners, which is not expected to work
// with the shared TCP socket.
subtestsToRun := []ttransport.TransportSubTestFn{
ttransport.SubtestProtocols,
ttransport.SubtestBasic,
ttransport.SubtestCancel,
ttransport.SubtestPingPong,

// Stolen from the stream muxer test suite.
ttransport.SubtestStress1Conn1Stream1Msg,
ttransport.SubtestStress1Conn1Stream100Msg,
ttransport.SubtestStress1Conn100Stream100Msg,
ttransport.SubtestStress1Conn1000Stream10Msg,
ttransport.SubtestStress1Conn100Stream100Msg10MB,
ttransport.SubtestStreamOpenStress,
ttransport.SubtestStreamReset,
}

ttransport.SubtestTransportWithFs(t, ta, tb, zero, peerA, subtestsToRun)
}
Loading

0 comments on commit 5a47a90

Please sign in to comment.