From 1db1158d98851c540ad67438124e231d619da39e Mon Sep 17 00:00:00 2001 From: Denis Rechkunov <denis.rechkunov@elastic.co> Date: Mon, 17 Jul 2023 08:48:17 +0200 Subject: [PATCH] Fix empty file edge case It's possible that file scanning happens when the file was created but was not yet written to. In this case the size is 0. We should not spawn any resources (e.g. harvesters) for such files until they actually have some content. We create events only when read something from a file, so having a harvester on an empty file is not useful. Not handling this edge case also causes our tests to be flaky, sometimes an expected size does not match an actual size (0). --- filebeat/input/filestream/fswatch.go | 5 ++++ filebeat/input/filestream/fswatch_test.go | 32 ++++++++++++++++++++--- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/filebeat/input/filestream/fswatch.go b/filebeat/input/filestream/fswatch.go index 371465e9b352..f240a2acfd21 100644 --- a/filebeat/input/filestream/fswatch.go +++ b/filebeat/input/filestream/fswatch.go @@ -365,6 +365,11 @@ func (s *fileScanner) GetFiles() map[string]loginp.FileDescriptor { continue } + if it.info.Size() == 0 { + s.log.Warnf("file %q has no content yet, skipping", filename) + continue + } + fd, err := s.toFileDescriptor(&it) if err != nil { s.log.Warnf("cannot create a file descriptor for an ingest target %q: %s", filename, err) diff --git a/filebeat/input/filestream/fswatch_test.go b/filebeat/input/filestream/fswatch_test.go index 90f2f594c4c0..42e4cba2203e 100644 --- a/filebeat/input/filestream/fswatch_test.go +++ b/filebeat/input/filestream/fswatch_test.go @@ -218,10 +218,10 @@ scanner: paths := []string{filepath.Join(dir, "*.log")} cfgStr := ` scanner: - check_interval: 100ms + check_interval: 10ms ` - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() fw := createWatcherWithConfig(t, paths, cfgStr) @@ -251,16 +251,40 @@ scanner: require.Equal(t, loginp.OpDone, e.Op) }) + t.Run("does not emit events for empty files", func(t *testing.T) { + dir := t.TempDir() + paths := []string{filepath.Join(dir, "*.log")} + cfgStr := ` +scanner: + check_interval: 10ms +` + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + fw := createWatcherWithConfig(t, paths, cfgStr) + go fw.Run(ctx) + + basename := "created.log" + filename := filepath.Join(dir, basename) + err := os.WriteFile(filename, nil, 0777) + require.NoError(t, err) + + require.NoError(t, err) + e := fw.Event() + require.Equal(t, loginp.OpDone, e.Op) + }) + t.Run("does not emit an event for a fingerprint collision", func(t *testing.T) { dir := t.TempDir() paths := []string{filepath.Join(dir, "*.log")} cfgStr := ` scanner: - check_interval: 100ms + check_interval: 10ms fingerprint.enabled: true ` - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() fw := createWatcherWithConfig(t, paths, cfgStr)