diff --git a/config/configtest/configtest.go b/config/configtest/configtest.go index 0e26eef61d7..99189b7708c 100644 --- a/config/configtest/configtest.go +++ b/config/configtest/configtest.go @@ -36,11 +36,7 @@ func LoadConfigMap(fileName string) (*config.Map, error) { if err != nil { return nil, err } - retMap := config.NewMap() - if err = ret.MergeTo(retMap, ""); err != nil { - return nil, err - } - return retMap, err + return ret.AsMap() } // CheckConfigStruct enforces that given configuration object is following the patterns diff --git a/config/mapprovider.go b/config/mapprovider.go index 73c8292a2a0..9abf85121c9 100644 --- a/config/mapprovider.go +++ b/config/mapprovider.go @@ -81,7 +81,8 @@ type ChangeEvent struct { // Retrieved holds the result of a call to the Retrieve method of a Provider object. type Retrieved interface { - MergeTo(dest *Map, destPath string) error + AsMap() (*Map, error) + AsValue() (interface{}, error) Close(ctx context.Context) error } @@ -100,8 +101,12 @@ func NewRetrievedFromMap(cfgMap *Map, close CloseFunc) Retrieved { return &mapRetrieved{cfgMap: cfgMap, CloseFunc: close} } -func (r *mapRetrieved) MergeTo(dest *Map, destPath string) error { - return dest.k.MergeAt(r.cfgMap.k, destPath) +func (r *mapRetrieved) AsMap() (*Map, error) { + return r.cfgMap, nil +} + +func (r *mapRetrieved) AsValue() (interface{}, error) { + return r.cfgMap.ToStringMap(), nil } // CloseFunc a function to close and release any watchers that it may have created. diff --git a/config/mapprovider/envmapprovider/mapprovider_test.go b/config/mapprovider/envmapprovider/mapprovider_test.go index 32a119f6498..32b21ccd8a2 100644 --- a/config/mapprovider/envmapprovider/mapprovider_test.go +++ b/config/mapprovider/envmapprovider/mapprovider_test.go @@ -62,8 +62,8 @@ func TestEnv(t *testing.T) { env := New() ret, err := env.Retrieve(context.Background(), envSchemePrefix+envName, nil) require.NoError(t, err) - retMap := config.NewMap() - assert.NoError(t, ret.MergeTo(retMap, "")) + retMap, err := ret.AsMap() + require.NoError(t, err) expectedMap := config.NewMapFromStringMap(map[string]interface{}{ "processors::batch": nil, "exporters::otlp::endpoint": "localhost:4317", diff --git a/config/mapprovider/filemapprovider/mapprovider_test.go b/config/mapprovider/filemapprovider/mapprovider_test.go index db2ad60c8a6..07d79d7fb82 100644 --- a/config/mapprovider/filemapprovider/mapprovider_test.go +++ b/config/mapprovider/filemapprovider/mapprovider_test.go @@ -64,8 +64,8 @@ func TestRelativePath(t *testing.T) { fp := New() ret, err := fp.Retrieve(context.Background(), fileSchemePrefix+filepath.Join("testdata", "default-config.yaml"), nil) require.NoError(t, err) - retMap := config.NewMap() - assert.NoError(t, ret.MergeTo(retMap, "")) + retMap, err := ret.AsMap() + require.NoError(t, err) expectedMap := config.NewMapFromStringMap(map[string]interface{}{ "processors::batch": nil, "exporters::otlp::endpoint": "localhost:4317", @@ -78,8 +78,8 @@ func TestAbsolutePath(t *testing.T) { fp := New() ret, err := fp.Retrieve(context.Background(), fileSchemePrefix+absolutePath(t, filepath.Join("testdata", "default-config.yaml")), nil) require.NoError(t, err) - retMap := config.NewMap() - assert.NoError(t, ret.MergeTo(retMap, "")) + retMap, err := ret.AsMap() + require.NoError(t, err) expectedMap := config.NewMapFromStringMap(map[string]interface{}{ "processors::batch": nil, "exporters::otlp::endpoint": "localhost:4317", diff --git a/config/mapprovider/yamlmapprovider/mapprovider_test.go b/config/mapprovider/yamlmapprovider/mapprovider_test.go index d199247d3b6..b28c48bd219 100644 --- a/config/mapprovider/yamlmapprovider/mapprovider_test.go +++ b/config/mapprovider/yamlmapprovider/mapprovider_test.go @@ -19,8 +19,7 @@ import ( "testing" "github.com/stretchr/testify/assert" - - "go.opentelemetry.io/collector/config" + "github.com/stretchr/testify/require" ) func TestEmpty(t *testing.T) { @@ -41,8 +40,8 @@ func TestOneValue(t *testing.T) { sp := New() ret, err := sp.Retrieve(context.Background(), "yaml:processors::batch::timeout: 2s", nil) assert.NoError(t, err) - retMap := config.NewMap() - assert.NoError(t, ret.MergeTo(retMap, "")) + retMap, err := ret.AsMap() + require.NoError(t, err) assert.Equal(t, map[string]interface{}{ "processors": map[string]interface{}{ "batch": map[string]interface{}{ @@ -57,8 +56,8 @@ func TestNamedComponent(t *testing.T) { sp := New() ret, err := sp.Retrieve(context.Background(), "yaml:processors::batch/foo::timeout: 3s", nil) assert.NoError(t, err) - retMap := config.NewMap() - assert.NoError(t, ret.MergeTo(retMap, "")) + retMap, err := ret.AsMap() + require.NoError(t, err) assert.Equal(t, map[string]interface{}{ "processors": map[string]interface{}{ "batch/foo": map[string]interface{}{ @@ -73,8 +72,8 @@ func TestMapEntry(t *testing.T) { sp := New() ret, err := sp.Retrieve(context.Background(), "yaml:processors: {batch/foo::timeout: 3s, batch::timeout: 2s}", nil) assert.NoError(t, err) - retMap := config.NewMap() - assert.NoError(t, ret.MergeTo(retMap, "")) + retMap, err := ret.AsMap() + require.NoError(t, err) assert.Equal(t, map[string]interface{}{ "processors": map[string]interface{}{ "batch/foo": map[string]interface{}{ @@ -92,8 +91,8 @@ func TestNewLine(t *testing.T) { sp := New() ret, err := sp.Retrieve(context.Background(), "yaml:processors::batch/foo::timeout: 3s\nprocessors::batch::timeout: 2s", nil) assert.NoError(t, err) - retMap := config.NewMap() - assert.NoError(t, ret.MergeTo(retMap, "")) + retMap, err := ret.AsMap() + require.NoError(t, err) assert.Equal(t, map[string]interface{}{ "processors": map[string]interface{}{ "batch/foo": map[string]interface{}{ @@ -111,8 +110,8 @@ func TestDotSeparator(t *testing.T) { sp := New() ret, err := sp.Retrieve(context.Background(), "yaml:processors.batch.timeout: 4s", nil) assert.NoError(t, err) - retMap := config.NewMap() - assert.NoError(t, ret.MergeTo(retMap, "")) + retMap, err := ret.AsMap() + require.NoError(t, err) assert.Equal(t, map[string]interface{}{"processors.batch.timeout": "4s"}, retMap.ToStringMap()) assert.NoError(t, sp.Shutdown(context.Background())) } diff --git a/service/config_provider.go b/service/config_provider.go index 73717b015ae..9379f9c1be5 100644 --- a/service/config_provider.go +++ b/service/config_provider.go @@ -74,7 +74,7 @@ type configProvider struct { configUnmarshaler configunmarshaler.ConfigUnmarshaler sync.Mutex - closer config.CloseFunc + closers []config.CloseFunc watcher chan error } @@ -91,6 +91,14 @@ type ConfigProviderSettings struct { // MapConverters is a slice of config.MapConverterFunc. MapConverters []config.MapConverterFunc + // ExpandConfigLocations if true, it expands string values in the provided config.Map (merging all retrieved maps + // from the given `locations`) right before calling the converters. This means that the converters will be executed + // on the expanded map. + // + // A value is expended if and only if it follows the format "${scheme:opaque_value}". + // If there is a config.MapProvider with the associated "scheme", otherwise an error is returned. + ExpandConfigLocations bool + // Deprecated: [v0.50.0] because providing custom ConfigUnmarshaler is not necessary since users can wrap/implement // ConfigProvider if needed to change the resulted config. This functionality will be kept for at least 2 minor versions, // and if nobody express a need for it will be removed. @@ -144,12 +152,12 @@ func (cm *configProvider) Get(ctx context.Context, factories component.Factories return nil, fmt.Errorf("cannot close previous watch: %w", err) } - retMap, closer, err := cm.mergeRetrieve(ctx) + retMap, closers, err := cm.mergeRetrieve(ctx) if err != nil { return nil, fmt.Errorf("cannot retrieve the configuration: %w", err) } - cm.closer = closer + cm.closers = closers // Apply all converters. for _, cfgMapConv := range cm.configMapConverters { @@ -182,10 +190,11 @@ func (cm *configProvider) onChange(event *config.ChangeEvent) { } func (cm *configProvider) closeIfNeeded(ctx context.Context) error { - if cm.closer != nil { - return cm.closer(ctx) + var err error + for _, ret := range cm.closers { + err = multierr.Append(err, ret(ctx)) } - return nil + return err } func (cm *configProvider) Shutdown(ctx context.Context) error { @@ -204,7 +213,7 @@ func (cm *configProvider) Shutdown(ctx context.Context) error { // https://tools.ietf.org/id/draft-kerwin-file-scheme-07.html#syntax var driverLetterRegexp = regexp.MustCompile("^[A-z]:") -func (cm *configProvider) mergeRetrieve(ctx context.Context) (*config.Map, config.CloseFunc, error) { +func (cm *configProvider) mergeRetrieve(ctx context.Context) (*config.Map, []config.CloseFunc, error) { var closers []config.CloseFunc retCfgMap := config.NewMap() for _, location := range cm.locations { @@ -225,19 +234,86 @@ func (cm *configProvider) mergeRetrieve(ctx context.Context) (*config.Map, confi if err != nil { return nil, nil, err } - if err = ret.MergeTo(retCfgMap, ""); err != nil { + retMap, err := ret.AsMap() + if err != nil { + return nil, nil, err + } + if err = retCfgMap.Merge(retMap); err != nil { return nil, nil, err } closers = append(closers, ret.Close) } - return retCfgMap, - func(ctxF context.Context) error { - var err error - for _, ret := range closers { - err = multierr.Append(err, ret(ctxF)) + return retCfgMap, closers, nil +} + +func (cm *configProvider) expandValueLocations(ctx context.Context, cfgMap *config.Map) ([]config.CloseFunc, error) { + var closers []config.CloseFunc + for _, k := range cfgMap.AllKeys() { + val, clsrs, err := cm.expandStringValues(ctx, cfgMap.Get(k)) + if err != nil { + return nil, err + } + closers = append(closers, clsrs...) + cfgMap.Set(k, val) + } + return closers, nil +} + +func (cm *configProvider) expandStringValues(ctx context.Context, value interface{}) (interface{}, []config.CloseFunc, error) { + switch v := value.(type) { + case string: + // If it doesn't have the format "${scheme:opaque}" no need to expand. + if !strings.HasPrefix(v, "${") || !strings.HasSuffix(v, "}") { + return value, nil, nil + } + location := v[2 : len(v)-1] + // For backwards compatibility: + // - empty scheme means "env". + scheme := "env" + if idx := strings.Index(location, ":"); idx != -1 { + scheme = location[:idx] + } else { + location = scheme + ":" + location + } + p, ok := cm.configMapProviders[scheme] + if !ok { + return nil, nil, fmt.Errorf("scheme %v is not supported for location %v", scheme, location) + } + ret, err := p.Retrieve(ctx, location, cm.onChange) + if err != nil { + return nil, nil, err + } + value, err = ret.AsValue() + if err != nil { + return nil, nil, err + } + return value, []config.CloseFunc{ret.Close}, nil + case []interface{}: + nslice := make([]interface{}, 0, len(v)) + var closers []config.CloseFunc + for _, vint := range v { + val, clsrs, err := cm.expandStringValues(ctx, vint) + if err != nil { + return nil, nil, err } - return err - }, nil + nslice = append(nslice, val) + closers = append(closers, clsrs...) + } + return nslice, closers, nil + case map[string]interface{}: + nmap := map[string]interface{}{} + var closers []config.CloseFunc + for mk, mv := range v { + val, clsrs, err := cm.expandStringValues(ctx, mv) + if err != nil { + return nil, nil, err + } + nmap[mk] = val + closers = append(closers, clsrs...) + } + return nmap, closers, nil + } + return value, nil, nil } func makeConfigMapProviderMap(providers ...config.MapProvider) map[string]config.MapProvider {