From fca25f20878b608e9a5b8e15643273085cb928f8 Mon Sep 17 00:00:00 2001 From: Corbin Phelps Date: Fri, 7 Oct 2022 15:04:24 -0400 Subject: [PATCH] Fixed issue where telemetryInitializer is not cleaned up when newService errors (#6239) * Fixed issue where telemetryInitializer is not cleaned up when newService errors Signed-off-by: Corbin Phelps * Updated changelog Signed-off-by: Corbin Phelps * Responding to PR feedback Signed-off-by: Corbin Phelps * Fixed incorrect error check Signed-off-by: Corbin Phelps * Restructured newService function to avoid defer cleanup and make things cleaner Signed-off-by: Corbin Phelps * Removed extra newline Signed-off-by: Corbin Phelps * Made suggested PR fixups Signed-off-by: Corbin Phelps Signed-off-by: Corbin Phelps --- CHANGELOG.md | 4 +++ service/service.go | 73 +++++++++++++++++++++++++---------------- service/service_test.go | 47 ++++++++++++++++++++++++++ 3 files changed, 95 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 02b196e5e5a..8d967c9ccae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,10 @@ - Add prometheus metric prefix and constant service attributes to Collector's own telemetry when using OpenTelemetry for internal telemetry (#6223) - `exporter/logging`: Apply consistent rendering of map values (#6244) +### 🧰 Bug fixes 🧰 + +- Fixed bug where `telemetryInitializer` is not cleaned up when `newService` errors (#6239) + ## v0.61.0 Beta ### 🛑 Breaking changes 🛑 diff --git a/service/service.go b/service/service.go index bbcbfdf38ef..6304fc472f2 100644 --- a/service/service.go +++ b/service/service.go @@ -73,36 +73,14 @@ func newService(set *settings) (*service, error) { } srv.telemetrySettings.MeterProvider = srv.telemetryInitializer.mp - extensionsSettings := extensions.Settings{ - Telemetry: srv.telemetrySettings, - BuildInfo: srv.buildInfo, - Configs: srv.config.Extensions, - Factories: srv.host.factories.Extensions, - } - if srv.host.extensions, err = extensions.New(context.Background(), extensionsSettings, srv.config.Service.Extensions); err != nil { - return nil, fmt.Errorf("failed build extensions: %w", err) - } - - pipelinesSettings := pipelines.Settings{ - Telemetry: srv.telemetrySettings, - BuildInfo: srv.buildInfo, - ReceiverFactories: srv.host.factories.Receivers, - ReceiverConfigs: srv.config.Receivers, - ProcessorFactories: srv.host.factories.Processors, - ProcessorConfigs: srv.config.Processors, - ExporterFactories: srv.host.factories.Exporters, - ExporterConfigs: srv.config.Exporters, - PipelineConfigs: srv.config.Service.Pipelines, - } - if srv.host.pipelines, err = pipelines.Build(context.Background(), pipelinesSettings); err != nil { - return nil, fmt.Errorf("cannot build pipelines: %w", err) - } - - if set.Config.Telemetry.Metrics.Level != configtelemetry.LevelNone && set.Config.Telemetry.Metrics.Address != "" { - // The process telemetry initialization requires the ballast size, which is available after the extensions are initialized. - if err = telemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil { - return nil, fmt.Errorf("failed to register process metrics: %w", err) + // process the configuration and initialize the pipeline + if err = srv.initExtensionsAndPipeline(set); err != nil { + // If pipeline initialization fails then shut down the telemetry server + if shutdownErr := srv.telemetryInitializer.shutdown(); shutdownErr != nil { + err = multierr.Append(err, fmt.Errorf("failed to shutdown collector telemetry: %w", shutdownErr)) } + + return nil, err } return srv, nil @@ -153,3 +131,40 @@ func (srv *service) Shutdown(ctx context.Context) error { // TODO: Shutdown TracerProvider, MeterProvider, and Sync Logger. return errs } + +func (srv *service) initExtensionsAndPipeline(set *settings) error { + var err error + extensionsSettings := extensions.Settings{ + Telemetry: srv.telemetrySettings, + BuildInfo: srv.buildInfo, + Configs: srv.config.Extensions, + Factories: srv.host.factories.Extensions, + } + if srv.host.extensions, err = extensions.New(context.Background(), extensionsSettings, srv.config.Service.Extensions); err != nil { + return fmt.Errorf("failed build extensions: %w", err) + } + + pipelinesSettings := pipelines.Settings{ + Telemetry: srv.telemetrySettings, + BuildInfo: srv.buildInfo, + ReceiverFactories: srv.host.factories.Receivers, + ReceiverConfigs: srv.config.Receivers, + ProcessorFactories: srv.host.factories.Processors, + ProcessorConfigs: srv.config.Processors, + ExporterFactories: srv.host.factories.Exporters, + ExporterConfigs: srv.config.Exporters, + PipelineConfigs: srv.config.Service.Pipelines, + } + if srv.host.pipelines, err = pipelines.Build(context.Background(), pipelinesSettings); err != nil { + return fmt.Errorf("cannot build pipelines: %w", err) + } + + if set.Config.Telemetry.Metrics.Level != configtelemetry.LevelNone && set.Config.Telemetry.Metrics.Address != "" { + // The process telemetry initialization requires the ballast size, which is available after the extensions are initialized. + if err = telemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil { + return fmt.Errorf("failed to register process metrics: %w", err) + } + } + + return nil +} diff --git a/service/service_test.go b/service/service_test.go index c5dcfb46c8f..2074a526a3b 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -92,6 +92,53 @@ func TestServiceGetExporters(t *testing.T) { assert.Contains(t, expMap[config.LogsDataType], config.NewComponentID("nop")) } +// TestServiceTelemetryCleanupOnError tests that if newService errors due to an invalid config telemetry is cleaned up +// and another service with a valid config can be started right after. +func TestServiceTelemetryCleanupOnError(t *testing.T) { + factories, err := componenttest.NopFactories() + require.NoError(t, err) + + // Read invalid yaml config from file + invalidConf, err := confmaptest.LoadConf(filepath.Join("testdata", "otelcol-invalid.yaml")) + require.NoError(t, err) + invalidCfg, err := configunmarshaler.New().Unmarshal(invalidConf, factories) + require.NoError(t, err) + + // Read valid yaml config from file + validConf, err := confmaptest.LoadConf(filepath.Join("testdata", "otelcol-nop.yaml")) + require.NoError(t, err) + validCfg, err := configunmarshaler.New().Unmarshal(validConf, factories) + require.NoError(t, err) + + // Create a service with an invalid config and expect an error + telemetryOne := newColTelemetry(featuregate.NewRegistry()) + _, err = newService(&settings{ + BuildInfo: component.NewDefaultBuildInfo(), + Factories: factories, + Config: invalidCfg, + telemetry: telemetryOne, + }) + require.Error(t, err) + + // Create a service with a valid config and expect no error + telemetryTwo := newColTelemetry(featuregate.NewRegistry()) + srv, err := newService(&settings{ + BuildInfo: component.NewDefaultBuildInfo(), + Factories: factories, + Config: validCfg, + telemetry: telemetryTwo, + }) + require.NoError(t, err) + + // For safety ensure everything is cleaned up + t.Cleanup(func() { + assert.NoError(t, telemetryOne.shutdown()) + assert.NoError(t, telemetryTwo.shutdown()) + assert.NoError(t, srv.Shutdown(context.Background())) + }) + +} + func createExampleService(t *testing.T, factories component.Factories) *service { // Read yaml config from file conf, err := confmaptest.LoadConf(filepath.Join("testdata", "otelcol-nop.yaml"))