flush traces on shutdown #2882

Merged
merged 3 commits into from
Jul 17, 2024
22 changes: 13 additions & 9 deletions core/metrics/base.go
@@ -37,12 +37,14 @@ const pyroscopeEndpoint = internal.PyroscopeEndpoint
 // baseHandler is a base metrics handler that implements the Handler interface.
 // this is used to reduce the amount of boilerplate code needed to implement opentracing methods.
 type baseHandler struct {
-	resource   *resource.Resource
-	tp         trace.TracerProvider
-	tracer     trace.Tracer
-	name       string
-	propagator propagation.TextMapPropagator
-	meter      MeterProvider
+	resource *resource.Resource
+	// used only for shutdown, use tp for providing traces.
+	unwrappedTP *tracesdk.TracerProvider
+	tp          trace.TracerProvider
+	tracer      trace.Tracer
+	name        string
+	propagator  propagation.TextMapPropagator
+	meter       MeterProvider
 	// handler is an integrated handler for everything exported over http. This includes prometheus
 	// or http-based sampling methods for other providers.
 	handler http.Handler
@@ -206,16 +208,17 @@ func newBaseHandler(buildInfo config.BuildInfo, extraOpts ...tracesdk.TracerProv
 	opts := append([]tracesdk.TracerProviderOption{tracesdk.WithResource(rsr)}, extraOpts...)

 	// TODO: add a way for users to pass in extra pyroscope options
-	tp := PyroscopeWrapTracerProvider(tracesdk.NewTracerProvider(opts...), buildInfo)
+	unwrappedTP := tracesdk.NewTracerProvider(opts...)
+	tp := PyroscopeWrapTracerProvider(unwrappedTP, buildInfo)
 	// will do nothing if not enabled.
 	StartPyroscope(buildInfo)

 	propagator := b3.New(b3.WithInjectEncoding(b3.B3MultipleHeader | b3.B3SingleHeader))
-	return newBaseHandlerWithTracerProvider(rsr, buildInfo, tp, propagator)
+	return newBaseHandlerWithTracerProvider(rsr, buildInfo, unwrappedTP, tp, propagator)
Comment on lines +211 to +217
Review of newBaseHandler and newBaseHandlerWithTracerProvider.

  1. Function Signature Changes: The addition of unwrappedTP as a parameter before other tracer-related parameters helps in segregating the shutdown-specific tracer provider from others. This is a strategic design choice to enhance modularity and clarity.
  2. Implementation Details: The function setups are quite complex, involving multiple OpenTelemetry configurations and custom logger setups. It's crucial that these configurations are thoroughly tested, especially since they involve external dependencies and environmental configurations.
  3. Error Handling: There is a TODO comment about handling errors which should be addressed to ensure robustness.

These changes are well thought out but require thorough testing and error handling strategies to be fully effective.

Would you like me to help with implementing the error handling or testing strategies mentioned in the TODO comments?

Also applies to: 243-243

 }

 // newBaseHandlerWithTracerProvider creates a new baseHandler for any opentelemetry tracer.
-func newBaseHandlerWithTracerProvider(rsr *resource.Resource, buildInfo config.BuildInfo, tracerProvider trace.TracerProvider, propagator propagation.TextMapPropagator) *baseHandler {
+func newBaseHandlerWithTracerProvider(rsr *resource.Resource, buildInfo config.BuildInfo, unwrappedTP *tracesdk.TracerProvider, tracerProvider trace.TracerProvider, propagator propagation.TextMapPropagator) *baseHandler {
 	// default tracer for server.
 	otel.SetTracerProvider(tracerProvider)
 	tracer := tracerProvider.Tracer(buildInfo.Name())
@@ -237,6 +240,7 @@ func newBaseHandlerWithTracerProvider(rsr *resource.Resource, buildInfo config.B
 	// note: meter purposely is not registered until startup.
 	return &baseHandler{
 		resource: rsr,
+		unwrappedTP: unwrappedTP,
 		tp: tracerProvider,
 		tracer: tracer,
 		name: buildInfo.Name(),
31 changes: 31 additions & 0 deletions core/metrics/otlp.go
@@ -11,6 +11,7 @@
 	tracesdk "go.opentelemetry.io/otel/sdk/trace"
 	"os"
 	"strings"
+	"time"
 )

 type otlpHandler struct {
@@ -51,13 +52,43 @@
 		return fmt.Errorf("could not start base handler: %w", err)
 	}

+	go func() {
+		handleShutdown(ctx, n.baseHandler.unwrappedTP)
+	}()
+
 	return nil
 }

 func (n *otlpHandler) Type() HandlerType {
 	return OTLP
 }

+// wait for the context to be canceled.
+// then flush the traces and shutdown the exporter.
+func handleShutdown(ctx context.Context, provider *tracesdk.TracerProvider) {
+	<-ctx.Done()
+
+	const spanWaitTime = time.Millisecond
+	const shutdownAllowance = time.Second * 10
+
+	// allow only 10 seconds for graceful shutdown.
+	// we use without cancel to copy the parent's values while making sure our derived context is not canceled.
+	shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), shutdownAllowance)
Spelling: Typo: 'are' should be 'our'.

Suggested change
shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), shutdownAllowance)
// we use without cancel to copy the parents values while making sure our derived context is not canceled.

+	defer cancel()
+
+	// don't shutdown immediately, wait for a bit to allow the last spans to be sent. This is in process and should be asymptotic to instant.
+	time.Sleep(spanWaitTime)
+
+	err := provider.ForceFlush(shutdownCtx)
+	if err != nil {
+		logger.Warnf("could not flush traces: %v", err)
+	}

Check warning on line 85 in core/metrics/otlp.go (Codecov / codecov/patch): added lines #L84 - L85 were not covered by tests.
+	err = provider.Shutdown(shutdownCtx)
+	if err != nil {
+		logger.Warnf("could not shutdown traces: %v", err)
+	}

Check warning on line 89 in core/metrics/otlp.go (Codecov / codecov/patch): added lines #L88 - L89 were not covered by tests.
+}
+
 const (
 	otlpTransportEnv = "OTEL_EXPORTER_OTLP_TRANSPORT"
 )