Skip to content

Commit

Permalink
Extend Expand converter to support expanding values from any provider
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Apr 19, 2022
1 parent 265a50a commit efecc0c
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 41 deletions.
6 changes: 1 addition & 5 deletions config/configtest/configtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,7 @@ func LoadConfigMap(fileName string) (*config.Map, error) {
if err != nil {
return nil, err
}
retMap := config.NewMap()
if err = ret.MergeTo(retMap, ""); err != nil {
return nil, err
}
return retMap, err
return ret.AsMap()
}

// CheckConfigStruct enforces that given configuration object is following the patterns
Expand Down
11 changes: 8 additions & 3 deletions config/mapprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ type ChangeEvent struct {

// Retrieved holds the result of a call to the Retrieve method of a Provider object.
type Retrieved interface {
MergeTo(dest *Map, destPath string) error
AsMap() (*Map, error)
AsValue() (interface{}, error)
Close(ctx context.Context) error
}

Expand All @@ -100,8 +101,12 @@ func NewRetrievedFromMap(cfgMap *Map, close CloseFunc) Retrieved {
return &mapRetrieved{cfgMap: cfgMap, CloseFunc: close}
}

func (r *mapRetrieved) MergeTo(dest *Map, destPath string) error {
return dest.k.MergeAt(r.cfgMap.k, destPath)
func (r *mapRetrieved) AsMap() (*Map, error) {
return r.cfgMap, nil
}

func (r *mapRetrieved) AsValue() (interface{}, error) {
return r.cfgMap.ToStringMap(), nil
}

// CloseFunc a function to close and release any watchers that it may have created.
Expand Down
4 changes: 2 additions & 2 deletions config/mapprovider/envmapprovider/mapprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func TestEnv(t *testing.T) {
env := New()
ret, err := env.Retrieve(context.Background(), envSchemePrefix+envName, nil)
require.NoError(t, err)
retMap := config.NewMap()
assert.NoError(t, ret.MergeTo(retMap, ""))
retMap, err := ret.AsMap()
require.NoError(t, err)
expectedMap := config.NewMapFromStringMap(map[string]interface{}{
"processors::batch": nil,
"exporters::otlp::endpoint": "localhost:4317",
Expand Down
8 changes: 4 additions & 4 deletions config/mapprovider/filemapprovider/mapprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ func TestRelativePath(t *testing.T) {
fp := New()
ret, err := fp.Retrieve(context.Background(), fileSchemePrefix+filepath.Join("testdata", "default-config.yaml"), nil)
require.NoError(t, err)
retMap := config.NewMap()
assert.NoError(t, ret.MergeTo(retMap, ""))
retMap, err := ret.AsMap()
require.NoError(t, err)
expectedMap := config.NewMapFromStringMap(map[string]interface{}{
"processors::batch": nil,
"exporters::otlp::endpoint": "localhost:4317",
Expand All @@ -78,8 +78,8 @@ func TestAbsolutePath(t *testing.T) {
fp := New()
ret, err := fp.Retrieve(context.Background(), fileSchemePrefix+absolutePath(t, filepath.Join("testdata", "default-config.yaml")), nil)
require.NoError(t, err)
retMap := config.NewMap()
assert.NoError(t, ret.MergeTo(retMap, ""))
retMap, err := ret.AsMap()
require.NoError(t, err)
expectedMap := config.NewMapFromStringMap(map[string]interface{}{
"processors::batch": nil,
"exporters::otlp::endpoint": "localhost:4317",
Expand Down
23 changes: 11 additions & 12 deletions config/mapprovider/yamlmapprovider/mapprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/config"
"github.com/stretchr/testify/require"
)

func TestEmpty(t *testing.T) {
Expand All @@ -41,8 +40,8 @@ func TestOneValue(t *testing.T) {
sp := New()
ret, err := sp.Retrieve(context.Background(), "yaml:processors::batch::timeout: 2s", nil)
assert.NoError(t, err)
retMap := config.NewMap()
assert.NoError(t, ret.MergeTo(retMap, ""))
retMap, err := ret.AsMap()
require.NoError(t, err)
assert.Equal(t, map[string]interface{}{
"processors": map[string]interface{}{
"batch": map[string]interface{}{
Expand All @@ -57,8 +56,8 @@ func TestNamedComponent(t *testing.T) {
sp := New()
ret, err := sp.Retrieve(context.Background(), "yaml:processors::batch/foo::timeout: 3s", nil)
assert.NoError(t, err)
retMap := config.NewMap()
assert.NoError(t, ret.MergeTo(retMap, ""))
retMap, err := ret.AsMap()
require.NoError(t, err)
assert.Equal(t, map[string]interface{}{
"processors": map[string]interface{}{
"batch/foo": map[string]interface{}{
Expand All @@ -73,8 +72,8 @@ func TestMapEntry(t *testing.T) {
sp := New()
ret, err := sp.Retrieve(context.Background(), "yaml:processors: {batch/foo::timeout: 3s, batch::timeout: 2s}", nil)
assert.NoError(t, err)
retMap := config.NewMap()
assert.NoError(t, ret.MergeTo(retMap, ""))
retMap, err := ret.AsMap()
require.NoError(t, err)
assert.Equal(t, map[string]interface{}{
"processors": map[string]interface{}{
"batch/foo": map[string]interface{}{
Expand All @@ -92,8 +91,8 @@ func TestNewLine(t *testing.T) {
sp := New()
ret, err := sp.Retrieve(context.Background(), "yaml:processors::batch/foo::timeout: 3s\nprocessors::batch::timeout: 2s", nil)
assert.NoError(t, err)
retMap := config.NewMap()
assert.NoError(t, ret.MergeTo(retMap, ""))
retMap, err := ret.AsMap()
require.NoError(t, err)
assert.Equal(t, map[string]interface{}{
"processors": map[string]interface{}{
"batch/foo": map[string]interface{}{
Expand All @@ -111,8 +110,8 @@ func TestDotSeparator(t *testing.T) {
sp := New()
ret, err := sp.Retrieve(context.Background(), "yaml:processors.batch.timeout: 4s", nil)
assert.NoError(t, err)
retMap := config.NewMap()
assert.NoError(t, ret.MergeTo(retMap, ""))
retMap, err := ret.AsMap()
require.NoError(t, err)
assert.Equal(t, map[string]interface{}{"processors.batch.timeout": "4s"}, retMap.ToStringMap())
assert.NoError(t, sp.Shutdown(context.Background()))
}
106 changes: 91 additions & 15 deletions service/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type configProvider struct {
configUnmarshaler configunmarshaler.ConfigUnmarshaler

sync.Mutex
closer config.CloseFunc
closers []config.CloseFunc
watcher chan error
}

Expand All @@ -91,6 +91,14 @@ type ConfigProviderSettings struct {
// MapConverters is a slice of config.MapConverterFunc.
MapConverters []config.MapConverterFunc

// ExpandConfigLocations if true, it expands string values in the provided config.Map (merging all retrieved maps
// from the given `locations`) right before calling the converters. This means that the converters will be executed
// on the expanded map.
//
// A value is expended if and only if it follows the format "${scheme:opaque_value}".
// If there is a config.MapProvider with the associated "scheme", otherwise an error is returned.
ExpandConfigLocations bool

// Deprecated: [v0.50.0] because providing custom ConfigUnmarshaler is not necessary since users can wrap/implement
// ConfigProvider if needed to change the resulted config. This functionality will be kept for at least 2 minor versions,
// and if nobody express a need for it will be removed.
Expand Down Expand Up @@ -144,12 +152,12 @@ func (cm *configProvider) Get(ctx context.Context, factories component.Factories
return nil, fmt.Errorf("cannot close previous watch: %w", err)
}

retMap, closer, err := cm.mergeRetrieve(ctx)
retMap, closers, err := cm.mergeRetrieve(ctx)
if err != nil {
return nil, fmt.Errorf("cannot retrieve the configuration: %w", err)
}

cm.closer = closer
cm.closers = closers

// Apply all converters.
for _, cfgMapConv := range cm.configMapConverters {
Expand Down Expand Up @@ -182,10 +190,11 @@ func (cm *configProvider) onChange(event *config.ChangeEvent) {
}

func (cm *configProvider) closeIfNeeded(ctx context.Context) error {
if cm.closer != nil {
return cm.closer(ctx)
var err error
for _, ret := range cm.closers {
err = multierr.Append(err, ret(ctx))
}
return nil
return err
}

func (cm *configProvider) Shutdown(ctx context.Context) error {
Expand All @@ -204,7 +213,7 @@ func (cm *configProvider) Shutdown(ctx context.Context) error {
// https://tools.ietf.org/id/draft-kerwin-file-scheme-07.html#syntax
var driverLetterRegexp = regexp.MustCompile("^[A-z]:")

func (cm *configProvider) mergeRetrieve(ctx context.Context) (*config.Map, config.CloseFunc, error) {
func (cm *configProvider) mergeRetrieve(ctx context.Context) (*config.Map, []config.CloseFunc, error) {
var closers []config.CloseFunc
retCfgMap := config.NewMap()
for _, location := range cm.locations {
Expand All @@ -225,19 +234,86 @@ func (cm *configProvider) mergeRetrieve(ctx context.Context) (*config.Map, confi
if err != nil {
return nil, nil, err
}
if err = ret.MergeTo(retCfgMap, ""); err != nil {
retMap, err := ret.AsMap()
if err != nil {
return nil, nil, err
}
if err = retCfgMap.Merge(retMap); err != nil {
return nil, nil, err
}
closers = append(closers, ret.Close)
}
return retCfgMap,
func(ctxF context.Context) error {
var err error
for _, ret := range closers {
err = multierr.Append(err, ret(ctxF))
return retCfgMap, closers, nil
}

func (cm *configProvider) expandValueLocations(ctx context.Context, cfgMap *config.Map) ([]config.CloseFunc, error) {
var closers []config.CloseFunc
for _, k := range cfgMap.AllKeys() {
val, clsrs, err := cm.expandStringValues(ctx, cfgMap.Get(k))
if err != nil {
return nil, err
}
closers = append(closers, clsrs...)
cfgMap.Set(k, val)
}
return closers, nil
}

func (cm *configProvider) expandStringValues(ctx context.Context, value interface{}) (interface{}, []config.CloseFunc, error) {
switch v := value.(type) {
case string:
// If it doesn't have the format "${scheme:opaque}" no need to expand.
if !strings.HasPrefix(v, "${") || !strings.HasSuffix(v, "}") {
return value, nil, nil
}
location := v[2 : len(v)-1]
// For backwards compatibility:
// - empty scheme means "env".
scheme := "env"
if idx := strings.Index(location, ":"); idx != -1 {
scheme = location[:idx]
} else {
location = scheme + ":" + location
}
p, ok := cm.configMapProviders[scheme]
if !ok {
return nil, nil, fmt.Errorf("scheme %v is not supported for location %v", scheme, location)
}
ret, err := p.Retrieve(ctx, location, cm.onChange)
if err != nil {
return nil, nil, err
}
value, err = ret.AsValue()
if err != nil {
return nil, nil, err
}
return value, []config.CloseFunc{ret.Close}, nil
case []interface{}:
nslice := make([]interface{}, 0, len(v))
var closers []config.CloseFunc
for _, vint := range v {
val, clsrs, err := cm.expandStringValues(ctx, vint)
if err != nil {
return nil, nil, err
}
return err
}, nil
nslice = append(nslice, val)
closers = append(closers, clsrs...)
}
return nslice, closers, nil
case map[string]interface{}:
nmap := map[string]interface{}{}
var closers []config.CloseFunc
for mk, mv := range v {
val, clsrs, err := cm.expandStringValues(ctx, mv)
if err != nil {
return nil, nil, err
}
nmap[mk] = val
closers = append(closers, clsrs...)
}
return nmap, closers, nil
}
return value, nil, nil
}

func makeConfigMapProviderMap(providers ...config.MapProvider) map[string]config.MapProvider {
Expand Down

0 comments on commit efecc0c

Please sign in to comment.