Merge pull request opensearch-project#354 from kakkoyun/keep_track_of_stack_unwinding

profiler: Keep track of stack unwinding failures
kakkoyun authored Apr 20, 2022
2 parents 64b003e + 14fc8ba commit 8e48635
Showing 3 changed files with 227 additions and 58 deletions.
36 changes: 29 additions & 7 deletions parca-agent.bpf.c
@@ -17,7 +17,6 @@
#define KBUILD_MODNAME "parca-agent"

#undef container_of
//#include "bpf_core_read.h"
#include <bpf_core_read.h>
#include <bpf_endian.h>
#include <bpf_helpers.h>
@@ -37,6 +36,8 @@ volatile const char bpf_metadata_name[] SEC(".rodata") =
// Max depth of each stack trace to track
#define MAX_STACK_DEPTH 127

/*================================ eBPF MAPS =================================*/

#define BPF_MAP(_name, _type, _key_type, _value_type, _max_entries) \
struct { \
__uint(type, _type); \
@@ -87,6 +88,8 @@ bpf_map_lookup_or_try_init(void *map, const void *key, const void *init) {
return bpf_map_lookup_elem(map, key);
}

/*================================= HOOKS ==================================*/

// This code gets a bit complex. Probably not suitable for casual hacking.
SEC("perf_event")
int do_sample(struct bpf_perf_event_data *ctx) {
@@ -98,11 +101,31 @@ int do_sample(struct bpf_perf_event_data *ctx) {
return 0;

// create map key
stack_count_key_t key = {.pid = tgid};

// get stacks
key.user_stack_id = bpf_get_stackid(ctx, &stack_traces, BPF_F_USER_STACK);
key.kernel_stack_id = bpf_get_stackid(ctx, &stack_traces, 0);
stack_count_key_t key = {
.pid = tgid,
.user_stack_id = 0,
.kernel_stack_id = 0,
};

// get user stack id
int stack_id = bpf_get_stackid(ctx, &stack_traces, BPF_F_USER_STACK);
if (stack_id >= 0)
key.user_stack_id = stack_id;

// get kernel stack id
int kernel_stack_id = bpf_get_stackid(ctx, &stack_traces, 0);
if (kernel_stack_id >= 0)
key.kernel_stack_id = kernel_stack_id;

// TODO(kakkoyun): failed bpf_get_stackid() could indicate stack unwinding
// issues; this could be a useful place to hook eh_frame-based stack
// unwinding.
// TODO(kakkoyun): Does returned error code help?
// if (key.user_stack_id == 0 && key.kernel_stack_id == 0)
// Both user and kernel stacks are empty.
// However, for now, we still want to count the event, to keep track of the
// number of failed stack unwinding attempts.
// return 0;

u64 zero = 0;
u64 *count;
@@ -111,7 +134,6 @@ int do_sample(struct bpf_perf_event_data *ctx) {
return 0;

__sync_fetch_and_add(count, 1);

return 0;
}
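The new key is initialized with explicit zero defaults, so a failed bpf_get_stackid (negative return) leaves the corresponding stack ID at 0 while the sample is still counted. On the Go side the key arrives as twelve bytes in host byte order. Below is a minimal sketch of that layout as a Go struct; the C definition of stack_count_key_t is not part of this diff, so the field names and packing are an assumption inferred from the reads in profiler.go further down:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// stackCountKey mirrors the assumed layout of the eBPF stack_count_key_t:
// the tgid followed by the user and kernel stack IDs. A stack ID of 0 marks
// a failed bpf_get_stackid call; the event is still counted so that failed
// unwinding attempts show up in the profiler's metrics.
type stackCountKey struct {
	PID           uint32
	UserStackID   uint32
	KernelStackID uint32
}

// parseKey decodes a raw map key. Field order must match the C struct,
// which is why the Go code below stresses that read order matters.
func parseKey(raw []byte, order binary.ByteOrder) (stackCountKey, error) {
	var k stackCountKey
	if err := binary.Read(bytes.NewReader(raw), order, &k); err != nil {
		return k, fmt.Errorf("parse stack count key: %w", err)
	}
	return k, nil
}

func main() {
	// A made-up 12-byte key as it might come out of the counts map.
	raw := make([]byte, 12)
	binary.LittleEndian.PutUint32(raw[0:4], 1234) // pid
	binary.LittleEndian.PutUint32(raw[4:8], 7)    // user stack id
	binary.LittleEndian.PutUint32(raw[8:12], 0)   // kernel unwind failed
	k, _ := parseKey(raw, binary.LittleEndian)
	fmt.Printf("%+v\n", k) // {PID:1234 UserStackID:7 KernelStackID:0}
}
```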

190 changes: 139 additions & 51 deletions pkg/profiler/profiler.go
@@ -56,6 +58,8 @@ import (
//go:embed parca-agent.bpf.o
var bpfObj []byte

var errUnrecoverable = errors.New("unrecoverable error")

const (
stackDepth       = 127 // Always needs to be in sync with MAX_STACK_DEPTH in parca-agent.bpf.c
doubleStackDepth = 254
@@ -118,9 +120,51 @@ func (m bpfMaps) clean() error {
return nil
}

type metrics struct {
reg prometheus.Registerer

missingStacks *prometheus.CounterVec
missingPIDs prometheus.Counter
failedStackUnwindingAttempts *prometheus.CounterVec
}

func (m metrics) unregister() bool {
return m.reg.Unregister(m.missingStacks) &&
m.reg.Unregister(m.missingPIDs) &&
m.reg.Unregister(m.failedStackUnwindingAttempts)
}

func newMetrics(reg prometheus.Registerer, target model.LabelSet) *metrics {
return &metrics{
reg: reg,
missingStacks: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Name: "parca_agent_profiler_missing_stacks_total",
Help: "Number of missing profile stacks",
ConstLabels: map[string]string{"target": target.String()},
},
[]string{"type"},
),
missingPIDs: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "parca_agent_profiler_missing_pid_total",
Help: "Number of missing PIDs",
ConstLabels: map[string]string{"target": target.String()},
},
),
failedStackUnwindingAttempts: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Name: "parca_agent_profiler_failed_stack_unwinding_attempts_total",
Help: "Number of failed stack unwinding attempts",
ConstLabels: map[string]string{"target": target.String()},
},
[]string{"type"},
),
}
}
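A short sketch of how these metrics could be exercised from a test in the same package; the registry and the target label set here are made up for illustration:

```go
package profiler

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
)

func TestMetricsLifecycle(t *testing.T) {
	reg := prometheus.NewRegistry()
	target := model.LabelSet{"job": "demo"} // hypothetical target labels

	m := newMetrics(reg, target)

	// The "type" label distinguishes user from kernel stacks,
	// mirroring how the read helpers below use it.
	m.failedStackUnwindingAttempts.WithLabelValues("user").Inc()
	m.missingStacks.WithLabelValues("kernel").Inc()
	m.missingPIDs.Inc()

	if !m.unregister() {
		t.Fatal("expected all collectors to unregister cleanly")
	}
}
```

One subtlety: unregister chains the three Unregister calls with &&, which short-circuits, so if the first collector fails to unregister the remaining two are never attempted. Stop below only logs the combined result, so this is harmless today.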

type CgroupProfiler struct {
logger log.Logger
reg prometheus.Registerer

mtx *sync.RWMutex
cancel func()
@@ -130,9 +174,9 @@ type CgroupProfiler struct {
ksymCache *ksym.Cache
objCache objectfile.Cache

bpfMaps   *bpfMaps
byteOrder binary.ByteOrder

missingStacks *prometheus.CounterVec
lastError error
lastProfileTakenAt time.Time

@@ -143,6 +187,8 @@ type CgroupProfiler struct {
profilingDuration time.Duration

profileBufferPool sync.Pool

metrics *metrics
}

func NewCgroupProfiler(
@@ -158,7 +204,6 @@
) *CgroupProfiler {
return &CgroupProfiler{
logger: log.With(logger, "labels", target.String()),
reg: reg,
mtx: &sync.RWMutex{},
target: target,
profilingDuration: profilingDuration,
Expand All @@ -172,19 +217,13 @@ func NewCgroupProfiler(
debugInfoClient,
tmp,
),
missingStacks: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Name: "parca_agent_profiler_missing_stacks_total",
Help: "Number of missing profile stacks",
ConstLabels: map[string]string{"target": target.String()},
},
[]string{"type"},
),
profileBufferPool: sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(nil)
},
},
byteOrder: byteorder.GetHostByteOrder(),
metrics: newMetrics(reg, target),
}
}

@@ -212,8 +251,8 @@ func (p *CgroupProfiler) Stop() {
p.mtx.Lock()
defer p.mtx.Unlock()
level.Debug(p.logger).Log("msg", "stopping cgroup profiler")
if !p.reg.Unregister(p.missingStacks) {
level.Debug(p.logger).Log("msg", "cannot unregister metric")
if !p.metrics.unregister() {
level.Debug(p.logger).Log("msg", "cannot unregister metrics")
}
if p.cancel != nil {
p.cancel()
@@ -366,64 +405,55 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
samples := map[[doubleStackDepth]uint64]*profile.Sample{}

it := p.bpfMaps.counts.Iterator()
byteOrder := byteorder.GetHostByteOrder()

// TODO(brancz): Use libbpf batch functions.
for it.Next() {
// This byte slice is only valid for this iteration, so it must be
// copied if we want to do anything with it outside of this loop.
keyBytes := it.Key()

r := bytes.NewBuffer(keyBytes)

pidBytes := make([]byte, 4)
if _, err := io.ReadFull(r, pidBytes); err != nil {
return fmt.Errorf("read pid bytes: %w", err)
}
pid := byteOrder.Uint32(pidBytes)

userStackIDBytes := make([]byte, 4)
if _, err := io.ReadFull(r, userStackIDBytes); err != nil {
return fmt.Errorf("read user stack ID bytes: %w", err)
pid := p.byteOrder.Uint32(pidBytes)
if pid == 0 {
level.Debug(p.logger).Log("msg", "missing pid")
p.metrics.missingPIDs.Inc()
continue
}
userStackID := int32(byteOrder.Uint32(userStackIDBytes))

kernelStackIDBytes := make([]byte, 4)
if _, err := io.ReadFull(r, kernelStackIDBytes); err != nil {
return fmt.Errorf("read kernel stack ID bytes: %w", err)
// Twice the stack depth because we have a user and a potential Kernel stack.
// Read order matters, since we read from the key buffer.
stack := [doubleStackDepth]uint64{}
userErr := p.readUserStack(r, &stack)
if userErr != nil {
if errors.Is(userErr, errUnrecoverable) {
return userErr
}
level.Debug(p.logger).Log("msg", "failed to read user stack", "err", userErr)
}
kernelStackID := int32(byteOrder.Uint32(kernelStackIDBytes))

valueBytes, err := p.bpfMaps.counts.GetValue(unsafe.Pointer(&keyBytes[0]))
if err != nil {
return fmt.Errorf("get count value: %w", err)
kernelErr := p.readKernelStack(r, &stack)
if kernelErr != nil {
if errors.Is(kernelErr, errUnrecoverable) {
return kernelErr
}
level.Debug(p.logger).Log("msg", "failed to read kernel stack", "err", kernelErr)
}
value := byteOrder.Uint64(valueBytes)

stackBytes, err := p.bpfMaps.stackTraces.GetValue(unsafe.Pointer(&userStackID))
if err != nil {
p.missingStacks.WithLabelValues("user").Inc()
if userErr != nil && kernelErr != nil {
// Both stacks are missing. Nothing to do.
continue
}

// Twice the stack depth because we have a user and a potential Kernel stack.
stack := [doubleStackDepth]uint64{}
err = binary.Read(bytes.NewBuffer(stackBytes), byteOrder, stack[:stackDepth])
value, err := p.readValue(keyBytes)
if err != nil {
return fmt.Errorf("read user stack trace: %w", err)
return fmt.Errorf("read value: %w", err)
}

if kernelStackID >= 0 {
stackBytes, err = p.bpfMaps.stackTraces.GetValue(unsafe.Pointer(&kernelStackID))
if err != nil {
p.missingStacks.WithLabelValues("kernel").Inc()
continue
}

err = binary.Read(bytes.NewBuffer(stackBytes), byteOrder, stack[stackDepth:])
if err != nil {
return fmt.Errorf("read kernel stack trace: %w", err)
}
if value == 0 {
// This should never happen, but it's here just in case.
// If we have a zero value, we don't want to add it to the profile.
continue
}

sample, ok := samples[stack]
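As the comments above note, the combined buffer packs the user stack into the first stackDepth slots and the kernel stack into the rest, and the read order from the key buffer must match the eBPF key layout. A hypothetical helper (not part of this commit) that splits the buffer back apart, assuming unused slots stay zero:

```go
// splitStack pulls the two halves of the combined buffer apart again:
// readUserStack fills stack[:stackDepth], readKernelStack fills
// stack[stackDepth:]. Slots left at zero mean no frame was recorded,
// so trailing zeros are trimmed from each half.
func splitStack(stack [doubleStackDepth]uint64) (user, kernel []uint64) {
	trim := func(s []uint64) []uint64 {
		for len(s) > 0 && s[len(s)-1] == 0 {
			s = s[:len(s)-1]
		}
		return s
	}
	return trim(stack[:stackDepth]), trim(stack[stackDepth:])
}
```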
@@ -587,6 +617,64 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time)
return nil
}

func (p *CgroupProfiler) readUserStack(r *bytes.Buffer, stack *[254]uint64) error {
userStackIDBytes := make([]byte, 4)
if _, err := io.ReadFull(r, userStackIDBytes); err != nil {
return fmt.Errorf("read user stack bytes, %s: %w", err, errUnrecoverable)
}

userStackID := int32(p.byteOrder.Uint32(userStackIDBytes))
if userStackID == 0 {
p.metrics.failedStackUnwindingAttempts.WithLabelValues("user").Inc()
return errors.New("user stack ID is 0, probably stack unwinding failed")
}

stackBytes, err := p.bpfMaps.stackTraces.GetValue(unsafe.Pointer(&userStackID))
if err != nil {
p.metrics.missingStacks.WithLabelValues("user").Inc()
return fmt.Errorf("read user stack trace: %w", err)
}

if err := binary.Read(bytes.NewBuffer(stackBytes), p.byteOrder, stack[:stackDepth]); err != nil {
return fmt.Errorf("read user stack bytes, %s: %w", err, errUnrecoverable)
}

return nil
}
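The wrapping pattern here deserves a note: the %w verb wraps the errUnrecoverable sentinel, which is what makes the errors.Is checks in profileLoop work, while the original error is flattened into plain text by %s. A small self-contained demonstration of the behavior:

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

var errUnrecoverable = errors.New("unrecoverable error")

func main() {
	cause := io.ErrUnexpectedEOF
	err := fmt.Errorf("read user stack bytes, %s: %w", cause, errUnrecoverable)

	fmt.Println(errors.Is(err, errUnrecoverable))    // true: the sentinel is wrapped
	fmt.Println(errors.Is(err, io.ErrUnexpectedEOF)) // false: the cause was flattened to text
}
```

If the underlying cause ever needs to survive, Go 1.20 later added support for multiple %w verbs in a single fmt.Errorf call; at the time of this commit (April 2022) that was not yet available, hence the %s.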

func (p *CgroupProfiler) readKernelStack(r *bytes.Buffer, stack *[254]uint64) error {
kernelStackIDBytes := make([]byte, 4)
if _, err := io.ReadFull(r, kernelStackIDBytes); err != nil {
return fmt.Errorf("read kernel stack bytes, %s: %w", err, errUnrecoverable)
}

kernelStackID := int32(p.byteOrder.Uint32(kernelStackIDBytes))
if kernelStackID == 0 {
p.metrics.failedStackUnwindingAttempts.WithLabelValues("kernel").Inc()
return errors.New("kernel stack ID is 0, probably stack unwinding failed")
}

stackBytes, err := p.bpfMaps.stackTraces.GetValue(unsafe.Pointer(&kernelStackID))
if err != nil {
p.metrics.missingStacks.WithLabelValues("kernel").Inc()
return fmt.Errorf("read kernel stack trace: %w", err)
}

if err := binary.Read(bytes.NewBuffer(stackBytes), p.byteOrder, stack[stackDepth:]); err != nil {
return fmt.Errorf("read kernel stack bytes, %s: %w", err, errUnrecoverable)
}

return nil
}

func (p *CgroupProfiler) readValue(keyBytes []byte) (uint64, error) {
valueBytes, err := p.bpfMaps.counts.GetValue(unsafe.Pointer(&keyBytes[0]))
if err != nil {
return 0, fmt.Errorf("get count value: %w", err)
}
return p.byteOrder.Uint64(valueBytes), nil
}

func (p *CgroupProfiler) normalizeAddress(m *profile.Mapping, pid uint32, addr uint64) uint64 {
if m == nil {
return addr