Skip to content

Commit

Permalink
Integrate OpenTracing (#426)
Browse files Browse the repository at this point in the history
Replace built-in tracing with OpenTracing

Changes:
* internal span reporting completely removed and replaced with OpenTracing
* span context and baggage are propagated via application headers (json only atm, thrift next)
* channel can be configured with `opentracing.Tracer`, otherwise it defaults to `opentracing.GlobalTracer()` (which is no-op by default)
* outbound calls tracing
  * span is started in `(c *Connection) beginCall`
  * if tracer understands "zipkin" format, then the tracing fields are populated in `callReq`
  * the actual span/baggage info is injected in `json/call.go`
  * span is stored in `Response` and finished in a couple of places in `outbound.go`
* inbound requests tracing
  * if tracer understands "zipkin" format, the span is started in `inbound.go` and baggage is added in the encoding handler
  * otherwise span is started in the encoding handler only
  * finished generically in completion callbacks in `inbound.go`
  • Loading branch information
yurishkuro authored and prashantv committed Aug 5, 2016
1 parent c169681 commit 114c2b7
Show file tree
Hide file tree
Showing 37 changed files with 1,280 additions and 4,689 deletions.
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,12 @@ else
@echo "Not checking gofmt on" $(GO_VERSION)
endif
@echo "Checking for unresolved FIXMEs"
-git grep -i fixme | $(FILTER) | grep -v -e Makefile | tee -a lint.log
-git grep -i -n fixme | $(FILTER) | grep -v -e Makefile | tee -a lint.log
@[ ! -s lint.log ]
else
@echo "Skipping linters on" $(GO_VERSION)
endif


thrift_example: thrift_gen
go build -o $(BUILD)/examples/thrift ./examples/thrift/main.go

Expand Down
79 changes: 30 additions & 49 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/uber/tchannel-go/relay"
"github.com/uber/tchannel-go/tnet"

"github.com/opentracing/opentracing-go"
"golang.org/x/net/context"
)

