Skip to content

Commit

Permalink
Fixed issue where telemetryInitializer is not cleaned up when newServ…
Browse files Browse the repository at this point in the history
…ice errors (#6239)

* Fixed issue where telemetryInitializer is not cleaned up when newService errors

Signed-off-by: Corbin Phelps <[email protected]>

* Updated changelog

Signed-off-by: Corbin Phelps <[email protected]>

* Responding to PR feedback

Signed-off-by: Corbin Phelps <[email protected]>

* Fixed incorrect error check

Signed-off-by: Corbin Phelps <[email protected]>

* Restructured newService function to avoid defer cleanup and make things cleaner

Signed-off-by: Corbin Phelps <[email protected]>

* Removed extra newline

Signed-off-by: Corbin Phelps <[email protected]>

* Made suggested PR fixups

Signed-off-by: Corbin Phelps <[email protected]>

Signed-off-by: Corbin Phelps <[email protected]>
  • Loading branch information
Corbin Phelps authored Oct 7, 2022
1 parent 65a38a9 commit fca25f2
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 29 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
- Add prometheus metric prefix and constant service attributes to Collector's own telemetry when using OpenTelemetry for internal telemetry (#6223)
- `exporter/logging`: Apply consistent rendering of map values (#6244)

### 🧰 Bug fixes 🧰

- Fixed bug where `telemetryInitializer` is not cleaned up when `newService` errors (#6239)

## v0.61.0 Beta

### 🛑 Breaking changes 🛑
Expand Down
73 changes: 44 additions & 29 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,36 +73,14 @@ func newService(set *settings) (*service, error) {
}
srv.telemetrySettings.MeterProvider = srv.telemetryInitializer.mp

extensionsSettings := extensions.Settings{
Telemetry: srv.telemetrySettings,
BuildInfo: srv.buildInfo,
Configs: srv.config.Extensions,
Factories: srv.host.factories.Extensions,
}
if srv.host.extensions, err = extensions.New(context.Background(), extensionsSettings, srv.config.Service.Extensions); err != nil {
return nil, fmt.Errorf("failed build extensions: %w", err)
}

pipelinesSettings := pipelines.Settings{
Telemetry: srv.telemetrySettings,
BuildInfo: srv.buildInfo,
ReceiverFactories: srv.host.factories.Receivers,
ReceiverConfigs: srv.config.Receivers,
ProcessorFactories: srv.host.factories.Processors,
ProcessorConfigs: srv.config.Processors,
ExporterFactories: srv.host.factories.Exporters,
ExporterConfigs: srv.config.Exporters,
PipelineConfigs: srv.config.Service.Pipelines,
}
if srv.host.pipelines, err = pipelines.Build(context.Background(), pipelinesSettings); err != nil {
return nil, fmt.Errorf("cannot build pipelines: %w", err)
}

if set.Config.Telemetry.Metrics.Level != configtelemetry.LevelNone && set.Config.Telemetry.Metrics.Address != "" {
// The process telemetry initialization requires the ballast size, which is available after the extensions are initialized.
if err = telemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil {
return nil, fmt.Errorf("failed to register process metrics: %w", err)
// process the configuration and initialize the pipeline
if err = srv.initExtensionsAndPipeline(set); err != nil {
// If pipeline initialization fails then shut down the telemetry server
if shutdownErr := srv.telemetryInitializer.shutdown(); shutdownErr != nil {
err = multierr.Append(err, fmt.Errorf("failed to shutdown collector telemetry: %w", shutdownErr))
}

return nil, err
}

return srv, nil
Expand Down Expand Up @@ -153,3 +131,40 @@ func (srv *service) Shutdown(ctx context.Context) error {
// TODO: Shutdown TracerProvider, MeterProvider, and Sync Logger.
return errs
}

func (srv *service) initExtensionsAndPipeline(set *settings) error {
var err error
extensionsSettings := extensions.Settings{
Telemetry: srv.telemetrySettings,
BuildInfo: srv.buildInfo,
Configs: srv.config.Extensions,
Factories: srv.host.factories.Extensions,
}
if srv.host.extensions, err = extensions.New(context.Background(), extensionsSettings, srv.config.Service.Extensions); err != nil {
return fmt.Errorf("failed build extensions: %w", err)
}

pipelinesSettings := pipelines.Settings{
Telemetry: srv.telemetrySettings,
BuildInfo: srv.buildInfo,
ReceiverFactories: srv.host.factories.Receivers,
ReceiverConfigs: srv.config.Receivers,
ProcessorFactories: srv.host.factories.Processors,
ProcessorConfigs: srv.config.Processors,
ExporterFactories: srv.host.factories.Exporters,
ExporterConfigs: srv.config.Exporters,
PipelineConfigs: srv.config.Service.Pipelines,
}
if srv.host.pipelines, err = pipelines.Build(context.Background(), pipelinesSettings); err != nil {
return fmt.Errorf("cannot build pipelines: %w", err)
}

if set.Config.Telemetry.Metrics.Level != configtelemetry.LevelNone && set.Config.Telemetry.Metrics.Address != "" {
// The process telemetry initialization requires the ballast size, which is available after the extensions are initialized.
if err = telemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil {
return fmt.Errorf("failed to register process metrics: %w", err)
}
}

return nil
}
47 changes: 47 additions & 0 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,53 @@ func TestServiceGetExporters(t *testing.T) {
assert.Contains(t, expMap[config.LogsDataType], config.NewComponentID("nop"))
}

// TestServiceTelemetryCleanupOnError tests that if newService errors due to an invalid config telemetry is cleaned up
// and another service with a valid config can be started right after.
func TestServiceTelemetryCleanupOnError(t *testing.T) {
factories, err := componenttest.NopFactories()
require.NoError(t, err)

// Read invalid yaml config from file
invalidConf, err := confmaptest.LoadConf(filepath.Join("testdata", "otelcol-invalid.yaml"))
require.NoError(t, err)
invalidCfg, err := configunmarshaler.New().Unmarshal(invalidConf, factories)
require.NoError(t, err)

// Read valid yaml config from file
validConf, err := confmaptest.LoadConf(filepath.Join("testdata", "otelcol-nop.yaml"))
require.NoError(t, err)
validCfg, err := configunmarshaler.New().Unmarshal(validConf, factories)
require.NoError(t, err)

// Create a service with an invalid config and expect an error
telemetryOne := newColTelemetry(featuregate.NewRegistry())
_, err = newService(&settings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
Config: invalidCfg,
telemetry: telemetryOne,
})
require.Error(t, err)

// Create a service with a valid config and expect no error
telemetryTwo := newColTelemetry(featuregate.NewRegistry())
srv, err := newService(&settings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
Config: validCfg,
telemetry: telemetryTwo,
})
require.NoError(t, err)

// For safety ensure everything is cleaned up
t.Cleanup(func() {
assert.NoError(t, telemetryOne.shutdown())
assert.NoError(t, telemetryTwo.shutdown())
assert.NoError(t, srv.Shutdown(context.Background()))
})

}

func createExampleService(t *testing.T, factories component.Factories) *service {
// Read yaml config from file
conf, err := confmaptest.LoadConf(filepath.Join("testdata", "otelcol-nop.yaml"))
Expand Down

0 comments on commit fca25f2

Please sign in to comment.