Skip to content

Commit

Permalink
quic: source address and ECN support in the network layer
Browse files Browse the repository at this point in the history
Make the abstraction over UDP connections higher level,
and add support for setting the source address and ECN
bits in sent packets, and receving the destination
address and ECN bits in received packets.

There is no good way that I can find to identify the
source IP address of packets we send. Look up the
destination IP address of the first packet received on
each connection, and use this as the source address
for all future packets we send. This avoids unexpected
path migration, where the address we send from changes
without our knowing it.

Reject received packets sent from an unexpected peer
address.

In the future, when we support path migration, we will want
to relax these restrictions.

ECN bits may be used to detect network congestion.
We don't make use of them at this time, but this CL adds
the necessary UDP layer support to do so in the future.

This CL also lays the groundwork for using more efficient
platform APIs to send/receive packets in the future.
(sendmmsg/recvmmsg/GSO/GRO)

These features require platform-specific APIs.
Add support for Darwin and Linux to start with,
with a graceful fallback on other OSs.

For golang/go#58547

Change-Id: I1c97cc0d3e52fff18e724feaaac4a50d3df671bc
Reviewed-on: https://go-review.googlesource.com/c/net/+/565255
Reviewed-by: Jonathan Amsterdam <[email protected]>
LUCI-TryBot-Result: Go LUCI <[email protected]>
  • Loading branch information
neild committed Feb 21, 2024
1 parent 2a8baea commit a6a24dd
Show file tree
Hide file tree
Showing 17 changed files with 676 additions and 95 deletions.
9 changes: 7 additions & 2 deletions internal/quic/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Conn struct {
config *Config
testHooks connTestHooks
peerAddr netip.AddrPort
localAddr netip.AddrPort

msgc chan any
donec chan struct{} // closed when conn loop exits
Expand Down Expand Up @@ -97,7 +98,7 @@ func newConn(now time.Time, side connSide, cids newServerConnIDs, peerAddr netip
side: side,
endpoint: e,
config: config,
peerAddr: peerAddr,
peerAddr: unmapAddrPort(peerAddr),
msgc: make(chan any, 1),
donec: make(chan struct{}),
peerAckDelayExponent: -1,
Expand Down Expand Up @@ -317,7 +318,11 @@ func (c *Conn) loop(now time.Time) {
}
switch m := m.(type) {
case *datagram:
c.handleDatagram(now, m)
if !c.handleDatagram(now, m) {
if c.logEnabled(QLogLevelPacket) {
c.logPacketDropped(m)
}
}
m.recycle()
case timerEvent:
// A connection timer has expired.
Expand Down
38 changes: 25 additions & 13 deletions internal/quic/conn_recv.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,33 @@ package quic

import (
"bytes"
"context"
"encoding/binary"
"errors"
"time"
)

func (c *Conn) handleDatagram(now time.Time, dgram *datagram) {
func (c *Conn) handleDatagram(now time.Time, dgram *datagram) (handled bool) {
if !c.localAddr.IsValid() {
// We don't have any way to tell in the general case what address we're
// sending packets from. Set our address from the destination address of
// the first packet received from the peer.
c.localAddr = dgram.localAddr
}
if dgram.peerAddr.IsValid() && dgram.peerAddr != c.peerAddr {
if c.side == clientSide {
// "If a client receives packets from an unknown server address,
// the client MUST discard these packets."
// https://www.rfc-editor.org/rfc/rfc9000#section-9-6
return false
}
// We currently don't support connection migration,
// so for now the server also drops packets from an unknown address.
return false
}
buf := dgram.b
c.loss.datagramReceived(now, len(buf))
if c.isDraining() {
return
return false
}
for len(buf) > 0 {
var n int
Expand All @@ -28,7 +44,7 @@ func (c *Conn) handleDatagram(now time.Time, dgram *datagram) {
if c.side == serverSide && len(dgram.b) < paddedInitialDatagramSize {
// Discard client-sent Initial packets in too-short datagrams.
// https://www.rfc-editor.org/rfc/rfc9000#section-14.1-4
return
return false
}
n = c.handleLongHeader(now, ptype, initialSpace, c.keysInitial.r, buf)
case packetTypeHandshake:
Expand All @@ -37,10 +53,10 @@ func (c *Conn) handleDatagram(now time.Time, dgram *datagram) {
n = c.handle1RTT(now, buf)
case packetTypeRetry:
c.handleRetry(now, buf)
return
return true
case packetTypeVersionNegotiation:
c.handleVersionNegotiation(now, buf)
return
return true
default:
n = -1
}
Expand All @@ -58,20 +74,16 @@ func (c *Conn) handleDatagram(now time.Time, dgram *datagram) {
var token statelessResetToken
copy(token[:], buf[len(buf)-len(token):])
if c.handleStatelessReset(now, token) {
return
return true
}
}
// Invalid data at the end of a datagram is ignored.
if c.logEnabled(QLogLevelPacket) {
c.log.LogAttrs(context.Background(), QLogLevelPacket,
"connectivity:packet_dropped",
)
}
break
return false
}
c.idleHandlePacketReceived(now)
buf = buf[n:]
}
return true
}

func (c *Conn) handleLongHeader(now time.Time, ptype packetType, space numberSpace, k fixedKeys, buf []byte) int {
Expand Down
5 changes: 4 additions & 1 deletion internal/quic/conn_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,10 @@ func (c *Conn) maybeSend(now time.Time) (next time.Time) {
}
}

c.endpoint.sendDatagram(buf, c.peerAddr)
c.endpoint.sendDatagram(datagram{
b: buf,
peerAddr: c.peerAddr,
})
}
}

Expand Down
7 changes: 7 additions & 0 deletions internal/quic/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ func (tc *testConn) writeFrames(ptype packetType, frames ...debugFrame) {
dstConnID: dstConnID,
srcConnID: tc.peerConnID,
}},
addr: tc.conn.peerAddr,
}
if ptype == packetTypeInitial && tc.conn.side == serverSide {
d.paddedSize = 1200
Expand Down Expand Up @@ -656,6 +657,12 @@ func (tc *testConn) wantPacket(expectation string, want *testPacket) {
}

func packetEqual(a, b *testPacket) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return false
}
ac := *a
ac.frames = nil
ac.header = 0
Expand Down
23 changes: 20 additions & 3 deletions internal/quic/dgram.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,25 @@ import (
)

