Skip to content

Commit

Permalink
Notify extensions of effective config
Browse files Browse the repository at this point in the history
  • Loading branch information
evan-bradley committed Dec 19, 2022
1 parent 3efa2bc commit 97a0a3c
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 65 deletions.
16 changes: 16 additions & 0 deletions .chloggen/extension-effective-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: extension

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Allow extensions to be notified of the effective config

# One or more tracking issues or pull requests related to the change
issues: [6596]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
4 changes: 4 additions & 0 deletions extension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ type PipelineWatcher interface {
NotReady() error
}

type ConfigWatcher interface {
ConfigResolved(data map[string]interface{}) error
}

// CreateSettings is passed to Factory.Create(...) function.
type CreateSettings = component.ExtensionCreateSettings //nolint:staticcheck

Expand Down
5 changes: 5 additions & 0 deletions otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
return multierr.Combine(err, col.service.Shutdown(ctx))
}
col.setCollectorState(StateRunning)

if err = col.service.ConfigResolved(ctx, cfg.ToStringMap()); err != nil {
return multierr.Combine(err, col.service.Shutdown(ctx))
}

return nil
}

Expand Down
26 changes: 26 additions & 0 deletions otelcol/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/service"
)

Expand All @@ -42,6 +43,26 @@ type Config struct {
Extensions map[component.ID]component.Config

Service service.Config

// stringMap is the map used to create this Config
stringMap map[string]interface{}
}

func NewConfig(conf *confmap.Conf, factories component.Factories) (*Config, error) {
var err error
var cfg *configSettings
if cfg, err = unmarshal(conf, factories); err != nil {
return nil, fmt.Errorf("cannot unmarshal the configuration: %w", err)
}

return &Config{
Receivers: cfg.Receivers.Configs(),
Processors: cfg.Processors.Configs(),
Exporters: cfg.Exporters.Configs(),
Extensions: cfg.Extensions.Configs(),
Service: cfg.Service,
stringMap: conf.ToStringMap(),
}, nil
}

// Validate returns an error if the config is invalid.
Expand Down Expand Up @@ -131,3 +152,8 @@ func (cfg *Config) Validate() error {

return nil
}

// ToStringMap returns the map used to create this Config
func (cfg *Config) ToStringMap() map[string]interface{} {
return cfg.stringMap
}
13 changes: 1 addition & 12 deletions otelcol/configprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,7 @@ func (cm *configProvider) Get(ctx context.Context, factories Factories) (*Config
return nil, fmt.Errorf("cannot resolve the configuration: %w", err)
}

var cfg *configSettings
if cfg, err = unmarshal(conf, factories); err != nil {
return nil, fmt.Errorf("cannot unmarshal the configuration: %w", err)
}

return &Config{
Receivers: cfg.Receivers.Configs(),
Processors: cfg.Processors.Configs(),
Exporters: cfg.Exporters.Configs(),
Extensions: cfg.Extensions.Configs(),
Service: cfg.Service,
}, nil
return NewConfig(conf, factories)
}

func (cm *configProvider) Watch() <-chan error {
Expand Down
76 changes: 23 additions & 53 deletions otelcol/configprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,66 +22,29 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"gopkg.in/yaml.v3"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/provider/fileprovider"
"go.opentelemetry.io/collector/confmap/provider/yamlprovider"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/extension/extensiontest"
"go.opentelemetry.io/collector/processor/processortest"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.opentelemetry.io/collector/service"
"go.opentelemetry.io/collector/service/telemetry"
)

