Skip to content

Commit

Permalink
Merge pull request #57 from afzalabbasi/feature/go-pprof
Browse files Browse the repository at this point in the history
feature/go-pprof: process pprof
  • Loading branch information
akvlad authored Feb 9, 2024
2 parents 07da0ba + a1263ea commit c9ede0a
Show file tree
Hide file tree
Showing 7 changed files with 883 additions and 714 deletions.
519 changes: 261 additions & 258 deletions go.mod

Large diffs are not rendered by default.

906 changes: 475 additions & 431 deletions go.sum

Large diffs are not rendered by default.

103 changes: 103 additions & 0 deletions receiver/pyroscopereceiver/pprofparser/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package pprofparser

import (
"bytes"
"fmt"

pprof_proto "github.com/google/pprof/profile"
profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types"
)

type sampleType uint8

const (
sampleTypeCpu sampleType = iota
sampleTypeCount
)

type profileWrapper struct {
pprof *pprof_proto.Profile
prof profile_types.ProfileIR
}

type pProfParser struct {
proftab [sampleTypeCount]*profileWrapper // <sample type, (profile, pprof)>
samptab [sampleTypeCount]map[uint32]uint32 // <extern jfr stacktrace id,matching pprof sample array index>
loctab [sampleTypeCount]map[uint32]*pprof_proto.Location // <extern jfr funcid, pprof location>
}

// Creates a pprof parser that parse the accepted jfr buffer
func NewPprofParser() *pProfParser {
return &pProfParser{}
}

func (pa *pProfParser) Parse(data *bytes.Buffer, md profile_types.Metadata) ([]profile_types.ProfileIR, error) {
// Parse pprof data
pProfData, err := pprof_proto.Parse(data)
if err != nil {
return nil, err
}

// Process pprof data and create SampleType slice
var sampleTypes []string
var sampleUnits []string
var valueAggregates []profile_types.SampleType

for i, st := range pProfData.SampleType {
sampleTypes = append(sampleTypes, pProfData.SampleType[i].Type)
sampleUnits = append(sampleUnits, pProfData.SampleType[i].Unit)
sum, count := calculateSumAndCount(pProfData, i)
valueAggregates = append(valueAggregates, profile_types.SampleType{fmt.Sprintf("%s:%s", st.Type, st.Unit), sum, count})
}

var profiles []profile_types.ProfileIR
var profileType string

switch pProfData.PeriodType.Type {
case "cpu":
profileType = "process_cpu"
case "wall":
profileType = "wall"
case "mutex", "contentions":
profileType = "mutex"
case "goroutine":
profileType = "goroutines"
case "objects", "space", "alloc", "inuse":
profileType = "memory"
case "block":
profileType = "block"
}

profileTypeInfo := profile_types.ProfileType{
PeriodType: pProfData.PeriodType.Type,
PeriodUnit: pProfData.PeriodType.Unit,
SampleType: sampleTypes,
SampleUnit: sampleUnits,
Type: profileType,
}

// Create a new ProfileIR instance
profile := profile_types.ProfileIR{
ValueAggregation: valueAggregates,
Type: profileTypeInfo,
}
profile.Payload = new(bytes.Buffer)
pProfData.WriteUncompressed(profile.Payload)
// Append the profile to the result
profiles = append(profiles, profile)
return profiles, nil
}

func calculateSumAndCount(samples *pprof_proto.Profile, sampleTypeIndex int) (int64, int32) {
var sum int64
count := int32(len(samples.Sample))
for _, sample := range samples.Sample {
// Check if the sample has a value for the specified sample type
if sampleTypeIndex < len(sample.Value) {
// Accumulate the value for the specified sample type
sum += sample.Value[sampleTypeIndex]
}
}

return sum, count
}
38 changes: 24 additions & 14 deletions receiver/pyroscopereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"strings"
"sync"

"github.com/metrico/otel-collector/receiver/pyroscopereceiver/compress"
"github.com/metrico/otel-collector/receiver/pyroscopereceiver/jfrparser"
"github.com/metrico/otel-collector/receiver/pyroscopereceiver/pprofparser"

