diff --git a/filebeat/input/filestream/fswatch.go b/filebeat/input/filestream/fswatch.go index c6afc72b986..3203b90be1d 100644 --- a/filebeat/input/filestream/fswatch.go +++ b/filebeat/input/filestream/fswatch.go @@ -199,6 +199,11 @@ func (w *fileWatcher) watch(ctx unison.Canceler) { // remaining files in newFiles are newly created files for path, fd := range newFilesByName { + // no need to react on empty new files + if fd.Info.Size() == 0 { + w.log.Warnf("file %q has no content yet, skipping", fd.Filename) + continue + } select { case <-ctx.Done(): return diff --git a/filebeat/input/filestream/fswatch_test.go b/filebeat/input/filestream/fswatch_test.go index 8cfcff723c4..23e2af4bbd4 100644 --- a/filebeat/input/filestream/fswatch_test.go +++ b/filebeat/input/filestream/fswatch_test.go @@ -30,6 +30,7 @@ import ( loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" ) func TestFileWatcher(t *testing.T) { @@ -219,10 +220,10 @@ scanner: paths := []string{filepath.Join(dir, "*.log")} cfgStr := ` scanner: - check_interval: 100ms + check_interval: 10ms ` - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond) defer cancel() fw := createWatcherWithConfig(t, paths, cfgStr) @@ -252,16 +253,69 @@ scanner: require.Equal(t, loginp.OpDone, e.Op) }) + t.Run("does not emit events for empty files", func(t *testing.T) { + dir := t.TempDir() + paths := []string{filepath.Join(dir, "*.log")} + cfgStr := ` +scanner: + check_interval: 10ms +` + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + err := logp.DevelopmentSetup(logp.ToObserverOutput()) + require.NoError(t, err) + + fw := createWatcherWithConfig(t, paths, cfgStr) + go fw.Run(ctx) + + basename := "created.log" + filename := filepath.Join(dir, basename) + err = os.WriteFile(filename, nil, 0777) + require.NoError(t, err) + + t.Run("issues a warning in logs", func(t *testing.T) { + var lastWarning string + expLogMsg := fmt.Sprintf("file %q has no content yet, skipping", filename) + require.Eventually(t, func() bool { + logs := logp.ObserverLogs().FilterLevelExact(logp.WarnLevel.ZapLevel()).TakeAll() + if len(logs) == 0 { + return false + } + lastWarning = logs[len(logs)-1].Message + return strings.Contains(lastWarning, expLogMsg) + }, 100*time.Millisecond, 10*time.Millisecond, "required a warning message %q but got %q", expLogMsg, lastWarning) + }) + + t.Run("emits a create event once something is written to the empty file", func(t *testing.T) { + err = os.WriteFile(filename, []byte("hello"), 0777) + require.NoError(t, err) + + e := fw.Event() + expEvent := loginp.FSEvent{ + NewPath: filename, + OldPath: filename, + Op: loginp.OpWrite, + Descriptor: loginp.FileDescriptor{ + Filename: filename, + Info: testFileInfo{path: basename, size: 5}, // +5 bytes appended + }, + } + requireEqualEvents(t, expEvent, e) + }) + }) + t.Run("does not emit an event for a fingerprint collision", func(t *testing.T) { dir := t.TempDir() paths := []string{filepath.Join(dir, "*.log")} cfgStr := ` scanner: - check_interval: 100ms + check_interval: 10ms fingerprint.enabled: true ` - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() fw := createWatcherWithConfig(t, paths, cfgStr)