From cc4c9c777722d9c2db8de6bb251124ba07e32987 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Mon, 17 Jul 2023 08:48:17 +0200 Subject: [PATCH] Fix empty new file edge case It's possible that file scanning happens when the file was created but was not yet written to. In this case the size is 0. We should not spawn any resources (e.g. harvesters) for such files until they actually have some content. We create events only when read something from a file, so having a harvester on an empty file is not useful. Not handling this edge case also causes our tests to be flaky, sometimes an expected size does not match an actual size (0). --- filebeat/input/filestream/fswatch.go | 5 ++++ filebeat/input/filestream/fswatch_test.go | 32 ++++++++++++++++++++--- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/filebeat/input/filestream/fswatch.go b/filebeat/input/filestream/fswatch.go index 371465e9b352..77959b04a1ac 100644 --- a/filebeat/input/filestream/fswatch.go +++ b/filebeat/input/filestream/fswatch.go @@ -198,6 +198,11 @@ func (w *fileWatcher) watch(ctx unison.Canceler) { // remaining files in newFiles are newly created files for path, fd := range newFilesByName { + // no need to react on empty new files + if fd.Info.Size() == 0 { + w.log.Warnf("file %q has no content yet, skipping", fd.Filename) + continue + } select { case <-ctx.Done(): return diff --git a/filebeat/input/filestream/fswatch_test.go b/filebeat/input/filestream/fswatch_test.go index 90f2f594c4c0..42e4cba2203e 100644 --- a/filebeat/input/filestream/fswatch_test.go +++ b/filebeat/input/filestream/fswatch_test.go @@ -218,10 +218,10 @@ scanner: paths := []string{filepath.Join(dir, "*.log")} cfgStr := ` scanner: - check_interval: 100ms + check_interval: 10ms ` - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() fw := createWatcherWithConfig(t, paths, cfgStr) @@ -251,16 +251,40 @@ scanner: require.Equal(t, loginp.OpDone, e.Op) }) + t.Run("does not emit events for empty files", func(t *testing.T) { + dir := t.TempDir() + paths := []string{filepath.Join(dir, "*.log")} + cfgStr := ` +scanner: + check_interval: 10ms +` + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + fw := createWatcherWithConfig(t, paths, cfgStr) + go fw.Run(ctx) + + basename := "created.log" + filename := filepath.Join(dir, basename) + err := os.WriteFile(filename, nil, 0777) + require.NoError(t, err) + + require.NoError(t, err) + e := fw.Event() + require.Equal(t, loginp.OpDone, e.Op) + }) + t.Run("does not emit an event for a fingerprint collision", func(t *testing.T) { dir := t.TempDir() paths := []string{filepath.Join(dir, "*.log")} cfgStr := ` scanner: - check_interval: 100ms + check_interval: 10ms fingerprint.enabled: true ` - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() fw := createWatcherWithConfig(t, paths, cfgStr)