Skip to content

Commit

Permalink
Extend expand capability to support expanding values from Provider
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Jun 9, 2022
1 parent 55829e2 commit b9ea5db
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 32 deletions.
26 changes: 19 additions & 7 deletions confmap/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,39 +33,51 @@ The `Resolver` receives as input a set of `Providers`, a list of `Converters`, a
`configURI` that will be used to generate the resulting, or effective, configuration in the form of a `Conf`,
that can be used by code that is oblivious to the usage of `Providers` and `Converters`.

`Providers` are used to provide an entire configuration when the `configURI` is given directly to the `Resolver`,
or an individual value (partial configuration) when the `configURI` is embedded into the `Conf` as a values using
the syntax `${configURI}`.

```terminal
Resolver Provider
│ │
Resolve │ │
────────────────►│ │
│ │
┌─ │ Retrieve │
│ ├─────────────────────────►│
│ │
│ │ Conf
│ │◄─────────────────────────┤
foreach │ │ │
foreach │ │ │
configURI │ ├───┐ │
│ │ │Merge │
│ │◄──┘ │
└─ │ │
┌─ │ Retrieve │
│ ├─────────────────────────►│
│ │ Partial Conf Value │
│ │◄─────────────────────────┤
foreach │ │ │
embedded │ │ │
configURI │ ├───┐ │
│ │ │Replace │
│ │◄──┘ │
└─ │ │
│ Converter │
│ │ │
┌─ │ Convert │ │
│ ├───────────────►│ │
foreach │ │ │ │
Converter │ │◄───────────────┤ │
└─ │ │
│ │
◄────────────────┤ │
│ │
```

The `Resolve` method proceeds in the following steps:

1. Start with an empty "result" of `Conf` type.
2. For each config URI retrieves individual configurations, and merges it into the "result".
2. For each "Converter", call "Convert" for the "result".
4. Return the "result", aka effective, configuration.
3. For each embedded config URI retrieves individual value, and replaces it into the "result".
4. For each "Converter", call "Convert" for the "result".
5. Return the "result", aka effective, configuration.

### Watching for Updates
After the configuration was processed, the `Resolver` can be used as a single point to watch for updates in the
Expand Down
59 changes: 54 additions & 5 deletions confmap/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ package confmap // import "go.opentelemetry.io/collector/confmap"

import (
"context"
"fmt"

"go.uber.org/multierr"
)

