From 371b1bae79abe1833f3c297343f17fff362bd689 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Mon, 8 Mar 2021 17:42:13 +0100 Subject: [PATCH 1/3] Fix support of symlinking in filestream input --- .../filestream/input_integration_test.go | 172 ++++++++++++++++++ 1 file changed, 172 insertions(+) diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index c768abfa32b0..dab84d60714f 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -24,6 +24,7 @@ import ( "context" "os" "runtime" + "strconv" "testing" "time" @@ -736,3 +737,174 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) { cancelInput() env.waitUntilInputStops() } + +// test_symlinks_enabled from test_harvester.py +func TestFilestreamSymlinksEnabled(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + symlinkName := "test.log.symlink" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{ + env.abspath(testlogName), + env.abspath(symlinkName), + }, + "prospector.scanner.symlinks": "true", + }) + + testlines := []byte("first line\n") + env.mustWriteLinesToFile(testlogName, testlines) + + env.mustSymlink(testlogName, symlinkName) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(1) + + cancelInput() + env.waitUntilInputStops() + + env.requireOffsetInRegistry(testlogName, len(testlines)) + env.requireOffsetInRegistry(symlinkName, len(testlines)) +} + +// test_symlink_rotated from test_harvester.py +func TestFilestreamSymlinkRotated(t *testing.T) { + env := newInputTestingEnvironment(t) + + firstTestlogName := "test1.log" + secondTestlogName := "test2.log" + symlinkName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{ + env.abspath(firstTestlogName), + env.abspath(secondTestlogName), + env.abspath(symlinkName), + }, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.symlinks": "true", + "close.on_state_change.removed": "false", + "clean_removed": "false", + }) + + commonLine := "first line in file " + for i, path := range []string{firstTestlogName, secondTestlogName} { + env.mustWriteLinesToFile(path, []byte(commonLine+strconv.Itoa(i)+"\n")) + } + + env.mustSymlink(firstTestlogName, symlinkName) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(2) + + expectedOffset := len(commonLine) + 2 + env.requireOffsetInRegistry(firstTestlogName, expectedOffset) + env.requireOffsetInRegistry(secondTestlogName, expectedOffset) + + // rotate symlink + env.mustRemoveFile(symlinkName) + env.mustSymlink(secondTestlogName, symlinkName) + + moreLines := "second line in file 2\nthird line in file 2\n" + env.mustAppendLinesToFile(secondTestlogName, []byte(moreLines)) + + env.waitUntilEventCount(4) + env.requireOffsetInRegistry(firstTestlogName, expectedOffset) + env.requireOffsetInRegistry(secondTestlogName, expectedOffset+len(moreLines)) + env.requireOffsetInRegistry(symlinkName, expectedOffset+len(moreLines)) + + cancelInput() + env.waitUntilInputStops() + + env.requireRegistryEntryCount(2) +} + +// test_symlink_removed from test_harvester.py +func TestFilestreamSymlinkRemoved(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + symlinkName := "test.log.symlink" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{ + env.abspath(testlogName), + env.abspath(symlinkName), + }, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.symlinks": "true", + "close.on_state_change.removed": "false", + "clean_removed": "false", + }) + + line := []byte("first line\n") + env.mustWriteLinesToFile(testlogName, line) + + env.mustSymlink(testlogName, symlinkName) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(1) + + env.requireOffsetInRegistry(testlogName, len(line)) + + // remove symlink + env.mustRemoveFile(symlinkName) + + env.mustAppendLinesToFile(testlogName, line) + + env.waitUntilEventCount(2) + env.requireOffsetInRegistry(testlogName, 2*len(line)) + + cancelInput() + env.waitUntilInputStops() + + env.requireRegistryEntryCount(1) +} + +// test_truncate from test_harvester.py +func TestFilestreamTruncate(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + symlinkName := "test.log.symlink" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{ + env.abspath(testlogName), + env.abspath(symlinkName), + }, + "prospector.scanner.check_interval": "1ms", + "prospector.scanner.symlinks": "true", + }) + + lines := []byte("first line\nsecond line\nthird line\n") + env.mustWriteLinesToFile(testlogName, lines) + + env.mustSymlink(testlogName, symlinkName) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(3) + + env.requireOffsetInRegistry(testlogName, len(lines)) + + // remove symlink + env.mustRemoveFile(symlinkName) + env.mustTruncateFile(testlogName, 0) + env.waitUntilOffsetInRegistry(testlogName, 0) + + moreLines := []byte("forth line\nfifth line\n") + env.mustWriteLinesToFile(testlogName, moreLines) + + env.waitUntilEventCount(5) + env.requireOffsetInRegistry(testlogName, len(moreLines)) + + cancelInput() + env.waitUntilInputStops() + + env.requireRegistryEntryCount(1) +} From 280f4d231aee98d0547336a8a9d555128cfae16d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Thu, 8 Apr 2021 14:59:17 +0200 Subject: [PATCH 2/3] remove accidentally added files from paths --- filebeat/input/filestream/input_integration_test.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index dab84d60714f..78f77a4fff47 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -746,7 +746,6 @@ func TestFilestreamSymlinksEnabled(t *testing.T) { symlinkName := "test.log.symlink" inp := env.mustCreateInput(map[string]interface{}{ "paths": []string{ - env.abspath(testlogName), env.abspath(symlinkName), }, "prospector.scanner.symlinks": "true", @@ -766,7 +765,6 @@ func TestFilestreamSymlinksEnabled(t *testing.T) { env.waitUntilInputStops() env.requireOffsetInRegistry(testlogName, len(testlines)) - env.requireOffsetInRegistry(symlinkName, len(testlines)) } // test_symlink_rotated from test_harvester.py @@ -778,8 +776,6 @@ func TestFilestreamSymlinkRotated(t *testing.T) { symlinkName := "test.log" inp := env.mustCreateInput(map[string]interface{}{ "paths": []string{ - env.abspath(firstTestlogName), - env.abspath(secondTestlogName), env.abspath(symlinkName), }, "prospector.scanner.check_interval": "1ms", @@ -798,11 +794,10 @@ func TestFilestreamSymlinkRotated(t *testing.T) { ctx, cancelInput := context.WithCancel(context.Background()) env.startInput(ctx, inp) - env.waitUntilEventCount(2) + env.waitUntilEventCount(1) expectedOffset := len(commonLine) + 2 env.requireOffsetInRegistry(firstTestlogName, expectedOffset) - env.requireOffsetInRegistry(secondTestlogName, expectedOffset) // rotate symlink env.mustRemoveFile(symlinkName) @@ -814,7 +809,6 @@ func TestFilestreamSymlinkRotated(t *testing.T) { env.waitUntilEventCount(4) env.requireOffsetInRegistry(firstTestlogName, expectedOffset) env.requireOffsetInRegistry(secondTestlogName, expectedOffset+len(moreLines)) - env.requireOffsetInRegistry(symlinkName, expectedOffset+len(moreLines)) cancelInput() env.waitUntilInputStops() @@ -830,7 +824,6 @@ func TestFilestreamSymlinkRemoved(t *testing.T) { symlinkName := "test.log.symlink" inp := env.mustCreateInput(map[string]interface{}{ "paths": []string{ - env.abspath(testlogName), env.abspath(symlinkName), }, "prospector.scanner.check_interval": "1ms", @@ -873,8 +866,7 @@ func TestFilestreamTruncate(t *testing.T) { symlinkName := "test.log.symlink" inp := env.mustCreateInput(map[string]interface{}{ "paths": []string{ - env.abspath(testlogName), - env.abspath(symlinkName), + env.abspath("*"), }, "prospector.scanner.check_interval": "1ms", "prospector.scanner.symlinks": "true", From 179a7a1603276c7b7bae24284021e232516325c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 13 Apr 2021 18:20:25 +0200 Subject: [PATCH 3/3] add missing new symlinking --- filebeat/input/filestream/input_integration_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index 78f77a4fff47..ab9ed187bd73 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -889,6 +889,9 @@ func TestFilestreamTruncate(t *testing.T) { env.mustTruncateFile(testlogName, 0) env.waitUntilOffsetInRegistry(testlogName, 0) + // recreate symlink + env.mustSymlink(testlogName, symlinkName) + moreLines := []byte("forth line\nfifth line\n") env.mustWriteLinesToFile(testlogName, moreLines)