diff --git a/bench/pyroscope_pipeline_bench_test.go b/bench/pyroscope_pipeline_bench_test.go
index 4000fe6..2db874f 100644
--- a/bench/pyroscope_pipeline_bench_test.go
+++ b/bench/pyroscope_pipeline_bench_test.go
@@ -14,8 +14,8 @@ type request struct {
 }
 
 // Benchmarks a running otelcol pyroscope write pipeline (collector and Clickhouse).
-// Adjust collectorAddr to bench a your target if needed.
-// Example: GOMAXPROCS=1 go test -bench ^BenchmarkPyroscopePipeline$ github.com/metrico/otel-collector/receiver/pyroscopereceiver -benchtime 10s -count 6
+// Adjust collectorAddr to bench your target if needed.
+// Example: go test -bench ^BenchmarkPyroscopePipeline$ github.com/metrico/otel-collector/bench -benchtime 10s -count 6 -cpu 1
 func BenchmarkPyroscopePipeline(b *testing.B) {
 	dist := []request{
 		{
diff --git a/config/example-config.yaml b/config/example-config.yaml
index 2bcb3da..9838ea1 100644
--- a/config/example-config.yaml
+++ b/config/example-config.yaml
@@ -64,7 +64,7 @@ exporters:
       max_interval: 30s
       max_elapsed_time: 300s
   clickhouseprofileexporter:
-    dsn: tcp://0.0.0.0:9000/qryn
+    dsn: clickhouse://0.0.0.0:9000/qryn
     timeout: 10s
     sending_queue:
       queue_size: 100
@@ -81,6 +81,8 @@ exporters:
 extensions:
   health_check:
   pprof:
+    mutex_profile_fraction: 100
+    block_profile_fraction: 100
   zpages:
   memory_ballast:
     size_mib: 1000
diff --git a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go
index ae52e44..dcdb7c5 100644
--- a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go
+++ b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go
@@ -159,10 +159,20 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) error {
 		tmp, _ = m.Get(columnDurationNs)
 		duration_ns[i], _ = strconv.ParseUint(tmp.Str(), 10, 64)
 
-		tmp, _ = m.Get(columnPeriodType)
+		tmp, _ = m.Get(columnPayloadType)
 		payload_type[i] = tmp.AsString()
 
 		payload[i] = r.Body().Bytes().AsRaw()
+
+		ch.logger.Debug(
+			fmt.Sprintf("batch insert prepared row %d", i),
+			zap.Uint64(columnTimestampNs, timestamp_ns[i]),
+			zap.String(columnType, typ[i]),
+			zap.String(columnServiceName, service_name[i]),
+			zap.String(columnPeriodType, period_type[i]),
+			zap.String(columnPeriodUnit, period_unit[i]),
+			zap.String(columnPayloadType, payload_type[i]),
+		)
 	}
 
 	// column order here should match table column order
diff --git a/loadtest/pyroscope_pipeline_k6_load_test.js b/loadtest/pyroscope_pipeline_k6_load_test.js
new file mode 100644
index 0000000..154d865
--- /dev/null
+++ b/loadtest/pyroscope_pipeline_k6_load_test.js
@@ -0,0 +1,91 @@
+/**
+ * A simple k6 load test for the pyroscope pipeline.
+ *
+ * Usage:
+ * - Average load test:
+ *   k6 run pyroscope_pipeline_k6_load_test.js
+ * - Breakpoint test:
+ *   k6 run pyroscope_pipeline_k6_load_test.js -e WORKLOAD=breakpoint
+ * - Smoke test:
+ *   k6 run pyroscope_pipeline_k6_load_test.js -e WORKLOAD=smoke
+ */
+import http from 'k6/http';
+import { check } from 'k6';
+
+var averageWorkload = {
+    // example workload: 30k pods with 5m upload interval, assuming uniform random distribution,
+    // then average rps is 30,000/5/60=100
+    load_avg: {
+        executor: "ramping-vus",
+        stages: [
+            { duration: "10s", target: 100 },
+            { duration: "50s", target: 100 },
+            { duration: "5s", target: 0 },
+        ],
+    },
+};
+
+var breakpointWorkload = {
+    load_avg: {
+        executor: "ramping-vus",
+        stages: [
+            { duration: "10s", target: 20 },
+            { duration: "30s", target: 20 },
+            { duration: "30s", target: 100 },
+            { duration: "30s", target: 120 },
+            { duration: "30s", target: 150 },
+            { duration: "30s", target: 180 },
+            { duration: "30s", target: 200 },
+        ],
+    },
+};
+
+export const options = 'smoke' === __ENV.WORKLOAD ? null : {
+    thresholds: {
+        http_req_failed: [{threshold: "rate<0.000000001", abortOnFail: true }],
+        // all expressions for the same metric must live in one entry,
+        // otherwise later keys silently overwrite the earlier ones
+        http_req_duration: ["med<100", "p(95)<300",
+                            "p(99)<1000", "max<3000"],
+    },
+    scenarios: 'breakpoint' === __ENV.WORKLOAD ? breakpointWorkload : averageWorkload,
+};
+
+const dist = [
+    {
+        urlParams: {
+            "name": "com.example.Test{dc=us-east-1,kubernetes_pod_name=app-abcd1234}",
+            "from": "1700332322",
+            "until": "1700332329",
+            "format": "jfr",
+            "sampleRate": "100",
+        },
+        jfrgz: open("../receiver/pyroscopereceiver/testdata/cortex-dev-01__kafka-0__cpu__0.jfr.gz"/*unix-only*/, "b"),
+    },
+    {
+        urlParams: {
+            "name": "com.example.Test{dc=us-east-1,kubernetes_pod_name=app-abcd1234}",
+            "from": "1700332322",
+            "until": "1700332329",
+            "format": "jfr",
+        },
+        jfrgz: open("../receiver/pyroscopereceiver/testdata/memory_alloc_live_example.jfr.gz"/*unix-only*/, "b"),
+    },
+]
+const collectorAddr = "http://0.0.0.0:8062"
+
+let j = 0
+
+export default function () {
+    const data = {
+        jfr: http.file(dist[j].jfrgz, "jfr"),
+    };
+    const params = dist[j].urlParams
+    const qs = Object.keys(params).map(k => `${k}=${params[k]}`).join("&")
+    const res = http.post(`${collectorAddr}/ingest?${encodeURI(qs)}`, data);
+
+    check(res, {
+        "response code was 204": (res) => res.status == 204,
+    });
+    j = (j + 1) % dist.length;
+}
diff --git a/receiver/pyroscopereceiver/jfrparser/parser.go b/receiver/pyroscopereceiver/jfrparser/parser.go
index 1cde2ca..c0d2eb4 100644
--- a/receiver/pyroscopereceiver/jfrparser/parser.go
+++ b/receiver/pyroscopereceiver/jfrparser/parser.go
@@ -300,7 +300,8 @@ func (pa *jfrPprofParser) appendLocation(sampleType sampleType, prof *pprof_prot
 
 	// append new location with a single line referencing the new function, ignoring inlining without a line number
 	newl := &pprof_proto.Location{
-		ID:   uint64(len(prof.Location)) + 1, // starts with 1 not 0
+		ID: uint64(len(prof.Location)) + 1, // starts with 1 not 0
+		// TODO: parse line numbers like https://github.com/grafana/jfr-parser/pull/27/files
 		Line: []pprof_proto.Line{{Function: newf}},
 	}
 
diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go
index 91ee8f0..37931c1 100644
--- a/receiver/pyroscopereceiver/receiver.go
+++ b/receiver/pyroscopereceiver/receiver.go
@@ -221,10 +221,9 @@ func newOtelcolAttrSetHttp(service string, errorCode string, statusCode int) *at
 func acquireBuf(p *sync.Pool) *bytes.Buffer {
 	v := p.Get()
 	if v == nil {
-		v = new(bytes.Buffer)
+		return new(bytes.Buffer)
 	}
-	buf := v.(*bytes.Buffer)
-	return buf
+	return v.(*bytes.Buffer)
 }
 
 func releaseBuf(p *sync.Pool, buf *bytes.Buffer) {
@@ -240,6 +239,8 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
 	)
 	logs := plog.NewLogs()
 
+	recv.logger.Debug("received profiles", zap.String("url_query", req.URL.RawQuery))
+
 	qs := req.URL.Query()
 	if tmp, ok = qs["format"]; ok && tmp[0] == "jfr" {
 		pa = jfrparser.NewJfrPprofParser()
@@ -284,9 +285,10 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
 
 	sz := 0
 	rs := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
-	for _, pr := range ps {
+	for i, pr := range ps {
 		r := rs.AppendEmpty()
-		r.SetTimestamp(pcommon.Timestamp(ns(pm.start)))
+		timestampNs := ns(pm.start)
+		r.SetTimestamp(pcommon.Timestamp(timestampNs))
 		m := r.Attributes()
 		m.PutStr("duration_ns", fmt.Sprint(ns(pm.end-pm.start)))
 		m.PutStr("service_name", pm.name)
@@ -300,6 +302,17 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
 		}
 		r.Body().SetEmptyBytes().FromRaw(pr.Payload.Bytes())
 		sz += pr.Payload.Len()
+		recv.logger.Debug(
+			fmt.Sprintf("parsed profile %d", i),
+			zap.Uint64("timestamp_ns", timestampNs),
+			zap.String("type", pr.Type.Type),
+			zap.String("service_name", pm.name),
+			zap.String("period_type", pr.Type.PeriodType),
+			zap.String("period_unit", pr.Type.PeriodUnit),
+			zap.String("sample_types", strings.Join(pr.Type.SampleType, ",")),
+			zap.String("sample_units", strings.Join(pr.Type.SampleUnit, ",")),
+			zap.Uint8("payload_type", uint8(pr.PayloadType)),
+		)
 	}
 	// sz may be 0 and it will be recorded
 	otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes.Record(ctx, int64(sz), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatPprof, "")))
@@ -367,8 +380,8 @@ func entitiesToStrings(entities []profile_types.SampleType) []any {
 
 func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) error {
 	m.PutStr("type", prof.Type.Type)
-	s := m.PutEmptySlice("sample_types")
 
+	s := m.PutEmptySlice("sample_types")
 	err := s.FromRaw(stringToAnyArray(prof.Type.SampleType))
 	if err != nil {
 		return err
@@ -379,19 +392,18 @@ func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) error {
 	if err != nil {
 		return err
 	}
+	// Correct type assertion for []profile.SampleType
 
 	result := prof.ValueAggregation.([]profile_types.SampleType)
 	s = m.PutEmptySlice("values_agg")
-
 	err = s.FromRaw(entitiesToStrings(result))
-
 	if err != nil {
 		return err
 	}
+
 	m.PutStr("period_type", prof.Type.PeriodType)
 	m.PutStr("period_unit", prof.Type.PeriodUnit)
 	m.PutStr("payload_type", fmt.Sprint(prof.PayloadType))
-
 	return nil
 }
 
diff --git a/receiver/pyroscopereceiver/testdata/cortex-dev-01__kafka-0__cpu__0.jfr.gz b/receiver/pyroscopereceiver/testdata/cortex-dev-01__kafka-0__cpu__0.jfr.gz
new file mode 100644
index 0000000..9052290
Binary files /dev/null and b/receiver/pyroscopereceiver/testdata/cortex-dev-01__kafka-0__cpu__0.jfr.gz differ
diff --git a/receiver/pyroscopereceiver/testdata/memory_alloc_live_example.jfr.gz b/receiver/pyroscopereceiver/testdata/memory_alloc_live_example.jfr.gz
new file mode 100644
index 0000000..1615170
Binary files /dev/null and b/receiver/pyroscopereceiver/testdata/memory_alloc_live_example.jfr.gz differ
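
Note on the `pprof` extension keys added in `config/example-config.yaml`: `mutex_profile_fraction` and `block_profile_fraction` control how densely the collector samples mutex contention and goroutine blocking. A minimal standalone sketch of the underlying Go runtime knobs is below; the assumption that the extension passes these values straight through to the runtime, plus the `localhost:6060` port and the whole harness program, are mine and not taken from the diff.

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof/* handlers on the default mux
	"runtime"
)

func main() {
	// Roughly what mutex_profile_fraction: 100 implies (assumption):
	// sample about 1 out of every 100 mutex contention events.
	runtime.SetMutexProfileFraction(100)

	// Roughly what block_profile_fraction: 100 implies (assumption):
	// sample an average of one blocking event per 100 ns spent blocked.
	runtime.SetBlockProfileRate(100)

	// Profiles are then available at /debug/pprof/mutex and /debug/pprof/block.
	log.Fatal(http.ListenAndServe("localhost:6060", nil))
}
```

Both knobs get sparser as the value grows, but they scale differently: the mutex setting is an event fraction (1 in N events), while the block setting is a nanosecond rate (one sample per N ns blocked).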
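The `receiver.go` hunk also tightens the pooled-buffer helpers. For context, here is a self-contained sketch of the same acquire/release pattern: `acquireBuf` matches the diff, but the body of `releaseBuf` is not shown in the hunk, so its Reset-then-Put implementation and the `main` harness are assumptions.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// acquireBuf mirrors the simplified helper from receiver.go: hand out a new
// buffer when the pool is empty, otherwise reuse the pooled one.
func acquireBuf(p *sync.Pool) *bytes.Buffer {
	v := p.Get()
	if v == nil {
		return new(bytes.Buffer)
	}
	return v.(*bytes.Buffer)
}

// releaseBuf (body assumed) resets the buffer before returning it to the
// pool so the next caller starts from an empty buffer.
func releaseBuf(p *sync.Pool, buf *bytes.Buffer) {
	buf.Reset()
	p.Put(buf)
}

func main() {
	pool := &sync.Pool{}
	buf := acquireBuf(pool)
	buf.WriteString("profile payload")
	fmt.Println(buf.Len()) // 15
	releaseBuf(pool, buf)
}
```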