Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests for fileProspector in filestream input #21712

Merged
merged 4 commits into from
Oct 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion filebeat/input/filestream/identifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,15 @@ func newPathIdentifier(_ *common.Config) (fileIdentifier, error) {
}

func (p *pathIdentifier) GetSource(e loginp.FSEvent) fileSource {
path := e.NewPath
if e.Op == loginp.OpDelete {
path = e.OldPath
}
return fileSource{
info: e.Info,
newPath: e.NewPath,
oldPath: e.OldPath,
name: pluginName + identitySep + p.name + identitySep + e.NewPath,
name: pluginName + identitySep + p.name + identitySep + path,
identifierGenerator: p.name,
}
}
Expand Down
10 changes: 7 additions & 3 deletions filebeat/input/filestream/internal/input-logfile/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ type Harvester interface {

// HarvesterGroup is responsible for running the
// Harvesters started by the Prospector.
type HarvesterGroup struct {
type HarvesterGroup interface {
Run(input.Context, Source) error
}

type defaultHarvesterGroup struct {
manager *InputManager
readers map[string]context.CancelFunc
pipeline beat.PipelineConnector
Expand All @@ -54,7 +58,7 @@ type HarvesterGroup struct {
}

// Run starts the Harvester for a Source.
func (hg *HarvesterGroup) Run(ctx input.Context, s Source) error {
func (hg *defaultHarvesterGroup) Run(ctx input.Context, s Source) error {
log := ctx.Logger.With("source", s.Name())
log.Debug("Starting harvester for file")

Expand Down Expand Up @@ -111,7 +115,7 @@ func (hg *HarvesterGroup) Run(ctx input.Context, s Source) error {
}

// Cancel stops the running Harvester for a given Source.
func (hg *HarvesterGroup) Cancel(s Source) error {
func (hg *defaultHarvesterGroup) Cancel(s Source) error {
if cancel, ok := hg.readers[s.Name()]; ok {
cancel()
return nil
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/internal/input-logfile/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (inp *managedInput) Run(
store.Retain()
defer store.Release()

hg := &HarvesterGroup{
hg := &defaultHarvesterGroup{
pipeline: pipeline,
readers: make(map[string]context.CancelFunc),
manager: inp.manager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
type Prospector interface {
// Run starts the event loop and handles the incoming events
// either by starting/stopping a harvester, or updating the statestore.
Run(input.Context, *statestore.Store, *HarvesterGroup)
Run(input.Context, *statestore.Store, HarvesterGroup)
// Test checks if the Prospector is able to run the configuration
// specified by the user.
Test() error
Expand Down
15 changes: 7 additions & 8 deletions filebeat/input/filestream/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func newFileProspector(
}

// Run starts the fileProspector which accepts FS events from a file watcher.
func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg *loginp.HarvesterGroup) {
func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg loginp.HarvesterGroup) {
log := ctx.Logger.With("prospector", prospectorDebugKey)
log.Debug("Starting prospector")
defer log.Debug("Prospector has stopped")
Expand Down Expand Up @@ -100,8 +100,12 @@ func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg *loginp.

src := p.identifier.GetSource(fe)
switch fe.Op {
case loginp.OpCreate:
log.Debugf("A new file %s has been found", fe.NewPath)
case loginp.OpCreate, loginp.OpWrite:
if fe.Op == loginp.OpCreate {
log.Debugf("A new file %s has been found", fe.NewPath)
} else if fe.Op == loginp.OpWrite {
log.Debugf("File %s has been updated", fe.NewPath)
}

if p.ignoreOlder > 0 {
now := time.Now()
Expand All @@ -113,11 +117,6 @@ func (p *fileProspector) Run(ctx input.Context, s *statestore.Store, hg *loginp.

hg.Run(ctx, src)

case loginp.OpWrite:
log.Debugf("File %s has been updated", fe.NewPath)

hg.Run(ctx, src)

case loginp.OpDelete:
log.Debugf("File %s has been removed", fe.OldPath)

Expand Down
197 changes: 197 additions & 0 deletions filebeat/input/filestream/prospector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package filestream

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
"github.com/elastic/go-concert/unison"
)

func TestProspectorNewAndUpdatedFiles(t *testing.T) {
minuteAgo := time.Now().Add(-1 * time.Minute)

testCases := map[string]struct {
events []loginp.FSEvent
ignoreOlder time.Duration
expectedSources []string
}{
"two new files": {
events: []loginp.FSEvent{
loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/file"},
loginp.FSEvent{Op: loginp.OpCreate, NewPath: "/path/to/other/file"},
},
expectedSources: []string{"filestream::path::/path/to/file", "filestream::path::/path/to/other/file"},
},
"one updated file": {
events: []loginp.FSEvent{
loginp.FSEvent{Op: loginp.OpWrite, NewPath: "/path/to/file"},
},
expectedSources: []string{"filestream::path::/path/to/file"},
},
"old files with ignore older configured": {
events: []loginp.FSEvent{
loginp.FSEvent{
Op: loginp.OpCreate,
NewPath: "/path/to/file",
Info: testFileInfo{"/path/to/file", 5, minuteAgo},
},
loginp.FSEvent{
Op: loginp.OpWrite,
NewPath: "/path/to/other/file",
Info: testFileInfo{"/path/to/other/file", 5, minuteAgo},
},
},
ignoreOlder: 10 * time.Second,
expectedSources: []string{},
},
"newer files with ignore older": {
events: []loginp.FSEvent{
loginp.FSEvent{
Op: loginp.OpCreate,
NewPath: "/path/to/file",
Info: testFileInfo{"/path/to/file", 5, minuteAgo},
},
loginp.FSEvent{
Op: loginp.OpWrite,
NewPath: "/path/to/other/file",
Info: testFileInfo{"/path/to/other/file", 5, minuteAgo},
},
},
ignoreOlder: 5 * time.Minute,
expectedSources: []string{"filestream::path::/path/to/file", "filestream::path::/path/to/other/file"},
},
}

for name, test := range testCases {
test := test

t.Run(name, func(t *testing.T) {
p := fileProspector{
filewatcher: &mockFileWatcher{events: test.events},
identifier: mustPathIdentifier(),
ignoreOlder: test.ignoreOlder,
}
ctx := input.Context{Logger: logp.L(), Cancelation: context.Background()}
hg := getTestHarvesterGroup()

p.Run(ctx, testStateStore(), hg)

assert.ElementsMatch(t, hg.encounteredNames, test.expectedSources)
})
}
}

func TestProspectorDeletedFile(t *testing.T) {
testCases := map[string]struct {
events []loginp.FSEvent
cleanRemoved bool
}{
"one deleted file without clean removed": {
events: []loginp.FSEvent{
loginp.FSEvent{Op: loginp.OpDelete, OldPath: "/path/to/file"},
},
cleanRemoved: false,
},
"one deleted file with clean removed": {
events: []loginp.FSEvent{
loginp.FSEvent{Op: loginp.OpDelete, OldPath: "/path/to/file"},
},
cleanRemoved: true,
},
}

for name, test := range testCases {
test := test

t.Run(name, func(t *testing.T) {
p := fileProspector{
filewatcher: &mockFileWatcher{events: test.events},
identifier: mustPathIdentifier(),
cleanRemoved: test.cleanRemoved,
}
ctx := input.Context{Logger: logp.L(), Cancelation: context.Background()}

testStore := testStateStore()
testStore.Set("filestream::path::/path/to/file", nil)

p.Run(ctx, testStore, getTestHarvesterGroup())

has, err := testStore.Has("filestream::path::/path/to/file")
if err != nil {
t.Fatal(err)
}

if test.cleanRemoved {
assert.False(t, has)
} else {
assert.True(t, has)

}
})
}
}

type testHarvesterGroup struct {
encounteredNames []string
}

func getTestHarvesterGroup() *testHarvesterGroup { return &testHarvesterGroup{make([]string, 0)} }

func (t *testHarvesterGroup) Run(_ input.Context, s loginp.Source) error {
t.encounteredNames = append(t.encounteredNames, s.Name())
return nil
}

type mockFileWatcher struct {
nextIdx int
events []loginp.FSEvent
}

func (m *mockFileWatcher) Event() loginp.FSEvent {
if len(m.events) == m.nextIdx {
return loginp.FSEvent{}
}
evt := m.events[m.nextIdx]
m.nextIdx++
return evt
}
func (m *mockFileWatcher) Run(_ unison.Canceler) { return }

func testStateStore() *statestore.Store {
s, _ := statestore.NewRegistry(storetest.NewMemoryStoreBackend()).Get(pluginName)
return s
}

func mustPathIdentifier() fileIdentifier {
pathIdentifier, err := newPathIdentifier(nil)
if err != nil {
panic(err)
}
return pathIdentifier

}