Skip to content

Commit

Permalink
Move log statements to service when possible and cleaner (#5913)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan <[email protected]>

Signed-off-by: Bogdan <[email protected]>
  • Loading branch information
bogdandrutu authored Aug 15, 2022
1 parent a56622d commit 252c317
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 57 deletions.
95 changes: 39 additions & 56 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"os"
"os/signal"
"runtime"
"syscall"

"go.uber.org/atomic"
Expand Down Expand Up @@ -122,52 +121,6 @@ func (col *Collector) Shutdown() {
}
}

// runAndWaitForShutdownEvent waits for one of the shutdown events that can happen.
func (col *Collector) runAndWaitForShutdownEvent(ctx context.Context) error {
col.service.telemetrySettings.Logger.Info("Everything is ready. Begin running and processing data.")

// Only notify with SIGTERM and SIGINT if graceful shutdown is enabled.
if !col.set.DisableGracefulShutdown {
signal.Notify(col.signalsChannel, os.Interrupt, syscall.SIGTERM)
}

LOOP:
for {
select {
case err := <-col.set.ConfigProvider.Watch():
if err != nil {
col.service.telemetrySettings.Logger.Error("Config watch failed", zap.Error(err))
break LOOP
}

col.service.telemetrySettings.Logger.Warn("Config updated, restart service")
col.setCollectorState(Closing)

if err = col.service.Shutdown(ctx); err != nil {
return fmt.Errorf("failed to shutdown the retiring config: %w", err)
}
if err = col.setupConfigurationComponents(ctx); err != nil {
return fmt.Errorf("failed to setup configuration components: %w", err)
}
case err := <-col.asyncErrorChannel:
col.service.telemetrySettings.Logger.Error("Asynchronous error received, terminating process", zap.Error(err))
break LOOP
case s := <-col.signalsChannel:
col.service.telemetrySettings.Logger.Info("Received signal from OS", zap.String("signal", s.String()))
break LOOP
case <-col.shutdownChan:
col.service.telemetrySettings.Logger.Info("Received shutdown request")
break LOOP
case <-ctx.Done():
col.service.telemetrySettings.Logger.Info("Context done, terminating process", zap.Error(ctx.Err()))

// Call shutdown with background context as the passed in context has been canceled
return col.shutdown(context.Background())
}
}
return col.shutdown(ctx)
}

// setupConfigurationComponents loads the config and starts the components. If all the steps succeeds it
// sets the col.service with the service currently running.
func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
Expand Down Expand Up @@ -209,13 +162,46 @@ func (col *Collector) Run(ctx context.Context) error {
return err
}

col.service.telemetrySettings.Logger.Info("Starting "+col.set.BuildInfo.Command+"...",
zap.String("Version", col.set.BuildInfo.Version),
zap.Int("NumCPU", runtime.NumCPU()),
)
// Only notify with SIGTERM and SIGINT if graceful shutdown is enabled.
if !col.set.DisableGracefulShutdown {
signal.Notify(col.signalsChannel, os.Interrupt, syscall.SIGTERM)
}

// Everything is ready, now run until an event requiring shutdown happens.
return col.runAndWaitForShutdownEvent(ctx)
LOOP:
for {
select {
case err := <-col.set.ConfigProvider.Watch():
if err != nil {
col.service.telemetrySettings.Logger.Error("Config watch failed", zap.Error(err))
break LOOP
}

col.service.telemetrySettings.Logger.Warn("Config updated, restart service")
col.setCollectorState(Closing)

if err = col.service.Shutdown(ctx); err != nil {
return fmt.Errorf("failed to shutdown the retiring config: %w", err)
}
if err = col.setupConfigurationComponents(ctx); err != nil {
return fmt.Errorf("failed to setup configuration components: %w", err)
}
case err := <-col.asyncErrorChannel:
col.service.telemetrySettings.Logger.Error("Asynchronous error received, terminating process", zap.Error(err))
break LOOP
case s := <-col.signalsChannel:
col.service.telemetrySettings.Logger.Info("Received signal from OS", zap.String("signal", s.String()))
break LOOP
case <-col.shutdownChan:
col.service.telemetrySettings.Logger.Info("Received shutdown request")
break LOOP
case <-ctx.Done():
col.service.telemetrySettings.Logger.Info("Context done, terminating process", zap.Error(ctx.Err()))

// Call shutdown with background context as the passed in context has been canceled
return col.shutdown(context.Background())
}
}
return col.shutdown(ctx)
}

func (col *Collector) shutdown(ctx context.Context) error {
Expand All @@ -224,9 +210,6 @@ func (col *Collector) shutdown(ctx context.Context) error {
// Accumulate errors and proceed with shutting down remaining components.
var errs error

// Begin shutdown sequence.
col.service.telemetrySettings.Logger.Info("Starting shutdown...")

if err := col.set.ConfigProvider.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown config provider: %w", err))
}
Expand Down
17 changes: 16 additions & 1 deletion service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package service // import "go.opentelemetry.io/collector/service"
import (
"context"
"fmt"
"runtime"

"go.opentelemetry.io/otel/metric"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
Expand Down Expand Up @@ -108,6 +109,11 @@ func newService(set *settings) (*service, error) {
}

func (srv *service) Start(ctx context.Context) error {
srv.telemetrySettings.Logger.Info("Starting "+srv.buildInfo.Command+"...",
zap.String("Version", srv.buildInfo.Version),
zap.Int("NumCPU", runtime.NumCPU()),
)

if err := srv.host.extensions.Start(ctx, srv.host); err != nil {
return fmt.Errorf("failed to start extensions: %w", err)
}
Expand All @@ -116,13 +122,21 @@ func (srv *service) Start(ctx context.Context) error {
return fmt.Errorf("cannot start pipelines: %w", err)
}

return srv.host.extensions.NotifyPipelineReady()
if err := srv.host.extensions.NotifyPipelineReady(); err != nil {
return err
}

srv.telemetrySettings.Logger.Info("Everything is ready. Begin running and processing data.")
return nil
}

func (srv *service) Shutdown(ctx context.Context) error {
// Accumulate errors and proceed with shutting down remaining components.
var errs error

// Begin shutdown sequence.
srv.telemetrySettings.Logger.Info("Starting shutdown...")

if err := srv.host.extensions.NotifyPipelineNotReady(); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to notify that pipeline is not ready: %w", err))
}
Expand All @@ -135,6 +149,7 @@ func (srv *service) Shutdown(ctx context.Context) error {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown extensions: %w", err))
}

srv.telemetrySettings.Logger.Info("Shutdown complete.")
// TODO: Shutdown TracerProvider, MeterProvider, and Sync Logger.
return errs
}

0 comments on commit 252c317

Please sign in to comment.