From db5e52674eb78662670f1aa948af93d0cc7f1332 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Wed, 6 Dec 2023 18:11:19 +0100 Subject: [PATCH 1/3] Add benchmark for filestream input Now we can quickly compare performance metrics when we make changes to the filestream implementation without running the whole Filebeat. --- filebeat/input/filestream/input_test.go | 175 ++++++++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 filebeat/input/filestream/input_test.go diff --git a/filebeat/input/filestream/input_test.go b/filebeat/input/filestream/input_test.go new file mode 100644 index 00000000000..982107bc73f --- /dev/null +++ b/filebeat/input/filestream/input_test.go @@ -0,0 +1,175 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package filestream + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/storetest" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" +) + +// runFilestreamBenchmark runs the entire filestream input with the in-memory registry and the test pipeline. +// `testID` must be unique for each test run +// `cfg` must be a valid YAML string containing valid filestream configuration +// `expEventCount` is an expected amount of produced events +func runFilestreamBenchmark(b *testing.B, testID string, cfg string, expEventCount int) { + logger := logp.L() + c, err := conf.NewConfigWithYAML([]byte(cfg), cfg) + require.NoError(b, err) + + p := Plugin(logger, createTestStore(b)) + input, err := p.Manager.Create(c) + require.NoError(b, err) + + ctx, cancel := context.WithCancel(context.Background()) + context := v2.Context{ + Logger: logger, + ID: testID, + Cancelation: ctx, + } + + connector, eventsDone := newTestPipeline(expEventCount) + done := make(chan struct{}) + go func() { + err := input.Run(context, connector) + assert.NoError(b, err) + done <- struct{}{} + }() + + <-eventsDone + cancel() + <-done // for more stable results we should wait until the full shutdown +} + +func generateFile(b *testing.B, lineCount int) string { + b.Helper() + dir := b.TempDir() + file, err := os.CreateTemp(dir, "lines.log") + require.NoError(b, err) + + for i := 0; i < lineCount; i++ { + fmt.Fprintf(file, "rather mediocre log line message - %d\n", i) + } + filename := file.Name() + err = file.Close() + require.NoError(b, err) + return filename +} + +func BenchmarkFilestream(b *testing.B) { + logp.TestingSetup(logp.ToDiscardOutput()) + lineCount := 10000 + filename := generateFile(b, lineCount) + + b.Run("filestream default throughput", func(b *testing.B) { + cfg := ` +type: filestream +prospector.scanner.check_interval: 1s +paths: + - ` + filename + ` +` + for i := 0; i < b.N; i++ { + runFilestreamBenchmark(b, fmt.Sprintf("default-benchmark-%d", i), cfg, lineCount) + } + }) + + b.Run("filestream fingerprint throughput", func(b *testing.B) { + cfg := ` +type: filestream +prospector.scanner: + fingerprint.enabled: true + check_interval: 1s +file_identity.fingerprint: ~ +paths: + - ` + filename + ` +` + for i := 0; i < b.N; i++ { + runFilestreamBenchmark(b, fmt.Sprintf("fp-benchmark-%d", i), cfg, lineCount) + } + }) +} + +func createTestStore(t *testing.B) loginp.StateStore { + return &testStore{registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend())} +} + +type testStore struct { + registry *statestore.Registry +} + +func (s *testStore) Close() { + s.registry.Close() +} + +func (s *testStore) Access() (*statestore.Store, error) { + return s.registry.Get("filestream-benchmark") +} + +func (s *testStore) CleanupInterval() time.Duration { + return time.Second +} + +func newTestPipeline(eventLimit int) (pc beat.PipelineConnector, done <-chan struct{}) { + ch := make(chan struct{}) + return &testPipeline{limit: eventLimit, done: ch}, ch +} + +type testPipeline struct { + done chan struct{} + limit int +} + +func (p *testPipeline) ConnectWith(beat.ClientConfig) (beat.Client, error) { + return p.Connect() +} +func (p *testPipeline) Connect() (beat.Client, error) { + return &testClient{p}, nil +} + +type testClient struct { + testPipeline *testPipeline +} + +func (c *testClient) Publish(event beat.Event) { + c.testPipeline.limit-- + if c.testPipeline.limit == 0 { + c.testPipeline.done <- struct{}{} + } +} + +func (c *testClient) PublishAll(events []beat.Event) { + for _, e := range events { + c.Publish(e) + } +} +func (c *testClient) Close() error { + return nil +} From 0ac77dfde94a828d612ea37e97ef1ece41b21fd7 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Wed, 6 Dec 2023 22:40:53 +0100 Subject: [PATCH 2/3] Switch helper functions to a more generic interface --- filebeat/input/filestream/input_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/filebeat/input/filestream/input_test.go b/filebeat/input/filestream/input_test.go index 982107bc73f..ec6793ed171 100644 --- a/filebeat/input/filestream/input_test.go +++ b/filebeat/input/filestream/input_test.go @@ -40,14 +40,14 @@ import ( // `testID` must be unique for each test run // `cfg` must be a valid YAML string containing valid filestream configuration // `expEventCount` is an expected amount of produced events -func runFilestreamBenchmark(b *testing.B, testID string, cfg string, expEventCount int) { +func runFilestreamBenchmark(t require.TestingT, testID string, cfg string, expEventCount int) { logger := logp.L() c, err := conf.NewConfigWithYAML([]byte(cfg), cfg) - require.NoError(b, err) + require.NoError(t, err) - p := Plugin(logger, createTestStore(b)) + p := Plugin(logger, createTestStore(t)) input, err := p.Manager.Create(c) - require.NoError(b, err) + require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) context := v2.Context{ @@ -60,7 +60,7 @@ func runFilestreamBenchmark(b *testing.B, testID string, cfg string, expEventCou done := make(chan struct{}) go func() { err := input.Run(context, connector) - assert.NoError(b, err) + assert.NoError(t, err) done <- struct{}{} }() @@ -117,7 +117,7 @@ paths: }) } -func createTestStore(t *testing.B) loginp.StateStore { +func createTestStore(t require.TestingT) loginp.StateStore { return &testStore{registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend())} } From a809df9490e5052729e8eaceea5b59c8d50bcb50 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Wed, 6 Dec 2023 22:44:29 +0100 Subject: [PATCH 3/3] Even more generic interfaces --- filebeat/input/filestream/input_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/filebeat/input/filestream/input_test.go b/filebeat/input/filestream/input_test.go index ec6793ed171..a257745124b 100644 --- a/filebeat/input/filestream/input_test.go +++ b/filebeat/input/filestream/input_test.go @@ -40,7 +40,7 @@ import ( // `testID` must be unique for each test run // `cfg` must be a valid YAML string containing valid filestream configuration // `expEventCount` is an expected amount of produced events -func runFilestreamBenchmark(t require.TestingT, testID string, cfg string, expEventCount int) { +func runFilestreamBenchmark(t testing.TB, testID string, cfg string, expEventCount int) { logger := logp.L() c, err := conf.NewConfigWithYAML([]byte(cfg), cfg) require.NoError(t, err) @@ -69,18 +69,18 @@ func runFilestreamBenchmark(t require.TestingT, testID string, cfg string, expEv <-done // for more stable results we should wait until the full shutdown } -func generateFile(b *testing.B, lineCount int) string { - b.Helper() - dir := b.TempDir() +func generateFile(t testing.TB, lineCount int) string { + t.Helper() + dir := t.TempDir() file, err := os.CreateTemp(dir, "lines.log") - require.NoError(b, err) + require.NoError(t, err) for i := 0; i < lineCount; i++ { fmt.Fprintf(file, "rather mediocre log line message - %d\n", i) } filename := file.Name() err = file.Close() - require.NoError(b, err) + require.NoError(t, err) return filename } @@ -117,7 +117,7 @@ paths: }) } -func createTestStore(t require.TestingT) loginp.StateStore { +func createTestStore(t testing.TB) loginp.StateStore { return &testStore{registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend())} }