Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Jaeger SDK with OTEL SDK + OT Bridge #4574

Merged
merged 37 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
29a75e9
adds OTLP to jtracer
afzal442 Jul 7, 2023
9dbdbe0
replace ot with otlp
afzal442 Jul 7, 2023
f739427
made singleton otel tracer
afzal442 Jul 8, 2023
d4ce5cf
subjugate initialise method to support both tracer
afzal442 Jul 9, 2023
642d772
adds shudown method to tp
afzal442 Jul 9, 2023
8425410
creates tracer in the parent method
afzal442 Jul 9, 2023
6bcab4b
transform gRPC OT to OTEL
sbin64 Jul 11, 2023
ce29456
update http_handler test to support OTEL sdktrace
sbin64 Jul 12, 2023
d429861
updates jaeger tracer to shutdown in the main call
afzal442 Jul 13, 2023
d705dad
adds gRPC server conn
afzal442 Jul 13, 2023
29824ce
adds logger to jtracer and adjsted the tp.shutdown
afzal442 Jul 13, 2023
2018857
adds Close method to jtracer pkg
afzal442 Jul 13, 2023
b54e872
enscapulate err field in jtracer
afzal442 Jul 14, 2023
fd6d707
reform tracer in cmd app
afzal442 Jul 14, 2023
b01349e
modifies jtracer in cmd/app
afzal442 Jul 14, 2023
9447bcf
moves the tracer to main
afzal442 Jul 15, 2023
68ec355
undo reader file changes
afzal442 Jul 15, 2023
a2eb44a
reformats the jtracer pkg
afzal442 Jul 15, 2023
9b2be20
modifies svc name
afzal442 Jul 17, 2023
9cb1de5
Merge remote-tracking branch 'upstream/main' into add-otel-tp-jtracer
afzal442 Jul 17, 2023
2e03743
adds middleware for tracer response
afzal442 Jul 17, 2023
056b81d
adds trace-id url
afzal442 Jul 17, 2023
7e157eb
reverts prob value
afzal442 Jul 17, 2023
8e96af8
minor modification around pointers and versions
afzal442 Jul 18, 2023
88d5f25
pass OTEL tracer into the tracer method
afzal442 Jul 19, 2023
55918b4
excludes pkg from testing
afzal442 Jul 19, 2023
f26e1b2
rmvs jtracer_test file
afzal442 Jul 20, 2023
0eba87a
modifies .nocover
afzal442 Jul 20, 2023
3600c09
makes minor modification around *jtracer pointer
afzal442 Jul 20, 2023
7f81f62
updates pointer receivers for jtracer
afzal442 Jul 20, 2023
eab6735
rmv pointer dereferrencing
afzal442 Jul 20, 2023
e419417
replces with jaeger-all-in-one
afzal442 Jul 20, 2023
22fb80a
updates the log err
afzal442 Jul 20, 2023
43e5032
updates logger to log err jaeger-query
afzal442 Jul 20, 2023
c64e90c
simplifies the traceRes obj
afzal442 Jul 20, 2023
c2960e3
adds defer to resp from Do()
afzal442 Jul 20, 2023
f905480
rmvs OT tracer
afzal442 Jul 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions cmd/all-in-one/all_in_one_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/json"
"io"
"net/http"
"strings"
"testing"
"time"

Expand All @@ -46,12 +47,13 @@ const (
agentURL = "http://" + agentHostPort

getServicesURL = queryURL + "/api/services"
getTraceURL = queryURL + "/api/traces?service=jaeger-query&tag=jaeger-debug-id:debug"
getSamplingStrategyURL = agentURL + "/sampling?service=whatever"

getServicesAPIV3URL = queryURL + "/api/v3/services"
)

var getTraceURL = queryURL + "/api/traces/"
afzal442 marked this conversation as resolved.
Show resolved Hide resolved

