diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9d3a3a92c72..e2a07b3fe3a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -78,6 +78,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Packetbeat* +- Expire source port mappings. {pull}41581[41581] *Winlogbeat* diff --git a/packetbeat/procs/procs.go b/packetbeat/procs/procs.go index 11b9fc2e8d8..f578b49a5d4 100644 --- a/packetbeat/procs/procs.go +++ b/packetbeat/procs/procs.go @@ -64,6 +64,7 @@ type portProcMapping struct { endpoint endpoint // FIXME: This is never used. pid int proc *process + expires time.Time } // process describes an OS process. @@ -185,8 +186,8 @@ func (proc *ProcessesWatcher) isLocalIP(ip net.IP) bool { func (proc *ProcessesWatcher) findProc(address net.IP, port uint16, transport applayer.Transport) *process { proc.mu.Lock() + defer proc.mu.Unlock() procMap, ok := proc.portProcMap[transport] - proc.mu.Unlock() if !ok { return nil } @@ -206,24 +207,47 @@ func (proc *ProcessesWatcher) findProc(address net.IP, port uint16, transport ap return nil } -func lookupMapping(address net.IP, port uint16, procMap map[endpoint]portProcMapping) (p portProcMapping, found bool) { +// proc.mu must be locked +func lookupMapping(address net.IP, port uint16, procMap map[endpoint]portProcMapping) (portProcMapping, bool) { + now := time.Now() + key := endpoint{address.String(), port} + p, found := procMap[key] + // Precedence when one socket is bound to a specific IP:port and another one // to INADDR_ANY and same port is not clear. Seems that the last one to bind // takes precedence, and we don't have a way to tell. // This function takes the naive approach of giving precedence to the more // specific address and then to INADDR_ANY. - if p, found = procMap[endpoint{address.String(), port}]; found { - return p, found + if !found { + if address.To4() != nil { + key.address = anyIPv4 + } else { + key.address = anyIPv6 + } + + p, found = procMap[key] } - nullAddr := anyIPv4 - if asIPv4 := address.To4(); asIPv4 == nil { - nullAddr = anyIPv6 + // We can't guarantee `p` doesn't point to an old entry, since + // we never remove entries from `procMap`, we only overwrite + // them, but we only overwrite them once an unrelated packet + // that doesn't have an entry on `procMap` ends up rebuilding + // the whole map. + // + // We take a conservative approach by discarding the entry if + // it's old enough. When we fail the first time here, our caller + // updates all maps and calls us again. + if found && now.After(p.expires) { + logp.Debug("procs", "PID %d (%s) port %d is too old, discarding", p.pid, p.proc.name, port) + delete(procMap, key) + p = portProcMapping{} + found = false } - p, found = procMap[endpoint{nullAddr, port}] + return p, found } +// proc.mu must be locked func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) { if logp.HasSelector("procsdetailed") { start := time.Now() @@ -244,6 +268,7 @@ func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) { } } +// proc.mu must be locked func (proc *ProcessesWatcher) expireProcessCache() { now := time.Now() for pid, info := range proc.processCache { @@ -253,9 +278,8 @@ func (proc *ProcessesWatcher) expireProcessCache() { } } +// proc.mu must be locked func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, e endpoint, pid int) { - proc.mu.Lock() - defer proc.mu.Unlock() prev, ok := proc.portProcMap[transport][e] if ok && prev.pid == pid { // This port->pid mapping already exists @@ -267,11 +291,21 @@ func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, e return } - // Simply overwrite old entries for now. - // We never expire entries from this map. Since there are 65k possible - // ports, the size of the dict can be max 1.5 MB, which we consider - // reasonable. - proc.portProcMap[transport][e] = portProcMapping{endpoint: e, pid: pid, proc: p} + // We overwrite previous entries here, and they expire in + // lookupMapping() if they are deemed old enough. + // + // Map size is bound by the number of ports: ~65k, so it's + // fine to have old entries lingering, as long as we don't + // trust them on subsequent connections. + // + // If the source port is re-used within the hardcoded 10 + // seconds window, we might end up hitting an old mapping. + proc.portProcMap[transport][e] = portProcMapping{ + endpoint: e, + pid: pid, + proc: p, + expires: time.Now().Add(10 * time.Second), + } if logp.IsDebug("procsdetailed") { logp.Debug("procsdetailed", "updateMappingEntry(): local=%s:%d/%s pid=%d process='%s'",