Setup internal agent controller #203

Merged 5 commits on Oct 24, 2024
228 changes: 228 additions & 0 deletions internal/controller/controller.go
@@ -0,0 +1,228 @@
package controller // import "go.opentelemetry.io/ebpf-profiler/internal/controller"

import (
	"context"
	"fmt"
	"time"

	log "github.com/sirupsen/logrus"
	"github.com/tklauser/numcpus"

	"go.opentelemetry.io/ebpf-profiler/host"
	"go.opentelemetry.io/ebpf-profiler/hostmetadata"
	"go.opentelemetry.io/ebpf-profiler/internal/helpers"
	"go.opentelemetry.io/ebpf-profiler/metrics"
	"go.opentelemetry.io/ebpf-profiler/reporter"
	"go.opentelemetry.io/ebpf-profiler/times"
	"go.opentelemetry.io/ebpf-profiler/tracehandler"
	"go.opentelemetry.io/ebpf-profiler/tracer"
	tracertypes "go.opentelemetry.io/ebpf-profiler/tracer/types"
	"go.opentelemetry.io/ebpf-profiler/util"
)

// Controller is an instance that runs, manages and stops the agent.
type Controller struct {
	config   *Config
	reporter reporter.Reporter
	tracer   *tracer.Tracer
}

// New creates a new Controller.
// The controller sets global configuration (such as the eBPF syscalls) during
// setup, so there should only ever be one running.
func New(cfg *Config) *Controller {
	return &Controller{
		config: cfg,
	}
}

// Start starts the controller
func (c *Controller) Start(ctx context.Context) error {
	if err := tracer.ProbeBPFSyscall(); err != nil {
		return fmt.Errorf("failed to probe eBPF syscall: %w", err)
	}

	if err := tracer.ProbeTracepoint(); err != nil {
		return fmt.Errorf("failed to probe tracepoint: %w", err)
	}

	presentCores, err := numcpus.GetPresent()
	if err != nil {
		return fmt.Errorf("failed to read CPU file: %w", err)
	}

	traceHandlerCacheSize :=
		traceCacheSize(c.config.MonitorInterval, c.config.SamplesPerSecond, uint16(presentCores))

	intervals := times.New(c.config.MonitorInterval,
		c.config.ReporterInterval, c.config.ProbabilisticInterval)

	// Start periodic synchronization with the realtime clock
	times.StartRealtimeSync(ctx, c.config.ClockSyncInterval)

	log.Debugf("Determining tracers to include")
	includeTracers, err := tracertypes.Parse(c.config.Tracers)
	if err != nil {
		return fmt.Errorf("failed to parse the included tracers: %w", err)
	}

	metadataCollector := hostmetadata.NewCollector(c.config.CollAgentAddr)
	metadataCollector.AddCustomData("os.type", "linux")

	kernelVersion, err := helpers.GetKernelVersion()
	if err != nil {
		return fmt.Errorf("failed to get Linux kernel version: %w", err)
	}
	// OTel semantic introduced in https://github.com/open-telemetry/semantic-conventions/issues/66
	metadataCollector.AddCustomData("os.kernel.release", kernelVersion)

	// hostname and sourceIP will be populated from the root namespace.
	hostname, sourceIP, err := helpers.GetHostnameAndSourceIP(c.config.CollAgentAddr)
	if err != nil {
		log.Warnf("Failed to fetch metadata information in the root namespace: %v", err)
	}
	metadataCollector.AddCustomData("host.name", hostname)
	metadataCollector.AddCustomData("host.ip", sourceIP)

	// Network operations to CA start here
	var rep reporter.Reporter
	// Connect to the collection agent
	rep, err = reporter.Start(ctx, &reporter.Config{
		CollAgentAddr:          c.config.CollAgentAddr,
		DisableTLS:             c.config.DisableTLS,
		MaxRPCMsgSize:          32 << 20, // 32 MiB
		MaxGRPCRetries:         5,
		GRPCOperationTimeout:   intervals.GRPCOperationTimeout(),
		GRPCStartupBackoffTime: intervals.GRPCStartupBackoffTime(),
		GRPCConnectionTimeout:  intervals.GRPCConnectionTimeout(),
		ReportInterval:         intervals.ReportInterval(),
		CacheSize:              traceHandlerCacheSize,
		SamplesPerSecond:       c.config.SamplesPerSecond,
		KernelVersion:          kernelVersion,
		HostName:               hostname,
		IPAddress:              sourceIP,
	})
	if err != nil {
		return fmt.Errorf("failed to start reporting: %w", err)
	}
	c.reporter = rep

	metrics.SetReporter(rep)

	// Now that the initial host metadata is set, start a goroutine to keep sending updates regularly.
	metadataCollector.StartMetadataCollection(ctx, rep)

	// Load the eBPF code and map definitions
	trc, err := tracer.NewTracer(ctx, &tracer.Config{
		Reporter:               rep,
		Intervals:              intervals,
		IncludeTracers:         includeTracers,
		FilterErrorFrames:      !c.config.SendErrorFrames,
		SamplesPerSecond:       c.config.SamplesPerSecond,
		MapScaleFactor:         int(c.config.MapScaleFactor),
		KernelVersionCheck:     !c.config.NoKernelVersionCheck,
		DebugTracer:            c.config.VerboseMode,
		BPFVerifierLogLevel:    uint32(c.config.BpfVerifierLogLevel),
		ProbabilisticInterval:  c.config.ProbabilisticInterval,
		ProbabilisticThreshold: c.config.ProbabilisticThreshold,
	})
	if err != nil {
		return fmt.Errorf("failed to load eBPF tracer: %w", err)
	}
	c.tracer = trc
	log.Printf("eBPF tracer loaded")

	now := time.Now()
	// Initial scan of /proc filesystem to list currently active PIDs and have them processed.
	if err = trc.StartPIDEventProcessor(ctx); err != nil {
		log.Errorf("failed to list processes from /proc: %v", err)
	}
	metrics.Add(metrics.IDProcPIDStartupMs, metrics.MetricValue(time.Since(now).Milliseconds()))
	log.Debug("Completed initial PID listing")

	// Attach our tracer to the perf event
	if err := trc.AttachTracer(); err != nil {
		return fmt.Errorf("failed to attach to perf event: %w", err)
	}
	log.Info("Attached tracer program")

	if c.config.ProbabilisticThreshold < tracer.ProbabilisticThresholdMax {
		trc.StartProbabilisticProfiling(ctx)
		log.Printf("Enabled probabilistic profiling")
	} else {
		if err := trc.EnableProfiling(); err != nil {
			return fmt.Errorf("failed to enable perf events: %w", err)
		}
	}

	if err := trc.AttachSchedMonitor(); err != nil {
		return fmt.Errorf("failed to attach scheduler monitor: %w", err)
	}

	// This log line is used in our system tests to verify that the agent has started. If you
	// change this log line, update the system tests as well.
	log.Printf("Attached sched monitor")

	if err := startTraceHandling(ctx, rep, intervals, trc, traceHandlerCacheSize); err != nil {
		return fmt.Errorf("failed to start trace handling: %w", err)
	}

	return nil
}

// Shutdown stops the controller
func (c *Controller) Shutdown() {
	log.Info("Stop processing ...")
	if c.reporter != nil {
		c.reporter.Stop()
	}

	if c.tracer != nil {
		c.tracer.Close()
	}
}

func startTraceHandling(ctx context.Context, rep reporter.TraceReporter,
	intervals *times.Times, trc *tracer.Tracer, cacheSize uint32) error {
	// Spawn monitors for the various result maps
	traceCh := make(chan *host.Trace)

	if err := trc.StartMapMonitors(ctx, traceCh); err != nil {
		return fmt.Errorf("failed to start map monitors: %v", err)
	}

	_, err := tracehandler.Start(ctx, rep, trc.TraceProcessor(),
		traceCh, intervals, cacheSize)
	return err
}

// traceCacheSize defines the maximum number of elements for the caches in tracehandler.
//
// The caches in tracehandler have a size-"processing overhead" trade-off: Every cache miss will
// trigger additional processing for that trace in userspace (Go). For most maps, we use
// maxElementsPerInterval as a base sizing factor. For the tracehandler caches, we also multiply
// with traceCacheIntervals. For typical/small values of maxElementsPerInterval, this can lead to
// non-optimal map sizing (reduced cache_hit:cache_miss ratio and increased processing overhead).
// Simply increasing traceCacheIntervals is problematic when maxElementsPerInterval is large
// (e.g. too many CPU cores present) as we end up using too much memory. A minimum size is
// therefore used here.
func traceCacheSize(monitorInterval time.Duration, samplesPerSecond int,
	presentCPUCores uint16) uint32 {
	const (
		traceCacheIntervals = 6
		traceCacheMinSize   = 65536
	)

	maxElements := maxElementsPerInterval(monitorInterval, samplesPerSecond, presentCPUCores)

	size := maxElements * uint32(traceCacheIntervals)
	if size < traceCacheMinSize {
		size = traceCacheMinSize
	}
	return util.NextPowerOfTwo(size)
}

func maxElementsPerInterval(monitorInterval time.Duration, samplesPerSecond int,
	presentCPUCores uint16) uint32 {
	return uint32(uint16(samplesPerSecond) * uint16(monitorInterval.Seconds()) * presentCPUCores)
}
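
To make the sizing comment above concrete, here is a small, self-contained Go sketch that restates the traceCacheSize math with made-up inputs (a 5 s monitor interval, 20 samples per second, and 8 present cores are illustrative values, not defaults from this PR). With those numbers, the per-interval element count is 20 * 5 * 8 = 800, scaling by traceCacheIntervals gives 4800, the 65536 minimum wins, and the next power of two is still 65536.

package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		traceCacheIntervals = 6
		traceCacheMinSize   = 65536
	)

	// Illustrative inputs, not defaults from this PR.
	monitorInterval := 5 * time.Second
	samplesPerSecond := 20
	presentCPUCores := uint16(8)

	// Mirrors maxElementsPerInterval: samples/s * interval seconds * cores.
	maxElements := uint32(uint16(samplesPerSecond) *
		uint16(monitorInterval.Seconds()) * presentCPUCores) // 20 * 5 * 8 = 800

	// Mirrors traceCacheSize: scale by the interval count, then enforce the minimum.
	size := maxElements * uint32(traceCacheIntervals) // 4800
	if size < traceCacheMinSize {
		size = traceCacheMinSize
	}
	// util.NextPowerOfTwo(65536) returns 65536, so this is the final cache size.
	fmt.Println(size) // 65536
}

The minimum exists so that hosts with few cores or a low sample rate still get a reasonable cache_hit:cache_miss ratio, as the doc comment explains.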
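
Zooming out, here is a minimal sketch of how a caller inside this module might drive the controller end to end, assuming a controller.Config populated elsewhere (the Config struct is defined outside this diff, so only its zero value appears here); the main function is hypothetical and not part of this PR.

package main

import (
	"context"
	"os"
	"os/signal"

	log "github.com/sirupsen/logrus"

	"go.opentelemetry.io/ebpf-profiler/internal/controller"
)

func main() {
	// Cancel the context on Ctrl-C; the controller's background goroutines
	// are tied to this context.
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	// Hypothetical configuration; the real Config and its defaults live
	// outside this diff.
	cfg := &controller.Config{}

	// Start probes the eBPF syscall and sets other global state, so only
	// one controller should ever be running.
	ctrl := controller.New(cfg)
	if err := ctrl.Start(ctx); err != nil {
		log.Fatalf("Failed to start the agent: %v", err)
	}
	defer ctrl.Shutdown()

	// Run until interrupted.
	<-ctx.Done()
}

Since Start sets global state, the sketch creates exactly one Controller and relies on context cancellation followed by Shutdown to stop the reporter and tracer.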