Skip to content

Commit

Permalink
[dbnode] Direct conversion of encoded tags to doc.Metadata (#3087)
Browse files Browse the repository at this point in the history
  • Loading branch information
vpranckaitis authored Jan 20, 2021
1 parent 24cbe1c commit 611166b
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 26 deletions.
72 changes: 72 additions & 0 deletions src/dbnode/storage/index/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"fmt"
"unicode/utf8"

"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/query/graphite/graphite"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/serialize"
)

const (
Expand Down Expand Up @@ -188,6 +190,76 @@ func FromSeriesIDAndTagIter(id ident.ID, tags ident.TagIterator) (doc.Metadata,
return d, nil
}

// FromSeriesIDAndEncodedTags converts the provided series id and encoded tags into a doc.Metadata.
func FromSeriesIDAndEncodedTags(id ident.BytesID, encodedTags ts.EncodedTags) (doc.Metadata, error) {
var (
byteOrder = serialize.ByteOrder
total = len(encodedTags)
)
if total < 4 {
return doc.Metadata{}, fmt.Errorf("encoded tags too short: size=%d, need=%d", total, 4)
}

header := byteOrder.Uint16(encodedTags[:2])
encodedTags = encodedTags[2:]
if header != serialize.HeaderMagicNumber {
return doc.Metadata{}, serialize.ErrIncorrectHeader
}

length := int(byteOrder.Uint16(encodedTags[:2]))
encodedTags = encodedTags[2:]

var (
clonedID = clone(id.Bytes())
fields = make([]doc.Field, 0, length)
expectedStart = firstTagBytesPosition
)

for i := 0; i < length; i++ {
if len(encodedTags) < 2 {
return doc.Metadata{}, fmt.Errorf("missing size for tag name: index=%d", i)
}
numBytesName := int(byteOrder.Uint16(encodedTags[:2]))
if numBytesName == 0 {
return doc.Metadata{}, serialize.ErrEmptyTagNameLiteral
}
encodedTags = encodedTags[2:]

bytesName := encodedTags[:numBytesName]
encodedTags = encodedTags[numBytesName:]

if len(encodedTags) < 2 {
return doc.Metadata{}, fmt.Errorf("missing size for tag value: index=%d", i)
}

numBytesValue := int(byteOrder.Uint16(encodedTags[:2]))
encodedTags = encodedTags[2:]

bytesValue := encodedTags[:numBytesValue]
encodedTags = encodedTags[numBytesValue:]

var clonedName, clonedValue []byte
clonedName, expectedStart = findSliceOrClone(clonedID, bytesName, expectedStart,
distanceBetweenTagNameAndValue)
clonedValue, expectedStart = findSliceOrClone(clonedID, bytesValue, expectedStart,
distanceBetweenTagValueAndNextName)

fields = append(fields, doc.Field{
Name: clonedName,
Value: clonedValue,
})
}

d := doc.Metadata{
ID: clonedID,
Fields: fields,
}
if err := Validate(d); err != nil {
return doc.Metadata{}, err
}
return d, nil
}

func findSliceOrClone(id, tag []byte, expectedStart, nextPositionDistance int) ([]byte, int) { //nolint:unparam
n := len(tag)
expectedEnd := expectedStart + n
Expand Down
36 changes: 34 additions & 2 deletions src/dbnode/storage/index/convert/convert_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ import (
)

type idWithEncodedTags struct {
id ident.ID
id ident.BytesID
encodedTags []byte
}

type idWithTags struct {
id ident.ID
id ident.BytesID
tags ident.Tags
}

Expand Down Expand Up @@ -155,6 +155,38 @@ func BenchmarkFromSeriesIDAndTags(b *testing.B) {
}
}

func BenchmarkFromSeriesIDAndEncodedTags(b *testing.B) {
testData, err := prepareIDAndEncodedTags(b)
require.NoError(b, err)

b.ResetTimer()
for i := range testData {
_, err := FromSeriesIDAndEncodedTags(testData[i].id, testData[i].encodedTags)
require.NoError(b, err)
}
}

func BenchmarkFromSeriesIDAndTagIter_TagDecoder(b *testing.B) {
testData, err := prepareIDAndEncodedTags(b)
require.NoError(b, err)

decoderPool := serialize.NewTagDecoderPool(
serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}),
pool.NewObjectPoolOptions(),
)
decoderPool.Init()

decoder := decoderPool.Get()
defer decoder.Close()

