Skip to content

Commit

Permalink
Merge pull request #472 from galen0624/opentrace
Browse files Browse the repository at this point in the history
#429 issue - OpenTrace zipKin Support
  • Loading branch information
magiconair authored Oct 29, 2018
2 parents bc31559 + 8e8a483 commit 57c76db
Show file tree
Hide file tree
Showing 297 changed files with 49,947 additions and 3 deletions.
15 changes: 15 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,21 @@ import (
)

type Config struct {

Proxy Proxy
Registry Registry
Listen []Listen
Log Log
Metrics Metrics
UI UI
Runtime Runtime
Tracing Tracing
ProfileMode string
ProfilePath string
Insecure bool
GlobMatchingDisabled bool


}

type CertSource struct {
Expand Down Expand Up @@ -146,3 +150,14 @@ type Consul struct {
CheckDeregisterCriticalServiceAfter string
ChecksRequired string
}

type Tracing struct {
TracingEnabled bool
CollectorType string
ConnectString string
ServiceName string
Topic string
SamplerRate float64
SpanHost string
}

11 changes: 11 additions & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,15 @@ var defaultConfig = &Config{
Color: "light-green",
Access: "rw",
},

Tracing: Tracing{
TracingEnabled: false,
CollectorType: "http",
ConnectString: "http://localhost:9411/api/v1/spans",
ServiceName: "Fabiolb",
Topic: "Fabiolb-Kafka-Topic",
SamplerRate: -1,
SpanHost: "localhost:9998",
},

}
8 changes: 8 additions & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,16 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
f.StringVar(&cfg.UI.Title, "ui.title", defaultConfig.UI.Title, "optional title for the UI")
f.StringVar(&cfg.ProfileMode, "profile.mode", defaultConfig.ProfileMode, "enable profiling mode, one of [cpu, mem, mutex, block]")
f.StringVar(&cfg.ProfilePath, "profile.path", defaultConfig.ProfilePath, "path to profile dump file")
f.BoolVar(&cfg.Tracing.TracingEnabled, "tracing.TracingEnabled", defaultConfig.Tracing.TracingEnabled, "Enable/Disable OpenTrace, one of [true, false]")
f.StringVar(&cfg.Tracing.CollectorType, "tracing.CollectorType", defaultConfig.Tracing.CollectorType, "OpenTrace Collector Type, one of [http, kafka]")
f.StringVar(&cfg.Tracing.ConnectString, "tracing.ConnectString", defaultConfig.Tracing.ConnectString, "OpenTrace Collector host:port")
f.StringVar(&cfg.Tracing.ServiceName, "tracing.ServiceName", defaultConfig.Tracing.ServiceName, "Service name to embed in OpenTrace span")
f.StringVar(&cfg.Tracing.Topic, "tracing.Topic", defaultConfig.Tracing.Topic, "OpenTrace Collector Kafka Topic")
f.Float64Var(&cfg.Tracing.SamplerRate, "tracing.SamplerRate", defaultConfig.Tracing.SamplerRate, "OpenTrace sample rate percentage in decimal form")
f.StringVar(&cfg.Tracing.SpanHost, "tracing.SpanHost", defaultConfig.Tracing.SpanHost, "Host:Port info to add to spans")
f.BoolVar(&cfg.GlobMatchingDisabled, "glob.matching.disabled", defaultConfig.GlobMatchingDisabled, "Disable Glob Matching on routes, one of [true, false]")


// deprecated flags
var proxyLogRoutes string
f.StringVar(&proxyLogRoutes, "proxy.log.routes", "", "deprecated. use log.routes.format instead")
Expand Down
59 changes: 59 additions & 0 deletions fabio.properties
Original file line number Diff line number Diff line change
Expand Up @@ -1011,3 +1011,62 @@
# The default is
#
# ui.title =


# Open Trace Configuration Currently supports ZipKin Collector
# tracing.TracingEnabled enables/disables Open Tracing in Fabio. Bool value true/false
#
# The default is
#
# tracing.TracingEnabled = false

# tracing.CollectorType sets what type of collector is used.
# Currently only two types are supported http and kafka
#
# http: sets collector type to http tracing.ConnectString must also be set
# kafka: sets collector type to emit via kafka. tracing.Topic must also be set
#
# The default is
#
# tracing.CollectorType = http

# tracing.ConnectString sets the connection string per connection type.
# If tracing.CollectorType = http tracing.ConnectString should be
# http://URL:PORT where URL is the URL of your collector and PORT is the TCP Port
# it is listening on
#
# If tracing.CollectorType = kafka tracing.ConnectString should be
# HOSTNAME:PORT of your kafka broker
# tracing.Topic must also be set
#
# The default is
#
# tracing.ConnectString = http://localhost:9411/api/v1/spans

# tracing.ServiceName sets the service name used in reporting span information
#
# The default is
#
# tracing.ServiceName = Fabiolb

# tracing.Topic sets the Topic String used if tracing.CollectorType is kafka and
# tracing.ConnectSting is set to a kafka broker
#
# The default is
#
# tracing.Topic = Fabiolb-Kafka-Topic

# tracing.SamplerRate is the rate at which opentrace span data will be collected and sent
# If SamplerRate is <= 0 Never sample
# If SamplerRate is >= 1.0 always sample
# Values between 0 and 1 will be the percentage in decimal form
# Example a value of .50 will be 50% sample rate
#
# The default is
# tracing.SamplerRate = -1

