Skip to content

Commit

Permalink
OpenTracing zipKin Support. Reference - Issue #429
Browse files Browse the repository at this point in the history
Co-authored-by: Kristina Fischer <[email protected]>
Co-authored-by: Michael Murphy <[email protected]>
Co-authored-by: Nathan West <[email protected]>
Co-authored-by: Austin Hartzheim <[email protected]>
Co-authored-by: Jacob Hansen <[email protected]>
  • Loading branch information
6 people committed Mar 20, 2018
1 parent f6be07e commit 6845629
Show file tree
Hide file tree
Showing 8 changed files with 261 additions and 0 deletions.
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Config struct {
Metrics Metrics
UI UI
Runtime Runtime
Tracing Tracing
ProfileMode string
ProfilePath string
Insecure bool
Expand Down Expand Up @@ -144,3 +145,13 @@ type Consul struct {
CheckDeregisterCriticalServiceAfter string
ChecksRequired string
}

type Tracing struct {
TracingEnabled bool
CollectorType string
ConnectString string
ServiceName string
Topic string
SamplerRate float64
SpanHost string
}
10 changes: 10 additions & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,14 @@ var defaultConfig = &Config{
Color: "light-green",
Access: "rw",
},

Tracing: Tracing{
TracingEnabled: false,
CollectorType: "http",
ConnectString: "http://localhost:9411/api/v1/spans",
ServiceName: "Fabiolb",
Topic: "Fabiolb-Kafka-Topic",
SamplerRate: -1,
SpanHost: "localhost:9998",
},
}
7 changes: 7 additions & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
f.StringVar(&cfg.UI.Title, "ui.title", defaultConfig.UI.Title, "optional title for the UI")
f.StringVar(&cfg.ProfileMode, "profile.mode", defaultConfig.ProfileMode, "enable profiling mode, one of [cpu, mem, mutex, block]")
f.StringVar(&cfg.ProfilePath, "profile.path", defaultConfig.ProfilePath, "path to profile dump file")
f.BoolVar(&cfg.Tracing.TracingEnabled, "tracing.TracingEnabled", defaultConfig.Tracing.TracingEnabled, "Enable/Disable OpenTrace, one of [true, false]")
f.StringVar(&cfg.Tracing.CollectorType, "tracing.CollectorType", defaultConfig.Tracing.CollectorType, "OpenTrace Collector Type, one of [http, kafka]")
f.StringVar(&cfg.Tracing.ConnectString, "tracing.ConnectString", defaultConfig.Tracing.ConnectString, "OpenTrace Collector host:port")
f.StringVar(&cfg.Tracing.ServiceName, "tracing.ServiceName", defaultConfig.Tracing.ServiceName, "Service name to embed in OpenTrace span")
f.StringVar(&cfg.Tracing.Topic, "tracing.Topic", defaultConfig.Tracing.Topic, "OpenTrace Collector Kafka Topic")
f.Float64Var(&cfg.Tracing.SamplerRate, "tracing.SamplerRate", defaultConfig.Tracing.SamplerRate, "OpenTrace sample rate percentage in decimal form")
f.StringVar(&cfg.Tracing.SpanHost, "tracing.SpanHost", defaultConfig.Tracing.SpanHost, "Host:Port info to add to spans")

// deprecated flags
var proxyLogRoutes string
Expand Down
59 changes: 59 additions & 0 deletions fabio.properties
Original file line number Diff line number Diff line change
Expand Up @@ -994,3 +994,62 @@
# The default is
#
# ui.title =


# Open Trace Configuration Currently supports ZipKin Collector
# tracing.TracingEnabled enables/disables Open Tracing in Fabio. Bool value true/false
#
# The default is
#
# tracing.TracingEnabled = false

# tracing.CollectorType sets what type of collector is used.
# Currently only two types are supported http and kafka
#
# http: sets collector type to http tracing.ConnectString must also be set
# kafka: sets collector type to emit via kafka. tracing.Topic must also be set
#
# The default is
#
# tracing.CollectorType = http

# tracing.ConnectString sets the connection string per connection type.
# If tracing.CollectorType = http tracing.ConnectString should be
# http://URL:PORT where URL is the URL of your collector and PORT is the TCP Port
# it is listening on
#
# If tracing.CollectorType = kafka tracing.ConnectString should be
# HOSTNAME:PORT of your kafka broker
# tracing.Topic must also be set
#
# The default is
#
# tracing.ConnectString = http://localhost:9411/api/v1/spans

