[packetbeat] Expire source port mappings. (#41581)
port->pid mappings were only overwritten, never expired. The overwriting
mechanism has several issues:
 - It only overwrites if it manages to find the new pid, so it misses
short-lived processes.
 - It only refreshes the mapping of a given port if a packet arriving on
_another_ port misses the lookup (otherwise the original mapping is found and
returned). This means that once every port has been used at least once, the
cache is full and never mutated again.

The observable effect is that the user sees wrong process correlations _to_
older/long-lived processes. Imagine the following:
 - A long-lived process makes a _short_-lived TCP connection from src_port S.
 - Years later, a _short_-lived process makes a TCP connection to somewhere
else, but from the same src_port S. It hits the cache, since the cache still
has a mapping for S, so packetbeat incorrectly correlates the new short-lived
process's connection with the old long-lived process.
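
A minimal sketch of the scenario above, not packetbeat's actual code: `portCache`, `lookup`, and the `scan` callback are hypothetical names, and the cache is keyed by port only for brevity.

```go
package main

import "fmt"

// portCache is a hypothetical stand-in for the port->pid map.
// Entries are only ever added, never expired.
type portCache map[uint16]int

// lookup returns the cached pid for port. Only on a miss does it call
// scan (a stand-in for re-reading the OS socket table) and cache the result.
func (c portCache) lookup(port uint16, scan func(uint16) (int, bool)) (int, bool) {
	if pid, ok := c[port]; ok {
		return pid, true // hit: returned without any freshness check
	}
	pid, ok := scan(port)
	if ok {
		c[port] = pid
	}
	return pid, ok
}

func main() {
	cache := portCache{}
	owner := map[uint16]int{40000: 100} // pid 100 owns source port 40000
	scan := func(p uint16) (int, bool) { pid, ok := owner[p]; return pid, ok }

	pid, _ := cache.lookup(40000, scan)
	fmt.Println(pid) // prints 100

	// Much later, short-lived pid 200 reuses the same source port.
	owner[40000] = 200
	pid, _ = cache.lookup(40000, scan)
	fmt.Println(pid) // still prints 100: the stale mapping wins
}
```

The second lookup never reaches `scan`, which is the wrong correlation described above.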

Related to a very long SDH. A more in-depth explanation of the bug, with a
program to reproduce it, can be found here:
 - elastic/sdh-beats#4604 (comment)
 - elastic/sdh-beats#4604 (comment)

The solution is to discard mappings that are "old enough", with a hardcoded
window of 10 seconds, so as long as the port is not re-used in this window, we
are fine.

This also makes sure the cache never becomes "immutable", since mappings will
invariably get old, forcing a refresh.
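
The expiry idea can be sketched roughly as follows. This is an illustration under assumptions, not the code in this commit: `cache`, `mapping`, `store`, `lookup`, and `epoch` are hypothetical names, and `now` is passed in explicitly so the example is deterministic, whereas the real change calls `time.Now()`.

```go
package main

import (
	"fmt"
	"time"
)

// ttl mirrors the hardcoded 10 second window mentioned above.
const ttl = 10 * time.Second

// epoch is an arbitrary fixed start time used only by this example.
var epoch = time.Unix(0, 0)

// mapping is a hypothetical, simplified cache entry.
type mapping struct {
	pid     int
	expires time.Time
}

type cache map[uint16]mapping

// store records pid for port and stamps the entry with an expiry time.
func (c cache) store(port uint16, pid int, now time.Time) {
	c[port] = mapping{pid: pid, expires: now.Add(ttl)}
}

// lookup returns the pid for port, discarding the entry if it is too old.
// A miss tells the caller to refresh its view and try again.
func (c cache) lookup(port uint16, now time.Time) (int, bool) {
	m, ok := c[port]
	if ok && now.After(m.expires) {
		delete(c, port)
		return 0, false
	}
	return m.pid, ok
}

func main() {
	c := cache{}
	c.store(40000, 100, epoch)

	pid, ok := c.lookup(40000, epoch.Add(5*time.Second))
	fmt.Println(pid, ok) // prints "100 true": still inside the window

	pid, ok = c.lookup(40000, epoch.Add(11*time.Second))
	fmt.Println(pid, ok) // prints "0 false": expired and removed
}
```

Because every entry eventually expires, a lookup on any port is guaranteed to miss at some point, which is what forces the refresh.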

It's a very conservative approach, as I don't want to introduce other bugs by
redesigning it; work is under way to change how the cache works on Linux
anyway.

While here, I noticed the locking was also wrong: we were doing the lookup
unlocked, and then had to relock whenever the mapping needed updating. Change
this to grab the lock once and only once; interleaving is bad.
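
A rough sketch of the lock-once pattern, with hypothetical names (`watcher`, `find`, `refreshLocked`, `scan`) rather than the real `ProcessesWatcher` methods: the entry point takes the mutex once, and helpers assume it is already held.

```go
package main

import (
	"fmt"
	"sync"
)

// watcher is a hypothetical, simplified stand-in for ProcessesWatcher.
type watcher struct {
	mu    sync.Mutex
	ports map[uint16]int
	scan  func(uint16) (int, bool) // stand-in for reading the socket table
}

// find holds w.mu for the whole lookup-then-refresh sequence, so no other
// goroutine can interleave between the miss and the update.
func (w *watcher) find(port uint16) (int, bool) {
	w.mu.Lock()
	defer w.mu.Unlock()

	if pid, ok := w.ports[port]; ok {
		return pid, true
	}
	w.refreshLocked(port)
	pid, ok := w.ports[port]
	return pid, ok
}

// refreshLocked updates the map. w.mu must be held by the caller.
func (w *watcher) refreshLocked(port uint16) {
	if pid, ok := w.scan(port); ok {
		w.ports[port] = pid
	}
}

func main() {
	scans := 0
	w := &watcher{
		ports: map[uint16]int{},
		scan: func(uint16) (int, bool) {
			scans++ // only ever runs with w.mu held
			return 100, true
		},
	}

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			w.find(40000)
		}()
	}
	wg.Wait()
	fmt.Println("scans:", scans) // prints "scans: 1"
}
```

Only the first goroutine to take the lock misses and scans; the rest see the entry it stored.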
haesbaert authored Nov 12, 2024
1 parent c07fffe commit 587dc60
Showing 2 changed files with 50 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -78,6 +78,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

 *Packetbeat*

+- Expire source port mappings. {pull}41581[41581]

 *Winlogbeat*

64 changes: 49 additions & 15 deletions packetbeat/procs/procs.go
@@ -64,6 +64,7 @@ type portProcMapping struct {
 	endpoint endpoint // FIXME: This is never used.
 	pid      int
 	proc     *process
+	expires  time.Time
 }

 // process describes an OS process.
@@ -185,8 +186,8 @@ func (proc *ProcessesWatcher) isLocalIP(ip net.IP) bool {

 func (proc *ProcessesWatcher) findProc(address net.IP, port uint16, transport applayer.Transport) *process {
 	proc.mu.Lock()
+	defer proc.mu.Unlock()
 	procMap, ok := proc.portProcMap[transport]
-	proc.mu.Unlock()
 	if !ok {
 		return nil
 	}
@@ -206,24 +207,47 @@ func (proc *ProcessesWatcher) findProc(address net.IP, port uint16, transport ap
 	return nil
 }

-func lookupMapping(address net.IP, port uint16, procMap map[endpoint]portProcMapping) (p portProcMapping, found bool) {
+// proc.mu must be locked
+func lookupMapping(address net.IP, port uint16, procMap map[endpoint]portProcMapping) (portProcMapping, bool) {
+	now := time.Now()
+	key := endpoint{address.String(), port}
+	p, found := procMap[key]
+
 	// Precedence when one socket is bound to a specific IP:port and another one
 	// to INADDR_ANY and same port is not clear. Seems that the last one to bind
 	// takes precedence, and we don't have a way to tell.
 	// This function takes the naive approach of giving precedence to the more
 	// specific address and then to INADDR_ANY.
-	if p, found = procMap[endpoint{address.String(), port}]; found {
-		return p, found
+	if !found {
+		if address.To4() != nil {
+			key.address = anyIPv4
+		} else {
+			key.address = anyIPv6
+		}
+
+		p, found = procMap[key]
 	}

-	nullAddr := anyIPv4
-	if asIPv4 := address.To4(); asIPv4 == nil {
-		nullAddr = anyIPv6
+	// We can't guarantee `p` doesn't point to an old entry, since
+	// we never remove entries from `procMap`, we only overwrite
+	// them, but we only overwrite them once an unrelated packet
+	// that doesn't have an entry on `procMap` ends up rebuilding
+	// the whole map.
+	//
+	// We take a conservative approach by discarding the entry if
+	// it's old enough. When we fail the first time here, our caller
+	// updates all maps and calls us again.
+	if found && now.After(p.expires) {
+		logp.Debug("procs", "PID %d (%s) port %d is too old, discarding", p.pid, p.proc.name, port)
+		delete(procMap, key)
+		p = portProcMapping{}
+		found = false
 	}
-	p, found = procMap[endpoint{nullAddr, port}]
+
 	return p, found
 }

+// proc.mu must be locked
 func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) {
 	if logp.HasSelector("procsdetailed") {
 		start := time.Now()
@@ -244,6 +268,7 @@ func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) {
 	}
 }

+// proc.mu must be locked
 func (proc *ProcessesWatcher) expireProcessCache() {
 	now := time.Now()
 	for pid, info := range proc.processCache {
@@ -253,9 +278,8 @@ func (proc *ProcessesWatcher) expireProcessCache() {
 	}
 }

+// proc.mu must be locked
 func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, e endpoint, pid int) {
-	proc.mu.Lock()
-	defer proc.mu.Unlock()
 	prev, ok := proc.portProcMap[transport][e]
 	if ok && prev.pid == pid {
 		// This port->pid mapping already exists
@@ -267,11 +291,21 @@ func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, e
 		return
 	}

-	// Simply overwrite old entries for now.
-	// We never expire entries from this map. Since there are 65k possible
-	// ports, the size of the dict can be max 1.5 MB, which we consider
-	// reasonable.
-	proc.portProcMap[transport][e] = portProcMapping{endpoint: e, pid: pid, proc: p}
+	// We overwrite previous entries here, and they expire in
+	// lookupMapping() if they are deemed old enough.
+	//
+	// Map size is bound by the number of ports: ~65k, so it's
+	// fine to have old entries lingering, as long as we don't
+	// trust them on subsequent connections.
+	//
+	// If the source port is re-used within the hardcoded 10
+	// seconds window, we might end up hitting an old mapping.
+	proc.portProcMap[transport][e] = portProcMapping{
+		endpoint: e,
+		pid:      pid,
+		proc:     p,
+		expires:  time.Now().Add(10 * time.Second),
+	}

 	if logp.IsDebug("procsdetailed") {
 		logp.Debug("procsdetailed", "updateMappingEntry(): local=%s:%d/%s pid=%d process='%s'",
