diff --git a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go index 2b83c58..53912ea 100644 --- a/exporter/clickhouseprofileexporter/ch/access_native_columnar.go +++ b/exporter/clickhouseprofileexporter/ch/access_native_columnar.go @@ -7,6 +7,7 @@ import ( "fmt" "strconv" "sync" + "time" "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" @@ -93,6 +94,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error) payload_type := make([]string, 0) payload := make([][]byte, 0) tree := make([][]tuple, 0) + var pooledTrees []*PooledTree functions := make([][]tuple, 0) rl := ls.ResourceLogs() @@ -179,7 +181,8 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error) return 0, err } - tree = append(tree, _tree) + pooledTrees = append(pooledTrees, _tree) + tree = append(tree, _tree.data) idx = offset + s ch.logger.Debug( @@ -239,13 +242,8 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error) return 0, err } err = b.Send() - for _, tpls := range tree { - for _, t := range tpls { - for _, v := range t[3].([]tuple) { - triples.put(v) - } - quadruples.put(t) - } + for _, tpls := range pooledTrees { + trees.put(tpls) } return offset, err } @@ -302,58 +300,90 @@ func readFunctionsFromMap(m pcommon.Map) ([]tuple, error) { } type LimitedPool struct { - m sync.RWMutex - pool *sync.Pool - size int + m sync.RWMutex + pool [20]*sync.Pool + createPool func() *sync.Pool } -func (l *LimitedPool) get() tuple { +type PooledTree struct { + time time.Time + triplesCount int + data []tuple + triples []tuple +} + +func (l *LimitedPool) get(quadruples int, triples int) *PooledTree { l.m.Lock() defer l.m.Unlock() - l.size-- - if l.size < 0 { - l.size = 0 + var pool *sync.Pool + if triples >= 20 { + pool = l.createPool() + } else if l.pool[triples] == nil { + l.pool[triples] = l.createPool() + pool = l.pool[triples] + } else { + pool = l.pool[triples] + } + tree := pool.Get().(*PooledTree) + var redo bool + if cap(tree.triples) < quadruples*triples { + tree.triples = make([]tuple, quadruples*triples) + for i := range tree.triples { + tree.triples[i] = tuple{nil, nil, nil} + } + redo = true + } + tree.triples = tree.triples[:quadruples*triples] + if cap(tree.data) < quadruples { + tree.data = make([]tuple, quadruples) + redo = true } - return l.pool.Get().(tuple) + tree.data = tree.data[:quadruples] + if redo || tree.triplesCount != triples { + j := 0 + for i := range tree.data { + _triples := tree.triples[j : j+triples] + j += triples + tree.data[i] = tuple{nil, nil, nil, _triples} + } + } + tree.triplesCount = triples + return tree } -func (l *LimitedPool) put(t tuple) { +func (l *LimitedPool) put(t *PooledTree) { l.m.Lock() defer l.m.Unlock() - if l.size >= 100000 { + if t.triplesCount >= 20 { return } - l.size++ - l.pool.Put(t) -} - -var triples = LimitedPool{ - pool: &sync.Pool{ - New: func() interface{} { - return make(tuple, 3) - }, - }, + pool := l.pool[t.triplesCount] + if time.Now().Sub(t.time) < time.Minute { + pool.Put(t) + } } -var quadruples = LimitedPool{ - pool: &sync.Pool{ - New: func() interface{} { - return make(tuple, 4) - }, +var trees = LimitedPool{ + createPool: func() *sync.Pool { + return &sync.Pool{ + New: func() interface{} { + return &PooledTree{time: time.Now()} + }, + } }, } -func readTreeFromMap(m pcommon.Map) ([]tuple, error) { +func readTreeFromMap(m pcommon.Map) (*PooledTree, error) { raw, _ := m.Get("tree") bRaw := bytes.NewReader(raw.Bytes().AsRaw()) - size, err := binary.ReadVarint(bRaw) + treeSize, err := binary.ReadVarint(bRaw) if err != nil { return nil, err } - res := make([]tuple, size) + var res *PooledTree - for i := int64(0); i < size; i++ { + for i := int64(0); i < treeSize; i++ { parentId, err := binary.ReadUvarint(bRaw) if err != nil { return nil, err @@ -374,8 +404,11 @@ func readTreeFromMap(m pcommon.Map) ([]tuple, error) { return nil, err } - values := make([]tuple, size) - for i := range values { + if res == nil { + res = trees.get(int(treeSize), int(size)) + } + + for j := int64(0); j < size; j++ { size, err := binary.ReadVarint(bRaw) if err != nil { return nil, err @@ -395,17 +428,13 @@ func readTreeFromMap(m pcommon.Map) ([]tuple, error) { if err != nil { return nil, err } - - values[i] = triples.get() // tuple{name, self, total} - values[i][0] = name - values[i][1] = self - values[i][2] = total + res.data[i][3].([]tuple)[j][0] = name + res.data[i][3].([]tuple)[j][1] = self + res.data[i][3].([]tuple)[j][2] = total } - res[i] = quadruples.get() // tuple{parentId, fnId, nodeId, values} - res[i][0] = parentId - res[i][1] = fnId - res[i][2] = nodeId - res[i][3] = values + res.data[i][0] = parentId + res.data[i][1] = fnId + res.data[i][2] = nodeId } return res, nil } diff --git a/exporter/qrynexporter/README.md b/exporter/qrynexporter/README.md index 7daa99c..f22c2c9 100644 --- a/exporter/qrynexporter/README.md +++ b/exporter/qrynexporter/README.md @@ -8,9 +8,18 @@ # Configuration options: +- `dsn` (required): Data Source Name for Clickhouse. + - Example: `tcp://localhost:9000/qryn` + +- `clustered_clickhouse` (required): + - Type: boolean + - Description: Set to `true` if using a Clickhouse cluster; otherwise, set to `false`. + +- `client_side_trace_processing` (required): + - Type: boolean + - Default: `true` + - Description: Enables client-side processing of trace data. This can improve performance but may increase client-side resource usage. -- `dsn` (required): Clickhouse's dsn. -- `clustered_clickhouse` (required): true if clickhouse cluster is used # Example: ## Simple Trace Data diff --git a/exporter/qrynexporter/config.go b/exporter/qrynexporter/config.go index c71651b..d0e2477 100644 --- a/exporter/qrynexporter/config.go +++ b/exporter/qrynexporter/config.go @@ -30,6 +30,9 @@ type Config struct { configretry.BackOffConfig `mapstructure:"retry_on_failure"` exporterhelper.QueueSettings `mapstructure:"sending_queue"` + // ClientSideTraceProcessing is a boolean that indicates whether to process traces on the client side. + ClientSideTraceProcessing bool `mapstructure:"client_side_trace_processing"` + ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"` // DSN is the ClickHouse server Data Source Name. diff --git a/exporter/qrynexporter/logs.go b/exporter/qrynexporter/logs.go index cc8eafc..7791965 100644 --- a/exporter/qrynexporter/logs.go +++ b/exporter/qrynexporter/logs.go @@ -441,7 +441,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error { } } - if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil { + if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeLogs))) e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err diff --git a/exporter/qrynexporter/metrics.go b/exporter/qrynexporter/metrics.go index c2f0fd4..d1c4618 100644 --- a/exporter/qrynexporter/metrics.go +++ b/exporter/qrynexporter/metrics.go @@ -491,7 +491,7 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric } } - if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil { + if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeMetrics))) e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error())) return err diff --git a/exporter/qrynexporter/schema.go b/exporter/qrynexporter/schema.go index fed1546..c84c745 100644 --- a/exporter/qrynexporter/schema.go +++ b/exporter/qrynexporter/schema.go @@ -20,7 +20,7 @@ import ( ) var ( - tracesInputSQL = func(clustered bool) string { + tracesInputSQL = func(_ bool) string { return `INSERT INTO traces_input ( trace_id, span_id, @@ -57,6 +57,40 @@ var ( } ) +func TracesV2InputSQL(clustered bool) string { + dist := "" + if clustered { + dist = "_dist" + } + return fmt.Sprintf(`INSERT INTO tempo_traces%s ( + oid, + trace_id, + span_id, + parent_id, + name, + timestamp_ns, + duration_ns, + service_name, + payload_type, + payload)`, dist) +} + +func TracesTagsV2InputSQL(clustered bool) string { + dist := "" + if clustered { + dist = "_dist" + } + return fmt.Sprintf(`INSERT INTO tempo_traces_attrs_gin%s ( + oid, + date, + key, + val, + trace_id, + span_id, + timestamp_ns, + duration)`, dist) +} + // Note: https://github.com/metrico/qryn/blob/master/lib/db/maintain/scripts.js // We need to align with the schema here. // @@ -76,8 +110,8 @@ var ( // // ) Engine=Null -// Trace represent trace model -type Trace struct { +// TraceInput represent trace model +type TraceInput struct { TraceID string `ch:"trace_id"` SpanID string `ch:"span_id"` ParentID string `ch:"parent_id"` @@ -90,6 +124,30 @@ type Trace struct { Tags [][]string `ch:"tags"` } +type TempoTrace struct { + OID string `ch:"oid"` + TraceID []byte `ch:"trace_id"` + SpanID []byte `ch:"span_id"` + ParentID []byte `ch:"parent_id"` + Name string `ch:"name"` + TimestampNs int64 `ch:"timestamp_ns"` + DurationNs int64 `ch:"duration_ns"` + ServiceName string `ch:"service_name"` + PayloadType int8 `ch:"payload_type"` + Payload string `ch:"payload"` +} + +type TempoTraceTag struct { + OID string `ch:"oid"` + Date time.Time `ch:"date"` + Key string `ch:"key"` + Val string `ch:"val"` + TraceID []byte `ch:"trace_id"` + SpanID []byte `ch:"span_id"` + TimestampNs int64 `ch:"timestamp_ns"` + DurationNs int64 `ch:"duration"` +} + // Sample represent sample data // `CREATE TABLE IF NOT EXISTS samples_v3 // ( diff --git a/exporter/qrynexporter/trace_batch_processor.go b/exporter/qrynexporter/trace_batch_processor.go new file mode 100644 index 0000000..6fcde08 --- /dev/null +++ b/exporter/qrynexporter/trace_batch_processor.go @@ -0,0 +1,103 @@ +package qrynexporter + +import ( + "encoding/hex" + "fmt" + "time" + + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" +) + +type traceWithTagsBatch struct { + driver.Batch + tagsBatch driver.Batch +} + +func (b *traceWithTagsBatch) AppendStruct(v any) error { + ti, ok := v.(*TraceInput) + if !ok { + return fmt.Errorf("invalid data type, expected *Trace, got %T", v) + } + binTraceId, err := unhexAndPad(ti.TraceID, 16) + if err != nil { + return err + } + binParentID, err := unhexAndPad(ti.ParentID, 8) + if err != nil { + return err + } + binSpanID, err := unhexAndPad(ti.SpanID, 8) + if err != nil { + return err + } + trace := &TempoTrace{ + OID: "0", + TraceID: binTraceId, + SpanID: binSpanID, + ParentID: binParentID, + Name: ti.Name, + TimestampNs: ti.TimestampNs, + DurationNs: ti.DurationNs, + ServiceName: ti.ServiceName, + PayloadType: ti.PayloadType, + Payload: ti.Payload, + } + err = b.Batch.AppendStruct(trace) + if err != nil { + return err + } + for _, tag := range ti.Tags { + attr := &TempoTraceTag{ + OID: "0", + Date: time.Unix(0, trace.TimestampNs).Truncate(time.Hour * 24), + Key: tag[0], + Val: tag[1], + TraceID: binTraceId, + SpanID: binSpanID, + TimestampNs: ti.TimestampNs, + DurationNs: ti.DurationNs, + } + err = b.tagsBatch.AppendStruct(attr) + if err != nil { + return err + } + } + return nil +} + +func (b *traceWithTagsBatch) Abort() error { + var errs [2]error + errs[0] = b.Batch.Abort() + errs[1] = b.tagsBatch.Abort() + for _, err := range errs { + if err != nil { + return err + } + } + return nil +} + +func (b *traceWithTagsBatch) Send() error { + var errs [2]error + errs[0] = b.Batch.Send() + errs[1] = b.tagsBatch.Send() + for _, err := range errs { + if err != nil { + return err + } + } + return nil +} + +func unhexAndPad(s string, size int) ([]byte, error) { + bStr, err := hex.DecodeString(s) + if err != nil { + return nil, err + } + if len(bStr) < size { + res := make([]byte, size) + copy(res[size-len(bStr):], bStr) + return res, nil + } + return bStr[size-len(bStr):], nil +} diff --git a/exporter/qrynexporter/traces.go b/exporter/qrynexporter/traces.go index 86dec6d..dfe6d7e 100644 --- a/exporter/qrynexporter/traces.go +++ b/exporter/qrynexporter/traces.go @@ -34,8 +34,11 @@ import ( "google.golang.org/protobuf/encoding/protojson" ) +type contextKey string + const ( - spanLinkDataFormat = "%s|%s|%s|%s|%d" + spanLinkDataFormat = "%s|%s|%s|%s|%d" + clusterKey contextKey = "cluster" ) var delegate = &protojson.MarshalOptions{ @@ -48,8 +51,9 @@ type tracesExporter struct { logger *zap.Logger meter metric.Meter - db clickhouse.Conn - cluster bool + db clickhouse.Conn + cluster bool + clientSide bool } // newTracesExporter returns a SpanWriter for the database @@ -63,10 +67,11 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings) return nil, err } exp := &tracesExporter{ - logger: logger, - meter: set.MeterProvider.Meter(typeStr), - db: db, - cluster: cfg.ClusteredClickhouse, + logger: logger, + meter: set.MeterProvider.Meter(typeStr), + db: db, + cluster: cfg.ClusteredClickhouse, + clientSide: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing, } if err := initMetrics(exp.meter); err != nil { exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) @@ -163,8 +168,15 @@ func extractScopeTags(il pcommon.InstrumentationScope, tags map[string]string) { } func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans ptrace.ResourceSpansSlice) error { - isCluster := ctx.Value("cluster").(bool) - batch, err := e.db.PrepareBatch(ctx, tracesInputSQL(isCluster)) + isCluster := ctx.Value(clusterKey).(bool) + var batch driver.Batch + var err error + if e.clientSide { + batch, err = e.prepareBatchClientSide(ctx) + } else { + batch, err = e.db.PrepareBatch(ctx, tracesInputSQL(isCluster)) + } + if err != nil { return err } @@ -187,9 +199,25 @@ func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans return nil } +func (e *tracesExporter) prepareBatchClientSide(ctx context.Context) (driver.Batch, error) { + batch, err := e.db.PrepareBatch(ctx, TracesV2InputSQL(e.cluster)) + if err != nil { + return nil, err + } + subBatch, err := e.db.PrepareBatch(ctx, TracesTagsV2InputSQL(e.cluster)) + if err != nil { + batch.Abort() + return nil, err + } + return &traceWithTagsBatch{ + Batch: batch, + tagsBatch: subBatch, + }, nil +} + // traceDataPusher implements OTEL exporterhelper.traceDataPusher func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error { - _ctx := context.WithValue(ctx, "cluster", e.cluster) + _ctx := context.WithValue(ctx, clusterKey, e.cluster) start := time.Now() if err := e.exportResourceSapns(_ctx, td.ResourceSpans()); err != nil { otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeTraces))) @@ -362,7 +390,7 @@ func marshalSpanToJSON(span ptrace.Span, mergedAttributes pcommon.Map) ([]byte, return delegate.Marshal(otlpSpan) } -func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*Trace, error) { +func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*TraceInput, error) { durationNano := uint64(span.EndTimestamp() - span.StartTimestamp()) tags = aggregateSpanTags(span, tags) tags["service.name"] = serviceName @@ -379,7 +407,7 @@ func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName return nil, fmt.Errorf("failed to marshal span: %w", err) } - trace := &Trace{ + trace := &TraceInput{ TraceID: span.TraceID().String(), SpanID: span.SpanID().String(), ParentID: span.ParentSpanID().String(), diff --git a/receiver/pyroscopereceiver/README.md b/receiver/pyroscopereceiver/README.md index e0aa639..2ab4ffc 100644 --- a/receiver/pyroscopereceiver/README.md +++ b/receiver/pyroscopereceiver/README.md @@ -11,12 +11,32 @@ Implements the Pyroscope ingest protocol and conveys the accepted profiles as Op - `protocols`: sets the application layer protocols that the receiver will serve. See [Supported Protocols](#supported-protocols). Default is http/s on 0.0.0.0:8062 with max request body size of: 5e6 + 1e6. - `timeout`: sets the server reponse timeout. Default is 10 seconds. +- `metrics`: configures the metrics collection for the Pyroscope receiver. + - `enable`: enables or disables metrics collection. Default is true. + - `exclude_labels`: a list of metrics and label names to exclude from the metrics. + Available metrics are listed further. Metric name may be empty. Available labels are: + - `service`: name of the service provided the pprof request + - `type`: type of pprof request (jfr or pprof) + - `encoding`: not used, empty + - `error_code`: http error code response for http request count + - `status_code`: http response status code for http request count + - `exclude_metrics`: a list of metric names to exclude from collection. Available metrics are: + - `http_request_total`: Pyroscope receiver http request count. + - `request_body_uncompressed_size_bytes`: Pyroscope receiver uncompressed request body size in bytes. + - `parsed_body_uncompressed_size_bytes`: Pyroscope receiver uncompressed parsed body size in bytes. ## Example ```yaml receivers: pyroscopereceiver: + metrics: + enable: true + exclude_labels: + - metric: request_body_uncompressed_size_bytes + label: service + exclude_metrics: + - http_requests_total protocols: http: endpoint: 0.0.0.0:8062 diff --git a/receiver/pyroscopereceiver/config.go b/receiver/pyroscopereceiver/config.go index 988a755..86fecba 100644 --- a/receiver/pyroscopereceiver/config.go +++ b/receiver/pyroscopereceiver/config.go @@ -14,12 +14,25 @@ type Protocols struct { HTTP *confighttp.ServerConfig `mapstructure:"http"` } +type ExcludeLabel struct { + Metric string `mapstructure:"metric"` + Label string `mapstructure:"label"` +} + +type MetricsConfig struct { + Enable bool `mapstructure:"enable" default:"true"` + ExcludeLabels []ExcludeLabel `mapstructure:"exclude_labels"` + ExcludeMetrics []string `mapstructure:"exclude_metrics"` +} + // Represents the receiver config within the collector's config.yaml type Config struct { Protocols Protocols `mapstructure:"protocols"` // Cofigures timeout for synchronous request handling by the receiver server Timeout time.Duration `mapstructure:"timeout"` + + Metrics MetricsConfig `mapstructure:"metrics"` } var _ component.Config = (*Config)(nil) diff --git a/receiver/pyroscopereceiver/metrics.go b/receiver/pyroscopereceiver/metrics.go index c7dbc6f..2737499 100644 --- a/receiver/pyroscopereceiver/metrics.go +++ b/receiver/pyroscopereceiver/metrics.go @@ -2,6 +2,7 @@ package pyroscopereceiver import ( "fmt" + "slices" "go.opentelemetry.io/otel/metric" ) @@ -14,22 +15,28 @@ var ( otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes metric.Int64Histogram ) -func initMetrics(meter metric.Meter) error { +func initMetrics(meter metric.Meter, conf *Config) error { var err error - if otelcolReceiverPyroscopeHttpRequestTotal, err = meter.Int64Counter( + if !conf.Metrics.Enable || slices.Contains(conf.Metrics.ExcludeMetrics, "http_request_total") { + otelcolReceiverPyroscopeHttpRequestTotal = nil + } else if otelcolReceiverPyroscopeHttpRequestTotal, err = meter.Int64Counter( fmt.Sprint(prefix, "http_request_total"), metric.WithDescription("Pyroscope receiver http request count"), ); err != nil { return err } - if otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes, err = meter.Int64Histogram( + if !conf.Metrics.Enable || slices.Contains(conf.Metrics.ExcludeMetrics, "request_body_uncompressed_size_bytes") { + otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes = nil + } else if otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes, err = meter.Int64Histogram( fmt.Sprint(prefix, "request_body_uncompressed_size_bytes"), metric.WithDescription("Pyroscope receiver uncompressed request body size in bytes"), metric.WithExplicitBucketBoundaries(0, 1024, 4096, 16384, 32768, 65536, 131072, 262144, 524288, 1048576), ); err != nil { return err } - if otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes, err = meter.Int64Histogram( + if !conf.Metrics.Enable || slices.Contains(conf.Metrics.ExcludeMetrics, "parsed_body_uncompressed_size_bytes") { + otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes = nil + } else if otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes, err = meter.Int64Histogram( fmt.Sprint(prefix, "parsed_body_uncompressed_size_bytes"), metric.WithDescription("Pyroscope receiver uncompressed parsed body size in bytes"), metric.WithExplicitBucketBoundaries(0, 1024, 4096, 16384, 32768, 65536, 131072, 262144, 524288, 1048576), diff --git a/receiver/pyroscopereceiver/receiver.go b/receiver/pyroscopereceiver/receiver.go index ce08cfc..d081d20 100644 --- a/receiver/pyroscopereceiver/receiver.go +++ b/receiver/pyroscopereceiver/receiver.go @@ -12,6 +12,7 @@ import ( "net" "net/http" "net/url" + "slices" "strconv" "strings" "sync" @@ -117,7 +118,7 @@ func newPyroscopeReceiver(cfg *Config, consumer consumer.Logs, set *receiver.Set r.mux.HandleFunc(ingestPath, func(resp http.ResponseWriter, req *http.Request) { r.httpHandlerIngest(resp, req) }) - if err := initMetrics(r.meter); err != nil { + if err := initMetrics(r.meter, cfg); err != nil { r.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error())) return r, err } @@ -176,15 +177,25 @@ func (r *pyroscopeReceiver) handle(ctx context.Context, resp http.ResponseWriter r.handleError(ctx, resp, "text/plain", http.StatusInternalServerError, err.Error(), pm.name, errorCodeError) return } - - otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(pm.name, errorCodeSuccess, http.StatusNoContent))) + if otelcolReceiverPyroscopeHttpRequestTotal != nil { + otelcolReceiverPyroscopeHttpRequestTotal.Add( + ctx, 1, + metric.WithAttributeSet(*r.newOtelcolAttrSetHttp( + "http_request_total", pm.name, errorCodeSuccess, http.StatusNoContent)), + ) + } writeResponseNoContent(resp) }() return c } func (recv *pyroscopeReceiver) handleError(ctx context.Context, resp http.ResponseWriter, contentType string, statusCode int, msg string, service string, errorCode string) { - otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, metric.WithAttributeSet(*newOtelcolAttrSetHttp(service, errorCode, statusCode))) + if otelcolReceiverPyroscopeHttpRequestTotal != nil { + otelcolReceiverPyroscopeHttpRequestTotal.Add(ctx, 1, + metric.WithAttributeSet(*recv.newOtelcolAttrSetHttp( + "http_request_total", service, errorCode, statusCode))) + } + recv.logger.Error(msg) writeResponse(resp, "text/plain", statusCode, []byte(msg)) } @@ -240,12 +251,22 @@ func readParams(qs *url.Values) (params, error) { return p, nil } -func newOtelcolAttrSetHttp(service string, errorCode string, statusCode int) *attribute.Set { - s := attribute.NewSet( - attribute.KeyValue{Key: keyService, Value: attribute.StringValue(service)}, - attribute.KeyValue{Key: "error_code", Value: attribute.StringValue(errorCode)}, - attribute.KeyValue{Key: "status_code", Value: attribute.IntValue(statusCode)}, - ) +func (r *pyroscopeReceiver) newOtelcolAttrSetHttp(metric string, service string, errorCode string, + statusCode int) *attribute.Set { + keyValues := []attribute.KeyValue{ + {Key: keyService, Value: attribute.StringValue(service)}, + {Key: "error_code", Value: attribute.StringValue(errorCode)}, + {Key: "status_code", Value: attribute.IntValue(statusCode)}, + } + for i := len(keyValues) - 1; i >= 0; i-- { + if slices.ContainsFunc(r.cfg.Metrics.ExcludeLabels, func(label ExcludeLabel) bool { + return (label.Metric == metric || label.Metric == "") && label.Label == string(keyValues[i].Key) + }) { + keyValues[i] = keyValues[len(keyValues)-1] + keyValues = keyValues[:len(keyValues)-1] + } + } + s := attribute.NewSet(keyValues...) return &s } @@ -334,7 +355,9 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, r.logger.Debug("received profiles", zap.String("url_query", req.URL.RawQuery)) qs := req.URL.Query() + format := formatPprof if tmp, ok = qs["format"]; ok && (tmp[0] == "jfr") { + format = formatJfr p = jfrparser.NewJfrPprofParser() } else if tmp, ok = qs["spyName"]; ok && (tmp[0] == "nodespy") { p = nodeparser.NewNodePprofParser() @@ -358,7 +381,12 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, return logs, fmt.Errorf("failed to decompress body: %w", err) } // TODO: try measure compressed size - otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes.Record(ctx, int64(buf.Len()), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatJfr, ""))) + if otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes != nil { + otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes.Record(ctx, int64(buf.Len()), + metric.WithAttributeSet(*r.newOtelcolAttrSetPayloadSizeBytes("request_body_uncompressed_size_bytes", + pm.name, format, ""))) + } + resetHeaders(req) md := profile_types.Metadata{SampleRateHertz: 0} @@ -392,7 +420,6 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, } else { timestampNs = pm.start durationNs = pm.end - pm.start - durationNs = ns(durationNs) } record.SetTimestamp(pcommon.Timestamp(timestampNs)) m := record.Attributes() @@ -422,7 +449,12 @@ func (r *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Request, ) } // sz may be 0 and it will be recorded - otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes.Record(ctx, int64(sz), metric.WithAttributeSet(*newOtelcolAttrSetPayloadSizeBytes(pm.name, formatPprof, ""))) + if otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes != nil { + otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes.Record(ctx, int64(sz), + metric.WithAttributeSet(*r.newOtelcolAttrSetPayloadSizeBytes( + "parsed_body_uncompressed_size_bytes", pm.name, formatPprof, ""))) + } + return logs, nil } @@ -430,8 +462,22 @@ func ns(sec uint64) uint64 { return sec * 1e9 } -func newOtelcolAttrSetPayloadSizeBytes(service string, typ string, encoding string) *attribute.Set { - s := attribute.NewSet(attribute.KeyValue{Key: keyService, Value: attribute.StringValue(service)}, attribute.KeyValue{Key: "type", Value: attribute.StringValue(typ)}, attribute.KeyValue{Key: "encoding", Value: attribute.StringValue(encoding)}) +func (r *pyroscopeReceiver) newOtelcolAttrSetPayloadSizeBytes(metric string, service string, typ string, + encoding string) *attribute.Set { + keyValues := []attribute.KeyValue{ + {Key: keyService, Value: attribute.StringValue(service)}, + {Key: "type", Value: attribute.StringValue(typ)}, + {Key: "encoding", Value: attribute.StringValue(encoding)}, + } + for i := len(keyValues) - 1; i >= 0; i-- { + if slices.ContainsFunc(r.cfg.Metrics.ExcludeLabels, func(label ExcludeLabel) bool { + return (label.Metric == metric || label.Metric == "") && label.Label == string(keyValues[i].Key) + }) { + keyValues[i] = keyValues[len(keyValues)-1] + keyValues = keyValues[:len(keyValues)-1] + } + } + s := attribute.NewSet(keyValues...) return &s }