Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
exporter/zipkin: option to set RemoteEndpoint
Browse files Browse the repository at this point in the history
Provide an option `WithRemoteEndpoint` that allows
setting the remote endpoint of a Zipkin exporter.
This change is necessary because the constructor
NewExporter only takes in a local endpoint, implying
that remoteEndpoint is optional but when necessary,
we need to send this information along since it is
used by Zipkin to construct a service graph.
I found this issue while working on the OpenCensus service/agent.

Fixes #959
  • Loading branch information
odeke-em committed Oct 25, 2018
1 parent 96e75b8 commit acc477d
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 12 deletions.
36 changes: 36 additions & 0 deletions exporter/zipkin/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2018, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zipkin

import "github.com/openzipkin/zipkin-go/model"

type Option interface {
mutateExporter(*Exporter)
}

type remoteEndpoint struct {
endpoint *model.Endpoint
}

var _ Option = (*remoteEndpoint)(nil)

// WithRemoteEndpoint sets the remote endpoint of the exporter.
func WithRemoteEndpoint(endpoint *model.Endpoint) Option {
return &remoteEndpoint{endpoint: endpoint}
}

func (re *remoteEndpoint) mutateExporter(exp *Exporter) {
exp.remoteEndpoint = re.endpoint
}
30 changes: 19 additions & 11 deletions exporter/zipkin/zipkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (
// Exporter is an implementation of trace.Exporter that uploads spans to a
// Zipkin server.
type Exporter struct {
reporter reporter.Reporter
localEndpoint *model.Endpoint
reporter reporter.Reporter
localEndpoint *model.Endpoint
remoteEndpoint *model.Endpoint
}

// NewExporter returns an implementation of trace.Exporter that uploads spans
Expand All @@ -42,16 +43,22 @@ type Exporter struct {
// constructed with github.com/openzipkin/zipkin-go.NewEndpoint, e.g.:
// localEndpoint, err := NewEndpoint("my server", listener.Addr().String())
// localEndpoint can be nil.
func NewExporter(reporter reporter.Reporter, localEndpoint *model.Endpoint) *Exporter {
return &Exporter{
func NewExporter(reporter reporter.Reporter, localEndpoint *model.Endpoint, opts ...Option) *Exporter {
exp := &Exporter{
reporter: reporter,
localEndpoint: localEndpoint,
}

for _, opt := range opts {
opt.mutateExporter(exp)
}

return exp
}

// ExportSpan exports a span to a Zipkin server.
func (e *Exporter) ExportSpan(s *trace.SpanData) {
e.reporter.Send(zipkinSpan(s, e.localEndpoint))
e.reporter.Send(zipkinSpan(s, e.localEndpoint, e.remoteEndpoint))
}

const (
Expand Down Expand Up @@ -110,19 +117,20 @@ func spanKind(s *trace.SpanData) model.Kind {
return model.Undetermined
}

func zipkinSpan(s *trace.SpanData, localEndpoint *model.Endpoint) model.SpanModel {
func zipkinSpan(s *trace.SpanData, localEndpoint, remoteEndpoint *model.Endpoint) model.SpanModel {
sc := s.SpanContext
z := model.SpanModel{
SpanContext: model.SpanContext{
TraceID: convertTraceID(sc.TraceID),
ID: convertSpanID(sc.SpanID),
Sampled: &sampledTrue,
},
Kind: spanKind(s),
Name: s.Name,
Timestamp: s.StartTime,
Shared: false,
LocalEndpoint: localEndpoint,
Kind: spanKind(s),
Name: s.Name,
Timestamp: s.StartTime,
Shared: false,
LocalEndpoint: localEndpoint,
RemoteEndpoint: remoteEndpoint,
}

if s.ParentSpanID != (trace.SpanID{}) {
Expand Down
87 changes: 86 additions & 1 deletion exporter/zipkin/zipkin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
package zipkin

import (
"bytes"
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
"reflect"
"strings"
"sync"
"testing"
"time"

openzipkin "github.com/openzipkin/zipkin-go"
"github.com/openzipkin/zipkin-go/model"
httpreporter "github.com/openzipkin/zipkin-go/reporter/http"
"go.opencensus.io/trace"
Expand Down Expand Up @@ -212,7 +216,7 @@ func TestExport(t *testing.T) {
},
}
for _, tt := range tests {
got := zipkinSpan(tt.span, nil)
got := zipkinSpan(tt.span, nil, nil)
if len(got.Annotations) != len(tt.want.Annotations) {
t.Fatalf("zipkinSpan: got %d annotations in span, want %d", len(got.Annotations), len(tt.want.Annotations))
}
Expand Down Expand Up @@ -253,3 +257,84 @@ func TestExport(t *testing.T) {
}
}
}

// Ensure that we can pass in a remote endpoint but also that it is
// transmitted to its origina. Issue #959
func TestRemoteEndpointOptionAndTransmission(t *testing.T) {
type lockableBuffer struct {
sync.Mutex
*bytes.Buffer
}

buf := &lockableBuffer{Mutex: sync.Mutex{}, Buffer: new(bytes.Buffer)}

cst := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
blob, _ := ioutil.ReadAll(r.Body)
_ = r.Body.Close()
buf.Lock()
buf.Write(blob)
buf.Unlock()
}))
defer cst.Close()

reporter := httpreporter.NewReporter(cst.URL, httpreporter.BatchInterval(10*time.Millisecond))
localEndpoint, _ := openzipkin.NewEndpoint("app", "10.0.0.17")
remoteEndpoint, _ := openzipkin.NewEndpoint("memcached", "10.0.0.42")
exp := NewExporter(reporter, localEndpoint, WithRemoteEndpoint(remoteEndpoint))
exp.ExportSpan(&trace.SpanData{
Name: "Test",
})

// Wait for the upload
<-time.After(300 * time.Millisecond)

want := `[{
"traceId":"0000000000000000",
"id":"0000000000000000",
"name":"Test",
"localEndpoint":{
"serviceName":"app",
"ipv4":"10.0.0.17"
},
"remoteEndpoint":{
"serviceName":"memcached","ipv4":"10.0.0.42"
}
}]`

buf.Lock()
got := buf.String()
buf.Unlock()

// Since the reported JSON could contain spaces and other indentation,
// strip spaces out but also the fields could be mangled so we'll instead
// just use an anagram equivalence to ensure all the output is present
replacer := strings.NewReplacer(" ", "", "\t", "", "\n", "")
wj := replacer.Replace(want)
gj := replacer.Replace(got)
if !anagrams(gj, wj) {
t.Errorf("Mismatched JSON content\nGot:\n\t%s\nWant:\n\t%s", gj, wj)
}
}

func anagrams(s1, s2 string) bool {
if len(s1) != len(s2) {
return false
}
if s1 == "" && s2 == "" {
return true
}
m1 := make(map[byte]int)
for i := range s1 {
m1[s1[i]] += 1
m1[s2[i]] -= 1
}

// Finally check that all the values are at 0
// that is, all the letters in s1 were matched in s2
for _, count := range m1 {
if count != 0 {
return false
}
}
return true
}

0 comments on commit acc477d

Please sign in to comment.