From 40b95570a7f2282561028dae9cc8d5750fde135e Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Tue, 15 Nov 2022 16:02:53 -0500 Subject: [PATCH] Introduce component status reporting This is an alternate to https://github.com/open-telemetry/opentelemetry-collector/pull/6550 - Add component status concept. Components can report their status via Host.ReportComponentStatus(). Interested extensions can watch status events if they implement StatusWatcher interface. This is similar to how PipelineWatcher works today. - Deprecate Host.ReportFatalError() in favour of Host.ReportComponentStatus(). - healthcheck extension must implement StatusWatcher. - Replace all ReportFatalError() calls by ReportComponentStatus() calls in core and contrib. - StatusWatchers need to be able to tell if all current components are healthy. It is assumed that the listeners need to maintain a map of components and track the status of each component. This works only if we assume that the set of components cannot change during the lifetime of the listener. This assumption is true today but can change in the future if we introduce partial pipeline restarts where only modified/added/removed components are recreated (this will break listener's assumption and the map will become invalid). Should we instead keep track of this entire status map in the Host and broadcast the entire status to the listeners as a whole instead of (or in addition to) individual component events? --- component/componenttest/nop_host.go | 2 + .../componenttest/statuswatcher_extension.go | 61 +++++++++++++ .../componenttest/unhealthy_processor.go | 81 +++++++++++++++++ component/host.go | 9 ++ component/status.go | 90 +++++++++++++++++++ component/status_test.go | 11 +++ otelcol/testdata/otelcol-statuswatcher.yaml | 31 +++++++ service/extensions/extensions.go | 28 +++++- service/host.go | 8 +- service/internal/components/host_wrapper.go | 14 ++- .../internal/components/host_wrapper_test.go | 11 ++- service/internal/servicehost/host.go | 38 ++++++++ service/internal/servicehost/nop_host.go | 45 ++++++++++ 13 files changed, 421 insertions(+), 8 deletions(-) create mode 100644 component/componenttest/statuswatcher_extension.go create mode 100644 component/componenttest/unhealthy_processor.go create mode 100644 component/status.go create mode 100644 component/status_test.go create mode 100644 otelcol/testdata/otelcol-statuswatcher.yaml create mode 100644 service/internal/servicehost/host.go create mode 100644 service/internal/servicehost/nop_host.go diff --git a/component/componenttest/nop_host.go b/component/componenttest/nop_host.go index 4accfab0d8c..04c67080df9 100644 --- a/component/componenttest/nop_host.go +++ b/component/componenttest/nop_host.go @@ -17,6 +17,8 @@ func NewNopHost() component.Host { func (nh *nopHost) ReportFatalError(_ error) {} +func (hw *nopHost) ReportComponentStatus(event *component.StatusEvent) {} + func (nh *nopHost) GetFactory(_ component.Kind, _ component.Type) component.Factory { return nil } diff --git a/component/componenttest/statuswatcher_extension.go b/component/componenttest/statuswatcher_extension.go new file mode 100644 index 00000000000..ffc01a76296 --- /dev/null +++ b/component/componenttest/statuswatcher_extension.go @@ -0,0 +1,61 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package componenttest // import "go.opentelemetry.io/collector/component/componenttest" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" +) + +// NewStatusWatcherExtensionCreateSettings returns a new nop settings for Create*Extension functions. +func NewStatusWatcherExtensionCreateSettings() component.ExtensionCreateSettings { + return component.ExtensionCreateSettings{ + TelemetrySettings: NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + } +} + +type statusWatcherExtensionConfig struct { + config.ExtensionSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct +} + +// NewStatusWatcherExtensionFactory returns a component.ExtensionFactory that constructs nop extensions. +func NewStatusWatcherExtensionFactory( + onStatusChanged func(source component.StatusSource, event *component.StatusEvent), +) component.ExtensionFactory { + return component.NewExtensionFactory( + "statuswatcher", + func() component.ExtensionConfig { + return &statusWatcherExtensionConfig{ + ExtensionSettings: config.NewExtensionSettings(component.NewID("statuswatcher")), + } + }, + func(context.Context, component.ExtensionCreateSettings, component.ExtensionConfig) (component.Extension, error) { + return &statusWatcherExtension{onStatusChanged: onStatusChanged}, nil + }, + component.StabilityLevelStable) +} + +// statusWatcherExtension stores consumed traces and metrics for testing purposes. +type statusWatcherExtension struct { + nopComponent + onStatusChanged func(source component.StatusSource, event *component.StatusEvent) +} + +func (e statusWatcherExtension) ComponentStatusChanged(source component.StatusSource, event *component.StatusEvent) { + e.onStatusChanged(source, event) +} diff --git a/component/componenttest/unhealthy_processor.go b/component/componenttest/unhealthy_processor.go new file mode 100644 index 00000000000..2a84874fa7b --- /dev/null +++ b/component/componenttest/unhealthy_processor.go @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package componenttest // import "go.opentelemetry.io/collector/component/componenttest" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" +) + +// NewUnhealthyProcessorCreateSettings returns a new nop settings for Create*Processor functions. +func NewUnhealthyProcessorCreateSettings() component.ProcessorCreateSettings { + return component.ProcessorCreateSettings{ + TelemetrySettings: NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + } +} + +type unhealthyProcessorConfig struct { + config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct +} + +// NewUnhealthyProcessorFactory returns a component.ProcessorFactory that constructs nop processors. +func NewUnhealthyProcessorFactory() component.ProcessorFactory { + return component.NewProcessorFactory( + "unhealthy", + func() component.ProcessorConfig { + return &unhealthyProcessorConfig{ + ProcessorSettings: config.NewProcessorSettings(component.NewID("nop")), + } + }, + component.WithTracesProcessor(createUnhealthyTracesProcessor, component.StabilityLevelStable), + component.WithMetricsProcessor(createUnhealthyMetricsProcessor, component.StabilityLevelStable), + component.WithLogsProcessor(createUnhealthyLogsProcessor, component.StabilityLevelStable), + ) +} + +func createUnhealthyTracesProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Traces) (component.TracesProcessor, error) { + return unhealthyProcessorInstance, nil +} + +func createUnhealthyMetricsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Metrics) (component.MetricsProcessor, error) { + return unhealthyProcessorInstance, nil +} + +func createUnhealthyLogsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Logs) (component.LogsProcessor, error) { + return unhealthyProcessorInstance, nil +} + +var unhealthyProcessorInstance = &unhealthyProcessor{ + Consumer: consumertest.NewNop(), +} + +// unhealthyProcessor stores consumed traces and metrics for testing purposes. +type unhealthyProcessor struct { + nopComponent + consumertest.Consumer +} + +func (unhealthyProcessor) Start(ctx context.Context, host component.Host) error { + go func() { + evt, _ := component.NewStatusEvent(component.StatusError) + host.ReportComponentStatus(evt) + }() + return nil +} diff --git a/component/host.go b/component/host.go index ea3825d743f..79e653a19a2 100644 --- a/component/host.go +++ b/component/host.go @@ -12,8 +12,17 @@ type Host interface { // // ReportFatalError should be called by the component anytime after Component.Start() ends and // before Component.Shutdown() begins. + // Deprecated: [0.65.0] Use ReportComponentStatus instead (with an event of type status.ComponentError) ReportFatalError(err error) + // ReportComponentStatus can be used by a component to communicate its status to the Host. + // The Host implementations may broadcast this information to interested parties via + // StatusWatcher interface. + // May be called by the component any time after Component.Start is called or while + // Component.Start call is executing. + // May be called concurrently with itself. + ReportComponentStatus(event *StatusEvent) + // GetFactory of the specified kind. Returns the factory for a component type. // This allows components to create other components. For example: // func (r MyReceiver) Start(host component.Host) error { diff --git a/component/status.go b/component/status.go new file mode 100644 index 00000000000..42de901445f --- /dev/null +++ b/component/status.go @@ -0,0 +1,90 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package component + +import ( + "errors" +) + +type Status int32 + +const ( + StatusOK Status = iota + StatusError +) + +// StatusSource component that reports a status about itself. +// The implementation of this interface must be comparable to be useful as a map key. +type StatusSource interface { + ID() ID +} + +type StatusEvent struct { + status Status + err error +} + +func (ev *StatusEvent) Status() Status { + return ev.status +} + +// Err returns the error associated with the ComponentEvent. +func (ev *StatusEvent) Err() error { + return ev.err +} + +// statusEventOption applies options to a StatusEvent. +type statusEventOption func(*StatusEvent) error + +// WithError sets the error object of the Event. It is optional +// and should only be applied to an Event of type ComponentError. +func WithError(err error) statusEventOption { + return func(o *StatusEvent) error { + if o.status == StatusOK { + return errors.New("event with ComponentOK cannot have an error") + } + o.err = err + return nil + } +} + +// NewStatusEvent creates and returns a StatusEvent with default and provided +// options. Will return an error if an error is provided for a non-error event +// type (status.ComponentOK). +// If the timestamp is not provided will set it to time.Now(). +func NewStatusEvent(status Status, options ...statusEventOption) (*StatusEvent, error) { + ev := StatusEvent{ + status: status, + } + + for _, opt := range options { + if err := opt(&ev); err != nil { + return nil, err + } + } + + return &ev, nil +} + +// StatusWatcher is an extra interface for Extension hosted by the OpenTelemetry +// Collector that is to be implemented by extensions interested in changes to component +// status. +type StatusWatcher interface { + // ComponentStatusChanged notifies about a change in the source component status. + // Extensions that implement this interface must be ready that the ComponentStatusChanged + // may be called before, after or concurrently with Component.Shutdown() call. + // The function may be called concurrently with itself. + ComponentStatusChanged(source StatusSource, event *StatusEvent) +} diff --git a/component/status_test.go b/component/status_test.go new file mode 100644 index 00000000000..65603b04555 --- /dev/null +++ b/component/status_test.go @@ -0,0 +1,11 @@ +package component + +import ( + "fmt" + "testing" + "unsafe" +) + +func TestStatusEventSize(t *testing.T) { + fmt.Printf("StatusEvent size=%d", unsafe.Sizeof(StatusEvent{})) +} diff --git a/otelcol/testdata/otelcol-statuswatcher.yaml b/otelcol/testdata/otelcol-statuswatcher.yaml new file mode 100644 index 00000000000..34e6ea80063 --- /dev/null +++ b/otelcol/testdata/otelcol-statuswatcher.yaml @@ -0,0 +1,31 @@ +receivers: + nop: + +processors: + nop: + unhealthy: + +exporters: + nop: + +extensions: + statuswatcher: + +service: + telemetry: + metrics: + address: localhost:8888 + extensions: [statuswatcher] + pipelines: + traces: + receivers: [nop] + processors: [nop,unhealthy,unhealthy] + exporters: [nop] + metrics: + receivers: [nop] + processors: [nop,unhealthy] + exporters: [nop] + logs: + receivers: [nop] + processors: [nop,unhealthy] + exporters: [nop] diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index 5ff92cd5852..fc67925cabc 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -26,13 +27,26 @@ type Extensions struct { extMap map[component.ID]extension.Extension } +type statusReportingExtension struct { + id component.ID +} + +func (s *statusReportingExtension) GetKind() component.Kind { + return component.KindExtension +} + +func (s *statusReportingExtension) ID() component.ID { + return s.id +} + // Start starts all extensions. -func (bes *Extensions) Start(ctx context.Context, host component.Host) error { +func (bes *Extensions) Start(ctx context.Context, host servicehost.Host) error { bes.telemetry.Logger.Info("Starting extensions...") for extID, ext := range bes.extMap { extLogger := components.ExtensionLogger(bes.telemetry.Logger, extID) extLogger.Info("Extension is starting...") - if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil { + statusSource := &statusReportingExtension{extID} + if err := ext.Start(ctx, components.NewHostWrapper(host, statusSource, extLogger)); err != nil { return err } extLogger.Info("Extension started.") @@ -84,6 +98,16 @@ func (bes *Extensions) NotifyConfig(ctx context.Context, conf *confmap.Conf) err return errs } +func (bes *Extensions) NotifyComponentStatusChange(source component.StatusSource, event *component.StatusEvent) error { + var errs error + for _, ext := range bes.extMap { + if pw, ok := ext.(component.StatusWatcher); ok { + pw.ComponentStatusChanged(source, event) + } + } + return errs +} + func (bes *Extensions) GetExtensions() map[component.ID]component.Component { result := make(map[component.ID]component.Component, len(bes.extMap)) for extID, v := range bes.extMap { diff --git a/service/host.go b/service/host.go index d216ae94adb..ff0d6a07aa2 100644 --- a/service/host.go +++ b/service/host.go @@ -12,9 +12,10 @@ import ( "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/internal/graph" + "go.opentelemetry.io/collector/service/internal/servicehost" ) -var _ component.Host = (*serviceHost)(nil) +var _ servicehost.Host = (*serviceHost)(nil) type serviceHost struct { asyncErrorChannel chan error @@ -33,10 +34,15 @@ type serviceHost struct { // ReportFatalError is used to report to the host that the receiver encountered // a fatal error (i.e.: an error that the instance can't recover from) after // its start function has already returned. +// Deprecated: [0.65.0] Replaced by ReportComponentStatus func (host *serviceHost) ReportFatalError(err error) { host.asyncErrorChannel <- err } +func (host *serviceHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) { + host.extensions.NotifyComponentStatusChange(source, event) +} + func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { switch kind { case component.KindReceiver: diff --git a/service/internal/components/host_wrapper.go b/service/internal/components/host_wrapper.go index 2d386ddad67..cd6397164bd 100644 --- a/service/internal/components/host_wrapper.go +++ b/service/internal/components/host_wrapper.go @@ -9,17 +9,21 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/service/internal/servicehost" ) // hostWrapper adds behavior on top of the component.Host being passed when starting the built components. +// TODO: rename this to componentHost or hostComponentConnector to better reflect the purpose. type hostWrapper struct { - component.Host + servicehost.Host + component component.StatusSource *zap.Logger } -func NewHostWrapper(host component.Host, logger *zap.Logger) component.Host { +func NewHostWrapper(host servicehost.Host, component component.StatusSource, logger *zap.Logger) component.Host { return &hostWrapper{ host, + component, logger, } } @@ -30,6 +34,12 @@ func (hw *hostWrapper) ReportFatalError(err error) { hw.Host.ReportFatalError(err) } +var emptyComponentID = component.ID{} + +func (hw *hostWrapper) ReportComponentStatus(event *component.StatusEvent) { + hw.Host.ReportComponentStatus(hw.component, event) +} + // RegisterZPages is used by zpages extension to register handles from service. // When the wrapper is passed to the extension it won't be successful when casting // the interface, for the time being expose the interface here. diff --git a/service/internal/components/host_wrapper_test.go b/service/internal/components/host_wrapper_test.go index 62b7a744681..282701e6a21 100644 --- a/service/internal/components/host_wrapper_test.go +++ b/service/internal/components/host_wrapper_test.go @@ -7,12 +7,17 @@ import ( "errors" "testing" + "github.com/stretchr/testify/assert" "go.uber.org/zap" - "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/service/internal/servicehost" ) -func Test_newHostWrapper(_ *testing.T) { - hw := NewHostWrapper(componenttest.NewNopHost(), zap.NewNop()) +func Test_newHostWrapper(t *testing.T) { + hw := NewHostWrapper(servicehost.NewNopHost(), nil, zap.NewNop()) hw.ReportFatalError(errors.New("test error")) + ev, err := component.NewStatusEvent(component.StatusOK) + assert.NoError(t, err) + hw.ReportComponentStatus(ev) } diff --git a/service/internal/servicehost/host.go b/service/internal/servicehost/host.go new file mode 100644 index 00000000000..754f483e49c --- /dev/null +++ b/service/internal/servicehost/host.go @@ -0,0 +1,38 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package servicehost + +import ( + "go.opentelemetry.io/collector/component" +) + +// Host mirrors component.Host interface, with one important difference: servicehost.Host +// is not associated with a component and thus ReportComponentStatus() requires the source +// component to be explicitly specified. +type Host interface { + // ReportComponentStatus is used to communicate the status of a source component to the Host. + // The Host implementations will broadcast this information to interested parties via + // StatusWatcher interface. + ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) + + // See component.Host for the documentation of the rest of the functions. + + // Deprecated: [0.65.0] Replaced by ReportComponentStatus. + ReportFatalError(err error) + + GetFactory(kind component.Kind, componentType component.Type) component.Factory + GetExtensions() map[component.ID]component.Extension + GetExporters() map[component.DataType]map[component.ID]component.Exporter +} diff --git a/service/internal/servicehost/nop_host.go b/service/internal/servicehost/nop_host.go new file mode 100644 index 00000000000..7a5717624fb --- /dev/null +++ b/service/internal/servicehost/nop_host.go @@ -0,0 +1,45 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package servicehost + +import ( + "go.opentelemetry.io/collector/component" +) + +// nopHost mocks a receiver.ReceiverHost for test purposes. +type nopHost struct{} + +func (n nopHost) ReportFatalError(err error) { +} + +func (n nopHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) { +} + +func (n nopHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { + return nil +} + +func (n nopHost) GetExtensions() map[component.ID]component.Extension { + return nil +} + +func (n nopHost) GetExporters() map[component.DataType]map[component.ID]component.Exporter { + return nil +} + +// NewNopHost returns a new instance of nopHost with proper defaults for most tests. +func NewNopHost() Host { + return &nopHost{} +}