Skip to content

Commit

Permalink
Merge pull request #59 from coroot/profiles_to_cluster_agent
Browse files Browse the repository at this point in the history
profiling: upload profiles to `coroot-cluster-agent`
  • Loading branch information
apetruhin authored Jan 18, 2024
2 parents cffd56a + f41bbc7 commit 4a80e36
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 39 deletions.
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func main() {
klog.Exitln(err)
}

processInfoCh := profiling.Init()
processInfoCh := profiling.Init(machineId, hostname)

cr, err := containers.NewRegistry(registerer, kv, processInfoCh)
if err != nil {
Expand Down
62 changes: 24 additions & 38 deletions profiling/profiling.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"net/http"
"net/url"
"os"
"strconv"
"sync"
"time"

Expand All @@ -33,7 +32,8 @@ const (
)

var (
httpClient = http.Client{
constLabels labels.Labels
httpClient = http.Client{
Timeout: UploadTimeout,
}
endpointUrl *url.URL
Expand All @@ -43,14 +43,20 @@ var (
}
)

func Init() chan<- containers.ProcessInfo {
func Init(hostId, hostName string) chan<- containers.ProcessInfo {
endpoint := os.Getenv("PROFILES_ENDPOINT")
if endpoint == "" {
klog.Infoln("no profiles endpoint configured")
return nil
}
klog.Infoln("profiles endpoint:", endpoint)

constLabels = labels.Labels{
{Name: "host.name", Value: hostName},
{Name: "host.id", Value: hostId},
{Name: "profile.source", Value: "ebpf"},
}

var err error
endpointUrl, err = url.Parse(endpoint)
if err != nil {
Expand All @@ -60,7 +66,7 @@ func Init() chan<- containers.ProcessInfo {
reg := prometheus.NewRegistry()
so := ebpfspy.SessionOptions{
CollectUser: true,
CollectKernel: true,
CollectKernel: false,
UnknownSymbolModuleOffset: true,
UnknownSymbolAddress: false,
PythonEnabled: true,
Expand Down Expand Up @@ -125,17 +131,15 @@ func Stop() {
func collect() {
ticker := time.NewTicker(CollectInterval)
defer ticker.Stop()
tPrev := time.Now()
for t := range ticker.C {
tCurr := t
session.UpdateTargets(sd.TargetsOptions{})
bs := pprof.NewProfileBuilders(SampleRate)
err := session.CollectProfiles(func(target *sd.Target, stack []string, value uint64, pid uint32) {
p := targetFinder.get(pid)
if p == nil {
pi := targetFinder.get(pid)
if pi == nil {
return
}
b := bs.BuilderForTarget(p.hash, labels.Labels{{Value: p.labels}})
b := bs.BuilderForTarget(pi.hash, pi.labels)
b.AddSample(stack, value)
})
klog.Infof("collected %d profiles in %s", len(bs.Builders), time.Since(t).Truncate(time.Millisecond))
Expand All @@ -145,34 +149,26 @@ func collect() {
t = time.Now()
var uploaded int
for _, b := range bs.Builders {
err = upload(b, tPrev, tCurr)
err = upload(b)
if err != nil {
klog.Errorln(err)
break
}
uploaded++
}
klog.Infof("uploaded %d profiles in %s", uploaded, time.Since(t).Truncate(time.Millisecond))
tPrev = tCurr
}
}

func upload(b *pprof.ProfileBuilder, from, until time.Time) error {
func upload(b *pprof.ProfileBuilder) error {
u := *endpointUrl
q := u.Query()
q.Set("name", "ebpf"+b.Labels[0].Value)
q.Set("format", "pprof")
q.Set("from", strconv.Itoa(int(from.Unix())))
q.Set("until", strconv.Itoa(int(until.Unix())))
q.Set("spyName", "coroot-node-agent")
u.RawQuery = q.Encode()

b.Profile.SampleType[0].Type = "samples"
b.Profile.SampleType[0].Unit = "samples"
for _, s := range b.Profile.Sample {
s.Value[0] = s.Value[0] / b.Profile.Period
for _, l := range append(b.Labels, constLabels...) {
q.Set(l.Name, l.Value)
}
u.RawQuery = q.Encode()

b.Profile.DurationNanos = CollectInterval.Nanoseconds()
body := bytes.NewBuffer(nil)
_, err := b.Write(body)
if err != nil {
Expand All @@ -183,18 +179,15 @@ func upload(b *pprof.ProfileBuilder, from, until time.Time) error {
if err != nil {
return err
}

resp, err := httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
if err != nil {
return err
}

if resp.StatusCode != 200 {
return fmt.Errorf("failed to upload %d: %s", resp.StatusCode, string(respBody))
}
Expand Down Expand Up @@ -261,23 +254,16 @@ func (tf *TargetFinder) Update(_ sd.TargetsOptions) {
type processInfo struct {
containerId string
startedAt time.Time
labels string
labels labels.Labels
hash uint64
}

func (pi *processInfo) calcHashAndLabels() {
hash := fnv.New64a()
_, _ = hash.Write([]byte(pi.containerId))
pi.hash = hash.Sum64()
var buf bytes.Buffer
buf.WriteByte('{')
buf.WriteString("container_id")
buf.WriteByte('=')
buf.WriteString(pi.containerId)
buf.WriteByte(',')
buf.WriteString("service_name")
buf.WriteByte('=')
buf.WriteString(common.ContainerIdToOtelServiceName(pi.containerId))
buf.WriteByte('}')
pi.labels = buf.String()
pi.labels = labels.Labels{
{Name: "service.name", Value: common.ContainerIdToOtelServiceName(pi.containerId)},
{Name: "container.id", Value: pi.containerId},
}
}

0 comments on commit 4a80e36

Please sign in to comment.