diff --git a/service/application.go b/service/application.go index b4b44a37336..1ed7a1f07bb 100644 --- a/service/application.go +++ b/service/application.go @@ -18,6 +18,7 @@ package service import ( "context" + "errors" "flag" "fmt" "os" @@ -32,6 +33,7 @@ import ( "go.opentelemetry.io/collector/config/configcheck" "go.opentelemetry.io/collector/config/configloader" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/config/experimental/configsource" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/internal/collector/telemetry" "go.opentelemetry.io/collector/service/internal/builder" @@ -254,6 +256,24 @@ func (app *Application) setupConfigurationComponents(ctx context.Context) error } app.service = service + + // If provider is watchable start a goroutine watching for updates. + if watchable, ok := app.parserProvider.(parserprovider.Watchable); ok { + go func() { + err := watchable.WatchForUpdate() + switch { + // TODO: Move configsource.ErrSessionClosed to providerparser package to avoid depending on configsource. + case errors.Is(err, configsource.ErrSessionClosed): + // This is the case of shutdown of the whole application, nothing to do. + app.logger.Info("Config WatchForUpdate closed", zap.Error(err)) + return + default: + app.logger.Warn("Config WatchForUpdated exited", zap.Error(err)) + app.reloadService(context.Background()) + } + }() + } + return nil } @@ -291,6 +311,12 @@ func (app *Application) execute(ctx context.Context) error { runtime.KeepAlive(ballast) app.logger.Info("Starting shutdown...") + if closable, ok := app.parserProvider.(parserprovider.Closeable); ok { + if err := closable.Close(ctx); err != nil { + errs = append(errs, fmt.Errorf("failed to close config: %w", err)) + } + } + if app.service != nil { if err := app.service.Shutdown(ctx); err != nil { errs = append(errs, fmt.Errorf("failed to shutdown service: %w", err)) @@ -319,10 +345,16 @@ func (app *Application) createMemoryBallast() ([]byte, uint64) { return nil, 0 } -// updateService shutdowns the current app.service and setups a new one according +// reloadService shutdowns the current app.service and setups a new one according // to the latest configuration. It requires that app.parserProvider and app.factories // are properly populated to finish successfully. -func (app *Application) updateService(ctx context.Context) error { +func (app *Application) reloadService(ctx context.Context) error { + if closeable, ok := app.parserProvider.(parserprovider.Closeable); ok { + if err := closeable.Close(ctx); err != nil { + return fmt.Errorf("failed close current config provider: %w", err) + } + } + if app.service != nil { retiringService := app.service app.service = nil diff --git a/service/application_test.go b/service/application_test.go index dab4550adf9..320d0f47a0d 100644 --- a/service/application_test.go +++ b/service/application_test.go @@ -89,7 +89,7 @@ func TestApplication_Start(t *testing.T) { assertMetrics(t, testPrefix, metricsPort, mandatoryLabels) // Trigger another configuration load. - require.NoError(t, app.updateService(context.Background())) + require.NoError(t, app.reloadService(context.Background())) require.True(t, isAppAvailable(t, "http://"+healthCheckEndpoint)) app.signalsChannel <- syscall.SIGTERM @@ -250,7 +250,7 @@ func (epl *errParserLoader) Get() (*config.Parser, error) { return nil, epl.err } -func TestApplication_updateService(t *testing.T) { +func TestApplication_reloadService(t *testing.T) { factories, err := defaultcomponents.Components() require.NoError(t, err) ctx := context.Background() @@ -304,7 +304,7 @@ func TestApplication_updateService(t *testing.T) { service: tt.service, } - err := app.updateService(ctx) + err := app.reloadService(ctx) if err != nil { assert.ErrorIs(t, err, sentinelError) diff --git a/service/parserprovider/provider.go b/service/parserprovider/provider.go index 303e79f7758..b296e2a2258 100644 --- a/service/parserprovider/provider.go +++ b/service/parserprovider/provider.go @@ -14,7 +14,11 @@ package parserprovider -import "go.opentelemetry.io/collector/config" +import ( + "context" + + "go.opentelemetry.io/collector/config" +) // ParserProvider is an interface that helps providing configuration's parser. // Implementations may load the parser from a file, a database or any other source. @@ -22,3 +26,15 @@ type ParserProvider interface { // Get returns the config.Parser if succeed or error otherwise. Get() (*config.Parser, error) } + +// Watchable is an extension for ParserProvider that is implemented if the given provider +// supports monitoring for configuration updates. +type Watchable interface { + // WatchForUpdate is used to monitor for updates on the retrieved value. + WatchForUpdate() error +} + +// Closeable is an extension interface for ParserProvider that should be added if they need to be closed. +type Closeable interface { + Close(ctx context.Context) error +}