From dc216f1eb6e7745f6df3cb67d9ea3b01ecba8291 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Mon, 30 Dec 2024 11:39:26 -0300 Subject: [PATCH] libbeat: optimize asset data decoding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit While working on #41888 I was benchmarking the filebeatreceiver CreateLogs factory and noticed that the asset decoding in libbeat dominates the cpu and memory profile of the receiver creation. This behavior is expected since asset decoding is intended to occur at startup. However, it's still worthwhile to optimize it if possible. Some time ago I worked on `iobuf.ReadAll` at elastic/elastic-agent-libs#229, an optimized version of `io.ReadAll` that has a better growth algorithm—based on bytes.Buffer—and benefits from the `io.ReaderFrom` optimization. The choice of when to use this is very picky, as using it with a reader that is not a `io.ReaderFrom` can be slower than the standard `io.ReadAll`. For this case we are certain of the reader implementation, so we can use it. Benchmark results show that it is 5% faster and uses 17% less memory. After these fixes the profiles are still dominated by the asset decoding, but I guess that is expected, at least it is a bit faster now. --- libbeat/asset/registry.go | 6 +-- x-pack/filebeat/fbreceiver/receiver_test.go | 45 +++++++++++++++++++++ 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/libbeat/asset/registry.go b/libbeat/asset/registry.go index 99fc1a7dba0..fe34971c995 100644 --- a/libbeat/asset/registry.go +++ b/libbeat/asset/registry.go @@ -21,8 +21,9 @@ import ( "bytes" "compress/zlib" "encoding/base64" - "io/ioutil" "sort" + + "github.com/elastic/elastic-agent-libs/iobuf" ) // FieldsRegistry contains a list of fields.yml files @@ -106,7 +107,6 @@ func EncodeData(data string) (string, error) { // DecodeData base64 decodes the data and uncompresses it func DecodeData(data string) ([]byte, error) { - decoded, err := base64.StdEncoding.DecodeString(data) if err != nil { return nil, err @@ -119,5 +119,5 @@ func DecodeData(data string) ([]byte, error) { } defer r.Close() - return ioutil.ReadAll(r) + return iobuf.ReadAll(r) } diff --git a/x-pack/filebeat/fbreceiver/receiver_test.go b/x-pack/filebeat/fbreceiver/receiver_test.go index 3bbdc1690ea..928db4c4b64 100644 --- a/x-pack/filebeat/fbreceiver/receiver_test.go +++ b/x-pack/filebeat/fbreceiver/receiver_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver" @@ -89,3 +90,47 @@ found: err = r.Shutdown(context.Background()) assert.NoError(t, err, "Error shutting down filebeatreceiver") } + +func BenchmarkFactory(b *testing.B) { + tmpDir := b.TempDir() + + cfg := &Config{ + Beatconfig: map[string]interface{}{ + "filebeat": map[string]interface{}{ + "inputs": []map[string]interface{}{ + { + "type": "benchmark", + "enabled": true, + "message": "test", + "count": 10, + }, + }, + }, + "output": map[string]interface{}{ + "otelconsumer": map[string]interface{}{}, + }, + "logging": map[string]interface{}{ + "level": "debug", + "selectors": []string{ + "*", + }, + }, + "path.home": tmpDir, + }, + } + + var zapLogs bytes.Buffer + core := zapcore.NewCore( + zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), + zapcore.AddSync(&zapLogs), + zapcore.DebugLevel) + + receiverSettings := receiver.Settings{} + receiverSettings.Logger = zap.New(core) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := NewFactory().CreateLogsReceiver(context.Background(), receiverSettings, cfg, nil) + require.NoError(b, err) + } +}