From 68456290d55da6622f38d8a3b64cfb3a5ed3bcec Mon Sep 17 00:00:00 2001 From: Jeremy White Date: Tue, 20 Mar 2018 13:25:23 -0500 Subject: [PATCH] OpenTracing zipKin Support. Reference - Issue #429 Co-authored-by: Kristina Fischer Co-authored-by: Michael Murphy Co-authored-by: Nathan West Co-authored-by: Austin Hartzheim Co-authored-by: Jacob Hansen --- config/config.go | 11 ++++++ config/default.go | 10 +++++ config/load.go | 7 ++++ fabio.properties | 59 ++++++++++++++++++++++++++++ main.go | 15 ++++++++ proxy/http_proxy.go | 12 ++++++ trace/trace.go | 93 +++++++++++++++++++++++++++++++++++++++++++++ trace/trace_test.go | 54 ++++++++++++++++++++++++++ 8 files changed, 261 insertions(+) create mode 100644 trace/trace.go create mode 100644 trace/trace_test.go diff --git a/config/config.go b/config/config.go index f8ad7e4c2..355eeb6c9 100644 --- a/config/config.go +++ b/config/config.go @@ -14,6 +14,7 @@ type Config struct { Metrics Metrics UI UI Runtime Runtime + Tracing Tracing ProfileMode string ProfilePath string Insecure bool @@ -144,3 +145,13 @@ type Consul struct { CheckDeregisterCriticalServiceAfter string ChecksRequired string } + +type Tracing struct { + TracingEnabled bool + CollectorType string + ConnectString string + ServiceName string + Topic string + SamplerRate float64 + SpanHost string +} diff --git a/config/default.go b/config/default.go index 45f68424d..eef566913 100644 --- a/config/default.go +++ b/config/default.go @@ -77,4 +77,14 @@ var defaultConfig = &Config{ Color: "light-green", Access: "rw", }, + + Tracing: Tracing{ + TracingEnabled: false, + CollectorType: "http", + ConnectString: "http://localhost:9411/api/v1/spans", + ServiceName: "Fabiolb", + Topic: "Fabiolb-Kafka-Topic", + SamplerRate: -1, + SpanHost: "localhost:9998", + }, } diff --git a/config/load.go b/config/load.go index d75188471..7629e0912 100644 --- a/config/load.go +++ b/config/load.go @@ -186,6 +186,13 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c f.StringVar(&cfg.UI.Title, "ui.title", defaultConfig.UI.Title, "optional title for the UI") f.StringVar(&cfg.ProfileMode, "profile.mode", defaultConfig.ProfileMode, "enable profiling mode, one of [cpu, mem, mutex, block]") f.StringVar(&cfg.ProfilePath, "profile.path", defaultConfig.ProfilePath, "path to profile dump file") + f.BoolVar(&cfg.Tracing.TracingEnabled, "tracing.TracingEnabled", defaultConfig.Tracing.TracingEnabled, "Enable/Disable OpenTrace, one of [true, false]") + f.StringVar(&cfg.Tracing.CollectorType, "tracing.CollectorType", defaultConfig.Tracing.CollectorType, "OpenTrace Collector Type, one of [http, kafka]") + f.StringVar(&cfg.Tracing.ConnectString, "tracing.ConnectString", defaultConfig.Tracing.ConnectString, "OpenTrace Collector host:port") + f.StringVar(&cfg.Tracing.ServiceName, "tracing.ServiceName", defaultConfig.Tracing.ServiceName, "Service name to embed in OpenTrace span") + f.StringVar(&cfg.Tracing.Topic, "tracing.Topic", defaultConfig.Tracing.Topic, "OpenTrace Collector Kafka Topic") + f.Float64Var(&cfg.Tracing.SamplerRate, "tracing.SamplerRate", defaultConfig.Tracing.SamplerRate, "OpenTrace sample rate percentage in decimal form") + f.StringVar(&cfg.Tracing.SpanHost, "tracing.SpanHost", defaultConfig.Tracing.SpanHost, "Host:Port info to add to spans") // deprecated flags var proxyLogRoutes string diff --git a/fabio.properties b/fabio.properties index dae2caab0..94bb1c4b8 100644 --- a/fabio.properties +++ b/fabio.properties @@ -994,3 +994,62 @@ # The default is # # ui.title = + + +# Open Trace Configuration Currently supports ZipKin Collector +# tracing.TracingEnabled enables/disables Open Tracing in Fabio. Bool value true/false +# +# The default is +# +# tracing.TracingEnabled = false + +# tracing.CollectorType sets what type of collector is used. +# Currently only two types are supported http and kafka +# +# http: sets collector type to http tracing.ConnectString must also be set +# kafka: sets collector type to emit via kafka. tracing.Topic must also be set +# +# The default is +# +# tracing.CollectorType = http + +# tracing.ConnectString sets the connection string per connection type. +# If tracing.CollectorType = http tracing.ConnectString should be +# http://URL:PORT where URL is the URL of your collector and PORT is the TCP Port +# it is listening on +# +# If tracing.CollectorType = kafka tracing.ConnectString should be +# HOSTNAME:PORT of your kafka broker +# tracing.Topic must also be set +# +# The default is +# +# tracing.ConnectString = http://localhost:9411/api/v1/spans + +# tracing.ServiceName sets the service name used in reporting span information +# +# The default is +# +# tracing.ServiceName = Fabiolb + +# tracing.Topic sets the Topic String used if tracing.CollectorType is kafka and +# tracing.ConnectSting is set to a kafka broker +# +# The default is +# +# tracing.Topic = Fabiolb-Kafka-Topic + +# tracing.SamplerRate is the rate at which opentrace span data will be collected and sent +# If SamplerRate is <= 0 Never sample +# If SamplerRate is >= 1.0 always sample +# Values between 0 and 1 will be the percentage in decimal form +# Example a value of .50 will be 50% sample rate +# +# The default is +# tracing.SamplerRate = -1 + +# tracing.SpanHost sets host information. +# This is used to specify additional information when sending spans to a collector +# +# The default is +# tracing.SpanHost = localhost:9998 \ No newline at end of file diff --git a/main.go b/main.go index 601503c6a..faa085d48 100644 --- a/main.go +++ b/main.go @@ -31,6 +31,7 @@ import ( "github.com/fabiolb/fabio/registry/file" "github.com/fabiolb/fabio/registry/static" "github.com/fabiolb/fabio/route" + "github.com/fabiolb/fabio/trace" "github.com/pkg/profile" dmp "github.com/sergi/go-diff/diffmatchpatch" ) @@ -118,6 +119,9 @@ func main() { initMetrics(cfg) initRuntime(cfg) initBackend(cfg) + //Init OpenTracing if Enabled in the Properties File Tracing.TracingEnabled + initOpenTracing(cfg) + startAdmin(cfg) go watchNoRouteHTML(cfg) @@ -195,6 +199,7 @@ func newHTTPProxy(cfg *config.Config) http.Handler { Requests: metrics.DefaultRegistry.GetTimer("requests"), Noroute: metrics.DefaultRegistry.GetCounter("notfound"), Logger: l, + TracerCfg: cfg.Tracing, } } @@ -379,6 +384,16 @@ func initBackend(cfg *config.Config) { } } +// OpenTracing Init +func initOpenTracing(cfg *config.Config) { + // If fabio.properties file has tracing.TracingEnabled set to true the init tracing + if cfg.Tracing.TracingEnabled { + trace.InitializeTracer(cfg.Tracing.CollectorType, cfg.Tracing.ConnectString, cfg.Tracing.ServiceName, cfg.Tracing.Topic, cfg.Tracing.SamplerRate, cfg.Tracing.SpanHost) + + } + +} + func watchBackend(cfg *config.Config, first chan bool) { var ( last string diff --git a/proxy/http_proxy.go b/proxy/http_proxy.go index b531373af..a8335f40a 100644 --- a/proxy/http_proxy.go +++ b/proxy/http_proxy.go @@ -19,6 +19,8 @@ import ( "github.com/fabiolb/fabio/proxy/gzip" "github.com/fabiolb/fabio/route" "github.com/fabiolb/fabio/uuid" + "github.com/fabiolb/fabio/trace" + ) // HTTPProxy is a dynamic reverse proxy for HTTP and HTTPS protocols. @@ -53,6 +55,9 @@ type HTTPProxy struct { // Logger is the access logger for the requests. Logger logger.Logger + // TracerCfg is the Open Tracing configuration as provided during startup + TracerCfg config.Tracing + // UUID returns a unique id in uuid format. // If UUID is nil, uuid.NewUUID() is used. UUID func() string @@ -71,6 +76,10 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { r.Header.Set(p.Config.RequestID, id()) } + //Create Span + span := trace.CreateSpan(r, p.TracerCfg.ServiceName) + defer span.Finish() + t := p.Lookup(r) if t == nil { status := p.Config.NoRouteStatus @@ -144,6 +153,9 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + //Add OpenTrace Headers to response + trace.InjectHeaders(span, r) + upgrade, accept := r.Header.Get("Upgrade"), r.Header.Get("Accept") tr := p.Transport diff --git a/trace/trace.go b/trace/trace.go new file mode 100644 index 000000000..7747efe62 --- /dev/null +++ b/trace/trace.go @@ -0,0 +1,93 @@ +package trace + +import ( + "log" + "net/http" + "os" + "strings" + + opentracing "github.com/opentracing/opentracing-go" + zipkin "github.com/openzipkin/zipkin-go-opentracing" + "github.com/opentracing/opentracing-go/ext" +) + +func InjectHeaders(span opentracing.Span, req *http.Request) { + // Inject span data into the request headers + opentracing.GlobalTracer().Inject( + span.Context(), + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(req.Header), + ) +} + +func CreateCollector(collectorType string, connectString string, topic string) zipkin.Collector { + var collector zipkin.Collector + var err error + + if collectorType == "http" { + collector, err = zipkin.NewHTTPCollector(connectString) + } else if collectorType == "kafka" { + // TODO set logger? + kafkaHosts := strings.Split(connectString, ",") + collector, err = zipkin.NewKafkaCollector( + kafkaHosts, + zipkin.KafkaTopic(topic), + ) + } + + if err != nil { + log.Printf("Unable to create Zipkin %s collector: %+v", collectorType, err) + os.Exit(-1) + } + + return collector +} + +func CreateTracer(recorder zipkin.SpanRecorder, samplerRate float64) opentracing.Tracer { + tracer, err := zipkin.NewTracer( + recorder, + zipkin.WithSampler(zipkin.NewBoundarySampler(samplerRate, 1)), + zipkin.ClientServerSameSpan(false), + zipkin.TraceID128Bit(true), + ) + + if err != nil { + log.Printf("Unable to create Zipkin tracer: %+v", err) + os.Exit(-1) + } + + return tracer +} + +func CreateSpan(r *http.Request, serviceName string) opentracing.Span { + globalTracer := opentracing.GlobalTracer() + + // If headers contain trace data, create child span from parent; else, create root span + var span opentracing.Span + if globalTracer != nil { + spanCtx, err := globalTracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header)) + if err != nil { + span = globalTracer.StartSpan(serviceName) + } else { + span = globalTracer.StartSpan(serviceName, ext.RPCServerOption(spanCtx)) + } + } + + return span // caller must defer span.finish() +} + +func InitializeTracer(collectorType string, connectString string, serviceName string, topic string, samplerRate float64, addressPort string) { + log.Printf("Tracing initializing - type: %s, connection string: %s, service name: %s, topic: %s, samplerRate: %v", collectorType, connectString, serviceName, topic, samplerRate) + + // Create a new Zipkin Collector, Recorder, and Tracer + collector := CreateCollector(collectorType, connectString, topic) + recorder := zipkin.NewRecorder(collector, false, addressPort, serviceName) + tracer := CreateTracer(recorder, samplerRate) + + // Set the Zipkin Tracer created above to the GlobalTracer + opentracing.SetGlobalTracer(tracer) + + log.Printf("\n\nTRACER: %v\n\n", tracer) + log.Printf("\n\nCOLLECTOR: %v\n\n", collector) + log.Printf("\n\nRECORDER: %v\n\n", recorder) +} \ No newline at end of file diff --git a/trace/trace_test.go b/trace/trace_test.go new file mode 100644 index 000000000..acfe83209 --- /dev/null +++ b/trace/trace_test.go @@ -0,0 +1,54 @@ +package trace + +import ( + opentracing "github.com/opentracing/opentracing-go" + mocktracer "github.com/opentracing/opentracing-go/mocktracer" + "github.com/opentracing/opentracing-go/ext" + "net/http" + "testing" +) + +func mimicTracerInject(req *http.Request) { + // TODO maybe replace this will a call to opentracing.GlobalTracer().Inject() + req.Header.Add("X-B3-TraceId", "1234562345678") + req.Header.Add("X-B3-SpanId", "123456789") + req.Header.Add("X-B3-ParentSpanId", "123456789") + req.Header.Add("X-B3-Flags", "1") +} + +// go test -v ./trace +func TestInjectHeaders(t *testing.T) { + serviceName := "TESTING" + + req, err := http.NewRequest("GET", "http://example.com", nil) + if err != nil { + t.Error("Error when creating new request.") + t.Fail() + } + mimicTracerInject(req) + + mt := mocktracer.New() + opentracing.SetGlobalTracer(mt) + globalTracer := opentracing.GlobalTracer() + + var span opentracing.Span + if globalTracer != nil { + spanCtx, err := globalTracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(req.Header)) + if err != nil { + span = globalTracer.StartSpan(serviceName) + } else { + span = globalTracer.StartSpan(serviceName, ext.RPCServerOption(spanCtx)) + } + } + + InjectHeaders(span, req) + + if req.Header.Get("X-B3-Traceid") == "" { + t.Error("Zipkin headers not set in request.") + t.Fail() + } + if req.Header.Get("X-B3-Traceid") != "1234562345678" { + t.Error("Zipkin headers do not match the values set.") + t.Fail() + } +}