Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data race and deadlock in file_integrity module #8027

Merged
merged 2 commits into from
Aug 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
*Auditbeat*

- Fixed a crash in the file_integrity module under Linux. {issue}7753[7753]
- Fixed a data race in the file_integrity module. {issue}8009[8009]
- Fixed a deadlock in the file_integrity module. {pull}8027[8027]

*Filebeat*

Expand Down
51 changes: 46 additions & 5 deletions auditbeat/module/file_integrity/eventreader_fsnotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func NewEventReader(c Config) (EventProducer, error) {
return &reader{
watcher: watcher,
config: c,
eventC: make(chan Event, 1),
log: logp.NewLogger(moduleName),
}, nil
}
Expand All @@ -56,7 +55,16 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) {
if err := r.watcher.Start(); err != nil {
return nil, errors.Wrap(err, "unable to start watcher")
}
go r.consumeEvents(done)

queueDone := make(chan struct{})
queueC := make(chan []*Event)

// Launch a separate goroutine to fetch all events that happen while
// watches are being installed.
go func() {
defer close(queueC)
queueC <- r.enqueueEvents(queueDone)
}()

// Windows implementation of fsnotify needs to have the watched paths
// installed after the event consumer is started, to avoid a potential
Expand All @@ -73,21 +81,53 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) {
}
}

close(queueDone)
events := <-queueC

// Populate callee's event channel with the previously received events
r.eventC = make(chan Event, 1+len(events))
for _, ev := range events {
r.eventC <- *ev
}

go r.consumeEvents(done)

r.log.Infow("Started fsnotify watcher",
"file_path", r.config.Paths,
"recursive", r.config.Recursive)
return r.eventC, nil
}

func (r *reader) enqueueEvents(done <-chan struct{}) (events []*Event) {
for {
ev := r.nextEvent(done)
if ev == nil {
return
}
events = append(events, ev)
}
}

