From d1328bb0a1cc8125197fec49422b0249030329fa Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Wed, 17 Jul 2024 13:53:09 -0400 Subject: [PATCH 1/3] flush --- core/metrics/base.go | 22 +++++++++++++--------- core/metrics/otlp.go | 27 +++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 9 deletions(-) diff --git a/core/metrics/base.go b/core/metrics/base.go index 38f01be5bd..e69c674e2d 100644 --- a/core/metrics/base.go +++ b/core/metrics/base.go @@ -37,12 +37,14 @@ const pyroscopeEndpoint = internal.PyroscopeEndpoint // baseHandler is a base metrics handler that implements the Handler interface. // this is used to reduce the amount of boilerplate code needed to implement opentracing methods. type baseHandler struct { - resource *resource.Resource - tp trace.TracerProvider - tracer trace.Tracer - name string - propagator propagation.TextMapPropagator - meter MeterProvider + resource *resource.Resource + // used only for shutdown, use tp for providing traces. + unwrappedTP *tracesdk.TracerProvider + tp trace.TracerProvider + tracer trace.Tracer + name string + propagator propagation.TextMapPropagator + meter MeterProvider // handler is an integrated handler for everything exported over http. This includes prometheus // or http-based sampling methods for other providers. handler http.Handler @@ -206,16 +208,17 @@ func newBaseHandler(buildInfo config.BuildInfo, extraOpts ...tracesdk.TracerProv opts := append([]tracesdk.TracerProviderOption{tracesdk.WithResource(rsr)}, extraOpts...) // TODO: add a way for users to pass in extra pyroscope options - tp := PyroscopeWrapTracerProvider(tracesdk.NewTracerProvider(opts...), buildInfo) + unwrappedTP := tracesdk.NewTracerProvider(opts...) + tp := PyroscopeWrapTracerProvider(unwrappedTP, buildInfo) // will do nothing if not enabled. 
StartPyroscope(buildInfo) propagator := b3.New(b3.WithInjectEncoding(b3.B3MultipleHeader | b3.B3SingleHeader)) - return newBaseHandlerWithTracerProvider(rsr, buildInfo, tp, propagator) + return newBaseHandlerWithTracerProvider(rsr, buildInfo, unwrappedTP, tp, propagator) } // newBaseHandlerWithTracerProvider creates a new baseHandler for any opentelemtry tracer. -func newBaseHandlerWithTracerProvider(rsr *resource.Resource, buildInfo config.BuildInfo, tracerProvider trace.TracerProvider, propagator propagation.TextMapPropagator) *baseHandler { +func newBaseHandlerWithTracerProvider(rsr *resource.Resource, buildInfo config.BuildInfo, unwrappedTP *tracesdk.TracerProvider, tracerProvider trace.TracerProvider, propagator propagation.TextMapPropagator) *baseHandler { // default tracer for server. otel.SetTracerProvider(tracerProvider) tracer := tracerProvider.Tracer(buildInfo.Name()) @@ -237,6 +240,7 @@ func newBaseHandlerWithTracerProvider(rsr *resource.Resource, buildInfo config.B // note: meter purposely is not registered until startup. return &baseHandler{ resource: rsr, + unwrappedTP: unwrappedTP, tp: tracerProvider, tracer: tracer, name: buildInfo.Name(), diff --git a/core/metrics/otlp.go b/core/metrics/otlp.go index a55bd8a4ad..9db1c57a68 100644 --- a/core/metrics/otlp.go +++ b/core/metrics/otlp.go @@ -11,6 +11,7 @@ import ( tracesdk "go.opentelemetry.io/otel/sdk/trace" "os" "strings" + "time" ) type otlpHandler struct { @@ -51,6 +52,10 @@ func (n *otlpHandler) Start(ctx context.Context) (err error) { return fmt.Errorf("could not start base handler: %w", err) } + go func() { + handleShutdown(ctx, n.baseHandler.unwrappedTP) + }() + return nil } @@ -58,6 +63,28 @@ func (n *otlpHandler) Type() HandlerType { return OTLP } +// wait for the context to be canceled. +// then flush the traces and shutdown the exporter. 
+func handleShutdown(ctx context.Context, provider *tracesdk.TracerProvider) { + <-ctx.Done() + + const shutdownAllowance = time.Second * 10 + + // allow only 10 seconds for graceful shutdown. + // we use without cancel to copy the parents values while making sure are derived context is not canceled. + shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), shutdownAllowance) + defer cancel() + + err := provider.ForceFlush(shutdownCtx) + if err != nil { + logger.Warnf("could not flush traces: %v", err) + } + err = provider.Shutdown(shutdownCtx) + if err != nil { + logger.Warnf("could not shutdown traces: %v", err) + } +} + const ( otlpTransportEnv = "OTEL_EXPORTER_OTLP_TRANSPORT" ) From 80b8e29370725b78fb1e02a64cb00abd63982c0d Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Wed, 17 Jul 2024 13:55:57 -0400 Subject: [PATCH 2/3] [goreleaser] fix https://github.com/synapsecns/sanguine/pull/2882#discussion_r1681471161 --- core/metrics/otlp.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/metrics/otlp.go b/core/metrics/otlp.go index 9db1c57a68..1057be5ea7 100644 --- a/core/metrics/otlp.go +++ b/core/metrics/otlp.go @@ -71,7 +71,7 @@ func handleShutdown(ctx context.Context, provider *tracesdk.TracerProvider) { const shutdownAllowance = time.Second * 10 // allow only 10 seconds for graceful shutdown. - // we use without cancel to copy the parents values while making sure are derived context is not canceled. + // we use without cancel to copy the parents values while making sure our derived context is not canceled. 
shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), shutdownAllowance) defer cancel() From 1397f8f8f146a512bfb2893e1b59ea54a1c667c0 Mon Sep 17 00:00:00 2001 From: Trajan0x Date: Wed, 17 Jul 2024 13:59:44 -0400 Subject: [PATCH 3/3] add a tiny span wait time to allow flushes to be reported --- core/metrics/otlp.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/metrics/otlp.go b/core/metrics/otlp.go index 1057be5ea7..f96f06dd43 100644 --- a/core/metrics/otlp.go +++ b/core/metrics/otlp.go @@ -68,6 +68,7 @@ func (n *otlpHandler) Type() HandlerType { func handleShutdown(ctx context.Context, provider *tracesdk.TracerProvider) { <-ctx.Done() + const spanWaitTime = time.Millisecond const shutdownAllowance = time.Second * 10 // allow only 10 seconds for graceful shutdown. @@ -75,6 +76,9 @@ func handleShutdown(ctx context.Context, provider *tracesdk.TracerProvider) { shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), shutdownAllowance) defer cancel() + // don't shutdown immediately, wait for a bit to allow the last spans to be sent. This is in process and should be asymptotic to instant. + time.Sleep(spanWaitTime) + err := provider.ForceFlush(shutdownCtx) if err != nil { logger.Warnf("could not flush traces: %v", err)