Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ebpf): configurable pipeline channel size #4182

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/tracee-ebpf/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ func main() {
Value: 1024, // 4 MB of contiguous pages
Usage: "size, in pages, of the internal perf ring buffer used to send blobs from the kernel",
},
&cli.IntFlag{
Name: "pipeline-channel-size",
Value: 10000,
Usage: "size, in event objects, of each pipeline stage's output channel",
},
&cli.StringFlag{
Name: "install-path",
Value: "/tmp/tracee",
Expand Down
10 changes: 10 additions & 0 deletions cmd/tracee/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ func initCmd() error {
return errfmt.WrapError(err)
}

rootCmd.Flags().Int(
"pipeline-channel-size",
10000,
"<size>\t\t\t\tSize, in event objects, of each pipeline stage's output channel",
)
err = viper.BindPFlag("pipeline-channel-size", rootCmd.Flags().Lookup("pipeline-channel-size"))
if err != nil {
return errfmt.WrapError(err)
}

rootCmd.Flags().StringArrayP(
"cache",
"a",
Expand Down
7 changes: 4 additions & 3 deletions pkg/cmd/cobra/cobra.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ func GetTraceeRunner(c *cobra.Command, version string) (cmd.Runner, error) {
// Initialize a tracee config structure

cfg := config.Config{
PerfBufferSize: viper.GetInt("perf-buffer-size"),
BlobPerfBufferSize: viper.GetInt("blob-perf-buffer-size"),
NoContainersEnrich: viper.GetBool("no-containers"),
PerfBufferSize: viper.GetInt("perf-buffer-size"),
BlobPerfBufferSize: viper.GetInt("blob-perf-buffer-size"),
PipelineChannelSize: viper.GetInt("pipeline-channel-size"),
NoContainersEnrich: viper.GetBool("no-containers"),
}

// OS release information
Expand Down
7 changes: 4 additions & 3 deletions pkg/cmd/urfave/urfave.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ func GetTraceeRunner(c *cli.Context, version string) (cmd.Runner, error) {

// Initialize a tracee config structure
cfg := config.Config{
PerfBufferSize: c.Int("perf-buffer-size"),
BlobPerfBufferSize: c.Int("blob-perf-buffer-size"),
NoContainersEnrich: c.Bool("no-containers"),
PerfBufferSize: c.Int("perf-buffer-size"),
BlobPerfBufferSize: c.Int("blob-perf-buffer-size"),
PipelineChannelSize: c.Int("pipeline-channel-size"),
NoContainersEnrich: c.Bool("no-containers"),
}

// Output command line flags
Expand Down
37 changes: 19 additions & 18 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,25 @@ import (
// NOTE: In the future, Tracee config will be changed at run time and will require
// proper management.
type Config struct {
InitialPolicies []*policy.Policy
Capture *CaptureConfig
Capabilities *CapabilitiesConfig
Output *OutputConfig
Cache queue.CacheConfig
ProcTree proctree.ProcTreeConfig
PerfBufferSize int
BlobPerfBufferSize int
MaxPidsCache int // maximum number of pids to cache per mnt ns (in Tracee.pidsInMntns)
BTFObjPath string
BPFObjBytes []byte
KernelConfig *environment.KernelConfig
OSInfo *environment.OSInfo
Sockets runtime.Sockets
NoContainersEnrich bool
EngineConfig engine.Config
MetricsEnabled bool
DNSCacheConfig dnscache.Config
InitialPolicies []*policy.Policy
Capture *CaptureConfig
Capabilities *CapabilitiesConfig
Output *OutputConfig
Cache queue.CacheConfig
ProcTree proctree.ProcTreeConfig
PerfBufferSize int
BlobPerfBufferSize int
PipelineChannelSize int
MaxPidsCache int // maximum number of pids to cache per mnt ns (in Tracee.pidsInMntns)
BTFObjPath string
BPFObjBytes []byte
KernelConfig *environment.KernelConfig
OSInfo *environment.OSInfo
Sockets runtime.Sockets
NoContainersEnrich bool
EngineConfig engine.Config
MetricsEnabled bool
DNSCacheConfig dnscache.Config
}

// Validate does static validation of the configuration
Expand Down
2 changes: 1 addition & 1 deletion pkg/ebpf/events_enrich.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (t *Tracee) enrichContainerEvents(ctx gocontext.Context, in <-chan *trace.E
// big lock
bLock := sync.RWMutex{}
// pipeline channels
out := make(chan *trace.Event, 10000)
out := make(chan *trace.Event, t.config.PipelineChannelSize)
errc := make(chan error, 1)
// state machine for enrichment
enrichDone := make(map[uint64]bool)
Expand Down
10 changes: 5 additions & 5 deletions pkg/ebpf/events_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (t *Tracee) handleEvents(ctx context.Context, initialized chan<- struct{})
// Sort stage: events go through a sorting function.

if t.config.Output.EventsSorting {
eventsChan, errc = t.eventsSorter.StartPipeline(ctx, eventsChan)
eventsChan, errc = t.eventsSorter.StartPipeline(ctx, eventsChan, t.config.BlobPerfBufferSize)
errcList = append(errcList, errc)
}

Expand Down Expand Up @@ -112,7 +112,7 @@ func (t *Tracee) handleEvents(ctx context.Context, initialized chan<- struct{})
// caching function that will enqueue the event into a queue. The queue is then de-queued
// by a different goroutine that will send the event down the pipeline.
func (t *Tracee) queueEvents(ctx context.Context, in <-chan *trace.Event) (chan *trace.Event, chan error) {
out := make(chan *trace.Event, 10000)
out := make(chan *trace.Event, t.config.PipelineChannelSize)
errc := make(chan error, 1)
done := make(chan struct{}, 1)

Expand Down Expand Up @@ -156,7 +156,7 @@ func (t *Tracee) queueEvents(ctx context.Context, in <-chan *trace.Event) (chan
// through a decoding function that will decode the event from its raw format into a
// trace.Event type.
func (t *Tracee) decodeEvents(ctx context.Context, sourceChan chan []byte) (<-chan *trace.Event, <-chan error) {
out := make(chan *trace.Event, 10000)
out := make(chan *trace.Event, t.config.PipelineChannelSize)
errc := make(chan error, 1)
sysCompatTranslation := events.Core.IDs32ToIDs()
go func() {
Expand Down Expand Up @@ -434,7 +434,7 @@ func parseSyscallID(syscallID int, isCompat bool, compatTranslationMap map[event
func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event) (
<-chan *trace.Event, <-chan error,
) {
out := make(chan *trace.Event, 10000)
out := make(chan *trace.Event, t.config.PipelineChannelSize)
errc := make(chan error, 1)

// Some "informational" events are started here (TODO: API server?)
Expand Down Expand Up @@ -509,7 +509,7 @@ func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event) (
func (t *Tracee) deriveEvents(ctx context.Context, in <-chan *trace.Event) (
<-chan *trace.Event, <-chan error,
) {
out := make(chan *trace.Event)
out := make(chan *trace.Event, t.config.PipelineChannelSize)
errc := make(chan error, 1)

go func() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ebpf/signature_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (

// engineEvents stage in the pipeline allows signatures detection to be executed in the pipeline
func (t *Tracee) engineEvents(ctx context.Context, in <-chan *trace.Event) (<-chan *trace.Event, <-chan error) {
out := make(chan *trace.Event)
out := make(chan *trace.Event, t.config.PipelineChannelSize)
errc := make(chan error, 1)

engineOutput := make(chan *detect.Finding, 10000)
engineInput := make(chan protocol.Event, 10000)
engineOutputEvents := make(chan *trace.Event, 10000)
engineOutputEvents := make(chan *trace.Event, t.config.PipelineChannelSize)
source := engine.EventSources{Tracee: engineInput}

// Prepare built in data sources
Expand Down
2 changes: 1 addition & 1 deletion pkg/ebpf/tracee.go
Original file line number Diff line number Diff line change
Expand Up @@ -2023,7 +2023,7 @@ func (t *Tracee) Subscribe(policyNames []string) (*streams.Stream, error) {
func (t *Tracee) subscribe(policyMask uint64) *streams.Stream {
// TODO: the channel size matches the pipeline channel size,
// but we should make it configurable in the future.
return t.streamsManager.Subscribe(policyMask, 10000)
return t.streamsManager.Subscribe(policyMask, t.config.PipelineChannelSize)
}

// Unsubscribe unsubscribes stream
Expand Down
4 changes: 2 additions & 2 deletions pkg/events/sorting/sorting.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ func InitEventSorter() (*EventsChronologicalSorter, error) {
return &newSorter, nil
}

func (sorter *EventsChronologicalSorter) StartPipeline(ctx gocontext.Context, in <-chan *trace.Event) (
func (sorter *EventsChronologicalSorter) StartPipeline(ctx gocontext.Context, in <-chan *trace.Event, outChanSize int) (
chan *trace.Event, chan error) {
out := make(chan *trace.Event, 10000)
out := make(chan *trace.Event, outChanSize)
errc := make(chan error, 1)
go sorter.Start(in, out, ctx, errc)
return out, errc
Expand Down
1 change: 1 addition & 0 deletions tests/integration/tracee.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func startTracee(ctx context.Context, t *testing.T, cfg config.Config, output *c

cfg.PerfBufferSize = 1024
cfg.BlobPerfBufferSize = 1024
cfg.PipelineChannelSize = 10000

// No process tree in the integration tests
cfg.ProcTree = proctree.ProcTreeConfig{
Expand Down
Loading