Skip to content

Commit

Permalink
[CWS] make use of statsd client interface (#11386)
Browse files Browse the repository at this point in the history
* Make use of statsd client interface

* Add dentry erpc vs map test
  • Loading branch information
safchain authored Mar 29, 2022
1 parent c2c8c20 commit ecf45b4
Show file tree
Hide file tree
Showing 26 changed files with 339 additions and 206 deletions.
2 changes: 1 addition & 1 deletion pkg/ebpf/bytecode/runtime/runtime_compilation_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (tm *RuntimeCompilationTelemetry) GetTelemetry() map[string]int64 {
return stats
}

func (tm *RuntimeCompilationTelemetry) SendMetrics(client *statsd.Client) error {
func (tm *RuntimeCompilationTelemetry) SendMetrics(client statsd.ClientInterface) error {
tags := []string{fmt.Sprintf("version:%s", version.AgentVersion)}

var enabled float64 = 0
Expand Down
42 changes: 25 additions & 17 deletions pkg/security/module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"
"unsafe"

"github.com/DataDog/datadog-go/v5/statsd"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"github.com/skydive-project/go-debouncer"
Expand All @@ -37,13 +38,17 @@ import (
"github.com/DataDog/datadog-agent/pkg/security/secl/rules"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-agent/pkg/version"
"github.com/DataDog/datadog-go/v5/statsd"
)

const (
statsdPoolSize = 64
)

// Opts define module options
type Opts struct {
StatsdClient statsd.ClientInterface
}

// Module represents the system-probe module for the runtime security agent
type Module struct {
sync.RWMutex
Expand All @@ -52,7 +57,7 @@ type Module struct {
config *sconfig.Config
currentRuleSet atomic.Value
reloading uint64
statsdClient *statsd.Client
statsdClient statsd.ClientInterface
apiServer *APIServer
grpcServer *grpc.Server
listener net.Listener
Expand Down Expand Up @@ -107,7 +112,7 @@ func (m *Module) Init() error {

// initialize the eBPF manager and load the programs and maps in the kernel. At this stage, the probes are not
// running yet.
if err := m.probe.Init(m.statsdClient); err != nil {
if err := m.probe.Init(); err != nil {
return errors.Wrap(err, "failed to init probe")
}

Expand Down Expand Up @@ -497,21 +502,24 @@ func (m *Module) SetRulesetLoadedCallback(cb func(rs *rules.RuleSet, err *multie
m.rulesLoaded = cb
}

// NewModule instantiates a runtime security system-probe module
func NewModule(cfg *sconfig.Config) (module.Module, error) {
var statsdClient *statsd.Client
var err error
if cfg != nil {
statsdAddr := os.Getenv("STATSD_URL")
if statsdAddr == "" {
statsdAddr = cfg.StatsdAddr
}
func getStatdClient(cfg *sconfig.Config, opts ...Opts) (statsd.ClientInterface, error) {
if len(opts) != 0 && opts[0].StatsdClient != nil {
return opts[0].StatsdClient, nil
}

if statsdClient, err = statsd.New(statsdAddr, statsd.WithBufferPoolSize(statsdPoolSize)); err != nil {
return nil, err
}
} else {
log.Warn("metrics won't be sent to DataDog")
statsdAddr := os.Getenv("STATSD_URL")
if statsdAddr == "" {
statsdAddr = cfg.StatsdAddr
}

return statsd.New(statsdAddr, statsd.WithBufferPoolSize(statsdPoolSize))
}

// NewModule instantiates a runtime security system-probe module
func NewModule(cfg *sconfig.Config, opts ...Opts) (module.Module, error) {
statsdClient, err := getStatdClient(cfg, opts...)
if err != nil {
return nil, err
}

probe, err := sprobe.NewProbe(cfg, statsdClient)
Expand Down
4 changes: 2 additions & 2 deletions pkg/security/module/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ type RateLimiter struct {
sync.RWMutex
opts LimiterOpts
limiters map[rules.RuleID]*Limiter
statsdClient *statsd.Client
statsdClient statsd.ClientInterface
}

// NewRateLimiter initializes an empty rate limiter
func NewRateLimiter(client *statsd.Client, opts LimiterOpts) *RateLimiter {
func NewRateLimiter(client statsd.ClientInterface, opts LimiterOpts) *RateLimiter {
return &RateLimiter{
limiters: make(map[string]*Limiter),
statsdClient: client,
Expand Down
4 changes: 2 additions & 2 deletions pkg/security/module/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type APIServer struct {
expiredEventsLock sync.RWMutex
expiredEvents map[rules.RuleID]*int64
rate *Limiter
statsdClient *statsd.Client
statsdClient statsd.ClientInterface
probe *sprobe.Probe
queueLock sync.Mutex
queue []*pendingMsg
Expand Down Expand Up @@ -468,7 +468,7 @@ func (a *APIServer) Apply(ruleIDs []rules.RuleID) {
}

// NewAPIServer returns a new gRPC event server
func NewAPIServer(cfg *config.Config, probe *sprobe.Probe, client *statsd.Client) *APIServer {
func NewAPIServer(cfg *config.Config, probe *sprobe.Probe, client statsd.ClientInterface) *APIServer {
es := &APIServer{
msgs: make(chan *api.SecurityEventMessage, cfg.EventServerBurst*3),
expiredEvents: make(map[rules.RuleID]*int64),
Expand Down
5 changes: 1 addition & 4 deletions pkg/security/probe/activity_dump_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"sync"
"time"

"github.com/DataDog/datadog-go/v5/statsd"
"github.com/cilium/ebpf"
"github.com/pkg/errors"

Expand Down Expand Up @@ -44,7 +43,6 @@ type ActivityDumpManager struct {
tracedCgroupsMap *ebpf.Map
cgroupWaitListMap *ebpf.Map
tracedEventTypes []model.EventType
statsdClient *statsd.Client
outputDirectory string

activeDumps []*ActivityDump
Expand Down Expand Up @@ -102,7 +100,7 @@ func (adm *ActivityDumpManager) cleanup() {
}

// NewActivityDumpManager returns a new ActivityDumpManager instance
func NewActivityDumpManager(p *Probe, client *statsd.Client) (*ActivityDumpManager, error) {
func NewActivityDumpManager(p *Probe) (*ActivityDumpManager, error) {
tracedPIDs, found, err := p.manager.GetMap("traced_pids")
if err != nil {
return nil, err
Expand Down Expand Up @@ -154,7 +152,6 @@ func NewActivityDumpManager(p *Probe, client *statsd.Client) (*ActivityDumpManag

return &ActivityDumpManager{
probe: p,
statsdClient: client,
tracedPIDsMap: tracedPIDs,
tracedCommsMap: tracedComms,
tracedEventTypesMap: tracedEventTypesMap,
Expand Down
5 changes: 3 additions & 2 deletions pkg/security/probe/constantfetch/available.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
package constantfetch

import (
"github.com/DataDog/datadog-go/v5/statsd"

"github.com/DataDog/datadog-agent/pkg/security/config"
"github.com/DataDog/datadog-agent/pkg/security/ebpf/kernel"
"github.com/DataDog/datadog-agent/pkg/security/log"
"github.com/DataDog/datadog-go/v5/statsd"
)

// GetAvailableConstantFetchers returns available constant fetchers
func GetAvailableConstantFetchers(config *config.Config, kv *kernel.Version, statsdClient *statsd.Client) []ConstantFetcher {
func GetAvailableConstantFetchers(config *config.Config, kv *kernel.Version, statsdClient statsd.ClientInterface) []ConstantFetcher {
fetchers := make([]ConstantFetcher, 0)

if coreFetcher, err := NewBTFConstantFetcherFromCurrentKernel(); err == nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/security/probe/constantfetch/available_unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
package constantfetch

import (
"github.com/DataDog/datadog-go/v5/statsd"

"github.com/DataDog/datadog-agent/pkg/security/config"
"github.com/DataDog/datadog-agent/pkg/security/ebpf/kernel"
"github.com/DataDog/datadog-agent/pkg/security/log"
"github.com/DataDog/datadog-go/v5/statsd"
)

// GetAvailableConstantFetchers returns available constant fetchers
func GetAvailableConstantFetchers(config *config.Config, kv *kernel.Version, statsdClient *statsd.Client) []ConstantFetcher {
func GetAvailableConstantFetchers(config *config.Config, kv *kernel.Version, statsdClient statsd.ClientInterface) []ConstantFetcher {
fetchers := make([]ConstantFetcher, 0)

if coreFetcher, err := NewBTFConstantFetcherFromCurrentKernel(); err == nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/security/probe/constantfetch/runtime_compiled.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ type rcSymbolPair struct {

type RuntimeCompilationConstantFetcher struct {
config *ebpf.Config
statsdClient *statsd.Client
statsdClient statsd.ClientInterface
headers []string
symbolPairs []rcSymbolPair
result map[string]uint64
}

func NewRuntimeCompilationConstantFetcher(config *ebpf.Config, statsdClient *statsd.Client) *RuntimeCompilationConstantFetcher {
func NewRuntimeCompilationConstantFetcher(config *ebpf.Config, statsdClient statsd.ClientInterface) *RuntimeCompilationConstantFetcher {
return &RuntimeCompilationConstantFetcher{
config: config,
statsdClient: statsdClient,
Expand Down
10 changes: 5 additions & 5 deletions pkg/security/probe/dentry_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var (
// DentryResolver resolves inode/mountID to full paths
type DentryResolver struct {
config *config.Config
client *statsd.Client
statsdClient statsd.ClientInterface
pathnames *lib.Map
erpcStats [2]*lib.Map
bufferSelector *lib.Map
Expand Down Expand Up @@ -175,7 +175,7 @@ func (dr *DentryResolver) SendStats() error {
for resolutionType, value := range hitsCounters {
val := atomic.SwapInt64(value, 0)
if val > 0 {
_ = dr.client.Count(metrics.MetricDentryResolverHits, val, []string{resolutionType, resolution}, 1.0)
_ = dr.statsdClient.Count(metrics.MetricDentryResolverHits, val, []string{resolutionType, resolution}, 1.0)
}
}
}
Expand All @@ -184,7 +184,7 @@ func (dr *DentryResolver) SendStats() error {
for resolutionType, value := range hitsCounters {
val := atomic.SwapInt64(value, 0)
if val > 0 {
_ = dr.client.Count(metrics.MetricDentryResolverMiss, val, []string{resolutionType, resolution}, 1.0)
_ = dr.statsdClient.Count(metrics.MetricDentryResolverMiss, val, []string{resolutionType, resolution}, 1.0)
}
}
}
Expand Down Expand Up @@ -212,7 +212,7 @@ func (dr *DentryResolver) sendERPCStats() error {
}
for r, count := range counters {
if count > 0 {
_ = dr.client.Count(metrics.MetricDentryERPC, count, []string{fmt.Sprintf("ret:%s", r)}, 1.0)
_ = dr.statsdClient.Count(metrics.MetricDentryERPC, count, []string{fmt.Sprintf("ret:%s", r)}, 1.0)
}
}
for _, r := range allERPCRet() {
Expand Down Expand Up @@ -875,7 +875,7 @@ func NewDentryResolver(probe *Probe) (*DentryResolver, error) {

return &DentryResolver{
config: probe.config,
client: probe.statsdClient,
statsdClient: probe.statsdClient,
cache: make(map[uint32]*lru.Cache),
erpc: probe.erpc,
erpcSegment: segment,
Expand Down
17 changes: 9 additions & 8 deletions pkg/security/probe/discarder_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ package probe
import (
"fmt"

"github.com/DataDog/datadog-go/v5/statsd"

"github.com/DataDog/datadog-agent/pkg/security/ebpf"
"github.com/DataDog/datadog-agent/pkg/security/metrics"
"github.com/DataDog/datadog-agent/pkg/security/secl/model"
"github.com/DataDog/datadog-agent/pkg/security/utils"
"github.com/DataDog/datadog-go/v5/statsd"
lib "github.com/cilium/ebpf"
"github.com/pkg/errors"
)
Expand All @@ -28,7 +29,7 @@ type DiscarderStats struct {

// DiscarderMonitor defines a discarder monitor
type DiscarderMonitor struct {
client *statsd.Client
statsdClient statsd.ClientInterface
stats [2]*lib.Map
bufferSelector *lib.Map
statsZero []DiscarderStats
Expand Down Expand Up @@ -72,8 +73,8 @@ func (d *DiscarderMonitor) SendStats() error {
}
}

_ = d.client.Count(metrics.MetricDiscarderAdded, int64(stats.DiscardersAdded), tags, 1.0)
_ = d.client.Count(metrics.MetricEventDiscarded, int64(stats.EventDiscarded), tags, 1.0)
_ = d.statsdClient.Count(metrics.MetricDiscarderAdded, int64(stats.DiscardersAdded), tags, 1.0)
_ = d.statsdClient.Count(metrics.MetricEventDiscarded, int64(stats.EventDiscarded), tags, 1.0)

}
for i := uint32(0); i != uint32(model.LastDiscarderEventType); i++ {
Expand All @@ -85,16 +86,16 @@ func (d *DiscarderMonitor) SendStats() error {
}

// NewDiscarderMonitor returns a new DiscarderMonitor
func NewDiscarderMonitor(p *Probe, client *statsd.Client) (*DiscarderMonitor, error) {
func NewDiscarderMonitor(p *Probe) (*DiscarderMonitor, error) {
numCPU, err := utils.NumCPU()
if err != nil {
return nil, errors.Wrapf(err, "couldn't fetch the host CPU count")
}

d := &DiscarderMonitor{
client: client,
statsZero: make([]DiscarderStats, numCPU),
numCPU: numCPU,
statsdClient: p.statsdClient,
statsZero: make([]DiscarderStats, numCPU),
numCPU: numCPU,
}

statsFB, err := p.Map("discarder_stats_fb")
Expand Down
Loading

0 comments on commit ecf45b4

Please sign in to comment.