# tracing.SpanHost sets host information.
# This is used to specify additional information when sending spans to a collector
#
# The default is
# tracing.SpanHost = localhost:9998
11 changes: 8 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/fabiolb/fabio/registry/file"
"github.com/fabiolb/fabio/registry/static"
"github.com/fabiolb/fabio/route"
"github.com/fabiolb/fabio/trace"
"github.com/pkg/profile"
dmp "github.com/sergi/go-diff/diffmatchpatch"
)
Expand Down Expand Up @@ -116,6 +117,9 @@ func main() {
initMetrics(cfg)
initRuntime(cfg)
initBackend(cfg)
//Init OpenTracing if Enabled in the Properties File Tracing.TracingEnabled
trace.InitializeTracer(&cfg.Tracing)

startAdmin(cfg)

go watchNoRouteHTML(cfg)
Expand Down Expand Up @@ -190,9 +194,10 @@ func newHTTPProxy(cfg *config.Config) http.Handler {
}
return t
},
Requests: metrics.DefaultRegistry.GetTimer("requests"),
Noroute: metrics.DefaultRegistry.GetCounter("notfound"),
Logger: l,
Requests: metrics.DefaultRegistry.GetTimer("requests"),
Noroute: metrics.DefaultRegistry.GetCounter("notfound"),
Logger: l,
TracerCfg: cfg.Tracing,
}
}

Expand Down
11 changes: 11 additions & 0 deletions proxy/http_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/fabiolb/fabio/noroute"
"github.com/fabiolb/fabio/proxy/gzip"
"github.com/fabiolb/fabio/route"
"github.com/fabiolb/fabio/trace"
"github.com/fabiolb/fabio/uuid"
)

Expand Down Expand Up @@ -53,6 +54,9 @@ type HTTPProxy struct {
// Logger is the access logger for the requests.
Logger logger.Logger

// TracerCfg is the Open Tracing configuration as provided during startup
TracerCfg config.Tracing

// UUID returns a unique id in uuid format.
// If UUID is nil, uuid.NewUUID() is used.
UUID func() string
Expand All @@ -71,6 +75,10 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Header.Set(p.Config.RequestID, id())
}

//Create Span
span := trace.CreateSpan(r, p.TracerCfg.ServiceName)
defer span.Finish()

t := p.Lookup(r)
if t == nil {
status := p.Config.NoRouteStatus
Expand Down Expand Up @@ -144,6 +152,9 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

//Add OpenTrace Headers to response
trace.InjectHeaders(span, r)

upgrade, accept := r.Header.Get("Upgrade"), r.Header.Get("Accept")

tr := p.Transport
Expand Down
100 changes: 100 additions & 0 deletions trace/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package trace

import (
"fmt"
"log"
"net/http"
"os"
"strings"

"github.com/fabiolb/fabio/config"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
zipkin "github.com/openzipkin/zipkin-go-opentracing"
)

func InjectHeaders(span opentracing.Span, req *http.Request) {
// Inject span data into the request headers
opentracing.GlobalTracer().Inject(
span.Context(),
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(req.Header),
)
}

func CreateCollector(collectorType, connectString, topic string) zipkin.Collector {
var collector zipkin.Collector
var err error

switch collectorType {
case "http":
collector, err = zipkin.NewHTTPCollector(connectString)
case "kafka":
// TODO set logger?
kafkaHosts := strings.Split(connectString, ",")
collector, err = zipkin.NewKafkaCollector(
kafkaHosts,
zipkin.KafkaTopic(topic),
)
default:
err = fmt.Errorf("Unknown collector type.")
}

if err != nil {
log.Fatalf("Unable to create Zipkin %s collector: %v", collectorType, err)
}

return collector
}

func CreateTracer(recorder zipkin.SpanRecorder, samplerRate float64) opentracing.Tracer {
tracer, err := zipkin.NewTracer(
recorder,
zipkin.WithSampler(zipkin.NewBoundarySampler(samplerRate, 1)),
zipkin.ClientServerSameSpan(false),
zipkin.TraceID128Bit(true),
)

if err != nil {
log.Printf("Unable to create Zipkin tracer: %+v", err)
os.Exit(-1)
}

return tracer
}

func CreateSpan(r *http.Request, serviceName string) opentracing.Span {
globalTracer := opentracing.GlobalTracer()

// If headers contain trace data, create child span from parent; else, create root span
var span opentracing.Span
if globalTracer != nil {
spanCtx, err := globalTracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header))
if err != nil {
span = globalTracer.StartSpan(serviceName)
} else {
span = globalTracer.StartSpan(serviceName, ext.RPCServerOption(spanCtx))
}
}

return span // caller must defer span.finish()
}

// InitializeTracer initializes OpenTracing support if Tracing.TracingEnabled
// is set in the config.
func InitializeTracer(traceConfig *config.Tracing) {
if !traceConfig.TracingEnabled {
return
}

log.Printf("Tracing initializing - type: %s, connection string: %s, service name: %s, topic: %s, samplerRate: %v",
traceConfig.CollectorType, traceConfig.ConnectString, traceConfig.ServiceName, traceConfig.Topic, traceConfig.SamplerRate)

// Create a new Zipkin Collector, Recorder, and Tracer
collector := CreateCollector(traceConfig.CollectorType, traceConfig.ConnectString, traceConfig.Topic)
recorder := zipkin.NewRecorder(collector, false, traceConfig.SpanHost, traceConfig.ServiceName)
tracer := CreateTracer(recorder, traceConfig.SamplerRate)

// Set the Zipkin Tracer created above to the GlobalTracer
opentracing.SetGlobalTracer(tracer)
}
Loading

0 comments on commit 57c76db

Please sign in to comment.