From d73c8048a4e8838f5e9867e2a913256645958495 Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Tue, 15 Nov 2022 14:12:25 -0500 Subject: [PATCH] Introduce component status reporting ## Summary of changes - Add component/status package. This introduces the concepts of component status event and pipeline readiness status. - Introduce Host.RegisterStatusListener() to allow components to listen for status changes. - Deprecate Host.ReportFatalError() in favour of Host.ReportComponentStatus(). - Move component ID and Type to a separate component/id package. This is necessary to avoid dependency cycle with the new component/status package. Large part of the diff in this commit is because of this particular change. If this is too large and difficult to review we can split this commit into 2 parts where this particular change is a separate commit. - Deprecated component.ID and component.Type in favour of id.ID and id.Type. ## TODO after this is merged - healthcheck extension must register and listen to component statuses. - Replace all ReportFatalError() calls by ReportComponentStatus() calls in core and contrib. ## Open Questions - Do we want to name component/id package component/componentid instead? - The pipelines readiness status is also implemented in the component/status package. We can split the status, componentstatus and pipelinestatus into separate packages but not sure if it warrants creation of a new top-level package just to be able to stay pure in the component package. We can try this if think it is a better approach. - Listeners need to be able to tell if all current components are healthy. It is assumed that the listeners need to maintain a map of components and track the status of each component. This works only if we assume that the set of components cannot change during the lifetime of the listener. This assumption is true today but can change in the future if we introduce partial pipeline restarts where only modified/added/removed components are recreated (this will break listener's assumption and the map will become invalid). Should we instead keep track of this entire status map in the Host and broadcast the entire status to the listeners as a whole instead of (or in addition to) individual component events? --- component/component.go | 8 +- component/componenttest/nop_exporter.go | 3 +- component/componenttest/nop_exporter_test.go | 6 +- component/componenttest/nop_extension.go | 3 +- component/componenttest/nop_extension_test.go | 6 +- component/componenttest/nop_host.go | 15 ++- component/componenttest/nop_host_test.go | 3 + component/componenttest/nop_processor.go | 3 +- component/componenttest/nop_processor_test.go | 6 +- component/componenttest/nop_receiver.go | 3 +- component/componenttest/nop_receiver_test.go | 6 +- component/config.go | 6 +- component/experimental/component/factory.go | 3 +- component/exporter.go | 3 +- component/exporter_test.go | 5 +- component/extension.go | 4 +- component/extension_test.go | 3 +- component/factories.go | 26 ++-- component/factories_test.go | 18 +-- component/host.go | 26 +++- component/identifiable.go | 82 ++---------- component/identifiable_test.go | 86 ------------ component/processor.go | 3 +- component/processor_test.go | 5 +- component/receiver.go | 3 +- component/receiver_test.go | 5 +- service/config.go | 13 +- service/config_provider_test.go | 37 +++--- service/config_test.go | 79 +++++------ service/extensions/config.go | 6 +- service/extensions/extensions.go | 32 +++-- service/extensions/extensions_test.go | 41 +++--- service/host.go | 41 +++++- service/internal/components/host_wrapper.go | 21 ++- .../internal/components/host_wrapper_test.go | 7 +- service/internal/configunmarshaler/error.go | 6 +- .../internal/configunmarshaler/exporters.go | 13 +- .../configunmarshaler/exporters_test.go | 7 +- .../internal/configunmarshaler/extensions.go | 13 +- .../configunmarshaler/extensions_test.go | 7 +- .../internal/configunmarshaler/processors.go | 13 +- .../configunmarshaler/processors_test.go | 7 +- .../internal/configunmarshaler/receivers.go | 13 +- .../configunmarshaler/receivers_test.go | 7 +- service/internal/pipelines/pipelines.go | 109 +++++++++------- service/internal/pipelines/pipelines_test.go | 123 +++++++++--------- .../testcomponents/example_exporter.go | 3 +- .../testcomponents/example_factories.go | 7 +- .../testcomponents/example_processor.go | 3 +- .../testcomponents/example_receiver.go | 5 +- service/service.go | 24 +++- service/service_test.go | 100 +++++++++++++- service/servicetest/configprovider_test.go | 28 ++-- service/unmarshaler_test.go | 4 +- 54 files changed, 611 insertions(+), 498 deletions(-) delete mode 100644 component/identifiable_test.go diff --git a/component/component.go b/component/component.go index 277bf938fa8..3b6e6b7824a 100644 --- a/component/component.go +++ b/component/component.go @@ -17,6 +17,8 @@ package component // import "go.opentelemetry.io/collector/component" import ( "context" "errors" + + "go.opentelemetry.io/collector/component/id" ) var ( @@ -160,17 +162,17 @@ func (sl StabilityLevel) LogMessage() string { // use the factory helpers for the appropriate component type. type Factory interface { // Type gets the type of the component created by this factory. - Type() Type + Type() id.Type unexportedFactoryFunc() } type baseFactory struct { - cfgType Type + cfgType id.Type } func (baseFactory) unexportedFactoryFunc() {} -func (bf baseFactory) Type() Type { +func (bf baseFactory) Type() id.Type { return bf.cfgType } diff --git a/component/componenttest/nop_exporter.go b/component/componenttest/nop_exporter.go index 2617d7c29d1..edbbf931dfb 100644 --- a/component/componenttest/nop_exporter.go +++ b/component/componenttest/nop_exporter.go @@ -18,6 +18,7 @@ import ( "context" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer/consumertest" ) @@ -40,7 +41,7 @@ func NewNopExporterFactory() component.ExporterFactory { "nop", func() component.ExporterConfig { return &nopExporterConfig{ - ExporterSettings: config.NewExporterSettings(component.NewID("nop")), + ExporterSettings: config.NewExporterSettings(id.NewID("nop")), } }, component.WithTracesExporter(createTracesExporter, component.StabilityLevelStable), diff --git a/component/componenttest/nop_exporter_test.go b/component/componenttest/nop_exporter_test.go index 19694f3dd4e..f299ef8c639 100644 --- a/component/componenttest/nop_exporter_test.go +++ b/component/componenttest/nop_exporter_test.go @@ -21,7 +21,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -31,9 +31,9 @@ import ( func TestNewNopExporterFactory(t *testing.T) { factory := NewNopExporterFactory() require.NotNil(t, factory) - assert.Equal(t, component.Type("nop"), factory.Type()) + assert.Equal(t, id.Type("nop"), factory.Type()) cfg := factory.CreateDefaultConfig() - assert.Equal(t, &nopExporterConfig{ExporterSettings: config.NewExporterSettings(component.NewID("nop"))}, cfg) + assert.Equal(t, &nopExporterConfig{ExporterSettings: config.NewExporterSettings(id.NewID("nop"))}, cfg) traces, err := factory.CreateTracesExporter(context.Background(), NewNopExporterCreateSettings(), cfg) require.NoError(t, err) diff --git a/component/componenttest/nop_extension.go b/component/componenttest/nop_extension.go index 8960e530012..e7c0a3037ca 100644 --- a/component/componenttest/nop_extension.go +++ b/component/componenttest/nop_extension.go @@ -18,6 +18,7 @@ import ( "context" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" ) @@ -39,7 +40,7 @@ func NewNopExtensionFactory() component.ExtensionFactory { "nop", func() component.ExtensionConfig { return &nopExtensionConfig{ - ExtensionSettings: config.NewExtensionSettings(component.NewID("nop")), + ExtensionSettings: config.NewExtensionSettings(id.NewID("nop")), } }, func(context.Context, component.ExtensionCreateSettings, component.ExtensionConfig) (component.Extension, error) { diff --git a/component/componenttest/nop_extension_test.go b/component/componenttest/nop_extension_test.go index 9587cc302a5..aa7108a2c20 100644 --- a/component/componenttest/nop_extension_test.go +++ b/component/componenttest/nop_extension_test.go @@ -21,16 +21,16 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" ) func TestNewNopExtensionFactory(t *testing.T) { factory := NewNopExtensionFactory() require.NotNil(t, factory) - assert.Equal(t, component.Type("nop"), factory.Type()) + assert.Equal(t, id.Type("nop"), factory.Type()) cfg := factory.CreateDefaultConfig() - assert.Equal(t, &nopExtensionConfig{ExtensionSettings: config.NewExtensionSettings(component.NewID("nop"))}, cfg) + assert.Equal(t, &nopExtensionConfig{ExtensionSettings: config.NewExtensionSettings(id.NewID("nop"))}, cfg) traces, err := factory.CreateExtension(context.Background(), NewNopExtensionCreateSettings(), cfg) require.NoError(t, err) diff --git a/component/componenttest/nop_host.go b/component/componenttest/nop_host.go index b535b674af6..d26c5845646 100644 --- a/component/componenttest/nop_host.go +++ b/component/componenttest/nop_host.go @@ -16,6 +16,8 @@ package componenttest // import "go.opentelemetry.io/collector/component/compone import ( "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" + "go.opentelemetry.io/collector/component/status" ) // nopHost mocks a receiver.ReceiverHost for test purposes. @@ -28,14 +30,21 @@ func NewNopHost() component.Host { func (nh *nopHost) ReportFatalError(_ error) {} -func (nh *nopHost) GetFactory(_ component.Kind, _ component.Type) component.Factory { +func (nh *nopHost) GetFactory(_ component.Kind, _ id.Type) component.Factory { return nil } -func (nh *nopHost) GetExtensions() map[component.ID]component.Extension { +func (nh *nopHost) GetExtensions() map[id.ID]component.Extension { return nil } -func (nh *nopHost) GetExporters() map[component.DataType]map[component.ID]component.Exporter { +func (nh *nopHost) GetExporters() map[component.DataType]map[id.ID]component.Exporter { + return nil +} + +func (nh *nopHost) ReportComponentStatus(event *status.ComponentEvent) { +} + +func (nh *nopHost) RegisterStatusListener(options ...status.ListenerOption) component.StatusListenerUnregisterFunc { return nil } diff --git a/component/componenttest/nop_host_test.go b/component/componenttest/nop_host_test.go index a3057c3b97e..66af1aae126 100644 --- a/component/componenttest/nop_host_test.go +++ b/component/componenttest/nop_host_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/status" ) func TestNewNopHost(t *testing.T) { @@ -30,6 +31,8 @@ func TestNewNopHost(t *testing.T) { require.IsType(t, &nopHost{}, nh) nh.ReportFatalError(errors.New("TestError")) + nh.ReportComponentStatus(&status.ComponentEvent{}) + assert.Nil(t, nh.RegisterStatusListener()) assert.Nil(t, nh.GetExporters()) assert.Nil(t, nh.GetExtensions()) assert.Nil(t, nh.GetFactory(component.KindReceiver, "test")) diff --git a/component/componenttest/nop_processor.go b/component/componenttest/nop_processor.go index 6fa0a4052b9..4af95b89fe7 100644 --- a/component/componenttest/nop_processor.go +++ b/component/componenttest/nop_processor.go @@ -18,6 +18,7 @@ import ( "context" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" @@ -41,7 +42,7 @@ func NewNopProcessorFactory() component.ProcessorFactory { "nop", func() component.ProcessorConfig { return &nopProcessorConfig{ - ProcessorSettings: config.NewProcessorSettings(component.NewID("nop")), + ProcessorSettings: config.NewProcessorSettings(id.NewID("nop")), } }, component.WithTracesProcessor(createTracesProcessor, component.StabilityLevelStable), diff --git a/component/componenttest/nop_processor_test.go b/component/componenttest/nop_processor_test.go index 8601a89bbc0..b152790f981 100644 --- a/component/componenttest/nop_processor_test.go +++ b/component/componenttest/nop_processor_test.go @@ -21,7 +21,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" @@ -33,9 +33,9 @@ import ( func TestNewNopProcessorFactory(t *testing.T) { factory := NewNopProcessorFactory() require.NotNil(t, factory) - assert.Equal(t, component.Type("nop"), factory.Type()) + assert.Equal(t, id.Type("nop"), factory.Type()) cfg := factory.CreateDefaultConfig() - assert.Equal(t, &nopProcessorConfig{ProcessorSettings: config.NewProcessorSettings(component.NewID("nop"))}, cfg) + assert.Equal(t, &nopProcessorConfig{ProcessorSettings: config.NewProcessorSettings(id.NewID("nop"))}, cfg) traces, err := factory.CreateTracesProcessor(context.Background(), NewNopProcessorCreateSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) diff --git a/component/componenttest/nop_receiver.go b/component/componenttest/nop_receiver.go index 8adc4b4615d..b8faf2c8116 100644 --- a/component/componenttest/nop_receiver.go +++ b/component/componenttest/nop_receiver.go @@ -18,6 +18,7 @@ import ( "context" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" ) @@ -40,7 +41,7 @@ func NewNopReceiverFactory() component.ReceiverFactory { "nop", func() component.ReceiverConfig { return &nopReceiverConfig{ - ReceiverSettings: config.NewReceiverSettings(component.NewID("nop")), + ReceiverSettings: config.NewReceiverSettings(id.NewID("nop")), } }, component.WithTracesReceiver(createTracesReceiver, component.StabilityLevelStable), diff --git a/component/componenttest/nop_receiver_test.go b/component/componenttest/nop_receiver_test.go index 723f49f6314..fa4b08ab388 100644 --- a/component/componenttest/nop_receiver_test.go +++ b/component/componenttest/nop_receiver_test.go @@ -21,7 +21,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer/consumertest" ) @@ -29,9 +29,9 @@ import ( func TestNewNopReceiverFactory(t *testing.T) { factory := NewNopReceiverFactory() require.NotNil(t, factory) - assert.Equal(t, component.Type("nop"), factory.Type()) + assert.Equal(t, id.Type("nop"), factory.Type()) cfg := factory.CreateDefaultConfig() - assert.Equal(t, &nopReceiverConfig{ReceiverSettings: config.NewReceiverSettings(component.NewID("nop"))}, cfg) + assert.Equal(t, &nopReceiverConfig{ReceiverSettings: config.NewReceiverSettings(id.NewID("nop"))}, cfg) traces, err := factory.CreateTracesReceiver(context.Background(), NewNopReceiverCreateSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) diff --git a/component/config.go b/component/config.go index 3fb147b78e1..c685ff28ade 100644 --- a/component/config.go +++ b/component/config.go @@ -15,12 +15,10 @@ package component // import "go.opentelemetry.io/collector/component" import ( + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/confmap" ) -// Type is the component type as it is used in the config. -type Type string - // validatable defines the interface for the configuration validation. type validatable interface { // Validate validates the configuration and returns an error if invalid. @@ -29,7 +27,7 @@ type validatable interface { // DataType is a special Type that represents the data types supported by the collector. We currently support // collecting metrics, traces and logs, this can expand in the future. -type DataType = Type +type DataType = id.Type // Currently supported data types. Add new data types here when new types are supported in the future. const ( diff --git a/component/experimental/component/factory.go b/component/experimental/component/factory.go index cb15b114ca1..a833487d6e2 100644 --- a/component/experimental/component/factory.go +++ b/component/experimental/component/factory.go @@ -20,6 +20,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config/experimental/config" "go.opentelemetry.io/collector/config/experimental/configsource" ) @@ -56,4 +57,4 @@ type ConfigSourceFactory interface { } // ConfigSourceFactories maps the type of a ConfigSource to the respective factory object. -type ConfigSourceFactories map[component.Type]ConfigSourceFactory +type ConfigSourceFactories map[id.Type]ConfigSourceFactory diff --git a/component/exporter.go b/component/exporter.go index e824c022ffe..afccc5396bc 100644 --- a/component/exporter.go +++ b/component/exporter.go @@ -17,6 +17,7 @@ package component // import "go.opentelemetry.io/collector/component" import ( "context" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/consumer" ) @@ -213,7 +214,7 @@ func WithLogsExporter(createLogsExporter CreateLogsExporterFunc, sl StabilityLev } // NewExporterFactory returns a ExporterFactory. -func NewExporterFactory(cfgType Type, createDefaultConfig ExporterCreateDefaultConfigFunc, options ...ExporterFactoryOption) ExporterFactory { +func NewExporterFactory(cfgType id.Type, createDefaultConfig ExporterCreateDefaultConfigFunc, options ...ExporterFactoryOption) ExporterFactory { f := &exporterFactory{ baseFactory: baseFactory{cfgType: cfgType}, ExporterCreateDefaultConfigFunc: createDefaultConfig, diff --git a/component/exporter_test.go b/component/exporter_test.go index ef541cec19b..6fc4c5752a1 100644 --- a/component/exporter_test.go +++ b/component/exporter_test.go @@ -23,12 +23,13 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" ) func TestNewExporterFactory(t *testing.T) { const typeStr = "test" - defaultCfg := config.NewExporterSettings(component.NewID(typeStr)) + defaultCfg := config.NewExporterSettings(id.NewID(typeStr)) factory := component.NewExporterFactory( typeStr, func() component.ExporterConfig { return &defaultCfg }) @@ -44,7 +45,7 @@ func TestNewExporterFactory(t *testing.T) { func TestNewExporterFactory_WithOptions(t *testing.T) { const typeStr = "test" - defaultCfg := config.NewExporterSettings(component.NewID(typeStr)) + defaultCfg := config.NewExporterSettings(id.NewID(typeStr)) factory := component.NewExporterFactory( typeStr, func() component.ExporterConfig { return &defaultCfg }, diff --git a/component/extension.go b/component/extension.go index c6f2cdf0024..372750ce4ff 100644 --- a/component/extension.go +++ b/component/extension.go @@ -17,6 +17,7 @@ package component // import "go.opentelemetry.io/collector/component" import ( "context" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/confmap" ) @@ -47,6 +48,7 @@ type Extension interface { // Collector that is to be implemented by extensions interested in changes to pipeline // states. Typically this will be used by extensions that change their behavior if data is // being ingested or not, e.g.: a k8s readiness probe. +// Deprecated: [0.65.0] Use Host.RegisterStatusListener() instead. type PipelineWatcher interface { // Ready notifies the Extension that all pipelines were built and the // receivers were started, i.e.: the service is ready to receive data @@ -117,7 +119,7 @@ func (ef *extensionFactory) ExtensionStability() StabilityLevel { // NewExtensionFactory returns a new ExtensionFactory based on this configuration. func NewExtensionFactory( - cfgType Type, + cfgType id.Type, createDefaultConfig ExtensionCreateDefaultConfigFunc, createServiceExtension CreateExtensionFunc, sl StabilityLevel) ExtensionFactory { diff --git a/component/extension_test.go b/component/extension_test.go index 24c34386c44..710faf42644 100644 --- a/component/extension_test.go +++ b/component/extension_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" ) @@ -33,7 +34,7 @@ type nopExtension struct { func TestNewExtensionFactory(t *testing.T) { const typeStr = "test" - defaultCfg := config.NewExtensionSettings(component.NewID(typeStr)) + defaultCfg := config.NewExtensionSettings(id.NewID(typeStr)) nopExtensionInstance := new(nopExtension) factory := component.NewExtensionFactory( diff --git a/component/factories.go b/component/factories.go index e666afaf73c..3ca83e56486 100644 --- a/component/factories.go +++ b/component/factories.go @@ -16,29 +16,31 @@ package component // import "go.opentelemetry.io/collector/component" import ( "fmt" + + "go.opentelemetry.io/collector/component/id" ) // Factories struct holds in a single type all component factories that // can be handled by the Config. type Factories struct { // Receivers maps receiver type names in the config to the respective factory. - Receivers map[Type]ReceiverFactory + Receivers map[id.Type]ReceiverFactory // Processors maps processor type names in the config to the respective factory. - Processors map[Type]ProcessorFactory + Processors map[id.Type]ProcessorFactory // Exporters maps exporter type names in the config to the respective factory. - Exporters map[Type]ExporterFactory + Exporters map[id.Type]ExporterFactory // Extensions maps extension type names in the config to the respective factory. - Extensions map[Type]ExtensionFactory + Extensions map[id.Type]ExtensionFactory } // MakeReceiverFactoryMap takes a list of receiver factories and returns a map // with factory type as keys. It returns a non-nil error when more than one factories // have the same type. -func MakeReceiverFactoryMap(factories ...ReceiverFactory) (map[Type]ReceiverFactory, error) { - fMap := map[Type]ReceiverFactory{} +func MakeReceiverFactoryMap(factories ...ReceiverFactory) (map[id.Type]ReceiverFactory, error) { + fMap := map[id.Type]ReceiverFactory{} for _, f := range factories { if _, ok := fMap[f.Type()]; ok { return fMap, fmt.Errorf("duplicate receiver factory %q", f.Type()) @@ -51,8 +53,8 @@ func MakeReceiverFactoryMap(factories ...ReceiverFactory) (map[Type]ReceiverFact // MakeProcessorFactoryMap takes a list of processor factories and returns a map // with factory type as keys. It returns a non-nil error when more than one factories // have the same type. -func MakeProcessorFactoryMap(factories ...ProcessorFactory) (map[Type]ProcessorFactory, error) { - fMap := map[Type]ProcessorFactory{} +func MakeProcessorFactoryMap(factories ...ProcessorFactory) (map[id.Type]ProcessorFactory, error) { + fMap := map[id.Type]ProcessorFactory{} for _, f := range factories { if _, ok := fMap[f.Type()]; ok { return fMap, fmt.Errorf("duplicate processor factory %q", f.Type()) @@ -65,8 +67,8 @@ func MakeProcessorFactoryMap(factories ...ProcessorFactory) (map[Type]ProcessorF // MakeExporterFactoryMap takes a list of exporter factories and returns a map // with factory type as keys. It returns a non-nil error when more than one factories // have the same type. -func MakeExporterFactoryMap(factories ...ExporterFactory) (map[Type]ExporterFactory, error) { - fMap := map[Type]ExporterFactory{} +func MakeExporterFactoryMap(factories ...ExporterFactory) (map[id.Type]ExporterFactory, error) { + fMap := map[id.Type]ExporterFactory{} for _, f := range factories { if _, ok := fMap[f.Type()]; ok { return fMap, fmt.Errorf("duplicate exporter factory %q", f.Type()) @@ -79,8 +81,8 @@ func MakeExporterFactoryMap(factories ...ExporterFactory) (map[Type]ExporterFact // MakeExtensionFactoryMap takes a list of extension factories and returns a map // with factory type as keys. It returns a non-nil error when more than one factories // have the same type. -func MakeExtensionFactoryMap(factories ...ExtensionFactory) (map[Type]ExtensionFactory, error) { - fMap := map[Type]ExtensionFactory{} +func MakeExtensionFactoryMap(factories ...ExtensionFactory) (map[id.Type]ExtensionFactory, error) { + fMap := map[id.Type]ExtensionFactory{} for _, f := range factories { if _, ok := fMap[f.Type()]; ok { return fMap, fmt.Errorf("duplicate extension factory %q", f.Type()) diff --git a/component/factories_test.go b/component/factories_test.go index 7b1972cd34b..582f7bd4a4b 100644 --- a/component/factories_test.go +++ b/component/factories_test.go @@ -18,13 +18,15 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/component/id" ) func TestMakeExtensionFactoryMap(t *testing.T) { type testCase struct { name string in []ExtensionFactory - out map[Type]ExtensionFactory + out map[id.Type]ExtensionFactory } p1 := NewExtensionFactory("p1", nil, nil, StabilityLevelAlpha) @@ -33,7 +35,7 @@ func TestMakeExtensionFactoryMap(t *testing.T) { { name: "different names", in: []ExtensionFactory{p1, p2}, - out: map[Type]ExtensionFactory{ + out: map[id.Type]ExtensionFactory{ p1.Type(): p1, p2.Type(): p2, }, @@ -61,7 +63,7 @@ func TestMakeReceiverFactoryMap(t *testing.T) { type testCase struct { name string in []ReceiverFactory - out map[Type]ReceiverFactory + out map[id.Type]ReceiverFactory } p1 := NewReceiverFactory("p1", nil) @@ -70,7 +72,7 @@ func TestMakeReceiverFactoryMap(t *testing.T) { { name: "different names", in: []ReceiverFactory{p1, p2}, - out: map[Type]ReceiverFactory{ + out: map[id.Type]ReceiverFactory{ p1.Type(): p1, p2.Type(): p2, }, @@ -99,7 +101,7 @@ func TestMakeProcessorFactoryMap(t *testing.T) { type testCase struct { name string in []ProcessorFactory - out map[Type]ProcessorFactory + out map[id.Type]ProcessorFactory } p1 := NewProcessorFactory("p1", nil) @@ -108,7 +110,7 @@ func TestMakeProcessorFactoryMap(t *testing.T) { { name: "different names", in: []ProcessorFactory{p1, p2}, - out: map[Type]ProcessorFactory{ + out: map[id.Type]ProcessorFactory{ p1.Type(): p1, p2.Type(): p2, }, @@ -137,7 +139,7 @@ func TestMakeExporterFactoryMap(t *testing.T) { type testCase struct { name string in []ExporterFactory - out map[Type]ExporterFactory + out map[id.Type]ExporterFactory } p1 := NewExporterFactory("p1", nil) @@ -146,7 +148,7 @@ func TestMakeExporterFactoryMap(t *testing.T) { { name: "different names", in: []ExporterFactory{p1, p2}, - out: map[Type]ExporterFactory{ + out: map[id.Type]ExporterFactory{ p1.Type(): p1, p2.Type(): p2, }, diff --git a/component/host.go b/component/host.go index 92c395b0306..3a617dddd9d 100644 --- a/component/host.go +++ b/component/host.go @@ -13,6 +13,10 @@ // limitations under the License. package component // import "go.opentelemetry.io/collector/component" +import ( + "go.opentelemetry.io/collector/component/id" + "go.opentelemetry.io/collector/component/status" +) // Host represents the entity that is hosting a Component. It is used to allow communication // between the Component and its host (normally the service.Collector is the host). @@ -23,6 +27,7 @@ type Host interface { // // ReportFatalError should be called by the component anytime after Component.Start() ends and // before Component.Shutdown() begins. + // Deprecated: [0.65.0] Use ReportComponentStatus instead (with an event of type status.ComponentError) ReportFatalError(err error) // GetFactory of the specified kind. Returns the factory for a component type. @@ -36,7 +41,7 @@ type Host interface { // GetFactory can be called by the component anytime after Component.Start() begins and // until Component.Shutdown() ends. Note that the component is responsible for destroying // other components that it creates. - GetFactory(kind Kind, componentType Type) Factory + GetFactory(kind Kind, componentType id.Type) Factory // GetExtensions returns the map of extensions. Only enabled and created extensions will be returned. // Typically is used to find an extension by type or by full config name. Both cases @@ -45,7 +50,7 @@ type Host interface { // // GetExtensions can be called by the component anytime after Component.Start() begins and // until Component.Shutdown() ends. - GetExtensions() map[ID]Extension + GetExtensions() map[id.ID]Extension // GetExporters returns the map of exporters. Only enabled and created exporters will be returned. // Typically is used to find exporters by type or by full config name. Both cases @@ -58,5 +63,20 @@ type Host interface { // // GetExporters can be called by the component anytime after Component.Start() begins and // until Component.Shutdown() ends. - GetExporters() map[DataType]map[ID]Exporter + GetExporters() map[DataType]map[id.ID]Exporter + + // ReportComponentStatus can be used by a component to communicate its status to registered status + // listeners. Components can use this function to indicate that they are functioning properly, + // or are in an error state. + ReportComponentStatus(event *status.ComponentEvent) + + // RegisterStatusListener allows interested components to register status listeners to receive + // the following events: + // - Events reported by via the ReportComponentStatus function, + // - Notifications about pipeline status changes (ready, not ready). + RegisterStatusListener(options ...status.ListenerOption) StatusListenerUnregisterFunc } + +// StatusListenerUnregisterFunc is a function to be called to unregister a component that has previously +// registered using Host.RegisterStatusListener(). +type StatusListenerUnregisterFunc func() error diff --git a/component/identifiable.go b/component/identifiable.go index 33d9ff095dd..f96f00bb22e 100644 --- a/component/identifiable.go +++ b/component/identifiable.go @@ -15,89 +15,29 @@ package component // import "go.opentelemetry.io/collector/component" import ( - "errors" - "fmt" - "strings" + "go.opentelemetry.io/collector/component/id" ) -// typeAndNameSeparator is the separator that is used between type and name in type/name composite keys. -const typeAndNameSeparator = "/" - // identifiable is an interface that all components configurations MUST embed. type identifiable interface { // ID returns the ID of the component that this configuration belongs to. - ID() ID + ID() id.ID // SetIDName updates the name part of the ID for the component that this configuration belongs to. SetIDName(idName string) } -// ID represents the identity for a component. It combines two values: -// * type - the Type of the component. -// * name - the name of that component. -// The component ID (combination type + name) is unique for a given component.Kind. -type ID struct { - typeVal Type `mapstructure:"-"` - nameVal string `mapstructure:"-"` -} +// Deprecated: [0.65.0] Use id.ID instead. +type ID = id.ID + +// Deprecated: [0.65.0] Use id.Type instead. +type Type = id.Type -// NewID returns a new ID with the given Type and empty name. +// Deprecated: [0.65.0] Use id.NewID instead. func NewID(typeVal Type) ID { - return ID{typeVal: typeVal} + return id.NewID(typeVal) } -// NewIDWithName returns a new ID with the given Type and name. +// Deprecated: [0.65.0] Use id.NewIDWithName instead. func NewIDWithName(typeVal Type, nameVal string) ID { - return ID{typeVal: typeVal, nameVal: nameVal} -} - -// Type returns the type of the component. -func (id ID) Type() Type { - return id.typeVal -} - -// Name returns the custom name of the component. -func (id ID) Name() string { - return id.nameVal -} - -// MarshalText implements the encoding.TextMarshaler interface. -// This marshals the type and name as one string in the config. -func (id ID) MarshalText() (text []byte, err error) { - return []byte(id.String()), nil -} - -// UnmarshalText implements the encoding.TextUnmarshaler interface. -func (id *ID) UnmarshalText(text []byte) error { - idStr := string(text) - items := strings.SplitN(idStr, typeAndNameSeparator, 2) - if len(items) >= 1 { - id.typeVal = Type(strings.TrimSpace(items[0])) - } - - if len(items) == 1 && id.typeVal == "" { - return errors.New("id must not be empty") - } - - if id.typeVal == "" { - return fmt.Errorf("in %q id: the part before %s should not be empty", idStr, typeAndNameSeparator) - } - - if len(items) > 1 { - // "name" part is present. - id.nameVal = strings.TrimSpace(items[1]) - if id.nameVal == "" { - return fmt.Errorf("in %q id: the part after %s should not be empty", idStr, typeAndNameSeparator) - } - } - - return nil -} - -// String returns the ID string representation as "type[/name]" format. -func (id ID) String() string { - if id.nameVal == "" { - return string(id.typeVal) - } - - return string(id.typeVal) + typeAndNameSeparator + id.nameVal + return id.NewIDWithName(typeVal, nameVal) } diff --git a/component/identifiable_test.go b/component/identifiable_test.go deleted file mode 100644 index 15b32178360..00000000000 --- a/component/identifiable_test.go +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package component - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestMarshalText(t *testing.T) { - id := NewIDWithName("test", "name") - got, err := id.MarshalText() - assert.NoError(t, err) - assert.Equal(t, id.String(), string(got)) -} - -func TestUnmarshalText(t *testing.T) { - var testCases = []struct { - idStr string - expectedErr bool - expectedID ID - }{ - { - idStr: "valid_type", - expectedID: ID{typeVal: "valid_type", nameVal: ""}, - }, - { - idStr: "valid_type/valid_name", - expectedID: ID{typeVal: "valid_type", nameVal: "valid_name"}, - }, - { - idStr: " valid_type / valid_name ", - expectedID: ID{typeVal: "valid_type", nameVal: "valid_name"}, - }, - { - idStr: "/valid_name", - expectedErr: true, - }, - { - idStr: " /valid_name", - expectedErr: true, - }, - { - idStr: "valid_type/", - expectedErr: true, - }, - { - idStr: "valid_type/ ", - expectedErr: true, - }, - { - idStr: " ", - expectedErr: true, - }, - } - - for _, test := range testCases { - t.Run(test.idStr, func(t *testing.T) { - id := ID{} - err := id.UnmarshalText([]byte(test.idStr)) - if test.expectedErr { - assert.Error(t, err) - return - } - - assert.NoError(t, err) - assert.Equal(t, test.expectedID, id) - assert.Equal(t, test.expectedID.Type(), id.Type()) - assert.Equal(t, test.expectedID.Name(), id.Name()) - assert.Equal(t, test.expectedID.String(), id.String()) - }) - } -} diff --git a/component/processor.go b/component/processor.go index e3c78fa6f7d..acc718fcda8 100644 --- a/component/processor.go +++ b/component/processor.go @@ -17,6 +17,7 @@ package component // import "go.opentelemetry.io/collector/component" import ( "context" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/consumer" ) @@ -228,7 +229,7 @@ func WithLogsProcessor(createLogsProcessor CreateLogsProcessorFunc, sl Stability } // NewProcessorFactory returns a ProcessorFactory. -func NewProcessorFactory(cfgType Type, createDefaultConfig ProcessorCreateDefaultConfigFunc, options ...ProcessorFactoryOption) ProcessorFactory { +func NewProcessorFactory(cfgType id.Type, createDefaultConfig ProcessorCreateDefaultConfigFunc, options ...ProcessorFactoryOption) ProcessorFactory { f := &processorFactory{ baseFactory: baseFactory{cfgType: cfgType}, ProcessorCreateDefaultConfigFunc: createDefaultConfig, diff --git a/component/processor_test.go b/component/processor_test.go index 2fa697f1f1f..f680ca42c60 100644 --- a/component/processor_test.go +++ b/component/processor_test.go @@ -23,13 +23,14 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" ) func TestNewProcessorFactory(t *testing.T) { const typeStr = "test" - defaultCfg := config.NewProcessorSettings(component.NewID(typeStr)) + defaultCfg := config.NewProcessorSettings(id.NewID(typeStr)) factory := component.NewProcessorFactory( typeStr, func() component.ProcessorConfig { return &defaultCfg }) @@ -45,7 +46,7 @@ func TestNewProcessorFactory(t *testing.T) { func TestNewProcessorFactory_WithOptions(t *testing.T) { const typeStr = "test" - defaultCfg := config.NewProcessorSettings(component.NewID(typeStr)) + defaultCfg := config.NewProcessorSettings(id.NewID(typeStr)) factory := component.NewProcessorFactory( typeStr, func() component.ProcessorConfig { return &defaultCfg }, diff --git a/component/receiver.go b/component/receiver.go index dcf4bf7e108..05b2ab5405a 100644 --- a/component/receiver.go +++ b/component/receiver.go @@ -17,6 +17,7 @@ package component // import "go.opentelemetry.io/collector/component" import ( "context" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/consumer" ) @@ -273,7 +274,7 @@ func WithLogsReceiver(createLogsReceiver CreateLogsReceiverFunc, sl StabilityLev } // NewReceiverFactory returns a ReceiverFactory. -func NewReceiverFactory(cfgType Type, createDefaultConfig ReceiverCreateDefaultConfigFunc, options ...ReceiverFactoryOption) ReceiverFactory { +func NewReceiverFactory(cfgType id.Type, createDefaultConfig ReceiverCreateDefaultConfigFunc, options ...ReceiverFactoryOption) ReceiverFactory { f := &receiverFactory{ baseFactory: baseFactory{cfgType: cfgType}, ReceiverCreateDefaultConfigFunc: createDefaultConfig, diff --git a/component/receiver_test.go b/component/receiver_test.go index 01d03e7f7f4..a1e6672b29d 100644 --- a/component/receiver_test.go +++ b/component/receiver_test.go @@ -23,13 +23,14 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" ) func TestNewReceiverFactory(t *testing.T) { const typeStr = "test" - defaultCfg := config.NewReceiverSettings(component.NewID(typeStr)) + defaultCfg := config.NewReceiverSettings(id.NewID(typeStr)) factory := component.NewReceiverFactory( typeStr, func() component.ReceiverConfig { return &defaultCfg }) @@ -45,7 +46,7 @@ func TestNewReceiverFactory(t *testing.T) { func TestNewReceiverFactory_WithOptions(t *testing.T) { const typeStr = "test" - defaultCfg := config.NewReceiverSettings(component.NewID(typeStr)) + defaultCfg := config.NewReceiverSettings(id.NewID(typeStr)) factory := component.NewReceiverFactory( typeStr, func() component.ReceiverConfig { return &defaultCfg }, diff --git a/service/config.go b/service/config.go index 5900012770d..b8f97c3d67b 100644 --- a/service/config.go +++ b/service/config.go @@ -19,6 +19,7 @@ import ( "fmt" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/service/telemetry" ) @@ -32,16 +33,16 @@ var ( // Config defines the configuration for the various elements of collector or agent. type Config struct { // Receivers is a map of ComponentID to Receivers. - Receivers map[component.ID]component.ReceiverConfig + Receivers map[id.ID]component.ReceiverConfig // Exporters is a map of ComponentID to Exporters. - Exporters map[component.ID]component.ExporterConfig + Exporters map[id.ID]component.ExporterConfig // Processors is a map of ComponentID to Processors. - Processors map[component.ID]component.ProcessorConfig + Processors map[id.ID]component.ProcessorConfig // Extensions is a map of ComponentID to extensions. - Extensions map[component.ID]component.ExtensionConfig + Extensions map[id.ID]component.ExtensionConfig Service ConfigService } @@ -163,10 +164,10 @@ type ConfigService struct { Telemetry telemetry.Config `mapstructure:"telemetry"` // Extensions are the ordered list of extensions configured for the service. - Extensions []component.ID `mapstructure:"extensions"` + Extensions []id.ID `mapstructure:"extensions"` // Pipelines are the set of data pipelines configured for the service. - Pipelines map[component.ID]*ConfigServicePipeline `mapstructure:"pipelines"` + Pipelines map[id.ID]*ConfigServicePipeline `mapstructure:"pipelines"` } type ConfigServicePipeline = config.Pipeline diff --git a/service/config_provider_test.go b/service/config_provider_test.go index 2bbfba4568a..e2299157cb6 100644 --- a/service/config_provider_test.go +++ b/service/config_provider_test.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/provider/fileprovider" @@ -34,27 +35,27 @@ import ( ) var configNop = &Config{ - Receivers: map[component.ID]component.ReceiverConfig{component.NewID("nop"): componenttest.NewNopReceiverFactory().CreateDefaultConfig()}, - Processors: map[component.ID]component.ProcessorConfig{component.NewID("nop"): componenttest.NewNopProcessorFactory().CreateDefaultConfig()}, - Exporters: map[component.ID]component.ExporterConfig{component.NewID("nop"): componenttest.NewNopExporterFactory().CreateDefaultConfig()}, - Extensions: map[component.ID]component.ExtensionConfig{component.NewID("nop"): componenttest.NewNopExtensionFactory().CreateDefaultConfig()}, + Receivers: map[id.ID]component.ReceiverConfig{id.NewID("nop"): componenttest.NewNopReceiverFactory().CreateDefaultConfig()}, + Processors: map[id.ID]component.ProcessorConfig{id.NewID("nop"): componenttest.NewNopProcessorFactory().CreateDefaultConfig()}, + Exporters: map[id.ID]component.ExporterConfig{id.NewID("nop"): componenttest.NewNopExporterFactory().CreateDefaultConfig()}, + Extensions: map[id.ID]component.ExtensionConfig{id.NewID("nop"): componenttest.NewNopExtensionFactory().CreateDefaultConfig()}, Service: ConfigService{ - Extensions: []component.ID{component.NewID("nop")}, - Pipelines: map[component.ID]*ConfigServicePipeline{ - component.NewID("traces"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, + Extensions: []id.ID{id.NewID("nop")}, + Pipelines: map[id.ID]*ConfigServicePipeline{ + id.NewID("traces"): { + Receivers: []id.ID{id.NewID("nop")}, + Processors: []id.ID{id.NewID("nop")}, + Exporters: []id.ID{id.NewID("nop")}, }, - component.NewID("metrics"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, + id.NewID("metrics"): { + Receivers: []id.ID{id.NewID("nop")}, + Processors: []id.ID{id.NewID("nop")}, + Exporters: []id.ID{id.NewID("nop")}, }, - component.NewID("logs"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, + id.NewID("logs"): { + Receivers: []id.ID{id.NewID("nop")}, + Processors: []id.ID{id.NewID("nop")}, + Exporters: []id.ID{id.NewID("nop")}, }, }, Telemetry: telemetry.Config{ diff --git a/service/config_test.go b/service/config_test.go index 3718a71701c..312fad98696 100644 --- a/service/config_test.go +++ b/service/config_test.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/service/telemetry" @@ -113,7 +114,7 @@ func TestConfigValidate(t *testing.T) { name: "invalid-extension-reference", cfgFn: func() *Config { cfg := generateConfig() - cfg.Service.Extensions = append(cfg.Service.Extensions, component.NewIDWithName("nop", "2")) + cfg.Service.Extensions = append(cfg.Service.Extensions, id.NewIDWithName("nop", "2")) return cfg }, expected: errors.New(`service references extension "nop/2" which does not exist`), @@ -122,8 +123,8 @@ func TestConfigValidate(t *testing.T) { name: "invalid-receiver-reference", cfgFn: func() *Config { cfg := generateConfig() - pipe := cfg.Service.Pipelines[component.NewID("traces")] - pipe.Receivers = append(pipe.Receivers, component.NewIDWithName("nop", "2")) + pipe := cfg.Service.Pipelines[id.NewID("traces")] + pipe.Receivers = append(pipe.Receivers, id.NewIDWithName("nop", "2")) return cfg }, expected: errors.New(`pipeline "traces" references receiver "nop/2" which does not exist`), @@ -132,8 +133,8 @@ func TestConfigValidate(t *testing.T) { name: "invalid-processor-reference", cfgFn: func() *Config { cfg := generateConfig() - pipe := cfg.Service.Pipelines[component.NewID("traces")] - pipe.Processors = append(pipe.Processors, component.NewIDWithName("nop", "2")) + pipe := cfg.Service.Pipelines[id.NewID("traces")] + pipe.Processors = append(pipe.Processors, id.NewIDWithName("nop", "2")) return cfg }, expected: errors.New(`pipeline "traces" references processor "nop/2" which does not exist`), @@ -142,8 +143,8 @@ func TestConfigValidate(t *testing.T) { name: "invalid-exporter-reference", cfgFn: func() *Config { cfg := generateConfig() - pipe := cfg.Service.Pipelines[component.NewID("traces")] - pipe.Exporters = append(pipe.Exporters, component.NewIDWithName("nop", "2")) + pipe := cfg.Service.Pipelines[id.NewID("traces")] + pipe.Exporters = append(pipe.Exporters, id.NewIDWithName("nop", "2")) return cfg }, expected: errors.New(`pipeline "traces" references exporter "nop/2" which does not exist`), @@ -152,7 +153,7 @@ func TestConfigValidate(t *testing.T) { name: "missing-pipeline-receivers", cfgFn: func() *Config { cfg := generateConfig() - pipe := cfg.Service.Pipelines[component.NewID("traces")] + pipe := cfg.Service.Pipelines[id.NewID("traces")] pipe.Receivers = nil return cfg }, @@ -162,7 +163,7 @@ func TestConfigValidate(t *testing.T) { name: "missing-pipeline-exporters", cfgFn: func() *Config { cfg := generateConfig() - pipe := cfg.Service.Pipelines[component.NewID("traces")] + pipe := cfg.Service.Pipelines[id.NewID("traces")] pipe.Exporters = nil return cfg }, @@ -181,8 +182,8 @@ func TestConfigValidate(t *testing.T) { name: "invalid-receiver-config", cfgFn: func() *Config { cfg := generateConfig() - cfg.Receivers[component.NewID("nop")] = &nopRecvConfig{ - ReceiverSettings: config.NewReceiverSettings(component.NewID("nop")), + cfg.Receivers[id.NewID("nop")] = &nopRecvConfig{ + ReceiverSettings: config.NewReceiverSettings(id.NewID("nop")), validateErr: errInvalidRecvConfig, } return cfg @@ -193,8 +194,8 @@ func TestConfigValidate(t *testing.T) { name: "invalid-exporter-config", cfgFn: func() *Config { cfg := generateConfig() - cfg.Exporters[component.NewID("nop")] = &nopExpConfig{ - ExporterSettings: config.NewExporterSettings(component.NewID("nop")), + cfg.Exporters[id.NewID("nop")] = &nopExpConfig{ + ExporterSettings: config.NewExporterSettings(id.NewID("nop")), validateErr: errInvalidExpConfig, } return cfg @@ -205,8 +206,8 @@ func TestConfigValidate(t *testing.T) { name: "invalid-processor-config", cfgFn: func() *Config { cfg := generateConfig() - cfg.Processors[component.NewID("nop")] = &nopProcConfig{ - ProcessorSettings: config.NewProcessorSettings(component.NewID("nop")), + cfg.Processors[id.NewID("nop")] = &nopProcConfig{ + ProcessorSettings: config.NewProcessorSettings(id.NewID("nop")), validateErr: errInvalidProcConfig, } return cfg @@ -217,8 +218,8 @@ func TestConfigValidate(t *testing.T) { name: "invalid-extension-config", cfgFn: func() *Config { cfg := generateConfig() - cfg.Extensions[component.NewID("nop")] = &nopExtConfig{ - ExtensionSettings: config.NewExtensionSettings(component.NewID("nop")), + cfg.Extensions[id.NewID("nop")] = &nopExtConfig{ + ExtensionSettings: config.NewExtensionSettings(id.NewID("nop")), validateErr: errInvalidExtConfig, } return cfg @@ -229,10 +230,10 @@ func TestConfigValidate(t *testing.T) { name: "invalid-service-pipeline-type", cfgFn: func() *Config { cfg := generateConfig() - cfg.Service.Pipelines[component.NewID("wrongtype")] = &ConfigServicePipeline{ - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, + cfg.Service.Pipelines[id.NewID("wrongtype")] = &ConfigServicePipeline{ + Receivers: []id.ID{id.NewID("nop")}, + Processors: []id.ID{id.NewID("nop")}, + Exporters: []id.ID{id.NewID("nop")}, } return cfg }, @@ -260,24 +261,24 @@ func TestConfigValidate(t *testing.T) { func generateConfig() *Config { return &Config{ - Receivers: map[component.ID]component.ReceiverConfig{ - component.NewID("nop"): &nopRecvConfig{ - ReceiverSettings: config.NewReceiverSettings(component.NewID("nop")), + Receivers: map[id.ID]component.ReceiverConfig{ + id.NewID("nop"): &nopRecvConfig{ + ReceiverSettings: config.NewReceiverSettings(id.NewID("nop")), }, }, - Exporters: map[component.ID]component.ExporterConfig{ - component.NewID("nop"): &nopExpConfig{ - ExporterSettings: config.NewExporterSettings(component.NewID("nop")), + Exporters: map[id.ID]component.ExporterConfig{ + id.NewID("nop"): &nopExpConfig{ + ExporterSettings: config.NewExporterSettings(id.NewID("nop")), }, }, - Processors: map[component.ID]component.ProcessorConfig{ - component.NewID("nop"): &nopProcConfig{ - ProcessorSettings: config.NewProcessorSettings(component.NewID("nop")), + Processors: map[id.ID]component.ProcessorConfig{ + id.NewID("nop"): &nopProcConfig{ + ProcessorSettings: config.NewProcessorSettings(id.NewID("nop")), }, }, - Extensions: map[component.ID]component.ExtensionConfig{ - component.NewID("nop"): &nopExtConfig{ - ExtensionSettings: config.NewExtensionSettings(component.NewID("nop")), + Extensions: map[id.ID]component.ExtensionConfig{ + id.NewID("nop"): &nopExtConfig{ + ExtensionSettings: config.NewExtensionSettings(id.NewID("nop")), }, }, Service: ConfigService{ @@ -297,12 +298,12 @@ func generateConfig() *Config { Address: ":8080", }, }, - Extensions: []component.ID{component.NewID("nop")}, - Pipelines: map[component.ID]*ConfigServicePipeline{ - component.NewID("traces"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, + Extensions: []id.ID{id.NewID("nop")}, + Pipelines: map[id.ID]*ConfigServicePipeline{ + id.NewID("traces"): { + Receivers: []id.ID{id.NewID("nop")}, + Processors: []id.ID{id.NewID("nop")}, + Exporters: []id.ID{id.NewID("nop")}, }, }, }, diff --git a/service/extensions/config.go b/service/extensions/config.go index da7b851ee70..6985ab3c3b2 100644 --- a/service/extensions/config.go +++ b/service/extensions/config.go @@ -14,7 +14,9 @@ package extensions // import "go.opentelemetry.io/collector/service/extensions" -import "go.opentelemetry.io/collector/component" +import ( + "go.opentelemetry.io/collector/component/id" +) // Config represents the ordered list of extensions configured for the service. -type Config []component.ID +type Config []id.ID diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index 1e20f430290..f14a95d91e5 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/service/internal/components" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -33,7 +34,19 @@ const zExtensionName = "zextensionname" // Extensions is a map of extensions created from extension configs. type Extensions struct { telemetry component.TelemetrySettings - extMap map[component.ID]component.Extension + extMap map[id.ID]component.Extension +} + +type statusReportingExtension struct { + id id.ID +} + +func (s *statusReportingExtension) GetKind() component.Kind { + return component.KindExtension +} + +func (s *statusReportingExtension) ID() id.ID { + return s.id } // Start starts all extensions. @@ -42,7 +55,8 @@ func (bes *Extensions) Start(ctx context.Context, host component.Host) error { for extID, ext := range bes.extMap { extLogger := extensionLogger(bes.telemetry.Logger, extID) extLogger.Info("Extension is starting...") - if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil { + statusSource := &statusReportingExtension{extID} + if err := ext.Start(ctx, components.NewHostWrapper(host, statusSource, extLogger)); err != nil { return err } extLogger.Info("Extension started.") @@ -61,6 +75,7 @@ func (bes *Extensions) Shutdown(ctx context.Context) error { return errs } +// Deprecated: [0.65.0] Use Host.RegisterStatusListener() instead. func (bes *Extensions) NotifyPipelineReady() error { for extID, ext := range bes.extMap { if pw, ok := ext.(component.PipelineWatcher); ok { @@ -72,6 +87,7 @@ func (bes *Extensions) NotifyPipelineReady() error { return nil } +// Deprecated: [0.65.0] Use Host.RegisterStatusListener() instead. func (bes *Extensions) NotifyPipelineNotReady() error { // Notify extensions in reverse order. var errs error @@ -83,8 +99,8 @@ func (bes *Extensions) NotifyPipelineNotReady() error { return errs } -func (bes *Extensions) GetExtensions() map[component.ID]component.Extension { - result := make(map[component.ID]component.Extension, len(bes.extMap)) +func (bes *Extensions) GetExtensions() map[id.ID]component.Extension { + result := make(map[id.ID]component.Extension, len(bes.extMap)) for extID, v := range bes.extMap { result[extID] = v } @@ -123,17 +139,17 @@ type Settings struct { BuildInfo component.BuildInfo // Configs is a map of component.ID to component.ExtensionConfig. - Configs map[component.ID]component.ExtensionConfig + Configs map[id.ID]component.ExtensionConfig // Factories maps extension type names in the config to the respective component.ExtensionFactory. - Factories map[component.Type]component.ExtensionFactory + Factories map[id.Type]component.ExtensionFactory } // New creates a new Extensions from Config. func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) { exts := &Extensions{ telemetry: set.Telemetry, - extMap: make(map[component.ID]component.Extension), + extMap: make(map[id.ID]component.Extension), } for _, extID := range cfg { extCfg, existsCfg := set.Configs[extID] @@ -168,7 +184,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) { return exts, nil } -func extensionLogger(logger *zap.Logger, id component.ID) *zap.Logger { +func extensionLogger(logger *zap.Logger, id id.ID) *zap.Logger { return logger.With( zap.String(components.ZapKindKey, components.ZapKindExtension), zap.String(components.ZapNameKey, id.String())) diff --git a/service/extensions/extensions_test.go b/service/extensions/extensions_test.go index 05e7622951f..0bc7f0d1c40 100644 --- a/service/extensions/extensions_test.go +++ b/service/extensions/extensions_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" ) @@ -38,54 +39,54 @@ func TestBuildExtensions(t *testing.T) { tests := []struct { name string factories component.Factories - extensionsConfigs map[component.ID]component.ExtensionConfig - serviceExtensions []component.ID + extensionsConfigs map[id.ID]component.ExtensionConfig + serviceExtensions []id.ID wantErrMsg string }{ { name: "extension_not_configured", - serviceExtensions: []component.ID{ - component.NewID("myextension"), + serviceExtensions: []id.ID{ + id.NewID("myextension"), }, wantErrMsg: "extension \"myextension\" is not configured", }, { name: "missing_extension_factory", - extensionsConfigs: map[component.ID]component.ExtensionConfig{ - component.NewID("unknown"): nopExtensionConfig, + extensionsConfigs: map[id.ID]component.ExtensionConfig{ + id.NewID("unknown"): nopExtensionConfig, }, - serviceExtensions: []component.ID{ - component.NewID("unknown"), + serviceExtensions: []id.ID{ + id.NewID("unknown"), }, wantErrMsg: "extension factory for type \"unknown\" is not configured", }, { name: "error_on_create_extension", factories: component.Factories{ - Extensions: map[component.Type]component.ExtensionFactory{ + Extensions: map[id.Type]component.ExtensionFactory{ errExtensionFactory.Type(): errExtensionFactory, }, }, - extensionsConfigs: map[component.ID]component.ExtensionConfig{ - component.NewID(errExtensionFactory.Type()): errExtensionConfig, + extensionsConfigs: map[id.ID]component.ExtensionConfig{ + id.NewID(errExtensionFactory.Type()): errExtensionConfig, }, - serviceExtensions: []component.ID{ - component.NewID(errExtensionFactory.Type()), + serviceExtensions: []id.ID{ + id.NewID(errExtensionFactory.Type()), }, wantErrMsg: "failed to create extension \"err\": cannot create \"err\" extension type", }, { name: "bad_factory", factories: component.Factories{ - Extensions: map[component.Type]component.ExtensionFactory{ + Extensions: map[id.Type]component.ExtensionFactory{ badExtensionFactory.Type(): badExtensionFactory, }, }, - extensionsConfigs: map[component.ID]component.ExtensionConfig{ - component.NewID(badExtensionFactory.Type()): badExtensionCfg, + extensionsConfigs: map[id.ID]component.ExtensionConfig{ + id.NewID(badExtensionFactory.Type()): badExtensionCfg, }, - serviceExtensions: []component.ID{ - component.NewID(badExtensionFactory.Type()), + serviceExtensions: []id.ID{ + id.NewID(badExtensionFactory.Type()), }, wantErrMsg: "factory for \"bf\" produced a nil extension", }, @@ -112,7 +113,7 @@ func newBadExtensionFactory() component.ExtensionFactory { return &struct { config.ExtensionSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct }{ - ExtensionSettings: config.NewExtensionSettings(component.NewID("bf")), + ExtensionSettings: config.NewExtensionSettings(id.NewID("bf")), } }, func(ctx context.Context, set component.ExtensionCreateSettings, extension component.ExtensionConfig) (component.Extension, error) { @@ -129,7 +130,7 @@ func newCreateErrorExtensionFactory() component.ExtensionFactory { return &struct { config.ExtensionSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct }{ - ExtensionSettings: config.NewExtensionSettings(component.NewID("err")), + ExtensionSettings: config.NewExtensionSettings(id.NewID("err")), } }, func(ctx context.Context, set component.ExtensionCreateSettings, extension component.ExtensionConfig) (component.Extension, error) { diff --git a/service/host.go b/service/host.go index 31be010608d..a8ac54a65cb 100644 --- a/service/host.go +++ b/service/host.go @@ -15,30 +15,48 @@ package service // import "go.opentelemetry.io/collector/service" import ( + "go.uber.org/zap" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" + "go.opentelemetry.io/collector/component/status" "go.opentelemetry.io/collector/service/extensions" + "go.opentelemetry.io/collector/service/internal/components" "go.opentelemetry.io/collector/service/internal/pipelines" ) var _ component.Host = (*serviceHost)(nil) type serviceHost struct { - asyncErrorChannel chan error - factories component.Factories - buildInfo component.BuildInfo + asyncErrorChannel chan error + factories component.Factories + buildInfo component.BuildInfo + telemetry component.TelemetrySettings + statusNotifications *components.Notifications pipelines *pipelines.Pipelines extensions *extensions.Extensions } +func newServiceHost(set *settings, telemetrySettings component.TelemetrySettings) *serviceHost { + return &serviceHost{ + factories: set.Factories, + buildInfo: set.BuildInfo, + asyncErrorChannel: set.AsyncErrorChannel, + statusNotifications: components.NewNotifications(), + telemetry: telemetrySettings, + } +} + // ReportFatalError is used to report to the host that the receiver encountered // a fatal error (i.e.: an error that the instance can't recover from) after // its start function has already returned. +// Deprecated: [0.65.0] Use ReportComponentStatus instead (with an event of type status.ComponentError). func (host *serviceHost) ReportFatalError(err error) { host.asyncErrorChannel <- err } -func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { +func (host *serviceHost) GetFactory(kind component.Kind, componentType id.Type) component.Factory { switch kind { case component.KindReceiver: return host.factories.Receivers[componentType] @@ -52,10 +70,21 @@ func (host *serviceHost) GetFactory(kind component.Kind, componentType component return nil } -func (host *serviceHost) GetExtensions() map[component.ID]component.Extension { +func (host *serviceHost) GetExtensions() map[id.ID]component.Extension { return host.extensions.GetExtensions() } -func (host *serviceHost) GetExporters() map[component.DataType]map[component.ID]component.Exporter { +func (host *serviceHost) GetExporters() map[component.DataType]map[id.ID]component.Exporter { return host.pipelines.GetExporters() } + +func (host *serviceHost) RegisterStatusListener(options ...status.ListenerOption) component.StatusListenerUnregisterFunc { + return host.statusNotifications.RegisterListener(options...) +} + +// ReportComponentStatus is an implementation of Host.ReportComponentStatus. +func (host *serviceHost) ReportComponentStatus(event *status.ComponentEvent) { + if err := host.statusNotifications.SetComponentStatus(event); err != nil { + host.telemetry.Logger.Warn("Service failed to report status", zap.Error(err)) + } +} diff --git a/service/internal/components/host_wrapper.go b/service/internal/components/host_wrapper.go index 78428e4e2f0..5b22f0b505c 100644 --- a/service/internal/components/host_wrapper.go +++ b/service/internal/components/host_wrapper.go @@ -20,17 +20,21 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" + "go.opentelemetry.io/collector/component/status" ) // hostWrapper adds behavior on top of the component.Host being passed when starting the built components. type hostWrapper struct { component.Host + component status.Source *zap.Logger } -func NewHostWrapper(host component.Host, logger *zap.Logger) component.Host { +func NewHostWrapper(host component.Host, component status.Source, logger *zap.Logger) component.Host { return &hostWrapper{ host, + component, logger, } } @@ -41,6 +45,21 @@ func (hw *hostWrapper) ReportFatalError(err error) { hw.Host.ReportFatalError(err) } +var emptyComponentID = id.ID{} + +func (hw *hostWrapper) ReportComponentStatus(event *status.ComponentEvent) { + // sets default component id + if event.Source() == nil { + event, _ = status.NewComponentEvent( + event.Type(), + status.WithSource(hw.component), + status.WithTimestamp(event.Timestamp()), + status.WithError(event.Err()), + ) + } + hw.Host.ReportComponentStatus(event) +} + // RegisterZPages is used by zpages extension to register handles from service. // When the wrapper is passed to the extension it won't be successful when casting // the interface, for the time being expose the interface here. diff --git a/service/internal/components/host_wrapper_test.go b/service/internal/components/host_wrapper_test.go index 720669bd847..00ef213ee13 100644 --- a/service/internal/components/host_wrapper_test.go +++ b/service/internal/components/host_wrapper_test.go @@ -18,12 +18,17 @@ import ( "errors" "testing" + "github.com/stretchr/testify/assert" "go.uber.org/zap" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/component/status" ) func Test_newHostWrapper(t *testing.T) { - hw := NewHostWrapper(componenttest.NewNopHost(), zap.NewNop()) + hw := NewHostWrapper(componenttest.NewNopHost(), nil, zap.NewNop()) hw.ReportFatalError(errors.New("test error")) + ev, err := status.NewComponentEvent(status.ComponentOK) + assert.NoError(t, err) + hw.ReportComponentStatus(ev) } diff --git a/service/internal/configunmarshaler/error.go b/service/internal/configunmarshaler/error.go index 3518fbde12a..a2bfd4709f4 100644 --- a/service/internal/configunmarshaler/error.go +++ b/service/internal/configunmarshaler/error.go @@ -18,13 +18,13 @@ import ( "fmt" "reflect" - "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" ) -func errorUnknownType(component string, id component.ID, factories []reflect.Value) error { +func errorUnknownType(component string, id id.ID, factories []reflect.Value) error { return fmt.Errorf("unknown %s type: %q for id: %q (valid values: %v)", component, id.Type(), id, factories) } -func errorUnmarshalError(component string, id component.ID, err error) error { +func errorUnmarshalError(component string, id id.ID, err error) error { return fmt.Errorf("error reading %s configuration for %q: %w", component, id, err) } diff --git a/service/internal/configunmarshaler/exporters.go b/service/internal/configunmarshaler/exporters.go index e4064dfb028..3fb92185e50 100644 --- a/service/internal/configunmarshaler/exporters.go +++ b/service/internal/configunmarshaler/exporters.go @@ -18,6 +18,7 @@ import ( "reflect" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/confmap" ) @@ -25,23 +26,23 @@ import ( const exportersKeyName = "exporters" type Exporters struct { - exps map[component.ID]component.ExporterConfig + exps map[id.ID]component.ExporterConfig - factories map[component.Type]component.ExporterFactory + factories map[id.Type]component.ExporterFactory } -func NewExporters(factories map[component.Type]component.ExporterFactory) *Exporters { +func NewExporters(factories map[id.Type]component.ExporterFactory) *Exporters { return &Exporters{factories: factories} } func (e *Exporters) Unmarshal(conf *confmap.Conf) error { - rawExps := make(map[component.ID]map[string]interface{}) + rawExps := make(map[id.ID]map[string]interface{}) if err := conf.Unmarshal(&rawExps, confmap.WithErrorUnused()); err != nil { return err } // Prepare resulting map. - e.exps = make(map[component.ID]component.ExporterConfig) + e.exps = make(map[id.ID]component.ExporterConfig) // Iterate over Exporters and create a config for each. for id, value := range rawExps { @@ -67,6 +68,6 @@ func (e *Exporters) Unmarshal(conf *confmap.Conf) error { return nil } -func (e *Exporters) GetExporters() map[component.ID]component.ExporterConfig { +func (e *Exporters) GetExporters() map[id.ID]component.ExporterConfig { return e.exps } diff --git a/service/internal/configunmarshaler/exporters_test.go b/service/internal/configunmarshaler/exporters_test.go index 5f8ba6caad5..b6650e5d99e 100644 --- a/service/internal/configunmarshaler/exporters_test.go +++ b/service/internal/configunmarshaler/exporters_test.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/confmap" ) @@ -38,9 +39,9 @@ func TestExportersUnmarshal(t *testing.T) { cfgWithName := factories.Exporters["nop"].CreateDefaultConfig() cfgWithName.SetIDName("myexporter") - assert.Equal(t, map[component.ID]component.ExporterConfig{ - component.NewID("nop"): factories.Exporters["nop"].CreateDefaultConfig(), - component.NewIDWithName("nop", "myexporter"): cfgWithName, + assert.Equal(t, map[id.ID]component.ExporterConfig{ + id.NewID("nop"): factories.Exporters["nop"].CreateDefaultConfig(), + id.NewIDWithName("nop", "myexporter"): cfgWithName, }, exps.GetExporters()) } diff --git a/service/internal/configunmarshaler/extensions.go b/service/internal/configunmarshaler/extensions.go index cb3478a7725..4b404e1b0bc 100644 --- a/service/internal/configunmarshaler/extensions.go +++ b/service/internal/configunmarshaler/extensions.go @@ -18,6 +18,7 @@ import ( "reflect" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/confmap" ) @@ -25,23 +26,23 @@ import ( const extensionsKeyName = "extensions" type Extensions struct { - exts map[component.ID]component.ExtensionConfig + exts map[id.ID]component.ExtensionConfig - factories map[component.Type]component.ExtensionFactory + factories map[id.Type]component.ExtensionFactory } -func NewExtensions(factories map[component.Type]component.ExtensionFactory) *Extensions { +func NewExtensions(factories map[id.Type]component.ExtensionFactory) *Extensions { return &Extensions{factories: factories} } func (e *Extensions) Unmarshal(conf *confmap.Conf) error { - rawExts := make(map[component.ID]map[string]interface{}) + rawExts := make(map[id.ID]map[string]interface{}) if err := conf.Unmarshal(&rawExts, confmap.WithErrorUnused()); err != nil { return err } // Prepare resulting map. - e.exts = make(map[component.ID]component.ExtensionConfig) + e.exts = make(map[id.ID]component.ExtensionConfig) // Iterate over extensions and create a config for each. for id, value := range rawExts { @@ -67,6 +68,6 @@ func (e *Extensions) Unmarshal(conf *confmap.Conf) error { return nil } -func (e *Extensions) GetExtensions() map[component.ID]component.ExtensionConfig { +func (e *Extensions) GetExtensions() map[id.ID]component.ExtensionConfig { return e.exts } diff --git a/service/internal/configunmarshaler/extensions_test.go b/service/internal/configunmarshaler/extensions_test.go index 3f6a7e57eee..a80401661ec 100644 --- a/service/internal/configunmarshaler/extensions_test.go +++ b/service/internal/configunmarshaler/extensions_test.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/confmap" ) @@ -38,9 +39,9 @@ func TestExtensionsUnmarshal(t *testing.T) { cfgWithName := factories.Extensions["nop"].CreateDefaultConfig() cfgWithName.SetIDName("myextension") - assert.Equal(t, map[component.ID]component.ExtensionConfig{ - component.NewID("nop"): factories.Extensions["nop"].CreateDefaultConfig(), - component.NewIDWithName("nop", "myextension"): cfgWithName, + assert.Equal(t, map[id.ID]component.ExtensionConfig{ + id.NewID("nop"): factories.Extensions["nop"].CreateDefaultConfig(), + id.NewIDWithName("nop", "myextension"): cfgWithName, }, exts.GetExtensions()) } diff --git a/service/internal/configunmarshaler/processors.go b/service/internal/configunmarshaler/processors.go index 920876d7781..70158f741c3 100644 --- a/service/internal/configunmarshaler/processors.go +++ b/service/internal/configunmarshaler/processors.go @@ -18,6 +18,7 @@ import ( "reflect" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/confmap" ) @@ -25,23 +26,23 @@ import ( const processorsKeyName = "processors" type Processors struct { - procs map[component.ID]component.ProcessorConfig + procs map[id.ID]component.ProcessorConfig - factories map[component.Type]component.ProcessorFactory + factories map[id.Type]component.ProcessorFactory } -func NewProcessors(factories map[component.Type]component.ProcessorFactory) *Processors { +func NewProcessors(factories map[id.Type]component.ProcessorFactory) *Processors { return &Processors{factories: factories} } func (p *Processors) Unmarshal(conf *confmap.Conf) error { - rawProcs := make(map[component.ID]map[string]interface{}) + rawProcs := make(map[id.ID]map[string]interface{}) if err := conf.Unmarshal(&rawProcs, confmap.WithErrorUnused()); err != nil { return err } // Prepare resulting map. - p.procs = make(map[component.ID]component.ProcessorConfig) + p.procs = make(map[id.ID]component.ProcessorConfig) // Iterate over processors and create a config for each. for id, value := range rawProcs { // Find processor factory based on "type" that we read from config source. @@ -66,6 +67,6 @@ func (p *Processors) Unmarshal(conf *confmap.Conf) error { return nil } -func (p *Processors) GetProcessors() map[component.ID]component.ProcessorConfig { +func (p *Processors) GetProcessors() map[id.ID]component.ProcessorConfig { return p.procs } diff --git a/service/internal/configunmarshaler/processors_test.go b/service/internal/configunmarshaler/processors_test.go index 58583251053..3681ab1441d 100644 --- a/service/internal/configunmarshaler/processors_test.go +++ b/service/internal/configunmarshaler/processors_test.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/confmap" ) @@ -38,9 +39,9 @@ func TestProcessorsUnmarshal(t *testing.T) { cfgWithName := factories.Processors["nop"].CreateDefaultConfig() cfgWithName.SetIDName("myprocessor") - assert.Equal(t, map[component.ID]component.ProcessorConfig{ - component.NewID("nop"): factories.Processors["nop"].CreateDefaultConfig(), - component.NewIDWithName("nop", "myprocessor"): cfgWithName, + assert.Equal(t, map[id.ID]component.ProcessorConfig{ + id.NewID("nop"): factories.Processors["nop"].CreateDefaultConfig(), + id.NewIDWithName("nop", "myprocessor"): cfgWithName, }, procs.procs) } diff --git a/service/internal/configunmarshaler/receivers.go b/service/internal/configunmarshaler/receivers.go index a6affc72167..73b69a9ce94 100644 --- a/service/internal/configunmarshaler/receivers.go +++ b/service/internal/configunmarshaler/receivers.go @@ -18,6 +18,7 @@ import ( "reflect" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/confmap" ) @@ -25,23 +26,23 @@ import ( const receiversKeyName = "receivers" type Receivers struct { - recvs map[component.ID]component.ReceiverConfig + recvs map[id.ID]component.ReceiverConfig - factories map[component.Type]component.ReceiverFactory + factories map[id.Type]component.ReceiverFactory } -func NewReceivers(factories map[component.Type]component.ReceiverFactory) *Receivers { +func NewReceivers(factories map[id.Type]component.ReceiverFactory) *Receivers { return &Receivers{factories: factories} } func (r *Receivers) Unmarshal(conf *confmap.Conf) error { - rawRecvs := make(map[component.ID]map[string]interface{}) + rawRecvs := make(map[id.ID]map[string]interface{}) if err := conf.Unmarshal(&rawRecvs, confmap.WithErrorUnused()); err != nil { return err } // Prepare resulting map. - r.recvs = make(map[component.ID]component.ReceiverConfig) + r.recvs = make(map[id.ID]component.ReceiverConfig) // Iterate over input map and create a config for each. for id, value := range rawRecvs { @@ -67,6 +68,6 @@ func (r *Receivers) Unmarshal(conf *confmap.Conf) error { return nil } -func (r *Receivers) GetReceivers() map[component.ID]component.ReceiverConfig { +func (r *Receivers) GetReceivers() map[id.ID]component.ReceiverConfig { return r.recvs } diff --git a/service/internal/configunmarshaler/receivers_test.go b/service/internal/configunmarshaler/receivers_test.go index 41255118ba4..85299e7a063 100644 --- a/service/internal/configunmarshaler/receivers_test.go +++ b/service/internal/configunmarshaler/receivers_test.go @@ -22,6 +22,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/confmap" ) @@ -38,9 +39,9 @@ func TestReceiversUnmarshal(t *testing.T) { cfgWithName := factories.Receivers["nop"].CreateDefaultConfig() cfgWithName.SetIDName("myreceiver") - assert.Equal(t, map[component.ID]component.ReceiverConfig{ - component.NewID("nop"): factories.Receivers["nop"].CreateDefaultConfig(), - component.NewIDWithName("nop", "myreceiver"): cfgWithName, + assert.Equal(t, map[id.ID]component.ReceiverConfig{ + id.NewID("nop"): factories.Receivers["nop"].CreateDefaultConfig(), + id.NewIDWithName("nop", "myreceiver"): cfgWithName, }, recvs.GetReceivers()) } diff --git a/service/internal/pipelines/pipelines.go b/service/internal/pipelines/pipelines.go index 24907012a86..4b4f59ffa2c 100644 --- a/service/internal/pipelines/pipelines.go +++ b/service/internal/pipelines/pipelines.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/service/internal/components" @@ -43,7 +44,7 @@ type baseConsumer interface { } type builtComponent struct { - id component.ID + id id.ID comp component.Component } @@ -59,10 +60,23 @@ type builtPipeline struct { type Pipelines struct { telemetry component.TelemetrySettings - allReceivers map[component.DataType]map[component.ID]component.Receiver - allExporters map[component.DataType]map[component.ID]component.Exporter + allReceivers map[component.DataType]map[id.ID]component.Receiver + allExporters map[component.DataType]map[id.ID]component.Exporter - pipelines map[component.ID]*builtPipeline + pipelines map[id.ID]*builtPipeline +} + +type statusReportingComponent struct { + kind component.Kind + id id.ID +} + +func (s *statusReportingComponent) GetKind() component.Kind { + return s.kind +} + +func (s *statusReportingComponent) ID() id.ID { + return s.id } // StartAll starts all pipelines. @@ -76,7 +90,8 @@ func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error { for expID, exp := range expByID { expLogger := exporterLogger(bps.telemetry.Logger, expID, dt) expLogger.Info("Exporter is starting...") - if err := exp.Start(ctx, components.NewHostWrapper(host, expLogger)); err != nil { + statusSource := &statusReportingComponent{component.KindExporter, expID} + if err := exp.Start(ctx, components.NewHostWrapper(host, statusSource, expLogger)); err != nil { return err } expLogger.Info("Exporter started.") @@ -86,9 +101,12 @@ func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error { bps.telemetry.Logger.Info("Starting processors...") for pipelineID, bp := range bps.pipelines { for i := len(bp.processors) - 1; i >= 0; i-- { - procLogger := processorLogger(bps.telemetry.Logger, bp.processors[i].id, pipelineID) + processor := bp.processors[i] + procID := processor.id + procLogger := processorLogger(bps.telemetry.Logger, procID, pipelineID) procLogger.Info("Processor is starting...") - if err := bp.processors[i].comp.Start(ctx, components.NewHostWrapper(host, procLogger)); err != nil { + statusSource := &statusReportingComponent{component.KindProcessor, procID} + if err := processor.comp.Start(ctx, components.NewHostWrapper(host, statusSource, procLogger)); err != nil { return err } procLogger.Info("Processor started.") @@ -100,7 +118,8 @@ func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error { for recvID, recv := range recvByID { recvLogger := receiverLogger(bps.telemetry.Logger, recvID, dt) recvLogger.Info("Receiver is starting...") - if err := recv.Start(ctx, components.NewHostWrapper(host, recvLogger)); err != nil { + statusSource := &statusReportingComponent{component.KindReceiver, recvID} + if err := recv.Start(ctx, components.NewHostWrapper(host, statusSource, recvLogger)); err != nil { return err } recvLogger.Info("Receiver started.") @@ -139,12 +158,12 @@ func (bps *Pipelines) ShutdownAll(ctx context.Context) error { return errs } -func (bps *Pipelines) GetExporters() map[component.DataType]map[component.ID]component.Exporter { - exportersMap := make(map[component.DataType]map[component.ID]component.Exporter) +func (bps *Pipelines) GetExporters() map[component.DataType]map[id.ID]component.Exporter { + exportersMap := make(map[component.DataType]map[id.ID]component.Exporter) - exportersMap[component.DataTypeTraces] = make(map[component.ID]component.Exporter, len(bps.allExporters[component.DataTypeTraces])) - exportersMap[component.DataTypeMetrics] = make(map[component.ID]component.Exporter, len(bps.allExporters[component.DataTypeMetrics])) - exportersMap[component.DataTypeLogs] = make(map[component.ID]component.Exporter, len(bps.allExporters[component.DataTypeLogs])) + exportersMap[component.DataTypeTraces] = make(map[id.ID]component.Exporter, len(bps.allExporters[component.DataTypeTraces])) + exportersMap[component.DataTypeMetrics] = make(map[id.ID]component.Exporter, len(bps.allExporters[component.DataTypeMetrics])) + exportersMap[component.DataTypeLogs] = make(map[id.ID]component.Exporter, len(bps.allExporters[component.DataTypeLogs])) for dt, expByID := range bps.allExporters { for expID, exp := range expByID { @@ -183,37 +202,37 @@ type Settings struct { BuildInfo component.BuildInfo // ReceiverFactories maps receiver type names in the config to the respective component.ReceiverFactory. - ReceiverFactories map[component.Type]component.ReceiverFactory + ReceiverFactories map[id.Type]component.ReceiverFactory // ReceiverConfigs is a map of component.ID to component.ReceiverConfig. - ReceiverConfigs map[component.ID]component.ReceiverConfig + ReceiverConfigs map[id.ID]component.ReceiverConfig // ProcessorFactories maps processor type names in the config to the respective component.ProcessorFactory. - ProcessorFactories map[component.Type]component.ProcessorFactory + ProcessorFactories map[id.Type]component.ProcessorFactory // ProcessorConfigs is a map of component.ID to component.ProcessorConfig. - ProcessorConfigs map[component.ID]component.ProcessorConfig + ProcessorConfigs map[id.ID]component.ProcessorConfig // ExporterFactories maps exporter type names in the config to the respective component.ExporterFactory. - ExporterFactories map[component.Type]component.ExporterFactory + ExporterFactories map[id.Type]component.ExporterFactory // ExporterConfigs is a map of component.ID to component.ExporterConfig. - ExporterConfigs map[component.ID]component.ExporterConfig + ExporterConfigs map[id.ID]component.ExporterConfig // PipelineConfigs is a map of component.ID to config.Pipeline. - PipelineConfigs map[component.ID]*config.Pipeline + PipelineConfigs map[id.ID]*config.Pipeline } // Build builds all pipelines from config. func Build(ctx context.Context, set Settings) (*Pipelines, error) { exps := &Pipelines{ telemetry: set.Telemetry, - allReceivers: make(map[component.DataType]map[component.ID]component.Receiver), - allExporters: make(map[component.DataType]map[component.ID]component.Exporter), - pipelines: make(map[component.ID]*builtPipeline, len(set.PipelineConfigs)), + allReceivers: make(map[component.DataType]map[id.ID]component.Receiver), + allExporters: make(map[component.DataType]map[id.ID]component.Exporter), + pipelines: make(map[id.ID]*builtPipeline, len(set.PipelineConfigs)), } - receiversConsumers := make(map[component.DataType]map[component.ID][]baseConsumer) + receiversConsumers := make(map[component.DataType]map[id.ID][]baseConsumer) // Iterate over all pipelines, and create exporters, then processors. // Receivers cannot be created since we need to know all consumers, a.k.a. we need all pipelines build up to the @@ -221,7 +240,7 @@ func Build(ctx context.Context, set Settings) (*Pipelines, error) { for pipelineID, pipeline := range set.PipelineConfigs { // The data type of the pipeline defines what data type each exporter is expected to receive. if _, ok := exps.allExporters[pipelineID.Type()]; !ok { - exps.allExporters[pipelineID.Type()] = make(map[component.ID]component.Exporter) + exps.allExporters[pipelineID.Type()] = make(map[id.ID]component.Exporter) } expByID := exps.allExporters[pipelineID.Type()] @@ -293,7 +312,7 @@ func Build(ctx context.Context, set Settings) (*Pipelines, error) { // The data type of the pipeline defines what data type each exporter is expected to receive. if _, ok := receiversConsumers[pipelineID.Type()]; !ok { - receiversConsumers[pipelineID.Type()] = make(map[component.ID][]baseConsumer) + receiversConsumers[pipelineID.Type()] = make(map[id.ID][]baseConsumer) } recvConsByID := receiversConsumers[pipelineID.Type()] // Iterate over all Receivers for this pipeline and just append the lastConsumer as a consumer for the receiver. @@ -306,7 +325,7 @@ func Build(ctx context.Context, set Settings) (*Pipelines, error) { for pipelineID, pipeline := range set.PipelineConfigs { // The data type of the pipeline defines what data type each exporter is expected to receive. if _, ok := exps.allReceivers[pipelineID.Type()]; !ok { - exps.allReceivers[pipelineID.Type()] = make(map[component.ID]component.Receiver) + exps.allReceivers[pipelineID.Type()] = make(map[id.ID]component.Receiver) } recvByID := exps.allReceivers[pipelineID.Type()] bp := exps.pipelines[pipelineID] @@ -335,10 +354,10 @@ func buildExporter( ctx context.Context, settings component.TelemetrySettings, buildInfo component.BuildInfo, - cfgs map[component.ID]component.ExporterConfig, - factories map[component.Type]component.ExporterFactory, - id component.ID, - pipelineID component.ID, + cfgs map[id.ID]component.ExporterConfig, + factories map[id.Type]component.ExporterFactory, + id id.ID, + pipelineID id.ID, ) (component.Exporter, error) { cfg, existsCfg := cfgs[id] if !existsCfg { @@ -365,7 +384,7 @@ func buildExporter( return exp, nil } -func createExporter(ctx context.Context, set component.ExporterCreateSettings, cfg component.ExporterConfig, id component.ID, pipelineID component.ID, factory component.ExporterFactory) (component.Exporter, error) { +func createExporter(ctx context.Context, set component.ExporterCreateSettings, cfg component.ExporterConfig, id id.ID, pipelineID id.ID, factory component.ExporterFactory) (component.Exporter, error) { switch pipelineID.Type() { case component.DataTypeTraces: return factory.CreateTracesExporter(ctx, set, cfg) @@ -406,7 +425,7 @@ func buildFanOutExportersLogsConsumer(exporters []builtComponent) consumer.Logs return fanoutconsumer.NewLogs(consumers) } -func exporterLogger(logger *zap.Logger, id component.ID, dt component.DataType) *zap.Logger { +func exporterLogger(logger *zap.Logger, id id.ID, dt component.DataType) *zap.Logger { return logger.With( zap.String(components.ZapKindKey, components.ZapKindExporter), zap.String(components.ZapDataTypeKey, string(dt)), @@ -428,10 +447,10 @@ func getExporterStabilityLevel(factory component.ExporterFactory, dt component.D func buildProcessor(ctx context.Context, settings component.TelemetrySettings, buildInfo component.BuildInfo, - cfgs map[component.ID]component.ProcessorConfig, - factories map[component.Type]component.ProcessorFactory, - id component.ID, - pipelineID component.ID, + cfgs map[id.ID]component.ProcessorConfig, + factories map[id.Type]component.ProcessorFactory, + id id.ID, + pipelineID id.ID, next baseConsumer, ) (component.Processor, error) { procCfg, existsCfg := cfgs[id] @@ -458,7 +477,7 @@ func buildProcessor(ctx context.Context, return proc, nil } -func createProcessor(ctx context.Context, set component.ProcessorCreateSettings, cfg component.ProcessorConfig, id component.ID, pipelineID component.ID, next baseConsumer, factory component.ProcessorFactory) (component.Processor, error) { +func createProcessor(ctx context.Context, set component.ProcessorCreateSettings, cfg component.ProcessorConfig, id id.ID, pipelineID id.ID, next baseConsumer, factory component.ProcessorFactory) (component.Processor, error) { switch pipelineID.Type() { case component.DataTypeTraces: return factory.CreateTracesProcessor(ctx, set, cfg, next.(consumer.Traces)) @@ -472,7 +491,7 @@ func createProcessor(ctx context.Context, set component.ProcessorCreateSettings, return nil, fmt.Errorf("error creating processor %q in pipeline %q, data type %q is not supported", id, pipelineID, pipelineID.Type()) } -func processorLogger(logger *zap.Logger, procID component.ID, pipelineID component.ID) *zap.Logger { +func processorLogger(logger *zap.Logger, procID id.ID, pipelineID id.ID) *zap.Logger { return logger.With( zap.String(components.ZapKindKey, components.ZapKindProcessor), zap.String(components.ZapNameKey, procID.String()), @@ -494,10 +513,10 @@ func getProcessorStabilityLevel(factory component.ProcessorFactory, dt component func buildReceiver(ctx context.Context, settings component.TelemetrySettings, buildInfo component.BuildInfo, - cfgs map[component.ID]component.ReceiverConfig, - factories map[component.Type]component.ReceiverFactory, - id component.ID, - pipelineID component.ID, + cfgs map[id.ID]component.ReceiverConfig, + factories map[id.Type]component.ReceiverFactory, + id id.ID, + pipelineID id.ID, nexts []baseConsumer, ) (component.Receiver, error) { cfg, existsCfg := cfgs[id] @@ -525,7 +544,7 @@ func buildReceiver(ctx context.Context, return recv, nil } -func createReceiver(ctx context.Context, set component.ReceiverCreateSettings, cfg component.ReceiverConfig, id component.ID, pipelineID component.ID, nexts []baseConsumer, factory component.ReceiverFactory) (component.Receiver, error) { +func createReceiver(ctx context.Context, set component.ReceiverCreateSettings, cfg component.ReceiverConfig, id id.ID, pipelineID id.ID, nexts []baseConsumer, factory component.ReceiverFactory) (component.Receiver, error) { switch pipelineID.Type() { case component.DataTypeTraces: var consumers []consumer.Traces @@ -549,7 +568,7 @@ func createReceiver(ctx context.Context, set component.ReceiverCreateSettings, c return nil, fmt.Errorf("error creating receiver %q in pipeline %q, data type %q is not supported", id, pipelineID, pipelineID.Type()) } -func receiverLogger(logger *zap.Logger, id component.ID, dt component.DataType) *zap.Logger { +func receiverLogger(logger *zap.Logger, id id.ID, dt component.DataType) *zap.Logger { return logger.With( zap.String(components.ZapKindKey, components.ZapKindReceiver), zap.String(components.ZapNameKey, id.String()), diff --git a/service/internal/pipelines/pipelines_test.go b/service/internal/pipelines/pipelines_test.go index 2bc2ed63096..7e76d47ea92 100644 --- a/service/internal/pipelines/pipelines_test.go +++ b/service/internal/pipelines/pipelines_test.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/confmaptest" @@ -38,48 +39,48 @@ import ( func TestBuild(t *testing.T) { tests := []struct { name string - receiverIDs []component.ID - processorIDs []component.ID - exporterIDs []component.ID + receiverIDs []id.ID + processorIDs []id.ID + exporterIDs []id.ID expectedRequests int }{ { name: "pipelines_simple.yaml", - receiverIDs: []component.ID{component.NewID("examplereceiver")}, - processorIDs: []component.ID{component.NewID("exampleprocessor")}, - exporterIDs: []component.ID{component.NewID("exampleexporter")}, + receiverIDs: []id.ID{id.NewID("examplereceiver")}, + processorIDs: []id.ID{id.NewID("exampleprocessor")}, + exporterIDs: []id.ID{id.NewID("exampleexporter")}, expectedRequests: 1, }, { name: "pipelines_simple_multi_proc.yaml", - receiverIDs: []component.ID{component.NewID("examplereceiver")}, - processorIDs: []component.ID{component.NewID("exampleprocessor"), component.NewID("exampleprocessor")}, - exporterIDs: []component.ID{component.NewID("exampleexporter")}, + receiverIDs: []id.ID{id.NewID("examplereceiver")}, + processorIDs: []id.ID{id.NewID("exampleprocessor"), id.NewID("exampleprocessor")}, + exporterIDs: []id.ID{id.NewID("exampleexporter")}, expectedRequests: 1, }, { name: "pipelines_simple_no_proc.yaml", - receiverIDs: []component.ID{component.NewID("examplereceiver")}, - exporterIDs: []component.ID{component.NewID("exampleexporter")}, + receiverIDs: []id.ID{id.NewID("examplereceiver")}, + exporterIDs: []id.ID{id.NewID("exampleexporter")}, expectedRequests: 1, }, { name: "pipelines_multi.yaml", - receiverIDs: []component.ID{component.NewID("examplereceiver"), component.NewIDWithName("examplereceiver", "1")}, - processorIDs: []component.ID{component.NewID("exampleprocessor"), component.NewIDWithName("exampleprocessor", "1")}, - exporterIDs: []component.ID{component.NewID("exampleexporter"), component.NewIDWithName("exampleexporter", "1")}, + receiverIDs: []id.ID{id.NewID("examplereceiver"), id.NewIDWithName("examplereceiver", "1")}, + processorIDs: []id.ID{id.NewID("exampleprocessor"), id.NewIDWithName("exampleprocessor", "1")}, + exporterIDs: []id.ID{id.NewID("exampleexporter"), id.NewIDWithName("exampleexporter", "1")}, expectedRequests: 2, }, { name: "pipelines_multi_no_proc.yaml", - receiverIDs: []component.ID{component.NewID("examplereceiver"), component.NewIDWithName("examplereceiver", "1")}, - exporterIDs: []component.ID{component.NewID("exampleexporter"), component.NewIDWithName("exampleexporter", "1")}, + receiverIDs: []id.ID{id.NewID("examplereceiver"), id.NewIDWithName("examplereceiver", "1")}, + exporterIDs: []id.ID{id.NewID("exampleexporter"), id.NewIDWithName("exampleexporter", "1")}, expectedRequests: 2, }, { name: "pipelines_exporter_multi_pipeline.yaml", - receiverIDs: []component.ID{component.NewID("examplereceiver")}, - exporterIDs: []component.ID{component.NewID("exampleexporter")}, + receiverIDs: []id.ID{id.NewID("examplereceiver")}, + exporterIDs: []id.ID{id.NewID("exampleexporter")}, expectedRequests: 2, }, } @@ -116,17 +117,17 @@ func TestBuild(t *testing.T) { // Verify processors created in the given order and started. for i, procID := range test.processorIDs { - traceProcessor := pipelines.pipelines[component.NewID(component.DataTypeTraces)].processors[i] + traceProcessor := pipelines.pipelines[id.NewID(component.DataTypeTraces)].processors[i] assert.Equal(t, procID, traceProcessor.id) assert.True(t, traceProcessor.comp.(*testcomponents.ExampleProcessor).Started) // Validate metrics. - metricsProcessor := pipelines.pipelines[component.NewID(component.DataTypeMetrics)].processors[i] + metricsProcessor := pipelines.pipelines[id.NewID(component.DataTypeMetrics)].processors[i] assert.Equal(t, procID, metricsProcessor.id) assert.True(t, metricsProcessor.comp.(*testcomponents.ExampleProcessor).Started) // Validate logs. - logsProcessor := pipelines.pipelines[component.NewID(component.DataTypeLogs)].processors[i] + logsProcessor := pipelines.pipelines[id.NewID(component.DataTypeLogs)].processors[i] assert.Equal(t, procID, logsProcessor.id) assert.True(t, logsProcessor.comp.(*testcomponents.ExampleProcessor).Started) } @@ -165,15 +166,15 @@ func TestBuild(t *testing.T) { // Verify processors shutdown. for i := range test.processorIDs { - traceProcessor := pipelines.pipelines[component.NewID(component.DataTypeTraces)].processors[i] + traceProcessor := pipelines.pipelines[id.NewID(component.DataTypeTraces)].processors[i] assert.True(t, traceProcessor.comp.(*testcomponents.ExampleProcessor).Stopped) // Validate metrics. - metricsProcessor := pipelines.pipelines[component.NewID(component.DataTypeMetrics)].processors[i] + metricsProcessor := pipelines.pipelines[id.NewID(component.DataTypeMetrics)].processors[i] assert.True(t, metricsProcessor.comp.(*testcomponents.ExampleProcessor).Stopped) // Validate logs. - logsProcessor := pipelines.pipelines[component.NewID(component.DataTypeLogs)].processors[i] + logsProcessor := pipelines.pipelines[id.NewID(component.DataTypeLogs)].processors[i] assert.True(t, logsProcessor.comp.(*testcomponents.ExampleProcessor).Stopped) } @@ -232,17 +233,17 @@ func TestBuildErrors(t *testing.T) { for _, test := range tests { t.Run(test.configFile, func(t *testing.T) { factories := component.Factories{ - Receivers: map[component.Type]component.ReceiverFactory{ + Receivers: map[id.Type]component.ReceiverFactory{ nopReceiverFactory.Type(): nopReceiverFactory, "unknown": nopReceiverFactory, badReceiverFactory.Type(): badReceiverFactory, }, - Processors: map[component.Type]component.ProcessorFactory{ + Processors: map[id.Type]component.ProcessorFactory{ nopProcessorFactory.Type(): nopProcessorFactory, "unknown": nopProcessorFactory, badProcessorFactory.Type(): badProcessorFactory, }, - Exporters: map[component.Type]component.ExporterFactory{ + Exporters: map[id.Type]component.ExporterFactory{ nopExporterFactory.Type(): nopExporterFactory, "unknown": nopExporterFactory, badExporterFactory.Type(): badExporterFactory, @@ -274,39 +275,39 @@ func TestFailToStartAndShutdown(t *testing.T) { set := Settings{ Telemetry: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), - ReceiverFactories: map[component.Type]component.ReceiverFactory{ + ReceiverFactories: map[id.Type]component.ReceiverFactory{ nopReceiverFactory.Type(): nopReceiverFactory, errReceiverFactory.Type(): errReceiverFactory, }, - ReceiverConfigs: map[component.ID]component.ReceiverConfig{ - component.NewID(nopReceiverFactory.Type()): nopReceiverFactory.CreateDefaultConfig(), - component.NewID(errReceiverFactory.Type()): errReceiverFactory.CreateDefaultConfig(), + ReceiverConfigs: map[id.ID]component.ReceiverConfig{ + id.NewID(nopReceiverFactory.Type()): nopReceiverFactory.CreateDefaultConfig(), + id.NewID(errReceiverFactory.Type()): errReceiverFactory.CreateDefaultConfig(), }, - ProcessorFactories: map[component.Type]component.ProcessorFactory{ + ProcessorFactories: map[id.Type]component.ProcessorFactory{ nopProcessorFactory.Type(): nopProcessorFactory, errProcessorFactory.Type(): errProcessorFactory, }, - ProcessorConfigs: map[component.ID]component.ProcessorConfig{ - component.NewID(nopProcessorFactory.Type()): nopProcessorFactory.CreateDefaultConfig(), - component.NewID(errProcessorFactory.Type()): errProcessorFactory.CreateDefaultConfig(), + ProcessorConfigs: map[id.ID]component.ProcessorConfig{ + id.NewID(nopProcessorFactory.Type()): nopProcessorFactory.CreateDefaultConfig(), + id.NewID(errProcessorFactory.Type()): errProcessorFactory.CreateDefaultConfig(), }, - ExporterFactories: map[component.Type]component.ExporterFactory{ + ExporterFactories: map[id.Type]component.ExporterFactory{ nopExporterFactory.Type(): nopExporterFactory, errExporterFactory.Type(): errExporterFactory, }, - ExporterConfigs: map[component.ID]component.ExporterConfig{ - component.NewID(nopExporterFactory.Type()): nopExporterFactory.CreateDefaultConfig(), - component.NewID(errExporterFactory.Type()): errExporterFactory.CreateDefaultConfig(), + ExporterConfigs: map[id.ID]component.ExporterConfig{ + id.NewID(nopExporterFactory.Type()): nopExporterFactory.CreateDefaultConfig(), + id.NewID(errExporterFactory.Type()): errExporterFactory.CreateDefaultConfig(), }, } for _, dt := range []component.DataType{component.DataTypeTraces, component.DataTypeMetrics, component.DataTypeLogs} { t.Run(string(dt)+"/receiver", func(t *testing.T) { - set.PipelineConfigs = map[component.ID]*config.Pipeline{ - component.NewID(dt): { - Receivers: []component.ID{component.NewID("nop"), component.NewID("err")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, + set.PipelineConfigs = map[id.ID]*config.Pipeline{ + id.NewID(dt): { + Receivers: []id.ID{id.NewID("nop"), id.NewID("err")}, + Processors: []id.ID{id.NewID("nop")}, + Exporters: []id.ID{id.NewID("nop")}, }, } pipelines, err := Build(context.Background(), set) @@ -316,11 +317,11 @@ func TestFailToStartAndShutdown(t *testing.T) { }) t.Run(string(dt)+"/processor", func(t *testing.T) { - set.PipelineConfigs = map[component.ID]*config.Pipeline{ - component.NewID(dt): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop"), component.NewID("err")}, - Exporters: []component.ID{component.NewID("nop")}, + set.PipelineConfigs = map[id.ID]*config.Pipeline{ + id.NewID(dt): { + Receivers: []id.ID{id.NewID("nop")}, + Processors: []id.ID{id.NewID("nop"), id.NewID("err")}, + Exporters: []id.ID{id.NewID("nop")}, }, } pipelines, err := Build(context.Background(), set) @@ -330,11 +331,11 @@ func TestFailToStartAndShutdown(t *testing.T) { }) t.Run(string(dt)+"/exporter", func(t *testing.T) { - set.PipelineConfigs = map[component.ID]*config.Pipeline{ - component.NewID(dt): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop"), component.NewID("err")}, + set.PipelineConfigs = map[id.ID]*config.Pipeline{ + id.NewID(dt): { + Receivers: []id.ID{id.NewID("nop")}, + Processors: []id.ID{id.NewID("nop")}, + Exporters: []id.ID{id.NewID("nop"), id.NewID("err")}, }, } pipelines, err := Build(context.Background(), set) @@ -350,7 +351,7 @@ func newBadReceiverFactory() component.ReceiverFactory { return &struct { config.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct }{ - ReceiverSettings: config.NewReceiverSettings(component.NewID("bf")), + ReceiverSettings: config.NewReceiverSettings(id.NewID("bf")), } }) } @@ -360,7 +361,7 @@ func newBadProcessorFactory() component.ProcessorFactory { return &struct { config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct }{ - ProcessorSettings: config.NewProcessorSettings(component.NewID("bf")), + ProcessorSettings: config.NewProcessorSettings(id.NewID("bf")), } }) } @@ -370,7 +371,7 @@ func newBadExporterFactory() component.ExporterFactory { return &struct { config.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct }{ - ExporterSettings: config.NewExporterSettings(component.NewID("bf")), + ExporterSettings: config.NewExporterSettings(id.NewID("bf")), } }) } @@ -380,7 +381,7 @@ func newErrReceiverFactory() component.ReceiverFactory { return &struct { config.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct }{ - ReceiverSettings: config.NewReceiverSettings(component.NewID("bf")), + ReceiverSettings: config.NewReceiverSettings(id.NewID("bf")), } }, component.WithTracesReceiver(func(context.Context, component.ReceiverCreateSettings, component.ReceiverConfig, consumer.Traces) (component.TracesReceiver, error) { @@ -400,7 +401,7 @@ func newErrProcessorFactory() component.ProcessorFactory { return &struct { config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct }{ - ProcessorSettings: config.NewProcessorSettings(component.NewID("bf")), + ProcessorSettings: config.NewProcessorSettings(id.NewID("bf")), } }, component.WithTracesProcessor(func(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Traces) (component.TracesProcessor, error) { @@ -420,7 +421,7 @@ func newErrExporterFactory() component.ExporterFactory { return &struct { config.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct }{ - ExporterSettings: config.NewExporterSettings(component.NewID("bf")), + ExporterSettings: config.NewExporterSettings(id.NewID("bf")), } }, component.WithTracesExporter(func(context.Context, component.ExporterCreateSettings, component.ExporterConfig) (component.TracesExporter, error) { @@ -474,7 +475,7 @@ type configSettings struct { } type serviceSettings struct { - Pipelines map[component.ID]*config.Pipeline `mapstructure:"pipelines"` + Pipelines map[id.ID]*config.Pipeline `mapstructure:"pipelines"` } func loadConfig(t *testing.T, fileName string, factories component.Factories) *configSettings { diff --git a/service/internal/testcomponents/example_exporter.go b/service/internal/testcomponents/example_exporter.go index 722dddc9b10..fe9809f9aa0 100644 --- a/service/internal/testcomponents/example_exporter.go +++ b/service/internal/testcomponents/example_exporter.go @@ -18,6 +18,7 @@ import ( "context" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" @@ -46,7 +47,7 @@ var ExampleExporterFactory = component.NewExporterFactory( func createExporterDefaultConfig() component.ExporterConfig { return &ExampleExporterConfig{ - ExporterSettings: config.NewExporterSettings(component.NewID(typeStr)), + ExporterSettings: config.NewExporterSettings(id.NewID(typeStr)), } } diff --git a/service/internal/testcomponents/example_factories.go b/service/internal/testcomponents/example_factories.go index ac5f21f433f..f77acb0145f 100644 --- a/service/internal/testcomponents/example_factories.go +++ b/service/internal/testcomponents/example_factories.go @@ -16,18 +16,19 @@ package testcomponents // import "go.opentelemetry.io/collector/service/internal import ( "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" ) // ExampleComponents registers example factories. This is only used by tests. func ExampleComponents() (component.Factories, error) { return component.Factories{ - Receivers: map[component.Type]component.ReceiverFactory{ + Receivers: map[id.Type]component.ReceiverFactory{ ExampleReceiverFactory.Type(): ExampleReceiverFactory, }, - Processors: map[component.Type]component.ProcessorFactory{ + Processors: map[id.Type]component.ProcessorFactory{ ExampleProcessorFactory.Type(): ExampleProcessorFactory, }, - Exporters: map[component.Type]component.ExporterFactory{ + Exporters: map[id.Type]component.ExporterFactory{ ExampleExporterFactory.Type(): ExampleExporterFactory, }, }, nil diff --git a/service/internal/testcomponents/example_processor.go b/service/internal/testcomponents/example_processor.go index 8ea683c70c0..b8d3c32bddf 100644 --- a/service/internal/testcomponents/example_processor.go +++ b/service/internal/testcomponents/example_processor.go @@ -18,6 +18,7 @@ import ( "context" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" ) @@ -40,7 +41,7 @@ var ExampleProcessorFactory = component.NewProcessorFactory( // CreateDefaultConfig creates the default configuration for the Processor. func createDefaultConfig() component.ProcessorConfig { return &ExampleProcessorConfig{ - ProcessorSettings: config.NewProcessorSettings(component.NewID(procType)), + ProcessorSettings: config.NewProcessorSettings(id.NewID(procType)), } } diff --git a/service/internal/testcomponents/example_receiver.go b/service/internal/testcomponents/example_receiver.go index 4351f7b360d..6f6998f8c78 100644 --- a/service/internal/testcomponents/example_receiver.go +++ b/service/internal/testcomponents/example_receiver.go @@ -18,11 +18,12 @@ import ( "context" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" ) -const receiverType = component.Type("examplereceiver") +const receiverType = id.Type("examplereceiver") // ExampleReceiverConfig config for ExampleReceiver. type ExampleReceiverConfig struct { @@ -39,7 +40,7 @@ var ExampleReceiverFactory = component.NewReceiverFactory( func createReceiverDefaultConfig() component.ReceiverConfig { return &ExampleReceiverConfig{ - ReceiverSettings: config.NewReceiverSettings(component.NewID(receiverType)), + ReceiverSettings: config.NewReceiverSettings(id.NewID(receiverType)), } } diff --git a/service/service.go b/service/service.go index 261ec7377e8..b1ca8e88c5f 100644 --- a/service/service.go +++ b/service/service.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/status" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/internal/pipelines" @@ -43,13 +44,8 @@ type service struct { func newService(set *settings) (*service, error) { srv := &service{ - buildInfo: set.BuildInfo, - config: set.Config, - host: &serviceHost{ - factories: set.Factories, - buildInfo: set.BuildInfo, - asyncErrorChannel: set.AsyncErrorChannel, - }, + buildInfo: set.BuildInfo, + config: set.Config, telemetryInitializer: set.telemetry, } @@ -71,6 +67,8 @@ func newService(set *settings) (*service, error) { } srv.telemetrySettings.MeterProvider = srv.telemetryInitializer.mp + srv.host = newServiceHost(set, srv.telemetrySettings) + // process the configuration and initialize the pipeline if err = srv.initExtensionsAndPipeline(set); err != nil { // If pipeline initialization fails then shut down the telemetry server @@ -99,6 +97,11 @@ func (srv *service) Start(ctx context.Context) error { return fmt.Errorf("cannot start pipelines: %w", err) } + if err := srv.host.statusNotifications.SetPipelineStatus(status.PipelineReady); err != nil { + return fmt.Errorf("SetPipelineStatus failed: %w", err) + } + + // TODO: remove usage of NotifyPipelineReady() and replace by the new status notifications. if err := srv.host.extensions.NotifyPipelineReady(); err != nil { return err } @@ -114,6 +117,11 @@ func (srv *service) Shutdown(ctx context.Context) error { // Begin shutdown sequence. srv.telemetrySettings.Logger.Info("Starting shutdown...") + if err := srv.host.statusNotifications.SetPipelineStatus(status.PipelineNotReady); err != nil { + errs = multierr.Append(errs, fmt.Errorf("SetPipelineStatus failed: %w", err)) + } + + // TODO: remove usage of NotifyPipelineNotReady() and replace by the new status notifications. if err := srv.host.extensions.NotifyPipelineNotReady(); err != nil { errs = multierr.Append(errs, fmt.Errorf("failed to notify that pipeline is not ready: %w", err)) } @@ -132,6 +140,8 @@ func (srv *service) Shutdown(ctx context.Context) error { errs = multierr.Append(errs, fmt.Errorf("failed to shutdown telemetry: %w", err)) } + srv.host.statusNotifications.Shutdown() + // TODO: Shutdown MeterProvider. return errs } diff --git a/service/service_test.go b/service/service_test.go index c65d322a80b..3f4b65708cf 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -16,6 +16,7 @@ package service import ( "context" + "errors" "fmt" "net/http" "path/filepath" @@ -26,6 +27,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/component/id" + "go.opentelemetry.io/collector/component/status" "go.opentelemetry.io/collector/featuregate" ) @@ -68,7 +71,7 @@ func TestServiceGetExtensions(t *testing.T) { extMap := srv.host.GetExtensions() assert.Len(t, extMap, 1) - assert.Contains(t, extMap, component.NewID("nop")) + assert.Contains(t, extMap, id.NewID("nop")) } func TestServiceGetExporters(t *testing.T) { @@ -84,11 +87,11 @@ func TestServiceGetExporters(t *testing.T) { expMap := srv.host.GetExporters() assert.Len(t, expMap, 3) assert.Len(t, expMap[component.DataTypeTraces], 1) - assert.Contains(t, expMap[component.DataTypeTraces], component.NewID("nop")) + assert.Contains(t, expMap[component.DataTypeTraces], id.NewID("nop")) assert.Len(t, expMap[component.DataTypeMetrics], 1) - assert.Contains(t, expMap[component.DataTypeMetrics], component.NewID("nop")) + assert.Contains(t, expMap[component.DataTypeMetrics], id.NewID("nop")) assert.Len(t, expMap[component.DataTypeLogs], 1) - assert.Contains(t, expMap[component.DataTypeLogs], component.NewID("nop")) + assert.Contains(t, expMap[component.DataTypeLogs], id.NewID("nop")) } // TestServiceTelemetryCleanupOnError tests that if newService errors due to an invalid config telemetry is cleaned up @@ -223,3 +226,92 @@ func createExampleService(t *testing.T, factories component.Factories) *service }) return srv } + +func TestService_ReportStatus(t *testing.T) { + factories, err := componenttest.NopFactories() + require.NoError(t, err) + srv := createExampleService(t, factories) + host := srv.host + + var readyHandlerCalled, notReadyHandlerCalled, statusEventHandlerCalled bool + + var statusEvent *status.ComponentEvent + + statusEventHandler := func(ev *status.ComponentEvent) error { + statusEvent = ev + statusEventHandlerCalled = true + return nil + } + + pipelineStatusHandler := func(s status.PipelineReadiness) error { + if s == status.PipelineReady { + readyHandlerCalled = true + } else { + notReadyHandlerCalled = true + } + return nil + } + + unregister := host.RegisterStatusListener( + status.WithComponentEventHandler(statusEventHandler), + status.WithPipelineStatusHandler(pipelineStatusHandler), + ) + + assert.False(t, statusEventHandlerCalled) + assert.False(t, readyHandlerCalled) + assert.False(t, notReadyHandlerCalled) + + assert.NoError(t, srv.Start(context.Background())) + assert.True(t, readyHandlerCalled) + + t.Cleanup(func() { + assert.NoError(t, srv.Shutdown(context.Background())) + }) + + expectedError := errors.New("an error") + + ev, err := status.NewComponentEvent( + status.ComponentError, + status.WithError(expectedError), + ) + assert.NoError(t, err) + host.ReportComponentStatus(ev) + + assert.True(t, statusEventHandlerCalled) + assert.Equal(t, expectedError, statusEvent.Err()) + assert.NotNil(t, statusEvent.Timestamp) + assert.NoError(t, unregister()) +} + +func TestService_ReportStatusWithBuggyHandler(t *testing.T) { + factories, err := componenttest.NopFactories() + require.NoError(t, err) + srv := createExampleService(t, factories) + host := srv.host + + var statusEventHandlerCalled bool + + statusEventHandler := func(ev *status.ComponentEvent) error { + statusEventHandlerCalled = true + return errors.New("an error") + } + + unregister := host.RegisterStatusListener( + status.WithComponentEventHandler(statusEventHandler), + ) + + assert.False(t, statusEventHandlerCalled) + assert.NoError(t, srv.Start(context.Background())) + t.Cleanup(func() { + assert.NoError(t, srv.Shutdown(context.Background())) + }) + + // SetComponentStatus handles errors in handlers (by logging) and does not surface them back to callers + ev, err := status.NewComponentEvent( + status.ComponentOK, + ) + assert.NoError(t, err) + host.ReportComponentStatus(ev) + assert.True(t, statusEventHandlerCalled) + assert.NoError(t, unregister()) +} diff --git a/service/servicetest/configprovider_test.go b/service/servicetest/configprovider_test.go index e022eb76c14..b8d4dd87e72 100644 --- a/service/servicetest/configprovider_test.go +++ b/service/servicetest/configprovider_test.go @@ -21,8 +21,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/config" ) @@ -35,35 +35,35 @@ func TestLoadConfig(t *testing.T) { // Verify extensions. require.Len(t, cfg.Extensions, 2) - assert.Contains(t, cfg.Extensions, component.NewID("nop")) - assert.Contains(t, cfg.Extensions, component.NewIDWithName("nop", "myextension")) + assert.Contains(t, cfg.Extensions, id.NewID("nop")) + assert.Contains(t, cfg.Extensions, id.NewIDWithName("nop", "myextension")) // Verify receivers require.Len(t, cfg.Receivers, 2) - assert.Contains(t, cfg.Receivers, component.NewID("nop")) - assert.Contains(t, cfg.Receivers, component.NewIDWithName("nop", "myreceiver")) + assert.Contains(t, cfg.Receivers, id.NewID("nop")) + assert.Contains(t, cfg.Receivers, id.NewIDWithName("nop", "myreceiver")) // Verify exporters assert.Len(t, cfg.Exporters, 2) - assert.Contains(t, cfg.Exporters, component.NewID("nop")) - assert.Contains(t, cfg.Exporters, component.NewIDWithName("nop", "myexporter")) + assert.Contains(t, cfg.Exporters, id.NewID("nop")) + assert.Contains(t, cfg.Exporters, id.NewIDWithName("nop", "myexporter")) // Verify procs assert.Len(t, cfg.Processors, 2) - assert.Contains(t, cfg.Processors, component.NewID("nop")) - assert.Contains(t, cfg.Processors, component.NewIDWithName("nop", "myprocessor")) + assert.Contains(t, cfg.Processors, id.NewID("nop")) + assert.Contains(t, cfg.Processors, id.NewIDWithName("nop", "myprocessor")) // Verify service. require.Len(t, cfg.Service.Extensions, 1) - assert.Contains(t, cfg.Service.Extensions, component.NewID("nop")) + assert.Contains(t, cfg.Service.Extensions, id.NewID("nop")) require.Len(t, cfg.Service.Pipelines, 1) assert.Equal(t, &config.Pipeline{ - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, + Receivers: []id.ID{id.NewID("nop")}, + Processors: []id.ID{id.NewID("nop")}, + Exporters: []id.ID{id.NewID("nop")}, }, - cfg.Service.Pipelines[component.NewID("traces")], + cfg.Service.Pipelines[id.NewID("traces")], "Did not load pipeline config correctly") } diff --git a/service/unmarshaler_test.go b/service/unmarshaler_test.go index e6ea1179c42..9f7e5b2b318 100644 --- a/service/unmarshaler_test.go +++ b/service/unmarshaler_test.go @@ -21,8 +21,8 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/component/id" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/service/telemetry" ) @@ -141,7 +141,7 @@ func TestConfigServicePipelineUnmarshalError(t *testing.T) { for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - pipelines := make(map[component.ID]ConfigServicePipeline) + pipelines := make(map[id.ID]ConfigServicePipeline) err := tt.conf.Unmarshal(&pipelines, confmap.WithErrorUnused()) require.Error(t, err) assert.Contains(t, err.Error(), tt.expectError)