Skip to content

Commit

Permalink
Merge pull request #2882 from synapsecns/fix/trace-flush
Browse files Browse the repository at this point in the history
flush traces on shutdown
  • Loading branch information
trajan0x authored Jul 17, 2024
2 parents 92b5efa + 1397f8f commit ca253cf
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 9 deletions.
22 changes: 13 additions & 9 deletions core/metrics/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ const pyroscopeEndpoint = internal.PyroscopeEndpoint
// baseHandler is a base metrics handler that implements the Handler interface.
// this is used to reduce the amount of boilerplate code needed to implement opentracing methods.
type baseHandler struct {
resource *resource.Resource
tp trace.TracerProvider
tracer trace.Tracer
name string
propagator propagation.TextMapPropagator
meter MeterProvider
resource *resource.Resource
// used only for shutdown, use tp for providing traces.
unwrappedTP *tracesdk.TracerProvider
tp trace.TracerProvider
tracer trace.Tracer
name string
propagator propagation.TextMapPropagator
meter MeterProvider
// handler is an integrated handler for everything exported over http. This includes prometheus
// or http-based sampling methods for other providers.
handler http.Handler
Expand Down Expand Up @@ -206,16 +208,17 @@ func newBaseHandler(buildInfo config.BuildInfo, extraOpts ...tracesdk.TracerProv
opts := append([]tracesdk.TracerProviderOption{tracesdk.WithResource(rsr)}, extraOpts...)

// TODO: add a way for users to pass in extra pyroscope options
tp := PyroscopeWrapTracerProvider(tracesdk.NewTracerProvider(opts...), buildInfo)
unwrappedTP := tracesdk.NewTracerProvider(opts...)
tp := PyroscopeWrapTracerProvider(unwrappedTP, buildInfo)
// will do nothing if not enabled.
StartPyroscope(buildInfo)

propagator := b3.New(b3.WithInjectEncoding(b3.B3MultipleHeader | b3.B3SingleHeader))
return newBaseHandlerWithTracerProvider(rsr, buildInfo, tp, propagator)
return newBaseHandlerWithTracerProvider(rsr, buildInfo, unwrappedTP, tp, propagator)
}

// newBaseHandlerWithTracerProvider creates a new baseHandler for any opentelemtry tracer.
func newBaseHandlerWithTracerProvider(rsr *resource.Resource, buildInfo config.BuildInfo, tracerProvider trace.TracerProvider, propagator propagation.TextMapPropagator) *baseHandler {
func newBaseHandlerWithTracerProvider(rsr *resource.Resource, buildInfo config.BuildInfo, unwrappedTP *tracesdk.TracerProvider, tracerProvider trace.TracerProvider, propagator propagation.TextMapPropagator) *baseHandler {
// default tracer for server.
otel.SetTracerProvider(tracerProvider)
tracer := tracerProvider.Tracer(buildInfo.Name())
Expand All @@ -237,6 +240,7 @@ func newBaseHandlerWithTracerProvider(rsr *resource.Resource, buildInfo config.B
// note: meter purposely is not registered until startup.
return &baseHandler{
resource: rsr,
unwrappedTP: unwrappedTP,
tp: tracerProvider,
tracer: tracer,
name: buildInfo.Name(),
Expand Down
31 changes: 31 additions & 0 deletions core/metrics/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
tracesdk "go.opentelemetry.io/otel/sdk/trace"
"os"
"strings"
"time"
)

type otlpHandler struct {
Expand Down Expand Up @@ -51,13 +52,43 @@ func (n *otlpHandler) Start(ctx context.Context) (err error) {
return fmt.Errorf("could not start base handler: %w", err)
}

go func() {
handleShutdown(ctx, n.baseHandler.unwrappedTP)
}()

return nil
}

func (n *otlpHandler) Type() HandlerType {
return OTLP
}

// wait for the context to be canceled.
// then flush the traces and shutdown the exporter.
func handleShutdown(ctx context.Context, provider *tracesdk.TracerProvider) {
<-ctx.Done()

const spanWaitTime = time.Millisecond
const shutdownAllowance = time.Second * 10

// allow only 10 seconds for graceful shutdown.
// we use without cancel to copy the parents values while making sure our derived context is not canceled.
shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), shutdownAllowance)
defer cancel()

// don't shutdown immediately, wait for a bit to allow the last spans to be sent. This is in process and should be aymptotic to instant.
time.Sleep(spanWaitTime)

err := provider.ForceFlush(shutdownCtx)
if err != nil {
logger.Warnf("could not flush traces: %v", err)
}
err = provider.Shutdown(shutdownCtx)
if err != nil {
logger.Warnf("could not shutdown traces: %v", err)
}
}

const (
otlpTransportEnv = "OTEL_EXPORTER_OTLP_TRANSPORT"
)
Expand Down

0 comments on commit ca253cf

Please sign in to comment.