b.ResetTimer()
for i := range testData {
decoder.Reset(checked.NewBytes(testData[i].encodedTags, nil))
_, err := FromSeriesIDAndTagIter(testData[i].id, decoder)
require.NoError(b, err)
}
}

func prepareIDAndEncodedTags(b *testing.B) ([]idWithEncodedTags, error) {
var (
rnd = rand.New(rand.NewSource(42)) //nolint:gosec
Expand Down
110 changes: 108 additions & 2 deletions src/dbnode/storage/index/convert/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/m3db/m3/src/x/checked"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/serialize"
"github.com/m3db/m3/src/x/test"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -171,6 +172,101 @@ func TestFromSeriesIDAndTagIterReuseBytesFromSeriesId(t *testing.T) {
}
}

func TestFromSeriesIDAndEncodedTags(t *testing.T) {
tests := []struct {
name string
id string
}{
{
name: "no tags in ID",
id: "foo",
},
{
name: "tags in ID",
id: "bar=baz,quip=quix",
},
{
name: "tags in ID with specific format",
id: `{bar="baz",quip="quix"}`,
},
{
name: "tags in ID with specific format reverse order",
id: `{quip="quix",bar="baz"}`,
},
{
name: "inexact tag occurrence in ID",
id: "quixquip_bazillion_barometers",
},
}
var (
tags = ident.NewTags(
ident.StringTag("bar", "baz"),
ident.StringTag("quip", "quix"),
)
encodedTags = toEncodedTags(t, tags)
)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
seriesID := ident.BytesID(tt.id)
d, err := convert.FromSeriesIDAndEncodedTags(seriesID, encodedTags)
assert.NoError(t, err)
assertContentsMatch(t, seriesID, tags.Values(), d)
for i := range d.Fields {
assertBackedBySameData(t, d.ID, d.Fields[i].Name)
assertBackedBySameData(t, d.ID, d.Fields[i].Value)
}
})
}
}

func TestFromSeriesIDAndEncodedTagsInvalid(t *testing.T) {
var (
validEncodedTags = []byte{117, 39, 1, 0, 3, 0, 98, 97, 114, 3, 0, 98, 97, 122}
tagsWithReservedName = toEncodedTags(t, ident.NewTags(
ident.StringTag(string(convert.ReservedFieldNameID), "some_value"),
))
)

tests := []struct {
name string
encodedTags []byte
}{
{
name: "reserved tag name",
encodedTags: tagsWithReservedName,
},
{
name: "incomplete header",
encodedTags: validEncodedTags[:3],
},
{
name: "incomplete tag name length",
encodedTags: validEncodedTags[:5],
},
{
name: "incomplete tag value length",
encodedTags: validEncodedTags[:10],
},
{
name: "invalid magic number",
encodedTags: []byte{42, 42, 0, 0},
},
{
name: "empty tag name",
encodedTags: []byte{117, 39, 1, 0, 0, 0, 3, 0, 98, 97, 122},
},
}
seriesID := ident.BytesID("foo")

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := convert.FromSeriesIDAndEncodedTags(seriesID, tt.encodedTags)
assert.Error(t, err)
})
}
}