func (r *reader) consumeEvents(done <-chan struct{}) {
defer close(r.eventC)
defer r.watcher.Close()

for {
select {
case <-done:
ev := r.nextEvent(done)
if ev == nil {
r.log.Debug("fsnotify reader terminated")
return
}
r.eventC <- *ev
}
}

func (r *reader) nextEvent(done <-chan struct{}) *Event {
for {
select {
case <-done:
return nil

case event := <-r.watcher.EventChannel():
if event.Name == "" || r.config.IsExcludedPath(event.Name) ||
!r.config.IsIncludedPath(event.Name) {
Expand All @@ -102,7 +142,8 @@ func (r *reader) consumeEvents(done <-chan struct{}) {
r.config.MaxFileSizeBytes, r.config.HashTypes)
e.rtt = time.Since(start)

r.eventC <- e
return &e

case err := <-r.watcher.ErrorChannel():
// a bug in fsnotify can cause spurious nil errors to be sent
// on the error channel.
Expand Down
70 changes: 70 additions & 0 deletions auditbeat/module/file_integrity/eventreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package file_integrity

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"strings"
"syscall"
"testing"
"time"
Expand Down Expand Up @@ -233,6 +235,74 @@ func TestEventReader(t *testing.T) {
})
}

func TestRaces(t *testing.T) {
const (
fileMode os.FileMode = 0640
N = 100
)

var dirs []string

for i := 0; i < N; i++ {
dir, err := ioutil.TempDir("", "audit")
if err != nil {
t.Fatal(err)
}
if dir, err = filepath.EvalSymlinks(dir); err != nil {
t.Fatal(err)
}
dirs = append(dirs, dir)
}

defer func() {
for _, dir := range dirs {
os.RemoveAll(dir)
}
}()

// Create a new EventProducer.
config := defaultConfig
config.Paths = dirs
config.Recursive = true
r, err := NewEventReader(config)
if err != nil {
t.Fatal(err)
}

done := make(chan struct{})
defer close(done)

// Generate a lot of events in parallel to Start() so there is a chance of
// events arriving before all watched dirs are Add()-ed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea.

go func() {
for i := 0; i < 10; i++ {
for _, dir := range dirs {
fname := filepath.Join(dir, fmt.Sprintf("%d.dat", i))
ioutil.WriteFile(fname, []byte("hello"), fileMode)
}
}
}()
eventC, err := r.Start(done)
if err != nil {
t.Fatal(err)
}

const marker = "test_file"
for _, dir := range dirs {
fname := filepath.Join(dir, marker)
ioutil.WriteFile(fname, []byte("hello"), fileMode)
}

got := 0
for i := 0; got < N; i++ {
ev := readTimeout(t, eventC)
if strings.Contains(ev.Path, marker) {
got++
}
}
assert.Equal(t, N, got)
}

// readTimeout reads one event from the channel and returns it. If it does
// not receive an event after one second it will time-out and fail the test.
func readTimeout(t testing.TB, events <-chan Event) Event {
Expand Down
3 changes: 2 additions & 1 deletion auditbeat/module/file_integrity/monitor/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,9 @@ func TestRecursiveSubdirPermissions(t *testing.T) {
watcher, err := New(true)
assertNoError(t, err)

assertNoError(t, watcher.Add(dir))
assertNoError(t, watcher.Start())
assertNoError(t, watcher.Add(dir))

defer func() {
assertNoError(t, watcher.Close())
}()
Expand Down
50 changes: 38 additions & 12 deletions auditbeat/module/file_integrity/monitor/recursive.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,21 @@ import (
)

type recursiveWatcher struct {
inner *fsnotify.Watcher
tree FileTree
eventC chan fsnotify.Event
done chan bool
inner *fsnotify.Watcher
tree FileTree
eventC chan fsnotify.Event
done chan bool
addC chan string
addErrC chan error
}

func newRecursiveWatcher(inner *fsnotify.Watcher) *recursiveWatcher {
return &recursiveWatcher{
inner: inner,
tree: FileTree{},
eventC: make(chan fsnotify.Event, 1),
inner: inner,
tree: FileTree{},
eventC: make(chan fsnotify.Event, 1),
addC: make(chan string),
addErrC: make(chan error),
}
}

Expand All @@ -48,6 +52,10 @@ func (watcher *recursiveWatcher) Start() error {
}

func (watcher *recursiveWatcher) Add(path string) error {
if watcher.done != nil {
watcher.addC <- path
return <-watcher.addErrC
}
return watcher.addRecursive(path)
}

Expand Down Expand Up @@ -104,6 +112,21 @@ func (watcher *recursiveWatcher) close() error {
return watcher.inner.Close()
}

func (watcher *recursiveWatcher) deliver(ev fsnotify.Event) {
for {
select {
case <-watcher.done:
return

case path := <-watcher.addC:
watcher.addErrC <- watcher.addRecursive(path)

case watcher.eventC <- ev:
return
}
}
}

func (watcher *recursiveWatcher) forwardEvents() error {
defer watcher.close()

Expand All @@ -112,6 +135,9 @@ func (watcher *recursiveWatcher) forwardEvents() error {
case <-watcher.done:
return nil

case path := <-watcher.addC:
watcher.addErrC <- watcher.addRecursive(path)

case event, ok := <-watcher.inner.Events:
if !ok {
return nil
Expand All @@ -125,19 +151,19 @@ func (watcher *recursiveWatcher) forwardEvents() error {
watcher.inner.Errors <- errors.Wrapf(err, "unable to recurse path '%s'", event.Name)
}
watcher.tree.Visit(event.Name, PreOrder, func(path string, _ bool) error {
watcher.eventC <- fsnotify.Event{
watcher.deliver(fsnotify.Event{
Name: path,
Op: event.Op,
}
})
return nil
})

case fsnotify.Remove:
watcher.tree.Visit(event.Name, PostOrder, func(path string, _ bool) error {
watcher.eventC <- fsnotify.Event{
watcher.deliver(fsnotify.Event{
Name: path,
Op: event.Op,
}
})
return nil
})
watcher.tree.Remove(event.Name)
Expand All @@ -151,7 +177,7 @@ func (watcher *recursiveWatcher) forwardEvents() error {
fallthrough

default:
watcher.eventC <- event
watcher.deliver(event)
}
}
}
Expand Down