Add filestream benchmarks for many files case, fix data race #37345

Merged · 2 commits · Dec 18, 2023
27 changes: 13 additions & 14 deletions filebeat/input/filestream/input.go
@@ -59,7 +59,6 @@ type fileMeta struct {
 type filestream struct {
     readerConfig    readerConfig
     encodingFactory encoding.EncodingFactory
-    encoding        encoding.Encoding
     closerConfig    closerConfig
     parsers         parser.Config
 }

@@ -175,7 +174,7 @@ func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state {
 }
 
 func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSource, offset int64) (reader.Reader, error) {
-    f, err := inp.openFile(log, fs.newPath, offset)
+    f, encoding, err := inp.openFile(log, fs.newPath, offset)
     if err != nil {
         return nil, err
     }

@@ -216,7 +215,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo
 
     var r reader.Reader
     r, err = readfile.NewEncodeReader(dbgReader, readfile.Config{
-        Codec:      inp.encoding,
+        Codec:      encoding,
         BufferSize: inp.readerConfig.BufferSize,
         Terminator: inp.readerConfig.LineTerminator,
         MaxBytes:   encReaderMaxBytes,

@@ -241,33 +240,33 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo
 // or the file cannot be opened because for example of failing read permissions, an error
 // is returned and the harvester is closed. The file will be picked up again the next time
 // the file system is scanned
-func (inp *filestream) openFile(log *logp.Logger, path string, offset int64) (*os.File, error) {
+func (inp *filestream) openFile(log *logp.Logger, path string, offset int64) (*os.File, encoding.Encoding, error) {
     fi, err := os.Stat(path)
     if err != nil {
-        return nil, fmt.Errorf("failed to stat source file %s: %w", path, err)
+        return nil, nil, fmt.Errorf("failed to stat source file %s: %w", path, err)
     }
 
     // it must be checked if the file is not a named pipe before we try to open it
     // if it is a named pipe os.OpenFile fails, so there is no need to try opening it.
     if fi.Mode()&os.ModeNamedPipe != 0 {
-        return nil, fmt.Errorf("failed to open file %s, named pipes are not supported", fi.Name())
+        return nil, nil, fmt.Errorf("failed to open file %s, named pipes are not supported", fi.Name())
     }
 
     ok := false
     f, err := file.ReadOpen(path)
     if err != nil {
-        return nil, fmt.Errorf("failed opening %s: %w", path, err)
+        return nil, nil, fmt.Errorf("failed opening %s: %w", path, err)
     }
     defer cleanup.IfNot(&ok, cleanup.IgnoreError(f.Close))
 
     fi, err = f.Stat()
     if err != nil {
-        return nil, fmt.Errorf("failed to stat source file %s: %w", path, err)
+        return nil, nil, fmt.Errorf("failed to stat source file %s: %w", path, err)
     }
 
     err = checkFileBeforeOpening(fi)
     if err != nil {
-        return nil, err
+        return nil, nil, err
     }
 
     if fi.Size() < offset {

@@ -276,20 +275,20 @@ func (inp *filestream) openFile(log *logp.Logger, path string, offset int64) (*o
     }
     err = inp.initFileOffset(f, offset)
     if err != nil {
-        return nil, err
+        return nil, nil, err
     }
 
-    inp.encoding, err = inp.encodingFactory(f)
+    encoding, err := inp.encodingFactory(f)
     if err != nil {
         f.Close()
         if errors.Is(err, transform.ErrShortSrc) {
-            return nil, fmt.Errorf("initialising encoding for '%v' failed due to file being too short", f)
+            return nil, nil, fmt.Errorf("initialising encoding for '%v' failed due to file being too short", f)
         }
-        return nil, fmt.Errorf("initialising encoding for '%v' failed: %w", f, err)
+        return nil, nil, fmt.Errorf("initialising encoding for '%v' failed: %w", f, err)
     }
     ok = true
 
-    return f, nil
+    return f, encoding, nil
 }
 
 func checkFileBeforeOpening(fi os.FileInfo) error {
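
The data-race fix in input.go removes the only mutable per-open state on the shared filestream struct: every harvester used to write the detected encoding into inp.encoding, so two files opened concurrently raced on that field. Now the encoding stays local and is threaded through return values, so open() hands it directly to readfile.NewEncodeReader and nothing is written back to the shared struct. A minimal standalone sketch of that pattern (illustrative names only, not the Beats code; run the racy variant under go run -race to see the report):

// Sketch: shared-field write vs. returned value. Illustrative only.
package main

import "sync"

type racyInput struct {
	encoding string
}

// Before: each concurrent open wrote into the one shared field — a data race.
func (in *racyInput) openFile(name string) {
	in.encoding = "detected-for-" + name
}

type fixedInput struct{}

// After: the detected encoding is returned to the caller, so concurrent
// opens share no mutable state.
func (fixedInput) openFile(name string) (encoding string) {
	return "detected-for-" + name
}

func main() {
	var wg sync.WaitGroup
	in := fixedInput{}
	for _, name := range []string{"a.log", "b.log"} {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			_ = in.openFile(name) // each goroutine keeps its own result
		}(name)
	}
	wg.Wait()
}
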
146 changes: 98 additions & 48 deletions filebeat/input/filestream/input_test.go
@@ -21,10 +21,11 @@ import (
     "context"
     "fmt"
     "os"
+    "path/filepath"
+    "sync/atomic"
     "testing"
     "time"
 
-    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 
     loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"

@@ -38,25 +39,26 @@ import (
 
 func BenchmarkFilestream(b *testing.B) {
     logp.TestingSetup(logp.ToDiscardOutput())
-    lineCount := 10000
-    filename := generateFile(b, lineCount)
-
-    b.ResetTimer()
+    b.Run("single file", func(b *testing.B) {
+        lineCount := 10000
+        filename := generateFile(b, b.TempDir(), lineCount)
+        b.ResetTimer()
 
-    b.Run("filestream default throughput", func(b *testing.B) {
-        cfg := `
+        b.Run("inode throughput", func(b *testing.B) {
+            cfg := `
 type: filestream
 prospector.scanner.check_interval: 1s
 paths:
   - ` + filename + `
 `
-        for i := 0; i < b.N; i++ {
-            runFilestreamBenchmark(b, fmt.Sprintf("default-benchmark-%d", i), cfg, lineCount)
-        }
-    })
+            for i := 0; i < b.N; i++ {
+                runFilestreamBenchmark(b, fmt.Sprintf("one-file-inode-benchmark-%d", i), cfg, lineCount)
+            }
+        })
 
-    b.Run("filestream fingerprint throughput", func(b *testing.B) {
-        cfg := `
+        b.Run("fingerprint throughput", func(b *testing.B) {
+            cfg := `
 type: filestream
 prospector.scanner:
   fingerprint.enabled: true

@@ -65,9 +67,51 @@ file_identity.fingerprint: ~
 paths:
   - ` + filename + `
 `
-        for i := 0; i < b.N; i++ {
-            runFilestreamBenchmark(b, fmt.Sprintf("fp-benchmark-%d", i), cfg, lineCount)
-        }
-    })
+            for i := 0; i < b.N; i++ {
+                runFilestreamBenchmark(b, fmt.Sprintf("one-file-fp-benchmark-%d", i), cfg, lineCount)
+            }
+        })
+    })
+
+    b.Run("many files", func(b *testing.B) {
+        lineCount := 1000
+        fileCount := 100
+        dir := b.TempDir()
+
+        for i := 0; i < fileCount; i++ {
+            _ = generateFile(b, dir, lineCount)
+        }
+
+        ingestPath := filepath.Join(dir, "*")
+        expEvents := lineCount * fileCount
+        b.ResetTimer()
+
+        b.Run("inode throughput", func(b *testing.B) {
+            cfg := `
+type: filestream
+prospector.scanner.check_interval: 1s
+paths:
+  - ` + ingestPath + `
+`
+            for i := 0; i < b.N; i++ {
+                runFilestreamBenchmark(b, fmt.Sprintf("many-files-inode-benchmark-%d", i), cfg, expEvents)
+            }
+        })
+
+        b.Run("fingerprint throughput", func(b *testing.B) {
+            cfg := `
+type: filestream
+prospector.scanner:
+  fingerprint.enabled: true
+  check_interval: 1s
+file_identity.fingerprint: ~
+paths:
+  - ` + ingestPath + `
+`
+            for i := 0; i < b.N; i++ {
+                runFilestreamBenchmark(b, fmt.Sprintf("many-files-fp-benchmark-%d", i), cfg, expEvents)
+            }
+        })
+    })
 }

@@ -76,23 +120,25 @@ paths:
 // `cfg` must be a valid YAML string containing valid filestream configuration
 // `expEventCount` is an expected amount of produced events
 func runFilestreamBenchmark(b *testing.B, testID string, cfg string, expEventCount int) {
+    b.Helper()
     // we don't include initialization in the benchmark time
     b.StopTimer()
-    runner := createFilestreamTestRunner(b, testID, cfg, expEventCount)
+    runner := createFilestreamTestRunner(context.Background(), b, testID, cfg, int64(expEventCount), false)
     // this is where the benchmark actually starts
     b.StartTimer()
-    events := runner(b)
-    require.Len(b, events, expEventCount)
+    _ = runner(b)
 }
 
 // createFilestreamTestRunner can be used for both benchmarks and regular tests to run a filestream input
 // with the given configuration and event limit.
 // `testID` must be unique for each test run
 // `cfg` must be a valid YAML string containing valid filestream configuration
 // `eventLimit` is an amount of produced events after which the filestream will shutdown
+// `collectEvents` if `true` the runner will return a list of all events produced by the filestream input.
+// Events should not be collected in benchmarks due to high extra costs of using the channel.
 //
 // returns a runner function that returns produced events.
-func createFilestreamTestRunner(b testing.TB, testID string, cfg string, eventLimit int) func(t testing.TB) []beat.Event {
+func createFilestreamTestRunner(ctx context.Context, b testing.TB, testID string, cfg string, eventLimit int64, collectEvents bool) func(t testing.TB) []beat.Event {
     logger := logp.L()
     c, err := conf.NewConfigWithYAML([]byte(cfg), cfg)
     require.NoError(b, err)

@@ -101,41 +147,43 @@ func createFilestreamTestRunner(b testing.TB, testID string, cfg string, eventLi
     input, err := p.Manager.Create(c)
     require.NoError(b, err)
 
-    ctx, cancel := context.WithCancel(context.Background())
+    ctx, cancel := context.WithCancel(ctx)
     context := v2.Context{
         Logger:      logger,
         ID:          testID,
         Cancelation: ctx,
     }
 
-    events := make([]beat.Event, 0, eventLimit)
-    connector, eventsDone := newTestPipeline(eventLimit, &events)
-    done := make(chan struct{})
+    connector, events := newTestPipeline(eventLimit, collectEvents)
+    var out []beat.Event
+    if collectEvents {
+        out = make([]beat.Event, 0, eventLimit)
+    }
+    go func() {
+        // even if `collectEvents` is false we need to range the channel
+        // and wait until it's closed indicating that the input finished its job
+        for event := range events {
+            out = append(out, event)
+        }
+        cancel()
+    }()
 
     return func(t testing.TB) []beat.Event {
-        go func() {
-            err := input.Run(context, connector)
-            assert.NoError(b, err)
-            close(done)
-        }()
+        err := input.Run(context, connector)
+        require.NoError(b, err)
 
-        <-eventsDone
-        cancel()
-        <-done // for more stable results we should wait until the full shutdown
-        return events
+        return out
     }
 }
 
-func generateFile(t testing.TB, lineCount int) string {
+func generateFile(t testing.TB, dir string, lineCount int) string {
     t.Helper()
-    dir := t.TempDir()
-    file, err := os.CreateTemp(dir, "lines.log")
+    file, err := os.CreateTemp(dir, "*")
     require.NoError(t, err)
 
+    filename := file.Name()
     for i := 0; i < lineCount; i++ {
-        fmt.Fprintf(file, "rather mediocre log line message - %d\n", i)
+        fmt.Fprintf(file, "rather mediocre log line message in %s - %d\n", filename, i)
     }
-    filename := file.Name()
     err = file.Close()
     require.NoError(t, err)
     return filename
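
As its doc comment says, createFilestreamTestRunner serves regular tests as well as benchmarks. For illustration, a hypothetical test in the same package (not part of this PR; only the helper names come from the diff above) could enable collectEvents to get the published events back:

// Hypothetical test, not part of this PR: exercises the collectEvents=true
// path of createFilestreamTestRunner using the helpers from this diff.
func TestFilestreamCollectsAllLines(t *testing.T) {
	lineCount := 100
	filename := generateFile(t, t.TempDir(), lineCount)
	cfg := `
type: filestream
prospector.scanner.check_interval: 1s
paths:
  - ` + filename + `
`
	// collectEvents=true buffers every published event so the test can assert on it.
	runner := createFilestreamTestRunner(context.Background(), t, "collect-all-lines", cfg, int64(lineCount), true)
	events := runner(t)
	require.Len(t, events, lineCount)
}
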
@@ -161,15 +209,15 @@ func (s *testStore) CleanupInterval() time.Duration {
     return time.Second
 }
 
-func newTestPipeline(eventLimit int, out *[]beat.Event) (pc beat.PipelineConnector, done <-chan struct{}) {
-    ch := make(chan struct{})
-    return &testPipeline{limit: eventLimit, done: ch, out: out}, ch
+func newTestPipeline(eventLimit int64, collectEvents bool) (pc beat.PipelineConnector, out <-chan beat.Event) {
+    ch := make(chan beat.Event, eventLimit)
+    return &testPipeline{limit: eventLimit, out: ch, collect: collectEvents}, ch
 }
 
 type testPipeline struct {
-    done  chan struct{}
-    limit int
-    out   *[]beat.Event
+    limit   int64
+    out     chan beat.Event
+    collect bool
 }
 
 func (p *testPipeline) ConnectWith(beat.ClientConfig) (beat.Client, error) {

@@ -184,13 +232,15 @@ type testClient struct {
 }
 
 func (c *testClient) Publish(event beat.Event) {
-    c.testPipeline.limit--
-    if c.testPipeline.limit < 0 {
+    newLimit := atomic.AddInt64(&c.testPipeline.limit, -1)
+    if newLimit < 0 {
         return
     }
-    *c.testPipeline.out = append(*c.testPipeline.out, event)
-    if c.testPipeline.limit == 0 {
-        close(c.testPipeline.done)
+    if c.testPipeline.collect {
+        c.testPipeline.out <- event
+    }
+    if newLimit == 0 {
+        close(c.testPipeline.out)
     }
 }
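
The Publish rewrite above is the test-side data-race fix: the old c.testPipeline.limit-- ran unsynchronized on every publisher goroutine, and two goroutines could both observe limit == 0 and close the done channel twice. Because atomic.AddInt64 hands each caller a distinct new value, exactly one publisher sees 0 and performs the close. A standalone sketch of the same countdown pattern (illustrative names, not the Beats code; runs cleanly under go run -race):

// Sketch of the bounded-publish pattern: an atomic countdown decides both
// when to stop accepting events and which goroutine closes the channel
// exactly once.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type pipeline struct {
	limit int64         // remaining events to accept
	done  chan struct{} // closed exactly once when the limit is reached
}

// publish mirrors the fixed testClient.Publish: a plain `p.limit--` from many
// goroutines is a data race, and two of them could both see 0 and close the
// channel twice. The atomic decrement gives each caller a unique value.
func (p *pipeline) publish(accepted *int64) {
	newLimit := atomic.AddInt64(&p.limit, -1)
	if newLimit < 0 {
		return // limit already spent: drop the event
	}
	atomic.AddInt64(accepted, 1)
	if newLimit == 0 {
		close(p.done) // exactly one caller observes 0
	}
}

func main() {
	p := &pipeline{limit: 5, done: make(chan struct{})}
	var accepted int64
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.publish(&accepted)
		}()
	}
	wg.Wait()
	<-p.done
	fmt.Println("accepted:", atomic.LoadInt64(&accepted)) // always 5
}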