-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix issues with output reloading #17381
Conversation
@@ -407,6 +407,9 @@ func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error) | |||
req.Host = host | |||
} | |||
|
|||
// TODO: workaround for output reloading leaking FDs until context.WithCancel is used on transport dialer instead | |||
req.Header.Set("Connection", "close") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before I made this workaround/hack, I tried two different approaches to fix the FD leak the "right way":
-
Just as there is an
http.NewRequest
function (that we currently use in theeslegclient
code), there is also ahttp.NewRequestWithContext
function. I replaced our usage with the latter one, created acontext.WithCancel
insideeslegclient.NewConnection
and used the cancel function returned to cancel the context from theeslegclient.(*Connection).Close()
method. This is essentially the same fix that @urso tried in Try to close HTTP connections and wait for completion on output reload/close #10599. However, when I usedlsof
to monitor FDs being used by my Filebeat process, I noticed that this change had no effect! -
So then, thinking about it some more, I realized the problem wasn't so much on canceling the HTTP requests but it was more to do with closing the underlying TCP connections that were being established each time the output was reloaded. To fix this the "right way", I tried threading
context.Context
through the variousDialer
s (which would allow us to callDialContext
instead ofDial
in the right places). This turned out to be quite a complicated and large refactoring! So instead, I settled for this workaround here. I think we do need to undertake this refactoring at some point but I'm not sure it's worth blocking progress at the moment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would force the TCP connection to be dropped after each bulk request. Internally the go HTTP package uses a connection pool (actually the Transport
type does) and keeps the HTTP session alive in order to send more requests. Would be nice if we could keep this.
This makes me wonder if this is really a leak, or just an open connection that eventually gets closed in a few minutes by the go runtime. The Transport
type has a few settings for controlling the connection pool. For example IdleConnTimeout
(which we must configure). On shutdown one can also call CloseIdleConnections
.
I think we should always configure a timeout (and expose it as a setting) + call CloseIdleConnections
(best effort) approach to close connections early.
We also have some helpers for working with contexts in this package: https://pkg.go.dev/github.com/elastic/go-concert/ctxtool. This allows us to hook up contexts with custom functions, plain go channels, or merge cancelation from multiple contexts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @urso.
This makes me wonder if this is really a leak, or just an open connection that eventually gets closed in a few minutes by the go runtime.
Yes, good point. I will run my tests for longer and see if the connection FDs eventually get released.
I think we should always configure a timeout (and expose it as a setting) + call CloseIdleConnections (best effort) approach to close connections early.
++ to this approach, will implement.
We also have some helpers for working with contexts in this package: https://pkg.go.dev/github.com/elastic/go-concert/ctxtool. This allows us to hook up contexts with custom functions, plain go channels, or merge cancelation from multiple contexts.
I will look into this as well, but probably part of a follow up PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++ Will all the above that @urso mentioned :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implemented a new setting in eslegclient.ConnectionSettings
named IdleConnTimeout
. The name matches the setting onhttp.Transport
. Default value is 1 minute.
Also implemented calling CloseIdleConnections()
from eslegclient.(*Connection).Close()
.
Net result is that we don't see FDs for TCP connections to the ES node sticking around in the test.
Pinging @elastic/integrations-services (Team:Services) |
cef687e
to
4852b7a
Compare
Would be nice if we could introduce an unit test trying to replace an output and check all batches are eventually handled. |
I'm actually going to implement unit tests on the original code first to ensure they keep passing with the current PR. I've done this in #17460. Please take a look when you get a chance. Once #1746 is reviewed and merged, I'll rebase this PR on it. |
63e764d
to
b594a7b
Compare
Unit tests (per the previous two comments) have now been added in #17460. I will now resume working on this PR, making sure the same tests continue to pass with the changes in this PR and also address review feedback. |
b594a7b
to
f36d58d
Compare
I've addressed @urso's review feedback and also added another unit test in this PR specifically to test output reloading. That test is currently failing sometimes so I need to investigate why and fix. |
While investigating this I found that this new unit test isn't passing on |
@ycombinator Any idea how to fix it? |
@ph Not 100% but I made some progress in #17657. After going over it with @urso on Tuesday (I was off yesterday), my next steps here are to bring the unit test written in #17567 into this PR and then try to make it pass using the code fixes in this PR, maybe some that were experimented with in #17567, and maybe some others still that we haven't discovered yet. The general problem seems to be that there's a race condition somewhere in the publisher code that's being exposed when the output controller's |
3fba2db
to
ad33238
Compare
@@ -154,7 +154,7 @@ func (c *eventConsumer) loop(consumer queue.Consumer) { | |||
} | |||
|
|||
paused = c.paused() | |||
if !paused && c.out != nil && batch != nil { | |||
if c.out != nil && batch != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This turned out to be a key change in getting the test to pass (i.e. ensuring that all batches are eventually published despite rapid reloading of outputs multiple times).
Consider the case where the consumer is paused (so paused == true
here) but it still has a batch of events that it has consumed from the queue but not yet sent to the workqueue. In this scenario because of the !paused
clause here, we would fall into the else
clause, which would set out = nil
. This would then cause the last select
in this method to block (unless, by luck of timing, a signal to unpause the consumer just happened to come along).
Now, if we have a valid output group c.out
and a batch
, we don't pay attention to the paused
state; this ensures that this batch will be send to that output group's work queue via that final select
's final case
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh wow nice fix!
@@ -34,7 +34,8 @@ type outputController struct { | |||
monitors Monitors | |||
observer outputObserver | |||
|
|||
queue queue.Queue | |||
queue queue.Queue | |||
workQueue workQueue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is another change I made, to simplify things. Now the output controller creates a work queue in it's constructor and the same one is used by all output workers across time. This removes the need to drain the old work queue to a new one when outputs are reloaded and new output workers are created in the process.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good change!
7c8de5c
to
041922b
Compare
beat: beat, | ||
monitors: monitors, | ||
observer: observer, | ||
queue: b, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's funny. Why is the parameter named b
? As we've modified this piece of code, leats clean up the naming a little :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in 5ff7219.
@@ -136,12 +137,13 @@ func (c *outputController) Set(outGrp outputs.Group) { | |||
|
|||
// restart consumer (potentially blocked by retryer) | |||
c.consumer.sigContinue() | |||
c.consumer.sigUnWait() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sigUnWait
is used by the retryer, no? Why add it here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found that this was needed to make the TestReload
test pass. Without it the retryer never un-waited the consumer by itself and so events stopped being consumed and sent to the output.
That said, I agree that this is something the retryer should automatically call internally rather than "leak" this responsibility to the controller. I'll look into why it's not doing that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, this sounds like a potential bug to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in 6a6680c.
|
||
var published atomic.Uint | ||
var batchCount int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used batchCount
for debugging. We can remove it I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed in 15a1383.
|
||
for name, ctor := range tests { | ||
t.Run(name, func(t *testing.T) { | ||
seedPRNG(t) | ||
|
||
err := quick.Check(func(i uint) bool { | ||
numBatches := 1000 + (i % 100) // between 1000 and 1099 | ||
numBatches := 10000 + (i % 100) // between 1000 and 1099 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops, I guess that was me. Want to to reduce it to 1000
again to reduce the running time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reduced in 6e10331.
|
||
var publishedFirst atomic.Uint | ||
blockCtrl := make(chan struct{}) | ||
var batchCount int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might be a debugging variable that can be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed in 15a1383.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @ycombinator for adding theses tests in. This was a good one to track down!
@@ -34,7 +34,8 @@ type outputController struct { | |||
monitors Monitors | |||
observer outputObserver | |||
|
|||
queue queue.Queue | |||
queue queue.Queue | |||
workQueue workQueue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good change!
I've addressed the minor "cleanup" points from the last round of review. The major point that remains to be addressed is #17381 (comment). I will do that tomorrow. |
@urso I've addressed all your latest feedback. Please re-review when you get a chance. Thanks! |
CI failures are unrelated. Libbeat builds in both Jenkins and Travis CI are green. Merging. |
* Refactoring: extracting common fields into worker struct * More refactoring * Address goroutine leak in publisher * Workaround: add Connection: close header to prevent FD leaks * Adding CHANGELOG entry * Adding IdleConnTimeout setting * Close idle connections when ES client is closed * When closing worker, make sure to cancel in-flight batches * Cancel batch + guard * [WIP] Adding output reload test * More WIP * Update test * Try to get test passing for client first * Make workqueue shared * Making tests pass * Clean up * Moving SeedFlag var to correct place * Clarifying comments * Reducing the number of quick iterations * Reducing quick iterations even more * Trying just 1 iteration * Setting out to nil after sending batch if paused * Restoring old order of operations in Set() * proposal * Do not copy mutex * Remove debugging statements * Bumping up testing/quick max count on TestOutputReload * Removing commented out helper function * Simplifying retryer now that workqueue is used across output loads * Renaming parameter * Removing debugging variable * Reducing lower bound of random # of batches * Removing sigUnWait from controller * Removing unused field Co-authored-by: urso <[email protected]> Co-authored-by: urso <[email protected]>
What does this PR do?
Beats Central Management (today) and Agent (in the future) will need to reload outputs on the fly, whenever users change the Beat's configuration while the Beat is running. The current output reloading implementation has three bugs:
When outputs are reloaded several times, and very quickly at that, all events don't always getting eventually published,
Whenever an output is reloaded, a goroutine is leaked from the publisher pipeline's output handling code, and
If the output being reloaded is the Elasticsearch output, old TCP connections to Elasticsearch are hanging around.
This PR addresses all three issues.
Why is it important?
Checklist
I have made corresponding changes to the documentationI have made corresponding change to the default configuration filesCHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.How to test this PR locally
filebeat.yml
file and create a new one suitable for Beats Central Management./tmp/logs/*.log
and the Elasticsearch output pointing to localhost:9200 (or wherever your local ES is listening). Make sure to assign this tag to your enrolled Filebeat instance in the UI.filebeat.yml
file and setmanagement.period: 2s
to speed up output reloading.-httpprof localhost:5050
flag. This will let us see if Filebeat is leaking goroutines.New configurations retrieved
to confirm that your changes in Kibana are being picked up by Filebeat.lsof
window and make sure the list of file descriptors isn't growing over time.Related issues