diff --git a/.chloggen/extension-effective-config.yaml b/.chloggen/extension-effective-config.yaml new file mode 100755 index 00000000000..5bcc93e5529 --- /dev/null +++ b/.chloggen/extension-effective-config.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: extension + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Allow extensions to be notified of the effective config + +# One or more tracking issues or pull requests related to the change +issues: [6596] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/extension/extension.go b/extension/extension.go index 391f23a6674..693e9559122 100644 --- a/extension/extension.go +++ b/extension/extension.go @@ -43,6 +43,10 @@ type PipelineWatcher interface { NotReady() error } +type ConfigWatcher interface { + ConfigResolved(data map[string]interface{}) error +} + // CreateSettings is passed to Factory.Create(...) function. type CreateSettings = component.ExtensionCreateSettings //nolint:staticcheck diff --git a/otelcol/collector.go b/otelcol/collector.go index f695637da13..d6a8d4a5e1c 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -181,6 +181,11 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { return multierr.Combine(err, col.service.Shutdown(ctx)) } col.setCollectorState(StateRunning) + + if err = col.service.ConfigResolved(ctx, cfg.ToStringMap()); err != nil { + return multierr.Combine(err, col.service.Shutdown(ctx)) + } + return nil } diff --git a/otelcol/config.go b/otelcol/config.go index ed196e2381a..5e28b096127 100644 --- a/otelcol/config.go +++ b/otelcol/config.go @@ -19,6 +19,7 @@ import ( "fmt" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/service" ) @@ -42,6 +43,26 @@ type Config struct { Extensions map[component.ID]component.Config Service service.Config + + // stringMap is the map used to create this Config + stringMap map[string]interface{} +} + +func NewConfig(conf *confmap.Conf, factories component.Factories) (*Config, error) { + var err error + var cfg *configSettings + if cfg, err = unmarshal(conf, factories); err != nil { + return nil, fmt.Errorf("cannot unmarshal the configuration: %w", err) + } + + return &Config{ + Receivers: cfg.Receivers.Configs(), + Processors: cfg.Processors.Configs(), + Exporters: cfg.Exporters.Configs(), + Extensions: cfg.Extensions.Configs(), + Service: cfg.Service, + stringMap: conf.ToStringMap(), + }, nil } // Validate returns an error if the config is invalid. @@ -131,3 +152,8 @@ func (cfg *Config) Validate() error { return nil } + +// ToStringMap returns the map used to create this Config +func (cfg *Config) ToStringMap() map[string]interface{} { + return cfg.stringMap +} diff --git a/otelcol/configprovider.go b/otelcol/configprovider.go index 411b11d6444..3caf50901b8 100644 --- a/otelcol/configprovider.go +++ b/otelcol/configprovider.go @@ -93,18 +93,7 @@ func (cm *configProvider) Get(ctx context.Context, factories Factories) (*Config return nil, fmt.Errorf("cannot resolve the configuration: %w", err) } - var cfg *configSettings - if cfg, err = unmarshal(conf, factories); err != nil { - return nil, fmt.Errorf("cannot unmarshal the configuration: %w", err) - } - - return &Config{ - Receivers: cfg.Receivers.Configs(), - Processors: cfg.Processors.Configs(), - Exporters: cfg.Exporters.Configs(), - Extensions: cfg.Extensions.Configs(), - Service: cfg.Service, - }, nil + return NewConfig(conf, factories) } func (cm *configProvider) Watch() <-chan error { diff --git a/otelcol/configprovider_test.go b/otelcol/configprovider_test.go index 573cdde5981..e278e25e8ec 100644 --- a/otelcol/configprovider_test.go +++ b/otelcol/configprovider_test.go @@ -22,66 +22,29 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap/zapcore" + "gopkg.in/yaml.v3" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/provider/fileprovider" "go.opentelemetry.io/collector/confmap/provider/yamlprovider" - "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/extension/extensiontest" - "go.opentelemetry.io/collector/processor/processortest" - "go.opentelemetry.io/collector/receiver/receivertest" - "go.opentelemetry.io/collector/service" - "go.opentelemetry.io/collector/service/telemetry" ) -var configNop = &Config{ - Receivers: map[component.ID]component.Config{component.NewID("nop"): receivertest.NewNopFactory().CreateDefaultConfig()}, - Processors: map[component.ID]component.Config{component.NewID("nop"): processortest.NewNopFactory().CreateDefaultConfig()}, - Exporters: map[component.ID]component.Config{component.NewID("nop"): exportertest.NewNopFactory().CreateDefaultConfig()}, - Extensions: map[component.ID]component.Config{component.NewID("nop"): extensiontest.NewNopFactory().CreateDefaultConfig()}, - Service: service.Config{ - Extensions: []component.ID{component.NewID("nop")}, - Pipelines: map[component.ID]*service.PipelineConfig{ - component.NewID("traces"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - component.NewID("metrics"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - component.NewID("logs"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - }, - Telemetry: telemetry.Config{ - Logs: telemetry.LogsConfig{ - Level: zapcore.InfoLevel, - Development: false, - Encoding: "console", - Sampling: &telemetry.LogsSamplingConfig{ - Initial: 100, - Thereafter: 100, - }, - OutputPaths: []string{"stderr"}, - ErrorOutputPaths: []string{"stderr"}, - DisableCaller: false, - DisableStacktrace: false, - InitialFields: map[string]interface{}(nil), - }, - Metrics: telemetry.MetricsConfig{ - Level: configtelemetry.LevelBasic, - Address: "localhost:8888", - }, - }, - }, +func newConfigNop(factories component.Factories) (*Config, error) { + yamlBytes, err := os.ReadFile(filepath.Join("testdata", "otelcol-nop.yaml")) + + if err != nil { + return nil, err + } + + stringMap := map[string]interface{}{} + err = yaml.Unmarshal(yamlBytes, stringMap) + + if err != nil { + return nil, err + } + + return NewConfig(confmap.NewFromStringMap(stringMap), factories) } func TestConfigProviderYaml(t *testing.T) { @@ -103,6 +66,9 @@ func TestConfigProviderYaml(t *testing.T) { factories, err := nopFactories() require.NoError(t, err) + configNop, err := newConfigNop(factories) + require.NoError(t, err) + cfg, err := cp.Get(context.Background(), factories) require.NoError(t, err) assert.EqualValues(t, configNop, cfg) @@ -124,7 +90,11 @@ func TestConfigProviderFile(t *testing.T) { factories, err := nopFactories() require.NoError(t, err) + configNop, err := newConfigNop(factories) + require.NoError(t, err) + cfg, err := cp.Get(context.Background(), factories) require.NoError(t, err) + assert.EqualValues(t, configNop, cfg) } diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index 5ddc87ce239..82edbb0e892 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -84,6 +84,16 @@ func (bes *Extensions) NotifyPipelineNotReady() error { return errs } +func (bes *Extensions) NotifyConfigResolved(data map[string]interface{}) error { + var errs error + for _, ext := range bes.extMap { + if cw, ok := ext.(extension.ConfigWatcher); ok { + errs = multierr.Append(errs, cw.ConfigResolved(data)) + } + } + return errs +} + func (bes *Extensions) GetExtensions() map[component.ID]component.Component { result := make(map[component.ID]component.Component, len(bes.extMap)) for extID, v := range bes.extMap { diff --git a/service/extensions/extensions_test.go b/service/extensions/extensions_test.go index b3090247c26..1e293fc3f20 100644 --- a/service/extensions/extensions_test.go +++ b/service/extensions/extensions_test.go @@ -102,6 +102,132 @@ func TestBuildExtensions(t *testing.T) { } } +func TestNotifyConfigResovled(t *testing.T) { + notificationError := errors.New("Error processing config") + nopExtensionFactory := extensiontest.NewNopFactory() + nopExtensionConfig := nopExtensionFactory.CreateDefaultConfig() + n1ExtensionFactory := newConfigWatcherExtensionFactory("notifiable1", func() error { return nil }) + n1ExtensionConfig := n1ExtensionFactory.CreateDefaultConfig() + n2ExtensionFactory := newConfigWatcherExtensionFactory("notifiable2", func() error { return nil }) + n2ExtensionConfig := n1ExtensionFactory.CreateDefaultConfig() + nErrExtensionFactory := newConfigWatcherExtensionFactory("notifiableErr", func() error { return notificationError }) + nErrExtensionConfig := n1ExtensionFactory.CreateDefaultConfig() + + tests := []struct { + name string + factories map[component.Type]extension.Factory + extensionsConfigs map[component.ID]component.Config + serviceExtensions []component.ID + wantErrMsg string + want error + }{ + { + name: "No notifiable extensions", + factories: map[component.Type]extension.Factory{ + "nop": nopExtensionFactory, + }, + extensionsConfigs: map[component.ID]component.Config{ + component.NewID("nop"): nopExtensionConfig, + }, + serviceExtensions: []component.ID{ + component.NewID("nop"), + }, + }, + { + name: "One notifiable extension", + factories: map[component.Type]extension.Factory{ + "notifiable1": n1ExtensionFactory, + }, + extensionsConfigs: map[component.ID]component.Config{ + component.NewID("notifiable1"): n1ExtensionConfig, + }, + serviceExtensions: []component.ID{ + component.NewID("notifiable1"), + }, + }, + { + name: "Multiple notifiable extensions", + factories: map[component.Type]extension.Factory{ + "notifiable1": n1ExtensionFactory, + "notifiable2": n2ExtensionFactory, + }, + extensionsConfigs: map[component.ID]component.Config{ + component.NewID("notifiable1"): n1ExtensionConfig, + component.NewID("notifiable2"): n2ExtensionConfig, + }, + serviceExtensions: []component.ID{ + component.NewID("notifiable1"), + component.NewID("notifiable2"), + }, + }, + { + name: "Errors in extension notification", + factories: map[component.Type]extension.Factory{ + "notifiableErr": nErrExtensionFactory, + }, + extensionsConfigs: map[component.ID]component.Config{ + component.NewID("notifiableErr"): nErrExtensionConfig, + }, + serviceExtensions: []component.ID{ + component.NewID("notifiableErr"), + }, + want: notificationError, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + extensions, err := New(context.Background(), Settings{ + Telemetry: componenttest.NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + Configs: tt.extensionsConfigs, + Factories: tt.factories, + }, tt.serviceExtensions) + assert.NoError(t, err) + errs := extensions.NotifyConfigResolved(map[string]interface{}{}) + assert.Equal(t, errs, tt.want) + }) + } +} + +type configWatcherExtension struct { + fn func() error +} + +func (comp *configWatcherExtension) Start(_ context.Context, _ component.Host) error { + return comp.fn() +} + +func (comp *configWatcherExtension) Shutdown(_ context.Context) error { + return comp.fn() +} + +func (comp *configWatcherExtension) ConfigResolved(data map[string]interface{}) error { + return comp.fn() +} + +func newConfigWatcherExtension(fn func() error) *configWatcherExtension { + comp := &configWatcherExtension{ + fn: fn, + } + + return comp + +} + +func newConfigWatcherExtensionFactory(name component.Type, fn func() error) extension.Factory { + return extension.NewFactory( + name, + func() component.Config { + return &struct{}{} + }, + func(ctx context.Context, set extension.CreateSettings, extension component.Config) (extension.Extension, error) { + return newConfigWatcherExtension(fn), nil + }, + component.StabilityLevelDevelopment, + ) +} + func newBadExtensionFactory() extension.Factory { return extension.NewFactory( "bf", diff --git a/service/service.go b/service/service.go index 1848c306856..8557a02534a 100644 --- a/service/service.go +++ b/service/service.go @@ -172,6 +172,13 @@ func (srv *Service) Shutdown(ctx context.Context) error { return errs } +func (srv *Service) ConfigResolved(_ context.Context, effectiveConfig map[string]interface{}) error { + if err := srv.host.serviceExtensions.NotifyConfigResolved(effectiveConfig); err != nil { + return err + } + return nil +} + func (srv *Service) initExtensionsAndPipeline(ctx context.Context, set Settings, cfg Config) error { var err error extensionsSettings := extensions.Settings{