diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 01d14c7be493..0a69607f708d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -98,7 +98,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - system/package: Fix parsing of Installed-Size field of DEB packages. {issue}16661[16661] {pull}17188[17188] - system module: Fix panic during initialisation when /proc/stat can't be read. {pull}17569[17569] - system/package: Fix an error that can occur while trying to persist package metadata. {issue}18536[18536] {pull}18887[18887] -- system/socket: Fix dataset using 100% CPU and becoming unresponsive in some scenarios. {pull}19033[19033] +- system/socket: Fix dataset using 100% CPU and becoming unresponsive in some scenarios. {pull}19033[19033] {pull}19764[19764] - system/socket: Fixed tracking of long-running connections. {pull}19033[19033] - system/package: Fix librpm loading on Fedora 31/32. {pull}NNNN[NNNN] diff --git a/x-pack/auditbeat/module/system/socket/state.go b/x-pack/auditbeat/module/system/socket/state.go index 9917608f7e66..45bc28446f2c 100644 --- a/x-pack/auditbeat/module/system/socket/state.go +++ b/x-pack/auditbeat/module/system/socket/state.go @@ -523,7 +523,7 @@ func (s *state) ExpireOlder() { deadline = s.clock().Add(-s.socketTimeout) for item := s.socketLRU.peek(); item != nil && item.Timestamp().Before(deadline); { if sock, ok := item.(*socket); ok { - s.onSockDestroyed(sock.sock, 0) + s.onSockDestroyed(sock.sock, sock, 0) } else { s.socketLRU.get() } @@ -704,13 +704,16 @@ func (s *state) OnSockDestroyed(ptr uintptr, pid uint32) error { s.Lock() defer s.Unlock() - return s.onSockDestroyed(ptr, pid) + return s.onSockDestroyed(ptr, nil, pid) } -func (s *state) onSockDestroyed(ptr uintptr, pid uint32) error { - sock, found := s.socks[ptr] - if !found { - return nil +func (s *state) onSockDestroyed(ptr uintptr, sock *socket, pid uint32) error { + var found bool + if sock == nil { + sock, found = s.socks[ptr] + if !found { + return nil + } } // Enrich with pid if sock.pid == 0 && pid != 0 { diff --git a/x-pack/auditbeat/module/system/socket/state_test.go b/x-pack/auditbeat/module/system/socket/state_test.go index 9b36c8b5dd4a..75b73a374d18 100644 --- a/x-pack/auditbeat/module/system/socket/state_test.go +++ b/x-pack/auditbeat/module/system/socket/state_test.go @@ -152,7 +152,6 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) { lPort, rPort := be16(localPort), be16(remotePort) lAddr, rAddr := ipv4(localIP), ipv4(remoteIP) evs := []event{ - callExecve(meta(1234, 1234, 1), []string{"/usr/bin/curl", "https://example.net/", "-o", "/tmp/site.html"}), &commitCreds{Meta: meta(1234, 1234, 2), UID: 501, GID: 20, EUID: 501, EGID: 20}, &execveRet{Meta: meta(1234, 1234, 2), Retval: 1234}, @@ -302,6 +301,32 @@ func TestTCPConnWithProcessSocketTimeouts(t *testing.T) { } } +func TestSocketExpirationWithOverwrittenSockets(t *testing.T) { + const ( + sock uintptr = 0xff1234 + flowTimeout = time.Hour + socketTimeout = time.Minute * 3 + closeTimeout = time.Minute + ) + st := makeState(nil, (*logWrapper)(t), flowTimeout, socketTimeout, closeTimeout, time.Second) + now := time.Now() + st.clock = func() time.Time { + return now + } + if err := feedEvents([]event{ + &inetCreate{Meta: meta(1234, 1236, 5), Proto: 0}, + &sockInitData{Meta: meta(1234, 1236, 5), Sock: sock}, + &inetCreate{Meta: meta(1234, 1237, 5), Proto: 0}, + &sockInitData{Meta: meta(1234, 1237, 5), Sock: sock}, + }, st, t); err != nil { + t.Fatal(err) + } + now = now.Add(closeTimeout + 1) + st.ExpireOlder() + now = now.Add(socketTimeout + 1) + st.ExpireOlder() +} + func TestUDPOutgoingSinglePacketWithProcess(t *testing.T) { const ( localIP = "192.168.33.10"