From e13e183a02686c377db88e288c2e72c17e4b4ce1 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 13 Jun 2023 17:59:41 +0200 Subject: [PATCH] More testing improvements This commit brings the improvements from https://github.com/elastic/beats/pull/35754 to this PR. --- filebeat/input/filestream/environment_test.go | 61 ++++++++++++++++--- .../filestream/input_integration_test.go | 14 ++--- 2 files changed, 60 insertions(+), 15 deletions(-) diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index 158eb30699de..34432fce340c 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -21,9 +21,11 @@ package filestream import ( "context" + "encoding/json" "fmt" "os" "path/filepath" + "strings" "sync" "testing" "time" @@ -64,6 +66,24 @@ type registryEntry struct { } func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment { + if err := logp.DevelopmentSetup(logp.ToObserverOutput()); err != nil { + t.Fatalf("error setting up dev logging: %s", err) + } + + t.Cleanup(func() { + if t.Failed() { + t.Logf("Debug Logs:\n") + for _, log := range logp.ObserverLogs().TakeAll() { + data, err := json.Marshal(log) + if err != nil { + t.Errorf("failed encoding log as JSON: %s", err) + } + t.Logf("%s", string(data)) + } + return + } + }) + return &inputTestingEnvironment{ t: t, workingDir: t.TempDir(), @@ -194,6 +214,8 @@ func (e *inputTestingEnvironment) requireRegistryEntryCount(expectedCount int) { // requireOffsetInRegistry checks if the expected offset is set for a file. func (e *inputTestingEnvironment) requireOffsetInRegistry(filename, inputID string, expectedOffset int) { e.t.Helper() + var offsetStr strings.Builder + filepath := e.abspath(filename) fi, err := os.Stat(filepath) if err != nil { @@ -202,16 +224,23 @@ func (e *inputTestingEnvironment) requireOffsetInRegistry(filename, inputID stri id := getIDFromPath(filepath, inputID, fi) var entry registryEntry - require.Eventually(e.t, func() bool { + require.Eventuallyf(e.t, func() bool { + offsetStr.Reset() + entry, err = e.getRegistryState(id) if err != nil { - return true + e.t.Fatalf("could not get state for '%s' from registry, err: %s", id, err) } + fmt.Fprint(&offsetStr, entry.Cursor.Offset) + return expectedOffset == entry.Cursor.Offset - }, time.Second, time.Millisecond) - require.NoError(e.t, err) - require.Equal(e.t, expectedOffset, entry.Cursor.Offset) + }, + time.Second, + 100*time.Millisecond, + "expected offset: '%d', cursor offset: '%s'", + expectedOffset, + &offsetStr) } // requireMetaInRegistry checks if the expected metadata is saved to the registry. @@ -258,6 +287,9 @@ func (e *inputTestingEnvironment) waitUntilOffsetInRegistry( expectedOffset int, timeout time.Duration) { + var cursorString strings.Builder + var fileSizeString strings.Builder + filepath := e.abspath(filename) fi, err := os.Stat(filepath) if err != nil { @@ -267,19 +299,32 @@ func (e *inputTestingEnvironment) waitUntilOffsetInRegistry( id := getIDFromPath(filepath, inputID, fi) require.Eventuallyf(e.t, func() bool { + cursorString.Reset() + fileSizeString.Reset() + entry, err := e.getRegistryState(id) if err != nil { e.t.Fatalf( "error getting state for ID '%s' from the registry, err: %s", id, err) } + + fi, err := os.Stat(filepath) + if err != nil { + e.t.Fatalf("could not stat '%s', err: %s", filepath, err) + } + + fileSizeString.WriteString(fmt.Sprint(fi.Size())) + cursorString.WriteString(fmt.Sprint(entry.Cursor.Offset)) + return entry.Cursor.Offset == expectedOffset }, timeout, 100*time.Millisecond, - "expecting registry offset for '%s' to be: %d", - filename, - expectedOffset) + "expected offset: '%d', cursor offset: '%s', file size: '%s'", + expectedOffset, + &cursorString, + &fileSizeString) } func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID string) { diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index 1237f808167d..32c36261e4f1 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -637,7 +637,7 @@ func TestFilestreamTruncateWithSymlink(t *testing.T) { // remove symlink env.mustRemoveFile(symlinkName) env.mustTruncateFile(testlogName, 0) - env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 30*time.Second) + env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second) moreLines := []byte("forth line\nfifth line\n") env.mustWriteToFile(testlogName, moreLines) @@ -704,7 +704,7 @@ func TestFilestreamTruncateCheckOffset(t *testing.T) { env.mustTruncateFile(testlogName, 0) - env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 30*time.Second) + env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second) cancelInput() env.waitUntilInputStops() @@ -718,7 +718,7 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) { inp := env.mustCreateInput(map[string]interface{}{ "id": "fake-ID", "paths": []string{env.abspath(testlogName)}, - "prospector.scanner.check_interval": "100ms", + "prospector.scanner.check_interval": "200ms", }) testlines := []byte("first line\nsecond line\n") @@ -747,7 +747,7 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) { env.mustTruncateFile(testlogName, 0) - env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 30*time.Second) + env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second) // all newly started client has to be cancelled so events can be processed env.pipeline.cancelAllClients() @@ -758,7 +758,7 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) { env.mustWriteToFile(testlogName, truncatedTestLines) env.waitUntilEventCount(3) - env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(truncatedTestLines), 30*time.Second) + env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(truncatedTestLines), 10*time.Second) cancelInput() env.waitUntilInputStops() @@ -918,7 +918,7 @@ func TestFilestreamTruncate(t *testing.T) { // remove symlink env.mustRemoveFile(symlinkName) env.mustTruncateFile(testlogName, 0) - env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 30*time.Second) + env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second) // recreate symlink env.mustSymlink(testlogName, symlinkName) @@ -926,7 +926,7 @@ func TestFilestreamTruncate(t *testing.T) { moreLines := []byte("forth line\nfifth line\n") env.mustWriteToFile(testlogName, moreLines) - env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(moreLines), 30*time.Second) + env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(moreLines), 10*time.Second) cancelInput() env.waitUntilInputStops()