type datagram struct {
b []byte
addr netip.AddrPort
b []byte
localAddr netip.AddrPort
peerAddr netip.AddrPort
ecn ecnBits
}

// Explicit Congestion Notification bits.
//
// https://www.rfc-editor.org/rfc/rfc3168.html#section-5
type ecnBits byte

const (
ecnMask = 0b000000_11
ecnNotECT = 0b000000_00
ecnECT1 = 0b000000_01
ecnECT0 = 0b000000_10
ecnCE = 0b000000_11
)

var datagramPool = sync.Pool{
New: func() any {
return &datagram{
Expand All @@ -26,7 +41,9 @@ var datagramPool = sync.Pool{

func newDatagram() *datagram {
m := datagramPool.Get().(*datagram)
m.b = m.b[:cap(m.b)]
*m = datagram{
b: m.b[:cap(m.b)],
}
return m
}

Expand Down
90 changes: 41 additions & 49 deletions internal/quic/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
//
// Multiple goroutines may invoke methods on an Endpoint simultaneously.
type Endpoint struct {
config *Config
udpConn udpConn
testHooks endpointTestHooks
resetGen statelessResetTokenGenerator
retry retryState
config *Config
packetConn packetConn
testHooks endpointTestHooks
resetGen statelessResetTokenGenerator
retry retryState

acceptQueue queue[*Conn] // new inbound connections
connsMap connsMap // only accessed by the listen loop
Expand All @@ -42,13 +42,12 @@ type endpointTestHooks interface {
newConn(c *Conn)
}

// A udpConn is a UDP connection.
// It is implemented by net.UDPConn.
type udpConn interface {
// A packetConn is the interface to sending and receiving UDP packets.
type packetConn interface {
Close() error
LocalAddr() net.Addr
ReadMsgUDPAddrPort(b, control []byte) (n, controln, flags int, _ netip.AddrPort, _ error)
WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (int, error)
LocalAddr() netip.AddrPort
Read(f func(*datagram))
Write(datagram) error
}

// Listen listens on a local network address.
Expand All @@ -65,13 +64,17 @@ func Listen(network, address string, config *Config) (*Endpoint, error) {
if err != nil {
return nil, err
}
return newEndpoint(udpConn, config, nil)
pc, err := newNetUDPConn(udpConn)
if err != nil {
return nil, err
}
return newEndpoint(pc, config, nil)
}

func newEndpoint(udpConn udpConn, config *Config, hooks endpointTestHooks) (*Endpoint, error) {
func newEndpoint(pc packetConn, config *Config, hooks endpointTestHooks) (*Endpoint, error) {
e := &Endpoint{
config: config,
udpConn: udpConn,
packetConn: pc,
testHooks: hooks,
conns: make(map[*Conn]struct{}),
acceptQueue: newQueue[*Conn](),
Expand All @@ -90,8 +93,7 @@ func newEndpoint(udpConn udpConn, config *Config, hooks endpointTestHooks) (*End

// LocalAddr returns the local network address.
func (e *Endpoint) LocalAddr() netip.AddrPort {
a, _ := e.udpConn.LocalAddr().(*net.UDPAddr)
return a.AddrPort()
return e.packetConn.LocalAddr()
}

// Close closes the Endpoint.
Expand All @@ -114,7 +116,7 @@ func (e *Endpoint) Close(ctx context.Context) error {
conns = append(conns, c)
}
if len(e.conns) == 0 {
e.udpConn.Close()
e.packetConn.Close()
}
}
e.connsMu.Unlock()
Expand Down Expand Up @@ -200,34 +202,18 @@ func (e *Endpoint) connDrained(c *Conn) {
defer e.connsMu.Unlock()
delete(e.conns, c)
if e.closing && len(e.conns) == 0 {
e.udpConn.Close()
e.packetConn.Close()
}
}

func (e *Endpoint) listen() {
defer close(e.closec)
for {
m := newDatagram()
// TODO: Read and process the ECN (explicit congestion notification) field.
// https://tools.ietf.org/html/draft-ietf-quic-transport-32#section-13.4
n, _, _, addr, err := e.udpConn.ReadMsgUDPAddrPort(m.b, nil)
if err != nil {
// The user has probably closed the endpoint.
// We currently don't surface errors from other causes;
// we could check to see if the endpoint has been closed and
// record the unexpected error if it has not.
return
}
if n == 0 {
continue
}
e.packetConn.Read(func(m *datagram) {
if e.connsMap.updateNeeded.Load() {
e.connsMap.applyUpdates()
}
m.addr = addr
m.b = m.b[:n]
e.handleDatagram(m)
}
})
}

func (e *Endpoint) handleDatagram(m *datagram) {
Expand Down Expand Up @@ -277,7 +263,7 @@ func (e *Endpoint) handleUnknownDestinationDatagram(m *datagram) {
// If this is a 1-RTT packet, there's nothing productive we can do with it.
// Send a stateless reset if possible.
if !isLongHeader(m.b[0]) {
e.maybeSendStatelessReset(m.b, m.addr)
e.maybeSendStatelessReset(m.b, m.peerAddr)
return
}
p, ok := parseGenericLongHeaderPacket(m.b)
Expand All @@ -291,7 +277,7 @@ func (e *Endpoint) handleUnknownDestinationDatagram(m *datagram) {
return
default:
// Unknown version.
e.sendVersionNegotiation(p, m.addr)
e.sendVersionNegotiation(p, m.peerAddr)
return
}
if getPacketType(m.b) != packetTypeInitial {
Expand All @@ -309,15 +295,15 @@ func (e *Endpoint) handleUnknownDestinationDatagram(m *datagram) {
if e.config.RequireAddressValidation {
var ok bool
cids.retrySrcConnID = p.dstConnID
cids.originalDstConnID, ok = e.validateInitialAddress(now, p, m.addr)
cids.originalDstConnID, ok = e.validateInitialAddress(now, p, m.peerAddr)
if !ok {
return
}
} else {
cids.originalDstConnID = p.dstConnID
}
var err error
c, err := e.newConn(now, serverSide, cids, m.addr)
c, err := e.newConn(now, serverSide, cids, m.peerAddr)
if err != nil {
// The accept queue is probably full.
// We could send a CONNECTION_CLOSE to the peer to reject the connection.
Expand All @@ -329,7 +315,7 @@ func (e *Endpoint) handleUnknownDestinationDatagram(m *datagram) {
m = nil // don't recycle, sendMsg takes ownership
}

func (e *Endpoint) maybeSendStatelessReset(b []byte, addr netip.AddrPort) {
func (e *Endpoint) maybeSendStatelessReset(b []byte, peerAddr netip.AddrPort) {
if !e.resetGen.canReset {
// Config.StatelessResetKey isn't set, so we don't send stateless resets.
return
Expand Down Expand Up @@ -370,17 +356,21 @@ func (e *Endpoint) maybeSendStatelessReset(b []byte, addr netip.AddrPort) {
b[0] &^= headerFormLong // clear long header bit
b[0] |= fixedBit // set fixed bit
copy(b[len(b)-statelessResetTokenLen:], token[:])
e.sendDatagram(b, addr)
e.sendDatagram(datagram{
b: b,
peerAddr: peerAddr,
})
}

func (e *Endpoint) sendVersionNegotiation(p genericLongPacket, addr netip.AddrPort) {
func (e *Endpoint) sendVersionNegotiation(p genericLongPacket, peerAddr netip.AddrPort) {
m := newDatagram()
m.b = appendVersionNegotiation(m.b[:0], p.srcConnID, p.dstConnID, quicVersion1)
e.sendDatagram(m.b, addr)
m.peerAddr = peerAddr
e.sendDatagram(*m)
m.recycle()
}

func (e *Endpoint) sendConnectionClose(in genericLongPacket, addr netip.AddrPort, code transportError) {
func (e *Endpoint) sendConnectionClose(in genericLongPacket, peerAddr netip.AddrPort, code transportError) {
keys := initialKeys(in.dstConnID, serverSide)
var w packetWriter
p := longPacket{
Expand All @@ -399,12 +389,14 @@ func (e *Endpoint) sendConnectionClose(in genericLongPacket, addr netip.AddrPort
if len(buf) == 0 {
return
}
e.sendDatagram(buf, addr)
e.sendDatagram(datagram{
b: buf,
peerAddr: peerAddr,
})
}

func (e *Endpoint) sendDatagram(p []byte, addr netip.AddrPort) error {
_, err := e.udpConn.WriteToUDPAddrPort(p, addr)
return err
func (e *Endpoint) sendDatagram(dgram datagram) error {
return e.packetConn.Write(dgram)
}

// A connsMap is an endpoint's mapping of conn ids and reset tokens to conns.
Expand Down
Loading

0 comments on commit a6a24dd

Please sign in to comment.