-
Notifications
You must be signed in to change notification settings - Fork 458
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
[dbnode] Direct conversion of encoded tags to doc.Metadata #3087
Merged
Merged
Changes from 13 commits
Commits
Show all changes
25 commits
Select commit
Hold shift + click to select a range
ac80c69
add benchmark
vpranckaitis f0370d8
speed up the search of tag bytes in series ID
vpranckaitis 5e77e4a
update tests
vpranckaitis fe8ee23
PR comments
vpranckaitis 4221b84
add tests that check if bytes tag slices reuse data from series ID
vpranckaitis 4927f6f
update benchmark
vpranckaitis 6bb6d9b
PR comments
vpranckaitis 6d007e3
un-base64 series ID samples
vpranckaitis 9114f90
Merge branch 'master' into vilius/tags_to_doc_performace_improvement
linasm a879a2a
[dbnode] Direct conversion of encoded tags to doc.Metadata
vpranckaitis db2023e
extract decoding steps into smaller functions
vpranckaitis f7c6bcd
thinner encoded tag iterator and benchmarks
vpranckaitis f769ce0
merge DecodeTagName() and DecodeTagValue() methods
vpranckaitis d3a8c38
function that uses indexing instead of sliding slice
vpranckaitis c4296b6
benchmark TagValueFromEncodedTagsFast
vpranckaitis 9b1ad47
revert decoder_fast.go changes
vpranckaitis ba77217
revert decoder_fast_iter.go changes
vpranckaitis 3e949af
remove convert.FromSeriesIDAndEncodedTagsIndex()
vpranckaitis e0cad90
export and use variables from serialize package
vpranckaitis f77ccef
add tests
vpranckaitis 45378ef
Apply suggestions from code review
vpranckaitis a5436e8
PR comments
vpranckaitis 8b725f0
use ident.BytesID instead of ident.ID
vpranckaitis f4f96c0
Merge branch 'master' into vilius/convert_encoded_tags_to_doc
vpranckaitis d782a61
Merge branch 'master' into vilius/convert_encoded_tags_to_doc
vpranckaitis File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,14 +22,32 @@ package convert | |
|
||
import ( | ||
"bytes" | ||
"encoding/binary" | ||
"errors" | ||
"fmt" | ||
"unicode/utf8" | ||
|
||
"github.com/m3db/m3/src/dbnode/ts" | ||
"github.com/m3db/m3/src/m3ninx/doc" | ||
"github.com/m3db/m3/src/metrics/metric/id" | ||
"github.com/m3db/m3/src/query/graphite/graphite" | ||
"github.com/m3db/m3/src/x/ident" | ||
"github.com/m3db/m3/src/x/pool" | ||
"github.com/m3db/m3/src/x/serialize" | ||
) | ||
|
||
const ( | ||
// NB: this assumes that series ID has a format: | ||
// {tag1="value1",tag2="value2",...} | ||
// | ||
// Thus firstTagBytesPosition points to the 't' immediately after curly brace '{' | ||
firstTagBytesPosition int = 1 | ||
// distanceBetweenTagNameAndValue corresponds to '="' in series ID that separates tag name from | ||
// its value | ||
distanceBetweenTagNameAndValue int = 2 | ||
// distanceBetweenTagValueAndNextName corresponds to '",' in series ID that separates | ||
// tag's value from the following tag name | ||
distanceBetweenTagValueAndNextName int = 2 | ||
) | ||
|
||
var ( | ||
|
@@ -108,22 +126,19 @@ func ValidateSeriesTag(tag ident.Tag) error { | |
|
||
// FromSeriesIDAndTags converts the provided series id+tags into a document. | ||
func FromSeriesIDAndTags(id ident.ID, tags ident.Tags) (doc.Metadata, error) { | ||
clonedID := clone(id) | ||
fields := make([]doc.Field, 0, len(tags.Values())) | ||
var ( | ||
clonedID = clone(id.Bytes()) | ||
fields = make([]doc.Field, 0, len(tags.Values())) | ||
expectedStart = firstTagBytesPosition | ||
) | ||
for _, tag := range tags.Values() { | ||
nameBytes, valueBytes := tag.Name.Bytes(), tag.Value.Bytes() | ||
|
||
var clonedName, clonedValue []byte | ||
if idx := bytes.Index(clonedID, nameBytes); idx != -1 { | ||
clonedName = clonedID[idx : idx+len(nameBytes)] | ||
} else { | ||
clonedName = append([]byte(nil), nameBytes...) | ||
} | ||
if idx := bytes.Index(clonedID, valueBytes); idx != -1 { | ||
clonedValue = clonedID[idx : idx+len(valueBytes)] | ||
} else { | ||
clonedValue = append([]byte(nil), valueBytes...) | ||
} | ||
clonedName, expectedStart = findSliceOrClone(clonedID, nameBytes, expectedStart, | ||
distanceBetweenTagNameAndValue) | ||
clonedValue, expectedStart = findSliceOrClone(clonedID, valueBytes, expectedStart, | ||
distanceBetweenTagValueAndNextName) | ||
|
||
fields = append(fields, doc.Field{ | ||
Name: clonedName, | ||
|
@@ -143,23 +158,174 @@ func FromSeriesIDAndTags(id ident.ID, tags ident.Tags) (doc.Metadata, error) { | |
|
||
// FromSeriesIDAndTagIter converts the provided series id+tags into a document. | ||
func FromSeriesIDAndTagIter(id ident.ID, tags ident.TagIterator) (doc.Metadata, error) { | ||
clonedID := clone(id) | ||
fields := make([]doc.Field, 0, tags.Remaining()) | ||
var ( | ||
clonedID = clone(id.Bytes()) | ||
fields = make([]doc.Field, 0, tags.Remaining()) | ||
expectedStart = firstTagBytesPosition | ||
) | ||
for tags.Next() { | ||
tag := tags.Current() | ||
nameBytes, valueBytes := tag.Name.Bytes(), tag.Value.Bytes() | ||
|
||
var clonedName, clonedValue []byte | ||
if idx := bytes.Index(clonedID, nameBytes); idx != -1 { | ||
clonedName = clonedID[idx : idx+len(nameBytes)] | ||
} else { | ||
clonedName = append([]byte(nil), nameBytes...) | ||
clonedName, expectedStart = findSliceOrClone(clonedID, nameBytes, expectedStart, | ||
distanceBetweenTagNameAndValue) | ||
clonedValue, expectedStart = findSliceOrClone(clonedID, valueBytes, expectedStart, | ||
distanceBetweenTagValueAndNextName) | ||
|
||
fields = append(fields, doc.Field{ | ||
Name: clonedName, | ||
Value: clonedValue, | ||
}) | ||
} | ||
if err := tags.Err(); err != nil { | ||
return doc.Metadata{}, err | ||
} | ||
|
||
d := doc.Metadata{ | ||
ID: clonedID, | ||
Fields: fields, | ||
} | ||
if err := Validate(d); err != nil { | ||
return doc.Metadata{}, err | ||
} | ||
return d, nil | ||
} | ||
|
||
func FromSeriesIDAndEncodedTags(id ident.ID, encodedTags ts.EncodedTags) (doc.Metadata, error) { | ||
total := len(encodedTags) | ||
if total < 4 { | ||
return doc.Metadata{}, fmt.Errorf( | ||
"encoded tags too short: size=%d, need=%d", total, 4) | ||
} | ||
|
||
var ( | ||
byteOrder = binary.LittleEndian | ||
headerMagicNumber uint16 = 10101 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's make it a global |
||
) | ||
|
||
header := byteOrder.Uint16(encodedTags[:2]) | ||
encodedTags = encodedTags[2:] | ||
if header != headerMagicNumber { | ||
return doc.Metadata{}, errors.New("") | ||
} | ||
|
||
length := int(byteOrder.Uint16(encodedTags[:2])) | ||
encodedTags = encodedTags[2:] | ||
|
||
var ( | ||
clonedID = clone(id.Bytes()) | ||
fields = make([]doc.Field, 0, length) | ||
expectedStart = firstTagBytesPosition | ||
) | ||
|
||
for i := 0; i < length; i++ { | ||
if len(encodedTags) < 2 { | ||
return doc.Metadata{}, fmt.Errorf("missing size for tag name: index=%d", i) | ||
} | ||
if idx := bytes.Index(clonedID, valueBytes); idx != -1 { | ||
clonedValue = clonedID[idx : idx+len(valueBytes)] | ||
} else { | ||
clonedValue = append([]byte(nil), valueBytes...) | ||
numBytesName := int(byteOrder.Uint16(encodedTags[:2])) | ||
if numBytesName == 0 { | ||
return doc.Metadata{}, errors.New("") | ||
} | ||
encodedTags = encodedTags[2:] | ||
|
||
bytesName := encodedTags[:numBytesName] | ||
encodedTags = encodedTags[numBytesName:] | ||
|
||
if len(encodedTags) < 2 { | ||
return doc.Metadata{}, fmt.Errorf("missing size for tag value: index=%d", i) | ||
} | ||
|
||
numBytesValue := int(byteOrder.Uint16(encodedTags[:2])) | ||
encodedTags = encodedTags[2:] | ||
|
||
bytesValue := encodedTags[:numBytesValue] | ||
encodedTags = encodedTags[numBytesValue:] | ||
|
||
var clonedName, clonedValue []byte | ||
clonedName, expectedStart = findSliceOrClone(clonedID, bytesName, expectedStart, | ||
distanceBetweenTagNameAndValue) | ||
clonedValue, expectedStart = findSliceOrClone(clonedID, bytesValue, expectedStart, | ||
distanceBetweenTagValueAndNextName) | ||
|
||
fields = append(fields, doc.Field{ | ||
Name: clonedName, | ||
Value: clonedValue, | ||
}) | ||
} | ||
|
||
d := doc.Metadata{ | ||
ID: clonedID, | ||
Fields: fields, | ||
} | ||
if err := Validate(d); err != nil { | ||
return doc.Metadata{}, err | ||
} | ||
return d, nil | ||
} | ||
|
||
func FromSeriesIDAndEncodedTags2(id ident.ID, encodedTags ts.EncodedTags) (doc.Metadata, error) { | ||
var ( | ||
length int | ||
err error | ||
) | ||
encodedTags, length, err = serialize.DecodeHeader(encodedTags) | ||
if err != nil { | ||
return doc.Metadata{}, err | ||
} | ||
|
||
var ( | ||
clonedID = clone(id.Bytes()) | ||
fields = make([]doc.Field, 0, length) | ||
expectedStart = firstTagBytesPosition | ||
) | ||
|
||
for i := 0; i < length; i++ { | ||
var nameBytes, valueBytes []byte | ||
encodedTags, nameBytes, valueBytes, err = serialize.DecodeTag(encodedTags) | ||
if err != nil { | ||
return doc.Metadata{}, err | ||
} | ||
|
||
var clonedName, clonedValue []byte | ||
clonedName, expectedStart = findSliceOrClone(clonedID, nameBytes, expectedStart, | ||
distanceBetweenTagNameAndValue) | ||
clonedValue, expectedStart = findSliceOrClone(clonedID, valueBytes, expectedStart, | ||
distanceBetweenTagValueAndNextName) | ||
|
||
fields = append(fields, doc.Field{ | ||
Name: clonedName, | ||
Value: clonedValue, | ||
}) | ||
} | ||
|
||
if err != nil { | ||
return doc.Metadata{}, err | ||
} | ||
d := doc.Metadata{ | ||
ID: clonedID, | ||
Fields: fields, | ||
} | ||
if err := Validate(d); err != nil { | ||
return doc.Metadata{}, err | ||
} | ||
return d, nil | ||
} | ||
|
||
func FromSeriesIDAndFastTagIter(id ident.ID, tags id.SortedTagIterator) (doc.Metadata, error) { | ||
var ( | ||
clonedID = clone(id.Bytes()) | ||
fields = make([]doc.Field, 0, tags.NumTags()) | ||
expectedStart = firstTagBytesPosition | ||
) | ||
for tags.Next() { | ||
nameBytes, valueBytes := tags.Current() | ||
|
||
var clonedName, clonedValue []byte | ||
clonedName, expectedStart = findSliceOrClone(clonedID, nameBytes, expectedStart, | ||
distanceBetweenTagNameAndValue) | ||
clonedValue, expectedStart = findSliceOrClone(clonedID, valueBytes, expectedStart, | ||
distanceBetweenTagValueAndNextName) | ||
|
||
fields = append(fields, doc.Field{ | ||
Name: clonedName, | ||
|
@@ -180,6 +346,19 @@ func FromSeriesIDAndTagIter(id ident.ID, tags ident.TagIterator) (doc.Metadata, | |
return d, nil | ||
} | ||
|
||
func findSliceOrClone(id, tag []byte, expectedStart, nextPositionDistance int) ([]byte, int) { //nolint:unparam | ||
n := len(tag) | ||
expectedEnd := expectedStart + n | ||
if expectedStart != -1 && expectedEnd <= len(id) && | ||
bytes.Equal(id[expectedStart:expectedEnd], tag) { | ||
return id[expectedStart:expectedEnd], expectedEnd + nextPositionDistance | ||
} else if idx := bytes.Index(id, tag); idx != -1 { | ||
return id[idx : idx+n], expectedEnd + nextPositionDistance | ||
} else { | ||
return clone(tag), -1 | ||
} | ||
} | ||
|
||
// TagsFromTagsIter returns an ident.Tags from a TagIterator. It also tries | ||
// to re-use bytes from the seriesID if they're also present in the tags | ||
// instead of re-allocating them. This requires that the ident.Tags that is | ||
|
@@ -252,8 +431,7 @@ func TagsFromTagsIter( | |
// NB(prateek): we take an independent copy of the bytes underlying | ||
// any ids provided, as we need to maintain the lifecycle of the indexed | ||
// bytes separately from the rest of the storage subsystem. | ||
func clone(id ident.ID) []byte { | ||
original := id.Bytes() | ||
func clone(original []byte) []byte { | ||
clone := make([]byte, len(original)) | ||
copy(clone, original) | ||
return clone | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please change
ident.ID
toident.BytesID
. I've just realized that using an interface here will force an allocation on some of the code paths where we will be using this function.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
8b725f0