Skip to content

Commit

Permalink
Add http zipkin.thrift support (#180)
Browse files Browse the repository at this point in the history
- Accept traces over http on `http://localhost:14268/api/traces?format=zipkin.thrift`
- The body of the http request should contain the thrift binary representation of a zipkin style list of spans encoded using thrift's binary protocol
  • Loading branch information
vprithvi authored May 30, 2017
1 parent 6cf068b commit 5db73b6
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 27 deletions.
73 changes: 60 additions & 13 deletions cmd/collector/app/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,28 @@ import (
tchanThrift "github.com/uber/tchannel-go/thrift"

tJaeger "github.com/uber/jaeger/thrift-gen/jaeger"
"github.com/uber/jaeger/thrift-gen/zipkincore"
)

const (
formatParam = "format"
formatParam = "format"
unableToReadBodyErrFormat = "Unable to process request body: %v"
)

// APIHandler handles all HTTP calls to the collector
type APIHandler struct {
jaegerBatchesHandler JaegerBatchesHandler
zipkinSpansHandler ZipkinSpansHandler
}

// NewAPIHandler returns a new APIHandler
func NewAPIHandler(jaegerBatchesHandler JaegerBatchesHandler) *APIHandler {
func NewAPIHandler(
jaegerBatchesHandler JaegerBatchesHandler,
zipkinSpansHandler ZipkinSpansHandler,
) *APIHandler {
return &APIHandler{
jaegerBatchesHandler: jaegerBatchesHandler,
zipkinSpansHandler: zipkinSpansHandler,
}
}

Expand All @@ -56,31 +63,71 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
}

func (aH *APIHandler) saveSpan(w http.ResponseWriter, r *http.Request) {
bodyBytes, err := ioutil.ReadAll(r.Body)
r.Body.Close()
if err != nil {
http.Error(w, fmt.Sprintf(unableToReadBodyErrFormat, err), http.StatusInternalServerError)
return
}

format := r.FormValue(formatParam)
switch strings.ToLower(format) {
case "jaeger.thrift":
bodyBytes, err := ioutil.ReadAll(r.Body)
r.Body.Close()
if err != nil {
http.Error(w, fmt.Sprintf("Unable to read from body due to error: %v", err), http.StatusBadRequest)
return
}

tdes := thrift.NewTDeserializer()
// (NB): We decided to use this struct instead of straight batches to be as consistent with tchannel intake as possible.
var req tJaeger.CollectorSubmitBatchesArgs
if err = tdes.Read(&req, bodyBytes); err != nil {
http.Error(w, fmt.Sprintf("Cannot deserialize body due to error: %v", err), http.StatusBadRequest)
http.Error(w, fmt.Sprintf(unableToReadBodyErrFormat, err), http.StatusBadRequest)
return
}
ctx, cancel := tchanThrift.NewContext(time.Minute)
defer cancel()
if _, err = aH.jaegerBatchesHandler.SubmitBatches(ctx, req.Batches); err != nil {
http.Error(w, fmt.Sprintf("Cannot submit Jaeger batch due to error: %v", err), http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusOK)
http.Error(w, fmt.Sprintf("Cannot submit Jaeger batch: %v", err), http.StatusInternalServerError)
return
}

case "zipkin.thrift":
spans, err := deserializeZipkin(bodyBytes)
if err != nil {
http.Error(w, fmt.Sprintf(unableToReadBodyErrFormat, err), http.StatusBadRequest)
return
}

ctx, _ := tchanThrift.NewContext(time.Minute)
if _, err = aH.zipkinSpansHandler.SubmitZipkinBatch(ctx, spans); err != nil {
http.Error(w, fmt.Sprintf("Cannot submit Zipkin batch: %v", err), http.StatusInternalServerError)
return
}

default:
http.Error(w, fmt.Sprintf("Unsupported format type: %v", format), http.StatusBadRequest)
return
}

w.WriteHeader(http.StatusOK)
}

func deserializeZipkin(b []byte) ([]*zipkincore.Span, error) {
buffer := thrift.NewTMemoryBuffer()
buffer.Write(b)

transport := thrift.NewTBinaryProtocolTransport(buffer)
_, size, err := transport.ReadListBegin() // Ignore the returned element type
if err != nil {
return nil, err
}

// We don't depend on the size returned by ReadListBegin to preallocate the array because it
// sometimes returns a nil error on bad input and provides an unreasonably large int for size
var spans []*zipkincore.Span
for i := 0; i < size; i++ {
zs := &zipkincore.Span{}
if err = zs.Read(transport); err != nil {
return nil, err
}
spans = append(spans, zs)
}

return spans, nil
}
94 changes: 81 additions & 13 deletions cmd/collector/app/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,30 @@ import (

"github.com/uber/jaeger/thrift-gen/jaeger"
tJaeger "github.com/uber/jaeger/thrift-gen/jaeger"
"github.com/uber/jaeger/thrift-gen/zipkincore"
)

var httpClient = &http.Client{Timeout: 2 * time.Second}

type possiblyErroringJaegerBatchesHandler struct {
type mockJaegerHandler struct {
err error
}

func (p *possiblyErroringJaegerBatchesHandler) SubmitBatches(ctx tchanThrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) {
func (p *mockJaegerHandler) SubmitBatches(ctx tchanThrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) {
return nil, p.err
}

type mockZipkinHandler struct {
err error
}

func (p *mockZipkinHandler) SubmitZipkinBatch(ctx tchanThrift.Context, batches []*zipkincore.Span) ([]*zipkincore.Response, error) {
return nil, p.err
}

func initializeTestServer(err error) (*httptest.Server, *APIHandler) {
r := mux.NewRouter()
handler := NewAPIHandler(&possiblyErroringJaegerBatchesHandler{err})
handler := NewAPIHandler(&mockJaegerHandler{err}, &mockZipkinHandler{err})
handler.RegisterRoutes(r)
return httptest.NewServer(r), handler
}
Expand Down Expand Up @@ -78,11 +87,11 @@ func TestJaegerFormat(t *testing.T) {
assert.EqualValues(t, http.StatusOK, statusCode)
assert.EqualValues(t, "", resBodyStr)

handler.jaegerBatchesHandler.(*possiblyErroringJaegerBatchesHandler).err = fmt.Errorf("Bad times ahead")
handler.jaegerBatchesHandler.(*mockJaegerHandler).err = fmt.Errorf("Bad times ahead")
statusCode, resBodyStr, err = postJSON(server.URL+`/api/traces?format=jaeger.thrift`, someBytes)
assert.NoError(t, err)
assert.EqualValues(t, http.StatusInternalServerError, statusCode)
assert.EqualValues(t, "Cannot submit Jaeger batch due to error: Bad times ahead\n", resBodyStr)
assert.EqualValues(t, "Cannot submit Jaeger batch: Bad times ahead\n", resBodyStr)
}

func TestJaegerFormatBadBody(t *testing.T) {
Expand All @@ -92,7 +101,7 @@ func TestJaegerFormatBadBody(t *testing.T) {
statusCode, resBodyStr, err := postJSON(server.URL+`/api/traces?format=jaeger.thrift`, bodyBytes)
assert.NoError(t, err)
assert.EqualValues(t, http.StatusBadRequest, statusCode)
assert.EqualValues(t, "Cannot deserialize body due to error: *jaeger.CollectorSubmitBatchesArgs field 25711 read error: unexpected EOF\n", resBodyStr)
assert.EqualValues(t, "Unable to process request body: *jaeger.CollectorSubmitBatchesArgs field 25711 read error: unexpected EOF\n", resBodyStr)
}

func TestWrongFormat(t *testing.T) {
Expand All @@ -105,13 +114,22 @@ func TestWrongFormat(t *testing.T) {
}

func TestCannotReadBodyFromRequest(t *testing.T) {
handler := NewAPIHandler(&possiblyErroringJaegerBatchesHandler{nil})
req, err := http.NewRequest(http.MethodPost, `/api/traces?format=jaeger.thrift`, &errReader{})
assert.NoError(t, err)
rw := dummyResponseWriter{}
handler.saveSpan(&rw, req)
assert.EqualValues(t, http.StatusBadRequest, rw.myStatusCode)
assert.EqualValues(t, "Unable to read from body due to error: Simulated error reading body\n", rw.myBody)
handler := NewAPIHandler(&mockJaegerHandler{nil}, &mockZipkinHandler{nil})

testCases := []struct {
url string
}{
{`/api/traces?format=jaeger.thrift`},
{`/api/traces?format=zipkin.thrift`},
}
for _, testCase := range testCases {
req, err := http.NewRequest(http.MethodPost, testCase.url, &errReader{})
assert.NoError(t, err)
rw := dummyResponseWriter{}
handler.saveSpan(&rw, req)
assert.EqualValues(t, http.StatusInternalServerError, rw.myStatusCode)
assert.EqualValues(t, "Unable to process request body: Simulated error reading body\n", rw.myBody)
}
}

type errReader struct{}
Expand Down Expand Up @@ -155,3 +173,53 @@ func postJSON(urlStr string, bodyBytes []byte) (int, string, error) {
}
return res.StatusCode, string(body), nil
}

func TestZipkinFormat(t *testing.T) {
span := &zipkincore.Span{}
spans := []*zipkincore.Span{}
spans = append(spans, span)
server, handler := initializeTestServer(nil)
defer server.Close()

bodyBytes := zipkinSerialize(spans)
statusCode, resBodyStr, err := postJSON(server.URL+`/api/traces?format=zipkin.thrift`, bodyBytes)
assert.NoError(t, err)
assert.EqualValues(t, http.StatusOK, statusCode)
assert.EqualValues(t, "", resBodyStr)

handler.zipkinSpansHandler.(*mockZipkinHandler).err = fmt.Errorf("Bad times ahead")
statusCode, resBodyStr, err = postJSON(server.URL+`/api/traces?format=zipkin.thrift`, bodyBytes)
assert.NoError(t, err)
assert.EqualValues(t, http.StatusInternalServerError, statusCode)
assert.EqualValues(t, "Cannot submit Zipkin batch: Bad times ahead\n", resBodyStr)
}

func zipkinSerialize(spans []*zipkincore.Span) []byte {
t := thrift.NewTMemoryBuffer()
p := thrift.NewTBinaryProtocolTransport(t)
p.WriteListBegin(thrift.STRUCT, len(spans))
for _, s := range spans {
s.Write(p)
}
p.WriteListEnd()
return t.Buffer.Bytes()
}

func TestZipkinFormatBadBody(t *testing.T) {
server, _ := initializeTestServer(nil)
defer server.Close()
bodyBytes := []byte("not good")
statusCode, resBodyStr, err := postJSON(server.URL+`/api/traces?format=zipkin.thrift`, bodyBytes)
assert.NoError(t, err)
assert.EqualValues(t, http.StatusBadRequest, statusCode)
assert.EqualValues(t, "Unable to process request body: *zipkincore.Span field 0 read error: EOF\n", resBodyStr)
}

func TestDeserializeZipkinWithBadListStart(t *testing.T) {
span := &zipkincore.Span{TraceID: 12, Name: "test"}
spans := []*zipkincore.Span{}
spans = append(spans, span)
spanBytes := zipkinSerialize(spans)
_, err := deserializeZipkin(append([]byte{0, 255, 255}, spanBytes...))
assert.Error(t, err)
}
2 changes: 1 addition & 1 deletion cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func main() {
ch.Serve(listener)

r := mux.NewRouter()
apiHandler := app.NewAPIHandler(jaegerBatchesHandler)
apiHandler := app.NewAPIHandler(jaegerBatchesHandler, zipkinSpansHandler)
apiHandler.RegisterRoutes(r)
httpPortStr := ":" + strconv.Itoa(*builder.CollectorHTTPPort)
recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)
Expand Down

0 comments on commit 5db73b6

Please sign in to comment.