Skip to content

Commit

Permalink
Fix initialization of the MeterProvider (#5571)
Browse files Browse the repository at this point in the history
The problem was that the MeterProvider was assigned to "service.telemetry.MeterProvider" only after the components had been created. The fix was not trivial, because the process telemetry initialization requires the ballast size, which is available only after the extensions are initialized. For that reason, I split the initialization of the MeterProvider/oc.Registry from the initialization of the process telemetry.

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Jun 29, 2022
1 parent bb45d00 commit d63aea3
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 103 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@

- Deprecate `service.ConfigServiceTelemetry`, `service.ConfigServiceTelemetryLogs`, and `service.ConfigServiceTelemetryMetrics` (#5565)

### 🧰 Bug fixes 🧰

- Fix initialization of the OpenTelemetry MeterProvider. (#5571)

## v0.54.0 Beta

### 🛑 Breaking changes 🛑
Expand Down
30 changes: 13 additions & 17 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (col *Collector) Shutdown() {

// runAndWaitForShutdownEvent waits for one of the shutdown events that can happen.
func (col *Collector) runAndWaitForShutdownEvent(ctx context.Context) error {
col.service.telemetry.Logger.Info("Everything is ready. Begin running and processing data.")
col.service.telemetrySettings.Logger.Info("Everything is ready. Begin running and processing data.")

col.signalsChannel = make(chan os.Signal, 1)
// Only notify with SIGTERM and SIGINT if graceful shutdown is enabled.
Expand All @@ -139,11 +139,11 @@ LOOP:
select {
case err := <-col.set.ConfigProvider.Watch():
if err != nil {
col.service.telemetry.Logger.Error("Config watch failed", zap.Error(err))
col.service.telemetrySettings.Logger.Error("Config watch failed", zap.Error(err))
break LOOP
}

col.service.telemetry.Logger.Warn("Config updated, restart service")
col.service.telemetrySettings.Logger.Warn("Config updated, restart service")
col.setCollectorState(Closing)

if err = col.service.Shutdown(ctx); err != nil {
Expand All @@ -153,16 +153,16 @@ LOOP:
return fmt.Errorf("failed to setup configuration components: %w", err)
}
case err := <-col.asyncErrorChannel:
col.service.telemetry.Logger.Error("Asynchronous error received, terminating process", zap.Error(err))
col.service.telemetrySettings.Logger.Error("Asynchronous error received, terminating process", zap.Error(err))
break LOOP
case s := <-col.signalsChannel:
col.service.telemetry.Logger.Info("Received signal from OS", zap.String("signal", s.String()))
col.service.telemetrySettings.Logger.Info("Received signal from OS", zap.String("signal", s.String()))
break LOOP
case <-col.shutdownChan:
col.service.telemetry.Logger.Info("Received shutdown request")
col.service.telemetrySettings.Logger.Info("Received shutdown request")
break LOOP
case <-ctx.Done():
col.service.telemetry.Logger.Info("Context done, terminating process", zap.Error(ctx.Err()))
col.service.telemetrySettings.Logger.Info("Context done, terminating process", zap.Error(ctx.Err()))

// Call shutdown with background context as the passed in context has been canceled
return col.shutdown(context.Background())
Expand All @@ -187,19 +187,14 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
Config: cfg,
AsyncErrorChannel: col.asyncErrorChannel,
LoggingOptions: col.set.LoggingOptions,
telemetry: col.set.telemetry,
})
if err != nil {
return err
}

if !col.set.SkipSettingGRPCLogger {
telemetrylogs.SetColGRPCLogger(col.service.telemetry.Logger, cfg.Service.Telemetry.Logs.Level)
}

// TODO: Move this to the service initialization.
// It is called once because that is how it is implemented using sync.Once.
if err = col.set.telemetry.init(col.service); err != nil {
return err
telemetrylogs.SetColGRPCLogger(col.service.telemetrySettings.Logger, cfg.Service.Telemetry.Logs.Level)
}

if err = col.service.Start(ctx); err != nil {
Expand All @@ -217,7 +212,7 @@ func (col *Collector) Run(ctx context.Context) error {
return err
}

col.service.telemetry.Logger.Info("Starting "+col.set.BuildInfo.Command+"...",
col.service.telemetrySettings.Logger.Info("Starting "+col.set.BuildInfo.Command+"...",
zap.String("Version", col.set.BuildInfo.Version),
zap.Int("NumCPU", runtime.NumCPU()),
)
Expand All @@ -233,7 +228,7 @@ func (col *Collector) shutdown(ctx context.Context) error {
var errs error

// Begin shutdown sequence.
col.service.telemetry.Logger.Info("Starting shutdown...")
col.service.telemetrySettings.Logger.Info("Starting shutdown...")

if err := col.set.ConfigProvider.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown config provider: %w", err))
Expand All @@ -243,7 +238,8 @@ func (col *Collector) shutdown(ctx context.Context) error {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown service: %w", err))
}

if err := col.set.telemetry.shutdown(); err != nil {
// TODO: Move this as part of the service shutdown.
if err := col.service.telemetryInitializer.shutdown(); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown collector telemetry: %w", err))
}

Expand Down
17 changes: 4 additions & 13 deletions service/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func TestCollectorReportError(t *testing.T) {
}

func TestCollectorFailedShutdown(t *testing.T) {
t.Skip("This test was using telemetry shutdown failure, switch to use a component that errors on shutdown.")
factories, err := componenttest.NopFactories()
require.NoError(t, err)

Expand All @@ -143,7 +144,7 @@ func TestCollectorFailedShutdown(t *testing.T) {
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
telemetry: &mockColTelemetry{},
telemetry: newColTelemetry(featuregate.NewRegistry()),
})
require.NoError(t, err)

Expand Down Expand Up @@ -265,7 +266,7 @@ func ownMetricsTestCases(version string) []ownMetricsTestCase {
}}
}

func testCollectorStartHelper(t *testing.T, telemetry collectorTelemetryExporter, tc ownMetricsTestCase) {
func testCollectorStartHelper(t *testing.T, telemetry *telemetryInitializer, tc ownMetricsTestCase) {
factories, err := componenttest.NopFactories()
zpagesExt := zpagesextension.NewFactory()
factories.Extensions[zpagesExt.Type()] = zpagesExt
Expand All @@ -288,7 +289,7 @@ func testCollectorStartHelper(t *testing.T, telemetry collectorTelemetryExporter
// Set the metrics address to expose own metrics on.
"service::telemetry::metrics::address": metricsAddr,
}
// Also include resource attributes under the service.telemetry.resource key.
// Also include resource attributes under the service::telemetry::resource key.
for k, v := range tc.userDefinedResource {
extraCfgAsProps["service::telemetry::resource::"+k] = v
}
Expand Down Expand Up @@ -396,16 +397,6 @@ func TestCollectorClosedStateOnStartUpError(t *testing.T) {
assert.Equal(t, Closed, col.GetState())
}

type mockColTelemetry struct{}

func (tel *mockColTelemetry) init(*service) error {
return nil
}

func (tel *mockColTelemetry) shutdown() error {
return errors.New("err1")
}

func assertMetrics(t *testing.T, metricsAddr string, expectedLabels map[string]labelValue) {
client := &http.Client{}
resp, err := client.Get("http://" + metricsAddr + "/metrics")
Expand Down
2 changes: 1 addition & 1 deletion service/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestConfigValidate(t *testing.T) {
expected: nil,
},
{
name: "custom-service-telemetry-encoding",
name: "custom-service-telemetrySettings-encoding",
cfgFn: func() *Config {
cfg := generateConfig()
cfg.Service.Telemetry.Logs.Encoding = "test_encoding"
Expand Down
35 changes: 25 additions & 10 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,31 @@ import (
"go.opentelemetry.io/otel/metric/nonrecording"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/internal"
"go.opentelemetry.io/collector/service/internal/extensions"
"go.opentelemetry.io/collector/service/internal/pipelines"
"go.opentelemetry.io/collector/service/internal/telemetry"
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
)

// service represents the implementation of a component.Host.
type service struct {
buildInfo component.BuildInfo
config *Config
telemetry component.TelemetrySettings
host *serviceHost
buildInfo component.BuildInfo
config *Config
telemetrySettings component.TelemetrySettings
host *serviceHost
telemetryInitializer *telemetryInitializer
}

func newService(set *settings) (*service, error) {
srv := &service{
buildInfo: set.BuildInfo,
config: set.Config,
telemetry: component.TelemetrySettings{
telemetrySettings: component.TelemetrySettings{
Logger: zap.NewNop(),
TracerProvider: sdktrace.NewTracerProvider(
// needed for supporting the zpages extension
sdktrace.WithSampler(internal.AlwaysRecord()),
Expand All @@ -54,26 +58,37 @@ func newService(set *settings) (*service, error) {
buildInfo: set.BuildInfo,
asyncErrorChannel: set.AsyncErrorChannel,
},
telemetryInitializer: set.telemetry,
}

var err error
if srv.telemetry.Logger, err = telemetrylogs.NewLogger(set.Config.Service.Telemetry.Logs, set.LoggingOptions); err != nil {
if srv.telemetrySettings.Logger, err = telemetrylogs.NewLogger(set.Config.Service.Telemetry.Logs, set.LoggingOptions); err != nil {
return nil, fmt.Errorf("failed to get logger: %w", err)
}

if err = srv.telemetryInitializer.init(set.BuildInfo, srv.telemetrySettings.Logger, set.Config.Service.Telemetry, set.AsyncErrorChannel); err != nil {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
}
srv.telemetrySettings.MeterProvider = srv.telemetryInitializer.mp

extensionsSettings := extensions.Settings{
Telemetry: srv.telemetry,
Telemetry: srv.telemetrySettings,
BuildInfo: srv.buildInfo,
Configs: srv.config.Extensions,
Factories: srv.host.factories.Extensions,
ServiceExtensions: srv.config.Service.Extensions,
}
if srv.host.extensions, err = extensions.Build(context.Background(), extensionsSettings); err != nil {
return nil, fmt.Errorf("cannot build extensions: %w", err)
return nil, fmt.Errorf("failed build extensions: %w", err)
}

if srv.host.pipelines, err = pipelines.Build(context.Background(), srv.telemetrySettings, srv.buildInfo, srv.config, srv.host.factories); err != nil {
return nil, fmt.Errorf("failed build pipelines: %w", err)
}

if srv.host.pipelines, err = pipelines.Build(context.Background(), srv.telemetry, srv.buildInfo, srv.config, srv.host.factories); err != nil {
return nil, fmt.Errorf("cannot build pipelines: %w", err)
// The process telemetry initialization requires the ballast size, which is available after the extensions are initialized.
if err = telemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil {
return nil, fmt.Errorf("failed to register process metrics: %w", err)
}

return srv, nil
Expand Down
10 changes: 8 additions & 2 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/service/featuregate"
"go.opentelemetry.io/collector/service/internal/configunmarshaler"
)

Expand Down Expand Up @@ -55,7 +56,7 @@ func TestService_GetFactory(t *testing.T) {
assert.Nil(t, srv.host.GetFactory(42, "nop"))
}

func TestService_GetExtensions(t *testing.T) {
func TestServiceGetExtensions(t *testing.T) {
factories, err := componenttest.NopFactories()
require.NoError(t, err)
srv := createExampleService(t, factories)
Expand All @@ -71,7 +72,7 @@ func TestService_GetExtensions(t *testing.T) {
assert.Contains(t, extMap, config.NewComponentID("nop"))
}

func TestService_GetExporters(t *testing.T) {
func TestServiceGetExporters(t *testing.T) {
factories, err := componenttest.NopFactories()
require.NoError(t, err)
srv := createExampleService(t, factories)
Expand All @@ -98,11 +99,16 @@ func createExampleService(t *testing.T, factories component.Factories) *service
cfg, err := configunmarshaler.New().Unmarshal(conf, factories)
require.NoError(t, err)

telemetry := newColTelemetry(featuregate.NewRegistry())
srv, err := newService(&settings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
Config: cfg,
telemetry: telemetry,
})
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, telemetry.shutdown())
})
return srv
}
5 changes: 4 additions & 1 deletion service/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type settings struct {

// LoggingOptions provides a way to change behavior of zap logging.
LoggingOptions []zap.Option

// For testing purpose only.
telemetry *telemetryInitializer
}

// CollectorSettings holds configuration for creating a new Collector.
Expand Down Expand Up @@ -63,5 +66,5 @@ type CollectorSettings struct {
SkipSettingGRPCLogger bool

// For testing purpose only.
telemetry collectorTelemetryExporter
telemetry *telemetryInitializer
}
Loading

0 comments on commit d63aea3

Please sign in to comment.