Skip to content

Commit

Permalink
quic: Update to quic-go v0.36.2 (#2424)
Browse files Browse the repository at this point in the history
* Update go.mod files

* Update to new quic-go API

* More renaming

* Add test back in

* Workaround quic-go#3947

* Fix transitive dep

* Use own pointer to packetConn

* Update to quic-go v0.36.2

* Remove workaround

* Embed quic.Transport

* Downgrade qtls-go1-20

* Rename ConnManager.metricsTracer to mt

* Close transport after test ends

* Close conn when transport closes

* Return better error

* Avoid conflicts with parallel tests

* Skip conn assert on windows

* Add metrics tracer back in

* Don't use tracers here

* Add comment to WriteTo

* Finish renaming conn -> transport where appropriate

* Back out unrelated change

* One more rename
  • Loading branch information
MarcoPolo authored and marten-seemann committed Jul 14, 2023
1 parent f802e7e commit a33370b
Show file tree
Hide file tree
Showing 13 changed files with 478 additions and 409 deletions.
38 changes: 19 additions & 19 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ require (
github.com/golang/mock v1.6.0
github.com/google/gopacket v1.1.19
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru/v2 v2.0.2
github.com/hashicorp/golang-lru/v2 v2.0.4
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-badger v0.3.0
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/jbenet/go-temp-err-catcher v0.1.0
github.com/klauspost/compress v1.16.5
github.com/klauspost/compress v1.16.7
github.com/libp2p/go-buffer-pool v0.1.0
github.com/libp2p/go-flow-metrics v0.1.0
github.com/libp2p/go-libp2p-asn-util v0.3.0
Expand All @@ -29,7 +29,7 @@ require (
github.com/libp2p/go-nat v0.2.0
github.com/libp2p/go-netroute v0.2.1
github.com/libp2p/go-reuseport v0.3.0
github.com/libp2p/go-yamux/v4 v4.0.0
github.com/libp2p/go-yamux/v4 v4.0.1
github.com/libp2p/zeroconf/v2 v2.2.0
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b
Expand All @@ -41,24 +41,24 @@ require (
github.com/multiformats/go-multiaddr-fmt v0.1.0
github.com/multiformats/go-multibase v0.2.0
github.com/multiformats/go-multicodec v0.9.0
github.com/multiformats/go-multihash v0.2.2
github.com/multiformats/go-multihash v0.2.3
github.com/multiformats/go-multistream v0.4.1
github.com/multiformats/go-varint v0.0.7
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.4.0
github.com/quic-go/quic-go v0.33.0
github.com/quic-go/quic-go v0.36.2
github.com/quic-go/webtransport-go v0.5.3
github.com/raulk/go-watchdog v1.3.0
github.com/stretchr/testify v1.8.2
go.uber.org/fx v1.19.2
github.com/stretchr/testify v1.8.4
go.uber.org/fx v1.20.0
go.uber.org/goleak v1.1.12
golang.org/x/crypto v0.7.0
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df
golang.org/x/sync v0.2.0
golang.org/x/sys v0.8.0
golang.org/x/tools v0.9.1
google.golang.org/protobuf v1.30.0
golang.org/x/crypto v0.11.0
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
golang.org/x/sync v0.3.0
golang.org/x/sys v0.10.0
golang.org/x/tools v0.11.0
google.golang.org/protobuf v1.31.0
)

require (
Expand All @@ -81,7 +81,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/google/pprof v0.0.0-20230602150820-91b7bce49751 // indirect
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/huin/goupnp v1.2.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
Expand All @@ -91,11 +91,11 @@ require (
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/miekg/dns v1.1.54 // indirect
github.com/miekg/dns v1.1.55 // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/ginkgo/v2 v2.9.7 // indirect
github.com/onsi/ginkgo/v2 v2.11.0 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand All @@ -110,9 +110,9 @@ require (
go.uber.org/dig v1.17.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/text v0.11.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)
82 changes: 39 additions & 43 deletions go.sum

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions p2p/transport/quic/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,11 @@ func (t *transport) holePunch(ctx context.Context, raddr ma.Multiaddr, p peer.ID
if err != nil {
return nil, err
}
pconn, err := t.connManager.Dial(network, addr)
tr, err := t.connManager.TransportForDial(network, addr)
if err != nil {
return nil, err
}
defer pconn.DecreaseCount()
defer tr.DecreaseCount()

ctx, cancel := context.WithTimeout(ctx, HolePunchTimeout)
defer cancel()
Expand Down Expand Up @@ -227,7 +227,7 @@ loop:
punchErr = err
break
}
if _, err := pconn.WriteTo(payload, addr); err != nil {
if _, err := tr.WriteTo(payload, addr); err != nil {
punchErr = err
break
}
Expand Down
106 changes: 64 additions & 42 deletions p2p/transport/quicreuse/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
quiclogging "github.com/quic-go/quic-go/logging"
)

var quicDialContext = quic.DialContext // so we can mock it in tests

type ConnManager struct {
reuseUDP4 *reuse
reuseUDP6 *reuse
Expand All @@ -25,20 +23,24 @@ type ConnManager struct {
serverConfig *quic.Config
clientConfig *quic.Config

connsMu sync.Mutex
conns map[string]connListenerEntry
quicListenersMu sync.Mutex
quicListeners map[string]quicListenerEntry

srk quic.StatelessResetKey
mt *metricsTracer
}

type connListenerEntry struct {
type quicListenerEntry struct {
refCount int
ln *connListener
ln *quicListener
}

func NewConnManager(statelessResetKey quic.StatelessResetKey, opts ...Option) (*ConnManager, error) {
cm := &ConnManager{
enableReuseport: true,
enableDraft29: true,
conns: make(map[string]connListenerEntry),
quicListeners: make(map[string]quicListenerEntry),
srk: statelessResetKey,
}
for _, o := range opts {
if err := o(cm); err != nil {
Expand All @@ -47,17 +49,19 @@ func NewConnManager(statelessResetKey quic.StatelessResetKey, opts ...Option) (*
}

quicConf := quicConfig.Clone()
quicConf.StatelessResetKey = &statelessResetKey

var tracers []quiclogging.Tracer
if qlogTracer != nil {
tracers = append(tracers, qlogTracer)
}
if cm.enableMetrics {
tracers = append(tracers, newMetricsTracer())
cm.mt = newMetricsTracer()
}
if len(tracers) > 0 {
quicConf.Tracer = quiclogging.NewMultiplexedTracer(tracers...)
quicConf.Tracer = func(ctx context.Context, p quiclogging.Perspective, ci quic.ConnectionID) quiclogging.ConnectionTracer {
tracers := make([]quiclogging.ConnectionTracer, 0, 2)
if qlogTracerDir != "" {
tracers = append(tracers, qloggerForDir(qlogTracerDir, p, ci))
}
if cm.mt != nil {
tracers = append(tracers, cm.mt.TracerForConnection(ctx, p, ci))
}
return quiclogging.NewMultiplexedConnectionTracer(tracers...)
}
serverConfig := quicConf.Clone()
if !cm.enableDraft29 {
Expand All @@ -67,8 +71,8 @@ func NewConnManager(statelessResetKey quic.StatelessResetKey, opts ...Option) (*
cm.clientConfig = quicConf
cm.serverConfig = serverConfig
if cm.enableReuseport {
cm.reuseUDP4 = newReuse()
cm.reuseUDP6 = newReuse()
cm.reuseUDP4 = newReuse(&statelessResetKey, cm.mt)
cm.reuseUDP6 = newReuse(&statelessResetKey, cm.mt)
}
return cm, nil
}
Expand Down Expand Up @@ -100,22 +104,22 @@ func (c *ConnManager) ListenQUIC(addr ma.Multiaddr, tlsConf *tls.Config, allowWi
return nil, err
}

c.connsMu.Lock()
defer c.connsMu.Unlock()
c.quicListenersMu.Lock()
defer c.quicListenersMu.Unlock()

key := laddr.String()
entry, ok := c.conns[key]
entry, ok := c.quicListeners[key]
if !ok {
conn, err := c.listen(netw, laddr)
tr, err := c.transportForListen(netw, laddr)
if err != nil {
return nil, err
}
ln, err := newConnListener(conn, c.serverConfig, c.enableDraft29)
ln, err := newQuicListener(tr, c.serverConfig, c.enableDraft29)
if err != nil {
return nil, err
}
key = conn.LocalAddr().String()
entry = connListenerEntry{ln: ln}
key = tr.LocalAddr().String()
entry = quicListenerEntry{ln: ln}
}
l, err := entry.ln.Add(tlsConf, allowWindowIncrease, func() { c.onListenerClosed(key) })
if err != nil {
Expand All @@ -125,46 +129,50 @@ func (c *ConnManager) ListenQUIC(addr ma.Multiaddr, tlsConf *tls.Config, allowWi
return nil, err
}
entry.refCount++
c.conns[key] = entry
c.quicListeners[key] = entry
return l, nil
}

func (c *ConnManager) onListenerClosed(key string) {
c.connsMu.Lock()
defer c.connsMu.Unlock()
c.quicListenersMu.Lock()
defer c.quicListenersMu.Unlock()

entry := c.conns[key]
entry := c.quicListeners[key]
entry.refCount = entry.refCount - 1
if entry.refCount <= 0 {
delete(c.conns, key)
delete(c.quicListeners, key)
entry.ln.Close()
} else {
c.conns[key] = entry
c.quicListeners[key] = entry
}
}

func (c *ConnManager) listen(network string, laddr *net.UDPAddr) (pConn, error) {
func (c *ConnManager) transportForListen(network string, laddr *net.UDPAddr) (refCountedQuicTransport, error) {
if c.enableReuseport {
reuse, err := c.getReuse(network)
if err != nil {
return nil, err
}
return reuse.Listen(network, laddr)
return reuse.TransportForListen(network, laddr)
}

conn, err := net.ListenUDP(network, laddr)
conn, err := listenAndOptimize(network, laddr)
if err != nil {
return nil, err
}
return &noreuseConn{conn}, nil
tr := &singleOwnerTransport{Transport: quic.Transport{Conn: conn, StatelessResetKey: &c.srk}, packetConn: conn}
if c.mt != nil {
tr.Transport.Tracer = c.mt
}
return tr, nil
}

func (c *ConnManager) DialQUIC(ctx context.Context, raddr ma.Multiaddr, tlsConf *tls.Config, allowWindowIncrease func(conn quic.Connection, delta uint64) bool) (quic.Connection, error) {
naddr, v, err := FromQuicMultiaddr(raddr)
if err != nil {
return nil, err
}
netw, host, err := manet.DialArgs(raddr)
netw, _, err := manet.DialArgs(raddr)
if err != nil {
return nil, err
}
Expand All @@ -181,25 +189,25 @@ func (c *ConnManager) DialQUIC(ctx context.Context, raddr ma.Multiaddr, tlsConf
return nil, errors.New("unknown QUIC version")
}

pconn, err := c.Dial(netw, naddr)
tr, err := c.TransportForDial(netw, naddr)
if err != nil {
return nil, err
}
conn, err := quicDialContext(ctx, pconn, naddr, host, tlsConf, quicConf)
conn, err := tr.Dial(ctx, naddr, tlsConf, quicConf)
if err != nil {
pconn.DecreaseCount()
tr.DecreaseCount()
return nil, err
}
return conn, nil
}

func (c *ConnManager) Dial(network string, raddr *net.UDPAddr) (pConn, error) {
func (c *ConnManager) TransportForDial(network string, raddr *net.UDPAddr) (refCountedQuicTransport, error) {
if c.enableReuseport {
reuse, err := c.getReuse(network)
if err != nil {
return nil, err
}
return reuse.Dial(network, raddr)
return reuse.TransportForDial(network, raddr)
}

var laddr *net.UDPAddr
Expand All @@ -209,11 +217,16 @@ func (c *ConnManager) Dial(network string, raddr *net.UDPAddr) (pConn, error) {
case "udp6":
laddr = &net.UDPAddr{IP: net.IPv6zero, Port: 0}
}
conn, err := net.ListenUDP(network, laddr)
conn, err := listenAndOptimize(network, laddr)
if err != nil {
return nil, err
}
return &noreuseConn{conn}, nil
tr := &singleOwnerTransport{Transport: quic.Transport{Conn: conn, StatelessResetKey: &c.srk}, packetConn: conn}
if c.mt != nil {
tr.Transport.Tracer = c.mt
}

return tr, nil
}

func (c *ConnManager) Protocols() []int {
Expand All @@ -232,3 +245,12 @@ func (c *ConnManager) Close() error {
}
return c.reuseUDP4.Close()
}

// listenAndOptimize same as net.ListenUDP, but also calls quic.OptimizeConn
func listenAndOptimize(network string, laddr *net.UDPAddr) (net.PacketConn, error) {
conn, err := net.ListenUDP(network, laddr)
if err != nil {
return nil, err
}
return quic.OptimizeConn(conn)
}
Loading

0 comments on commit a33370b

Please sign in to comment.