Skip to content

Commit

Permalink
libbeat: optimize asset data decoding (#42180)
Browse files Browse the repository at this point in the history
While working on #41888 I was benchmarking the filebeatreceiver
CreateLogs factory and noticed that the asset decoding in libbeat
dominates the cpu and memory profile of the receiver creation.

This behavior is expected since asset decoding is intended to occur at
startup. However, it's still worthwhile to optimize it if possible.

Some time ago I worked on `iobuf.ReadAll` at
elastic/elastic-agent-libs#229, an optimized version of `io.ReadAll`
that has a better growth algorithm—based on bytes.Buffer—and
benefits from the `io.ReaderFrom` optimization. The choice of when to
use this is very picky, as using it with a reader that is not a
`io.ReaderFrom` can be slower than the standard `io.ReadAll`. For this
case we are certain of the reader implementation, so we can use it.
Benchmark results show that it is 5% faster and uses 17% less memory.

After these fixes the profiles are still dominated by the asset
decoding, but I guess that is expected, at least it is a bit faster now.
  • Loading branch information
mauri870 authored Jan 2, 2025
1 parent b781320 commit 3d1bdcf
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 3 deletions.
6 changes: 3 additions & 3 deletions libbeat/asset/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
"bytes"
"compress/zlib"
"encoding/base64"
"io/ioutil"
"sort"

"github.com/elastic/elastic-agent-libs/iobuf"
)

// FieldsRegistry contains a list of fields.yml files
Expand Down Expand Up @@ -106,7 +107,6 @@ func EncodeData(data string) (string, error) {

// DecodeData base64 decodes the data and uncompresses it
func DecodeData(data string) ([]byte, error) {

decoded, err := base64.StdEncoding.DecodeString(data)
if err != nil {
return nil, err
Expand All @@ -119,5 +119,5 @@ func DecodeData(data string) ([]byte, error) {
}
defer r.Close()

return ioutil.ReadAll(r)
return iobuf.ReadAll(r)
}
45 changes: 45 additions & 0 deletions x-pack/filebeat/fbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/receiver"
Expand Down Expand Up @@ -89,3 +90,47 @@ found:
err = r.Shutdown(context.Background())
assert.NoError(t, err, "Error shutting down filebeatreceiver")
}

func BenchmarkFactory(b *testing.B) {
tmpDir := b.TempDir()

cfg := &Config{
Beatconfig: map[string]interface{}{
"filebeat": map[string]interface{}{
"inputs": []map[string]interface{}{
{
"type": "benchmark",
"enabled": true,
"message": "test",
"count": 10,
},
},
},
"output": map[string]interface{}{
"otelconsumer": map[string]interface{}{},
},
"logging": map[string]interface{}{
"level": "debug",
"selectors": []string{
"*",
},
},
"path.home": tmpDir,
},
}

var zapLogs bytes.Buffer
core := zapcore.NewCore(
zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()),
zapcore.AddSync(&zapLogs),
zapcore.DebugLevel)

receiverSettings := receiver.Settings{}
receiverSettings.Logger = zap.New(core)

b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := NewFactory().CreateLogsReceiver(context.Background(), receiverSettings, cfg, nil)
require.NoError(b, err)
}
}

0 comments on commit 3d1bdcf

Please sign in to comment.