// Provider is an interface that helps to retrieve a config map and watch for any
Expand Down Expand Up @@ -81,7 +84,7 @@ type ChangeEvent struct {

// Retrieved holds the result of a call to the Retrieve method of a Provider object.
type Retrieved struct {
conf *Conf
rawConf interface{}
closeFunc CloseFunc
}

Expand All @@ -101,12 +104,19 @@ func WithRetrievedClose(closeFunc CloseFunc) RetrievedOption {
}

// NewRetrieved returns a new Retrieved instance that contains the data from the raw deserialized config.
func NewRetrieved(rawConf map[string]interface{}, opts ...RetrievedOption) (Retrieved, error) {
// The rawConf can be one of the following types:
// * Primitives: int, int64, float32, float64, bool, string;
// * []interface{} - every member follows the same rules as the given interface{};
// * map[string]interface{} - every value follows the same rules as the given interface{};
func NewRetrieved(rawConf interface{}, opts ...RetrievedOption) (Retrieved, error) {
if err := checkRawConfType(rawConf); err != nil {
return Retrieved{}, err
}
set := retrievedSettings{}
for _, opt := range opts {
opt(&set)
}
return Retrieved{conf: NewFromStringMap(rawConf), closeFunc: set.closeFunc}, nil
return Retrieved{rawConf: rawConf, closeFunc: set.closeFunc}, nil
}

// Deprecated: [v0.53.0] Use AsConf.
Expand All @@ -115,8 +125,23 @@ func (r *Retrieved) AsMap() (*Conf, error) {
}

// AsConf returns the retrieved configuration parsed as a Conf.
func (r Retrieved) AsConf() (*Conf, error) {
return r.conf, nil
func (r *Retrieved) AsConf() (*Conf, error) {
if r.rawConf == nil {
return New(), nil
}
val, ok := r.rawConf.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("retrieved value (type=%T) cannot be used as a Conf", r.rawConf)
}
return NewFromStringMap(val), nil
}

// AsRaw returns the retrieved configuration parsed as an interface{} which can be one of the following types:
// * Primitive float/int/bool/string;
// * []interface{} - every member follows the same rules as the given interface{};
// * map[string]interface{} - every value follows the same rules as the given interface{};
func (r *Retrieved) AsRaw() (interface{}, error) {
return r.rawConf, nil
}

// Close and release any watchers that Provider.Retrieve may have created.
Expand All @@ -134,3 +159,27 @@ func (r Retrieved) Close(ctx context.Context) error {

// CloseFunc a function equivalent to Retrieved.Close.
type CloseFunc func(context.Context) error

func checkRawConfType(rawConf interface{}) error {
if rawConf == nil {
return nil
}
switch value := rawConf.(type) {
case int, int64, float32, float64, bool, string:
return nil
case []interface{}:
var err error
for _, val := range value {
err = multierr.Append(err, checkRawConfType(val))
}
return err
case map[string]interface{}:
var err error
for _, val := range value {
err = multierr.Append(err, checkRawConfType(val))
}
return err
default:
return fmt.Errorf("unsupported type=%T for retrieved config", value)
}
}
2 changes: 1 addition & 1 deletion confmap/provider/internal/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
// * yamlBytes the yaml bytes that will be deserialized.
// * opts specifies options associated with this Retrieved value, such as CloseFunc.
func NewRetrievedFromYAML(yamlBytes []byte, opts ...confmap.RetrievedOption) (confmap.Retrieved, error) {
var rawConf map[string]interface{}
var rawConf interface{}
if err := yaml.Unmarshal(yamlBytes, &rawConf); err != nil {
return confmap.Retrieved{}, err
}
Expand Down
9 changes: 6 additions & 3 deletions confmap/provider/internal/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ func TestNewRetrievedFromYAMLWithOptions(t *testing.T) {

func TestNewRetrievedFromYAMLInvalidYAMLBytes(t *testing.T) {
_, err := NewRetrievedFromYAML([]byte("[invalid:,"))
require.Error(t, err)
assert.Error(t, err)
}

func TestNewRetrievedFromYAMLInvalidAsMap(t *testing.T) {
_, err := NewRetrievedFromYAML([]byte("string"))
require.Error(t, err)
ret, err := NewRetrievedFromYAML([]byte("string"))
require.NoError(t, err)

_, err = ret.AsConf()
assert.Error(t, err)
}
81 changes: 69 additions & 12 deletions confmap/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,28 +115,21 @@ func (mr *Resolver) Resolve(ctx context.Context) (*Conf, error) {
// For backwards compatibility:
// - empty url scheme means "file".
// - "^[A-z]:" also means "file"
scheme := "file"
if idx := strings.Index(uri, ":"); idx != -1 && !driverLetterRegexp.MatchString(uri) {
scheme = uri[:idx]
} else {
uri = scheme + ":" + uri
if driverLetterRegexp.MatchString(uri) {
uri = "file:" + uri
}
p, ok := mr.providers[scheme]
if !ok {
return nil, fmt.Errorf("scheme %q is not supported for uri %q", scheme, uri)
}
ret, err := p.Retrieve(ctx, uri, mr.onChange)
ret, err := mr.retrieveValue(ctx, uri, "file")
if err != nil {
return nil, err
return nil, fmt.Errorf("cannot retrieve the configuration: %w", err)
}
mr.closers = append(mr.closers, ret.Close)
retCfgMap, err := ret.AsConf()
if err != nil {
return nil, err
}
if err = retMap.Merge(retCfgMap); err != nil {
return nil, err
}
mr.closers = append(mr.closers, ret.Close)
}

// Apply the converters in the given order.
Expand All @@ -146,6 +139,14 @@ func (mr *Resolver) Resolve(ctx context.Context) (*Conf, error) {
}
}

for _, k := range retMap.AllKeys() {
val, err := mr.expandValue(ctx, retMap.Get(k))
if err != nil {
return nil, err
}
retMap.Set(k, val)
}

return retMap, nil
}

Expand Down Expand Up @@ -187,3 +188,59 @@ func (mr *Resolver) closeIfNeeded(ctx context.Context) error {
}
return err
}

func (mr *Resolver) expandValue(ctx context.Context, value interface{}) (interface{}, error) {
switch v := value.(type) {
case string:
// If it doesn't have the format "${scheme:opaque}" no need to expand.
if !strings.HasPrefix(v, "${") || !strings.HasSuffix(v, "}") {
return value, nil
}
location := v[2 : len(v)-1]
// For backwards compatibility:
// - empty scheme means "env".
ret, err := mr.retrieveValue(ctx, location, "env")
if err != nil {
return nil, err
}
mr.closers = append(mr.closers, ret.Close)
return ret.AsRaw()
case []interface{}:
nslice := make([]interface{}, 0, len(v))
for _, vint := range v {
val, err := mr.expandValue(ctx, vint)
if err != nil {
return nil, err
}
nslice = append(nslice, val)
}
return nslice, nil
case map[string]interface{}:
nmap := map[string]interface{}{}
for mk, mv := range v {
val, err := mr.expandValue(ctx, mv)
if err != nil {
return nil, err
}
nmap[mk] = val
}
return nmap, nil
}
return value, nil
}

func (mr *Resolver) retrieveValue(ctx context.Context, uri string, defaultScheme string) (Retrieved, error) {
// For backwards compatibility:
// - empty scheme means "env".
scheme := defaultScheme
if idx := strings.Index(uri, ":"); idx != -1 {
scheme = uri[:idx]
} else {
uri = scheme + ":" + uri
}
p, ok := mr.providers[scheme]
if !ok {
return Retrieved{}, fmt.Errorf("scheme %q is not supported for uri %q", scheme, uri)
}
return p.Retrieve(ctx, uri, mr.onChange)
}
70 changes: 66 additions & 4 deletions confmap/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

type mockProvider struct {
scheme string
retM map[string]interface{}
retM interface{}
errR error
errS error
errW error
Expand All @@ -41,9 +41,8 @@ func (m *mockProvider) Retrieve(_ context.Context, _ string, watcher WatcherFunc
if m.retM == nil {
return NewRetrieved(nil)
}
if watcher != nil {
watcher(&ChangeEvent{Error: m.errW})
}

watcher(&ChangeEvent{Error: m.errW})
return NewRetrieved(m.retM, WithRetrievedClose(func(ctx context.Context) error { return m.errC }))
}

Expand Down Expand Up @@ -306,6 +305,69 @@ func TestResolverShutdownClosesWatch(t *testing.T) {
watcherWG.Wait()
}

func TestResolverExpandEnvVars(t *testing.T) {
var testCases = []struct {
name string // test case name (also file name containing config yaml)
}{
{name: "expand-with-no-env.yaml"},
{name: "expand-with-partial-env.yaml"},
{name: "expand-with-all-env.yaml"},
{name: "expand-with-all-env-with-source.yaml"},
}

envs := map[string]string{
"EXTRA": "some string",
"EXTRA_MAP_VALUE_1": "some map value_1",
"EXTRA_MAP_VALUE_2": "some map value_2",
"EXTRA_LIST_MAP_VALUE_1": "some list map value_1",
"EXTRA_LIST_MAP_VALUE_2": "some list map value_2",
"EXTRA_LIST_VALUE_1": "some list value_1",
"EXTRA_LIST_VALUE_2": "some list value_2",
}

expectedCfgMap := newConfFromFile(t, filepath.Join("testdata", "expand-with-no-env.yaml"))
fileProvider := newFakeProvider("file", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) {
return NewRetrieved(newConfFromFile(t, uri[5:]))
})
envProvider := newFakeProvider("env", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) {
return NewRetrieved(envs[uri[4:]])
})

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
resolver, err := NewResolver(ResolverSettings{URIs: []string{filepath.Join("testdata", test.name)}, Providers: makeMapProvidersMap(fileProvider, envProvider), Converters: nil})
require.NoError(t, err)
// Test that expanded configs are the same with the simple config with no env vars.
cfgMap, err := resolver.Resolve(context.Background())
require.NoError(t, err)
assert.Equal(t, expectedCfgMap, cfgMap.ToStringMap())
})
}
}

