From 02ea6cb3417ad05daa00bf856b876ea144afcb96 Mon Sep 17 00:00:00 2001 From: tomershafir Date: Thu, 11 Jan 2024 14:37:54 +0200 Subject: [PATCH 1/7] test(pyroscope): add simple unscientific benchmark --- exporter/clickhouseprofileexporter/metrics.go | 1 + receiver/pyroscopereceiver/metrics.go | 3 ++ .../pyroscope_pipeline_bench_test.go | 45 +++++++++++++++++++ receiver/pyroscopereceiver/receiver_test.go | 8 ++-- 4 files changed, 52 insertions(+), 5 deletions(-) create mode 100644 receiver/pyroscopereceiver/pyroscope_pipeline_bench_test.go diff --git a/exporter/clickhouseprofileexporter/metrics.go b/exporter/clickhouseprofileexporter/metrics.go index 37bb392..83082cf 100644 --- a/exporter/clickhouseprofileexporter/metrics.go +++ b/exporter/clickhouseprofileexporter/metrics.go @@ -17,6 +17,7 @@ func initMetrics(meter metric.Meter) error { if otelcolExporterClickhouseProfileFlushTimeMillis, err = meter.Int64Histogram( fmt.Sprint(prefix, "flush_time_millis"), metric.WithDescription("Clickhouse profile exporter flush time in millis"), + metric.WithExplicitBucketBoundaries(0, 5, 10, 20, 50, 100, 200, 500, 1000, 5000), ); err != nil { return err } diff --git a/receiver/pyroscopereceiver/metrics.go b/receiver/pyroscopereceiver/metrics.go index d68f41a..f77d1c9 100644 --- a/receiver/pyroscopereceiver/metrics.go +++ b/receiver/pyroscopereceiver/metrics.go @@ -26,18 +26,21 @@ func initMetrics(meter metric.Meter) error { if otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes, err = meter.Int64Histogram( fmt.Sprint(prefix, "request_body_uncompressed_size_bytes"), metric.WithDescription("Pyroscope receiver uncompressed request body size in bytes"), + metric.WithExplicitBucketBoundaries(0, 1024, 4096, 16384, 32768, 65536, 131072, 262144, 524288, 1048576), ); err != nil { return err } if otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes, err = meter.Int64Histogram( fmt.Sprint(prefix, "parsed_body_uncompressed_size_bytes"), metric.WithDescription("Pyroscope receiver uncompressed parsed body size in bytes"), + metric.WithExplicitBucketBoundaries(0, 1024, 4096, 16384, 32768, 65536, 131072, 262144, 524288, 1048576), ); err != nil { return err } if otelcolReceiverPyroscopeHttpResponseTimeMillis, err = meter.Int64Histogram( fmt.Sprint(prefix, "http_response_time_millis"), metric.WithDescription("Pyroscope receiver http response time in millis"), + metric.WithExplicitBucketBoundaries(0, 5, 10, 20, 50, 100, 200, 500, 1000, 5000), ); err != nil { return err } diff --git a/receiver/pyroscopereceiver/pyroscope_pipeline_bench_test.go b/receiver/pyroscopereceiver/pyroscope_pipeline_bench_test.go new file mode 100644 index 0000000..0bd6eaf --- /dev/null +++ b/receiver/pyroscopereceiver/pyroscope_pipeline_bench_test.go @@ -0,0 +1,45 @@ +package pyroscopereceiver + +import ( + "fmt" + "path/filepath" + "testing" +) + +type request struct { + urlParams map[string]string + jfr string +} + +// Benchmarks a running otelcol pyroscope write pipeline (collector and Clickhouse). +// Adjust collectorAddr to bench a your target if needed. +func BenchmarkPyroscopePipeline(b *testing.B) { + dist := []request{ + { + urlParams: map[string]string{ + "name": "com.example.App{dc=us-east-1,kubernetes_pod_name=app-abcd1234}", + "from": "1700332322", + "until": "1700332329", + "format": "jfr", + "sampleRate": "100", + }, + jfr: filepath.Join("testdata", "cortex-dev-01__kafka-0__cpu__0.jfr"), + }, + { + urlParams: map[string]string{ + "name": "com.example.App{dc=us-east-1,kubernetes_pod_name=app-abcd1234}", + "from": "1700332322", + "until": "1700332329", + "format": "jfr", + }, + jfr: filepath.Join("testdata", "memory_alloc_live_example.jfr"), + }, + } + collectorAddr := fmt.Sprintf("http://%s%s", defaultHttpAddr, ingestPath) + + b.ResetTimer() + for i, j := 0, 0; i < b.N; i++ { + send(collectorAddr, dist[j].urlParams, dist[j].jfr) + j = (j + 1) % len(dist) + } +} diff --git a/receiver/pyroscopereceiver/receiver_test.go b/receiver/pyroscopereceiver/receiver_test.go index fa8697b..840a731 100644 --- a/receiver/pyroscopereceiver/receiver_test.go +++ b/receiver/pyroscopereceiver/receiver_test.go @@ -46,7 +46,7 @@ func loadTestData(t *testing.T, filename string) []byte { func run(t *testing.T, tests []jfrtest, collectorAddr string, sink *consumertest.LogsSink) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.NoError(t, send(t, collectorAddr, tt.urlParams, tt.jfr), "send shouldn't have been failed") + assert.NoError(t, send(collectorAddr, tt.urlParams, tt.jfr), "send shouldn't have been failed") actual := sink.AllLogs() assert.NoError(t, plogtest.CompareLogs(tt.expected, actual[0])) sink.Reset() @@ -63,9 +63,7 @@ func startHttpServer(t *testing.T) (string, *consumertest.LogsSink) { MaxRequestBodySize: defaultMaxRequestBodySize, }, }, - Timeout: defaultTimeout, - RequestBodyUncompressedSizeBytes: defaultRequestBodyUncompressedSizeBytesExpectedValue, - ParsedBodyUncompressedSizeBytes: defaultParsedBodyUncompressedSizeBytesExpectedValue, + Timeout: defaultTimeout, } sink := new(consumertest.LogsSink) set := receivertest.NewNopCreateSettings() @@ -79,7 +77,7 @@ func startHttpServer(t *testing.T) (string, *consumertest.LogsSink) { return addr, sink } -func send(t *testing.T, addr string, urlParams map[string]string, jfr string) error { +func send(addr string, urlParams map[string]string, jfr string) error { data, err := os.ReadFile(jfr) if err != nil { return err From 3a09fa833679ad1ffb3661ee3c5d0a3f1a0e15de Mon Sep 17 00:00:00 2001 From: tomershafir Date: Sun, 14 Jan 2024 11:04:23 +0200 Subject: [PATCH 2/7] support parallel bench --- .../pyroscope_pipeline_bench_test.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/receiver/pyroscopereceiver/pyroscope_pipeline_bench_test.go b/receiver/pyroscopereceiver/pyroscope_pipeline_bench_test.go index 0bd6eaf..a15ad53 100644 --- a/receiver/pyroscopereceiver/pyroscope_pipeline_bench_test.go +++ b/receiver/pyroscopereceiver/pyroscope_pipeline_bench_test.go @@ -13,6 +13,7 @@ type request struct { // Benchmarks a running otelcol pyroscope write pipeline (collector and Clickhouse). // Adjust collectorAddr to bench a your target if needed. +// Example: go test -benchmem -bench ^BenchmarkPyroscopePipeline$ github.com/metrico/otel-collector/receiver/pyroscopereceiver -benchtime 10s -count 6 -cpu 1 | tee bench.txt func BenchmarkPyroscopePipeline(b *testing.B) { dist := []request{ { @@ -38,8 +39,11 @@ func BenchmarkPyroscopePipeline(b *testing.B) { collectorAddr := fmt.Sprintf("http://%s%s", defaultHttpAddr, ingestPath) b.ResetTimer() - for i, j := 0, 0; i < b.N; i++ { - send(collectorAddr, dist[j].urlParams, dist[j].jfr) - j = (j + 1) % len(dist) - } + b.RunParallel(func(pb *testing.PB) { + j := 0 + for pb.Next() { + send(collectorAddr, dist[j].urlParams, dist[j].jfr) + j = (j + 1) % len(dist) + } + }) } From 06a4e315fab76b951b1f9d581146111e283f552b Mon Sep 17 00:00:00 2001 From: tomershafir Date: Sun, 14 Jan 2024 11:07:30 +0200 Subject: [PATCH 3/7] report allocs internally --- receiver/pyroscopereceiver/pyroscope_pipeline_bench_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/receiver/pyroscopereceiver/pyroscope_pipeline_bench_test.go b/receiver/pyroscopereceiver/pyroscope_pipeline_bench_test.go index a15ad53..975178c 100644 --- a/receiver/pyroscopereceiver/pyroscope_pipeline_bench_test.go +++ b/receiver/pyroscopereceiver/pyroscope_pipeline_bench_test.go @@ -13,7 +13,7 @@ type request struct { // Benchmarks a running otelcol pyroscope write pipeline (collector and Clickhouse). // Adjust collectorAddr to bench a your target if needed. -// Example: go test -benchmem -bench ^BenchmarkPyroscopePipeline$ github.com/metrico/otel-collector/receiver/pyroscopereceiver -benchtime 10s -count 6 -cpu 1 | tee bench.txt +// Example: go test -bench ^BenchmarkPyroscopePipeline$ github.com/metrico/otel-collector/receiver/pyroscopereceiver -benchtime 10s -count 6 -cpu 1 | tee bench.txt func BenchmarkPyroscopePipeline(b *testing.B) { dist := []request{ { @@ -38,6 +38,7 @@ func BenchmarkPyroscopePipeline(b *testing.B) { } collectorAddr := fmt.Sprintf("http://%s%s", defaultHttpAddr, ingestPath) + b.ReportAllocs() b.ResetTimer() b.RunParallel(func(pb *testing.PB) { j := 0 From 1fa6b1c0063b0c30873eeedf63d39f96144d485e Mon Sep 17 00:00:00 2001 From: tomershafir Date: Sun, 14 Jan 2024 12:55:53 +0200 Subject: [PATCH 4/7] increase test timeout for debug --- receiver/pyroscopereceiver/receiver_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/receiver/pyroscopereceiver/receiver_test.go b/receiver/pyroscopereceiver/receiver_test.go index 840a731..aff31cb 100644 --- a/receiver/pyroscopereceiver/receiver_test.go +++ b/receiver/pyroscopereceiver/receiver_test.go @@ -63,7 +63,7 @@ func startHttpServer(t *testing.T) (string, *consumertest.LogsSink) { MaxRequestBodySize: defaultMaxRequestBodySize, }, }, - Timeout: defaultTimeout, + Timeout: defaultTimeout * 100, } sink := new(consumertest.LogsSink) set := receivertest.NewNopCreateSettings() @@ -163,7 +163,7 @@ func TestPyroscopeIngestJfrMemory(t *testing.T) { pbAllocInNewTlab := loadTestData(t, "memory_example_alloc_in_new_tlab.pb") pbLiveObject := loadTestData(t, "memory_example_live_object.pb") tests[0] = jfrtest{ - name: "send labeled multipart form data gzipped memoty jfr to http ingest endpoint", + name: "send labeled multipart form data gzipped memory jfr to http ingest endpoint", urlParams: map[string]string{ "name": "com.example.App{dc=us-east-1,kubernetes_pod_name=app-abcd1234}", "from": "1700332322", From 64114a3fd56308061124dc31a88d5b4455b81268 Mon Sep 17 00:00:00 2001 From: tomershafir Date: Sun, 14 Jan 2024 13:09:44 +0200 Subject: [PATCH 5/7] refactor testclient pkg and bench --- .../pyroscope_pipeline_bench_test.go | 9 +-- receiver/pyroscopereceiver/receiver_test.go | 55 ++----------------- .../pyroscopereceiver/testclient/ingest.go | 54 ++++++++++++++++++ 3 files changed, 63 insertions(+), 55 deletions(-) rename {receiver/pyroscopereceiver => bench}/pyroscope_pipeline_bench_test.go (76%) create mode 100644 receiver/pyroscopereceiver/testclient/ingest.go diff --git a/receiver/pyroscopereceiver/pyroscope_pipeline_bench_test.go b/bench/pyroscope_pipeline_bench_test.go similarity index 76% rename from receiver/pyroscopereceiver/pyroscope_pipeline_bench_test.go rename to bench/pyroscope_pipeline_bench_test.go index 975178c..3a86c45 100644 --- a/receiver/pyroscopereceiver/pyroscope_pipeline_bench_test.go +++ b/bench/pyroscope_pipeline_bench_test.go @@ -1,9 +1,10 @@ package pyroscopereceiver import ( - "fmt" "path/filepath" "testing" + + "github.com/metrico/otel-collector/receiver/pyroscopereceiver/testclient" ) type request struct { @@ -13,7 +14,7 @@ type request struct { // Benchmarks a running otelcol pyroscope write pipeline (collector and Clickhouse). // Adjust collectorAddr to bench a your target if needed. -// Example: go test -bench ^BenchmarkPyroscopePipeline$ github.com/metrico/otel-collector/receiver/pyroscopereceiver -benchtime 10s -count 6 -cpu 1 | tee bench.txt +// Example: GOMAXPROCS=1 go test -bench ^BenchmarkPyroscopePipeline$ github.com/metrico/otel-collector/receiver/pyroscopereceiver -benchtime 10s -count 6 func BenchmarkPyroscopePipeline(b *testing.B) { dist := []request{ { @@ -36,14 +37,14 @@ func BenchmarkPyroscopePipeline(b *testing.B) { jfr: filepath.Join("testdata", "memory_alloc_live_example.jfr"), }, } - collectorAddr := fmt.Sprintf("http://%s%s", defaultHttpAddr, ingestPath) + collectorAddr := "http://0.0.0.0:8062" b.ReportAllocs() b.ResetTimer() b.RunParallel(func(pb *testing.PB) { j := 0 for pb.Next() { - send(collectorAddr, dist[j].urlParams, dist[j].jfr) + testclient.Ingest(collectorAddr, dist[j].urlParams, dist[j].jfr) j = (j + 1) % len(dist) } }) diff --git a/receiver/pyroscopereceiver/receiver_test.go b/receiver/pyroscopereceiver/receiver_test.go index aff31cb..1976e88 100644 --- a/receiver/pyroscopereceiver/receiver_test.go +++ b/receiver/pyroscopereceiver/receiver_test.go @@ -1,17 +1,14 @@ package pyroscopereceiver import ( - "bytes" - "compress/gzip" "context" "fmt" - "mime/multipart" "net" - "net/http" "os" "path/filepath" "testing" + "github.com/metrico/otel-collector/receiver/pyroscopereceiver/testclient" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -46,7 +43,7 @@ func loadTestData(t *testing.T, filename string) []byte { func run(t *testing.T, tests []jfrtest, collectorAddr string, sink *consumertest.LogsSink) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - assert.NoError(t, send(collectorAddr, tt.urlParams, tt.jfr), "send shouldn't have been failed") + assert.NoError(t, testclient.Ingest(collectorAddr, tt.urlParams, tt.jfr), "send shouldn't have been failed") actual := sink.AllLogs() assert.NoError(t, plogtest.CompareLogs(tt.expected, actual[0])) sink.Reset() @@ -77,50 +74,6 @@ func startHttpServer(t *testing.T) (string, *consumertest.LogsSink) { return addr, sink } -func send(addr string, urlParams map[string]string, jfr string) error { - data, err := os.ReadFile(jfr) - if err != nil { - return err - } - - body := new(bytes.Buffer) - - mw := multipart.NewWriter(body) - part, err := mw.CreateFormFile("jfr", "jfr") - if err != nil { - return fmt.Errorf("failed to create form file: %w", err) - } - gw := gzip.NewWriter(part) - if _, err := gw.Write(data); err != nil { - return err - } - gw.Close() - mw.Close() - - req, err := http.NewRequest("POST", addr, body) - if err != nil { - return err - } - req.Header.Add("Content-Type", mw.FormDataContentType()) - - q := req.URL.Query() - for k, v := range urlParams { - q.Add(k, v) - } - req.URL.RawQuery = q.Encode() - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - - resp.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return fmt.Errorf("failed to upload profile; http status code: %d", resp.StatusCode) - } - return nil -} - func TestPyroscopeIngestJfrCpu(t *testing.T) { tests := make([]jfrtest, 1) pb := loadTestData(t, "cortex-dev-01__kafka-0__cpu__0.pb") @@ -154,7 +107,7 @@ func TestPyroscopeIngestJfrCpu(t *testing.T) { }}), } addr, sink := startHttpServer(t) - collectorAddr := fmt.Sprintf("http://%s%s", addr, ingestPath) + collectorAddr := fmt.Sprintf("http://%s", addr) run(t, tests, collectorAddr, sink) } @@ -211,7 +164,7 @@ func TestPyroscopeIngestJfrMemory(t *testing.T) { } addr, sink := startHttpServer(t) - collectorAddr := fmt.Sprintf("http://%s%s", addr, ingestPath) + collectorAddr := fmt.Sprintf("http://%s", addr) run(t, tests, collectorAddr, sink) } diff --git a/receiver/pyroscopereceiver/testclient/ingest.go b/receiver/pyroscopereceiver/testclient/ingest.go new file mode 100644 index 0000000..096e0e5 --- /dev/null +++ b/receiver/pyroscopereceiver/testclient/ingest.go @@ -0,0 +1,54 @@ +package testclient + +import ( + "bytes" + "compress/gzip" + "fmt" + "mime/multipart" + "net/http" + "os" +) + +func Ingest(addr string, urlParams map[string]string, jfr string) error { + data, err := os.ReadFile(jfr) + if err != nil { + return err + } + + body := new(bytes.Buffer) + + mw := multipart.NewWriter(body) + part, err := mw.CreateFormFile("jfr", "jfr") + if err != nil { + return fmt.Errorf("failed to create form file: %w", err) + } + gw := gzip.NewWriter(part) + if _, err := gw.Write(data); err != nil { + return err + } + gw.Close() + mw.Close() + + req, err := http.NewRequest("POST", fmt.Sprintf("%s/ingest", addr), body) + if err != nil { + return err + } + req.Header.Add("Content-Type", mw.FormDataContentType()) + + q := req.URL.Query() + for k, v := range urlParams { + q.Add(k, v) + } + req.URL.RawQuery = q.Encode() + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + + resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("failed to upload profile; http status code: %d", resp.StatusCode) + } + return nil +} From 11f72b4330abf5d7ec948a1327a897bd311c508b Mon Sep 17 00:00:00 2001 From: tomershafir Date: Sun, 14 Jan 2024 16:48:14 +0200 Subject: [PATCH 6/7] rm prealloc config and pool decompress buffers --- receiver/pyroscopereceiver/README.md | 2 - receiver/pyroscopereceiver/buf/prepare.go | 12 ----- .../compress/{compress.go => decompress.go} | 28 +++++------ receiver/pyroscopereceiver/config.go | 15 ------ receiver/pyroscopereceiver/factory.go | 12 ++--- .../pyroscopereceiver/jfrparser/parser.go | 5 +- receiver/pyroscopereceiver/pool_alloc_test.go | 49 +++++++++++++++++++ receiver/pyroscopereceiver/receiver.go | 42 ++++++++++++---- receiver/pyroscopereceiver/receiver_test.go | 4 +- 9 files changed, 99 insertions(+), 70 deletions(-) delete mode 100644 receiver/pyroscopereceiver/buf/prepare.go rename receiver/pyroscopereceiver/compress/{compress.go => decompress.go} (57%) create mode 100644 receiver/pyroscopereceiver/pool_alloc_test.go diff --git a/receiver/pyroscopereceiver/README.md b/receiver/pyroscopereceiver/README.md index 7a20998..e0aa639 100644 --- a/receiver/pyroscopereceiver/README.md +++ b/receiver/pyroscopereceiver/README.md @@ -11,8 +11,6 @@ Implements the Pyroscope ingest protocol and conveys the accepted profiles as Op - `protocols`: sets the application layer protocols that the receiver will serve. See [Supported Protocols](#supported-protocols). Default is http/s on 0.0.0.0:8062 with max request body size of: 5e6 + 1e6. - `timeout`: sets the server reponse timeout. Default is 10 seconds. -- `request_body_uncompressed_size_bytes`: sets the expected value for uncompressed request body size in bytes to size pipeline buffers and optimize allocations based on exported metrics. Default is 0. -- `parsed_body_uncompressed_size_bytes`: sets the expected value for uncompressed parsed body size in bytes to size pipeline buffers and optimize allocations based on exported metrics. Default is 0. ## Example diff --git a/receiver/pyroscopereceiver/buf/prepare.go b/receiver/pyroscopereceiver/buf/prepare.go deleted file mode 100644 index ad5f829..0000000 --- a/receiver/pyroscopereceiver/buf/prepare.go +++ /dev/null @@ -1,12 +0,0 @@ -package buf - -import "bytes" - -// Pre-allocates a buffer based on heuristics to minimize resize -func PrepareBuffer(uncompressedSizeBytes int64) *bytes.Buffer { - var buf bytes.Buffer - // extra space to try avoid realloc where expected size fits enough - // TODO: try internal simple statistical model to pre-allocate a buffer - buf.Grow(int(uncompressedSizeBytes) + bytes.MinRead) - return &buf -} diff --git a/receiver/pyroscopereceiver/compress/compress.go b/receiver/pyroscopereceiver/compress/decompress.go similarity index 57% rename from receiver/pyroscopereceiver/compress/compress.go rename to receiver/pyroscopereceiver/compress/decompress.go index 9106559..0381c56 100644 --- a/receiver/pyroscopereceiver/compress/compress.go +++ b/receiver/pyroscopereceiver/compress/decompress.go @@ -5,8 +5,6 @@ import ( "compress/gzip" "fmt" "io" - - "github.com/metrico/otel-collector/receiver/pyroscopereceiver/buf" ) type codec uint8 @@ -17,15 +15,13 @@ const ( // Decodes compressed streams type Decompressor struct { - uncompressedSizeBytes int64 maxUncompressedSizeBytes int64 decoders map[codec]func(body io.Reader) (io.Reader, error) } // Creates a new decompressor -func NewDecompressor(uncompressedSizeBytes int64, maxUncompressedSizeBytes int64) *Decompressor { +func NewDecompressor(maxUncompressedSizeBytes int64) *Decompressor { return &Decompressor{ - uncompressedSizeBytes: uncompressedSizeBytes, maxUncompressedSizeBytes: maxUncompressedSizeBytes, decoders: map[codec]func(r io.Reader) (io.Reader, error){ Gzip: func(r io.Reader) (io.Reader, error) { @@ -39,36 +35,34 @@ func NewDecompressor(uncompressedSizeBytes int64, maxUncompressedSizeBytes int64 } } -func (d *Decompressor) readBytes(r io.Reader) (*bytes.Buffer, error) { - buf := buf.PrepareBuffer(d.uncompressedSizeBytes) - +func (d *Decompressor) readBytes(r io.Reader, out *bytes.Buffer) error { // read max+1 to validate size via a single Read() lr := io.LimitReader(r, d.maxUncompressedSizeBytes+1) - n, err := buf.ReadFrom(lr) + n, err := out.ReadFrom(lr) if err != nil { - return nil, err + return err } if n < 1 { - return nil, fmt.Errorf("empty profile") + return fmt.Errorf("empty profile") } if n > d.maxUncompressedSizeBytes { - return nil, fmt.Errorf("body size exceeds the limit %d bytes", d.maxUncompressedSizeBytes) + return fmt.Errorf("body size exceeds the limit %d bytes", d.maxUncompressedSizeBytes) } - return buf, nil + return nil } // Decodes the accepted reader, applying the configured size limit to avoid oom by compression bomb -func (d *Decompressor) Decompress(r io.Reader, c codec) (*bytes.Buffer, error) { +func (d *Decompressor) Decompress(r io.Reader, c codec, out *bytes.Buffer) error { decoder, ok := d.decoders[c] if !ok { - return nil, fmt.Errorf("unsupported encoding") + return fmt.Errorf("unsupported encoding") } dr, err := decoder(r) if err != nil { - return nil, err + return err } - return d.readBytes(dr) + return d.readBytes(dr, out) } diff --git a/receiver/pyroscopereceiver/config.go b/receiver/pyroscopereceiver/config.go index e54081a..d7e0b2f 100644 --- a/receiver/pyroscopereceiver/config.go +++ b/receiver/pyroscopereceiver/config.go @@ -20,12 +20,6 @@ type Config struct { // Cofigures timeout for synchronous request handling by the receiver server Timeout time.Duration `mapstructure:"timeout"` - // Configures the expected value for uncompressed request body size in bytes to size pipeline buffers - // and optimize allocations based on exported metrics - RequestBodyUncompressedSizeBytes int64 `mapstructure:"request_body_uncompressed_size_bytes"` - // Configures the expected value for uncompressed parsed body size in bytes to size pipeline buffers - // and optimize allocations based on exported metrics - ParsedBodyUncompressedSizeBytes int64 `mapstructure:"parsed_body_uncompressed_size_bytes"` } var _ component.Config = (*Config)(nil) @@ -38,14 +32,5 @@ func (cfg *Config) Validate() error { if cfg.Protocols.Http.MaxRequestBodySize < 1 { return fmt.Errorf("max_request_body_size must be positive") } - if cfg.RequestBodyUncompressedSizeBytes < 0 { - return fmt.Errorf("request_body_uncompressed_size_bytes must be positive") - } - if cfg.RequestBodyUncompressedSizeBytes > cfg.Protocols.Http.MaxRequestBodySize { - return fmt.Errorf("expected value cannot be greater than max") - } - if cfg.ParsedBodyUncompressedSizeBytes < 0 { - return fmt.Errorf("parsed_body_uncompressed_size_bytes must be positive") - } return nil } diff --git a/receiver/pyroscopereceiver/factory.go b/receiver/pyroscopereceiver/factory.go index d698fdf..4839e3e 100644 --- a/receiver/pyroscopereceiver/factory.go +++ b/receiver/pyroscopereceiver/factory.go @@ -13,11 +13,9 @@ import ( const ( typeStr = "pyroscopereceiver" - defaultHttpAddr = "0.0.0.0:8062" - defaultMaxRequestBodySize = 5e6 + 1e6 // reserve for metadata - defaultTimeout = 10 * time.Second - defaultRequestBodyUncompressedSizeBytesExpectedValue = 0 - defaultParsedBodyUncompressedSizeBytesExpectedValue = 0 + defaultHttpAddr = "0.0.0.0:8062" + defaultMaxRequestBodySize = 5e6 + 1e6 // reserve for metadata + defaultTimeout = 10 * time.Second ) func createDefaultConfig() component.Config { @@ -28,9 +26,7 @@ func createDefaultConfig() component.Config { MaxRequestBodySize: defaultMaxRequestBodySize, }, }, - Timeout: defaultTimeout, - RequestBodyUncompressedSizeBytes: defaultRequestBodyUncompressedSizeBytesExpectedValue, - ParsedBodyUncompressedSizeBytes: defaultParsedBodyUncompressedSizeBytesExpectedValue, + Timeout: defaultTimeout, } } diff --git a/receiver/pyroscopereceiver/jfrparser/parser.go b/receiver/pyroscopereceiver/jfrparser/parser.go index 0db1ed7..396b8f8 100644 --- a/receiver/pyroscopereceiver/jfrparser/parser.go +++ b/receiver/pyroscopereceiver/jfrparser/parser.go @@ -8,7 +8,6 @@ import ( pprof_proto "github.com/google/pprof/profile" jfr_parser "github.com/grafana/jfr-parser/parser" jfr_types "github.com/grafana/jfr-parser/parser/types" - "github.com/metrico/otel-collector/receiver/pyroscopereceiver/buf" profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types" ) @@ -55,7 +54,7 @@ func NewJfrPprofParser() *jfrPprofParser { } // Parses the jfr buffer into pprof. The returned slice may be empty without an error. -func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata, parsedBodyUncompressedSizeBytes int64) ([]profile_types.ProfileIR, error) { +func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata) ([]profile_types.ProfileIR, error) { var ( period int64 event string @@ -115,7 +114,7 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata, pa for _, pr := range pa.proftab { if nil != pr { // assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof - pr.prof.Payload = buf.PrepareBuffer(parsedBodyUncompressedSizeBytes) + pr.prof.Payload = new(bytes.Buffer) pr.pprof.WriteUncompressed(pr.prof.Payload) ps = append(ps, pr.prof) } diff --git a/receiver/pyroscopereceiver/pool_alloc_test.go b/receiver/pyroscopereceiver/pool_alloc_test.go new file mode 100644 index 0000000..820e9e7 --- /dev/null +++ b/receiver/pyroscopereceiver/pool_alloc_test.go @@ -0,0 +1,49 @@ +package pyroscopereceiver + +import ( + "bytes" + "compress/gzip" + "os" + "path/filepath" + "sync" + "testing" + + "github.com/metrico/otel-collector/receiver/pyroscopereceiver/compress" + "github.com/stretchr/testify/assert" +) + +func TestAllocDecompress(t *testing.T) { + dist := []string{ + filepath.Join("testdata", "cortex-dev-01__kafka-0__cpu__0.jfr"), + filepath.Join("testdata", "memory_alloc_live_example.jfr"), + } + compressed := []*bytes.Buffer{ + loadCompressed(t, dist[0]), + loadCompressed(t, dist[1]), + } + d := compress.NewDecompressor(1024 * 1024 * 1024) + j := 0 + p := &sync.Pool{} + + n := testing.AllocsPerRun(100, func() { + buf := acquireBuf(p) + d.Decompress(compressed[j], compress.Gzip, buf) + releaseBuf(p, buf) + j = (j + 1) % len(dist) + }) + t.Logf("\naverage alloc count: %f", n) +} + +func loadCompressed(t *testing.T, jfr string) *bytes.Buffer { + uncompressed, err := os.ReadFile(jfr) + if err != nil { + assert.NoError(t, err, "failed to load jfr") + } + compressed := new(bytes.Buffer) + gw := gzip.NewWriter(compressed) + if _, err := gw.Write(uncompressed); err != nil { + assert.NoError(t, err, "failed to compress jfr") + } + gw.Close() + return compressed +} diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go index 912c0aa..cd4f348 100644 --- a/receiver/pyroscopereceiver/receiver.go +++ b/receiver/pyroscopereceiver/receiver.go @@ -56,11 +56,13 @@ type pyroscopeReceiver struct { decompressor *compress.Decompressor httpServer *http.Server shutdownWg sync.WaitGroup + + uncompressedBufPool *sync.Pool } type parser interface { // Parses the given input buffer into the collector's profile IR - Parse(buf *bytes.Buffer, md profile_types.Metadata, parsedBodyUncompressedSizeBytes int64) ([]profile_types.ProfileIR, error) + Parse(buf *bytes.Buffer, md profile_types.Metadata) ([]profile_types.ProfileIR, error) } type params struct { @@ -72,13 +74,14 @@ type params struct { func newPyroscopeReceiver(cfg *Config, consumer consumer.Logs, set *receiver.CreateSettings) (*pyroscopeReceiver, error) { recv := &pyroscopeReceiver{ - cfg: cfg, - set: set, - logger: set.Logger, - meter: set.MeterProvider.Meter(typeStr), - next: consumer, + cfg: cfg, + set: set, + logger: set.Logger, + meter: set.MeterProvider.Meter(typeStr), + next: consumer, + uncompressedBufPool: &sync.Pool{}, } - recv.decompressor = compress.NewDecompressor(recv.cfg.RequestBodyUncompressedSizeBytes, recv.cfg.Protocols.Http.MaxRequestBodySize) + recv.decompressor = compress.NewDecompressor(recv.cfg.Protocols.Http.MaxRequestBodySize) recv.httpMux = http.NewServeMux() recv.httpMux.HandleFunc(ingestPath, func(resp http.ResponseWriter, req *http.Request) { recv.httpHandlerIngest(resp, req) @@ -151,7 +154,7 @@ func (recv *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWri } otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess))) - otelcolReceiverPyroscopeHttpResponseTimeMillis.Record(ctx, time.Now().Unix()-startTimeFromContext(ctx), metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess))) + otelcolReceiverPyroscopeHttpResponseTimeMillis.Record(ctx, time.Now().UnixMilli()-startTimeFromContext(ctx), metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess))) writeResponseNoContent(resp) }() return c @@ -221,6 +224,20 @@ func newOtelcolAttrSetHttp(service string, errorCode string) *attribute.Set { return &s } +func acquireBuf(p *sync.Pool) *bytes.Buffer { + v := p.Get() + if v == nil { + v = new(bytes.Buffer) + } + buf := v.(*bytes.Buffer) + return buf +} + +func releaseBuf(p *sync.Pool, buf *bytes.Buffer) { + buf.Reset() + p.Put(buf) +} + func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, pm params) (plog.Logs, error) { var ( tmp []string @@ -243,7 +260,12 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque } defer f.Close() - buf, err := recv.decompressor.Decompress(f, compress.Gzip) + buf := acquireBuf(recv.uncompressedBufPool) + defer func() { + releaseBuf(recv.uncompressedBufPool, buf) + }() + + err = recv.decompressor.Decompress(f, compress.Gzip, buf) if err != nil { return logs, fmt.Errorf("failed to decompress body: %w", err) } @@ -261,7 +283,7 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque md.SampleRateHertz = hz } - ps, err := pa.Parse(buf, md, recv.cfg.ParsedBodyUncompressedSizeBytes) + ps, err := pa.Parse(buf, md) if err != nil { return logs, fmt.Errorf("failed to parse pprof: %w", err) } diff --git a/receiver/pyroscopereceiver/receiver_test.go b/receiver/pyroscopereceiver/receiver_test.go index fa8697b..e141007 100644 --- a/receiver/pyroscopereceiver/receiver_test.go +++ b/receiver/pyroscopereceiver/receiver_test.go @@ -63,9 +63,7 @@ func startHttpServer(t *testing.T) (string, *consumertest.LogsSink) { MaxRequestBodySize: defaultMaxRequestBodySize, }, }, - Timeout: defaultTimeout, - RequestBodyUncompressedSizeBytes: defaultRequestBodyUncompressedSizeBytesExpectedValue, - ParsedBodyUncompressedSizeBytes: defaultParsedBodyUncompressedSizeBytesExpectedValue, + Timeout: defaultTimeout, } sink := new(consumertest.LogsSink) set := receivertest.NewNopCreateSettings() From 3d81ccfc9b7a6d342f45f543da3390fc15b3550a Mon Sep 17 00:00:00 2001 From: tomershafir Date: Mon, 15 Jan 2024 17:11:23 +0200 Subject: [PATCH 7/7] fix testdata path in /bench --- bench/pyroscope_pipeline_bench_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/bench/pyroscope_pipeline_bench_test.go b/bench/pyroscope_pipeline_bench_test.go index 3a86c45..4000fe6 100644 --- a/bench/pyroscope_pipeline_bench_test.go +++ b/bench/pyroscope_pipeline_bench_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/metrico/otel-collector/receiver/pyroscopereceiver/testclient" + "github.com/stretchr/testify/assert" ) type request struct { @@ -25,7 +26,7 @@ func BenchmarkPyroscopePipeline(b *testing.B) { "format": "jfr", "sampleRate": "100", }, - jfr: filepath.Join("testdata", "cortex-dev-01__kafka-0__cpu__0.jfr"), + jfr: filepath.Join("..", "receiver", "pyroscopereceiver", "testdata", "cortex-dev-01__kafka-0__cpu__0.jfr"), }, { urlParams: map[string]string{ @@ -34,7 +35,7 @@ func BenchmarkPyroscopePipeline(b *testing.B) { "until": "1700332329", "format": "jfr", }, - jfr: filepath.Join("testdata", "memory_alloc_live_example.jfr"), + jfr: filepath.Join("..", "receiver", "pyroscopereceiver", "testdata", "memory_alloc_live_example.jfr"), }, } collectorAddr := "http://0.0.0.0:8062" @@ -44,7 +45,8 @@ func BenchmarkPyroscopePipeline(b *testing.B) { b.RunParallel(func(pb *testing.PB) { j := 0 for pb.Next() { - testclient.Ingest(collectorAddr, dist[j].urlParams, dist[j].jfr) + err := testclient.Ingest(collectorAddr, dist[j].urlParams, dist[j].jfr) + assert.NoError(b, err, "failed to ingest") j = (j + 1) % len(dist) } })