-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add observer support to receiver_creator #173
Conversation
* Add observer notification interface (k8s observer will be in separate PR) * Refactor receiver_creator to be more easily testable and organized * receiver.go mostly implements OT interface and delegates to the new files * observerhandler.go responds to observer events and manages the starting/stopping of receivers * rules.go implements rules evaluation (not currently implemented) * runner.go contains a runner interface that handles the details of how to start and stop a receiver instance that the observer handler wants to start/stop * Implement basic add/remove/change response in receiver_creator to observer events
extension/observer/observer.go
Outdated
type Endpoint interface { | ||
// ID uniquely identifies this endpoint. | ||
ID() string | ||
// Target is an address or hostname of the endpoint. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this specifically an IP address or can be some other form of an address?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally it's an IP address for pods but there's no reason it couldn't be a DNS name which is why I named it generically.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, so perhaps the comment can say "IP address or hostname". It was not clear to me that this could not be some other form of an address (other than IP address or host name).
extension/observer/observer.go
Outdated
// PortEndpoint is an endpoint that has a target as well as a port. | ||
type PortEndpoint struct { | ||
endpointBase | ||
Port int32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: IP port numbers are unsigned 16-bit integers. Do we need the larger int32 range?
// receiverTemplates maps receiver template full name to a receiverTemplate value. | ||
receiverTemplates map[string]receiverTemplate | ||
// receiversByEndpointID is a map of endpoint IDs to a receiver instance. | ||
receiversByEndpointID multimap.MultiMap |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this a multimap? Can we have more than one receiver per endpoint ID? Is it because we can have different receiver types associated with the same ID? Should this be a map of maps: map[endpointID]map[receiverFullName]receiver
in that case or we don't care about having the nested map?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this a multimap? Can we have more than one receiver per endpoint ID?
Right, multiple receivers can be started for a single endpoint. The multimap is basically just a wrapper around a map of maps. I could do my own light wrapper instead but it's a lightweight library with no other dependencies.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, up to you. I thought it might be nicer to have stronger compoile-time typing from map of maps. Your choice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah the lack of type checking is unfortunate. Maybe one day Go will bless us with generics. :) I'll try doing custom wrapper, doesn't need all possible multimap functions so should be limited.
continue | ||
} | ||
rcvr, err := re.runner.start(template.receiverConfig, userConfigMap{ | ||
"endpoint": e.Target(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we define "endpoint" as a const and give meaningful name? Otherwise it looks like a magic value. Are receiver configs expected to support a configuration option with this precise name? Is this enforced or checked at compile or runtime? Is this expected to match ReceiverSettings.Endpoint
field's tag?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this expected to match ReceiverSettings.Endpoint field's tag?
Yeah this is required to match ReceiverSettings.Endpoint field. Is a constant in receivercreator ok or does it need to be a constant in ot-collector core?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const here is OK, just add a comment to explain that it has to match the ReceiverSettings.Endpoint field. We are not going to change that field name (hopefully ever) since it is a config file setting.
receiver/receivercreator/receiver.go
Outdated
@@ -38,11 +37,12 @@ type receiverCreator struct { | |||
nextConsumer consumer.MetricsConsumerOld | |||
logger *zap.Logger | |||
cfg *Config | |||
receivers []component.Receiver | |||
responder *observerHandler |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the name responder
reflect the purpose? It seems to be a bit of a mismatch with its type name observerHandler
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep I renamed observerHandler at one point. Will fix.
receiver/receivercreator/runner.go
Outdated
func (run *receiverRunner) loadRuntimeReceiverConfig( | ||
factory component.ReceiverFactoryOld, | ||
receiver receiverConfig, | ||
discoveredConfig userConfigMap) (configmodels.Receiver, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We usually use this style in the codebase:
discoveredConfig userConfigMap) (configmodels.Receiver, error) { | |
discoveredConfig userConfigMap, | |
) (configmodels.Receiver, error) { |
return nil, err | ||
} | ||
|
||
if err := recvr.Start(run.ctx, run.host); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Start()
may call Host.ReportFatalError()
. ReportFatalError()
exits the process. This was considered acceptable since it would normally happen at startup when receivers are created.
Now this allows receivers to be created at any time long after the startup. It means if something goes wrong the receiver's Start
function can exit the process during normal operation. That is not considered acceptable: https://github.com/open-telemetry/opentelemetry-collector/blob/master/CONTRIBUTING.md#do-not-crash-after-startup
We will need to wrap the host here and provide a different ReportFatalError
implementation which does not exit the process but only logs the problem and perhaps emits a metric/trace.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the purpose of ReportFatalError if Start() can return an error? Why not let the caller (the core in the usual case, receivercreator) in this case decide how to handle it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Start() is expected to return quickly. If the component needs more time to initialize it is expected to return and continue initialization asynchronously. ReportFatalError allows the component to report initialization failure in that case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed it to just log for now. However that's probably not sufficient if receivers are assuming when they call that they are going to be restarted (by virtue of the process dying and something like k8s restarting the exited container). In the receiver creator case should we shut down the receiver that reported a fatal error? How often, how long, etc. It starts opening up a can of worms. Actually this applies to even the case where Start() returns an error. However for the receivers we write Start() will rarely fail as what it usually does is just kick off some kind of periodic asynchronous process[1].
I think we may need to introduce some kind of lifecycle for receivers if we need to handle these kinds of scenarios. It's something I considered adding to the smart agent but hadn't gotten around to it. I think we need a way to have receivers fail to start and be able to restart them with exponential retry, etc. (Although I understand for some receivers this is unlikely to help as detailed in https://github.com/open-telemetry/opentelemetry-collector/blob/master/CONTRIBUTING.md#bad-input-handling). But for many receivers it's possible they encounter some error that can be resolved by resetting to a good known state. In addition having these kinds of explicit status of receivers and whether they're running, crashing, healthy, etc. would enhance the telemetry as you could tell whether a receiver was in a good running state or not.
[1] This periodic process somewhat functions as a poor version of the lifecycle management. If an error is encountered it merely retries the same collection loop after N seconds. However it typically doesn't do anything fancy like reset to known good state. We will get some insight into this async collection loop through telemetry but I think more could be done by standardizing this collection loop instead of each receiver/monitor doing it on its own.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you open a github issue so that we can think this through in the future? I agree this needs more work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created #178
@jrcamp ping me when this is ready for another round of review. |
@tigrannajaryan updated, thanks for the review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, just one minor comment regarding naming.
receiver/receivercreator/receiver.go
Outdated
// TODO: Need to make sure this is unique (just endpoint is probably not totally sufficient). | ||
receiverConfig.SetName(fmt.Sprintf("%s/%s{endpoint=%q}", rc.cfg.Name(), staticSubConfig.fullName, mergedConfig.GetString("endpoint"))) | ||
return receiverConfig, nil | ||
// safeHost provides a safer version of host for receivers started at runtime. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's calling this something like loggingHost
. "safety" can mean different things depending on the requirement. In this case the requirement is that fatal errors are logged instead of crashing the process. But crashing the process is the "safe" option in some other case (during startup).
return nil, err | ||
} | ||
|
||
if err := recvr.Start(run.ctx, run.host); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you open a github issue so that we can think this through in the future? I agree this needs more work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
* Add observer notification interface (k8s observer will be in separate PR) * Refactor receiver_creator to be more easily testable and organized * receiver.go mostly implements OT interface and delegates to the new files * observerhandler.go responds to observer events and manages the starting/stopping of receivers * rules.go implements rules evaluation (not currently implemented) * runner.go contains a runner interface that handles the details of how to start and stop a receiver instance that the observer handler wants to start/stop * Implement basic add/remove/change response in receiver_creator to observer events
Missed the linux path on the copy step
that the observer handler wants to start/stop