diff --git a/src/dbnode/storage/index/convert/convert.go b/src/dbnode/storage/index/convert/convert.go index 023589a51e..a6178b3ac9 100644 --- a/src/dbnode/storage/index/convert/convert.go +++ b/src/dbnode/storage/index/convert/convert.go @@ -26,10 +26,12 @@ import ( "fmt" "unicode/utf8" + "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/query/graphite/graphite" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/pool" + "github.com/m3db/m3/src/x/serialize" ) const ( @@ -188,6 +190,76 @@ func FromSeriesIDAndTagIter(id ident.ID, tags ident.TagIterator) (doc.Metadata, return d, nil } +// FromSeriesIDAndEncodedTags converts the provided series id and encoded tags into a doc.Metadata. +func FromSeriesIDAndEncodedTags(id ident.BytesID, encodedTags ts.EncodedTags) (doc.Metadata, error) { + var ( + byteOrder = serialize.ByteOrder + total = len(encodedTags) + ) + if total < 4 { + return doc.Metadata{}, fmt.Errorf("encoded tags too short: size=%d, need=%d", total, 4) + } + + header := byteOrder.Uint16(encodedTags[:2]) + encodedTags = encodedTags[2:] + if header != serialize.HeaderMagicNumber { + return doc.Metadata{}, serialize.ErrIncorrectHeader + } + + length := int(byteOrder.Uint16(encodedTags[:2])) + encodedTags = encodedTags[2:] + + var ( + clonedID = clone(id.Bytes()) + fields = make([]doc.Field, 0, length) + expectedStart = firstTagBytesPosition + ) + + for i := 0; i < length; i++ { + if len(encodedTags) < 2 { + return doc.Metadata{}, fmt.Errorf("missing size for tag name: index=%d", i) + } + numBytesName := int(byteOrder.Uint16(encodedTags[:2])) + if numBytesName == 0 { + return doc.Metadata{}, serialize.ErrEmptyTagNameLiteral + } + encodedTags = encodedTags[2:] + + bytesName := encodedTags[:numBytesName] + encodedTags = encodedTags[numBytesName:] + + if len(encodedTags) < 2 { + return doc.Metadata{}, fmt.Errorf("missing size for tag value: index=%d", i) + } + + numBytesValue := int(byteOrder.Uint16(encodedTags[:2])) + encodedTags = encodedTags[2:] + + bytesValue := encodedTags[:numBytesValue] + encodedTags = encodedTags[numBytesValue:] + + var clonedName, clonedValue []byte + clonedName, expectedStart = findSliceOrClone(clonedID, bytesName, expectedStart, + distanceBetweenTagNameAndValue) + clonedValue, expectedStart = findSliceOrClone(clonedID, bytesValue, expectedStart, + distanceBetweenTagValueAndNextName) + + fields = append(fields, doc.Field{ + Name: clonedName, + Value: clonedValue, + }) + } + + d := doc.Metadata{ + ID: clonedID, + Fields: fields, + } + if err := Validate(d); err != nil { + return doc.Metadata{}, err + } + return d, nil +} + func findSliceOrClone(id, tag []byte, expectedStart, nextPositionDistance int) ([]byte, int) { //nolint:unparam n := len(tag) expectedEnd := expectedStart + n diff --git a/src/dbnode/storage/index/convert/convert_benchmark_test.go b/src/dbnode/storage/index/convert/convert_benchmark_test.go index 0847e557e3..90781ed7ac 100644 --- a/src/dbnode/storage/index/convert/convert_benchmark_test.go +++ b/src/dbnode/storage/index/convert/convert_benchmark_test.go @@ -34,12 +34,12 @@ import ( ) type idWithEncodedTags struct { - id ident.ID + id ident.BytesID encodedTags []byte } type idWithTags struct { - id ident.ID + id ident.BytesID tags ident.Tags } @@ -155,6 +155,38 @@ func BenchmarkFromSeriesIDAndTags(b *testing.B) { } } +func BenchmarkFromSeriesIDAndEncodedTags(b *testing.B) { + testData, err := prepareIDAndEncodedTags(b) + require.NoError(b, err) + + b.ResetTimer() + for i := range testData { + _, err := FromSeriesIDAndEncodedTags(testData[i].id, testData[i].encodedTags) + require.NoError(b, err) + } +} + +func BenchmarkFromSeriesIDAndTagIter_TagDecoder(b *testing.B) { + testData, err := prepareIDAndEncodedTags(b) + require.NoError(b, err) + + decoderPool := serialize.NewTagDecoderPool( + serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}), + pool.NewObjectPoolOptions(), + ) + decoderPool.Init() + + decoder := decoderPool.Get() + defer decoder.Close() + + b.ResetTimer() + for i := range testData { + decoder.Reset(checked.NewBytes(testData[i].encodedTags, nil)) + _, err := FromSeriesIDAndTagIter(testData[i].id, decoder) + require.NoError(b, err) + } +} + func prepareIDAndEncodedTags(b *testing.B) ([]idWithEncodedTags, error) { var ( rnd = rand.New(rand.NewSource(42)) //nolint:gosec diff --git a/src/dbnode/storage/index/convert/convert_test.go b/src/dbnode/storage/index/convert/convert_test.go index aa72449797..8aa6c727d8 100644 --- a/src/dbnode/storage/index/convert/convert_test.go +++ b/src/dbnode/storage/index/convert/convert_test.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/pool" + "github.com/m3db/m3/src/x/serialize" "github.com/m3db/m3/src/x/test" "github.com/stretchr/testify/assert" @@ -171,6 +172,101 @@ func TestFromSeriesIDAndTagIterReuseBytesFromSeriesId(t *testing.T) { } } +func TestFromSeriesIDAndEncodedTags(t *testing.T) { + tests := []struct { + name string + id string + }{ + { + name: "no tags in ID", + id: "foo", + }, + { + name: "tags in ID", + id: "bar=baz,quip=quix", + }, + { + name: "tags in ID with specific format", + id: `{bar="baz",quip="quix"}`, + }, + { + name: "tags in ID with specific format reverse order", + id: `{quip="quix",bar="baz"}`, + }, + { + name: "inexact tag occurrence in ID", + id: "quixquip_bazillion_barometers", + }, + } + var ( + tags = ident.NewTags( + ident.StringTag("bar", "baz"), + ident.StringTag("quip", "quix"), + ) + encodedTags = toEncodedTags(t, tags) + ) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + seriesID := ident.BytesID(tt.id) + d, err := convert.FromSeriesIDAndEncodedTags(seriesID, encodedTags) + assert.NoError(t, err) + assertContentsMatch(t, seriesID, tags.Values(), d) + for i := range d.Fields { + assertBackedBySameData(t, d.ID, d.Fields[i].Name) + assertBackedBySameData(t, d.ID, d.Fields[i].Value) + } + }) + } +} + +func TestFromSeriesIDAndEncodedTagsInvalid(t *testing.T) { + var ( + validEncodedTags = []byte{117, 39, 1, 0, 3, 0, 98, 97, 114, 3, 0, 98, 97, 122} + tagsWithReservedName = toEncodedTags(t, ident.NewTags( + ident.StringTag(string(convert.ReservedFieldNameID), "some_value"), + )) + ) + + tests := []struct { + name string + encodedTags []byte + }{ + { + name: "reserved tag name", + encodedTags: tagsWithReservedName, + }, + { + name: "incomplete header", + encodedTags: validEncodedTags[:3], + }, + { + name: "incomplete tag name length", + encodedTags: validEncodedTags[:5], + }, + { + name: "incomplete tag value length", + encodedTags: validEncodedTags[:10], + }, + { + name: "invalid magic number", + encodedTags: []byte{42, 42, 0, 0}, + }, + { + name: "empty tag name", + encodedTags: []byte{117, 39, 1, 0, 0, 0, 3, 0, 98, 97, 122}, + }, + } + seriesID := ident.BytesID("foo") + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := convert.FromSeriesIDAndEncodedTags(seriesID, tt.encodedTags) + assert.Error(t, err) + }) + } +} + func TestToSeriesValid(t *testing.T) { d := doc.Metadata{ ID: []byte("foo"), @@ -307,7 +403,17 @@ func assertBackedBySameData(t *testing.T, outer, inner []byte) { if idx := bytes.Index(outer, inner); idx != -1 { subslice := outer[idx : idx+len(inner)] assert.True(t, test.ByteSlicesBackedBySameData(subslice, inner)) - } else { - assert.Fail(t, "inner byte sequence wasn't found") } } + +func toEncodedTags(t *testing.T, tags ident.Tags) []byte { + pool := serialize.NewTagEncoderPool(serialize.NewTagEncoderOptions(), nil) + pool.Init() + encoder := pool.Get() + defer encoder.Finalize() + + require.NoError(t, encoder.Encode(ident.NewTagsIterator(tags))) + data, ok := encoder.Data() + require.True(t, ok) + return append([]byte(nil), data.Bytes()...) +} diff --git a/src/x/serialize/decoder.go b/src/x/serialize/decoder.go index 9d1b5d9dce..574b4c1109 100644 --- a/src/x/serialize/decoder.go +++ b/src/x/serialize/decoder.go @@ -29,7 +29,9 @@ import ( ) var ( - errIncorrectHeader = errors.New("header magic number does not match expected value") + // ErrIncorrectHeader is an error when encoded tag byte sequence doesn't start with + // an expected magic number. + ErrIncorrectHeader = errors.New("header magic number does not match expected value") errInvalidByteStreamIDDecoding = errors.New("internal error, invalid byte stream while decoding ID") errInvalidByteStreamUintDecoding = errors.New("internal error, invalid byte stream while decoding uint") ) @@ -78,8 +80,8 @@ func (d *decoder) Reset(b checked.Bytes) { return } - if header != headerMagicNumber { - d.err = errIncorrectHeader + if header != HeaderMagicNumber { + d.err = ErrIncorrectHeader return } @@ -129,7 +131,7 @@ func (d *decoder) decodeTag() error { // safe to call Bytes() as d.current.Name has inc'd a ref if len(d.currentTagName.Bytes()) == 0 { d.releaseCurrent() - return errEmptyTagNameLiteral + return ErrEmptyTagNameLiteral } if err := d.decodeIDInto(d.currentTagValue); err != nil { diff --git a/src/x/serialize/decoder_fast.go b/src/x/serialize/decoder_fast.go index e9846ac8d1..064f99d4c1 100644 --- a/src/x/serialize/decoder_fast.go +++ b/src/x/serialize/decoder_fast.go @@ -37,22 +37,22 @@ func TagValueFromEncodedTagsFast( "encoded tags too short: size=%d, need=%d", total, 4) } - header := byteOrder.Uint16(encodedTags[:2]) + header := ByteOrder.Uint16(encodedTags[:2]) encodedTags = encodedTags[2:] - if header != headerMagicNumber { - return nil, false, errIncorrectHeader + if header != HeaderMagicNumber { + return nil, false, ErrIncorrectHeader } - length := int(byteOrder.Uint16(encodedTags[:2])) + length := int(ByteOrder.Uint16(encodedTags[:2])) encodedTags = encodedTags[2:] for i := 0; i < length; i++ { if len(encodedTags) < 2 { return nil, false, fmt.Errorf("missing size for tag name: index=%d", i) } - numBytesName := int(byteOrder.Uint16(encodedTags[:2])) + numBytesName := int(ByteOrder.Uint16(encodedTags[:2])) if numBytesName == 0 { - return nil, false, errEmptyTagNameLiteral + return nil, false, ErrEmptyTagNameLiteral } encodedTags = encodedTags[2:] @@ -63,7 +63,7 @@ func TagValueFromEncodedTagsFast( return nil, false, fmt.Errorf("missing size for tag value: index=%d", i) } - numBytesValue := int(byteOrder.Uint16(encodedTags[:2])) + numBytesValue := int(ByteOrder.Uint16(encodedTags[:2])) encodedTags = encodedTags[2:] bytesValue := encodedTags[:numBytesValue] diff --git a/src/x/serialize/encoder.go b/src/x/serialize/encoder.go index a682a639ee..19bd884a25 100644 --- a/src/x/serialize/encoder.go +++ b/src/x/serialize/encoder.go @@ -51,18 +51,20 @@ import ( */ var ( - byteOrder binary.ByteOrder = binary.LittleEndian + // ByteOrder is the byte order used for encoding tags into a byte sequence. + ByteOrder binary.ByteOrder = binary.LittleEndian headerMagicBytes = make([]byte, 2) ) func init() { - encodeUInt16(headerMagicNumber, headerMagicBytes) + encodeUInt16(HeaderMagicNumber, headerMagicBytes) } var ( - errTagEncoderInUse = errors.New("encoder already in use") - errTagLiteralTooLong = errors.New("literal is too long") - errEmptyTagNameLiteral = xerrors.NewInvalidParamsError(errors.New("tag name cannot be empty")) + errTagEncoderInUse = errors.New("encoder already in use") + errTagLiteralTooLong = errors.New("literal is too long") + // ErrEmptyTagNameLiteral is an error when encoded tag name is empty. + ErrEmptyTagNameLiteral = xerrors.NewInvalidParamsError(errors.New("tag name cannot be empty")) ) type newCheckedBytesFn func([]byte, checked.BytesOptions) checked.Bytes @@ -166,7 +168,7 @@ func (e *encoder) Finalize() { func (e *encoder) encodeTag(t ident.Tag) error { if len(t.Name.Bytes()) == 0 { - return errEmptyTagNameLiteral + return ErrEmptyTagNameLiteral } if err := e.encodeID(t.Name); err != nil { @@ -204,10 +206,10 @@ func (e *encoder) encodeUInt16(v uint16) []byte { } func encodeUInt16(v uint16, dest []byte) []byte { - byteOrder.PutUint16(dest, v) + ByteOrder.PutUint16(dest, v) return dest } func decodeUInt16(b []byte) uint16 { - return byteOrder.Uint16(b) + return ByteOrder.Uint16(b) } diff --git a/src/x/serialize/types.go b/src/x/serialize/types.go index a694ae7dc7..e3b31e6e7f 100644 --- a/src/x/serialize/types.go +++ b/src/x/serialize/types.go @@ -27,10 +27,10 @@ import ( "github.com/m3db/m3/src/x/ident" ) -var ( - // headerMagicNumber is an internal header used to denote the beginning of +const ( + // HeaderMagicNumber is an internal header used to denote the beginning of // an encoded stream. - headerMagicNumber uint16 = 10101 + HeaderMagicNumber uint16 = 10101 ) // TagEncoder encodes provided Tag iterators.