func TestToSeriesValid(t *testing.T) {
d := doc.Metadata{
ID: []byte("foo"),
Expand Down Expand Up @@ -307,7 +403,17 @@ func assertBackedBySameData(t *testing.T, outer, inner []byte) {
if idx := bytes.Index(outer, inner); idx != -1 {
subslice := outer[idx : idx+len(inner)]
assert.True(t, test.ByteSlicesBackedBySameData(subslice, inner))
} else {
assert.Fail(t, "inner byte sequence wasn't found")
}
}

func toEncodedTags(t *testing.T, tags ident.Tags) []byte {
pool := serialize.NewTagEncoderPool(serialize.NewTagEncoderOptions(), nil)
pool.Init()
encoder := pool.Get()
defer encoder.Finalize()

require.NoError(t, encoder.Encode(ident.NewTagsIterator(tags)))
data, ok := encoder.Data()
require.True(t, ok)
return append([]byte(nil), data.Bytes()...)
}
10 changes: 6 additions & 4 deletions src/x/serialize/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import (
)

var (
errIncorrectHeader = errors.New("header magic number does not match expected value")
// ErrIncorrectHeader is an error when encoded tag byte sequence doesn't start with
// an expected magic number.
ErrIncorrectHeader = errors.New("header magic number does not match expected value")
errInvalidByteStreamIDDecoding = errors.New("internal error, invalid byte stream while decoding ID")
errInvalidByteStreamUintDecoding = errors.New("internal error, invalid byte stream while decoding uint")
)
Expand Down Expand Up @@ -78,8 +80,8 @@ func (d *decoder) Reset(b checked.Bytes) {
return
}

if header != headerMagicNumber {
d.err = errIncorrectHeader
if header != HeaderMagicNumber {
d.err = ErrIncorrectHeader
return
}

Expand Down Expand Up @@ -129,7 +131,7 @@ func (d *decoder) decodeTag() error {
// safe to call Bytes() as d.current.Name has inc'd a ref
if len(d.currentTagName.Bytes()) == 0 {
d.releaseCurrent()
return errEmptyTagNameLiteral
return ErrEmptyTagNameLiteral
}

if err := d.decodeIDInto(d.currentTagValue); err != nil {
Expand Down
14 changes: 7 additions & 7 deletions src/x/serialize/decoder_fast.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,22 @@ func TagValueFromEncodedTagsFast(
"encoded tags too short: size=%d, need=%d", total, 4)
}

header := byteOrder.Uint16(encodedTags[:2])
header := ByteOrder.Uint16(encodedTags[:2])
encodedTags = encodedTags[2:]
if header != headerMagicNumber {
return nil, false, errIncorrectHeader
if header != HeaderMagicNumber {
return nil, false, ErrIncorrectHeader
}

length := int(byteOrder.Uint16(encodedTags[:2]))
length := int(ByteOrder.Uint16(encodedTags[:2]))
encodedTags = encodedTags[2:]

for i := 0; i < length; i++ {
if len(encodedTags) < 2 {
return nil, false, fmt.Errorf("missing size for tag name: index=%d", i)
}
numBytesName := int(byteOrder.Uint16(encodedTags[:2]))
numBytesName := int(ByteOrder.Uint16(encodedTags[:2]))
if numBytesName == 0 {
return nil, false, errEmptyTagNameLiteral
return nil, false, ErrEmptyTagNameLiteral
}
encodedTags = encodedTags[2:]

Expand All @@ -63,7 +63,7 @@ func TagValueFromEncodedTagsFast(
return nil, false, fmt.Errorf("missing size for tag value: index=%d", i)
}

numBytesValue := int(byteOrder.Uint16(encodedTags[:2]))
numBytesValue := int(ByteOrder.Uint16(encodedTags[:2]))
encodedTags = encodedTags[2:]

bytesValue := encodedTags[:numBytesValue]
Expand Down
18 changes: 10 additions & 8 deletions src/x/serialize/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,20 @@ import (
*/

var (
byteOrder binary.ByteOrder = binary.LittleEndian
// ByteOrder is the byte order used for encoding tags into a byte sequence.
ByteOrder binary.ByteOrder = binary.LittleEndian
headerMagicBytes = make([]byte, 2)
)

func init() {
encodeUInt16(headerMagicNumber, headerMagicBytes)
encodeUInt16(HeaderMagicNumber, headerMagicBytes)
}

var (
errTagEncoderInUse = errors.New("encoder already in use")
errTagLiteralTooLong = errors.New("literal is too long")
errEmptyTagNameLiteral = xerrors.NewInvalidParamsError(errors.New("tag name cannot be empty"))
errTagEncoderInUse = errors.New("encoder already in use")
errTagLiteralTooLong = errors.New("literal is too long")
// ErrEmptyTagNameLiteral is an error when encoded tag name is empty.
ErrEmptyTagNameLiteral = xerrors.NewInvalidParamsError(errors.New("tag name cannot be empty"))
)

type newCheckedBytesFn func([]byte, checked.BytesOptions) checked.Bytes
Expand Down Expand Up @@ -166,7 +168,7 @@ func (e *encoder) Finalize() {

func (e *encoder) encodeTag(t ident.Tag) error {
if len(t.Name.Bytes()) == 0 {
return errEmptyTagNameLiteral
return ErrEmptyTagNameLiteral
}

if err := e.encodeID(t.Name); err != nil {
Expand Down Expand Up @@ -204,10 +206,10 @@ func (e *encoder) encodeUInt16(v uint16) []byte {
}

func encodeUInt16(v uint16, dest []byte) []byte {
byteOrder.PutUint16(dest, v)
ByteOrder.PutUint16(dest, v)
return dest
}

func decodeUInt16(b []byte) uint16 {
return byteOrder.Uint16(b)
return ByteOrder.Uint16(b)
}
Loading

0 comments on commit 611166b

Please sign in to comment.