Skip to content

Commit

Permalink
Fix empty new file edge case (#36076)
Browse files Browse the repository at this point in the history
It's possible that file scanning happens after a file has been created but
before anything has been written to it. In this case the size is 0.

We should not spawn any resources (e.g. harvesters) for such files until they actually
have some content. We create events only when we read something from a
file, so having a harvester on an empty file is not useful.

Not handling this edge case also causes our tests to be flaky:
sometimes the expected size does not match the actual size (0).

(cherry picked from commit 0cd5775)

# Conflicts:
#	filebeat/input/filestream/fswatch_test.go
  • Loading branch information
rdner authored and mergify[bot] committed Jul 18, 2023
1 parent 14aef55 commit 8f32c34
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 4 deletions.
5 changes: 5 additions & 0 deletions filebeat/input/filestream/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {

// remaining files in newFiles are newly created files
for path, fd := range newFilesByName {
// no need to react on empty new files
if fd.Info.Size() == 0 {
w.log.Warnf("file %q has no content yet, skipping", fd.Filename)
continue
}
select {
case <-ctx.Done():
return
Expand Down
66 changes: 62 additions & 4 deletions filebeat/input/filestream/fswatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ import (
"github.com/stretchr/testify/require"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
<<<<<<< HEAD
"github.com/elastic/beats/v7/libbeat/common"
=======
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
>>>>>>> 0cd57755c1 (Fix empty new file edge case (#36076))
)

func TestFileWatcher(t *testing.T) {
Expand Down Expand Up @@ -218,10 +223,10 @@ scanner:
paths := []string{filepath.Join(dir, "*.log")}
cfgStr := `
scanner:
check_interval: 100ms
check_interval: 10ms
`

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
defer cancel()

fw := createWatcherWithConfig(t, paths, cfgStr)
Expand Down Expand Up @@ -251,16 +256,69 @@ scanner:
require.Equal(t, loginp.OpDone, e.Op)
})

t.Run("does not emit events for empty files", func(t *testing.T) {
dir := t.TempDir()
paths := []string{filepath.Join(dir, "*.log")}
cfgStr := `
scanner:
check_interval: 10ms
`

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

err := logp.DevelopmentSetup(logp.ToObserverOutput())
require.NoError(t, err)

fw := createWatcherWithConfig(t, paths, cfgStr)
go fw.Run(ctx)

basename := "created.log"
filename := filepath.Join(dir, basename)
err = os.WriteFile(filename, nil, 0777)
require.NoError(t, err)

t.Run("issues a warning in logs", func(t *testing.T) {
var lastWarning string
expLogMsg := fmt.Sprintf("file %q has no content yet, skipping", filename)
require.Eventually(t, func() bool {
logs := logp.ObserverLogs().FilterLevelExact(logp.WarnLevel.ZapLevel()).TakeAll()
if len(logs) == 0 {
return false
}
lastWarning = logs[len(logs)-1].Message
return strings.Contains(lastWarning, expLogMsg)
}, 100*time.Millisecond, 10*time.Millisecond, "required a warning message %q but got %q", expLogMsg, lastWarning)
})

t.Run("emits a create event once something is written to the empty file", func(t *testing.T) {
err = os.WriteFile(filename, []byte("hello"), 0777)
require.NoError(t, err)

e := fw.Event()
expEvent := loginp.FSEvent{
NewPath: filename,
OldPath: filename,
Op: loginp.OpWrite,
Descriptor: loginp.FileDescriptor{
Filename: filename,
Info: testFileInfo{name: basename, size: 5}, // +5 bytes appended
},
}
requireEqualEvents(t, expEvent, e)
})
})

t.Run("does not emit an event for a fingerprint collision", func(t *testing.T) {
dir := t.TempDir()
paths := []string{filepath.Join(dir, "*.log")}
cfgStr := `
scanner:
check_interval: 100ms
check_interval: 10ms
fingerprint.enabled: true
`

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

fw := createWatcherWithConfig(t, paths, cfgStr)
Expand Down

0 comments on commit 8f32c34

Please sign in to comment.