Skip to content

Commit

Permalink
More testing improvements
Browse files Browse the repository at this point in the history
This commit brings the improvements from
elastic#35754 to this PR.
  • Loading branch information
belimawr committed Jun 13, 2023
1 parent e16c542 commit e13e183
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 15 deletions.
61 changes: 53 additions & 8 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ package filestream

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -64,6 +66,24 @@ type registryEntry struct {
}

func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment {
if err := logp.DevelopmentSetup(logp.ToObserverOutput()); err != nil {
t.Fatalf("error setting up dev logging: %s", err)
}

t.Cleanup(func() {
if t.Failed() {
t.Logf("Debug Logs:\n")
for _, log := range logp.ObserverLogs().TakeAll() {
data, err := json.Marshal(log)
if err != nil {
t.Errorf("failed encoding log as JSON: %s", err)
}
t.Logf("%s", string(data))
}
return
}
})

return &inputTestingEnvironment{
t: t,
workingDir: t.TempDir(),
Expand Down Expand Up @@ -194,6 +214,8 @@ func (e *inputTestingEnvironment) requireRegistryEntryCount(expectedCount int) {
// requireOffsetInRegistry checks if the expected offset is set for a file.
func (e *inputTestingEnvironment) requireOffsetInRegistry(filename, inputID string, expectedOffset int) {
e.t.Helper()
var offsetStr strings.Builder

filepath := e.abspath(filename)
fi, err := os.Stat(filepath)
if err != nil {
Expand All @@ -202,16 +224,23 @@ func (e *inputTestingEnvironment) requireOffsetInRegistry(filename, inputID stri

id := getIDFromPath(filepath, inputID, fi)
var entry registryEntry
require.Eventually(e.t, func() bool {
require.Eventuallyf(e.t, func() bool {
offsetStr.Reset()

entry, err = e.getRegistryState(id)
if err != nil {
return true
e.t.Fatalf("could not get state for '%s' from registry, err: %s", id, err)
}

fmt.Fprint(&offsetStr, entry.Cursor.Offset)

return expectedOffset == entry.Cursor.Offset
}, time.Second, time.Millisecond)
require.NoError(e.t, err)
require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
},
time.Second,
100*time.Millisecond,
"expected offset: '%d', cursor offset: '%s'",
expectedOffset,
&offsetStr)
}

// requireMetaInRegistry checks if the expected metadata is saved to the registry.
Expand Down Expand Up @@ -258,6 +287,9 @@ func (e *inputTestingEnvironment) waitUntilOffsetInRegistry(
expectedOffset int,
timeout time.Duration) {

var cursorString strings.Builder
var fileSizeString strings.Builder

filepath := e.abspath(filename)
fi, err := os.Stat(filepath)
if err != nil {
Expand All @@ -267,19 +299,32 @@ func (e *inputTestingEnvironment) waitUntilOffsetInRegistry(
id := getIDFromPath(filepath, inputID, fi)

require.Eventuallyf(e.t, func() bool {
cursorString.Reset()
fileSizeString.Reset()

entry, err := e.getRegistryState(id)
if err != nil {
e.t.Fatalf(
"error getting state for ID '%s' from the registry, err: %s",
id, err)
}

fi, err := os.Stat(filepath)
if err != nil {
e.t.Fatalf("could not stat '%s', err: %s", filepath, err)
}

fileSizeString.WriteString(fmt.Sprint(fi.Size()))
cursorString.WriteString(fmt.Sprint(entry.Cursor.Offset))

return entry.Cursor.Offset == expectedOffset
},
timeout,
100*time.Millisecond,
"expecting registry offset for '%s' to be: %d",
filename,
expectedOffset)
"expected offset: '%d', cursor offset: '%s', file size: '%s'",
expectedOffset,
&cursorString,
&fileSizeString)
}

func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID string) {
Expand Down
14 changes: 7 additions & 7 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ func TestFilestreamTruncateWithSymlink(t *testing.T) {
// remove symlink
env.mustRemoveFile(symlinkName)
env.mustTruncateFile(testlogName, 0)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 30*time.Second)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second)

moreLines := []byte("forth line\nfifth line\n")
env.mustWriteToFile(testlogName, moreLines)
Expand Down Expand Up @@ -704,7 +704,7 @@ func TestFilestreamTruncateCheckOffset(t *testing.T) {

env.mustTruncateFile(testlogName, 0)

env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 30*time.Second)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second)

cancelInput()
env.waitUntilInputStops()
Expand All @@ -718,7 +718,7 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {
inp := env.mustCreateInput(map[string]interface{}{
"id": "fake-ID",
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "100ms",
"prospector.scanner.check_interval": "200ms",
})

testlines := []byte("first line\nsecond line\n")
Expand Down Expand Up @@ -747,7 +747,7 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {

env.mustTruncateFile(testlogName, 0)

env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 30*time.Second)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second)

// all newly started client has to be cancelled so events can be processed
env.pipeline.cancelAllClients()
Expand All @@ -758,7 +758,7 @@ func TestFilestreamTruncateBlockedOutput(t *testing.T) {
env.mustWriteToFile(testlogName, truncatedTestLines)

env.waitUntilEventCount(3)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(truncatedTestLines), 30*time.Second)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(truncatedTestLines), 10*time.Second)

cancelInput()
env.waitUntilInputStops()
Expand Down Expand Up @@ -918,15 +918,15 @@ func TestFilestreamTruncate(t *testing.T) {
// remove symlink
env.mustRemoveFile(symlinkName)
env.mustTruncateFile(testlogName, 0)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 30*time.Second)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", 0, 10*time.Second)

// recreate symlink
env.mustSymlink(testlogName, symlinkName)

moreLines := []byte("forth line\nfifth line\n")
env.mustWriteToFile(testlogName, moreLines)

env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(moreLines), 30*time.Second)
env.waitUntilOffsetInRegistry(testlogName, "fake-ID", len(moreLines), 10*time.Second)

cancelInput()
env.waitUntilInputStops()
Expand Down

0 comments on commit e13e183

Please sign in to comment.