Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre release #105

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 78 additions & 49 deletions exporter/clickhouseprofileexporter/ch/access_native_columnar.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"strconv"
"sync"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
Expand Down Expand Up @@ -93,6 +94,7 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error)
payload_type := make([]string, 0)
payload := make([][]byte, 0)
tree := make([][]tuple, 0)
var pooledTrees []*PooledTree
functions := make([][]tuple, 0)

rl := ls.ResourceLogs()
Expand Down Expand Up @@ -179,7 +181,8 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error)
return 0, err
}

tree = append(tree, _tree)
pooledTrees = append(pooledTrees, _tree)
tree = append(tree, _tree.data)

idx = offset + s
ch.logger.Debug(
Expand Down Expand Up @@ -239,13 +242,8 @@ func (ch *clickhouseAccessNativeColumnar) InsertBatch(ls plog.Logs) (int, error)
return 0, err
}
err = b.Send()
for _, tpls := range tree {
for _, t := range tpls {
for _, v := range t[3].([]tuple) {
triples.put(v)
}
quadruples.put(t)
}
for _, tpls := range pooledTrees {
trees.put(tpls)
}
return offset, err
}
Expand Down Expand Up @@ -302,58 +300,90 @@ func readFunctionsFromMap(m pcommon.Map) ([]tuple, error) {
}

// LimitedPool hands out PooledTree buffers, keeping one sync.Pool per
// triples-per-node count (0..19) so trees with differently sized nodes never
// share buffers. Trees with 20 or more triples per node are never pooled.
type LimitedPool struct {
	m          sync.RWMutex      // guards lazy creation of the per-size pools
	pool       [20]*sync.Pool    // one pool per triples-per-node count; nil until first used
	createPool func() *sync.Pool // factory for a fresh per-size pool
}

