Publisher pipeline enhancements #4641
- Clear the broker event buffer when events get ACKed by outputs. This helps the garbage collector pick up already ACKed events.
- Introduce BufferConfig, so the pipeline can query some of the broker's settings on event buffering.
- Have the broker report a max number of events, giving an upper bound on events (published/dropped) in the pipeline. Note:
  - The counter is currently only used for pipeline clients potentially dropping events. Due to the new pipeline setup normalizing events in the processors, this condition is always true.
  - Still, the broker blocks if the max event count is reached -> even in the worst case the total number of events in the publisher pipeline can only be 2*max.
- Add support for 'global' ACK to the pipeline:
  - register a pipeline-global ACK handler, receiving ACKs of published events for all registered clients
  - ACK handlers updated to serve both client and global ACK handlers
- Add ClientConfig.DropOnCancel: producer.Cancel disconnects the producer from the publisher pipeline. Only if DropOnCancel is true will an additional signal be sent to the publisher pipeline to drop any active events. This allows the pipeline client to decide whether dropping events on close is feasible or not.
- Producer.Publish returns a boolean to indicate that publishing potentially failed due to the producer being closed.
- Properly disconnect a blocked producer on close of the producer.
- Add assert/panic if the number of ACKed events exceeds the total number of events in membroker.
- Fix an index update summing the end-pointer, which eventually led to a slice-out-of-bounds panic.
- Add `SetACKHandler(PipelineACKHandler) error` to the pipeline interface => global listener for events being ACKed by the publisher pipeline (in order).
- Add more settings to ClientConfig:
  - Meta: provides `@metadata` to be added to each single event published (e.g. pipeline)
  - Events: registers callbacks for pipeline-client-internal events like Closing, Closed, Published, FilteredOut, DroppedOnPublish
- Redo the acker and acker creation:
  - ackers now operate mostly on the pipeline, so PipelineACKHandler can be served
  - the client ACKer hooks into pipeline ACKing when the ACKer is built. When the client is closed, the client ACKer will see no more events from the still active pipeline ACKer.
  - add asserts/panics to pipeline ACKers in case of too many events (acked > published) being ACKed
- Make pipeline.Client thread-safe (big mutex on Publish), so multiple go-routines can share a beat client for publishing events (requirement for filebeat).
- Filter out empty events (event.Fields == nil), but ensure events are …
- Update the pipeline processors order to:
  1. (P) extract EventMetadataKey fields + tags (to be removed in favor of 4)
  2. (P) generalize/normalize event
  3. (P) add beats metadata (name, hostname, version)
  4. (C) add Meta from client Config to event.Meta
  5. (P) add pipeline fields + tags
  6. (C) add client fields + tags
  7. (P/C) apply EventMetadataKey fields + tags (to be removed in favor of 4)
  8. (C) client processors list
  9. (P) pipeline processors list
  10. (P) (if publish/debug enabled) log event
  11. (P) (if output disabled) dropEvent
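The (P)ipeline/(C)lient processor ordering above can be sketched as a plain chain of functions. Everything below (`Event`, `newProcessing`, the field values) is an illustrative stand-in, not the actual libbeat types; only a few of the eleven steps are shown.

```go
package main

import "fmt"

// Event is a simplified stand-in for the pipeline event: Meta maps to
// @metadata, Fields to the regular event fields.
type Event struct {
	Meta   map[string]interface{}
	Fields map[string]interface{}
}

type processor func(*Event)

// newProcessing builds a (partial) processor chain in the order described
// in the PR: pipeline-wide (P) steps interleaved with per-client (C) steps.
func newProcessing(pipelineFields, clientFields, clientMeta map[string]interface{}) []processor {
	return []processor{
		// 3. (P) add beats metadata (name, hostname, version) - abbreviated
		func(e *Event) { e.Fields["beat"] = map[string]interface{}{"name": "example"} },
		// 4. (C) add Meta from client config to event.Meta
		func(e *Event) {
			if e.Meta == nil {
				e.Meta = map[string]interface{}{}
			}
			for k, v := range clientMeta {
				e.Meta[k] = v
			}
		},
		// 5. (P) add pipeline fields
		func(e *Event) {
			for k, v := range pipelineFields {
				e.Fields[k] = v
			}
		},
		// 6. (C) add client fields
		func(e *Event) {
			for k, v := range clientFields {
				e.Fields[k] = v
			}
		},
	}
}

func main() {
	procs := newProcessing(
		map[string]interface{}{"env": "prod"},                      // pipeline fields
		map[string]interface{}{"source": "/var/log/syslog"},        // client fields
		map[string]interface{}{"pipeline": "my-ingest-pipeline"},   // client Meta
	)
	evt := &Event{Fields: map[string]interface{}{"message": "hello"}}
	for _, p := range procs {
		p(evt)
	}
	fmt.Println(evt.Meta["pipeline"], evt.Fields["env"], evt.Fields["source"])
}
```

Because client steps run inside the pipeline-wide chain, a client-level field set in step 6 can override a pipeline-level field from step 5, which matches the listed ordering.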
LGTM. TBH it's pretty hard to build up a mental model of the change. There are lots of small nitty-gritty changes which look reasonable, but I couldn't fully verify them all. I will move forward and trust you on this one ;-)
```go
broker := membroker.NewBroker(20, false)
settings := pipeline.Settings{
brokerFactory := func(e broker.Eventer) (broker.Broker, error) {
	return membroker.NewBroker(e, 20, false), nil
```
I know the value was already there previously, but curious how you came up with 20?
Just wanted to have a small queue. There's no particular reason for the value 20. As the broker works asynchronously and has to serve consumers as well, I wanted a small buffered channel for events, so producers are not immediately blocked when the broker has to do some more book-keeping.
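The effect of that small buffer can be shown with a plain buffered channel; the channel below is a hypothetical stand-in for the broker's internal event queue, and the size 20 just mirrors the test value, it is not a tuned number.

```go
package main

import "fmt"

func main() {
	// A small buffered channel, like the broker's internal event queue.
	events := make(chan int, 20)

	// Producer side: the first 20 sends complete immediately even though
	// no consumer is draining yet, so producers are not blocked while the
	// broker does its book-keeping.
	for i := 0; i < 20; i++ {
		events <- i
	}
	fmt.Println("queued without blocking:", len(events))

	// Consumer side (the broker's event loop) drains the buffer later.
	total := 0
	for i := 0; i < 20; i++ {
		total += <-events
	}
	fmt.Println("consumed sum:", total)
}
```

A send on a full buffered channel blocks, which is the same back-pressure behavior the broker relies on once the max event count is reached.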
```diff
@@ -57,6 +65,29 @@ type ClientConfig struct {
 	ACKLastEvent func(Event)
 }

+// ClientEventer provides access to internal client events.
+type ClientEventer interface {
+	Closing() // Closing indicates the client is being shutdown next
```
Kind of expected these methods to return bool, but I will probably see later how they are used.
Why a bool? These are notifications a beat can register for (e.g. to adapt wgEvents in filebeat). The listener cannot influence the publisher pipeline. It's more of a Listener/Observer pattern, not a Delegate or such.
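That observer role can be sketched as follows; `countingEventer` is a hypothetical listener standing in for filebeat's wgEvents bookkeeping, and only a subset of the callbacks from the PR description is shown. The methods return nothing, so the listener observes but never steers the pipeline.

```go
package main

import (
	"fmt"
	"sync"
)

// clientEventer mirrors the shape of the ClientEventer interface from the
// PR: pure notifications, no return values.
type clientEventer interface {
	Closing()          // client is being shut down next
	Published()        // an event was handed to the pipeline
	DroppedOnPublish() // an event was dropped during publish
}

// countingEventer is an illustrative observer that only records what the
// pipeline tells it (similar in spirit to wgEvents in filebeat).
type countingEventer struct {
	mu        sync.Mutex
	published int
	dropped   int
	closing   bool
}

func (c *countingEventer) Closing()          { c.mu.Lock(); c.closing = true; c.mu.Unlock() }
func (c *countingEventer) Published()        { c.mu.Lock(); c.published++; c.mu.Unlock() }
func (c *countingEventer) DroppedOnPublish() { c.mu.Lock(); c.dropped++; c.mu.Unlock() }

func main() {
	var e clientEventer = &countingEventer{}
	// The pipeline client would invoke these as events flow through.
	e.Published()
	e.Published()
	e.DroppedOnPublish()
	e.Closing()
	c := e.(*countingEventer)
	fmt.Println(c.published, c.dropped, c.closing)
}
```

Since the pipeline only calls into the interface and ignores any state the listener keeps, a bool return would have nothing to feed back into.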
Changes/Improvements to the publisher pipeline (required for filebeat integration).