Skip to content

Commit

Permalink
Add helpers for filestream integration tests (#24469) (#24628)
Browse files Browse the repository at this point in the history
(cherry picked from commit 832ea77)
  • Loading branch information
kvch authored Mar 18, 2021
1 parent a5e41a6 commit 3ed8a25
Showing 1 changed file with 126 additions and 0 deletions.
126 changes: 126 additions & 0 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,69 @@ func (e *inputTestingEnvironment) mustWriteLinesToFile(filename string, lines []
}
}

func (e *inputTestingEnvironment) mustAppendLinesToFile(filename string, lines []byte) {
path := e.abspath(filename)
f, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
e.t.Fatalf("failed to open file '%s': %+v", path, err)
}
defer f.Close()

_, err = f.Write(lines)
if err != nil {
e.t.Fatalf("append lines to file '%s': %+v", path, err)
}
}

func (e *inputTestingEnvironment) mustRenameFile(oldname, newname string) {
err := os.Rename(e.abspath(oldname), e.abspath(newname))
if err != nil {
e.t.Fatalf("failed to rename file '%s': %+v", oldname, err)
}
}

func (e *inputTestingEnvironment) mustRemoveFile(filename string) {
path := e.abspath(filename)
err := os.Remove(path)
if err != nil {
e.t.Fatalf("failed to rename file '%s': %+v", path, err)
}
}

func (e *inputTestingEnvironment) mustSymlink(filename, symlinkname string) {
err := os.Symlink(e.abspath(filename), e.abspath(symlinkname))
if err != nil {
e.t.Fatalf("failed to create symlink to file '%s': %+v", filename, err)
}
}

func (e *inputTestingEnvironment) mustTruncateFile(filename string, size int64) {
path := e.abspath(filename)
err := os.Truncate(path, size)
if err != nil {
e.t.Fatalf("failed to truncate file '%s': %+v", path, err)
}
}

func (e *inputTestingEnvironment) abspath(filename string) string {
return filepath.Join(e.workingDir, filename)
}

func (e *inputTestingEnvironment) requireRegistryEntryCount(expectedCount int) {
inputStore, _ := e.stateStore.Access()

actual := 0
err := inputStore.Each(func(_ string, _ statestore.ValueDecoder) (bool, error) {
actual += 1
return true, nil
})
if err != nil {
e.t.Fatalf("error while iterating through registry: %+v", err)
}

require.Equal(e.t, actual, expectedCount)
}

// requireOffsetInRegistry checks if the expected offset is set for a file.
func (e *inputTestingEnvironment) requireOffsetInRegistry(filename string, expectedOffset int) {
filepath := e.abspath(filename)
Expand All @@ -131,6 +183,32 @@ func (e *inputTestingEnvironment) requireOffsetInRegistry(filename string, expec
require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
}

func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename string) {
filepath := e.abspath(filename)
fi, err := os.Stat(filepath)
if err != nil {
e.t.Fatalf("cannot stat file when cheking for offset: %+v", err)
}

inputStore, _ := e.stateStore.Access()

identifier, _ := newINodeDeviceIdentifier(nil)
src := identifier.GetSource(loginp.FSEvent{Info: fi, Op: loginp.OpCreate, NewPath: filepath})

var entry registryEntry
err = inputStore.Get(src.Name(), &entry)
if err == nil {
e.t.Fatalf("key is not expected to be present '%s'", src.Name())
}
}

// requireOffsetInRegistry checks if the expected offset is set for a file.
func (e *inputTestingEnvironment) requireOffsetInRegistryByID(key string, expectedOffset int) {
entry := e.getRegistryState(key)

require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
}

func (e *inputTestingEnvironment) getRegistryState(key string) registryEntry {
inputStore, _ := e.stateStore.Access()

Expand All @@ -153,10 +231,58 @@ func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
if sum == count {
return
}
if count < sum {
e.t.Fatalf("too many events; expected: %d, actual: %d", count, sum)
}
time.Sleep(10 * time.Millisecond)
}
}

// waitUntilHarvesterIsDone detects Harvester stop by checking if the last client has been closed
// as when a Harvester stops the client is closed.
func (e *inputTestingEnvironment) waitUntilHarvesterIsDone() {
for !e.pipeline.clients[len(e.pipeline.clients)-1].closed {
time.Sleep(10 * time.Millisecond)
}
}

// requireEventReceived requires that the list of messages has made it into the output.
func (e *inputTestingEnvironment) requireEventsReceived(events []string) {
foundEvents := make([]bool, len(events))
checkedEventCount := 0
for _, c := range e.pipeline.clients {
for _, evt := range c.GetEvents() {
if len(events) == checkedEventCount {
e.t.Fatalf("not enough expected elements")
}
message := evt.Fields["message"].(string)
if message == events[checkedEventCount] {
foundEvents[checkedEventCount] = true
}
checkedEventCount += 1
}
}

var missingEvents []string
for i, found := range foundEvents {
if !found {
missingEvents = append(missingEvents, events[i])
}
}

require.Equal(e.t, 0, len(missingEvents), "following events are missing: %+v", missingEvents)
}

func (e *inputTestingEnvironment) getOutputMessages() []string {
messages := make([]string, 0)
for _, c := range e.pipeline.clients {
for _, evt := range c.GetEvents() {
messages = append(messages, evt.Fields["message"].(string))
}
}
return messages
}

type testInputStore struct {
registry *statestore.Registry
}
Expand Down

0 comments on commit 3ed8a25

Please sign in to comment.