From 86f5592d5e32ef2226d4e7f5456f952914d7fe5c Mon Sep 17 00:00:00 2001 From: Denis Date: Tue, 18 Jul 2023 10:06:57 +0200 Subject: [PATCH] Fix empty new file edge case (#36076) It's possible that file scanning happens when the file was created but was not yet written to. In this case the size is 0. We should not spawn any resources (e.g. harvesters) for such files until they actually have some content. We create events only when read something from a file, so having a harvester on an empty file is not useful. Not handling this edge case also causes our tests to be flaky, sometimes an expected size does not match an actual size (0). --- filebeat/input/filestream/fswatch.go | 5 ++ filebeat/input/filestream/fswatch_test.go | 62 +++++++++++++++++++++-- 2 files changed, 63 insertions(+), 4 deletions(-) diff --git a/filebeat/input/filestream/fswatch.go b/filebeat/input/filestream/fswatch.go index 0c14bcee8ce5..b5b02ab11c79 100644 --- a/filebeat/input/filestream/fswatch.go +++ b/filebeat/input/filestream/fswatch.go @@ -199,6 +199,11 @@ func (w *fileWatcher) watch(ctx unison.Canceler) { // remaining files in newFiles are newly created files for path, fd := range newFilesByName { + // no need to react on empty new files + if fd.Info.Size() == 0 { + w.log.Warnf("file %q has no content yet, skipping", fd.Filename) + continue + } select { case <-ctx.Done(): return diff --git a/filebeat/input/filestream/fswatch_test.go b/filebeat/input/filestream/fswatch_test.go index 2a30dd8b3cdf..d2dbc893b8ef 100644 --- a/filebeat/input/filestream/fswatch_test.go +++ b/filebeat/input/filestream/fswatch_test.go @@ -30,6 +30,7 @@ import ( loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" ) func TestFileWatcher(t *testing.T) { @@ -219,10 +220,10 @@ scanner: paths := []string{filepath.Join(dir, "*.log")} cfgStr := ` scanner: - check_interval: 100ms + check_interval: 10ms ` - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond) defer cancel() fw := createWatcherWithConfig(t, paths, cfgStr) @@ -252,16 +253,69 @@ scanner: require.Equal(t, loginp.OpDone, e.Op) }) + t.Run("does not emit events for empty files", func(t *testing.T) { + dir := t.TempDir() + paths := []string{filepath.Join(dir, "*.log")} + cfgStr := ` +scanner: + check_interval: 10ms +` + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + err := logp.DevelopmentSetup(logp.ToObserverOutput()) + require.NoError(t, err) + + fw := createWatcherWithConfig(t, paths, cfgStr) + go fw.Run(ctx) + + basename := "created.log" + filename := filepath.Join(dir, basename) + err = os.WriteFile(filename, nil, 0777) + require.NoError(t, err) + + t.Run("issues a warning in logs", func(t *testing.T) { + var lastWarning string + expLogMsg := fmt.Sprintf("file %q has no content yet, skipping", filename) + require.Eventually(t, func() bool { + logs := logp.ObserverLogs().FilterLevelExact(logp.WarnLevel.ZapLevel()).TakeAll() + if len(logs) == 0 { + return false + } + lastWarning = logs[len(logs)-1].Message + return strings.Contains(lastWarning, expLogMsg) + }, 100*time.Millisecond, 10*time.Millisecond, "required a warning message %q but got %q", expLogMsg, lastWarning) + }) + + t.Run("emits a create event once something is written to the empty file", func(t *testing.T) { + err = os.WriteFile(filename, []byte("hello"), 0777) + require.NoError(t, err) + + e := fw.Event() + expEvent := loginp.FSEvent{ + NewPath: filename, + OldPath: filename, + Op: loginp.OpWrite, + Descriptor: loginp.FileDescriptor{ + Filename: filename, + Info: testFileInfo{name: basename, size: 5}, // +5 bytes appended + }, + } + requireEqualEvents(t, expEvent, e) + }) + }) + t.Run("does not emit an event for a fingerprint collision", func(t *testing.T) { dir := t.TempDir() paths := []string{filepath.Join(dir, "*.log")} cfgStr := ` scanner: - check_interval: 100ms + check_interval: 10ms fingerprint.enabled: true ` - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() fw := createWatcherWithConfig(t, paths, cfgStr)