# tracing.ServiceName sets the service name used in reporting span information
#
# The default is
#
# tracing.ServiceName = Fabiolb

# tracing.Topic sets the Topic String used if tracing.CollectorType is kafka and
# tracing.ConnectSting is set to a kafka broker
#
# The default is
#
# tracing.Topic = Fabiolb-Kafka-Topic

# tracing.SamplerRate is the rate at which opentrace span data will be collected and sent
# If SamplerRate is <= 0 Never sample
# If SamplerRate is >= 1.0 always sample
# Values between 0 and 1 will be the percentage in decimal form
# Example a value of .50 will be 50% sample rate
#
# The default is
# tracing.SamplerRate = -1

# tracing.SpanHost sets host information.
# This is used to specify additional information when sending spans to a collector
#
# The default is
# tracing.SpanHost = localhost:9998
15 changes: 15 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/fabiolb/fabio/registry/file"
"github.com/fabiolb/fabio/registry/static"
"github.com/fabiolb/fabio/route"
"github.com/fabiolb/fabio/trace"
"github.com/pkg/profile"
dmp "github.com/sergi/go-diff/diffmatchpatch"
)
Expand Down Expand Up @@ -118,6 +119,9 @@ func main() {
initMetrics(cfg)
initRuntime(cfg)
initBackend(cfg)
//Init OpenTracing if Enabled in the Properties File Tracing.TracingEnabled
initOpenTracing(cfg)

startAdmin(cfg)

go watchNoRouteHTML(cfg)
Expand Down Expand Up @@ -195,6 +199,7 @@ func newHTTPProxy(cfg *config.Config) http.Handler {
Requests: metrics.DefaultRegistry.GetTimer("requests"),
Noroute: metrics.DefaultRegistry.GetCounter("notfound"),
Logger: l,
TracerCfg: cfg.Tracing,
}
}

Expand Down Expand Up @@ -379,6 +384,16 @@ func initBackend(cfg *config.Config) {
}
}

// OpenTracing Init
func initOpenTracing(cfg *config.Config) {
// If fabio.properties file has tracing.TracingEnabled set to true the init tracing
if cfg.Tracing.TracingEnabled {
trace.InitializeTracer(cfg.Tracing.CollectorType, cfg.Tracing.ConnectString, cfg.Tracing.ServiceName, cfg.Tracing.Topic, cfg.Tracing.SamplerRate, cfg.Tracing.SpanHost)

}

}

func watchBackend(cfg *config.Config, first chan bool) {
var (
last string
Expand Down
12 changes: 12 additions & 0 deletions proxy/http_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/fabiolb/fabio/proxy/gzip"
"github.com/fabiolb/fabio/route"
"github.com/fabiolb/fabio/uuid"
"github.com/fabiolb/fabio/trace"

)

// HTTPProxy is a dynamic reverse proxy for HTTP and HTTPS protocols.
Expand Down Expand Up @@ -53,6 +55,9 @@ type HTTPProxy struct {
// Logger is the access logger for the requests.
Logger logger.Logger

// TracerCfg is the Open Tracing configuration as provided during startup
TracerCfg config.Tracing

// UUID returns a unique id in uuid format.
// If UUID is nil, uuid.NewUUID() is used.
UUID func() string
Expand All @@ -71,6 +76,10 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Header.Set(p.Config.RequestID, id())
}

//Create Span
span := trace.CreateSpan(r, p.TracerCfg.ServiceName)
defer span.Finish()

t := p.Lookup(r)
if t == nil {
status := p.Config.NoRouteStatus
Expand Down Expand Up @@ -144,6 +153,9 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

//Add OpenTrace Headers to response
trace.InjectHeaders(span, r)

upgrade, accept := r.Header.Get("Upgrade"), r.Header.Get("Accept")

tr := p.Transport
Expand Down
93 changes: 93 additions & 0 deletions trace/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package trace

import (
"log"
"net/http"
"os"
"strings"

opentracing "github.com/opentracing/opentracing-go"
zipkin "github.com/openzipkin/zipkin-go-opentracing"
"github.com/opentracing/opentracing-go/ext"
)

func InjectHeaders(span opentracing.Span, req *http.Request) {
// Inject span data into the request headers
opentracing.GlobalTracer().Inject(
span.Context(),
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(req.Header),
)
}

func CreateCollector(collectorType string, connectString string, topic string) zipkin.Collector {
var collector zipkin.Collector
var err error

if collectorType == "http" {
collector, err = zipkin.NewHTTPCollector(connectString)
} else if collectorType == "kafka" {
// TODO set logger?
kafkaHosts := strings.Split(connectString, ",")
collector, err = zipkin.NewKafkaCollector(
kafkaHosts,
zipkin.KafkaTopic(topic),
)
}

if err != nil {
log.Printf("Unable to create Zipkin %s collector: %+v", collectorType, err)
os.Exit(-1)
}

return collector
}

func CreateTracer(recorder zipkin.SpanRecorder, samplerRate float64) opentracing.Tracer {
tracer, err := zipkin.NewTracer(
recorder,
zipkin.WithSampler(zipkin.NewBoundarySampler(samplerRate, 1)),
zipkin.ClientServerSameSpan(false),
zipkin.TraceID128Bit(true),
)

if err != nil {
log.Printf("Unable to create Zipkin tracer: %+v", err)
os.Exit(-1)
}

return tracer
}

func CreateSpan(r *http.Request, serviceName string) opentracing.Span {
globalTracer := opentracing.GlobalTracer()

// If headers contain trace data, create child span from parent; else, create root span
var span opentracing.Span
if globalTracer != nil {
spanCtx, err := globalTracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header))
if err != nil {
span = globalTracer.StartSpan(serviceName)
} else {
span = globalTracer.StartSpan(serviceName, ext.RPCServerOption(spanCtx))
}
}

return span // caller must defer span.finish()
}