var configNop = &Config{
Receivers: map[component.ID]component.Config{component.NewID("nop"): receivertest.NewNopFactory().CreateDefaultConfig()},
Processors: map[component.ID]component.Config{component.NewID("nop"): processortest.NewNopFactory().CreateDefaultConfig()},
Exporters: map[component.ID]component.Config{component.NewID("nop"): exportertest.NewNopFactory().CreateDefaultConfig()},
Extensions: map[component.ID]component.Config{component.NewID("nop"): extensiontest.NewNopFactory().CreateDefaultConfig()},
Service: service.Config{
Extensions: []component.ID{component.NewID("nop")},
Pipelines: map[component.ID]*service.PipelineConfig{
component.NewID("traces"): {
Receivers: []component.ID{component.NewID("nop")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
component.NewID("metrics"): {
Receivers: []component.ID{component.NewID("nop")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
component.NewID("logs"): {
Receivers: []component.ID{component.NewID("nop")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
},
Telemetry: telemetry.Config{
Logs: telemetry.LogsConfig{
Level: zapcore.InfoLevel,
Development: false,
Encoding: "console",
Sampling: &telemetry.LogsSamplingConfig{
Initial: 100,
Thereafter: 100,
},
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
DisableCaller: false,
DisableStacktrace: false,
InitialFields: map[string]interface{}(nil),
},
Metrics: telemetry.MetricsConfig{
Level: configtelemetry.LevelBasic,
Address: "localhost:8888",
},
},
},
func newConfigNop(factories component.Factories) (*Config, error) {
yamlBytes, err := os.ReadFile(filepath.Join("testdata", "otelcol-nop.yaml"))

if err != nil {
return nil, err
}

stringMap := map[string]interface{}{}
err = yaml.Unmarshal(yamlBytes, stringMap)

if err != nil {
return nil, err
}

return NewConfig(confmap.NewFromStringMap(stringMap), factories)
}

func TestConfigProviderYaml(t *testing.T) {
Expand All @@ -103,6 +66,9 @@ func TestConfigProviderYaml(t *testing.T) {
factories, err := nopFactories()
require.NoError(t, err)

configNop, err := newConfigNop(factories)
require.NoError(t, err)

cfg, err := cp.Get(context.Background(), factories)
require.NoError(t, err)
assert.EqualValues(t, configNop, cfg)
Expand All @@ -124,7 +90,11 @@ func TestConfigProviderFile(t *testing.T) {
factories, err := nopFactories()
require.NoError(t, err)

configNop, err := newConfigNop(factories)
require.NoError(t, err)

cfg, err := cp.Get(context.Background(), factories)
require.NoError(t, err)

assert.EqualValues(t, configNop, cfg)
}
10 changes: 10 additions & 0 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ func (bes *Extensions) NotifyPipelineNotReady() error {
return errs
}

func (bes *Extensions) NotifyConfigResolved(data map[string]interface{}) error {
var errs error
for _, ext := range bes.extMap {
if cw, ok := ext.(extension.ConfigWatcher); ok {
errs = multierr.Append(errs, cw.ConfigResolved(data))
}
}
return errs
}

func (bes *Extensions) GetExtensions() map[component.ID]component.Component {
result := make(map[component.ID]component.Component, len(bes.extMap))
for extID, v := range bes.extMap {
Expand Down
126 changes: 126 additions & 0 deletions service/extensions/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,132 @@ func TestBuildExtensions(t *testing.T) {
}
}

func TestNotifyConfigResovled(t *testing.T) {
notificationError := errors.New("Error processing config")
nopExtensionFactory := extensiontest.NewNopFactory()
nopExtensionConfig := nopExtensionFactory.CreateDefaultConfig()
n1ExtensionFactory := newConfigWatcherExtensionFactory("notifiable1", func() error { return nil })
n1ExtensionConfig := n1ExtensionFactory.CreateDefaultConfig()
n2ExtensionFactory := newConfigWatcherExtensionFactory("notifiable2", func() error { return nil })
n2ExtensionConfig := n1ExtensionFactory.CreateDefaultConfig()
nErrExtensionFactory := newConfigWatcherExtensionFactory("notifiableErr", func() error { return notificationError })
nErrExtensionConfig := n1ExtensionFactory.CreateDefaultConfig()

tests := []struct {
name string
factories map[component.Type]extension.Factory
extensionsConfigs map[component.ID]component.Config
serviceExtensions []component.ID
wantErrMsg string
want error
}{
{
name: "No notifiable extensions",
factories: map[component.Type]extension.Factory{
"nop": nopExtensionFactory,
},
extensionsConfigs: map[component.ID]component.Config{
component.NewID("nop"): nopExtensionConfig,
},
serviceExtensions: []component.ID{
component.NewID("nop"),
},
},
{
name: "One notifiable extension",
factories: map[component.Type]extension.Factory{
"notifiable1": n1ExtensionFactory,
},
extensionsConfigs: map[component.ID]component.Config{
component.NewID("notifiable1"): n1ExtensionConfig,
},
serviceExtensions: []component.ID{
component.NewID("notifiable1"),
},
},
{
name: "Multiple notifiable extensions",
factories: map[component.Type]extension.Factory{
"notifiable1": n1ExtensionFactory,
"notifiable2": n2ExtensionFactory,
},
extensionsConfigs: map[component.ID]component.Config{
component.NewID("notifiable1"): n1ExtensionConfig,
component.NewID("notifiable2"): n2ExtensionConfig,
},
serviceExtensions: []component.ID{
component.NewID("notifiable1"),
component.NewID("notifiable2"),
},
},
{
name: "Errors in extension notification",
factories: map[component.Type]extension.Factory{
"notifiableErr": nErrExtensionFactory,
},
extensionsConfigs: map[component.ID]component.Config{
component.NewID("notifiableErr"): nErrExtensionConfig,
},
serviceExtensions: []component.ID{
component.NewID("notifiableErr"),
},
want: notificationError,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
extensions, err := New(context.Background(), Settings{
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
Configs: tt.extensionsConfigs,
Factories: tt.factories,
}, tt.serviceExtensions)
assert.NoError(t, err)
errs := extensions.NotifyConfigResolved(map[string]interface{}{})
assert.Equal(t, errs, tt.want)
})
}
}

type configWatcherExtension struct {
fn func() error
}

func (comp *configWatcherExtension) Start(_ context.Context, _ component.Host) error {
return comp.fn()
}

func (comp *configWatcherExtension) Shutdown(_ context.Context) error {
return comp.fn()
}

func (comp *configWatcherExtension) ConfigResolved(data map[string]interface{}) error {
return comp.fn()
}

func newConfigWatcherExtension(fn func() error) *configWatcherExtension {
comp := &configWatcherExtension{
fn: fn,
}

return comp

}

func newConfigWatcherExtensionFactory(name component.Type, fn func() error) extension.Factory {
return extension.NewFactory(
name,
func() component.Config {
return &struct{}{}
},
func(ctx context.Context, set extension.CreateSettings, extension component.Config) (extension.Extension, error) {
return newConfigWatcherExtension(fn), nil
},
component.StabilityLevelDevelopment,
)
}

func newBadExtensionFactory() extension.Factory {
return extension.NewFactory(
"bf",
Expand Down
7 changes: 7 additions & 0 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ func (srv *Service) Shutdown(ctx context.Context) error {
return errs
}

func (srv *Service) ConfigResolved(_ context.Context, effectiveConfig map[string]interface{}) error {
if err := srv.host.serviceExtensions.NotifyConfigResolved(effectiveConfig); err != nil {
return err
}
return nil
}

func (srv *Service) initExtensionsAndPipeline(ctx context.Context, set Settings, cfg Config) error {
var err error
extensionsSettings := extensions.Settings{
Expand Down

0 comments on commit 97a0a3c

Please sign in to comment.