func (l *LimitedPool) get() tuple {
// PooledTree is a reusable buffer for a decoded profile tree, handed out and
// reclaimed by LimitedPool so repeated batch inserts avoid re-allocating tuples.
type PooledTree struct {
// creation timestamp; put() refuses to re-pool trees older than one minute
time time.Time
// triples-per-node count the buffers are currently laid out for
triplesCount int
// one 4-tuple per tree node: {parentId, fnId, nodeId, []tuple of values}
data []tuple
// flat backing storage; data[i][3] points at fixed-size windows of this slice
triples []tuple
}

func (l *LimitedPool) get(quadruples int, triples int) *PooledTree {
l.m.Lock()
defer l.m.Unlock()
l.size--
if l.size < 0 {
l.size = 0
var pool *sync.Pool
if triples >= 20 {
pool = l.createPool()
} else if l.pool[triples] == nil {
l.pool[triples] = l.createPool()
pool = l.pool[triples]
} else {
pool = l.pool[triples]
}
tree := pool.Get().(*PooledTree)
var redo bool
if cap(tree.triples) < quadruples*triples {
tree.triples = make([]tuple, quadruples*triples)
for i := range tree.triples {
tree.triples[i] = tuple{nil, nil, nil}
}
redo = true
}
tree.triples = tree.triples[:quadruples*triples]
if cap(tree.data) < quadruples {
tree.data = make([]tuple, quadruples)
redo = true
}
return l.pool.Get().(tuple)
tree.data = tree.data[:quadruples]
if redo || tree.triplesCount != triples {
j := 0
for i := range tree.data {
_triples := tree.triples[j : j+triples]
j += triples
tree.data[i] = tuple{nil, nil, nil, _triples}
}
}
tree.triplesCount = triples
return tree
}

func (l *LimitedPool) put(t tuple) {
func (l *LimitedPool) put(t *PooledTree) {
l.m.Lock()
defer l.m.Unlock()
if l.size >= 100000 {
if t.triplesCount >= 20 {
return
}
l.size++
l.pool.Put(t)
}

var triples = LimitedPool{
pool: &sync.Pool{
New: func() interface{} {
return make(tuple, 3)
},
},
pool := l.pool[t.triplesCount]
if time.Now().Sub(t.time) < time.Minute {
pool.Put(t)
}
}

var quadruples = LimitedPool{
pool: &sync.Pool{
New: func() interface{} {
return make(tuple, 4)
},
// trees is the process-wide pool of PooledTree buffers, bucketed by
// triples-per-node count inside LimitedPool. createPool builds a fresh
// sync.Pool whose New stamps the creation time that put() later uses for
// age-based eviction.
var trees = LimitedPool{
createPool: func() *sync.Pool {
return &sync.Pool{
New: func() interface{} {
return &PooledTree{time: time.Now()}
},
}
},
}

func readTreeFromMap(m pcommon.Map) ([]tuple, error) {
func readTreeFromMap(m pcommon.Map) (*PooledTree, error) {
raw, _ := m.Get("tree")
bRaw := bytes.NewReader(raw.Bytes().AsRaw())
size, err := binary.ReadVarint(bRaw)
treeSize, err := binary.ReadVarint(bRaw)
if err != nil {
return nil, err
}

res := make([]tuple, size)
var res *PooledTree

for i := int64(0); i < size; i++ {
for i := int64(0); i < treeSize; i++ {
parentId, err := binary.ReadUvarint(bRaw)
if err != nil {
return nil, err
Expand All @@ -374,8 +404,11 @@ func readTreeFromMap(m pcommon.Map) ([]tuple, error) {
return nil, err
}

values := make([]tuple, size)
for i := range values {
if res == nil {
res = trees.get(int(treeSize), int(size))
}

for j := int64(0); j < size; j++ {
size, err := binary.ReadVarint(bRaw)
if err != nil {
return nil, err
Expand All @@ -395,17 +428,13 @@ func readTreeFromMap(m pcommon.Map) ([]tuple, error) {
if err != nil {
return nil, err
}

values[i] = triples.get() // tuple{name, self, total}
values[i][0] = name
values[i][1] = self
values[i][2] = total
res.data[i][3].([]tuple)[j][0] = name
res.data[i][3].([]tuple)[j][1] = self
res.data[i][3].([]tuple)[j][2] = total
}
res[i] = quadruples.get() // tuple{parentId, fnId, nodeId, values}
res[i][0] = parentId
res[i][1] = fnId
res[i][2] = nodeId
res[i][3] = values
res.data[i][0] = parentId
res.data[i][1] = fnId
res.data[i][2] = nodeId
}
return res, nil
}
12 changes: 12 additions & 0 deletions receiver/pyroscopereceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ Implements the Pyroscope ingest protocol and conveys the accepted profiles as Op

- `protocols`: sets the application layer protocols that the receiver will serve. See [Supported Protocols](#supported-protocols). Default is http/s on 0.0.0.0:8062 with max request body size of: 5e6 + 1e6.
- `timeout`: sets the server response timeout. Default is 10 seconds.
- `metrics`: configures the metrics collection for the Pyroscope receiver.
- `enable`: enables or disables metrics collection. Default is true.
- `exclude_labels`: a list of label names to exclude from the metrics. Available labels are:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akvlad I would like to keep service on the counters, and remove it from histograms. Maybe we can add an option to exclude the labels from specific metrics? Like an optional metric list for each label under exclude_labels

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I'll think about that.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

- `service`: name of the service that issued the pprof request
- `type`: type of pprof request (jfr or pprof)
- `encoding`: not used, empty
- `error_code`: http error code response for http request count
- `status_code`: http response status code for http request count
- `exclude_metrics`: a list of metric names to exclude from collection. Available metrics are:
- `http_request_total`: Pyroscope receiver http request count.
- `request_body_uncompressed_size_bytes`: Pyroscope receiver uncompressed request body size in bytes.
- `parsed_body_uncompressed_size_bytes`: Pyroscope receiver uncompressed parsed body size in bytes.

## Example

Expand Down
8 changes: 8 additions & 0 deletions receiver/pyroscopereceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@ type Protocols struct {
HTTP *confighttp.ServerConfig `mapstructure:"http"`
}

// MetricsConfig configures the receiver's self-observability metrics.
// NOTE(review): the `default:"true"` tag is not honored by mapstructure on
// its own — confirm a defaults library processes it, otherwise Enable
// zero-values to false when the `metrics` section is omitted from config.
type MetricsConfig struct {
// Enable turns metrics collection on or off (documented default: true)
Enable bool `mapstructure:"enable" default:"true"`
// ExcludeLabels lists label names to omit from emitted metrics
ExcludeLabels []string `mapstructure:"exclude_labels"`
// ExcludeMetrics lists metric names to skip registering entirely
ExcludeMetrics []string `mapstructure:"exclude_metrics"`
}

// Represents the receiver config within the collector's config.yaml
type Config struct {
// Configures the application-layer protocols the receiver serves
Protocols Protocols `mapstructure:"protocols"`

// Configures timeout for synchronous request handling by the receiver server
Timeout time.Duration `mapstructure:"timeout"`

// Configures the receiver's self-metrics collection (see MetricsConfig)
Metrics MetricsConfig `mapstructure:"metrics"`
}

var _ component.Config = (*Config)(nil)
Expand Down
15 changes: 11 additions & 4 deletions receiver/pyroscopereceiver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pyroscopereceiver

import (
"fmt"
"slices"

"go.opentelemetry.io/otel/metric"
)
Expand All @@ -14,22 +15,28 @@ var (
otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes metric.Int64Histogram
)

func initMetrics(meter metric.Meter) error {
func initMetrics(meter metric.Meter, conf *Config) error {
var err error
if otelcolReceiverPyroscopeHttpRequestTotal, err = meter.Int64Counter(
if !conf.Metrics.Enable || slices.Contains(conf.Metrics.ExcludeMetrics, "http_request_total") {
otelcolReceiverPyroscopeHttpRequestTotal = nil
} else if otelcolReceiverPyroscopeHttpRequestTotal, err = meter.Int64Counter(
fmt.Sprint(prefix, "http_request_total"),
metric.WithDescription("Pyroscope receiver http request count"),
); err != nil {
return err
}
if otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes, err = meter.Int64Histogram(
if !conf.Metrics.Enable || slices.Contains(conf.Metrics.ExcludeMetrics, "request_body_uncompressed_size_bytes") {
otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes = nil
} else if otelcolReceiverPyroscopeRequestBodyUncompressedSizeBytes, err = meter.Int64Histogram(
fmt.Sprint(prefix, "request_body_uncompressed_size_bytes"),
metric.WithDescription("Pyroscope receiver uncompressed request body size in bytes"),
metric.WithExplicitBucketBoundaries(0, 1024, 4096, 16384, 32768, 65536, 131072, 262144, 524288, 1048576),
); err != nil {
return err
}
if otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes, err = meter.Int64Histogram(
if !conf.Metrics.Enable || slices.Contains(conf.Metrics.ExcludeMetrics, "parsed_body_uncompressed_size_bytes") {
otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes = nil
} else if otelcolReceiverPyroscopeParsedBodyUncompressedSizeBytes, err = meter.Int64Histogram(
fmt.Sprint(prefix, "parsed_body_uncompressed_size_bytes"),
metric.WithDescription("Pyroscope receiver uncompressed parsed body size in bytes"),
metric.WithExplicitBucketBoundaries(0, 1024, 4096, 16384, 32768, 65536, 131072, 262144, 524288, 1048576),
Expand Down
Loading
Loading