-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
Copy pathconfigprovider.go
148 lines (128 loc) · 4.94 KB
/
configprovider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package otelcol // import "go.opentelemetry.io/collector/otelcol"
import (
"context"
"fmt"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/converter/expandconverter"
"go.opentelemetry.io/collector/confmap/provider/envprovider"
"go.opentelemetry.io/collector/confmap/provider/fileprovider"
"go.opentelemetry.io/collector/confmap/provider/httpprovider"
"go.opentelemetry.io/collector/confmap/provider/httpsprovider"
"go.opentelemetry.io/collector/confmap/provider/yamlprovider"
)
// ConfigProvider provides the service configuration.
//
// The typical usage is the following:
//
// cfgProvider.Get(...)
// cfgProvider.Watch() // wait for an event.
// cfgProvider.Get(...)
// cfgProvider.Watch() // wait for an event.
// // repeat Get/Watch cycle until it is time to shut down the Collector process.
// cfgProvider.Shutdown()
type ConfigProvider interface {
// Get returns the service configuration, or error otherwise.
//
// Should never be called concurrently with itself, Watch or Shutdown.
Get(ctx context.Context, factories Factories) (*Config, error)
// Watch blocks until any configuration change was detected or an unrecoverable error
// happened during monitoring the configuration changes.
//
// Error is nil if the configuration is changed and needs to be re-fetched. Any non-nil
// error indicates that there was a problem with watching the config changes.
//
// Should never be called concurrently with itself or Get.
Watch() <-chan error
// Shutdown signals that the provider is no longer in use and the that should close
// and release any resources that it may have created.
//
// This function must terminate the Watch channel.
//
// Should never be called concurrently with itself or Get.
Shutdown(ctx context.Context) error
}
// ConfmapProvider is an optional interface to be implemented by ConfigProviders
// to provide confmap.Conf objects representing a marshaled version of the
// Collector's configuration.
//
// The purpose of this interface is that otelcol.ConfigProvider structs do not
// necessarily need to use confmap.Conf as their underlying config structure.
type ConfmapProvider interface {
// GetConfmap resolves the Collector's configuration and provides it as a confmap.Conf object.
//
// Should never be called concurrently with itself or any ConfigProvider method.
GetConfmap(ctx context.Context) (*confmap.Conf, error)
}
type configProvider struct {
mapResolver *confmap.Resolver
}
var _ ConfigProvider = (*configProvider)(nil)
var _ ConfmapProvider = (*configProvider)(nil)
// ConfigProviderSettings are the settings to configure the behavior of the ConfigProvider.
type ConfigProviderSettings struct {
// ResolverSettings are the settings to configure the behavior of the confmap.Resolver.
ResolverSettings confmap.ResolverSettings
}
// NewConfigProvider returns a new ConfigProvider that provides the service configuration:
// * Initially it resolves the "configuration map":
// - Retrieve the confmap.Conf by merging all retrieved maps from the given `locations` in order.
// - Then applies all the confmap.Converter in the given order.
//
// * Then unmarshalls the confmap.Conf into the service Config.
func NewConfigProvider(set ConfigProviderSettings) (ConfigProvider, error) {
mr, err := confmap.NewResolver(set.ResolverSettings)
if err != nil {
return nil, err
}
return &configProvider{
mapResolver: mr,
}, nil
}
func (cm *configProvider) Get(ctx context.Context, factories Factories) (*Config, error) {
conf, err := cm.mapResolver.Resolve(ctx)
if err != nil {
return nil, fmt.Errorf("cannot resolve the configuration: %w", err)
}
var cfg *configSettings
if cfg, err = unmarshal(conf, factories); err != nil {
return nil, fmt.Errorf("cannot unmarshal the configuration: %w", err)
}
return &Config{
Receivers: cfg.Receivers.Configs(),
Processors: cfg.Processors.Configs(),
Exporters: cfg.Exporters.Configs(),
Connectors: cfg.Connectors.Configs(),
Extensions: cfg.Extensions.Configs(),
Service: cfg.Service,
}, nil
}
func (cm *configProvider) Watch() <-chan error {
return cm.mapResolver.Watch()
}
func (cm *configProvider) Shutdown(ctx context.Context) error {
return cm.mapResolver.Shutdown(ctx)
}
func (cm *configProvider) GetConfmap(ctx context.Context) (*confmap.Conf, error) {
conf, err := cm.mapResolver.Resolve(ctx)
if err != nil {
return nil, fmt.Errorf("cannot resolve the configuration: %w", err)
}
return conf, nil
}
func newDefaultConfigProviderSettings(uris []string) ConfigProviderSettings {
return ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: uris,
ProviderFactories: []confmap.ProviderFactory{
fileprovider.NewFactory(),
envprovider.NewFactory(),
yamlprovider.NewFactory(),
httpprovider.NewFactory(),
httpsprovider.NewFactory(),
},
ConverterFactories: []confmap.ConverterFactory{expandconverter.NewFactory()},
},
}
}