"github.com/metrico/otel-collector/receiver/pyroscopereceiver/compress"
profile_types "github.com/metrico/otel-collector/receiver/pyroscopereceiver/types"
"github.com/prometheus/prometheus/model/labels"
"go.opentelemetry.io/collector/component"
Expand All @@ -31,7 +33,8 @@ const (
ingestPath = "/ingest"

formatJfr = "jfr"
formatPprof = "pprof"
formatPprof = "profile"
filePprof = "profile.pprof"

errorCodeError = "1"
errorCodeSuccess = ""
Expand Down Expand Up @@ -242,14 +245,14 @@ func (recv *pyroscopeReceiver) readProfiles(ctx context.Context, req *http.Reque
recv.logger.Debug("received profiles", zap.String("url_query", req.URL.RawQuery))

qs := req.URL.Query()
if tmp, ok = qs["format"]; ok && tmp[0] == "jfr" {
if tmp, ok = qs["format"]; ok && (tmp[0] == "jfr") {
pa = jfrparser.NewJfrPprofParser()
} else {
return logs, fmt.Errorf("unsupported format, supported: [jfr]")
}
pa = pprofparser.NewPprofParser()

}
// support only multipart/form-data
f, err := recv.openMultipartJfr(req)
f, err := recv.openMultipart(req)
if err != nil {
return logs, err
}
Expand Down Expand Up @@ -328,26 +331,32 @@ func newOtelcolAttrSetPayloadSizeBytes(service string, typ string, encoding stri
return &s
}

func (recv *pyroscopeReceiver) openMultipartJfr(req *http.Request) (multipart.File, error) {
func (recv *pyroscopeReceiver) openMultipart(req *http.Request) (multipart.File, error) {
if err := req.ParseMultipartForm(recv.cfg.Protocols.Http.MaxRequestBodySize); err != nil {
return nil, fmt.Errorf("failed to parse multipart request: %w", err)
}
mf := req.MultipartForm
defer func() {
_ = mf.RemoveAll()
}()

part, ok := mf.File[formatJfr]
if !ok {
return nil, fmt.Errorf("required jfr part is missing")
formats := []string{formatJfr, formatPprof}
var part []*multipart.FileHeader // Replace YourPartType with the actual type of your 'part' variable
for _, f := range formats {
if p, ok := mf.File[f]; ok {
part = p
break
}
}
if part == nil {
return nil, fmt.Errorf("required jfr/pprof part is missing")
}
fh := part[0]
if fh.Filename != formatJfr {
return nil, fmt.Errorf("jfr filename is not '%s'", formatJfr)
if fh.Filename != formatJfr && fh.Filename != filePprof {
return nil, fmt.Errorf("filename is not '%s or %s'", formatJfr, formatPprof)
}
f, err := fh.Open()
if err != nil {
return nil, fmt.Errorf("failed to open jfr file")
return nil, fmt.Errorf("failed to open file")
}
return f, nil
}
Expand Down Expand Up @@ -379,6 +388,7 @@ func entitiesToStrings(entities []profile_types.SampleType) []any {
}

func setAttrsFromProfile(prof profile_types.ProfileIR, m pcommon.Map) error {

m.PutStr("type", prof.Type.Type)

s := m.PutEmptySlice("sample_types")
Expand Down
20 changes: 10 additions & 10 deletions receiver/pyroscopereceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"go.uber.org/zap"
)

type jfrtest struct {
type datatest struct {
name string
urlParams map[string]string
jfr string
filename string
expected plog.Logs
}

Expand All @@ -40,10 +40,10 @@ func loadTestData(t *testing.T, filename string) []byte {
return b
}

func run(t *testing.T, tests []jfrtest, collectorAddr string, sink *consumertest.LogsSink) {
func run(t *testing.T, tests []datatest, collectorAddr string, sink *consumertest.LogsSink) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.NoError(t, testclient.Ingest(collectorAddr, tt.urlParams, tt.jfr), "send shouldn't have been failed")
assert.NoError(t, testclient.Ingest(collectorAddr, tt.urlParams, tt.filename), "send shouldn't have been failed")
actual := sink.AllLogs()
assert.NoError(t, plogtest.CompareLogs(tt.expected, actual[0]))
sink.Reset()
Expand Down Expand Up @@ -75,9 +75,9 @@ func startHttpServer(t *testing.T) (string, *consumertest.LogsSink) {
}

func TestPyroscopeIngestJfrCpu(t *testing.T) {
tests := make([]jfrtest, 1)
tests := make([]datatest, 1)
pb := loadTestData(t, "cortex-dev-01__kafka-0__cpu__0.pb")
tests[0] = jfrtest{
tests[0] = datatest{
name: "send labeled multipart form data gzipped cpu jfr to http ingest endpoint",
urlParams: map[string]string{
"name": "com.example.App{dc=us-east-1,kubernetes_pod_name=app-abcd1234}",
Expand All @@ -86,7 +86,7 @@ func TestPyroscopeIngestJfrCpu(t *testing.T) {
"format": "jfr",
"sampleRate": "100",
},
jfr: filepath.Join("testdata", "cortex-dev-01__kafka-0__cpu__0.jfr"),
filename: filepath.Join("testdata", "cortex-dev-01__kafka-0__cpu__0.jfr"),
expected: gen([]profileLog{{
timestamp: 1700332322000000000,
attrs: map[string]any{
Expand All @@ -113,18 +113,18 @@ func TestPyroscopeIngestJfrCpu(t *testing.T) {
}

func TestPyroscopeIngestJfrMemory(t *testing.T) {
tests := make([]jfrtest, 1)
tests := make([]datatest, 1)
pbAllocInNewTlab := loadTestData(t, "memory_example_alloc_in_new_tlab.pb")
pbLiveObject := loadTestData(t, "memory_example_live_object.pb")
tests[0] = jfrtest{
tests[0] = datatest{
name: "send labeled multipart form data gzipped memory jfr to http ingest endpoint",
urlParams: map[string]string{
"name": "com.example.App{dc=us-east-1,kubernetes_pod_name=app-abcd1234}",
"from": "1700332322",
"until": "1700332329",
"format": "jfr",
},
jfr: filepath.Join("testdata", "memory_alloc_live_example.jfr"),
filename: filepath.Join("testdata", "memory_alloc_live_example.jfr"),
expected: gen([]profileLog{{
timestamp: 1700332322000000000,
attrs: map[string]any{
Expand Down
11 changes: 10 additions & 1 deletion receiver/pyroscopereceiver/testclient/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"mime/multipart"
"net/http"
"os"
"strings"
)

func Ingest(addr string, urlParams map[string]string, jfr string) error {
Expand All @@ -17,8 +18,16 @@ func Ingest(addr string, urlParams map[string]string, jfr string) error {

body := new(bytes.Buffer)

var fieldName, filename string
if strings.Contains(jfr, "profile") {
fieldName = "profile"
filename = "profile.pprof"
} else {
fieldName = "jfr"
filename = "jfr"
}
mw := multipart.NewWriter(body)
part, err := mw.CreateFormFile("jfr", "jfr")
part, err := mw.CreateFormFile(fieldName, filename)
if err != nil {
return fmt.Errorf("failed to create form file: %w", err)
}
Expand Down
Binary file added receiver/pyroscopereceiver/testdata/profile.pprof
Binary file not shown.

0 comments on commit c9ede0a

Please sign in to comment.