var httpClient = &http.Client{
Timeout: time.Second,
}
Expand All @@ -70,10 +72,14 @@ func TestAllInOne(t *testing.T) {
func createTrace(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, getServicesURL, nil)
require.NoError(t, err)
req.Header.Add("jaeger-debug-id", "debug")

resp, err := httpClient.Do(req)
require.NoError(t, err)
traceResponse := resp.Header.Get("traceresponse")
parts := strings.Split(traceResponse, "-")
require.Len(t, parts, 4) // [version] [trace-id] [child-id] [trace-flags]
traceID := parts[1]
getTraceURL += traceID
resp.Body.Close()
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down Expand Up @@ -157,5 +163,5 @@ func getServicesAPIV3(t *testing.T) {
jsonpb := runtime.JSONPb{}
err = jsonpb.Unmarshal(body, &servicesResponse)
require.NoError(t, err)
assert.Equal(t, []string{"jaeger-query"}, servicesResponse.GetServices())
assert.Equal(t, []string{"jaeger-all-in-one"}, servicesResponse.GetServices())
}
50 changes: 14 additions & 36 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@
package main

import (
"context"
"fmt"
"io"
"log"
"os"

"github.com/opentracing/opentracing-go"
"github.com/spf13/cobra"
"github.com/spf13/viper"
jaegerClientConfig "github.com/uber/jaeger-client-go/config"
jaegerClientZapLog "github.com/uber/jaeger-client-go/log/zap"
_ "go.uber.org/automaxprocs"
"go.uber.org/zap"

Expand All @@ -43,7 +41,6 @@ import (
"github.com/jaegertracing/jaeger/cmd/status"
"github.com/jaegertracing/jaeger/internal/metrics/expvar"
"github.com/jaegertracing/jaeger/internal/metrics/fork"
"github.com/jaegertracing/jaeger/internal/metrics/jlibadapter"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down Expand Up @@ -103,7 +100,10 @@ by default uses only in-memory database.`,
svc.MetricsFactory.Namespace(metrics.NSOptions{Name: "jaeger"}))
version.NewInfoMetrics(metricsFactory)

tracerCloser := initTracer(svc)
tracer, err := jtracer.New("jaeger-all-in-one")
if err != nil {
logger.Fatal("Failed to initialize tracer", zap.Error(err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... I wonder why we don't annotate and return the error here instead of logging it.

Copy link
Member

@yurishkuro yurishkuro Jul 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think mostly because if we return an error, it is intercepted at the top level of main like this:

	if err := command.Execute(); err != nil {
		log.Fatal(err)
	}

Here the zapp logger is not available so the stdlib log is used, which is formatted differently. E.g.:

$ go run ./cmd/all-in-one --admin.http.tls.enabled=true --admin.http.tls.key=invalid
. . .
2023/07/20 11:43:08 cannot initialize admin server: failed to watch key pair invalid and : open invalid: no such file or directory
exit status 1

With zapp logs you get line number and other nice things. E.g.

$ go run ./cmd/all-in-one --reporter.grpc.tls.enabled=true --reporter.grpc.tls.key=nopath
. . .
{"level":"fatal","ts":1689868046.063343,"caller":"all-in-one/main.go:192","msg":"Could not create collector proxy","error":"failed to load TLS config: failed to watch key pair nopath and : open nopath: no such file or directory","stacktrace":"main.main.func1\n\t/Users/ysh/dev/jaegertracing/jaeger/cmd/all-in-one/main.go:192\ngithub.com/spf13/cobra.(*Command).execute\n\t/Users/ysh/golang/pkg/mod/github.com/spf13/[email protected]/command.go:940\ngithub.com/spf13/cobra.(*Command).ExecuteC\n\t/Users/ysh/golang/pkg/mod/github.com/spf13/[email protected]/command.go:1068\ngithub.com/spf13/cobra.(*Command).Execute\n\t/Users/ysh/golang/pkg/mod/github.com/spf13/[email protected]/command.go:992\nmain.main\n\t/Users/ysh/dev/jaegertracing/jaeger/cmd/all-in-one/main.go:243\nruntime.main\n\t/Users/ysh/homebrew/Cellar/go/1.20.5/libexec/src/runtime/proc.go:250"}
exit status 1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's cool thing I noticed here. 🏷️

}

storageFactory.InitFromViper(v, logger)
if err := storageFactory.Initialize(metricsFactory, logger); err != nil {
Expand Down Expand Up @@ -197,7 +197,7 @@ by default uses only in-memory database.`,
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
spanReader, dependencyReader, metricsQueryService,
metricsFactory, tm,
metricsFactory, tm, tracer,
)

svc.RunAndThen(func() {
Expand All @@ -213,7 +213,9 @@ by default uses only in-memory database.`,
if err := storageFactory.Close(); err != nil {
logger.Error("Failed to close storage factory", zap.Error(err))
}
_ = tracerCloser.Close()
if err := tracer.Close(context.Background()); err != nil {
logger.Error("Error shutting down tracer provider", zap.Error(err))
}
})
return nil
},
Expand Down Expand Up @@ -271,48 +273,24 @@ func startQuery(
metricsQueryService querysvc.MetricsQueryService,
baseFactory metrics.Factory,
tm *tenancy.Manager,
jt *jtracer.JTracer,
) *queryApp.Server {
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, baseFactory.Namespace(metrics.NSOptions{Name: "query"}))
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
jtracer := jtracer.OT(opentracing.GlobalTracer())
server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, tm, jtracer)
server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, tm, jt)
if err != nil {
svc.Logger.Fatal("Could not start jaeger-query service", zap.Error(err))
svc.Logger.Fatal("Could not create jaeger-query", zap.Error(err))
}
go func() {
for s := range server.HealthCheckStatus() {
svc.SetHealthCheckStatus(s)
}
}()
if err := server.Start(); err != nil {
svc.Logger.Fatal("Could not start jaeger-query service", zap.Error(err))
svc.Logger.Fatal("Could not start jaeger-query", zap.Error(err))
}
return server
}

func initTracer(svc *flags.Service) io.Closer {
logger := svc.Logger
traceCfg := &jaegerClientConfig.Configuration{
ServiceName: "jaeger-query",
Sampler: &jaegerClientConfig.SamplerConfig{
Type: "const",
Param: 1.0,
},
RPCMetrics: true,
}
traceCfg, err := traceCfg.FromEnv()
if err != nil {
logger.Fatal("Failed to read tracer configuration", zap.Error(err))
}
tracer, closer, err := traceCfg.NewTracer(
jaegerClientConfig.Metrics(jlibadapter.NewAdapter(svc.MetricsFactory)),
jaegerClientConfig.Logger(jaegerClientZapLog.NewLogger(logger)),
)
if err != nil {
logger.Fatal("Failed to initialize tracer", zap.Error(err))
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
}
opentracing.SetGlobalTracer(tracer)
return closer
return server
}

func createMetricsQueryService(
Expand Down
6 changes: 3 additions & 3 deletions cmd/query/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ type GRPCHandler struct {
queryService *querysvc.QueryService
metricsQueryService querysvc.MetricsQueryService
logger *zap.Logger
tracer jtracer.JTracer
tracer *jtracer.JTracer
nowFn func() time.Time
}

// GRPCHandlerOptions contains optional members of GRPCHandler.
type GRPCHandlerOptions struct {
Logger *zap.Logger
Tracer jtracer.JTracer
Tracer *jtracer.JTracer
NowFn func() time.Time
}

Expand All @@ -73,7 +73,7 @@ func NewGRPCHandler(queryService *querysvc.QueryService,
options.Logger = zap.NewNop()
}

if options.Tracer.OT == nil {
if options.Tracer == nil {
options.Tracer = jtracer.NoOp()
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type grpcClient struct {
conn *grpc.ClientConn
}

func newGRPCServer(t *testing.T, q *querysvc.QueryService, mq querysvc.MetricsQueryService, logger *zap.Logger, tracer jtracer.JTracer, tenancyMgr *tenancy.Manager) (*grpc.Server, net.Addr) {
func newGRPCServer(t *testing.T, q *querysvc.QueryService, mq querysvc.MetricsQueryService, logger *zap.Logger, tracer *jtracer.JTracer, tenancyMgr *tenancy.Manager) (*grpc.Server, net.Addr) {
lis, _ := net.Listen("tcp", ":0")
var grpcOpts []grpc.ServerOption
if tenancyMgr.Enabled {
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/handler_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (handlerOptions) QueryLookbackDuration(queryLookbackDuration time.Duration)
}

// Tracer creates a HandlerOption that initializes OpenTracing tracer
func (handlerOptions) Tracer(tracer jtracer.JTracer) HandlerOption {
func (handlerOptions) Tracer(tracer *jtracer.JTracer) HandlerOption {
return func(apiHandler *APIHandler) {
apiHandler.tracer = tracer
}
Expand Down
32 changes: 23 additions & 9 deletions cmd/query/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import (

"github.com/gogo/protobuf/proto"
"github.com/gorilla/mux"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/propagation"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
Expand Down Expand Up @@ -88,7 +89,7 @@ type APIHandler struct {
basePath string
apiPrefix string
logger *zap.Logger
tracer jtracer.JTracer
tracer *jtracer.JTracer
}

// NewAPIHandler returns an APIHandler
Expand All @@ -111,7 +112,7 @@ func NewAPIHandler(queryService *querysvc.QueryService, tm *tenancy.Manager, opt
if aH.logger == nil {
aH.logger = zap.NewNop()
}
if aH.tracer.OT == nil {
if aH.tracer == nil {
aH.tracer = jtracer.NoOp()
}
return aH
Expand Down Expand Up @@ -146,12 +147,10 @@ func (aH *APIHandler) handleFunc(
if aH.tenancyMgr.Enabled {
handler = tenancy.ExtractTenantHTTPHandler(aH.tenancyMgr, handler)
}
traceMiddleware := nethttp.Middleware(
aH.tracer.OT,
handler,
nethttp.OperationNameFunc(func(r *http.Request) string {
return route
}))
traceMiddleware := otelhttp.NewHandler(
otelhttp.WithRouteTag(route, traceResponseHandler(handler)),
route,
otelhttp.WithTracerProvider(aH.tracer.OTEL))
return router.HandleFunc(route, traceMiddleware.ServeHTTP)
}

Expand Down Expand Up @@ -523,3 +522,18 @@ func (aH *APIHandler) writeJSON(w http.ResponseWriter, r *http.Request, response
aH.handleError(w, fmt.Errorf("failed writing HTTP response: %w", err), http.StatusInternalServerError)
}
}

// Returns a handler that generates a traceresponse header.
// https://github.com/w3c/trace-context/blob/main/spec/21-http_response_header_format.md
func traceResponseHandler(handler http.Handler) http.Handler {
// We use the standard TraceContext propagator, since the formats are identical.
// But the propagator uses "traceparent" header name, so we inject it into a map
// `carrier` and then use the result to set the "tracereponse" header.
var prop propagation.TraceContext
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
carrier := make(map[string]string)
prop.Inject(r.Context(), propagation.MapCarrier(carrier))
w.Header().Add("traceresponse", carrier["traceparent"])
handler.ServeHTTP(w, r)
})
}
24 changes: 15 additions & 9 deletions cmd/query/app/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ import (
testHttp "github.com/stretchr/testify/http"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-client-go"
otbridge "go.opentelemetry.io/otel/bridge/opentracing"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

Expand Down Expand Up @@ -303,12 +305,16 @@ func TestGetTrace(t *testing.T) {
for _, tc := range testCases {
testCase := tc // capture loop var
t.Run(testCase.suffix, func(t *testing.T) {
reporter := jaeger.NewInMemoryReporter()
jaegerTracer, jaegerCloser := jaeger.NewTracer("test", jaeger.NewConstSampler(true), reporter)
jTracer := jtracer.OT(jaegerTracer)
defer jaegerCloser.Close()

ts := initializeTestServer(HandlerOptions.Tracer(jTracer))
exporter := tracetest.NewInMemoryExporter()
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithSyncer(exporter),
sdktrace.WithSampler(sdktrace.AlwaysSample()),
)
otTracer, wrappedTracerProvider := otbridge.NewTracerPair(tracerProvider.Tracer(""))
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
jTracer := jtracer.JTracer{OT: otTracer, OTEL: wrappedTracerProvider}
defer tracerProvider.Shutdown(context.Background())

ts := initializeTestServer(HandlerOptions.Tracer(&jTracer))
defer ts.server.Close()

ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), model.NewTraceID(0, 0x123456abc)).
Expand All @@ -319,8 +325,8 @@ func TestGetTrace(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, response.Errors, 0)

assert.Len(t, reporter.GetSpans(), 1, "HTTP request was traced and span reported")
assert.Equal(t, "/api/traces/{traceID}", reporter.GetSpans()[0].(*jaeger.Span).OperationName())
assert.Len(t, exporter.GetSpans(), 1, "HTTP request was traced and span reported")
assert.Equal(t, "/api/traces/{traceID}", exporter.GetSpans()[0].Name)

traces := extractTraces(t, &response)
assert.Len(t, traces[0].Spans, 2)
Expand Down
8 changes: 4 additions & 4 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Server struct {
querySvc *querysvc.QueryService
queryOptions *QueryOptions

tracer jtracer.JTracer // TODO make part of flags.Service
tracer *jtracer.JTracer // TODO make part of flags.Service

conn net.Listener
grpcConn net.Listener
Expand All @@ -65,7 +65,7 @@ type Server struct {
}

// NewServer creates and initializes Server
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, tracer jtracer.JTracer) (*Server, error) {
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, tracer *jtracer.JTracer) (*Server, error) {
_, httpPort, err := net.SplitHostPort(options.HTTPHostPort)
if err != nil {
return nil, err
Expand Down Expand Up @@ -107,7 +107,7 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status {
return s.unavailableChannel
}

func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, logger *zap.Logger, tracer jtracer.JTracer) (*grpc.Server, error) {
func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, logger *zap.Logger, tracer *jtracer.JTracer) (*grpc.Server, error) {
var grpcOpts []grpc.ServerOption

if options.TLSGRPC.Enabled {
Expand Down Expand Up @@ -148,7 +148,7 @@ func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.
return server, nil
}

func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, queryOpts *QueryOptions, tm *tenancy.Manager, tracer jtracer.JTracer, logger *zap.Logger) (*http.Server, context.CancelFunc, error) {
func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, queryOpts *QueryOptions, tm *tenancy.Manager, tracer *jtracer.JTracer, logger *zap.Logger) (*http.Server, context.CancelFunc, error) {
apiHandlerOptions := []HandlerOption{
HandlerOptions.Logger(logger),
HandlerOptions.Tracer(tracer),
Expand Down
Loading