diff --git a/component/componenttest/nop_host.go b/component/componenttest/nop_host.go index 4accfab0d8c..04c67080df9 100644 --- a/component/componenttest/nop_host.go +++ b/component/componenttest/nop_host.go @@ -17,6 +17,8 @@ func NewNopHost() component.Host { func (nh *nopHost) ReportFatalError(_ error) {} +func (hw *nopHost) ReportComponentStatus(event *component.StatusEvent) {} + func (nh *nopHost) GetFactory(_ component.Kind, _ component.Type) component.Factory { return nil } diff --git a/component/componenttest/statuswatcher_extension.go b/component/componenttest/statuswatcher_extension.go new file mode 100644 index 00000000000..ffc01a76296 --- /dev/null +++ b/component/componenttest/statuswatcher_extension.go @@ -0,0 +1,61 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package componenttest // import "go.opentelemetry.io/collector/component/componenttest" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" +) + +// NewStatusWatcherExtensionCreateSettings returns a new nop settings for Create*Extension functions. +func NewStatusWatcherExtensionCreateSettings() component.ExtensionCreateSettings { + return component.ExtensionCreateSettings{ + TelemetrySettings: NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + } +} + +type statusWatcherExtensionConfig struct { + config.ExtensionSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct +} + +// NewStatusWatcherExtensionFactory returns a component.ExtensionFactory that constructs nop extensions. +func NewStatusWatcherExtensionFactory( + onStatusChanged func(source component.StatusSource, event *component.StatusEvent), +) component.ExtensionFactory { + return component.NewExtensionFactory( + "statuswatcher", + func() component.ExtensionConfig { + return &statusWatcherExtensionConfig{ + ExtensionSettings: config.NewExtensionSettings(component.NewID("statuswatcher")), + } + }, + func(context.Context, component.ExtensionCreateSettings, component.ExtensionConfig) (component.Extension, error) { + return &statusWatcherExtension{onStatusChanged: onStatusChanged}, nil + }, + component.StabilityLevelStable) +} + +// statusWatcherExtension stores consumed traces and metrics for testing purposes. +type statusWatcherExtension struct { + nopComponent + onStatusChanged func(source component.StatusSource, event *component.StatusEvent) +} + +func (e statusWatcherExtension) ComponentStatusChanged(source component.StatusSource, event *component.StatusEvent) { + e.onStatusChanged(source, event) +} diff --git a/component/componenttest/unhealthy_processor.go b/component/componenttest/unhealthy_processor.go new file mode 100644 index 00000000000..2a84874fa7b --- /dev/null +++ b/component/componenttest/unhealthy_processor.go @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package componenttest // import "go.opentelemetry.io/collector/component/componenttest" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" +) + +// NewUnhealthyProcessorCreateSettings returns a new nop settings for Create*Processor functions. +func NewUnhealthyProcessorCreateSettings() component.ProcessorCreateSettings { + return component.ProcessorCreateSettings{ + TelemetrySettings: NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + } +} + +type unhealthyProcessorConfig struct { + config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct +} + +// NewUnhealthyProcessorFactory returns a component.ProcessorFactory that constructs nop processors. +func NewUnhealthyProcessorFactory() component.ProcessorFactory { + return component.NewProcessorFactory( + "unhealthy", + func() component.ProcessorConfig { + return &unhealthyProcessorConfig{ + ProcessorSettings: config.NewProcessorSettings(component.NewID("nop")), + } + }, + component.WithTracesProcessor(createUnhealthyTracesProcessor, component.StabilityLevelStable), + component.WithMetricsProcessor(createUnhealthyMetricsProcessor, component.StabilityLevelStable), + component.WithLogsProcessor(createUnhealthyLogsProcessor, component.StabilityLevelStable), + ) +} + +func createUnhealthyTracesProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Traces) (component.TracesProcessor, error) { + return unhealthyProcessorInstance, nil +} + +func createUnhealthyMetricsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Metrics) (component.MetricsProcessor, error) { + return unhealthyProcessorInstance, nil +} + +func createUnhealthyLogsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Logs) (component.LogsProcessor, error) { + return unhealthyProcessorInstance, nil +} + +var unhealthyProcessorInstance = &unhealthyProcessor{ + Consumer: consumertest.NewNop(), +} + +// unhealthyProcessor stores consumed traces and metrics for testing purposes. +type unhealthyProcessor struct { + nopComponent + consumertest.Consumer +} + +func (unhealthyProcessor) Start(ctx context.Context, host component.Host) error { + go func() { + evt, _ := component.NewStatusEvent(component.StatusError) + host.ReportComponentStatus(evt) + }() + return nil +} diff --git a/component/host.go b/component/host.go index ea3825d743f..79e653a19a2 100644 --- a/component/host.go +++ b/component/host.go @@ -12,8 +12,17 @@ type Host interface { // // ReportFatalError should be called by the component anytime after Component.Start() ends and // before Component.Shutdown() begins. + // Deprecated: [0.65.0] Use ReportComponentStatus instead (with an event of type status.ComponentError) ReportFatalError(err error) + // ReportComponentStatus can be used by a component to communicate its status to the Host. + // The Host implementations may broadcast this information to interested parties via + // StatusWatcher interface. + // May be called by the component any time after Component.Start is called or while + // Component.Start call is executing. + // May be called concurrently with itself. + ReportComponentStatus(event *StatusEvent) + // GetFactory of the specified kind. Returns the factory for a component type. // This allows components to create other components. For example: // func (r MyReceiver) Start(host component.Host) error { diff --git a/component/status.go b/component/status.go new file mode 100644 index 00000000000..42de901445f --- /dev/null +++ b/component/status.go @@ -0,0 +1,90 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package component + +import ( + "errors" +) + +type Status int32 + +const ( + StatusOK Status = iota + StatusError +) + +// StatusSource component that reports a status about itself. +// The implementation of this interface must be comparable to be useful as a map key. +type StatusSource interface { + ID() ID +} + +type StatusEvent struct { + status Status + err error +} + +func (ev *StatusEvent) Status() Status { + return ev.status +} + +// Err returns the error associated with the ComponentEvent. +func (ev *StatusEvent) Err() error { + return ev.err +} + +// statusEventOption applies options to a StatusEvent. +type statusEventOption func(*StatusEvent) error + +// WithError sets the error object of the Event. It is optional +// and should only be applied to an Event of type ComponentError. +func WithError(err error) statusEventOption { + return func(o *StatusEvent) error { + if o.status == StatusOK { + return errors.New("event with ComponentOK cannot have an error") + } + o.err = err + return nil + } +} + +// NewStatusEvent creates and returns a StatusEvent with default and provided +// options. Will return an error if an error is provided for a non-error event +// type (status.ComponentOK). +// If the timestamp is not provided will set it to time.Now(). +func NewStatusEvent(status Status, options ...statusEventOption) (*StatusEvent, error) { + ev := StatusEvent{ + status: status, + } + + for _, opt := range options { + if err := opt(&ev); err != nil { + return nil, err + } + } + + return &ev, nil +} + +// StatusWatcher is an extra interface for Extension hosted by the OpenTelemetry +// Collector that is to be implemented by extensions interested in changes to component +// status. +type StatusWatcher interface { + // ComponentStatusChanged notifies about a change in the source component status. + // Extensions that implement this interface must be ready that the ComponentStatusChanged + // may be called before, after or concurrently with Component.Shutdown() call. + // The function may be called concurrently with itself. + ComponentStatusChanged(source StatusSource, event *StatusEvent) +} diff --git a/component/status_test.go b/component/status_test.go new file mode 100644 index 00000000000..65603b04555 --- /dev/null +++ b/component/status_test.go @@ -0,0 +1,11 @@ +package component + +import ( + "fmt" + "testing" + "unsafe" +) + +func TestStatusEventSize(t *testing.T) { + fmt.Printf("StatusEvent size=%d", unsafe.Sizeof(StatusEvent{})) +} diff --git a/otelcol/testdata/otelcol-statuswatcher.yaml b/otelcol/testdata/otelcol-statuswatcher.yaml new file mode 100644 index 00000000000..34e6ea80063 --- /dev/null +++ b/otelcol/testdata/otelcol-statuswatcher.yaml @@ -0,0 +1,31 @@ +receivers: + nop: + +processors: + nop: + unhealthy: + +exporters: + nop: + +extensions: + statuswatcher: + +service: + telemetry: + metrics: + address: localhost:8888 + extensions: [statuswatcher] + pipelines: + traces: + receivers: [nop] + processors: [nop,unhealthy,unhealthy] + exporters: [nop] + metrics: + receivers: [nop] + processors: [nop,unhealthy] + exporters: [nop] + logs: + receivers: [nop] + processors: [nop,unhealthy] + exporters: [nop] diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index 5ff92cd5852..fc67925cabc 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -26,13 +27,26 @@ type Extensions struct { extMap map[component.ID]extension.Extension } +type statusReportingExtension struct { + id component.ID +} + +func (s *statusReportingExtension) GetKind() component.Kind { + return component.KindExtension +} + +func (s *statusReportingExtension) ID() component.ID { + return s.id +} + // Start starts all extensions. -func (bes *Extensions) Start(ctx context.Context, host component.Host) error { +func (bes *Extensions) Start(ctx context.Context, host servicehost.Host) error { bes.telemetry.Logger.Info("Starting extensions...") for extID, ext := range bes.extMap { extLogger := components.ExtensionLogger(bes.telemetry.Logger, extID) extLogger.Info("Extension is starting...") - if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil { + statusSource := &statusReportingExtension{extID} + if err := ext.Start(ctx, components.NewHostWrapper(host, statusSource, extLogger)); err != nil { return err } extLogger.Info("Extension started.") @@ -84,6 +98,16 @@ func (bes *Extensions) NotifyConfig(ctx context.Context, conf *confmap.Conf) err return errs } +func (bes *Extensions) NotifyComponentStatusChange(source component.StatusSource, event *component.StatusEvent) error { + var errs error + for _, ext := range bes.extMap { + if pw, ok := ext.(component.StatusWatcher); ok { + pw.ComponentStatusChanged(source, event) + } + } + return errs +} + func (bes *Extensions) GetExtensions() map[component.ID]component.Component { result := make(map[component.ID]component.Component, len(bes.extMap)) for extID, v := range bes.extMap { diff --git a/service/host.go b/service/host.go index d216ae94adb..ff0d6a07aa2 100644 --- a/service/host.go +++ b/service/host.go @@ -12,9 +12,10 @@ import ( "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/internal/graph" + "go.opentelemetry.io/collector/service/internal/servicehost" ) -var _ component.Host = (*serviceHost)(nil) +var _ servicehost.Host = (*serviceHost)(nil) type serviceHost struct { asyncErrorChannel chan error @@ -33,10 +34,15 @@ type serviceHost struct { // ReportFatalError is used to report to the host that the receiver encountered // a fatal error (i.e.: an error that the instance can't recover from) after // its start function has already returned. +// Deprecated: [0.65.0] Replaced by ReportComponentStatus func (host *serviceHost) ReportFatalError(err error) { host.asyncErrorChannel <- err } +func (host *serviceHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) { + host.extensions.NotifyComponentStatusChange(source, event) +} + func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { switch kind { case component.KindReceiver: diff --git a/service/internal/components/host_wrapper.go b/service/internal/components/host_wrapper.go index 2d386ddad67..cd6397164bd 100644 --- a/service/internal/components/host_wrapper.go +++ b/service/internal/components/host_wrapper.go @@ -9,17 +9,21 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/service/internal/servicehost" ) // hostWrapper adds behavior on top of the component.Host being passed when starting the built components. +// TODO: rename this to componentHost or hostComponentConnector to better reflect the purpose. type hostWrapper struct { - component.Host + servicehost.Host + component component.StatusSource *zap.Logger } -func NewHostWrapper(host component.Host, logger *zap.Logger) component.Host { +func NewHostWrapper(host servicehost.Host, component component.StatusSource, logger *zap.Logger) component.Host { return &hostWrapper{ host, + component, logger, } } @@ -30,6 +34,12 @@ func (hw *hostWrapper) ReportFatalError(err error) { hw.Host.ReportFatalError(err) } +var emptyComponentID = component.ID{} + +func (hw *hostWrapper) ReportComponentStatus(event *component.StatusEvent) { + hw.Host.ReportComponentStatus(hw.component, event) +} + // RegisterZPages is used by zpages extension to register handles from service. // When the wrapper is passed to the extension it won't be successful when casting // the interface, for the time being expose the interface here. diff --git a/service/internal/components/host_wrapper_test.go b/service/internal/components/host_wrapper_test.go index 62b7a744681..282701e6a21 100644 --- a/service/internal/components/host_wrapper_test.go +++ b/service/internal/components/host_wrapper_test.go @@ -7,12 +7,17 @@ import ( "errors" "testing" + "github.com/stretchr/testify/assert" "go.uber.org/zap" - "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/service/internal/servicehost" ) -func Test_newHostWrapper(_ *testing.T) { - hw := NewHostWrapper(componenttest.NewNopHost(), zap.NewNop()) +func Test_newHostWrapper(t *testing.T) { + hw := NewHostWrapper(servicehost.NewNopHost(), nil, zap.NewNop()) hw.ReportFatalError(errors.New("test error")) + ev, err := component.NewStatusEvent(component.StatusOK) + assert.NoError(t, err) + hw.ReportComponentStatus(ev) } diff --git a/service/internal/servicehost/host.go b/service/internal/servicehost/host.go new file mode 100644 index 00000000000..754f483e49c --- /dev/null +++ b/service/internal/servicehost/host.go @@ -0,0 +1,38 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package servicehost + +import ( + "go.opentelemetry.io/collector/component" +) + +// Host mirrors component.Host interface, with one important difference: servicehost.Host +// is not associated with a component and thus ReportComponentStatus() requires the source +// component to be explicitly specified. +type Host interface { + // ReportComponentStatus is used to communicate the status of a source component to the Host. + // The Host implementations will broadcast this information to interested parties via + // StatusWatcher interface. + ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) + + // See component.Host for the documentation of the rest of the functions. + + // Deprecated: [0.65.0] Replaced by ReportComponentStatus. + ReportFatalError(err error) + + GetFactory(kind component.Kind, componentType component.Type) component.Factory + GetExtensions() map[component.ID]component.Extension + GetExporters() map[component.DataType]map[component.ID]component.Exporter +} diff --git a/service/internal/servicehost/nop_host.go b/service/internal/servicehost/nop_host.go new file mode 100644 index 00000000000..7a5717624fb --- /dev/null +++ b/service/internal/servicehost/nop_host.go @@ -0,0 +1,45 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package servicehost + +import ( + "go.opentelemetry.io/collector/component" +) + +// nopHost mocks a receiver.ReceiverHost for test purposes. +type nopHost struct{} + +func (n nopHost) ReportFatalError(err error) { +} + +func (n nopHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) { +} + +func (n nopHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { + return nil +} + +func (n nopHost) GetExtensions() map[component.ID]component.Extension { + return nil +} + +func (n nopHost) GetExporters() map[component.DataType]map[component.ID]component.Exporter { + return nil +} + +// NewNopHost returns a new instance of nopHost with proper defaults for most tests. +func NewNopHost() Host { + return &nopHost{} +}