func InitializeTracer(collectorType string, connectString string, serviceName string, topic string, samplerRate float64, addressPort string) {
log.Printf("Tracing initializing - type: %s, connection string: %s, service name: %s, topic: %s, samplerRate: %v", collectorType, connectString, serviceName, topic, samplerRate)

// Create a new Zipkin Collector, Recorder, and Tracer
collector := CreateCollector(collectorType, connectString, topic)
recorder := zipkin.NewRecorder(collector, false, addressPort, serviceName)
tracer := CreateTracer(recorder, samplerRate)

// Set the Zipkin Tracer created above to the GlobalTracer
opentracing.SetGlobalTracer(tracer)

log.Printf("\n\nTRACER: %v\n\n", tracer)
log.Printf("\n\nCOLLECTOR: %v\n\n", collector)
log.Printf("\n\nRECORDER: %v\n\n", recorder)
}
54 changes: 54 additions & 0 deletions trace/trace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package trace

import (
opentracing "github.com/opentracing/opentracing-go"
mocktracer "github.com/opentracing/opentracing-go/mocktracer"
"github.com/opentracing/opentracing-go/ext"
"net/http"
"testing"
)

func mimicTracerInject(req *http.Request) {
// TODO maybe replace this will a call to opentracing.GlobalTracer().Inject()
req.Header.Add("X-B3-TraceId", "1234562345678")
req.Header.Add("X-B3-SpanId", "123456789")
req.Header.Add("X-B3-ParentSpanId", "123456789")
req.Header.Add("X-B3-Flags", "1")
}

// go test -v ./trace
func TestInjectHeaders(t *testing.T) {
serviceName := "TESTING"

req, err := http.NewRequest("GET", "http://example.com", nil)
if err != nil {
t.Error("Error when creating new request.")
t.Fail()
}
mimicTracerInject(req)

mt := mocktracer.New()
opentracing.SetGlobalTracer(mt)
globalTracer := opentracing.GlobalTracer()

var span opentracing.Span
if globalTracer != nil {
spanCtx, err := globalTracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(req.Header))
if err != nil {
span = globalTracer.StartSpan(serviceName)
} else {
span = globalTracer.StartSpan(serviceName, ext.RPCServerOption(spanCtx))
}
}

InjectHeaders(span, req)

if req.Header.Get("X-B3-Traceid") == "" {
t.Error("Zipkin headers not set in request.")
t.Fail()
}
if req.Header.Get("X-B3-Traceid") != "1234562345678" {
t.Error("Zipkin headers do not match the values set.")
t.Fail()
}
}

0 comments on commit 6845629

Please sign in to comment.