Add a GRPC listener service for Agent #18827
Conversation
Pinging @elastic/ingest-management (Team:Ingest Management)
@graphaelli I believe you are using gRPC in apm-server; we are planning to update the vendored version on our side. Not sure if that has an impact on you or not?
💔 Tests Failed
This is actually dependent on #18829, because of the usage of …
Thanks for the heads up, broadening to @elastic/apm-server
Thanks for the heads up @ph. I just tested updating apm-server's grpc to v1.29.1, and it appears to be fine.
lot of code but i like it. just a few small questions along the way
statusMessage string
statusConfigIdx uint64
statusTime time.Time
checkinConn bool
could you think about a different name please? i imagine a connection itself under this name, something like isCheckingConnected, hasCheckingConnection...
pendingActions chan *pendingAction
sentActions map[string]*sentAction
actionsConn bool
same as ^^
sentActions: make(map[string]*sentAction),
actionsConn: true,
}
s.lock.Lock()
can it be that in between the get and the set another set happens? in agent we do things sync so it should be fine
func (s *Server) Checkin(server proto.ElasticAgent_CheckinServer) error {
firstCheckinChan := make(chan *proto.StateObserved)
go func() {
// go func will not be leaked, because when the main function
👍 for comments about when go routine is done
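The pattern being praised here, a goroutine whose lifetime is bounded by a single send on a first-value channel, can be sketched in isolation. This is a minimal stand-in, not the PR's actual handler; the names `firstObserved` and `checkins` are illustrative.

```go
package main

import "fmt"

// firstObserved forwards the first value received from checkins and then
// returns. The inner goroutine performs exactly one receive and one send,
// so it exits as soon as the caller takes the value and cannot linger
// past the call.
func firstObserved(checkins <-chan string) string {
	firstChan := make(chan string)
	go func() {
		// one receive, one send, then this goroutine is done
		firstChan <- <-checkins
	}()
	return <-firstChan
}

func main() {
	checkins := make(chan string, 1)
	checkins <- "first-checkin"
	fmt.Println(firstObserved(checkins)) // first-checkin
}
```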
if err != nil {
// failed to send action; add back to channel to retry on re-connect from the client
appState.actionsLock.Unlock()
appState.pendingActions <- pending
food for thought: can out of order application of actions be an issue? [not a blocker]
I don't see why they would be out of order; we append them to the pendingActions channel?
you have A1, A2, A3.
A1 fails, so it is sent to the pendingActions channel, which is buffered up to 100 items. then you will proceed with A2 and A3, and then A1 is there again from the channel
@michalpristas is correct on the ordering. But it's not an issue, because the actions on the client side are not blocking when it comes to reading from the stream.
So even if it goes A2, A3, A1, all 3 will be executed at the same time. Now on the Agent side, PerformAction
is blocking, even though the communication is not.
So the order of actions is still serial on the Agent side:
PerformAction("action1") // block waiting for response
PerformAction("action2") // this won't even be added to the channel until action1 completes, fails, or times out
Thanks for the explanation, 👍
LGTM, added a few comments but testing looks good.
@@ -434,6 +440,7 @@ def detect_license_summary(content):
"MPL-2.0",
"UPL-1.0",
"ISC",
"ELASTIC",
]
SKIP_NOTICE = []
👍
ConfigStateIdx: as.statusConfigIdx, // stopping always informs that the config it has is correct
Config: "",
}
} else if checkin.ConfigStateIdx != as.expectedConfigIdx {
should this check also be covered by the lock above? should we indeed defer the unlock of the struct?
I have added it into the lock as well, thanks for pointing that out.
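The suggestion in this thread, keeping the `ConfigStateIdx` comparison inside the lock and releasing via `defer`, can be sketched with a cut-down stand-in. The struct and field names here are illustrative, not the PR's exact ones.

```go
package main

import (
	"fmt"
	"sync"
)

// appState is a minimal stand-in for the server's per-application state.
type appState struct {
	mu                sync.Mutex
	expectedConfigIdx uint64
}

// configOutdated performs the index comparison inside the critical section,
// using defer so the mutex is released on every return path.
func (as *appState) configOutdated(checkinIdx uint64) bool {
	as.mu.Lock()
	defer as.mu.Unlock()
	return checkinIdx != as.expectedConfigIdx
}

func main() {
	as := &appState{expectedConfigIdx: 2}
	fmt.Println(as.configOutdated(1), as.configOutdated(2)) // true false
}
```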
s := prevStatus
prevMessage := serverApp.statusMessage
message := prevMessage
if serverApp.status == proto.StateObserved_DEGRADED {
love that a lot.
Will we ever try to restart a process if the watchdog doesn't hear from the client for an extended period of time? I am curious what actions would be required to recover from that state. (can be a followup)
i think we can add a hook here and decide later; with forking we have a watchdog for the process to be killed, so it could also handle this callback
Yes, that is what the OnStatusChange callback is for; it is given when the server is started. We will add the logic in that callback to handle when an application is marked FAILED.
Removing the requirement for go 1.14 requires elastic/elastic-agent-client#9 to land so I can vendor it into this PR.
LGTM
* Work on the GRPC server for agent.
* Lots of testing.
* Fix data races.
* Add support for elastic license in generate_notice.py.
* Update to generate server name unique per application.
* Fix go vet on stackdriver metricset using latest protobuf.
* Fix data race issue.
* Fix tests.

(cherry picked from commit 6e91ce4)
…-stage-level
* upstream/master: (30 commits)
  * Add a GRPC listener service for Agent (elastic#18827)
  * Disable host.* fields by default for iptables module (elastic#18756)
  * [WIP] Clarify capabilities of the Filebeat auditd module (elastic#17068)
  * fix: rename file and remove extra separator (elastic#18881)
  * ci: enable JJBB (elastic#18812)
  * Disable host.* fields by default for Checkpoint module (elastic#18754)
  * Disable host.* fields by default for Cisco module (elastic#18753)
  * Update latest.yml testing env to 7.7.0 (elastic#18535)
  * Upgrade k8s.io/client-go and k8s keystore tests (elastic#18817)
  * Add missing Jenkins stages for Auditbeat (elastic#18835)
  * [Elastic Log Driver] Create a config shim between libbeat and the user (elastic#18605)
  * Use indexers and matchers in config when defaults are enabled (elastic#18818)
  * Fix panic on `metricbeat test modules` (elastic#18797)
  * [CI] Fix permissions in MacOSX agents (elastic#18847)
  * [Ingest Manager] When not port are specified and the https is used fallback to 443 (elastic#18844)
  * [Ingest Manager] Fix install service script for windows (elastic#18814)
  * [Metricbeat] Fix getting compute instance metadata with partial zone/region config (elastic#18757)
  * Improve error messages in s3 input (elastic#18824)
  * Add memory metrics into compute googlecloud (elastic#18802)
  * include bucket name when logging error (elastic#18679)
  * ...
What does this PR do?
Adds a GRPC server implementation to the Elastic Agent. This is just the implementation; the server is not actually used by the Elastic Agent yet (coming in a later PR).
The GRPC server maintains the currently reported status of an application (connected or not connected), pushes config updates to the application, and informs the application when to stop. A watchdog is included in the server to ensure that the application checks in every 30 seconds; if it doesn't, the application is marked degraded after the first missed window, and failed after another missed window (60 seconds total). Currently nothing is done at that point; a follow-up PR will add the kill/restart logic.
Actions are also handled by the GRPC server implementation, even across connections and disconnections, including timeouts. An action can time out or be cancelled depending on the application state in the GRPC server.
Usage:
Why is it important?
This is needed because the contract between the Elastic Agent and the spawned applications has flipped: the applications now connect back to the Agent. Support for stopping and performing actions on applications was also required; this PR adds those building blocks.
Checklist
- I have made corresponding changes to the documentation
- I have made corresponding changes to the default configuration files
- I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc

Author's Checklist
How to test this PR locally
go test -race github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server
Related issues