Check if cache of add_kubernetes_metadata processors is already closed before closing #32150
Conversation
We should have a test to make sure there is no regression in the future, and I think the bug fix can be achieved more easily.
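A minimal sketch of such a regression test, assuming it lives in the `add_kubernetes_metadata` package next to `cache.go` so it can use the unexported `newCache` and `stop` shown later in this thread:

```go
package add_kubernetes_metadata

import (
	"testing"
	"time"
)

// TestCacheStopTwice is a hypothetical regression test: once the fix is in,
// stopping the same cache twice must not panic with "close of closed channel".
func TestCacheStopTwice(t *testing.T) {
	c := newCache(time.Minute)

	defer func() {
		if r := recover(); r != nil {
			t.Fatalf("second stop() panicked: %v", r)
		}
	}()

	c.stop()
	c.stop()
}
```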
```diff
@@ -94,5 +94,18 @@ func (c *cache) cleanup() {
 }

 func (c *cache) stop() {
-	close(c.done)
+	if !isClosed(c.done) {
```
Wouldn't it be enough to write:

```go
_, open := <-c.done
if open {
	close(c.done)
}
```
After taking a closer look I think neither fix would work, because the `c.done` channel will remain blocked forever (since nothing is writing into it). In the `main` branch I can't see anything sending to `c.done`, and it only gets closed here, which should not be a problem if the channel is properly initialised via `newCache`:
beats/libbeat/processors/add_kubernetes_metadata/cache.go
Lines 38 to 47 in e85d8b9
```go
func newCache(cleanupTimeout time.Duration) *cache {
	c := &cache{
		timeout:  cleanupTimeout,
		deleted:  make(map[string]time.Time),
		metadata: make(map[string]mapstr.M),
		done:     make(chan struct{}),
	}
	go c.cleanup()
	return c
}
```
Is it possible that something is using the `cache` type without its initialiser?
My bet is on this function being called twice for some reason:

beats/libbeat/processors/add_kubernetes_metadata/kubernetes.go Lines 306 to 314 in e85d8b9
First it closes the cache properly and the second time it's trying to close a closed channel. Simply modifying it to this should do the trick:

```go
func (k *kubernetesAnnotator) Close() error {
	if k.watcher != nil {
		k.watcher.Stop()
	}
	if k.cache != nil {
		k.cache.stop()
		k.cache = nil
	}
	return nil
}
```
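If that `Close` change goes in, a hypothetical companion test (same package, relying only on the `cache` field visible in the snippet above) could check the double-Close path explicitly:

```go
package add_kubernetes_metadata

import (
	"testing"
	"time"
)

// TestAnnotatorCloseTwice is only a sketch: it constructs the annotator with
// nothing but a cache, so the watcher branch is skipped, and verifies that a
// second Close is a no-op instead of a panic.
func TestAnnotatorCloseTwice(t *testing.T) {
	k := &kubernetesAnnotator{cache: newCache(time.Minute)}

	if err := k.Close(); err != nil {
		t.Fatalf("first Close failed: %v", err)
	}
	if err := k.Close(); err != nil {
		t.Fatalf("second Close failed: %v", err)
	}
}
```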
@rdner thanks a lot for taking such a close look. I was also thinking the same: that for some reason the function is probably called twice, by different threads.
That is why I am using a `select` with a `default` clause, which ensures the `select` never blocks, whether the channel is closed or not.
You're right, but it only works because nothing is writing to the channel:

```go
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		// reading from a closed channel returns
		// a default value of the channel type – this is expected
		// BUT if the channel was not closed and something wrote into it
		// we would end up here too.
		return true
	default:
	}
	return false
}
```

I still think we should just prevent the channel from being closed twice in the first place.
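One idiomatic way to "just prevent" a double close, no matter who calls `stop` or how often, is `sync.Once`. This is not what the PR does, just a self-contained sketch of the pattern with a simplified stand-in type:

```go
package main

import (
	"fmt"
	"sync"
)

// cacheSketch mirrors only the parts of the processor cache needed to show
// the pattern; the real type has more fields.
type cacheSketch struct {
	done chan struct{}
	once sync.Once
}

func (c *cacheSketch) stop() {
	// Do runs the closure at most once, so a repeated stop can never hit
	// "close of closed channel".
	c.once.Do(func() { close(c.done) })
}

func main() {
	c := &cacheSketch{done: make(chan struct{})}
	c.stop()
	c.stop() // safe: the second call is a no-op
	fmt.Println("stopped twice without panicking")
}
```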
```go
// isClosed checks if a given chan is already closed
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
```
@MichaelKatsoulis and @rdner I agree that we should make sure to never try to close a channel twice. I am not a huge fan of this `isClosed` function; even if it seems to work, it is not trivial to understand. Just to clarify: since the `done` channel is private, nothing writes into it except for `stop(c.done)`, and it is only used for stopping the `cleanup` function... I would suggest:
```go
func (c *cache) cleanup() {
	if timeout <= 0 {
		return
	}
	ticker := time.NewTicker(timeout)
	defer ticker.Stop()
	for {
		select {
		case <-c.done:
			c.done = nil
			return
		case now := <-ticker.C:
			c.Lock()
			for k, t := range c.deleted {
				if now.After(t) {
					delete(c.deleted, k)
					delete(c.metadata, k)
				}
			}
			c.Unlock()
		}
	}
}

func (c *cache) stop() {
	if c.done != nil {
		close(c.done)
	}
}
```
This is dangerous because if you run `cleanup` twice, the second run will block forever once `c.done = nil` has been set (receiving from a nil channel blocks). Since we already have a problem of running `stop` twice, I think it's quite possible.
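For reference, this is the behaviour being described: a receive from a nil channel never proceeds, so a goroutine stuck on `case <-c.done` after `c.done = nil` would hang forever. A small standalone demo, not code from the PR:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	var done chan struct{} // nil channel, like c.done after `c.done = nil`

	select {
	case <-done:
		// never reached: receiving from a nil channel blocks forever
		fmt.Println("received from nil channel")
	case <-time.After(100 * time.Millisecond):
		fmt.Println("timed out: the nil-channel receive never fires")
	}
}
```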
sure, but cleanup() is only called once from the newCache constructor while k.cache.stop() is called from a public method Stop() that can be called multiple times.
Anyway, `k.cache = nil` is a better solution than `isClosed`, so I approved the PR.
@MichaelKatsoulis let's make sure it's tested before merging and that the panic is not reproducible anymore.
Yes, of course.
/test
Unfortunately, neither of the solutions works. The problem is deeper and is connected to how the filestream input handles the processors. I have noticed that if filebeat is configured like this (the processor is part of the input):
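The original comment embedded the config here; it looked roughly like the sketch below. This is purely illustrative — the paths and matcher settings are assumptions, not copied from the report:

```yaml
filebeat.inputs:
  - type: filestream
    paths:
      - /var/log/containers/*.log
    # processor attached to the input itself
    processors:
      - add_kubernetes_metadata:
          host: ${NODE_NAME}
          matchers:
            - logs_path:
                logs_path: "/var/log/containers/"
```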
then each time a filestream reader closes (in the case of Kubernetes jobs that stop after their work is finished, the file remains idle and I guess at some point the reader closes), it stops the connected processors, which means the watcher is stopped and the cache's `done` channel is closed.
Next time the same thing happens (a filestream reader stops), the same process takes place. But the watcher is already stopped (nothing happens) and the cache's `done` channel is already closed (that is the error). If the first time the cache is closed we set its value to nil, then this will lead to errors, because there are still other clients (from different filestream readers) that try to access the cache. If we first check whether the cache channel is closed before closing it, then we avoid that error. But it still doesn't make sense, as the watcher is stopped and the cache doesn't get updated, so all subsequent clients of the cache will get invalid data. This problem occurs because multiple clients are using the same processor, and each one tries to close it whenever they finish. That scenario does not occur when, in the filebeat configuration, we set the processors part outside the input, like this:
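Again as an illustrative sketch (same assumed fields as above), with the processors block moved to the top level of filebeat.yml instead of under the input:

```yaml
filebeat.inputs:
  - type: filestream
    paths:
      - /var/log/containers/*.log

# processors defined globally, outside any input
processors:
  - add_kubernetes_metadata:
      host: ${NODE_NAME}
      matchers:
        - logs_path:
            logs_path: "/var/log/containers/"
```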
In that case the processors only stop once. None of this happens with …

cc @elastic/obs-cloudnative-monitoring
I guess the way forward here can be summarised in 2 questions/points:
The clients are part of a pipeline which the clients publish events into. It looks like one client is created per file (not 100% sure). @rdner, as a member of the data plane team, are you more familiar with these bits of code?
Yes, this will be a breaking change, but the update needed is only in the filebeat.yml.
What does this PR do?

This PR checks whether the `add_kubernetes_metadata` processor's cache is already closed before closing it.

Why is it important?

If the cache channel is already closed, the code panics, leading to filebeat restarting.

Checklist

- I have added an entry in `CHANGELOG.next.asciidoc` or `CHANGELOG-developer.next.asciidoc`.