Skip to content

Commit

Permalink
Fix empty file edge case
Browse files Browse the repository at this point in the history
It's possible that file scanning happens when the file was created but
was not yet written to. In this case the size is 0.

We should not spawn any resources (e.g. harvesters) for such files until they actually
have some content. We create events only when read something from a
file, so having a harvester on an empty file is not useful.

Not handling this edge case also causes our tests to be flaky,
sometimes an expected size does not match an actual size (0).
  • Loading branch information
rdner committed Jul 17, 2023
1 parent a37490d commit 74e3ce4
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
5 changes: 5 additions & 0 deletions filebeat/input/filestream/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,11 @@ func (s *fileScanner) GetFiles() map[string]loginp.FileDescriptor {
continue
}

if it.info.Size() == 0 {
s.log.Warnf("file %q has no content yet, skipping", filename)
continue
}

fd, err := s.toFileDescriptor(&it)
if err != nil {
s.log.Warnf("cannot create a file descriptor for an ingest target %q: %s", filename, err)
Expand Down
30 changes: 27 additions & 3 deletions filebeat/input/filestream/fswatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,10 @@ scanner:
paths := []string{filepath.Join(dir, "*.log")}
cfgStr := `
scanner:
check_interval: 100ms
check_interval: 10ms
`

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

fw := createWatcherWithConfig(t, paths, cfgStr)
Expand Down Expand Up @@ -251,6 +251,30 @@ scanner:
require.Equal(t, loginp.OpDone, e.Op)
})

t.Run("does not emit events for empty files", func(t *testing.T) {
dir := t.TempDir()
paths := []string{filepath.Join(dir, "*.log")}
cfgStr := `
scanner:
check_interval: 10ms
`

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

fw := createWatcherWithConfig(t, paths, cfgStr)
go fw.Run(ctx)

basename := "created.log"
filename := filepath.Join(dir, basename)
err := os.WriteFile(filename, nil, 0777)
require.NoError(t, err)

require.NoError(t, err)
e := fw.Event()
require.Equal(t, loginp.OpDone, e.Op)
})

t.Run("does not emit an event for a fingerprint collision", func(t *testing.T) {
dir := t.TempDir()
paths := []string{filepath.Join(dir, "*.log")}
Expand All @@ -260,7 +284,7 @@ scanner:
fingerprint.enabled: true
`

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

fw := createWatcherWithConfig(t, paths, cfgStr)
Expand Down

0 comments on commit 74e3ce4

Please sign in to comment.