Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

webrtc: correctly report incoming packet address on muxed connection #2586

Merged
merged 3 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 32 additions & 15 deletions p2p/transport/webrtc/udpmux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net"
"strings"
"sync"

logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -42,9 +43,15 @@ type UDPMux struct {

queue chan Candidate

mx sync.Mutex
mx sync.Mutex
// ufragMap allows us to multiplex incoming STUN packets based on ufrag
ufragMap map[ufragConnKey]*muxedConnection
addrMap map[string]*muxedConnection
// addrMap allows us to correctly direct incoming packets after the connection
// is established and ufrag isn't available on all packets
addrMap map[string]*muxedConnection
// ufragAddrMap allows us to clean up all addresses from the addrMap once
// connection is closed
sukunrt marked this conversation as resolved.
Show resolved Hide resolved
ufragAddrMap map[ufragConnKey][]net.Addr

// the context controls the lifecycle of the mux
wg sync.WaitGroup
Expand All @@ -57,12 +64,13 @@ var _ ice.UDPMux = &UDPMux{}
func NewUDPMux(socket net.PacketConn) *UDPMux {
ctx, cancel := context.WithCancel(context.Background())
mux := &UDPMux{
ctx: ctx,
cancel: cancel,
socket: socket,
ufragMap: make(map[ufragConnKey]*muxedConnection),
addrMap: make(map[string]*muxedConnection),
queue: make(chan Candidate, 32),
ctx: ctx,
cancel: cancel,
socket: socket,
ufragMap: make(map[ufragConnKey]*muxedConnection),
addrMap: make(map[string]*muxedConnection),
ufragAddrMap: make(map[ufragConnKey][]net.Addr),
queue: make(chan Candidate, 32),
}

return mux
Expand Down Expand Up @@ -130,7 +138,11 @@ func (mux *UDPMux) readLoop() {

n, addr, err := mux.socket.ReadFrom(buf)
if err != nil {
log.Errorf("error reading from socket: %v", err)
if strings.Contains(err.Error(), "use of closed network connection") {
log.Debugf("readLoop exiting: socket %s closed", mux.socket.LocalAddr())
} else {
log.Errorf("error reading from socket %s: %v", mux.socket.LocalAddr(), err)
}
pool.Put(buf)
return
}
Expand All @@ -157,7 +169,7 @@ func (mux *UDPMux) processPacket(buf []byte, addr net.Addr) (processed bool) {
conn, ok := mux.addrMap[addr.String()]
mux.mx.Unlock()
if ok {
if err := conn.Push(buf); err != nil {
if err := conn.Push(buf, addr); err != nil {
log.Debugf("could not push packet: %v", err)
return false
}
Expand Down Expand Up @@ -196,7 +208,7 @@ func (mux *UDPMux) processPacket(buf []byte, addr net.Addr) (processed bool) {
}
}

if err := conn.Push(buf); err != nil {
if err := conn.Push(buf, addr); err != nil {
log.Debugf("could not push packet: %v", err)
return false
}
Expand Down Expand Up @@ -250,9 +262,12 @@ func (mux *UDPMux) RemoveConnByUfrag(ufrag string) {

for _, isIPv6 := range [...]bool{true, false} {
key := ufragConnKey{ufrag: ufrag, isIPv6: isIPv6}
if conn, ok := mux.ufragMap[key]; ok {
if _, ok := mux.ufragMap[key]; ok {
delete(mux.ufragMap, key)
delete(mux.addrMap, conn.RemoteAddr().String())
for _, addr := range mux.ufragAddrMap[key] {
delete(mux.addrMap, addr.String())
}
delete(mux.ufragAddrMap, key)
}
}
}
Expand All @@ -264,12 +279,14 @@ func (mux *UDPMux) getOrCreateConn(ufrag string, isIPv6 bool, _ *UDPMux, addr ne
defer mux.mx.Unlock()

if conn, ok := mux.ufragMap[key]; ok {
mux.addrMap[addr.String()] = conn
mux.ufragAddrMap[key] = append(mux.ufragAddrMap[key], addr)
return false, conn
}

conn := newMuxedConnection(mux, func() { mux.RemoveConnByUfrag(ufrag) }, addr)
conn := newMuxedConnection(mux, func() { mux.RemoveConnByUfrag(ufrag) })
mux.ufragMap[key] = conn
mux.addrMap[addr.String()] = conn

mux.ufragAddrMap[key] = append(mux.ufragAddrMap[key], addr)
return true, conn
}
Loading