Skip to content

Commit

Permalink
change jaeger options to functional style (#161)
Browse files Browse the repository at this point in the history
* change jaeger options to functional style

* fix lints

* add interface validaiton
  • Loading branch information
paivagustavo authored and rghetia committed Oct 4, 2019
1 parent bf2e5e9 commit b1bb19a
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 99 deletions.
10 changes: 5 additions & 5 deletions exporter/trace/jaeger/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ func main() {
ctx := context.Background()

// Create Jaeger Exporter
exporter, err := jaeger.NewExporter(jaeger.Options{
CollectorEndpoint: "http://localhost:14268/api/traces",
Process: jaeger.Process{
exporter, err := jaeger.NewExporter(
jaeger.WithCollectorEndpoint("http://localhost:14268/api/traces"),
jaeger.WithProcess(jaeger.Process{
ServiceName: "trace-demo",
},
})
}),
)
if err != nil {
log.Fatal(err)
}
Expand Down
134 changes: 40 additions & 94 deletions exporter/trace/jaeger/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,8 @@
package jaeger

import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"

"github.com/apache/thrift/lib/go/thrift"
"google.golang.org/api/support/bundler"
"google.golang.org/grpc/codes"

Expand All @@ -34,55 +27,58 @@ import (

const defaultServiceName = "OpenTelemetry"

// Options are the options to be used when initializing a Jaeger exporter.
type Options struct {
// CollectorEndpoint is the full url to the Jaeger HTTP Thrift collector.
// For example, http://localhost:14268/api/traces
CollectorEndpoint string

// AgentEndpoint instructs exporter to send spans to jaeger-agent at this address.
// For example, localhost:6831.
AgentEndpoint string
type Option func(*options)

// options are the options to be used when initializing a Jaeger exporter.
type options struct {
// OnError is the hook to be called when there is
// an error occurred when uploading the stats data.
// an error occurred when uploading the span data.
// If no custom hook is set, errors are logged.
// Optional.
OnError func(err error)

// Username to be used if basic auth is required.
// Optional.
Username string

// Password to be used if basic auth is required.
// Optional.
Password string

// Process contains the information about the exporting process.
Process Process

//BufferMaxCount defines the total number of traces that can be buffered in memory
BufferMaxCount int
}

// WithOnError sets the hook to be called when there is
// an error occurred when uploading the span data.
// If no custom hook is set, errors are logged.
func WithOnError(onError func(err error)) func(o *options) {
return func(o *options) {
o.OnError = onError
}
}

// WithProcess sets the process with the information about the exporting process.
func WithProcess(process Process) func(o *options) {
return func(o *options) {
o.Process = process
}
}

//WithBufferMaxCount defines the total number of traces that can be buffered in memory
func WithBufferMaxCount(bufferMaxCount int) func(o *options) {
return func(o *options) {
o.BufferMaxCount = bufferMaxCount
}
}

// NewExporter returns a trace.Exporter implementation that exports
// the collected spans to Jaeger.
func NewExporter(o Options) (*Exporter, error) {
if o.CollectorEndpoint == "" && o.AgentEndpoint == "" {
return nil, errors.New("missing endpoint for Jaeger exporter")
func NewExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, error) {
uploader, err := endpointOption()
if err != nil {
return nil, err
}

var endpoint string
var client *agentClientUDP
var err error
if o.CollectorEndpoint != "" {
endpoint = o.CollectorEndpoint
} else {
client, err = newAgentClientUDP(o.AgentEndpoint, udpPacketMaxLength)
if err != nil {
return nil, err
}
o := options{}
for _, opt := range opts {
opt(&o)
}

onError := func(err error) {
if o.OnError != nil {
o.OnError(err)
Expand All @@ -99,11 +95,7 @@ func NewExporter(o Options) (*Exporter, error) {
tags[i] = attributeToTag(tag.key, tag.value)
}
e := &Exporter{
endpoint: endpoint,
agentEndpoint: o.AgentEndpoint,
client: client,
username: o.Username,
password: o.Password,
uploader: uploader,
process: &gen.Process{
ServiceName: service,
Tags: tags,
Expand Down Expand Up @@ -145,13 +137,9 @@ type Tag struct {

// Exporter is an implementation of trace.Exporter that uploads spans to Jaeger.
type Exporter struct {
endpoint string
agentEndpoint string
process *gen.Process
bundler *bundler.Bundler
client *agentClientUDP

username, password string
process *gen.Process
bundler *bundler.Bundler
uploader batchUploader
}

var _ trace.Exporter = (*Exporter)(nil)
Expand Down Expand Up @@ -328,48 +316,6 @@ func (e *Exporter) upload(spans []*gen.Span) error {
Spans: spans,
Process: e.process,
}
if e.endpoint != "" {
return e.uploadCollector(batch)
}
return e.uploadAgent(batch)
}

func (e *Exporter) uploadAgent(batch *gen.Batch) error {
return e.client.EmitBatch(batch)
}

func (e *Exporter) uploadCollector(batch *gen.Batch) error {
body, err := serialize(batch)
if err != nil {
return err
}
req, err := http.NewRequest("POST", e.endpoint, body)
if err != nil {
return err
}
if e.username != "" && e.password != "" {
req.SetBasicAuth(e.username, e.password)
}
req.Header.Set("Content-Type", "application/x-thrift")

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}

_, _ = io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode)
}
return nil
}

func serialize(obj thrift.TStruct) (*bytes.Buffer, error) {
buf := thrift.NewTMemoryBuffer()
if err := obj.Write(thrift.NewTBinaryProtocolTransport(buf)); err != nil {
return nil, err
}
return buf.Buffer, nil
return e.uploader.upload(batch)
}
141 changes: 141 additions & 0 deletions exporter/trace/jaeger/uploader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package jaeger

import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"

"github.com/apache/thrift/lib/go/thrift"

gen "go.opentelemetry.io/exporter/trace/jaeger/internal/gen-go/jaeger"
)

// batchUploader send a batch of spans to Jaeger
type batchUploader interface {
upload(batch *gen.Batch) error
}

type EndpointOption func() (batchUploader, error)

// WithAgentEndpoint instructs exporter to send spans to jaeger-agent at this address.
// For example, localhost:6831.
func WithAgentEndpoint(agentEndpoint string) func() (batchUploader, error) {
return func() (batchUploader, error) {
if agentEndpoint == "" {
return nil, errors.New("agentEndpoint must not be empty.")
}

client, err := newAgentClientUDP(agentEndpoint, udpPacketMaxLength)
if err != nil {
return nil, err
}

return &agentUploader{client: client}, nil
}
}

// WithCollectorEndpoint defines the full url to the Jaeger HTTP Thrift collector.
// For example, http://localhost:14268/api/traces
func WithCollectorEndpoint(collectorEndpoint string, options ...CollectorEndpointOption) func() (batchUploader, error) {
return func() (batchUploader, error) {
if collectorEndpoint == "" {
return nil, errors.New("collectorEndpoint must not be empty.")
}

o := &CollectorEndpointOptions{}
for _, opt := range options {
opt(o)
}

return &collectorUploader{
endpoint: collectorEndpoint,
username: o.username,
password: o.password,
}, nil
}
}

type CollectorEndpointOption func(o *CollectorEndpointOptions)

type CollectorEndpointOptions struct {
// username to be used if basic auth is required.
username string

// password to be used if basic auth is required.
password string
}

// WithUsername sets the username to be used if basic auth is required.
func WithUsername(username string) func(o *CollectorEndpointOptions) {
return func(o *CollectorEndpointOptions) {
o.username = username
}
}

// WithPassword sets the password to be used if basic auth is required.
func WithPassword(password string) func(o *CollectorEndpointOptions) {
return func(o *CollectorEndpointOptions) {
o.password = password
}
}

// agentUploader implements batchUploader interface sending batches to
// Jaeger through the UDP agent.
type agentUploader struct {
client *agentClientUDP
}

var _ batchUploader = (*agentUploader)(nil)

func (a *agentUploader) upload(batch *gen.Batch) error {
return a.client.EmitBatch(batch)
}

// collectorUploader implements batchUploader interface sending batches to
// Jaeger through the collector http endpoint.
type collectorUploader struct {
endpoint string
username string
password string
}

var _ batchUploader = (*collectorUploader)(nil)

func (c *collectorUploader) upload(batch *gen.Batch) error {
body, err := serialize(batch)
if err != nil {
return err
}
req, err := http.NewRequest("POST", c.endpoint, body)
if err != nil {
return err
}
if c.username != "" && c.password != "" {
req.SetBasicAuth(c.username, c.password)
}
req.Header.Set("Content-Type", "application/x-thrift")

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}

_, _ = io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("failed to upload traces; HTTP status code: %d", resp.StatusCode)
}
return nil
}

func serialize(obj thrift.TStruct) (*bytes.Buffer, error) {
buf := thrift.NewTMemoryBuffer()
if err := obj.Write(thrift.NewTBinaryProtocolTransport(buf)); err != nil {
return nil, err
}
return buf.Buffer, nil
}

0 comments on commit b1bb19a

Please sign in to comment.