From 2ac7d388cf548e538601796ec9fff25ae5eb6a78 Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Tue, 15 Nov 2022 16:02:53 -0500 Subject: [PATCH] Introduce component status reporting This is an alternative to https://github.com/open-telemetry/opentelemetry-collector/pull/6550 ## Summary of changes - Add a component status concept. Components can report their status via Host.ReportComponentStatus(). Interested extensions can watch status events if they implement the StatusWatcher interface. This is similar to how PipelineWatcher works today. - Deprecate Host.ReportFatalError() in favour of Host.ReportComponentStatus(). ## TODO after this is merged - The healthcheck extension must implement StatusWatcher. - Replace all ReportFatalError() calls with ReportComponentStatus() calls in core and contrib. ## Open Questions - StatusWatchers need to be able to tell if all current components are healthy. It is assumed that the listeners need to maintain a map of components and track the status of each component. This works only if we assume that the set of components cannot change during the lifetime of the listener. This assumption is true today but can change in the future if we introduce partial pipeline restarts where only modified/added/removed components are recreated (this will break the listener's assumption and the map will become invalid). Should we instead keep track of this entire status map in the Host and broadcast the entire status to the listeners as a whole instead of (or in addition to) individual component events? 
--- component/componenttest/nop_host.go | 2 + .../componenttest/statuswatcher_extension.go | 61 +++++++++++++ .../componenttest/unhealthy_processor.go | 81 +++++++++++++++++ component/host.go | 6 ++ component/status.go | 87 +++++++++++++++++++ component/status_test.go | 11 +++ service/collector.go | 4 +- service/extensions/extensions.go | 28 +++++- service/host.go | 8 +- service/internal/components/host_wrapper.go | 14 ++- .../internal/components/host_wrapper_test.go | 9 +- service/internal/pipelines/pipelines.go | 29 +++++-- service/internal/pipelines/pipelines_test.go | 9 +- service/internal/servicehost/host.go | 38 ++++++++ service/internal/servicehost/nop_host.go | 45 ++++++++++ service/service_test.go | 72 +++++++++++++++ service/testdata/otelcol-statuswatcher.yaml | 31 +++++++ 17 files changed, 517 insertions(+), 18 deletions(-) create mode 100644 component/componenttest/statuswatcher_extension.go create mode 100644 component/componenttest/unhealthy_processor.go create mode 100644 component/status.go create mode 100644 component/status_test.go create mode 100644 service/internal/servicehost/host.go create mode 100644 service/internal/servicehost/nop_host.go create mode 100644 service/testdata/otelcol-statuswatcher.yaml diff --git a/component/componenttest/nop_host.go b/component/componenttest/nop_host.go index b535b674af6..c24584b2b2a 100644 --- a/component/componenttest/nop_host.go +++ b/component/componenttest/nop_host.go @@ -28,6 +28,8 @@ func NewNopHost() component.Host { func (nh *nopHost) ReportFatalError(_ error) {} +func (hw *nopHost) ReportComponentStatus(event *component.StatusEvent) {} + func (nh *nopHost) GetFactory(_ component.Kind, _ component.Type) component.Factory { return nil } diff --git a/component/componenttest/statuswatcher_extension.go b/component/componenttest/statuswatcher_extension.go new file mode 100644 index 00000000000..ffc01a76296 --- /dev/null +++ b/component/componenttest/statuswatcher_extension.go @@ -0,0 +1,61 @@ +// 
Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package componenttest // import "go.opentelemetry.io/collector/component/componenttest" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" +) + +// NewStatusWatcherExtensionCreateSettings returns new settings (nop telemetry, default build info) for Create*Extension functions. +func NewStatusWatcherExtensionCreateSettings() component.ExtensionCreateSettings { + return component.ExtensionCreateSettings{ + TelemetrySettings: NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + } +} + +type statusWatcherExtensionConfig struct { + config.ExtensionSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct +} + +// NewStatusWatcherExtensionFactory returns a component.ExtensionFactory that constructs extensions which forward component status changes to onStatusChanged. 
+func NewStatusWatcherExtensionFactory( + onStatusChanged func(source component.StatusSource, event *component.StatusEvent), +) component.ExtensionFactory { + return component.NewExtensionFactory( + "statuswatcher", + func() component.ExtensionConfig { + return &statusWatcherExtensionConfig{ + ExtensionSettings: config.NewExtensionSettings(component.NewID("statuswatcher")), + } + }, + func(context.Context, component.ExtensionCreateSettings, component.ExtensionConfig) (component.Extension, error) { + return &statusWatcherExtension{onStatusChanged: onStatusChanged}, nil + }, + component.StabilityLevelStable) +} + +// statusWatcherExtension forwards component status change notifications to the onStatusChanged callback, for testing purposes. +type statusWatcherExtension struct { + nopComponent + onStatusChanged func(source component.StatusSource, event *component.StatusEvent) +} + +func (e statusWatcherExtension) ComponentStatusChanged(source component.StatusSource, event *component.StatusEvent) { + e.onStatusChanged(source, event) +} diff --git a/component/componenttest/unhealthy_processor.go b/component/componenttest/unhealthy_processor.go new file mode 100644 index 00000000000..2a84874fa7b --- /dev/null +++ b/component/componenttest/unhealthy_processor.go @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package componenttest // import "go.opentelemetry.io/collector/component/componenttest" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" +) + +// NewUnhealthyProcessorCreateSettings returns new settings (nop telemetry, default build info) for Create*Processor functions. +func NewUnhealthyProcessorCreateSettings() component.ProcessorCreateSettings { + return component.ProcessorCreateSettings{ + TelemetrySettings: NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + } +} + +type unhealthyProcessorConfig struct { + config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct +} + +// NewUnhealthyProcessorFactory returns a component.ProcessorFactory that constructs "unhealthy" test processors. +func NewUnhealthyProcessorFactory() component.ProcessorFactory { + return component.NewProcessorFactory( + "unhealthy", + func() component.ProcessorConfig { + return &unhealthyProcessorConfig{ + ProcessorSettings: config.NewProcessorSettings(component.NewID("nop")), + } + }, + component.WithTracesProcessor(createUnhealthyTracesProcessor, component.StabilityLevelStable), + component.WithMetricsProcessor(createUnhealthyMetricsProcessor, component.StabilityLevelStable), + component.WithLogsProcessor(createUnhealthyLogsProcessor, component.StabilityLevelStable), + ) +} + +func createUnhealthyTracesProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Traces) (component.TracesProcessor, error) { + return unhealthyProcessorInstance, nil +} + +func createUnhealthyMetricsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Metrics) (component.MetricsProcessor, error) { + return unhealthyProcessorInstance, nil +} + +func createUnhealthyLogsProcessor(context.Context, component.ProcessorCreateSettings, 
component.ProcessorConfig, consumer.Logs) (component.LogsProcessor, error) { + return unhealthyProcessorInstance, nil +} + +var unhealthyProcessorInstance = &unhealthyProcessor{ + Consumer: consumertest.NewNop(), +} + +// unhealthyProcessor is a test processor that asynchronously reports StatusError from Start. +type unhealthyProcessor struct { + nopComponent + consumertest.Consumer +} + +func (unhealthyProcessor) Start(ctx context.Context, host component.Host) error { + go func() { + evt, _ := component.NewStatusEvent(component.StatusError) + host.ReportComponentStatus(evt) + }() + return nil +} diff --git a/component/host.go b/component/host.go index 92c395b0306..12d703cf542 100644 --- a/component/host.go +++ b/component/host.go @@ -23,8 +23,14 @@ type Host interface { // // ReportFatalError should be called by the component anytime after Component.Start() ends and // before Component.Shutdown() begins. + // Deprecated: [0.65.0] Use ReportComponentStatus instead (with an event whose status is StatusError) ReportFatalError(err error) + // ReportComponentStatus can be used by a component to communicate its status to the Host. + // The Host implementations may broadcast this information to interested parties via + // the StatusWatcher interface. + ReportComponentStatus(event *StatusEvent) + // GetFactory of the specified kind. Returns the factory for a component type. // This allows components to create other components. For example: // func (r MyReceiver) Start(host component.Host) error { diff --git a/component/status.go b/component/status.go new file mode 100644 index 00000000000..23db8899a22 --- /dev/null +++ b/component/status.go @@ -0,0 +1,87 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package component + +import ( + "errors" +) + +type Status int32 + +const ( + StatusOK Status = iota + StatusError +) + +// StatusSource is a component that reports a status about itself. +// The implementation of this interface must be comparable to be useful as a map key. +type StatusSource interface { + ID() ID +} + +type StatusEvent struct { + status Status + err error +} + +func (ev *StatusEvent) Status() Status { + return ev.status +} + +// Err returns the error associated with the StatusEvent. +func (ev *StatusEvent) Err() error { + return ev.err +} + +// statusEventOption applies options to a StatusEvent. +type statusEventOption func(*StatusEvent) error + +// WithError sets the error object of the StatusEvent. It is optional +// and should only be applied to a StatusEvent with status StatusError. +func WithError(err error) statusEventOption { + return func(o *StatusEvent) error { + if o.status == StatusOK { + return errors.New("event with ComponentOK cannot have an error") + } + o.err = err + return nil + } +} + +// NewStatusEvent creates and returns a StatusEvent with default and provided +// options. Will return an error if an error is provided for a non-error status +// (StatusOK). +// Options are applied in order; the first option that fails aborts construction. 
+func NewStatusEvent(status Status, options ...statusEventOption) (*StatusEvent, error) { + ev := StatusEvent{ + status: status, + } + + for _, opt := range options { + if err := opt(&ev); err != nil { + return nil, err + } + } + + return &ev, nil +} + +// StatusWatcher is an extra interface for Extension hosted by the OpenTelemetry +// Collector that is to be implemented by extensions interested in changes to component +// status. +type StatusWatcher interface { + // ComponentStatusChanged notifies about a change in the source component status. + ComponentStatusChanged(source StatusSource, event *StatusEvent) +} diff --git a/component/status_test.go b/component/status_test.go new file mode 100644 index 00000000000..65603b04555 --- /dev/null +++ b/component/status_test.go @@ -0,0 +1,11 @@ +package component + +import ( + "fmt" + "testing" + "unsafe" +) + +func TestStatusEventSize(t *testing.T) { + fmt.Printf("StatusEvent size=%d", unsafe.Sizeof(StatusEvent{})) +} diff --git a/service/collector.go b/service/collector.go index 4250e9874a5..abcaeaa4422 100644 --- a/service/collector.go +++ b/service/collector.go @@ -28,9 +28,9 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/service/internal/grpclog" + "go.opentelemetry.io/collector/service/internal/servicehost" ) // State defines Collector's state. 
@@ -278,7 +278,7 @@ func (col *Collector) setCollectorState(state State) { col.state.Store(int32(state)) } -func getBallastSize(host component.Host) uint64 { +func getBallastSize(host servicehost.Host) uint64 { var ballastSize uint64 extensions := host.GetExtensions() for _, extension := range extensions { diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index 1e20f430290..f1a47b0bc5c 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -36,13 +37,26 @@ type Extensions struct { extMap map[component.ID]component.Extension } +type statusReportingExtension struct { + id component.ID +} + +func (s *statusReportingExtension) GetKind() component.Kind { + return component.KindExtension +} + +func (s *statusReportingExtension) ID() component.ID { + return s.id +} + // Start starts all extensions. 
-func (bes *Extensions) Start(ctx context.Context, host component.Host) error { +func (bes *Extensions) Start(ctx context.Context, host servicehost.Host) error { bes.telemetry.Logger.Info("Starting extensions...") for extID, ext := range bes.extMap { extLogger := extensionLogger(bes.telemetry.Logger, extID) extLogger.Info("Extension is starting...") - if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil { + statusSource := &statusReportingExtension{extID} + if err := ext.Start(ctx, components.NewHostWrapper(host, statusSource, extLogger)); err != nil { return err } extLogger.Info("Extension started.") @@ -83,6 +97,16 @@ func (bes *Extensions) NotifyPipelineNotReady() error { return errs } +func (bes *Extensions) NotifyComponentStatusChange(source component.StatusSource, event *component.StatusEvent) error { + var errs error + for _, ext := range bes.extMap { + if pw, ok := ext.(component.StatusWatcher); ok { + pw.ComponentStatusChanged(source, event) + } + } + return errs +} + func (bes *Extensions) GetExtensions() map[component.ID]component.Extension { result := make(map[component.ID]component.Extension, len(bes.extMap)) for extID, v := range bes.extMap { diff --git a/service/host.go b/service/host.go index 31be010608d..195b6e6be0a 100644 --- a/service/host.go +++ b/service/host.go @@ -18,9 +18,10 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/internal/pipelines" + "go.opentelemetry.io/collector/service/internal/servicehost" ) -var _ component.Host = (*serviceHost)(nil) +var _ servicehost.Host = (*serviceHost)(nil) type serviceHost struct { asyncErrorChannel chan error @@ -34,10 +35,15 @@ type serviceHost struct { // ReportFatalError is used to report to the host that the receiver encountered // a fatal error (i.e.: an error that the instance can't recover from) after // its start function has already returned. 
+// Deprecated: [0.65.0] Replaced by ReportComponentStatus func (host *serviceHost) ReportFatalError(err error) { host.asyncErrorChannel <- err } +func (host *serviceHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) { + host.extensions.NotifyComponentStatusChange(source, event) +} + func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { switch kind { case component.KindReceiver: diff --git a/service/internal/components/host_wrapper.go b/service/internal/components/host_wrapper.go index 78428e4e2f0..67beaf0b981 100644 --- a/service/internal/components/host_wrapper.go +++ b/service/internal/components/host_wrapper.go @@ -20,17 +20,21 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/service/internal/servicehost" ) // hostWrapper adds behavior on top of the component.Host being passed when starting the built components. +// TODO: rename this to componentHost or hostComponentConnector to better reflect the purpose. type hostWrapper struct { - component.Host + servicehost.Host + component component.StatusSource *zap.Logger } -func NewHostWrapper(host component.Host, logger *zap.Logger) component.Host { +func NewHostWrapper(host servicehost.Host, component component.StatusSource, logger *zap.Logger) component.Host { return &hostWrapper{ host, + component, logger, } } @@ -41,6 +45,12 @@ func (hw *hostWrapper) ReportFatalError(err error) { hw.Host.ReportFatalError(err) } +var emptyComponentID = component.ID{} + +func (hw *hostWrapper) ReportComponentStatus(event *component.StatusEvent) { + hw.Host.ReportComponentStatus(hw.component, event) +} + // RegisterZPages is used by zpages extension to register handles from service. // When the wrapper is passed to the extension it won't be successful when casting // the interface, for the time being expose the interface here. 
diff --git a/service/internal/components/host_wrapper_test.go b/service/internal/components/host_wrapper_test.go index 720669bd847..0738df63600 100644 --- a/service/internal/components/host_wrapper_test.go +++ b/service/internal/components/host_wrapper_test.go @@ -18,12 +18,17 @@ import ( "errors" "testing" + "github.com/stretchr/testify/assert" "go.uber.org/zap" - "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/service/internal/servicehost" ) func Test_newHostWrapper(t *testing.T) { - hw := NewHostWrapper(componenttest.NewNopHost(), zap.NewNop()) + hw := NewHostWrapper(servicehost.NewNopHost(), nil, zap.NewNop()) hw.ReportFatalError(errors.New("test error")) + ev, err := component.NewStatusEvent(component.StatusOK) + assert.NoError(t, err) + hw.ReportComponentStatus(ev) } diff --git a/service/internal/pipelines/pipelines.go b/service/internal/pipelines/pipelines.go index 24907012a86..bcb6e0879a0 100644 --- a/service/internal/pipelines/pipelines.go +++ b/service/internal/pipelines/pipelines.go @@ -28,6 +28,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/service/internal/components" "go.opentelemetry.io/collector/service/internal/fanoutconsumer" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -65,18 +66,32 @@ type Pipelines struct { pipelines map[component.ID]*builtPipeline } +type statusReportingComponent struct { + kind component.Kind + id component.ID +} + +func (s *statusReportingComponent) GetKind() component.Kind { + return s.kind +} + +func (s *statusReportingComponent) ID() component.ID { + return s.id +} + // StartAll starts all pipelines. // // Start with exporters, processors (in reverse configured order), then receivers. 
// This is important so that components that are earlier in the pipeline and reference components that are // later in the pipeline do not start sending data to later components which are not yet started. -func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error { +func (bps *Pipelines) StartAll(ctx context.Context, host servicehost.Host) error { bps.telemetry.Logger.Info("Starting exporters...") for dt, expByID := range bps.allExporters { for expID, exp := range expByID { expLogger := exporterLogger(bps.telemetry.Logger, expID, dt) expLogger.Info("Exporter is starting...") - if err := exp.Start(ctx, components.NewHostWrapper(host, expLogger)); err != nil { + statusSource := &statusReportingComponent{component.KindExporter, expID} + if err := exp.Start(ctx, components.NewHostWrapper(host, statusSource, expLogger)); err != nil { return err } expLogger.Info("Exporter started.") @@ -86,9 +101,12 @@ func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error { bps.telemetry.Logger.Info("Starting processors...") for pipelineID, bp := range bps.pipelines { for i := len(bp.processors) - 1; i >= 0; i-- { - procLogger := processorLogger(bps.telemetry.Logger, bp.processors[i].id, pipelineID) + processor := bp.processors[i] + procID := processor.id + procLogger := processorLogger(bps.telemetry.Logger, procID, pipelineID) procLogger.Info("Processor is starting...") - if err := bp.processors[i].comp.Start(ctx, components.NewHostWrapper(host, procLogger)); err != nil { + statusSource := &statusReportingComponent{component.KindProcessor, procID} + if err := processor.comp.Start(ctx, components.NewHostWrapper(host, statusSource, procLogger)); err != nil { return err } procLogger.Info("Processor started.") @@ -100,7 +118,8 @@ func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error { for recvID, recv := range recvByID { recvLogger := receiverLogger(bps.telemetry.Logger, recvID, dt) recvLogger.Info("Receiver is 
starting...") - if err := recv.Start(ctx, components.NewHostWrapper(host, recvLogger)); err != nil { + statusSource := &statusReportingComponent{component.KindReceiver, recvID} + if err := recv.Start(ctx, components.NewHostWrapper(host, statusSource, recvLogger)); err != nil { return err } recvLogger.Info("Receiver started.") diff --git a/service/internal/pipelines/pipelines_test.go b/service/internal/pipelines/pipelines_test.go index 2bc2ed63096..d92f05db19f 100644 --- a/service/internal/pipelines/pipelines_test.go +++ b/service/internal/pipelines/pipelines_test.go @@ -32,6 +32,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/service/internal/configunmarshaler" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/internal/testcomponents" ) @@ -95,7 +96,7 @@ func TestBuild(t *testing.T) { pipelines, err := Build(context.Background(), toSettings(factories, cfg)) assert.NoError(t, err) - assert.NoError(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) // Verify exporters created, started and empty. 
for _, expID := range test.exporterIDs { @@ -311,7 +312,7 @@ func TestFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) }) @@ -325,7 +326,7 @@ func TestFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) }) @@ -339,7 +340,7 @@ func TestFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) }) } diff --git a/service/internal/servicehost/host.go b/service/internal/servicehost/host.go new file mode 100644 index 00000000000..754f483e49c --- /dev/null +++ b/service/internal/servicehost/host.go @@ -0,0 +1,38 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package servicehost + +import ( + "go.opentelemetry.io/collector/component" +) + +// Host mirrors component.Host interface, with one important difference: servicehost.Host +// is not associated with a component and thus ReportComponentStatus() requires the source +// component to be explicitly specified. +type Host interface { + // ReportComponentStatus is used to communicate the status of a source component to the Host. + // The Host implementations will broadcast this information to interested parties via + // StatusWatcher interface. + ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) + + // See component.Host for the documentation of the rest of the functions. + + // Deprecated: [0.65.0] Replaced by ReportComponentStatus. + ReportFatalError(err error) + + GetFactory(kind component.Kind, componentType component.Type) component.Factory + GetExtensions() map[component.ID]component.Extension + GetExporters() map[component.DataType]map[component.ID]component.Exporter +} diff --git a/service/internal/servicehost/nop_host.go b/service/internal/servicehost/nop_host.go new file mode 100644 index 00000000000..7a5717624fb --- /dev/null +++ b/service/internal/servicehost/nop_host.go @@ -0,0 +1,45 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package servicehost + +import ( + "go.opentelemetry.io/collector/component" +) + +// nopHost is a no-op implementation of Host for test purposes. +type nopHost struct{} + +func (n nopHost) ReportFatalError(err error) { +} + +func (n nopHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) { +} + +func (n nopHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { + return nil +} + +func (n nopHost) GetExtensions() map[component.ID]component.Extension { + return nil +} + +func (n nopHost) GetExporters() map[component.DataType]map[component.ID]component.Exporter { + return nil +} + +// NewNopHost returns a new instance of nopHost with proper defaults for most tests. +func NewNopHost() Host { + return &nopHost{} +} diff --git a/service/service_test.go b/service/service_test.go index c65d322a80b..248b7dc4432 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -19,7 +19,9 @@ import ( "fmt" "net/http" "path/filepath" + "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -223,3 +225,73 @@ func createExampleService(t *testing.T, factories component.Factories) *service }) return srv } + +func TestComponentStatusWatcher(t *testing.T) { + factories, err := componenttest.NopFactories() + require.NoError(t, err) + + // Use a processor factory that creates an "unhealthy" processor: one that + // always reports StatusError after successful Start. + unhealthyProcessorFactory := componenttest.NewUnhealthyProcessorFactory() + factories.Processors[unhealthyProcessorFactory.Type()] = unhealthyProcessorFactory + + // Keep track of all status changes in a map. 
+ changedComponents := map[component.StatusSource]component.Status{} + var mux sync.Mutex + onStatusChanged := func(source component.StatusSource, event *component.StatusEvent) { + mux.Lock() + defer mux.Unlock() + changedComponents[source] = event.Status() + } + + // Add a "statuswatcher" extension that will receive notifications when processor + // status changes. + factory := componenttest.NewStatusWatcherExtensionFactory(onStatusChanged) + factories.Extensions[factory.Type()] = factory + + // Read config from file. This config uses 4 "unhealthy" processors. + validProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-statuswatcher.yaml")})) + require.NoError(t, err) + validCfg, err := validProvider.Get(context.Background(), factories) + require.NoError(t, err) + + // Create a service + telemetry := newColTelemetry(featuregate.NewRegistry()) + srv, err := newService(&settings{ + BuildInfo: component.NewDefaultBuildInfo(), + Factories: factories, + Config: validCfg, + telemetry: telemetry, + }) + require.NoError(t, err) + + // Start the service. + require.NoError(t, srv.Start(context.Background())) + + // The "unhealthy" processors will now begin to asynchronously report StatusError. + // We expect to see these reports. + + assert.Eventually(t, func() bool { + mux.Lock() + defer mux.Unlock() + + for k, v := range changedComponents { + // All processors must report a status change with the same ID + assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID()) + // And all must be in StatusError + assert.EqualValues(t, component.StatusError, v) + } + + // We have 4 processors with exactly the same ID in otelcol-statuswatcher.yaml + // We must have exactly 4 items in our map. 
This ensures that the "source" argument + // passed to status change func is unique per instance of source component despite + // components having the same IDs (having same ID for different component instances + // is a normal situation for processors, but also not prohibited for other component types). + return len(changedComponents) == 4 + }, time.Second, time.Millisecond*10) + + t.Cleanup(func() { + assert.NoError(t, telemetry.shutdown()) + assert.NoError(t, srv.Shutdown(context.Background())) + }) +} diff --git a/service/testdata/otelcol-statuswatcher.yaml b/service/testdata/otelcol-statuswatcher.yaml new file mode 100644 index 00000000000..34e6ea80063 --- /dev/null +++ b/service/testdata/otelcol-statuswatcher.yaml @@ -0,0 +1,31 @@ +receivers: + nop: + +processors: + nop: + unhealthy: + +exporters: + nop: + +extensions: + statuswatcher: + +service: + telemetry: + metrics: + address: localhost:8888 + extensions: [statuswatcher] + pipelines: + traces: + receivers: [nop] + processors: [nop,unhealthy,unhealthy] + exporters: [nop] + metrics: + receivers: [nop] + processors: [nop,unhealthy] + exporters: [nop] + logs: + receivers: [nop] + processors: [nop,unhealthy] + exporters: [nop]