-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix BeatV2Manager to configure inputs and set log level #34066
Conversation
This pull request does not have a backport label.
To fixup this pull request, you need to add the backport labels for the needed
|
I am keeping this in draft at the moment unit I get the unit tests done, but I have tested this manually with the Elastic Agent and it is working correctly. Still not comfortable to land this until I have unit tests to cover it. Wanted to get it up early so others can review and test it as well. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few comments/questions, mostly about overall logic.
units map[string]*client.Unit | ||
mainUnit string | ||
// track individual units given to us by the V2 API | ||
mx sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: the above comment makes it look like this mutex is just used for controlling access to the hashmap, but it's used in other places. Maybe rename it or move it to the top of the struct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I can move it. I originally have multiple mutex for different parts but then I would still need to grab the unitsMx so I just changed it to a single mutex because it was safer than causing some deadlock in lock ordering.
// `reload` method and will be marked stopped in that code path) | ||
continue | ||
} | ||
err := unit.UpdateState(status, message, payload) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why we're manually looping over the unit UpdateState
here? Is there a reason why we just can't call the global UpdateStatus
or something similar?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The global UpdateStatus
will grab the mutex for units, to prevent that we call this logic on the units that reload just performed. We also would not want to update the status of units that might have been added to the cm.units
that is different than the actual units that are being processed in the reload (because they where passed in).
cm.deleteUnit(change.Unit) | ||
} | ||
case <-cm.reloadCh: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure why there's so much logic involved in the reloading process? triggerReload()
and reload()
are both only called in this for block, it might just be simpler to call reload()
in a goroutine or something? Also, why is reload()
getting its own map of the units and not just using cm.units
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is because performing reload
might result in the config logic of the beat to call UpdateStatus
which will then grab the mutex for the units.
So to ensure that a dead lock doesn't occur we ensure that the actual reload logic is performed in the main loop of the manager, but not in a path that would hold that mutex. So that is why a copy is sent to the reload function. Each client.Unit
has an internal lock for state so it is also save to have a pointer to the same unit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When you mention a deadlock starting with UpdateStatus
I assume you mean the Reload()
call blocking while the beat waits for UpdateStatus
?
I wonder if we can do something like have a addUnit()
return a new copy of the unit map that's sent to reload()
so we can avoid the extra reloadCh
? If not, can you at least add a comment explaining the loop, was a tad confusing at first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a comment explaining the potential for deadlock here so it's obvious the next time we read this code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have added a comment as well as add some debounce to consolidate unit changes into a single reload. It makes it simpler to understand as well.
// now update the statuses of all units | ||
cm.mx.Lock() | ||
status := getUnitState(cm.status) | ||
message := cm.message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the reasoning behind using the global message? Seems like we should have a status message specifically mentioning the reload operation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is basically setting the units to Healthy, or what ever beats has set with the UpdateStatus
for the current state of the beat. The state is global because beats doesn't have a way of passing each unit to each running input and have it handle its own state.
cm.deleteUnit(change.Unit) | ||
} | ||
case <-cm.reloadCh: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a comment explaining the potential for deadlock here so it's obvious the next time we read this code?
Okay I think this is ready to be merged. I have added unit tests, answered all questions, updated comments, an added debounce to unit changes. I also merged #34049 into this PR so its already present and ready to go once this lands. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few new questions, but generally this looks good to me.
We will need a matching change to the Beat spec files to:
- Enable the output_restart/restart_on_output_change option for each Beat.
- Restore the default log level to INFO.
Pinging @elastic/elastic-agent (Team:Elastic-Agent) |
@axw and @oren-zohar you will want to make sure you pick up this change to libbeat in APM server and Cloudbeat when it merges. The major highlights of what this addresses are:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, thanks for the extra testing!
/test |
SonarCloud Quality Gate failed. 0 Bugs No Coverage information |
* Refactor the V2 manager. * Add debounce to unit changes. * add stop functionality for output config changes * Add tests. * Fix typo. * Fix code review, add more to the test. * Re-order the processor injection so proper order is maintained. * Fix unit tests. * Copy global processors per stream to ensure that multiple streams don't get the same slice. Co-authored-by: Alex Kristiansen <[email protected]> (cherry picked from commit 15d9a87)
) * Refactor the V2 manager. * Add debounce to unit changes. * add stop functionality for output config changes * Add tests. * Fix typo. * Fix code review, add more to the test. * Re-order the processor injection so proper order is maintained. * Fix unit tests. * Copy global processors per stream to ensure that multiple streams don't get the same slice. Co-authored-by: Alex Kristiansen <[email protected]> (cherry picked from commit 15d9a87) Co-authored-by: Blake Rouse <[email protected]>
@axw and @oren-zohar reminder ping now that this has merged that you will want to pick up this change in libbeat on both main and 8.6. You will also want to ensure that the elastic-agent-client version is bumped to https://github.com/elastic/elastic-agent-client/releases/tag/v7.0.3 in your go.mod files if the libbeat update does not do this automatically. |
Thanks @cmacknz, apm-server main and 8.6 are both updated, and I've confirmed that elastic-agent-client was bumped to 7.0.3. |
Thanks @cmacknz @blakerouse, cloudbeat was updated and I can confirm this fix works as expected 🚀 |
* Refactor the V2 manager. * Add debounce to unit changes. * add stop functionality for output config changes * Add tests. * Fix typo. * Fix code review, add more to the test. * Re-order the processor injection so proper order is maintained. * Fix unit tests. * Copy global processors per stream to ensure that multiple streams don't get the same slice. Co-authored-by: Alex Kristiansen <[email protected]>
What does this PR do?
This refactors the
BeatV2Manager
so it works correctly with the Elastic Agent V2 model of components/units. The log level is now computed from the defined units and the log level is now updated with thelogp.SetLevel
.Why is it important?
Previously the
BeatV2Manager
would act incorrectly when a new unit was added to the beat from the Elastic Agent replacing the already existing unit with the new unit configuration instead of merging the configuration so the beat would operating with 2 inputs. Previous the log level was not set-able by the V2 control protocol, this now works.Checklist
[ ] I have made corresponding changes to the documentation[ ] I have made corresponding change to the default configuration files[ ] I have added an entry inCHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.Related issues