Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implemented broker and consume API for tracee-ebpf #793

Merged
merged 1 commit into from
Jun 30, 2021

Conversation

mtcherni95
Copy link
Contributor

This commit includes:

  • a Broker API with the Streamer interface and implementation of IOStreamer (aka printer).
  • the creation of Consume() API exposed by Tracee.

@mtcherni95 mtcherni95 force-pushed the consume-and-broker-api branch 2 times, most recently from 0520bc9 to 70a6ce9 Compare June 27, 2021 10:11
@@ -889,12 +863,19 @@ func (t *Tracee) writeProfilerStats(wr io.Writer) error {
return nil
}

func (t *Tracee) Consume() (<-chan external.Event, error) {
Copy link
Contributor Author

@mtcherni95 mtcherni95 Jun 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The model proposed implies that the ownership of the channel should be on the producer (indeed who writes on the channel should be responsible to close it). IMO this is a better design compared to a "pass channel from configuration" design, where the consumer is supposed to be the owner of the channel. Moreover, in the model proposed by the commit we will still be able in future to add multiple channels (just return a struct/map of channels).

@itaysk itaysk self-assigned this Jun 27, 2021
Copy link
Collaborator

@itaysk itaysk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great job undertaking this complex task. It is a major change to tracee to some newly introduced concepts might need fine-tuning


import (
"fmt"
cmap "github.com/orcaman/concurrent-map"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what benefits does this library have over stdlib's sync.Map?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The stdlib sync.Map is designed for append-only scenarios. So if you want to use the map for something more like in-memory db, you might benefit from using our version. You can read more about it in the golang repo, for example here:golang/go#21035 and here: https://stackoverflow.com/questions/11063473/map-with-concurrent-access

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this matters enough to make the map synchronous. Most (all?) implementations will register 1/2/maybe 3 streamers in a sequential manner. Even if you want to make the map thread safe, "append-only" sounds reasonable to our use case. Also, the worst case with sync.Map is a slight performance penalty but only in the insertion (and this is for a scenario that is very unlikely to happen). I don't think we need an in-memory high-throughput cache here

"github.com/aquasecurity/tracee/tracee-ebpf/tracee/streamers"
)

type Broker struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should have a NewBroker function that does all the one-time setup here instead of having the consumer do it.

func NewBroker(channel chan Event, stats Stats) (Broker, error) {
  // initialize a new broker
  // initialize it's internal cmap
  // save channel and stats on the struct
  // return struct
} 

Comment on lines 92 to 99
b := broker.Broker{Streamers: cmap.New()}
if err := b.Register(iostreamer); err != nil {
return fmt.Errorf("failed to register IOStreamer: %w", err)
}
b.ChanEvents, err = t.Consume()
if err != nil {
return fmt.Errorf("failed to consume event channels: %w", err)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the consumer has too much visibility and control into the broker. we should simplify things for the consumer so it instantiates a new broker and the setup is done in the broker package (see my other comment in broker.go)

if ok := b.Streamers.SetIfAbsent(strconv.FormatUint(b.nextAvailStreamerId, 10), streamer); !ok {
return fmt.Errorf("failed to subscribe streamer")
}
streamer.SetId(b.nextAvailStreamerId)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why should the streamer care which id the broker gave it? the id is purely for disambiguation in the broker's internal map

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mainly because the streamer can call the broker.Unregister API by passing its ID.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this explains why the broker should care about the IDs, but I asked why the streamer cares

return fmt.Errorf("failed to subscribe streamer")
}
streamer.SetId(b.nextAvailStreamerId)
streamer.Preamble()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

preamble should be called in start, not in register

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

register is considered as the "init" phase for the streamer. The moment is registered it starts its lifetime.

go func() {
defer close(errc)
for printEvent := range b.ChanEvents {
stats.EventCounter.Increment()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

increment should be done after the event was handled (to avoid double counting)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how could double counting happen?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you incremented, then some error in the streamer occurred which cause the event to be "lost". so it's not in the output but still in the counter

errc := make(chan error, 1)
go func() {
defer close(errc)
for printEvent := range b.ChanEvents {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

printEvent sounds suggestive in this generic broker. maybe event or e?

for printEvent := range b.ChanEvents {
stats.EventCounter.Increment()
cb := func(key string, v interface{}) {
v.(streamers.Streamer).Stream(&printEvent)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds to me that "Stream" is a read only operation on the event, why do we pass a pointer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on purpose we pass a pointer and not a copy, we are reading only.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you meant to avoid a copy for micro-optimization, then I'm not sure you made it better by creating a closure here

@@ -0,0 +1,329 @@
package streamers
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calling it streamers is a big suggestive, they don't have to "stream" the event they handle, they just need to do whatever with it. what do you think about "handle" instead of "stream"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good to me.

@@ -0,0 +1,64 @@
package broker
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this broker design creates a (loose) coupling from tracee-ebpf to the consumers, a coupling which did not exist until now. The coupling is demonstrated at the function call from the broker (broker is a tracee-ebpf concept) to the registered consumer. a function call in this direction is not ideal because it's synchronously called within the critical path of event processing. If one time once streamer is stuck for one event, it could clog the entire pipeline. I would rather keep tracee-ebpf oblivious to it's consumers. To do this, the broker should be a simpler "fan out" machine, that duplicates the incoming event into consumer-specific channels. An example of this pattern can be found in tracee-rules' engine (dispatching code), here's a simpler example: https://stackoverflow.com/a/36418082/310298

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can just go streamer.Stream() without having a blocking operation.
Copying here is a bad idea in my opinion.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so you suggest to change the broker code to invoke the callback in a goroutine? this will create a whole lot of different challenges, without seeing the code, I can expect ordering issues and resource contention for starts

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right. Let's go with the "fan out" machine.

@mtcherni95 mtcherni95 force-pushed the consume-and-broker-api branch 3 times, most recently from c777e2a to 2579020 Compare June 28, 2021 12:00
entity can read from the channel without any dependencies with the
tracee printers.
@mtcherni95 mtcherni95 force-pushed the consume-and-broker-api branch from 2579020 to 5455088 Compare June 28, 2021 12:06
Copy link
Collaborator

@itaysk itaysk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done channel is ignored, which is not your problem, just making a note for future

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants