Skip to content

Commit

Permalink
fswatcher: use mod time + file size to detect changes (#35759)
Browse files Browse the repository at this point in the history
fswatcher now uses the modification time and changes in the file size
to trigger either a truncate or write event.

Other small changes:
- Fixed some log formatting issue and only log the error if it
is not `context.Canceled`
- Fixes and improves some log messages.
- Brings the improvements from
#35754 to this PR.
  • Loading branch information
belimawr authored Jun 20, 2023
1 parent 9f35454 commit bdb67bc
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 31 deletions.
82 changes: 71 additions & 11 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ package filestream

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -64,6 +66,24 @@ type registryEntry struct {
}

func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment {
if err := logp.DevelopmentSetup(logp.ToObserverOutput()); err != nil {
t.Fatalf("error setting up dev logging: %s", err)
}

t.Cleanup(func() {
if t.Failed() {
t.Logf("Debug Logs:\n")
for _, log := range logp.ObserverLogs().TakeAll() {
data, err := json.Marshal(log)
if err != nil {
t.Errorf("failed encoding log as JSON: %s", err)
}
t.Logf("%s", string(data))
}
return
}
})

return &inputTestingEnvironment{
t: t,
workingDir: t.TempDir(),
Expand Down Expand Up @@ -193,6 +213,8 @@ func (e *inputTestingEnvironment) requireRegistryEntryCount(expectedCount int) {
// requireOffsetInRegistry checks if the expected offset is set for a file.
func (e *inputTestingEnvironment) requireOffsetInRegistry(filename, inputID string, expectedOffset int) {
e.t.Helper()
var offsetStr strings.Builder

filepath := e.abspath(filename)
fi, err := os.Stat(filepath)
if err != nil {
Expand All @@ -201,16 +223,23 @@ func (e *inputTestingEnvironment) requireOffsetInRegistry(filename, inputID stri

id := getIDFromPath(filepath, inputID, fi)
var entry registryEntry
require.Eventually(e.t, func() bool {
require.Eventuallyf(e.t, func() bool {
offsetStr.Reset()

entry, err = e.getRegistryState(id)
if err != nil {
return true
e.t.Fatalf("could not get state for '%s' from registry, err: %s", id, err)
}

fmt.Fprint(&offsetStr, entry.Cursor.Offset)

return expectedOffset == entry.Cursor.Offset
}, time.Second, time.Millisecond)
require.NoError(e.t, err)
require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
},
time.Second,
100*time.Millisecond,
"expected offset: '%d', cursor offset: '%s'",
expectedOffset,
&offsetStr)
}

// requireMetaInRegistry checks if the expected metadata is saved to the registry.
Expand Down Expand Up @@ -250,20 +279,51 @@ func requireMetadataEquals(one, other fileMeta) bool {
}

// waitUntilOffsetInRegistry waits for the expected offset is set for a file.
func (e *inputTestingEnvironment) waitUntilOffsetInRegistry(filename, inputID string, expectedOffset int) {
// If timeout is reached or there is an error getting the state from the
// registry, the test fails
func (e *inputTestingEnvironment) waitUntilOffsetInRegistry(
filename, inputID string,
expectedOffset int,
timeout time.Duration) {

var cursorString strings.Builder
var fileSizeString strings.Builder

filepath := e.abspath(filename)
fi, err := os.Stat(filepath)
if err != nil {
e.t.Fatalf("cannot stat file when cheking for offset: %+v", err)
}

id := getIDFromPath(filepath, inputID, fi)
entry, err := e.getRegistryState(id)
for err != nil || entry.Cursor.Offset != expectedOffset {
entry, err = e.getRegistryState(id)
}

require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
require.Eventuallyf(e.t, func() bool {
cursorString.Reset()
fileSizeString.Reset()

entry, err := e.getRegistryState(id)
if err != nil {
e.t.Fatalf(
"error getting state for ID '%s' from the registry, err: %s",
id, err)
}

fi, err := os.Stat(filepath)
if err != nil {
e.t.Fatalf("could not stat '%s', err: %s", filepath, err)
}

fileSizeString.WriteString(fmt.Sprint(fi.Size()))
cursorString.WriteString(fmt.Sprint(entry.Cursor.Offset))

return entry.Cursor.Offset == expectedOffset
},
timeout,
100*time.Millisecond,
"expected offset: '%d', cursor offset: '%s', file size: '%s'",
expectedOffset,
&cursorString,
&fileSizeString)
}

func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID string) {
Expand Down
4 changes: 3 additions & 1 deletion filebeat/input/filestream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@ func (f *logFile) periodicStateCheck(ctx unison.Canceler) {
return nil
})
if err != nil {
f.log.Errorf("failed to schedule a periodic state check: %w", err)
if !errors.Is(err, context.Canceled) {
f.log.Errorf("failed to schedule a periodic state check: %s", err)
}
}
}

Expand Down
10 changes: 7 additions & 3 deletions filebeat/input/filestream/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,12 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {
}

// if the two infos belong to the same file and it has been modified
// if the size is smaller than before, it is truncated, if bigger, it is a write event
if prevInfo.ModTime() != info.ModTime() {
// if the size is smaller than before, it is truncated, if bigger, it is a write event.
// It might happen that a file is truncated and then more data is added, both
// within the same second, this will make the reader stop, but a new one will not
// start because the modification data is the same, to avoid this situation,
// we also check for size changes here.
if prevInfo.ModTime() != info.ModTime() || prevInfo.Size() != info.Size() {
if prevInfo.Size() > info.Size() || w.resendOnModTime && prevInfo.Size() == info.Size() {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -392,7 +396,7 @@ func (s *fileScanner) isOriginalAndSymlinkConfigured(file string, uniqFileID map
}
fileID := file_helper.GetOSState(fileInfo).String()
if finfo, exists := uniqFileID[fileID]; exists {
s.log.Info("Same file found as symlink and original. Skipping file: %s (as it same as %s)", file, finfo.Name())
s.log.Infof("Same file found as symlink and original. Skipping file: %s (as it same as %s)", file, finfo.Name())
return true
}
uniqFileID[fileID] = fileInfo
Expand Down
71 changes: 71 additions & 0 deletions filebeat/input/filestream/fswatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,77 @@ func TestFileWatchNewDeleteModified(t *testing.T) {
}
}

func TestFileWatcherTruncate(t *testing.T) {
oldTs := time.Now()
newTs := oldTs.Add(time.Second)
testCases := map[string]struct {
prevFiles map[string]os.FileInfo
nextFiles map[string]os.FileInfo
expectedEvents []loginp.FSEvent
}{
"truncated file, only size changes": {
prevFiles: map[string]os.FileInfo{
"path": testFileInfo{"path", 42, oldTs, nil},
},
nextFiles: map[string]os.FileInfo{
"path": testFileInfo{"path", 0, oldTs, nil},
},
expectedEvents: []loginp.FSEvent{
{Op: loginp.OpTruncate, OldPath: "path", NewPath: "path", Info: testFileInfo{"path", 0, oldTs, nil}},
},
},
"truncated file, mod time and size changes": {
prevFiles: map[string]os.FileInfo{
"path": testFileInfo{"path", 42, oldTs, nil},
},
nextFiles: map[string]os.FileInfo{
"path": testFileInfo{"path", 0, newTs, nil},
},
expectedEvents: []loginp.FSEvent{
{Op: loginp.OpTruncate, OldPath: "path", NewPath: "path", Info: testFileInfo{"path", 0, newTs, nil}},
},
},
"no file change": {
prevFiles: map[string]os.FileInfo{
"path": testFileInfo{"path", 42, oldTs, nil},
},
nextFiles: map[string]os.FileInfo{
"path": testFileInfo{"path", 42, oldTs, nil},
},
expectedEvents: []loginp.FSEvent{},
},
}

for name, test := range testCases {
t.Run(name, func(t *testing.T) {
w := fileWatcher{
log: logp.L(),
prev: test.prevFiles,
scanner: &mockScanner{test.nextFiles},
events: make(chan loginp.FSEvent, len(test.expectedEvents)),
sameFileFunc: testSameFile,
}

w.watch(context.Background())
close(w.events)

actual := []loginp.FSEvent{}
for evt := range w.events {
actual = append(actual, evt)
}

if len(actual) != len(test.expectedEvents) {
t.Fatalf("expecting %d elements, got %d", len(test.expectedEvents), len(actual))
}
for i := range test.expectedEvents {
if test.expectedEvents[i] != actual[i] {
t.Errorf("element [%d] differ. Expecting:\n%#v\nGot:\n%#v\n", i, test.expectedEvents[i], actual[i])
}
}
})
}
}

type mockScanner struct {
files map[string]os.FileInfo
}
Expand Down
6 changes: 3 additions & 3 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,11 @@ func (inp *filestream) readFromSource(
message, err := r.Next()
if err != nil {
if errors.Is(err, ErrFileTruncate) {
log.Infof("File was truncated. Begin reading file from offset 0. Path=%s", path)
log.Infof("File was truncated, nothing to read. Path='%s'", path)
} else if errors.Is(err, ErrClosed) {
log.Info("Reader was closed. Closing.")
log.Infof("Reader was closed. Closing. Path='%s'", path)
} else if errors.Is(err, io.EOF) {
log.Debugf("EOF has been reached. Closing.")
log.Debugf("EOF has been reached. Closing. Path='%s'", path)
} else {
log.Errorf("Read line error: %v", err)
metrics.ProcessingErrors.Inc()
Expand Down
19 changes: 9 additions & 10 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ func TestFilestreamTruncateWithSymlink(t *testing.T) {
// remove symlink
env.mustRemoveFile(symlinkName)
env.mustTruncateFile(testlogName, 0)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second)

moreLines := []byte("forth line\nfifth line\n")
env.mustWriteToFile(testlogName, moreLines)
Expand Down Expand Up @@ -704,7 +704,7 @@ func TestFilestreamTruncateCheckOffset(t *testing.T) {

env.mustTruncateFile(testlogName, 0)

env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second)

cancelInput()
env.waitUntilInputStops()
Expand All @@ -716,10 +716,9 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"id": "fake-ID",
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.resend_on_touch": "true",
"id": "fake-ID",
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "200ms",
})

testlines := []byte("first line\nsecond line\n")
Expand All @@ -743,7 +742,7 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {

env.mustTruncateFile(testlogName, 0)

env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second)

// all newly started client has to be cancelled so events can be processed
env.pipeline.cancelAllClients()
Expand All @@ -754,7 +753,7 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {
env.mustWriteToFile(testlogName, truncatedTestLines)

env.waitUntilEventCount(3)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(truncatedTestLines))
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(truncatedTestLines), 10*time.Second)

cancelInput()
env.waitUntilInputStops()
Expand Down Expand Up @@ -914,15 +913,15 @@ func TestFilestreamTruncate(t *testing.T) {
// remove symlink
env.mustRemoveFile(symlinkName)
env.mustTruncateFile(testlogName, 0)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second)

// recreate symlink
env.mustSymlink(testlogName, symlinkName)

moreLines := []byte("forth line\nfifth line\n")
env.mustWriteToFile(testlogName, moreLines)

env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(moreLines))
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(moreLines), 10*time.Second)

cancelInput()
env.waitUntilInputStops()
Expand Down
6 changes: 4 additions & 2 deletions filebeat/input/filestream/internal/input-logfile/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package input_logfile

import (
"context"
"errors"
"time"

"github.com/elastic/go-concert/timed"
Expand Down Expand Up @@ -48,8 +50,8 @@ func (c *cleaner) run(canceler unison.Canceler, store *store, interval time.Dura
gcStore(c.log, started, store)
return nil
})
if err != nil {
c.log.Errorf("failed to start the registry cleaning routine: %w", err)
if err != nil && !errors.Is(err, context.Canceled) {
c.log.Errorf("failed to start the registry cleaning routine: %s", err)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ func lockResource(log *logp.Logger, resource *resource, canceler inputv2.Cancele
if !resource.lock.TryLock() {
log.Infof("Resource '%v' currently in use, waiting...", resource.key)
err := resource.lock.LockContext(canceler)
log.Infof("Resource '%v' finally released. Lock acquired", resource.key)
if err != nil {
log.Infof("Input for resource '%v' has been stopped while waiting", resource.key)
return err
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (p *fileProspector) onFSEvent(
group.Start(ctx, src)

case loginp.OpTruncate:
log.Debugf("File %s has been truncated", event.NewPath)
log.Debugf("File %s has been truncated setting offset to 0", event.NewPath)

err := updater.ResetCursor(src, state{Offset: 0})
if err != nil {
Expand Down

0 comments on commit bdb67bc

Please sign in to comment.