diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index cc8fb52396a..f01136c441e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -195,6 +195,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - auditd: Fix typo in `event.action` of `removed-user-role-from`. {pull}19300[19300] - auditd: Fix typo in `event.action` of `used-suspicious-link`. {pull}19300[19300] - system/socket: Fix kprobe grouping to allow running more than one instance. {pull}20325[20325] +- system/socket: Fixed a crash due to concurrent map read and write. {issue}21192[21192] {pull}21690[21690] *Filebeat* diff --git a/x-pack/auditbeat/module/system/socket/events.go b/x-pack/auditbeat/module/system/socket/events.go index 76d91a8db24..c1937cb0302 100644 --- a/x-pack/auditbeat/module/system/socket/events.go +++ b/x-pack/auditbeat/module/system/socket/events.go @@ -872,8 +872,8 @@ type execveCall struct { creds *commitCreds } -func (e *execveCall) getProcess() process { - p := process{ +func (e *execveCall) getProcess() *process { + p := &process{ pid: e.Meta.PID, path: readCString(e.Path[:]), created: kernelTime(e.Meta.Timestamp), diff --git a/x-pack/auditbeat/module/system/socket/socket_linux.go b/x-pack/auditbeat/module/system/socket/socket_linux.go index 78fdd8ae4ca..11f8a22289e 100644 --- a/x-pack/auditbeat/module/system/socket/socket_linux.go +++ b/x-pack/auditbeat/module/system/socket/socket_linux.go @@ -158,7 +158,7 @@ func (m *MetricSet) Run(r mb.PushReporterV2) { } else { for _, p := range procs { if i, err := p.Info(); err == nil { - process := process{ + process := &process{ name: i.Name, pid: uint32(i.PID), args: i.Args, diff --git a/x-pack/auditbeat/module/system/socket/state.go b/x-pack/auditbeat/module/system/socket/state.go index c7b3ac761a7..fa612b56e1a 100644 --- a/x-pack/auditbeat/module/system/socket/state.go +++ b/x-pack/auditbeat/module/system/socket/state.go @@ -214,6 +214,9 @@ func (f *flow) Timestamp() time.Time { } type process struct { + // RWMutex is used to arbitrate reads and writes to resolvedDomains. + sync.RWMutex + pid uint32 name, path string args []string @@ -229,6 +232,8 @@ type process struct { } func (p *process) addTransaction(tr dns.Transaction) { + p.Lock() + defer p.Unlock() if p.resolvedDomains == nil { p.resolvedDomains = make(map[string]string) } @@ -239,6 +244,8 @@ func (p *process) addTransaction(tr dns.Transaction) { // ResolveIP returns the domain associated with the given IP. func (p *process) ResolveIP(ip net.IP) (domain string, found bool) { + p.RLock() + defer p.RUnlock() domain, found = p.resolvedDomains[ip.String()] return } @@ -542,13 +549,13 @@ func (s *state) ExpireOlder() { s.dns.CleanUp() } -func (s *state) CreateProcess(p process) error { +func (s *state) CreateProcess(p *process) error { if p.pid == 0 { return errors.New("can't create process with PID 0") } s.Lock() defer s.Unlock() - s.processes[p.pid] = &p + s.processes[p.pid] = p if p.createdTime == (time.Time{}) { p.createdTime = s.kernTimestampToTime(p.created) } diff --git a/x-pack/auditbeat/module/system/socket/state_test.go b/x-pack/auditbeat/module/system/socket/state_test.go index 75b73a374d1..89ba0cde9db 100644 --- a/x-pack/auditbeat/module/system/socket/state_test.go +++ b/x-pack/auditbeat/module/system/socket/state_test.go @@ -11,6 +11,7 @@ import ( "fmt" "net" "os" + "sync" "testing" "time" @@ -835,3 +836,28 @@ func TestSocketReuse(t *testing.T) { } assert.Len(t, flows, 1) } + +func TestProcessDNSRace(t *testing.T) { + p := new(process) + var wg sync.WaitGroup + wg.Add(2) + address := func(i byte) net.IP { return net.IPv4(172, 16, 0, i) } + go func() { + for i := byte(255); i > 0; i-- { + p.addTransaction(dns.Transaction{ + Client: net.UDPAddr{IP: net.IPv4(10, 20, 30, 40)}, + Server: net.UDPAddr{IP: net.IPv4(10, 20, 30, 41)}, + Domain: "example.net", + Addresses: []net.IP{address(i)}, + }) + } + wg.Done() + }() + go func() { + for i := byte(255); i > 0; i-- { + p.ResolveIP(address(i)) + } + wg.Done() + }() + wg.Wait() +}