Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/go-pprof: process pprof #57

Merged
merged 6 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
519 changes: 261 additions & 258 deletions go.mod

Large diffs are not rendered by default.

906 changes: 475 additions & 431 deletions go.sum

Large diffs are not rendered by default.

103 changes: 103 additions & 0 deletions receiver/pyroscopereceiver/pprofparser/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package pprofparser

import (
"bytes"
"fmt"

pprof_proto "github.com/google/pprof/profile"
profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types"
)

type sampleType uint8

const (
sampleTypeCpu sampleType = iota
sampleTypeCount
)

type profileWrapper struct {
pprof *pprof_proto.Profile
prof profile_types.ProfileIR
}

type pProfParser struct {
proftab [sampleTypeCount]*profileWrapper // <sample type, (profile, pprof)>
samptab [sampleTypeCount]map[uint32]uint32 // <extern jfr stacktrace id,matching pprof sample array index>
loctab [sampleTypeCount]map[uint32]*pprof_proto.Location // <extern jfr funcid, pprof location>
}

// Creates a pprof parser that parse the accepted jfr buffer
func NewPprofParser() *pProfParser {
return &pProfParser{}
}

func (pa *pProfParser) Parse(data *bytes.Buffer, md profile_types.Metadata) ([]profile_types.ProfileIR, error) {
// Parse pprof data
pProfData, err := pprof_proto.Parse(data)
if err != nil {
return nil, err
}

// Process pprof data and create SampleType slice
var sampleTypes []string
var sampleUnits []string
var valueAggregates []profile_types.SampleType

for i, st := range pProfData.SampleType {
sampleTypes = append(sampleTypes, pProfData.SampleType[i].Type)
sampleUnits = append(sampleUnits, pProfData.SampleType[i].Unit)
sum, count := calculateSumAndCount(pProfData, i)
valueAggregates = append(valueAggregates, profile_types.SampleType{fmt.Sprintf("%s:%s", st.Type, st.Unit), sum, count})
}

var profiles []profile_types.ProfileIR
var profileType string

switch pProfData.PeriodType.Type {
case "cpu":
profileType = "process_cpu"
case "wall":
profileType = "wall"
case "mutex", "contentions":
profileType = "mutex"
case "goroutine":
profileType = "goroutines"
case "objects", "space", "alloc", "inuse":
profileType = "memory"
case "block":
profileType = "block"
}

profileTypeInfo := profile_types.ProfileType{
PeriodType: pProfData.PeriodType.Type,
PeriodUnit: pProfData.PeriodType.Unit,
SampleType: sampleTypes,
SampleUnit: sampleUnits,
Type: profileType,
}

// Create a new ProfileIR instance
profile := profile_types.ProfileIR{
ValueAggregation: valueAggregates,
Type: profileTypeInfo,
}
profile.Payload = new(bytes.Buffer)
pProfData.WriteUncompressed(profile.Payload)
// Append the profile to the result
profiles = append(profiles, profile)
return profiles, nil
}

func calculateSumAndCount(samples *pprof_proto.Profile, sampleTypeIndex int) (int64, int32) {
var sum int64
count := int32(len(samples.Sample))
for _, sample := range samples.Sample {
// Check if the sample has a value for the specified sample type
if sampleTypeIndex < len(sample.Value) {
// Accumulate the value for the specified sample type
sum += sample.Value[sampleTypeIndex]
}
}

return sum, count
}
38 changes: 24 additions & 14 deletions receiver/pyroscopereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"strings"
"sync"

"github.com/metrico/otel-collector/receiver/pyroscopereceiver/compress"
"github.com/metrico/otel-collector/receiver/pyroscopereceiver/jfrparser"
"github.com/metrico/otel-collector/receiver/pyroscopereceiver/pprofparser"

