From 252c31744c3081b045312e7e27036884be6aabdf Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Mon, 15 Aug 2022 14:11:05 -0700 Subject: [PATCH] Move log statements to service when possible and cleaner (#5913) Signed-off-by: Bogdan Signed-off-by: Bogdan --- service/collector.go | 95 ++++++++++++++++++-------------------------- service/service.go | 17 +++++++- 2 files changed, 55 insertions(+), 57 deletions(-) diff --git a/service/collector.go b/service/collector.go index 18d3fb4c73f..1bde637760b 100644 --- a/service/collector.go +++ b/service/collector.go @@ -22,7 +22,6 @@ import ( "fmt" "os" "os/signal" - "runtime" "syscall" "go.uber.org/atomic" @@ -122,52 +121,6 @@ func (col *Collector) Shutdown() { } } -// runAndWaitForShutdownEvent waits for one of the shutdown events that can happen. -func (col *Collector) runAndWaitForShutdownEvent(ctx context.Context) error { - col.service.telemetrySettings.Logger.Info("Everything is ready. Begin running and processing data.") - - // Only notify with SIGTERM and SIGINT if graceful shutdown is enabled. - if !col.set.DisableGracefulShutdown { - signal.Notify(col.signalsChannel, os.Interrupt, syscall.SIGTERM) - } - -LOOP: - for { - select { - case err := <-col.set.ConfigProvider.Watch(): - if err != nil { - col.service.telemetrySettings.Logger.Error("Config watch failed", zap.Error(err)) - break LOOP - } - - col.service.telemetrySettings.Logger.Warn("Config updated, restart service") - col.setCollectorState(Closing) - - if err = col.service.Shutdown(ctx); err != nil { - return fmt.Errorf("failed to shutdown the retiring config: %w", err) - } - if err = col.setupConfigurationComponents(ctx); err != nil { - return fmt.Errorf("failed to setup configuration components: %w", err) - } - case err := <-col.asyncErrorChannel: - col.service.telemetrySettings.Logger.Error("Asynchronous error received, terminating process", zap.Error(err)) - break LOOP - case s := <-col.signalsChannel: - col.service.telemetrySettings.Logger.Info("Received signal from OS", zap.String("signal", s.String())) - break LOOP - case <-col.shutdownChan: - col.service.telemetrySettings.Logger.Info("Received shutdown request") - break LOOP - case <-ctx.Done(): - col.service.telemetrySettings.Logger.Info("Context done, terminating process", zap.Error(ctx.Err())) - - // Call shutdown with background context as the passed in context has been canceled - return col.shutdown(context.Background()) - } - } - return col.shutdown(ctx) -} - // setupConfigurationComponents loads the config and starts the components. If all the steps succeeds it // sets the col.service with the service currently running. func (col *Collector) setupConfigurationComponents(ctx context.Context) error { @@ -209,13 +162,46 @@ func (col *Collector) Run(ctx context.Context) error { return err } - col.service.telemetrySettings.Logger.Info("Starting "+col.set.BuildInfo.Command+"...", - zap.String("Version", col.set.BuildInfo.Version), - zap.Int("NumCPU", runtime.NumCPU()), - ) + // Only notify with SIGTERM and SIGINT if graceful shutdown is enabled. + if !col.set.DisableGracefulShutdown { + signal.Notify(col.signalsChannel, os.Interrupt, syscall.SIGTERM) + } - // Everything is ready, now run until an event requiring shutdown happens. - return col.runAndWaitForShutdownEvent(ctx) +LOOP: + for { + select { + case err := <-col.set.ConfigProvider.Watch(): + if err != nil { + col.service.telemetrySettings.Logger.Error("Config watch failed", zap.Error(err)) + break LOOP + } + + col.service.telemetrySettings.Logger.Warn("Config updated, restart service") + col.setCollectorState(Closing) + + if err = col.service.Shutdown(ctx); err != nil { + return fmt.Errorf("failed to shutdown the retiring config: %w", err) + } + if err = col.setupConfigurationComponents(ctx); err != nil { + return fmt.Errorf("failed to setup configuration components: %w", err) + } + case err := <-col.asyncErrorChannel: + col.service.telemetrySettings.Logger.Error("Asynchronous error received, terminating process", zap.Error(err)) + break LOOP + case s := <-col.signalsChannel: + col.service.telemetrySettings.Logger.Info("Received signal from OS", zap.String("signal", s.String())) + break LOOP + case <-col.shutdownChan: + col.service.telemetrySettings.Logger.Info("Received shutdown request") + break LOOP + case <-ctx.Done(): + col.service.telemetrySettings.Logger.Info("Context done, terminating process", zap.Error(ctx.Err())) + + // Call shutdown with background context as the passed in context has been canceled + return col.shutdown(context.Background()) + } + } + return col.shutdown(ctx) } func (col *Collector) shutdown(ctx context.Context) error { @@ -224,9 +210,6 @@ func (col *Collector) shutdown(ctx context.Context) error { // Accumulate errors and proceed with shutting down remaining components. var errs error - // Begin shutdown sequence. - col.service.telemetrySettings.Logger.Info("Starting shutdown...") - if err := col.set.ConfigProvider.Shutdown(ctx); err != nil { errs = multierr.Append(errs, fmt.Errorf("failed to shutdown config provider: %w", err)) } diff --git a/service/service.go b/service/service.go index c4162cfd4db..bbcbfdf38ef 100644 --- a/service/service.go +++ b/service/service.go @@ -17,6 +17,7 @@ package service // import "go.opentelemetry.io/collector/service" import ( "context" "fmt" + "runtime" "go.opentelemetry.io/otel/metric" sdktrace "go.opentelemetry.io/otel/sdk/trace" @@ -108,6 +109,11 @@ func newService(set *settings) (*service, error) { } func (srv *service) Start(ctx context.Context) error { + srv.telemetrySettings.Logger.Info("Starting "+srv.buildInfo.Command+"...", + zap.String("Version", srv.buildInfo.Version), + zap.Int("NumCPU", runtime.NumCPU()), + ) + if err := srv.host.extensions.Start(ctx, srv.host); err != nil { return fmt.Errorf("failed to start extensions: %w", err) } @@ -116,13 +122,21 @@ func (srv *service) Start(ctx context.Context) error { return fmt.Errorf("cannot start pipelines: %w", err) } - return srv.host.extensions.NotifyPipelineReady() + if err := srv.host.extensions.NotifyPipelineReady(); err != nil { + return err + } + + srv.telemetrySettings.Logger.Info("Everything is ready. Begin running and processing data.") + return nil } func (srv *service) Shutdown(ctx context.Context) error { // Accumulate errors and proceed with shutting down remaining components. var errs error + // Begin shutdown sequence. + srv.telemetrySettings.Logger.Info("Starting shutdown...") + if err := srv.host.extensions.NotifyPipelineNotReady(); err != nil { errs = multierr.Append(errs, fmt.Errorf("failed to notify that pipeline is not ready: %w", err)) } @@ -135,6 +149,7 @@ func (srv *service) Shutdown(ctx context.Context) error { errs = multierr.Append(errs, fmt.Errorf("failed to shutdown extensions: %w", err)) } + srv.telemetrySettings.Logger.Info("Shutdown complete.") // TODO: Shutdown TracerProvider, MeterProvider, and Sync Logger. return errs }