Expand All @@ -46,14 +47,8 @@ var (

const (
ephemeralHostPort = "0.0.0.0:0"

// DefaultTraceSampleRate is the default sampling rate for traces.
DefaultTraceSampleRate = 1.0
)

// TraceReporterFactory is the interface of the method to generate TraceReporter instance.
type TraceReporterFactory func(*Channel) TraceReporter

// ChannelOptions are used to control parameters on a create a TChannel
type ChannelOptions struct {
// Default Connection options
Expand Down Expand Up @@ -84,15 +79,9 @@ type ChannelOptions struct {
// Note: This is not a stable part of the API and may change.
TimeNow func() time.Time

// Trace reporter to use for this channel.
TraceReporter TraceReporter

// Trace reporter factory to generate trace reporter instance.
TraceReporterFactory TraceReporterFactory

// TraceSampleRate is the rate of requests to sample, and should be in the range [0, 1].
// If this value is not set, then DefaultTraceSampleRate is used.
TraceSampleRate *float64
// Tracer is an OpenTracing Tracer used to manage distributed tracing spans.
// If not set, opentracing.GlobalTracer() is used.
Tracer opentracing.Tracer
}

// ChannelState is the state of a channel.
Expand Down Expand Up @@ -146,14 +135,23 @@ type Channel struct {
// channelConnectionCommon is the list of common objects that both use
// and can be copied directly from the channel to the connection.
type channelConnectionCommon struct {
log Logger
relayStats relay.Stats
relayLocal map[string]struct{}
statsReporter StatsReporter
traceReporter TraceReporter
subChannels *subChannelMap
timeNow func() time.Time
traceSampleRate float64
log Logger
relayStats relay.Stats
relayLocal map[string]struct{}
statsReporter StatsReporter
tracer opentracing.Tracer
subChannels *subChannelMap
timeNow func() time.Time
}

// Tracer returns the OpenTracing Tracer for this channel. If no tracer was provided
// in the configuration, returns opentracing.GlobalTracer(). Note that this approach
// allows opentracing.GlobalTracer() to be initialized _after_ the channel is created.
func (ccc channelConnectionCommon) Tracer() opentracing.Tracer {
if ccc.tracer != nil {
return ccc.tracer
}
return opentracing.GlobalTracer()
}

// NewChannel creates a new Channel. The new channel can be used to send outbound requests
Expand Down Expand Up @@ -188,11 +186,6 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
timeNow = time.Now
}

traceSampleRate := DefaultTraceSampleRate
if opts.TraceSampleRate != nil {
traceSampleRate = *opts.TraceSampleRate
}

relayStats := relay.NewNoopStats()
if opts.RelayStats != nil {
relayStats = opts.RelayStats
Expand All @@ -203,12 +196,12 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
log: logger.WithFields(
LogField{"service", serviceName},
LogField{"process", processName}),
relayStats: relayStats,
relayLocal: toStringSet(opts.RelayLocalHandlers),
statsReporter: statsReporter,
subChannels: &subChannelMap{},
timeNow: timeNow,
traceSampleRate: traceSampleRate,
relayStats: relayStats,
relayLocal: toStringSet(opts.RelayLocalHandlers),
statsReporter: statsReporter,
subChannels: &subChannelMap{},
timeNow: timeNow,
tracer: opts.Tracer,
},

connectionOptions: opts.DefaultConnectionOptions,
Expand All @@ -227,16 +220,6 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
ch.mutable.conns = make(map[uint32]*Connection)
ch.createCommonStats()

// TraceReporter may use the channel, so we must initialize it once the channel is ready.
traceReporter := opts.TraceReporter
if opts.TraceReporterFactory != nil {
traceReporter = opts.TraceReporterFactory(ch)
}
if traceReporter == nil {
traceReporter = NullReporter
}
ch.traceReporter = traceReporter

ch.registerInternal()

registerNewChannel(ch)
Expand Down Expand Up @@ -317,6 +300,9 @@ type Registrar interface {

// Peers returns the peer list for this Registrar.
Peers() *PeerList

// Tracer returns OpenTracing Tracer this channel was configured with
Tracer() opentracing.Tracer
}

// Register registers a handler for a method.
Expand Down Expand Up @@ -456,11 +442,6 @@ func (ch *Channel) StatsReporter() StatsReporter {
return ch.statsReporter
}

// TraceReporter returns the trace reporter for this channel.
func (ch *Channel) TraceReporter() TraceReporter {
return ch.traceReporter
}

// StatsTags returns the common tags that should be used when reporting stats.
// It returns a new map for each call.
func (ch *Channel) StatsTags() map[string]string {
Expand Down
25 changes: 24 additions & 1 deletion channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"os"
"testing"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/mocktracer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -67,7 +69,6 @@ func TestStats(t *testing.T) {
peerInfo := ch.PeerInfo()
tags := ch.StatsTags()
assert.NotNil(t, ch.StatsReporter(), "StatsReporter missing")
assert.NotNil(t, ch.TraceReporter(), "TraceReporter missing")
assert.Equal(t, peerInfo.ProcessName, tags["app"], "app tag")
assert.Equal(t, peerInfo.ServiceName, tags["service"], "service tag")
assert.Equal(t, hostname, tags["host"], "hostname tag")
Expand Down Expand Up @@ -112,3 +113,25 @@ func TestIsolatedSubChannelsDontSharePeers(t *testing.T) {
assert.NotNil(t, sub.peers.peersByHostPort["127.0.0.1:3000"])
assert.Nil(t, isolatedSub.peers.peersByHostPort["127.0.0.1:3000"])
}

func TestChannelTracerMethod(t *testing.T) {
mockTracer := mocktracer.New()
ch, err := NewChannel("svc", &ChannelOptions{
Tracer: mockTracer,
})
require.NoError(t, err)
defer ch.Close()
assert.Equal(t, mockTracer, ch.Tracer(), "expecting tracer passed at initialization")

ch, err = NewChannel("svc", &ChannelOptions{})
require.NoError(t, err)
defer ch.Close()
assert.EqualValues(t, opentracing.GlobalTracer(), ch.Tracer(), "expecting default tracer")

// because ch.Tracer() function is doing dynamic lookup, we can change global tracer
origTracer := opentracing.GlobalTracer()
defer opentracing.InitGlobalTracer(origTracer)

opentracing.InitGlobalTracer(mockTracer)
assert.Equal(t, mockTracer, ch.Tracer(), "expecting tracer set as global tracer")
}
20 changes: 8 additions & 12 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
)

type tchannelCtxParams struct {
span *Span
tracingDisabled bool
call IncomingCall
options *CallOptions
retryOptions *RetryOptions
Expand Down Expand Up @@ -76,9 +76,7 @@ func getTChannelParams(ctx context.Context) *tchannelCtxParams {

// NewContext returns a new root context used to make TChannel requests.
func NewContext(timeout time.Duration) (context.Context, context.CancelFunc) {
return NewContextBuilder(timeout).
setSpan(NewRootSpan()).
Build()
return NewContextBuilder(timeout).Build()
}

// WrapContextForTest returns a copy of the given Context that is associated with the call.
Expand All @@ -90,10 +88,9 @@ func WrapContextForTest(ctx context.Context, call IncomingCall) context.Context
}

// newIncomingContext creates a new context for an incoming call with the given span.
func newIncomingContext(call IncomingCall, timeout time.Duration, span *Span) (context.Context, context.CancelFunc) {
func newIncomingContext(call IncomingCall, timeout time.Duration) (context.Context, context.CancelFunc) {
return NewContextBuilder(timeout).
setIncomingCall(call).
setSpan(span).
Build()
}

Expand All @@ -105,17 +102,16 @@ func CurrentCall(ctx context.Context) IncomingCall {
return nil
}

// CurrentSpan returns the Span value for the provided Context
func CurrentSpan(ctx context.Context) *Span {
func currentCallOptions(ctx context.Context) *CallOptions {
if params := getTChannelParams(ctx); params != nil {
return params.span
return params.options
}
return nil
}

func currentCallOptions(ctx context.Context) *CallOptions {
func isTracingDisabled(ctx context.Context) bool {
if params := getTChannelParams(ctx); params != nil {
return params.options
return params.tracingDisabled
}
return nil
return false
}
40 changes: 1 addition & 39 deletions context_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,12 @@ type ContextBuilder struct {

// ParentContext to build the new context from. If empty, context.Background() is used.
// The new (child) context inherits a number of properties from the parent context:
// - the tracing Span, unless replaced via SetExternalSpan()
// - context fields, accessible via `ctx.Value(key)`
// - headers if parent is a ContextWithHeaders, unless replaced via SetHeaders()
ParentContext context.Context

// Hidden fields: we do not want users outside of tchannel to set these.
incomingCall IncomingCall
span *Span

// replaceParentHeaders is set to true when SetHeaders() method is called.
// It forces headers from ParentContext to be ignored. When false, parent
Expand Down Expand Up @@ -153,12 +151,6 @@ func (cb *ContextBuilder) SetIncomingCallForTest(call IncomingCall) *ContextBuil
return cb.setIncomingCall(call)
}

// SetSpanForTest sets a tracing span in the context.
// This should only be used in unit tests.
func (cb *ContextBuilder) SetSpanForTest(span *Span) *ContextBuilder {
return cb.setSpan(span)
}

// SetRetryOptions sets RetryOptions in the context.
func (cb *ContextBuilder) SetRetryOptions(retryOptions *RetryOptions) *ContextBuilder {
cb.RetryOptions = retryOptions
Expand All @@ -180,36 +172,11 @@ func (cb *ContextBuilder) SetParentContext(ctx context.Context) *ContextBuilder
return cb
}

// SetExternalSpan creates a new TChannel tracing Span from externally provided IDs
// and sets it as the current span for the context.
// Intended for integration with other Zipkin-like tracers.
func (cb *ContextBuilder) SetExternalSpan(traceID, spanID, parentID uint64, traced bool) *ContextBuilder {
span := newSpan(traceID, spanID, parentID, traced)
return cb.setSpan(span)
}

func (cb *ContextBuilder) setSpan(span *Span) *ContextBuilder {
cb.span = span
return cb
}

func (cb *ContextBuilder) setIncomingCall(call IncomingCall) *ContextBuilder {
cb.incomingCall = call
return cb
}

func (cb *ContextBuilder) getSpan() *Span {
if cb.span != nil {
return cb.span
}
if cb.ParentContext != nil {
if span := CurrentSpan(cb.ParentContext); span != nil {
return span
}
}
return NewRootSpan()
}

func (cb *ContextBuilder) getHeaders() map[string]string {
if cb.ParentContext == nil || cb.replaceParentHeaders {
return cb.Headers
Expand All @@ -232,18 +199,13 @@ func (cb *ContextBuilder) getHeaders() map[string]string {

// Build returns a ContextWithHeaders that can be used to make calls.
func (cb *ContextBuilder) Build() (ContextWithHeaders, context.CancelFunc) {
span := cb.getSpan()
if cb.TracingDisabled {
span.EnableTracing(false)
}

params := &tchannelCtxParams{
options: cb.CallOptions,
span: span,
call: cb.incomingCall,
retryOptions: cb.RetryOptions,
connectTimeout: cb.ConnectTimeout,
hideListeningOnOutbound: cb.hideListeningOnOutbound,
tracingDisabled: cb.TracingDisabled,
}

parent := cb.ParentContext
Expand Down
3 changes: 0 additions & 3 deletions context_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ func (c headerCtx) SetResponseHeaders(headers map[string]string) {
// If the parent `ctx` is already an instance of ContextWithHeaders, its existing headers
// will be ignored. In order to merge new headers with parent headers, use ContextBuilder.
func WrapWithHeaders(ctx context.Context, headers map[string]string) ContextWithHeaders {
if hctx, ok := ctx.(headerCtx); ok {
ctx = hctx
}
h := &headersContainer{
reqHeaders: headers,
}
Expand Down
Loading

0 comments on commit 114c2b7

Please sign in to comment.