Skip to content

Commit

Permalink
Adding Watchable and Closeable to ParserProvider interface (#2954)
Browse files Browse the repository at this point in the history
* Adding Watchable and Closeable to ParserProvider interface

* PR Feedback
  • Loading branch information
pjanotti authored Apr 17, 2021
1 parent 7b5b893 commit 90b2dec
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 6 deletions.
36 changes: 34 additions & 2 deletions service/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package service

import (
"context"
"errors"
"flag"
"fmt"
"os"
Expand All @@ -32,6 +33,7 @@ import (
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configloader"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/config/experimental/configsource"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/internal/collector/telemetry"
"go.opentelemetry.io/collector/service/internal/builder"
Expand Down Expand Up @@ -254,6 +256,24 @@ func (app *Application) setupConfigurationComponents(ctx context.Context) error
}

app.service = service

// If provider is watchable start a goroutine watching for updates.
if watchable, ok := app.parserProvider.(parserprovider.Watchable); ok {
go func() {
err := watchable.WatchForUpdate()
switch {
// TODO: Move configsource.ErrSessionClosed to providerparser package to avoid depending on configsource.
case errors.Is(err, configsource.ErrSessionClosed):
// This is the case of shutdown of the whole application, nothing to do.
app.logger.Info("Config WatchForUpdate closed", zap.Error(err))
return
default:
app.logger.Warn("Config WatchForUpdated exited", zap.Error(err))
app.reloadService(context.Background())
}
}()
}

return nil
}

Expand Down Expand Up @@ -291,6 +311,12 @@ func (app *Application) execute(ctx context.Context) error {
runtime.KeepAlive(ballast)
app.logger.Info("Starting shutdown...")

if closable, ok := app.parserProvider.(parserprovider.Closeable); ok {
if err := closable.Close(ctx); err != nil {
errs = append(errs, fmt.Errorf("failed to close config: %w", err))
}
}

if app.service != nil {
if err := app.service.Shutdown(ctx); err != nil {
errs = append(errs, fmt.Errorf("failed to shutdown service: %w", err))
Expand Down Expand Up @@ -319,10 +345,16 @@ func (app *Application) createMemoryBallast() ([]byte, uint64) {
return nil, 0
}

// updateService shutdowns the current app.service and setups a new one according
// reloadService shutdowns the current app.service and setups a new one according
// to the latest configuration. It requires that app.parserProvider and app.factories
// are properly populated to finish successfully.
func (app *Application) updateService(ctx context.Context) error {
func (app *Application) reloadService(ctx context.Context) error {
if closeable, ok := app.parserProvider.(parserprovider.Closeable); ok {
if err := closeable.Close(ctx); err != nil {
return fmt.Errorf("failed close current config provider: %w", err)
}
}

if app.service != nil {
retiringService := app.service
app.service = nil
Expand Down
6 changes: 3 additions & 3 deletions service/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestApplication_Start(t *testing.T) {
assertMetrics(t, testPrefix, metricsPort, mandatoryLabels)

// Trigger another configuration load.
require.NoError(t, app.updateService(context.Background()))
require.NoError(t, app.reloadService(context.Background()))
require.True(t, isAppAvailable(t, "http://"+healthCheckEndpoint))

app.signalsChannel <- syscall.SIGTERM
Expand Down Expand Up @@ -250,7 +250,7 @@ func (epl *errParserLoader) Get() (*config.Parser, error) {
return nil, epl.err
}

func TestApplication_updateService(t *testing.T) {
func TestApplication_reloadService(t *testing.T) {
factories, err := defaultcomponents.Components()
require.NoError(t, err)
ctx := context.Background()
Expand Down Expand Up @@ -304,7 +304,7 @@ func TestApplication_updateService(t *testing.T) {
service: tt.service,
}

err := app.updateService(ctx)
err := app.reloadService(ctx)

if err != nil {
assert.ErrorIs(t, err, sentinelError)
Expand Down
18 changes: 17 additions & 1 deletion service/parserprovider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,27 @@

package parserprovider

import "go.opentelemetry.io/collector/config"
import (
"context"

"go.opentelemetry.io/collector/config"
)

// ParserProvider is an interface that helps providing configuration's parser.
// Implementations may load the parser from a file, a database or any other source.
type ParserProvider interface {
// Get returns the config.Parser if succeed or error otherwise.
Get() (*config.Parser, error)
}

// Watchable is an extension for ParserProvider that is implemented if the given provider
// supports monitoring for configuration updates.
type Watchable interface {
// WatchForUpdate is used to monitor for updates on the retrieved value.
WatchForUpdate() error
}

// Closeable is an extension interface for ParserProvider that should be added if they need to be closed.
type Closeable interface {
Close(ctx context.Context) error
}

0 comments on commit 90b2dec

Please sign in to comment.