libbeat/beat: add ClientConfig.WaitCloseChan #19514
Conversation
@urso I'd appreciate a preliminary review of this before I go and add tests. I've marked it draft because I haven't done that yet. |
❕ Build Aborted |
I'm going to rework this. Although I think passing a context to Close would be the more idiomatic approach, for now we can do this less invasively by specifying a channel sibling to `WaitClose`. EDIT: done. |
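(For illustration, a rough sketch of the two API shapes being weighed here; both declarations are hypothetical, not actual libbeat API:)

```go
package sketch

import (
	"context"
	"time"
)

// Option A (more idiomatic, more invasive): thread a context through Close,
// so each caller can bound how long Close may block. Hypothetical signature.
type contextCloser interface {
	Close(ctx context.Context) error
}

// Option B (this PR): keep Close() as-is and add a channel next to the
// existing WaitClose duration in the client's configuration.
type clientConfigSketch struct {
	WaitClose     time.Duration   // existing: fixed upper bound on Close
	WaitCloseChan <-chan struct{} // proposed: Close unblocks when this closes
}
```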
Set the "closing" variable to true, not false, upon signalling the waiter to close.
Introduce WaitCloseChan as an alternative to the WaitClose timeout for more flexible control over blocking during Client.Close.
force-pushed from 0ae06e7 to a22ae69 |
Pinging @elastic/integrations (Team:Integrations) |
Pinging @elastic/integrations-services (Team:Services) |
Have you considered not relying on the waiting functionality from the pipeline? If possible I'd like to 'shrink' the pipeline API instead of adding more features.

For example, filebeat also has a shutdown timeout, but it is implemented using an ACKer that is attached to each beat.Client. The ACKer gets informed when an event is published, dropped by the processors, ACKed, or when the Client is shut down. Using ACKers one can add behavior globally or locally, e.g. by incrementing and decrementing a counter. Filebeat is somewhat special, because it creates events to be dropped on purpose, but still requires them to be ACKed in order, so special state updates can be forwarded to its registry. The implementation could be simpler (as we're planning to fix this behavior in filebeat). For reference see the current filebeat event counter: https://github.com/elastic/beats/blob/master/filebeat/beater/filebeat.go#L235

Libbeat provides some common ACKers (and helpers) in libbeat/common/acker. An ACKer that blocks on shutdown until all non-filtered events are published could be implemented like this (very naive implementation):
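A minimal sketch of such an ACKer, assuming the `beat.ACKer` interface of this era (`AddEvent`/`ACKEvents`/`Close`) and counting with a `sync.WaitGroup`; this is a reconstruction, not the original snippet:

```go
package example

import (
	"sync"

	"github.com/elastic/beats/v7/libbeat/beat"
)

// waitACKer counts non-filtered events and lets the Beat block on shutdown
// until everything still in flight has been ACKed by the outputs.
type waitACKer struct {
	wg sync.WaitGroup
}

var _ beat.ACKer = (*waitACKer)(nil)

// AddEvent is invoked for every event given to Client.Publish; events dropped
// by the processors arrive with published == false and are not counted.
func (a *waitACKer) AddEvent(event beat.Event, published bool) {
	if published {
		a.wg.Add(1)
	}
}

// ACKEvents is invoked with the number of events ACKed by the outputs.
func (a *waitACKer) ACKEvents(n int) {
	a.wg.Add(-n) // negative delta decrements the WaitGroup counter
}

// Close is invoked when the owning Client is closed; nothing to do here,
// in-flight events remain counted until they are ACKed.
func (a *waitACKer) Close() {}

// Wait blocks until all counted events have been ACKed.
func (a *waitACKer) Wait() { a.wg.Wait() }
```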
Being in control of the counting and 'Wait' we can be quite flexible in the implementation without touching the pipeline API (which is already quite big). The acker can be hooked up like this (Filebeat postpones the `Wait` until shutdown):
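Continuing the sketch above; the `ACKHandler` field name matches the `beat.ClientConfig` of the 7.x era, but treat the details as illustrative:

```go
// connectWithWaitACKer attaches the counter to a client; on shutdown the
// caller closes the client and then blocks on acker.Wait(), optionally
// racing that against a shutdown timeout.
func connectWithWaitACKer(pipeline beat.Pipeline) (beat.Client, *waitACKer, error) {
	acker := &waitACKer{}
	client, err := pipeline.ConnectWith(beat.ClientConfig{
		ACKHandler: acker, // counter sees publish/drop/ACK for this client
	})
	return client, acker, err
}
```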
If you look into Filebeat you will notice that it also installs the counter into the pipeline itself. |
@urso sounds reasonable. I'll take a look at that approach soonish. In the meantime, I'll create a separate PR for the bug fix. |
I've implemented this in elastic/apm-server#3971; that won't be going in until 7.10.0. Once I'm satisfied the implementation works well for apm-server I'll create a PR to propose adding it to libbeat. Closing this one - thanks for your feedback @urso! |
Cool, looking forward to it. |
What does this PR do?
This PR adds a `ClientConfig.WaitCloseChan` channel which, if signalled/closed, will interrupt `Client.Close`. Setting either `ClientConfig.WaitClose` (duration) or `ClientConfig.WaitCloseChan` (channel) will cause the client to wait for enqueued events to be acknowledged, or for one of the wait conditions to be satisfied.

Why is it important?
Users of `pipeline.Client` may wish to block for a limited amount of time while closing the client. Currently there is a single means of achieving this: setting `WaitClose` when creating the client. This means the wait time cannot be dynamically adjusted.

In APM Server we have a "shutdown timeout" configuration, which is intended to cover all operations during shutdown: graceful HTTP server shutdown, flushing internal event queues, and finally closing the `pipeline.Client`. The operations leading up to closing the client may take a non-negligible amount of time, so conceptually we need to adjust the `WaitClose` time while closing, by subtracting the amount of time taken so far from the configured shutdown timeout.

We can achieve this by creating a cancellable context and passing its Done channel in as `WaitCloseChan`, then cancelling it with a timer started at the beginning of the APM Server's shutdown sequence, as sketched below.
Checklist
- [ ] I have made corresponding changes to the documentation
- [ ] I have made corresponding change to the default configuration files
- [ ] I have added an entry in `CHANGELOG.next.asciidoc` or `CHANGELOG-developer.next.asciidoc`.

Related issues
elastic/apm-server#3789