From f94315700bd16d0ff78a92ce3692394aa9c2f585 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Tue, 13 Oct 2020 00:43:40 +0200 Subject: [PATCH] Fix concurrent map read and write in socket dataset (#21690) This fixes a panic caused by a concurrent map read and write in Auditbeat's system/socket dataset. Fixes #21192 (cherry picked from commit 9ab0a918237369343cab4e7d17ac09d08e887625) --- CHANGELOG.next.asciidoc | 1 + .../auditbeat/module/system/socket/events.go | 4 +-- .../module/system/socket/socket_linux.go | 2 +- .../auditbeat/module/system/socket/state.go | 11 ++++++-- .../module/system/socket/state_test.go | 26 +++++++++++++++++++ 5 files changed, 39 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index eb46131cfba..2f6d603d477 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -56,6 +56,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - system/package: Fix parsing of Installed-Size field of DEB packages. {issue}16661[16661] {pull}17188[17188] - system module: Fix panic during initialisation when /proc/stat can't be read. {pull}17569[17569] +- system/socket: Fixed a crash due to concurrent map read and write. {issue}21192[21192] {pull}21690[21690] *Filebeat* diff --git a/x-pack/auditbeat/module/system/socket/events.go b/x-pack/auditbeat/module/system/socket/events.go index 76d91a8db24..c1937cb0302 100644 --- a/x-pack/auditbeat/module/system/socket/events.go +++ b/x-pack/auditbeat/module/system/socket/events.go @@ -872,8 +872,8 @@ type execveCall struct { creds *commitCreds } -func (e *execveCall) getProcess() process { - p := process{ +func (e *execveCall) getProcess() *process { + p := &process{ pid: e.Meta.PID, path: readCString(e.Path[:]), created: kernelTime(e.Meta.Timestamp), diff --git a/x-pack/auditbeat/module/system/socket/socket_linux.go b/x-pack/auditbeat/module/system/socket/socket_linux.go index 78fdd8ae4ca..11f8a22289e 100644 --- a/x-pack/auditbeat/module/system/socket/socket_linux.go +++ b/x-pack/auditbeat/module/system/socket/socket_linux.go @@ -158,7 +158,7 @@ func (m *MetricSet) Run(r mb.PushReporterV2) { } else { for _, p := range procs { if i, err := p.Info(); err == nil { - process := process{ + process := &process{ name: i.Name, pid: uint32(i.PID), args: i.Args, diff --git a/x-pack/auditbeat/module/system/socket/state.go b/x-pack/auditbeat/module/system/socket/state.go index c7b3ac761a7..fa612b56e1a 100644 --- a/x-pack/auditbeat/module/system/socket/state.go +++ b/x-pack/auditbeat/module/system/socket/state.go @@ -214,6 +214,9 @@ func (f *flow) Timestamp() time.Time { } type process struct { + // RWMutex is used to arbitrate reads and writes to resolvedDomains. + sync.RWMutex + pid uint32 name, path string args []string @@ -229,6 +232,8 @@ type process struct { } func (p *process) addTransaction(tr dns.Transaction) { + p.Lock() + defer p.Unlock() if p.resolvedDomains == nil { p.resolvedDomains = make(map[string]string) } @@ -239,6 +244,8 @@ func (p *process) addTransaction(tr dns.Transaction) { // ResolveIP returns the domain associated with the given IP. func (p *process) ResolveIP(ip net.IP) (domain string, found bool) { + p.RLock() + defer p.RUnlock() domain, found = p.resolvedDomains[ip.String()] return } @@ -542,13 +549,13 @@ func (s *state) ExpireOlder() { s.dns.CleanUp() } -func (s *state) CreateProcess(p process) error { +func (s *state) CreateProcess(p *process) error { if p.pid == 0 { return errors.New("can't create process with PID 0") } s.Lock() defer s.Unlock() - s.processes[p.pid] = &p + s.processes[p.pid] = p if p.createdTime == (time.Time{}) { p.createdTime = s.kernTimestampToTime(p.created) } diff --git a/x-pack/auditbeat/module/system/socket/state_test.go b/x-pack/auditbeat/module/system/socket/state_test.go index 75b73a374d1..89ba0cde9db 100644 --- a/x-pack/auditbeat/module/system/socket/state_test.go +++ b/x-pack/auditbeat/module/system/socket/state_test.go @@ -11,6 +11,7 @@ import ( "fmt" "net" "os" + "sync" "testing" "time" @@ -835,3 +836,28 @@ func TestSocketReuse(t *testing.T) { } assert.Len(t, flows, 1) } + +func TestProcessDNSRace(t *testing.T) { + p := new(process) + var wg sync.WaitGroup + wg.Add(2) + address := func(i byte) net.IP { return net.IPv4(172, 16, 0, i) } + go func() { + for i := byte(255); i > 0; i-- { + p.addTransaction(dns.Transaction{ + Client: net.UDPAddr{IP: net.IPv4(10, 20, 30, 40)}, + Server: net.UDPAddr{IP: net.IPv4(10, 20, 30, 41)}, + Domain: "example.net", + Addresses: []net.IP{address(i)}, + }) + } + wg.Done() + }() + go func() { + for i := byte(255); i > 0; i-- { + p.ResolveIP(address(i)) + } + wg.Done() + }() + wg.Wait() +}