diff --git a/cmd/tracee-ebpf/main.go b/cmd/tracee-ebpf/main.go index cae2fcd17d47..69919130be73 100644 --- a/cmd/tracee-ebpf/main.go +++ b/cmd/tracee-ebpf/main.go @@ -118,6 +118,11 @@ func main() { Value: 1024, // 4 MB of contiguous pages Usage: "size, in pages, of the internal perf ring buffer used to send blobs from the kernel", }, + &cli.IntFlag{ + Name: "pipeline-channel-size", + Value: 10000, + Usage: "size, in event objects, of each pipeline stage's output channel", + }, &cli.StringFlag{ Name: "install-path", Value: "/tmp/tracee", diff --git a/cmd/tracee/cmd/root.go b/cmd/tracee/cmd/root.go index 8aeefe27df83..75e2656009b2 100644 --- a/cmd/tracee/cmd/root.go +++ b/cmd/tracee/cmd/root.go @@ -212,6 +212,16 @@ func initCmd() error { return errfmt.WrapError(err) } + rootCmd.Flags().Int( + "pipeline-channel-size", + 10000, + "\t\t\t\tSize, in event objects, of each pipeline stage's output channel", + ) + err = viper.BindPFlag("pipeline-channel-size", rootCmd.Flags().Lookup("pipeline-channel-size")) + if err != nil { + return errfmt.WrapError(err) + } + rootCmd.Flags().StringArrayP( "cache", "a", diff --git a/pkg/cmd/cobra/cobra.go b/pkg/cmd/cobra/cobra.go index 4ad94ff839a9..c58bf045aa28 100644 --- a/pkg/cmd/cobra/cobra.go +++ b/pkg/cmd/cobra/cobra.go @@ -72,9 +72,10 @@ func GetTraceeRunner(c *cobra.Command, version string) (cmd.Runner, error) { // Initialize a tracee config structure cfg := config.Config{ - PerfBufferSize: viper.GetInt("perf-buffer-size"), - BlobPerfBufferSize: viper.GetInt("blob-perf-buffer-size"), - NoContainersEnrich: viper.GetBool("no-containers"), + PerfBufferSize: viper.GetInt("perf-buffer-size"), + BlobPerfBufferSize: viper.GetInt("blob-perf-buffer-size"), + PipelineChannelSize: viper.GetInt("pipeline-channel-size"), + NoContainersEnrich: viper.GetBool("no-containers"), } // OS release information diff --git a/pkg/cmd/urfave/urfave.go b/pkg/cmd/urfave/urfave.go index 8b7bfd967ba1..8d4f7c110839 100644 --- a/pkg/cmd/urfave/urfave.go +++ b/pkg/cmd/urfave/urfave.go @@ -19,9 +19,10 @@ func GetTraceeRunner(c *cli.Context, version string) (cmd.Runner, error) { // Initialize a tracee config structure cfg := config.Config{ - PerfBufferSize: c.Int("perf-buffer-size"), - BlobPerfBufferSize: c.Int("blob-perf-buffer-size"), - NoContainersEnrich: c.Bool("no-containers"), + PerfBufferSize: c.Int("perf-buffer-size"), + BlobPerfBufferSize: c.Int("blob-perf-buffer-size"), + PipelineChannelSize: c.Int("pipeline-channel-size"), + NoContainersEnrich: c.Bool("no-containers"), } // Output command line flags diff --git a/pkg/config/config.go b/pkg/config/config.go index d41fae6d2705..985247aeb47c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -18,24 +18,25 @@ import ( // NOTE: In the future, Tracee config will be changed at run time and will require // proper management. type Config struct { - InitialPolicies []*policy.Policy - Capture *CaptureConfig - Capabilities *CapabilitiesConfig - Output *OutputConfig - Cache queue.CacheConfig - ProcTree proctree.ProcTreeConfig - PerfBufferSize int - BlobPerfBufferSize int - MaxPidsCache int // maximum number of pids to cache per mnt ns (in Tracee.pidsInMntns) - BTFObjPath string - BPFObjBytes []byte - KernelConfig *environment.KernelConfig - OSInfo *environment.OSInfo - Sockets runtime.Sockets - NoContainersEnrich bool - EngineConfig engine.Config - MetricsEnabled bool - DNSCacheConfig dnscache.Config + InitialPolicies []*policy.Policy + Capture *CaptureConfig + Capabilities *CapabilitiesConfig + Output *OutputConfig + Cache queue.CacheConfig + ProcTree proctree.ProcTreeConfig + PerfBufferSize int + BlobPerfBufferSize int + PipelineChannelSize int + MaxPidsCache int // maximum number of pids to cache per mnt ns (in Tracee.pidsInMntns) + BTFObjPath string + BPFObjBytes []byte + KernelConfig *environment.KernelConfig + OSInfo *environment.OSInfo + Sockets runtime.Sockets + NoContainersEnrich bool + EngineConfig engine.Config + MetricsEnabled bool + DNSCacheConfig dnscache.Config } // Validate does static validation of the configuration diff --git a/pkg/ebpf/events_enrich.go b/pkg/ebpf/events_enrich.go index c0dce6c50d1d..ae52968ffb6d 100644 --- a/pkg/ebpf/events_enrich.go +++ b/pkg/ebpf/events_enrich.go @@ -70,7 +70,7 @@ func (t *Tracee) enrichContainerEvents(ctx gocontext.Context, in <-chan *trace.E // big lock bLock := sync.RWMutex{} // pipeline channels - out := make(chan *trace.Event, 10000) + out := make(chan *trace.Event, t.config.PipelineChannelSize) errc := make(chan error, 1) // state machine for enrichment enrichDone := make(map[uint64]bool) diff --git a/pkg/ebpf/events_pipeline.go b/pkg/ebpf/events_pipeline.go index c30ee137f50e..7af714af3c3e 100644 --- a/pkg/ebpf/events_pipeline.go +++ b/pkg/ebpf/events_pipeline.go @@ -47,7 +47,7 @@ func (t *Tracee) handleEvents(ctx context.Context, initialized chan<- struct{}) // Sort stage: events go through a sorting function. if t.config.Output.EventsSorting { - eventsChan, errc = t.eventsSorter.StartPipeline(ctx, eventsChan) + eventsChan, errc = t.eventsSorter.StartPipeline(ctx, eventsChan, t.config.BlobPerfBufferSize) errcList = append(errcList, errc) } @@ -112,7 +112,7 @@ func (t *Tracee) handleEvents(ctx context.Context, initialized chan<- struct{}) // caching function that will enqueue the event into a queue. The queue is then de-queued // by a different goroutine that will send the event down the pipeline. func (t *Tracee) queueEvents(ctx context.Context, in <-chan *trace.Event) (chan *trace.Event, chan error) { - out := make(chan *trace.Event, 10000) + out := make(chan *trace.Event, t.config.PipelineChannelSize) errc := make(chan error, 1) done := make(chan struct{}, 1) @@ -156,7 +156,7 @@ func (t *Tracee) queueEvents(ctx context.Context, in <-chan *trace.Event) (chan // through a decoding function that will decode the event from its raw format into a // trace.Event type. func (t *Tracee) decodeEvents(ctx context.Context, sourceChan chan []byte) (<-chan *trace.Event, <-chan error) { - out := make(chan *trace.Event, 10000) + out := make(chan *trace.Event, t.config.PipelineChannelSize) errc := make(chan error, 1) sysCompatTranslation := events.Core.IDs32ToIDs() go func() { @@ -434,7 +434,7 @@ func parseSyscallID(syscallID int, isCompat bool, compatTranslationMap map[event func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event) ( <-chan *trace.Event, <-chan error, ) { - out := make(chan *trace.Event, 10000) + out := make(chan *trace.Event, t.config.PipelineChannelSize) errc := make(chan error, 1) // Some "informational" events are started here (TODO: API server?) @@ -509,7 +509,7 @@ func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event) ( func (t *Tracee) deriveEvents(ctx context.Context, in <-chan *trace.Event) ( <-chan *trace.Event, <-chan error, ) { - out := make(chan *trace.Event) + out := make(chan *trace.Event, t.config.PipelineChannelSize) errc := make(chan error, 1) go func() { diff --git a/pkg/ebpf/signature_engine.go b/pkg/ebpf/signature_engine.go index 8bbaff96a567..0b783db5fc6f 100644 --- a/pkg/ebpf/signature_engine.go +++ b/pkg/ebpf/signature_engine.go @@ -16,12 +16,12 @@ import ( // engineEvents stage in the pipeline allows signatures detection to be executed in the pipeline func (t *Tracee) engineEvents(ctx context.Context, in <-chan *trace.Event) (<-chan *trace.Event, <-chan error) { - out := make(chan *trace.Event) + out := make(chan *trace.Event, t.config.PipelineChannelSize) errc := make(chan error, 1) engineOutput := make(chan *detect.Finding, 10000) engineInput := make(chan protocol.Event, 10000) - engineOutputEvents := make(chan *trace.Event, 10000) + engineOutputEvents := make(chan *trace.Event, t.config.PipelineChannelSize) source := engine.EventSources{Tracee: engineInput} // Prepare built in data sources diff --git a/pkg/ebpf/tracee.go b/pkg/ebpf/tracee.go index cff78f4743aa..fb3c5aa96485 100644 --- a/pkg/ebpf/tracee.go +++ b/pkg/ebpf/tracee.go @@ -2023,7 +2023,7 @@ func (t *Tracee) Subscribe(policyNames []string) (*streams.Stream, error) { func (t *Tracee) subscribe(policyMask uint64) *streams.Stream { // TODO: the channel size matches the pipeline channel size, // but we should make it configurable in the future. - return t.streamsManager.Subscribe(policyMask, 10000) + return t.streamsManager.Subscribe(policyMask, t.config.PipelineChannelSize) } // Unsubscribe unsubscribes stream diff --git a/pkg/events/sorting/sorting.go b/pkg/events/sorting/sorting.go index 9681a69757a5..291a4fc4072f 100644 --- a/pkg/events/sorting/sorting.go +++ b/pkg/events/sorting/sorting.go @@ -142,9 +142,9 @@ func InitEventSorter() (*EventsChronologicalSorter, error) { return &newSorter, nil } -func (sorter *EventsChronologicalSorter) StartPipeline(ctx gocontext.Context, in <-chan *trace.Event) ( +func (sorter *EventsChronologicalSorter) StartPipeline(ctx gocontext.Context, in <-chan *trace.Event, outChanSize int) ( chan *trace.Event, chan error) { - out := make(chan *trace.Event, 10000) + out := make(chan *trace.Event, outChanSize) errc := make(chan error, 1) go sorter.Start(in, out, ctx, errc) return out, errc diff --git a/tests/integration/tracee.go b/tests/integration/tracee.go index 2da4e7ec39f8..17728d499f34 100644 --- a/tests/integration/tracee.go +++ b/tests/integration/tracee.go @@ -95,6 +95,7 @@ func startTracee(ctx context.Context, t *testing.T, cfg config.Config, output *c cfg.PerfBufferSize = 1024 cfg.BlobPerfBufferSize = 1024 + cfg.PipelineChannelSize = 10000 // No process tree in the integration tests cfg.ProcTree = proctree.ProcTreeConfig{