func TestResolverExpandMapAndSliceValues(t *testing.T) {
provider := newFakeProvider("input", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) {
return NewRetrieved(map[string]interface{}{
"test_map": map[string]interface{}{"recv": "${test:MAP_VALUE}"},
"test_slice": []interface{}{"${test:MAP_VALUE}"}})
})

const receiverExtraMapValue = "some map value"
testProvider := newFakeProvider("test", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) {
return NewRetrieved(receiverExtraMapValue)
})

resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil})
require.NoError(t, err)

cfgMap, err := resolver.Resolve(context.Background())
require.NoError(t, err)
expectedMap := map[string]interface{}{
"test_map": map[string]interface{}{"recv": receiverExtraMapValue},
"test_slice": []interface{}{receiverExtraMapValue}}
assert.Equal(t, expectedMap, cfgMap.ToStringMap())
}

func makeMapProvidersMap(providers ...Provider) map[string]Provider {
ret := make(map[string]Provider, len(providers))
for _, provider := range providers {
Expand Down
11 changes: 11 additions & 0 deletions confmap/testdata/expand-with-all-env-with-source.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
test_map:
extra: "${env:EXTRA}"
extra_map:
recv.1: "${env:EXTRA_MAP_VALUE_1}"
recv.2: "${env:EXTRA_MAP_VALUE_2}"
extra_list_map:
- { recv.1: "${env:EXTRA_LIST_MAP_VALUE_1}",recv.2: "${env:EXTRA_LIST_MAP_VALUE_2}" }
extra_list:
- "${env:EXTRA_LIST_VALUE_1}"
- "${env:EXTRA_LIST_VALUE_2}"

Loading

0 comments on commit b9ea5db

Please sign in to comment.