From f3d7bfc8e515ad3f4c052a7e44f730e3d0dc0bf7 Mon Sep 17 00:00:00 2001 From: Nadav Strahilevitz Date: Wed, 10 Jul 2024 14:27:04 +0000 Subject: [PATCH] feat(ebpf): configurable pipeline channel size Allow configuring the event channels in pipeline stages using the `pipeline-channel-size` flag and config entry. Rigorous testing of the effect of playing with this value has yet to be done, so the default value is currently 10000 as hardcoded before. Tentatitevely, reducing the size of this configuration value may help reduce the mean memory usage, perhaps reduce channel contention, but may be at the cost of a greater risk for event loss. --- cmd/tracee-ebpf/main.go | 5 +++++ cmd/tracee/cmd/root.go | 10 ++++++++++ pkg/cmd/cobra/cobra.go | 7 ++++--- pkg/cmd/urfave/urfave.go | 7 ++++--- pkg/config/config.go | 37 ++++++++++++++++++----------------- pkg/ebpf/events_enrich.go | 2 +- pkg/ebpf/events_pipeline.go | 10 +++++----- pkg/ebpf/signature_engine.go | 4 ++-- pkg/ebpf/tracee.go | 2 +- pkg/events/sorting/sorting.go | 4 ++-- tests/integration/tracee.go | 1 + 11 files changed, 54 insertions(+), 35 deletions(-) diff --git a/cmd/tracee-ebpf/main.go b/cmd/tracee-ebpf/main.go index cae2fcd17d47..69919130be73 100644 --- a/cmd/tracee-ebpf/main.go +++ b/cmd/tracee-ebpf/main.go @@ -118,6 +118,11 @@ func main() { Value: 1024, // 4 MB of contiguous pages Usage: "size, in pages, of the internal perf ring buffer used to send blobs from the kernel", }, + &cli.IntFlag{ + Name: "pipeline-channel-size", + Value: 10000, + Usage: "size, in event objects, of each pipeline stage's output channel", + }, &cli.StringFlag{ Name: "install-path", Value: "/tmp/tracee", diff --git a/cmd/tracee/cmd/root.go b/cmd/tracee/cmd/root.go index 8aeefe27df83..75e2656009b2 100644 --- a/cmd/tracee/cmd/root.go +++ b/cmd/tracee/cmd/root.go @@ -212,6 +212,16 @@ func initCmd() error { return errfmt.WrapError(err) } + rootCmd.Flags().Int( + "pipeline-channel-size", + 10000, + "\t\t\t\tSize, in event objects, of each pipeline stage's output channel", + ) + err = viper.BindPFlag("pipeline-channel-size", rootCmd.Flags().Lookup("pipeline-channel-size")) + if err != nil { + return errfmt.WrapError(err) + } + rootCmd.Flags().StringArrayP( "cache", "a", diff --git a/pkg/cmd/cobra/cobra.go b/pkg/cmd/cobra/cobra.go index 4ad94ff839a9..c58bf045aa28 100644 --- a/pkg/cmd/cobra/cobra.go +++ b/pkg/cmd/cobra/cobra.go @@ -72,9 +72,10 @@ func GetTraceeRunner(c *cobra.Command, version string) (cmd.Runner, error) { // Initialize a tracee config structure cfg := config.Config{ - PerfBufferSize: viper.GetInt("perf-buffer-size"), - BlobPerfBufferSize: viper.GetInt("blob-perf-buffer-size"), - NoContainersEnrich: viper.GetBool("no-containers"), + PerfBufferSize: viper.GetInt("perf-buffer-size"), + BlobPerfBufferSize: viper.GetInt("blob-perf-buffer-size"), + PipelineChannelSize: viper.GetInt("pipeline-channel-size"), + NoContainersEnrich: viper.GetBool("no-containers"), } // OS release information diff --git a/pkg/cmd/urfave/urfave.go b/pkg/cmd/urfave/urfave.go index 8b7bfd967ba1..8d4f7c110839 100644 --- a/pkg/cmd/urfave/urfave.go +++ b/pkg/cmd/urfave/urfave.go @@ -19,9 +19,10 @@ func GetTraceeRunner(c *cli.Context, version string) (cmd.Runner, error) { // Initialize a tracee config structure cfg := config.Config{ - PerfBufferSize: c.Int("perf-buffer-size"), - BlobPerfBufferSize: c.Int("blob-perf-buffer-size"), - NoContainersEnrich: c.Bool("no-containers"), + PerfBufferSize: c.Int("perf-buffer-size"), + BlobPerfBufferSize: c.Int("blob-perf-buffer-size"), + PipelineChannelSize: c.Int("pipeline-channel-size"), + NoContainersEnrich: c.Bool("no-containers"), } // Output command line flags diff --git a/pkg/config/config.go b/pkg/config/config.go index d41fae6d2705..985247aeb47c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -18,24 +18,25 @@ import ( // NOTE: In the future, Tracee config will be changed at run time and will require // proper management. type Config struct { - InitialPolicies []*policy.Policy - Capture *CaptureConfig - Capabilities *CapabilitiesConfig - Output *OutputConfig - Cache queue.CacheConfig - ProcTree proctree.ProcTreeConfig - PerfBufferSize int - BlobPerfBufferSize int - MaxPidsCache int // maximum number of pids to cache per mnt ns (in Tracee.pidsInMntns) - BTFObjPath string - BPFObjBytes []byte - KernelConfig *environment.KernelConfig - OSInfo *environment.OSInfo - Sockets runtime.Sockets - NoContainersEnrich bool - EngineConfig engine.Config - MetricsEnabled bool - DNSCacheConfig dnscache.Config + InitialPolicies []*policy.Policy + Capture *CaptureConfig + Capabilities *CapabilitiesConfig + Output *OutputConfig + Cache queue.CacheConfig + ProcTree proctree.ProcTreeConfig + PerfBufferSize int + BlobPerfBufferSize int + PipelineChannelSize int + MaxPidsCache int // maximum number of pids to cache per mnt ns (in Tracee.pidsInMntns) + BTFObjPath string + BPFObjBytes []byte + KernelConfig *environment.KernelConfig + OSInfo *environment.OSInfo + Sockets runtime.Sockets + NoContainersEnrich bool + EngineConfig engine.Config + MetricsEnabled bool + DNSCacheConfig dnscache.Config } // Validate does static validation of the configuration diff --git a/pkg/ebpf/events_enrich.go b/pkg/ebpf/events_enrich.go index c0dce6c50d1d..ae52968ffb6d 100644 --- a/pkg/ebpf/events_enrich.go +++ b/pkg/ebpf/events_enrich.go @@ -70,7 +70,7 @@ func (t *Tracee) enrichContainerEvents(ctx gocontext.Context, in <-chan *trace.E // big lock bLock := sync.RWMutex{} // pipeline channels - out := make(chan *trace.Event, 10000) + out := make(chan *trace.Event, t.config.PipelineChannelSize) errc := make(chan error, 1) // state machine for enrichment enrichDone := make(map[uint64]bool) diff --git a/pkg/ebpf/events_pipeline.go b/pkg/ebpf/events_pipeline.go index c30ee137f50e..7af714af3c3e 100644 --- a/pkg/ebpf/events_pipeline.go +++ b/pkg/ebpf/events_pipeline.go @@ -47,7 +47,7 @@ func (t *Tracee) handleEvents(ctx context.Context, initialized chan<- struct{}) // Sort stage: events go through a sorting function. if t.config.Output.EventsSorting { - eventsChan, errc = t.eventsSorter.StartPipeline(ctx, eventsChan) + eventsChan, errc = t.eventsSorter.StartPipeline(ctx, eventsChan, t.config.BlobPerfBufferSize) errcList = append(errcList, errc) } @@ -112,7 +112,7 @@ func (t *Tracee) handleEvents(ctx context.Context, initialized chan<- struct{}) // caching function that will enqueue the event into a queue. The queue is then de-queued // by a different goroutine that will send the event down the pipeline. func (t *Tracee) queueEvents(ctx context.Context, in <-chan *trace.Event) (chan *trace.Event, chan error) { - out := make(chan *trace.Event, 10000) + out := make(chan *trace.Event, t.config.PipelineChannelSize) errc := make(chan error, 1) done := make(chan struct{}, 1) @@ -156,7 +156,7 @@ func (t *Tracee) queueEvents(ctx context.Context, in <-chan *trace.Event) (chan // through a decoding function that will decode the event from its raw format into a // trace.Event type. func (t *Tracee) decodeEvents(ctx context.Context, sourceChan chan []byte) (<-chan *trace.Event, <-chan error) { - out := make(chan *trace.Event, 10000) + out := make(chan *trace.Event, t.config.PipelineChannelSize) errc := make(chan error, 1) sysCompatTranslation := events.Core.IDs32ToIDs() go func() { @@ -434,7 +434,7 @@ func parseSyscallID(syscallID int, isCompat bool, compatTranslationMap map[event func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event) ( <-chan *trace.Event, <-chan error, ) { - out := make(chan *trace.Event, 10000) + out := make(chan *trace.Event, t.config.PipelineChannelSize) errc := make(chan error, 1) // Some "informational" events are started here (TODO: API server?) @@ -509,7 +509,7 @@ func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event) ( func (t *Tracee) deriveEvents(ctx context.Context, in <-chan *trace.Event) ( <-chan *trace.Event, <-chan error, ) { - out := make(chan *trace.Event) + out := make(chan *trace.Event, t.config.PipelineChannelSize) errc := make(chan error, 1) go func() { diff --git a/pkg/ebpf/signature_engine.go b/pkg/ebpf/signature_engine.go index 8bbaff96a567..0b783db5fc6f 100644 --- a/pkg/ebpf/signature_engine.go +++ b/pkg/ebpf/signature_engine.go @@ -16,12 +16,12 @@ import ( // engineEvents stage in the pipeline allows signatures detection to be executed in the pipeline func (t *Tracee) engineEvents(ctx context.Context, in <-chan *trace.Event) (<-chan *trace.Event, <-chan error) { - out := make(chan *trace.Event) + out := make(chan *trace.Event, t.config.PipelineChannelSize) errc := make(chan error, 1) engineOutput := make(chan *detect.Finding, 10000) engineInput := make(chan protocol.Event, 10000) - engineOutputEvents := make(chan *trace.Event, 10000) + engineOutputEvents := make(chan *trace.Event, t.config.PipelineChannelSize) source := engine.EventSources{Tracee: engineInput} // Prepare built in data sources diff --git a/pkg/ebpf/tracee.go b/pkg/ebpf/tracee.go index cff78f4743aa..fb3c5aa96485 100644 --- a/pkg/ebpf/tracee.go +++ b/pkg/ebpf/tracee.go @@ -2023,7 +2023,7 @@ func (t *Tracee) Subscribe(policyNames []string) (*streams.Stream, error) { func (t *Tracee) subscribe(policyMask uint64) *streams.Stream { // TODO: the channel size matches the pipeline channel size, // but we should make it configurable in the future. - return t.streamsManager.Subscribe(policyMask, 10000) + return t.streamsManager.Subscribe(policyMask, t.config.PipelineChannelSize) } // Unsubscribe unsubscribes stream diff --git a/pkg/events/sorting/sorting.go b/pkg/events/sorting/sorting.go index 9681a69757a5..291a4fc4072f 100644 --- a/pkg/events/sorting/sorting.go +++ b/pkg/events/sorting/sorting.go @@ -142,9 +142,9 @@ func InitEventSorter() (*EventsChronologicalSorter, error) { return &newSorter, nil } -func (sorter *EventsChronologicalSorter) StartPipeline(ctx gocontext.Context, in <-chan *trace.Event) ( +func (sorter *EventsChronologicalSorter) StartPipeline(ctx gocontext.Context, in <-chan *trace.Event, outChanSize int) ( chan *trace.Event, chan error) { - out := make(chan *trace.Event, 10000) + out := make(chan *trace.Event, outChanSize) errc := make(chan error, 1) go sorter.Start(in, out, ctx, errc) return out, errc diff --git a/tests/integration/tracee.go b/tests/integration/tracee.go index 2da4e7ec39f8..17728d499f34 100644 --- a/tests/integration/tracee.go +++ b/tests/integration/tracee.go @@ -95,6 +95,7 @@ func startTracee(ctx context.Context, t *testing.T, cfg config.Config, output *c cfg.PerfBufferSize = 1024 cfg.BlobPerfBufferSize = 1024 + cfg.PipelineChannelSize = 10000 // No process tree in the integration tests cfg.ProcTree = proctree.ProcTreeConfig{