Skip to content

Commit

Permalink
Merge pull request #49 from metrico/pre-release
Browse files Browse the repository at this point in the history
Pre release
  • Loading branch information
akvlad authored Jan 19, 2024
2 parents dfec83d + d883335 commit c521620
Show file tree
Hide file tree
Showing 13 changed files with 215 additions and 122 deletions.
53 changes: 53 additions & 0 deletions bench/pyroscope_pipeline_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package pyroscopereceiver

import (
"path/filepath"
"testing"

"github.com/metrico/otel-collector/receiver/pyroscopereceiver/testclient"
"github.com/stretchr/testify/assert"
)

type request struct {
urlParams map[string]string
jfr string
}

// Benchmarks a running otelcol pyroscope write pipeline (collector and Clickhouse).
// Adjust collectorAddr to bench a your target if needed.
// Example: GOMAXPROCS=1 go test -bench ^BenchmarkPyroscopePipeline$ github.com/metrico/otel-collector/receiver/pyroscopereceiver -benchtime 10s -count 6
func BenchmarkPyroscopePipeline(b *testing.B) {
dist := []request{
{
urlParams: map[string]string{
"name": "com.example.App{dc=us-east-1,kubernetes_pod_name=app-abcd1234}",
"from": "1700332322",
"until": "1700332329",
"format": "jfr",
"sampleRate": "100",
},
jfr: filepath.Join("..", "receiver", "pyroscopereceiver", "testdata", "cortex-dev-01__kafka-0__cpu__0.jfr"),
},
{
urlParams: map[string]string{
"name": "com.example.App{dc=us-east-1,kubernetes_pod_name=app-abcd1234}",
"from": "1700332322",
"until": "1700332329",
"format": "jfr",
},
jfr: filepath.Join("..", "receiver", "pyroscopereceiver", "testdata", "memory_alloc_live_example.jfr"),
},
}
collectorAddr := "http://0.0.0.0:8062"

b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
j := 0
for pb.Next() {
err := testclient.Ingest(collectorAddr, dist[j].urlParams, dist[j].jfr)
assert.NoError(b, err, "failed to ingest")
j = (j + 1) % len(dist)
}
})
}
1 change: 1 addition & 0 deletions exporter/clickhouseprofileexporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func initMetrics(meter metric.Meter) error {
if otelcolExporterClickhouseProfileFlushTimeMillis, err = meter.Int64Histogram(
fmt.Sprint(prefix, "flush_time_millis"),
metric.WithDescription("Clickhouse profile exporter flush time in millis"),
metric.WithExplicitBucketBoundaries(0, 5, 10, 20, 50, 100, 200, 500, 1000, 5000),
); err != nil {
return err
}
Expand Down
2 changes: 0 additions & 2 deletions receiver/pyroscopereceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ Implements the Pyroscope ingest protocol and conveys the accepted profiles as Op

- `protocols`: sets the application layer protocols that the receiver will serve. See [Supported Protocols](#supported-protocols). Default is http/s on 0.0.0.0:8062 with max request body size of: 5e6 + 1e6.
- `timeout`: sets the server reponse timeout. Default is 10 seconds.
- `request_body_uncompressed_size_bytes`: sets the expected value for uncompressed request body size in bytes to size pipeline buffers and optimize allocations based on exported metrics. Default is 0.
- `parsed_body_uncompressed_size_bytes`: sets the expected value for uncompressed parsed body size in bytes to size pipeline buffers and optimize allocations based on exported metrics. Default is 0.

## Example

Expand Down
12 changes: 0 additions & 12 deletions receiver/pyroscopereceiver/buf/prepare.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"compress/gzip"
"fmt"
"io"

"github.com/metrico/otel-collector/receiver/pyroscopereceiver/buf"
)

type codec uint8
Expand All @@ -17,15 +15,13 @@ const (

// Decodes compressed streams
type Decompressor struct {
uncompressedSizeBytes int64
maxUncompressedSizeBytes int64
decoders map[codec]func(body io.Reader) (io.Reader, error)
}

// Creates a new decompressor
func NewDecompressor(uncompressedSizeBytes int64, maxUncompressedSizeBytes int64) *Decompressor {
func NewDecompressor(maxUncompressedSizeBytes int64) *Decompressor {
return &Decompressor{
uncompressedSizeBytes: uncompressedSizeBytes,
maxUncompressedSizeBytes: maxUncompressedSizeBytes,
decoders: map[codec]func(r io.Reader) (io.Reader, error){
Gzip: func(r io.Reader) (io.Reader, error) {
Expand All @@ -39,36 +35,34 @@ func NewDecompressor(uncompressedSizeBytes int64, maxUncompressedSizeBytes int64
}
}

func (d *Decompressor) readBytes(r io.Reader) (*bytes.Buffer, error) {
buf := buf.PrepareBuffer(d.uncompressedSizeBytes)

func (d *Decompressor) readBytes(r io.Reader, out *bytes.Buffer) error {
// read max+1 to validate size via a single Read()
lr := io.LimitReader(r, d.maxUncompressedSizeBytes+1)

n, err := buf.ReadFrom(lr)
n, err := out.ReadFrom(lr)
if err != nil {
return nil, err
return err
}
if n < 1 {
return nil, fmt.Errorf("empty profile")
return fmt.Errorf("empty profile")
}
if n > d.maxUncompressedSizeBytes {
return nil, fmt.Errorf("body size exceeds the limit %d bytes", d.maxUncompressedSizeBytes)
return fmt.Errorf("body size exceeds the limit %d bytes", d.maxUncompressedSizeBytes)
}
return buf, nil
return nil
}

// Decodes the accepted reader, applying the configured size limit to avoid oom by compression bomb
func (d *Decompressor) Decompress(r io.Reader, c codec) (*bytes.Buffer, error) {
func (d *Decompressor) Decompress(r io.Reader, c codec, out *bytes.Buffer) error {
decoder, ok := d.decoders[c]
if !ok {
return nil, fmt.Errorf("unsupported encoding")
return fmt.Errorf("unsupported encoding")
}

dr, err := decoder(r)
if err != nil {
return nil, err
return err
}

return d.readBytes(dr)
return d.readBytes(dr, out)
}
15 changes: 0 additions & 15 deletions receiver/pyroscopereceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@ type Config struct {

// Cofigures timeout for synchronous request handling by the receiver server
Timeout time.Duration `mapstructure:"timeout"`
// Configures the expected value for uncompressed request body size in bytes to size pipeline buffers
// and optimize allocations based on exported metrics
RequestBodyUncompressedSizeBytes int64 `mapstructure:"request_body_uncompressed_size_bytes"`
// Configures the expected value for uncompressed parsed body size in bytes to size pipeline buffers
// and optimize allocations based on exported metrics
ParsedBodyUncompressedSizeBytes int64 `mapstructure:"parsed_body_uncompressed_size_bytes"`
}

var _ component.Config = (*Config)(nil)
Expand All @@ -38,14 +32,5 @@ func (cfg *Config) Validate() error {
if cfg.Protocols.Http.MaxRequestBodySize < 1 {
return fmt.Errorf("max_request_body_size must be positive")
}
if cfg.RequestBodyUncompressedSizeBytes < 0 {
return fmt.Errorf("request_body_uncompressed_size_bytes must be positive")
}
if cfg.RequestBodyUncompressedSizeBytes > cfg.Protocols.Http.MaxRequestBodySize {
return fmt.Errorf("expected value cannot be greater than max")
}
if cfg.ParsedBodyUncompressedSizeBytes < 0 {
return fmt.Errorf("parsed_body_uncompressed_size_bytes must be positive")
}
return nil
}
12 changes: 4 additions & 8 deletions receiver/pyroscopereceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ import (
const (
typeStr = "pyroscopereceiver"

defaultHttpAddr = "0.0.0.0:8062"
defaultMaxRequestBodySize = 5e6 + 1e6 // reserve for metadata
defaultTimeout = 10 * time.Second
defaultRequestBodyUncompressedSizeBytesExpectedValue = 0
defaultParsedBodyUncompressedSizeBytesExpectedValue = 0
defaultHttpAddr = "0.0.0.0:8062"
defaultMaxRequestBodySize = 5e6 + 1e6 // reserve for metadata
defaultTimeout = 10 * time.Second
)

func createDefaultConfig() component.Config {
Expand All @@ -28,9 +26,7 @@ func createDefaultConfig() component.Config {
MaxRequestBodySize: defaultMaxRequestBodySize,
},
},
Timeout: defaultTimeout,
RequestBodyUncompressedSizeBytes: defaultRequestBodyUncompressedSizeBytesExpectedValue,
ParsedBodyUncompressedSizeBytes: defaultParsedBodyUncompressedSizeBytesExpectedValue,
Timeout: defaultTimeout,
}
}

Expand Down
5 changes: 2 additions & 3 deletions receiver/pyroscopereceiver/jfrparser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
pprof_proto "github.com/google/pprof/profile"
jfr_parser "github.com/grafana/jfr-parser/parser"
jfr_types "github.com/grafana/jfr-parser/parser/types"
"github.com/metrico/otel-collector/receiver/pyroscopereceiver/buf"
profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types"
)

Expand Down Expand Up @@ -55,7 +54,7 @@ func NewJfrPprofParser() *jfrPprofParser {
}

// Parses the jfr buffer into pprof. The returned slice may be empty without an error.
func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata, parsedBodyUncompressedSizeBytes int64) ([]profile_types.ProfileIR, error) {
func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata) ([]profile_types.ProfileIR, error) {
var (
period int64
event string
Expand Down Expand Up @@ -115,7 +114,7 @@ func (pa *jfrPprofParser) Parse(jfr *bytes.Buffer, md profile_types.Metadata, pa
for _, pr := range pa.proftab {
if nil != pr {
// assuming jfr-pprof conversion should not expand memory footprint, transitively applying jfr limit on pprof
pr.prof.Payload = buf.PrepareBuffer(parsedBodyUncompressedSizeBytes)
pr.prof.Payload = new(bytes.Buffer)
pr.pprof.WriteUncompressed(pr.prof.Payload)
ps = append(ps, pr.prof)
}
Expand Down
3 changes: 3 additions & 0 deletions receiver/pyroscopereceiver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,21 @@ func initMetrics(meter metric.Meter) error {
if otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes, err = meter.Int64Histogram(
fmt.Sprint(prefix, "request_body_uncompressed_size_bytes"),
metric.WithDescription("Pyroscope receiver uncompressed request body size in bytes"),
metric.WithExplicitBucketBoundaries(0, 1024, 4096, 16384, 32768, 65536, 131072, 262144, 524288, 1048576),
); err != nil {
return err
}
if otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes, err = meter.Int64Histogram(
fmt.Sprint(prefix, "parsed_body_uncompressed_size_bytes"),
metric.WithDescription("Pyroscope receiver uncompressed parsed body size in bytes"),
metric.WithExplicitBucketBoundaries(0, 1024, 4096, 16384, 32768, 65536, 131072, 262144, 524288, 1048576),
); err != nil {
return err
}
if otelcolReceiverPyroscopeHttpResponseTimeMillis, err = meter.Int64Histogram(
fmt.Sprint(prefix, "http_response_time_millis"),
metric.WithDescription("Pyroscope receiver http response time in millis"),
metric.WithExplicitBucketBoundaries(0, 5, 10, 20, 50, 100, 200, 500, 1000, 5000),
); err != nil {
return err
}
Expand Down
49 changes: 49 additions & 0 deletions receiver/pyroscopereceiver/pool_alloc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package pyroscopereceiver

import (
"bytes"
"compress/gzip"
"os"
"path/filepath"
"sync"
"testing"

"github.com/metrico/otel-collector/receiver/pyroscopereceiver/compress"
"github.com/stretchr/testify/assert"
)

func TestAllocDecompress(t *testing.T) {
dist := []string{
filepath.Join("testdata", "cortex-dev-01__kafka-0__cpu__0.jfr"),
filepath.Join("testdata", "memory_alloc_live_example.jfr"),
}
compressed := []*bytes.Buffer{
loadCompressed(t, dist[0]),
loadCompressed(t, dist[1]),
}
d := compress.NewDecompressor(1024 * 1024 * 1024)
j := 0
p := &sync.Pool{}

n := testing.AllocsPerRun(100, func() {
buf := acquireBuf(p)
d.Decompress(compressed[j], compress.Gzip, buf)
releaseBuf(p, buf)
j = (j + 1) % len(dist)
})
t.Logf("\naverage alloc count: %f", n)
}

func loadCompressed(t *testing.T, jfr string) *bytes.Buffer {
uncompressed, err := os.ReadFile(jfr)
if err != nil {
assert.NoError(t, err, "failed to load jfr")
}
compressed := new(bytes.Buffer)
gw := gzip.NewWriter(compressed)
if _, err := gw.Write(uncompressed); err != nil {
assert.NoError(t, err, "failed to compress jfr")
}
gw.Close()
return compressed
}
42 changes: 32 additions & 10 deletions receiver/pyroscopereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ type pyroscopeReceiver struct {
decompressor *compress.Decompressor
httpServer *http.Server
shutdownWg sync.WaitGroup

uncompressedBufPool *sync.Pool
}

type parser interface {
// Parses the given input buffer into the collector's profile IR
Parse(buf *bytes.Buffer, md profile_types.Metadata, parsedBodyUncompressedSizeBytes int64) ([]profile_types.ProfileIR, error)
Parse(buf *bytes.Buffer, md profile_types.Metadata) ([]profile_types.ProfileIR, error)
}

type params struct {
Expand All @@ -72,13 +74,14 @@ type params struct {

func newPyroscopeReceiver(cfg *Config, consumer consumer.Logs, set *receiver.CreateSettings) (*pyroscopeReceiver, error) {
recv := &pyroscopeReceiver{
cfg: cfg,
set: set,
logger: set.Logger,
meter: set.MeterProvider.Meter(typeStr),
next: consumer,
cfg: cfg,
set: set,
logger: set.Logger,
meter: set.MeterProvider.Meter(typeStr),
next: consumer,
uncompressedBufPool: &sync.Pool{},
}
recv.decompressor = compress.NewDecompressor(recv.cfg.RequestBodyUncompressedSizeBytes, recv.cfg.Protocols.Http.MaxRequestBodySize)
recv.decompressor = compress.NewDecompressor(recv.cfg.Protocols.Http.MaxRequestBodySize)
recv.httpMux = http.NewServeMux()
recv.httpMux.HandleFunc(ingestPath, func(resp http.ResponseWriter, req *http.Request) {
recv.httpHandlerIngest(resp, req)
Expand Down Expand Up @@ -151,7 +154,7 @@ func (recv *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWri
}

otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess)))
otelcolReceiverPyroscopeHttpResponseTimeMillis.Record(ctx, time.Now().Unix()-startTimeFromContext(ctx), metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess)))
otelcolReceiverPyroscopeHttpResponseTimeMillis.Record(ctx, time.Now().UnixMilli()-startTimeFromContext(ctx), metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess)))
writeResponseNoContent(resp)
}()
return c
Expand Down Expand Up @@ -221,6 +224,20 @@ func newOtelcolAttrSetHttp(service string, errorCode string) *attribute.Set {
return &s
}

func acquireBuf(p *sync.Pool) *bytes.Buffer {
v := p.Get()
if v == nil {
v = new(bytes.Buffer)
}
buf := v.(*bytes.Buffer)
return buf
}

func releaseBuf(p *sync.Pool, buf *bytes.Buffer) {
buf.Reset()
p.Put(buf)
}

func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, pm params) (plog.Logs, error) {
var (
tmp []string
Expand All @@ -243,7 +260,12 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
}
defer f.Close()

buf, err := recv.decompressor.Decompress(f, compress.Gzip)
buf := acquireBuf(recv.uncompressedBufPool)
defer func() {
releaseBuf(recv.uncompressedBufPool, buf)
}()

err = recv.decompressor.Decompress(f, compress.Gzip, buf)
if err != nil {
return logs, fmt.Errorf("failed to decompress body: %w", err)
}
Expand All @@ -261,7 +283,7 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
md.SampleRateHertz = hz
}

ps, err := pa.Parse(buf, md, recv.cfg.ParsedBodyUncompressedSizeBytes)
ps, err := pa.Parse(buf, md)
if err != nil {
return logs, fmt.Errorf("failed to parse pprof: %w", err)
}
Expand Down
Loading

0 comments on commit c521620

Please sign in to comment.