Skip to content

Commit

Permalink
Improve error catching for recursiveWatcher (elastic#8087)
Browse files Browse the repository at this point in the history
There have been spurious test failures in test_file_integrity.Test.test_recursive (elastic#7731). This makes sure all errors encountered in recursiveWatcher are caught and logged, and also adds a debug message when a new recursive watch is added.

(cherry picked from commit e5f16e2)
  • Loading branch information
Christoph Wurm committed Sep 18, 2018
1 parent 95d661f commit ba7cc31
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 6 deletions.
4 changes: 4 additions & 0 deletions auditbeat/module/file_integrity/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
"github.com/fsnotify/fsnotify"
)

const (
moduleName = "file_integrity"
)

// Watcher is an interface for a file watcher akin to fsnotify.Watcher
// with an additional Start method.
type Watcher interface {
Expand Down
32 changes: 26 additions & 6 deletions auditbeat/module/file_integrity/monitor/recursive.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/fsnotify/fsnotify"
"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
)

type recursiveWatcher struct {
Expand All @@ -33,6 +35,7 @@ type recursiveWatcher struct {
done chan bool
addC chan string
addErrC chan error
log *logp.Logger
}

func newRecursiveWatcher(inner *fsnotify.Watcher) *recursiveWatcher {
Expand All @@ -42,6 +45,7 @@ func newRecursiveWatcher(inner *fsnotify.Watcher) *recursiveWatcher {
eventC: make(chan fsnotify.Event, 1),
addC: make(chan string),
addErrC: make(chan error),
log: logp.NewLogger(moduleName),
}
}

Expand Down Expand Up @@ -101,6 +105,8 @@ func (watcher *recursiveWatcher) addRecursive(path string) error {
}
return err
})
watcher.log.Debugw("Added recursive watch", "path", path)

if err != nil {
errs = append(errs, errors.Wrapf(err, "failed to walk path '%s'", path))
}
Expand Down Expand Up @@ -147,33 +153,47 @@ func (watcher *recursiveWatcher) forwardEvents() error {
}
switch event.Op {
case fsnotify.Create:
if err := watcher.addRecursive(event.Name); err != nil {
watcher.inner.Errors <- errors.Wrapf(err, "unable to recurse path '%s'", event.Name)
err := watcher.addRecursive(event.Name)
if err != nil {
watcher.inner.Errors <- errors.Wrapf(err, "failed to add created path '%s'", event.Name)
}
watcher.tree.Visit(event.Name, PreOrder, func(path string, _ bool) error {
err = watcher.tree.Visit(event.Name, PreOrder, func(path string, _ bool) error {
watcher.deliver(fsnotify.Event{
Name: path,
Op: event.Op,
})
return nil
})
if err != nil {
watcher.inner.Errors <- errors.Wrapf(err, "failed to visit created path '%s'", event.Name)
}

case fsnotify.Remove:
watcher.tree.Visit(event.Name, PostOrder, func(path string, _ bool) error {
err := watcher.tree.Visit(event.Name, PostOrder, func(path string, _ bool) error {
watcher.deliver(fsnotify.Event{
Name: path,
Op: event.Op,
})
return nil
})
watcher.tree.Remove(event.Name)
if err != nil {
watcher.inner.Errors <- errors.Wrapf(err, "failed to visit removed path '%s'", event.Name)
}

err = watcher.tree.Remove(event.Name)
if err != nil {
watcher.inner.Errors <- errors.Wrapf(err, "failed to visit removed path '%s'", event.Name)
}

// Handling rename (move) as a special case to give this recursion
// the same semantics as macOS FSEvents:
// - Removal of a dir notifies removal for all files inside it
// - Moving a dir away sends only one notification for this dir
case fsnotify.Rename:
watcher.tree.Remove(event.Name)
err := watcher.tree.Remove(event.Name)
if err != nil {
watcher.inner.Errors <- errors.Wrapf(err, "failed to remove path '%s'", event.Name)
}
fallthrough

default:
Expand Down
4 changes: 4 additions & 0 deletions auditbeat/tests/system/test_file_integrity.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,17 @@ def test_recursive(self):
self.wait_log_contains(escape_path(dirs[0]), max_timeout=30, ignore_case=True)
self.wait_log_contains("\"recursive\": true")

# auditbeat_test/subdir/
subdir = os.path.join(dirs[0], "subdir")
os.mkdir(subdir)
# auditbeat_test/subdir/file.txt
file1 = os.path.join(subdir, "file.txt")
self.create_file(file1, "hello world!")

# auditbeat_test/subdir/other/
subdir2 = os.path.join(subdir, "other")
os.mkdir(subdir2)
# auditbeat_test/subdir/other/more.txt
file2 = os.path.join(subdir2, "more.txt")
self.create_file(file2, "")

Expand Down

0 comments on commit ba7cc31

Please sign in to comment.