Skip to content

Commit

Permalink
[libbeat] Go Benchmarks comparing compress/gzip and klauspost/compress (
Browse files Browse the repository at this point in the history
#41584)

- This PR replaces compress/gzip to klauspost/compress/gzip
- We send data to the _bulk endpoint and benchmark the results
  • Loading branch information
khushijain21 authored Nov 18, 2024
1 parent 5d37c99 commit 18e256f
Show file tree
Hide file tree
Showing 7 changed files with 1,510 additions and 1,437 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Reduce memory consumption of k8s autodiscovery and the add_kubernetes_metadata processor when Deployment metadata is enabled
- Add `lowercase` processor. {issue}22254[22254] {pull}41424[41424]
- Add `uppercase` processor. {issue}22254[22254] {pull}41535[41535]
- Replace `compress/gzip` with https://github.com/klauspost/compress/gzip library for gzip compression {pull}41584[41584]

*Auditbeat*

Expand Down
2,778 changes: 1,389 additions & 1,389 deletions NOTICE.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/icholy/digest v0.1.22
github.com/klauspost/compress v1.17.9
github.com/meraki/dashboard-api-go/v3 v3.0.9
github.com/otiai10/copy v1.12.0
github.com/pierrec/lz4/v4 v4.1.18
Expand Down Expand Up @@ -334,7 +335,6 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/kortschak/utter v1.5.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
Expand Down
4 changes: 3 additions & 1 deletion libbeat/esleg/eslegclient/enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package eslegclient

import (
"bytes"
"compress/gzip"

"io"
"net/http"
"time"

"github.com/klauspost/compress/gzip"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs/codec"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down
33 changes: 32 additions & 1 deletion libbeat/internal/testutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ package testutil

import (
"flag"
"fmt"
"math/rand"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/mapstr"
)

var (
Expand All @@ -37,5 +41,32 @@ func SeedPRNG(t *testing.T) {
}

t.Logf("reproduce test with `go test ... -seed %v`", seed)
rand.Seed(seed)
rand.New(rand.NewSource(seed))
}

func GenerateEvents(numEvents, fieldsPerLevel, depth int) []beat.Event {
events := make([]beat.Event, numEvents)
for i := 0; i < numEvents; i++ {
event := &beat.Event{Fields: mapstr.M{}}
generateFields(event, fieldsPerLevel, depth)
events[i] = *event
}
return events
}

func generateFields(event *beat.Event, fieldsPerLevel, depth int) {
if depth == 0 {
return
}

for j := 1; j <= fieldsPerLevel; j++ {
var key string
for d := 1; d <= depth; d++ {
key += fmt.Sprintf("level%dfield%d", d, j)
key += "."
}
event.Fields.Put(key, "value")
key = ""
}

}
78 changes: 78 additions & 0 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/internal/testutil"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/outest"
"github.com/elastic/beats/v7/libbeat/outputs/outil"
Expand Down Expand Up @@ -713,6 +714,83 @@ func BenchmarkCollectPublishFailAll(b *testing.B) {
}
}

func BenchmarkPublish(b *testing.B) {
tests := []struct {
Name string
Events []beat.Event
}{
{
Name: "5 events",
Events: testutil.GenerateEvents(50, 5, 3),
},
{
Name: "50 events",
Events: testutil.GenerateEvents(500, 5, 3),
},
{
Name: "500 events",
Events: testutil.GenerateEvents(500, 5, 3),
},
}

levels := []int{1, 4, 7, 9}

requestCount := 0

// start a mock HTTP server
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(b, "testing value", r.Header.Get("X-Test"))
// from the documentation: https://golang.org/pkg/net/http/
// For incoming requests, the Host header is promoted to the
// Request.Host field and removed from the Header map.
assert.Equal(b, "myhost.local", r.Host)

var response string
if r.URL.Path == "/" {
response = `{ "version": { "number": "7.6.0" } }`
} else {
response = `{"items":[{"index":{}},{"index":{}},{"index":{}}]}`

}
fmt.Fprintln(w, response)
requestCount++
}))
defer ts.Close()

// Indexing to _bulk api
for _, test := range tests {
for _, l := range levels {
b.Run(fmt.Sprintf("%s with compression level %d", test.Name, l), func(b *testing.B) {
client, err := NewClient(
clientSettings{
connection: eslegclient.ConnectionSettings{
URL: ts.URL,
Headers: map[string]string{
"host": "myhost.local",
"X-Test": "testing value",
},
CompressionLevel: l,
},
},

nil,
)
assert.NoError(b, err)
batch := encodeBatch(client, outest.NewBatch(test.Events...))

// It uses gzip encoder internally for encoding data
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := client.Publish(context.Background(), batch)
assert.NoError(b, err)
}
})

}
}

}

func TestClientWithHeaders(t *testing.T) {
requestCount := 0
// start a mock HTTP server
Expand Down
51 changes: 6 additions & 45 deletions libbeat/processors/actions/lowercase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
package actions

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/internal/testutil"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -337,21 +337,14 @@ func BenchmarkLowerCaseProcessorRun(b *testing.B) {
Events []beat.Event
}{
{
Name: "5000 events with 5 fields on each level with 3 level depth without collisions",
Events: GenerateEvents(5000, 5, 3, false),
Name: "5000 events with 5 fields on each level with 3 level depth",
Events: testutil.GenerateEvents(5000, 5, 3),
},
{
Name: "5000 events with 5 fields on each level with 3 level depth with collisions",
Events: GenerateEvents(5000, 5, 3, true),
},
{
Name: "500 events with 50 fields on each level with 5 level depth without collisions",
Events: GenerateEvents(500, 50, 3, false),
},
{
Name: "500 events with 50 fields on each level with 5 level depth with collisions",
Events: GenerateEvents(500, 50, 3, true),
Name: "500 events with 50 fields on each level with 5 level depth",
Events: testutil.GenerateEvents(500, 50, 3),
},

// Add more test cases as needed for benchmarking
}

Expand All @@ -376,35 +369,3 @@ func BenchmarkLowerCaseProcessorRun(b *testing.B) {
})
}
}

func GenerateEvents(numEvents, fieldsPerLevel, depth int, withCollisions bool) []beat.Event {
events := make([]beat.Event, numEvents)
for i := 0; i < numEvents; i++ {
event := &beat.Event{Fields: mapstr.M{}}
generateFields(event, fieldsPerLevel, depth, withCollisions)
events[i] = *event
}
return events
}

func generateFields(event *beat.Event, fieldsPerLevel, depth int, withCollisions bool) {
if depth == 0 {
return
}

for j := 1; j <= fieldsPerLevel; j++ {
var key string
for d := 1; d < depth; d++ {
key += fmt.Sprintf("level%dfield%d", d, j)
key += "."
}
if withCollisions {
key += fmt.Sprintf("Level%dField%d", depth, j) // Creating a collision (Level is capitalized)
} else {
key += fmt.Sprintf("level%dfield%d", depth, j)
}
event.Fields.Put(key, "value")
key = ""
}

}

0 comments on commit 18e256f

Please sign in to comment.