"github.com/metrico/otel-collector/receiver/pyroscopereceiver/compress"
profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types"
"github.com/prometheus/prometheus/model/labels"
"go.opentelemetry.io/collector/component"
Expand All @@ -31,7 +33,8 @@ const (
ingestPath = "/ingest"

formatJfr = "jfr"
formatPprof = "pprof"
formatPprof = "profile"
filePprof = "profile.pprof"

errorCodeError = "1"
errorCodeSuccess = ""
Expand Down Expand Up @@ -241,14 +244,14 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
logs := plog.NewLogs()

qs := req.URL.Query()
if tmp, ok = qs["format"]; ok && tmp[0] == "jfr" {
if tmp, ok = qs["format"]; ok && (tmp[0] == "jfr") {
pa = jfrparser.NewJfrPprofParser()
} else {
return logs, fmt.Errorf("unsupported format, supported: [jfr]")
}
pa = pprofparser.NewPprofParser()

}
// support only multipart/form-data
f, err := recv.openMultipartJfr(req)
f, err := recv.openMultipart(req)
if err != nil {
return logs, err
}
Expand Down Expand Up @@ -315,26 +318,32 @@ func newOtelcolAttrSetPayloadSizeBytes(service string, typ string, encoding stri
return &s
}

func (recv *pyroscopeReceiver) openMultipartJfr(req *http.Request) (multipart.File, error) {
func (recv *pyroscopeReceiver) openMultipart(req *http.Request) (multipart.File, error) {
if err := req.ParseMultipartForm(recv.cfg.Protocols.Http.MaxRequestBodySize); err != nil {
return nil, fmt.Errorf("failed to parse multipart request: %w", err)
}
mf := req.MultipartForm
defer func() {
_ = mf.RemoveAll()
}()

part, ok := mf.File[formatJfr]
if !ok {
return nil, fmt.Errorf("required jfr part is missing")
formats := []string{formatJfr, formatPprof}
var part []*multipart.FileHeader // Replace YourPartType with the actual type of your 'part' variable
for _, f := range formats {
if p, ok := mf.File[f]; ok {
part = p
break
}
}
if part == nil {
return nil, fmt.Errorf("required jfr/pprof part is missing")
}
fh := part[0]
if fh.Filename != formatJfr {
return nil, fmt.Errorf("jfr filename is not '%s'", formatJfr)
if fh.Filename != formatJfr && fh.Filename != filePprof {
return nil, fmt.Errorf("filename is not '%s or %s'", formatJfr, formatPprof)
}
f, err := fh.Open()
if err != nil {
return nil, fmt.Errorf("failed to open jfr file")
return nil, fmt.Errorf("failed to open file")
}
return f, nil
}
Expand Down Expand Up @@ -366,6 +375,7 @@ func entitiesToStrings(entities []profile_types.SampleType) []any {
}

func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) error {

m.PutStr("type", prof.Type.Type)
s := m.PutEmptySlice("sample_types")

Expand Down
20 changes: 10 additions & 10 deletions receiver/pyroscopereceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"go.uber.org/zap"
)

type jfrtest struct {
type datatest struct {
name string
urlParams map[string]string
jfr string
filename string
expected plog.Logs
}

Expand All @@ -40,10 +40,10 @@ func loadTestData(t *testing.T, filename string) []byte {
return b
}

func run(t *testing.T, tests []jfrtest, collectorAddr string, sink *consumertest.LogsSink) {
func run(t *testing.T, tests []datatest, collectorAddr string, sink *consumertest.LogsSink) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.NoError(t, testclient.Ingest(collectorAddr, tt.urlParams, tt.jfr), "send shouldn't have been failed")
assert.NoError(t, testclient.Ingest(collectorAddr, tt.urlParams, tt.filename), "send shouldn't have been failed")
actual := sink.AllLogs()
assert.NoError(t, plogtest.CompareLogs(tt.expected, actual[0]))
sink.Reset()
Expand Down Expand Up @@ -75,9 +75,9 @@ func startHttpServer(t *testing.T) (string, *consumertest.LogsSink) {
}

func TestPyroscopeIngestJfrCpu(t *testing.T) {
tests := make([]jfrtest, 1)
tests := make([]datatest, 1)
pb := loadTestData(t, "cortex-dev-01__kafka-0__cpu__0.pb")
tests[0] = jfrtest{
tests[0] = datatest{
name: "send labeled multipart form data gzipped cpu jfr to http ingest endpoint",
urlParams: map[string]string{
"name": "com.example.App{dc=us-east-1,kubernetes_pod_name=app-abcd1234}",
Expand All @@ -86,7 +86,7 @@ func TestPyroscopeIngestJfrCpu(t *testing.T) {
"format": "jfr",
"sampleRate": "100",
},
jfr: filepath.Join("testdata", "cortex-dev-01__kafka-0__cpu__0.jfr"),
filename: filepath.Join("testdata", "cortex-dev-01__kafka-0__cpu__0.jfr"),
expected: gen([]profileLog{{
timestamp: 1700332322000000000,
attrs: map[string]any{
Expand All @@ -113,18 +113,18 @@ func TestPyroscopeIngestJfrCpu(t *testing.T) {
}

func TestPyroscopeIngestJfrMemory(t *testing.T) {
tests := make([]jfrtest, 1)
tests := make([]datatest, 1)
pbAllocInNewTlab := loadTestData(t, "memory_example_alloc_in_new_tlab.pb")
pbLiveObject := loadTestData(t, "memory_example_live_object.pb")
tests[0] = jfrtest{
tests[0] = datatest{
name: "send labeled multipart form data gzipped memory jfr to http ingest endpoint",
urlParams: map[string]string{
"name": "com.example.App{dc=us-east-1,kubernetes_pod_name=app-abcd1234}",
"from": "1700332322",
"until": "1700332329",
"format": "jfr",
},
jfr: filepath.Join("testdata", "memory_alloc_live_example.jfr"),
filename: filepath.Join("testdata", "memory_alloc_live_example.jfr"),
expected: gen([]profileLog{{
timestamp: 1700332322000000000,
attrs: map[string]any{
Expand Down
11 changes: 10 additions & 1 deletion receiver/pyroscopereceiver/testclient/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"mime/multipart"
"net/http"
"os"
"strings"
)

func Ingest(addr string, urlParams map[string]string, jfr string) error {
Expand All @@ -17,8 +18,16 @@ func Ingest(addr string, urlParams map[string]string, jfr string) error {

body := new(bytes.Buffer)

var fieldName, filename string
if strings.Contains(jfr, "profile") {
fieldName = "profile"
filename = "profile.pprof"
} else {
fieldName = "jfr"
filename = "jfr"
}
mw := multipart.NewWriter(body)
part, err := mw.CreateFormFile("jfr", "jfr")
part, err := mw.CreateFormFile(fieldName, filename)
if err != nil {
return fmt.Errorf("failed to create form file: %w", err)
}
Expand Down
Binary file added receiver/pyroscopereceiver/testdata/profile.pprof
Binary file not shown.
Loading