Skip to content

Commit

Permalink
Parse Zipkin JSON non-string values (#496)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavolloffay authored and yurishkuro committed Oct 30, 2017
1 parent 44de283 commit 975982b
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 14 deletions.
2 changes: 1 addition & 1 deletion cmd/collector/app/zipkin/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) {
if contentType == "application/x-thrift" {
tSpans, err = deserializeThrift(bodyBytes)
} else if contentType == "application/json" {
tSpans, err = deserializeJSON(bodyBytes)
tSpans, err = DeserializeJSON(bodyBytes)
} else {
http.Error(w, "Unsupported Content-Type", http.StatusBadRequest)
return
Expand Down
71 changes: 65 additions & 6 deletions cmd/collector/app/zipkin/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
package zipkin

import (
"bytes"
"encoding/base64"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"math"
"strconv"
"strings"

Expand All @@ -35,9 +40,10 @@ type annotation struct {
Timestamp int64 `json:"timestamp"`
}
type binaryAnnotation struct {
Endpoint endpoint `json:"endpoint"`
Key string `json:"key"`
Value string `json:"value"`
Endpoint endpoint `json:"endpoint"`
Key string `json:"key"`
Value interface{} `json:"value"`
Type string `json:"type"`
}
type zipkinSpan struct {
ID string `json:"id"`
Expand All @@ -56,7 +62,8 @@ var (
errWrongIpv4 = errors.New("wrong ipv4")
)

func deserializeJSON(body []byte) ([]*zipkincore.Span, error) {
// DeserializeJSON deserialize zipkin v1 json spans into zipkin thrift
func DeserializeJSON(body []byte) ([]*zipkincore.Span, error) {
spans, err := decode(body)
if err != nil {
return nil, err
Expand Down Expand Up @@ -180,14 +187,66 @@ func binAnnoToThrift(ba binaryAnnotation) (*zipkincore.BinaryAnnotation, error)
return nil, err
}

var val []byte
var valType zipkincore.AnnotationType
switch ba.Type {
case "BOOL":
if ba.Value.(bool) {
val = []byte{1}
} else {
val = []byte{0}
}
valType = zipkincore.AnnotationType_BOOL
case "I16":
buff := new(bytes.Buffer)
binary.Write(buff, binary.LittleEndian, int16(ba.Value.(float64)))
val = buff.Bytes()
valType = zipkincore.AnnotationType_I16
case "I32":
buff := new(bytes.Buffer)
binary.Write(buff, binary.LittleEndian, int32(ba.Value.(float64)))
val = buff.Bytes()
valType = zipkincore.AnnotationType_I32
case "I64":
buff := new(bytes.Buffer)
binary.Write(buff, binary.LittleEndian, int64(ba.Value.(float64)))
val = buff.Bytes()
valType = zipkincore.AnnotationType_I64
case "DOUBLE":
val = float64bytes(ba.Value.(float64))
valType = zipkincore.AnnotationType_DOUBLE
case "BYTES":
val, err = base64.StdEncoding.DecodeString(ba.Value.(string))
if err != nil {
return nil, err
}
valType = zipkincore.AnnotationType_BYTES
case "STRING":
fallthrough
default:
str := fmt.Sprintf("%s", ba.Value)
val = []byte(str)
fmt.Println("default")
fmt.Println(str)
valType = zipkincore.AnnotationType_STRING
}

return &zipkincore.BinaryAnnotation{
Key: ba.Key,
Value: []byte(ba.Value),
Value: val,
Host: endpoint,
AnnotationType: zipkincore.AnnotationType_STRING,
AnnotationType: valType,
}, nil
}

// taken from https://stackoverflow.com/a/22492518/4158442
func float64bytes(float float64) []byte {
bits := math.Float64bits(float)
bytes := make([]byte, 8)
binary.LittleEndian.PutUint64(bytes, bits)
return bytes
}

func parseIpv4(str string) (int32, error) {
// TODO use net.ParseIP
segments := strings.Split(str, ".")
Expand Down
82 changes: 75 additions & 7 deletions cmd/collector/app/zipkin/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package zipkin

import (
"encoding/json"
"errors"
"fmt"
"math"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger/thrift-gen/zipkincore"
)

var endpointFmt = `{"serviceName": "%s", "ipv4": "%s", "ipv6": "%s", "port": %d}`
Expand All @@ -47,7 +49,7 @@ func createSpan(name string, id string, parentID string, traceID string, ts int6
}

func TestDecodeWrongJson(t *testing.T) {
spans, err := deserializeJSON([]byte(""))
spans, err := DeserializeJSON([]byte(""))
require.Error(t, err)
assert.Nil(t, spans)
}
Expand Down Expand Up @@ -77,10 +79,75 @@ func TestUnmarshalBinAnnotation(t *testing.T) {
err := json.Unmarshal([]byte(createBinAnno("foo", "bar", endpointJSON)), binAnno)
require.NoError(t, err)
assert.Equal(t, "foo", binAnno.Key)
assert.Equal(t, "bar", binAnno.Value)
assert.Equal(t, "bar", binAnno.Value.(string))
assert.Equal(t, "foo", binAnno.Endpoint.ServiceName)
}

func TestUnmarshalBinAnnotationNumberValue(t *testing.T) {
tests := []struct {
json string
expected zipkincore.BinaryAnnotation
err error
}{
{
json: `{"key":"foo", "value": 32768, "type": "I16"}`,
expected: zipkincore.BinaryAnnotation{Key: "foo", Value: []byte{0x0, 0x80}, AnnotationType: zipkincore.AnnotationType_I16},
},
{
json: `{"key":"foo", "value": 32768, "type": "I32"}`,
expected: zipkincore.BinaryAnnotation{Key: "foo", Value: []byte{0x00, 0x80, 0x00, 0x00}, AnnotationType: zipkincore.AnnotationType_I32},
},
{
json: `{"key":"foo", "value": 32768, "type": "I64"}`,
expected: zipkincore.BinaryAnnotation{Key: "foo", Value: []byte{0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, AnnotationType: zipkincore.AnnotationType_I64},
},
{
json: `{"key":"foo", "value": -12.666512, "type": "DOUBLE"}`,
expected: zipkincore.BinaryAnnotation{Key: "foo", Value: []byte{122, 200, 148, 15, 65, 85, 41, 192}, AnnotationType: zipkincore.AnnotationType_DOUBLE},
},
{
json: `{"key":"foo", "value": true, "type": "BOOL"}`,
expected: zipkincore.BinaryAnnotation{Key: "foo", Value: []byte{1}, AnnotationType: zipkincore.AnnotationType_BOOL},
},
{
json: `{"key":"foo", "value": false, "type": "BOOL"}`,
expected: zipkincore.BinaryAnnotation{Key: "foo", Value: []byte{0}, AnnotationType: zipkincore.AnnotationType_BOOL},
},
{
json: `{"key":"foo", "value": "str", "type": "STRING"}`,
expected: zipkincore.BinaryAnnotation{Key: "foo", Value: []byte("str"), AnnotationType: zipkincore.AnnotationType_STRING},
},
{
json: `{"key":"foo", "value": "c3Ry", "type": "BYTES"}`,
expected: zipkincore.BinaryAnnotation{Key: "foo", Value: []byte("str"), AnnotationType: zipkincore.AnnotationType_BYTES},
},
{
json: `{"key":"foo", "value": "^^^", "type": "BYTES"}`,
err: errors.New("illegal base64 data at input byte 0"),
},
{
json: `{"key":"foo", "value": "733c374d736e41cc"}`,
expected: zipkincore.BinaryAnnotation{Key: "foo", Value: []byte("733c374d736e41cc"), AnnotationType: zipkincore.AnnotationType_STRING},
},
}

for _, test := range tests {
binAnno := &binaryAnnotation{}
err := json.Unmarshal([]byte(test.json), binAnno)
require.NoError(t, err)
tBinAnno, err := binAnnoToThrift(*binAnno)
if test.err != nil {
require.Error(t, err, test.json)
require.Nil(t, tBinAnno)
assert.Equal(t, test.err.Error(), err.Error())
} else {
require.NoError(t, err)
assert.Equal(t, test.expected.Key, tBinAnno.Key)
assert.Equal(t, test.expected.Value, tBinAnno.Value)
}
}
}

func TestUnmarshalSpan(t *testing.T) {
endpJSON := createEndpoint("foo", "127.0.0.1", "2001:db8::c001", 66)
annoJSON := createAnno("cs", 1515, endpJSON)
Expand Down Expand Up @@ -113,34 +180,34 @@ func TestUnmarshalSpan(t *testing.T) {
func TestIncorrectSpanIds(t *testing.T) {
// id missing
spanJSON := createSpan("bar", "", "1", "2", 156, 15145, false, "", "")
spans, err := deserializeJSON([]byte(spanJSON))
spans, err := DeserializeJSON([]byte(spanJSON))
require.Error(t, err)
assert.Equal(t, errIsNotUnsignedLog, err)
assert.Nil(t, spans)
// id longer than 32
spanJSON = createSpan("bar", "123456789123456712345678912345678", "1", "2",
156, 15145, false, "", "")
spans, err = deserializeJSON([]byte(spanJSON))
spans, err = DeserializeJSON([]byte(spanJSON))
require.Error(t, err)
assert.Equal(t, errIsNotUnsignedLog, err)
assert.Nil(t, spans)
// traceId missing
spanJSON = createSpan("bar", "2", "1", "", 156, 15145, false,
"", "")
spans, err = deserializeJSON([]byte(spanJSON))
spans, err = DeserializeJSON([]byte(spanJSON))
require.Error(t, err)
assert.Equal(t, errIsNotUnsignedLog, err)
assert.Nil(t, spans)
// 128 bit traceId
spanJSON = createSpan("bar", "2", "1", "12345678912345671234567891234567", 156, 15145, false,
"", "")
spans, err = deserializeJSON([]byte(spanJSON))
spans, err = DeserializeJSON([]byte(spanJSON))
require.NoError(t, err)
assert.NotNil(t, spans)
// wrong 128 bit traceId
spanJSON = createSpan("bar", "22", "12", "#2345678912345671234567891234562", 156, 15145, false,
"", "")
spans, err = deserializeJSON([]byte(spanJSON))
spans, err = DeserializeJSON([]byte(spanJSON))
require.Error(t, err)
assert.Nil(t, spans)
}
Expand Down Expand Up @@ -218,6 +285,7 @@ func TestBinaryAnnotationToThrift(t *testing.T) {
Endpoint: endp,
Key: "error",
Value: "str",
Type: "STRING",
}
tBinAnno, err := binAnnoToThrift(binAnno)
require.NoError(t, err)
Expand Down

0 comments on commit 975982b

Please sign in to comment.