[dbnode] Use already encoded tags when writing time series to commit log #1898

Merged Aug 21, 2019 · 18 commits · Changes shown from 9 commits
2 changes: 1 addition & 1 deletion glide.lock


2 changes: 1 addition & 1 deletion glide.yaml
@@ -28,7 +28,7 @@ import:
version: ^0.8

- package: github.com/apache/thrift
version: 0.9.3-pool-read-binary-2
version: 0.9.3-pool-read-binary-3
Contributor:
Can we just merge this into master of our branch? It's pretty confusing that we're pinned to a branch.

Collaborator (author):
Not opposed to it; the problem is that master is way ahead of where we are, so we'd have to reset master all the way back to 0.9.3 on our fork, which would be weird...

subpackages:
- lib/go/thrift
repo: https://github.com/m3db/thrift
2 changes: 2 additions & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
@@ -133,6 +133,7 @@ db:

pooling:
blockAllocSize: 16
thriftBytesPoolMaxAllocSize: 2048
type: simple
seriesPool:
size: 5242880
@@ -445,6 +446,7 @@ func TestConfiguration(t *testing.T) {
replication: null
pooling:
blockAllocSize: 16
thriftBytesPoolMaxAllocSize: 2048
type: simple
bytesPool:
buckets:
18 changes: 16 additions & 2 deletions src/cmd/services/m3dbnode/config/pooling.go
@@ -33,8 +33,9 @@ const (
)

const (
defaultMaxFinalizerCapacity = 4
defaultBlockAllocSize = 16
defaultMaxFinalizerCapacity = 4
defaultBlockAllocSize = 16
defaultThriftBytesPoolMaxAllocSize = 1024
)

type poolPolicyDefault struct {
@@ -252,6 +253,9 @@ type PoolingPolicy struct {
// The initial alloc size for a block.
BlockAllocSize *int `yaml:"blockAllocSize"`

// The thrift bytes pool max bytes slice allocation for a single binary field.
ThriftBytesPoolMaxAllocSize *int `yaml:"thriftBytesPoolMaxAllocSize"`
Contributor:
It's not really a max, is it? It's always this size?

Collaborator (author):
The capacity is always this size, but the actual length is determined by the length of the bytes being copied.

Since we suffix it with "Alloc" I'm not opposed to just calling it AllocSize and dropping the Max.
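To make the capacity-versus-length distinction in this thread concrete, here is a minimal sketch of the copy-into-pooled-slice pattern being described. The pool and helper names are illustrative, not the actual apachethrift implementation:

```go
package thriftpool

import "sync"

// maxAllocSize plays the role of thriftBytesPoolMaxAllocSize: every pooled
// slice is allocated with exactly this capacity.
const maxAllocSize = 1024

var bytesPool = sync.Pool{
	New: func() interface{} {
		return make([]byte, 0, maxAllocSize)
	},
}

// poolCopy copies src into a pooled slice. The capacity is always
// maxAllocSize; the length is determined by the bytes being copied.
func poolCopy(src []byte) []byte {
	if len(src) > maxAllocSize {
		// Too large for the pool; fall back to a plain allocation.
		return append([]byte(nil), src...)
	}
	b := bytesPool.Get().([]byte)
	b = b[:len(src)]
	copy(b, src)
	return b
}

// poolPut returns a slice to the pool, reporting whether it was pooled
// (mirroring how apachethrift.BytesPoolPut returns a bool).
func poolPut(b []byte) bool {
	if cap(b) != maxAllocSize {
		return false
	}
	bytesPool.Put(b[:0])
	return true
}
```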


// The general pool type (currently only supported: simple).
Type *PoolingType `yaml:"type"`

@@ -418,6 +422,16 @@ func (p *PoolingPolicy) BlockAllocSizeOrDefault() int {
return defaultBlockAllocSize
}

// ThriftBytesPoolMaxAllocSizeOrDefault returns the configured thrift bytes pool
// max alloc size if provided, or a default value otherwise.
func (p *PoolingPolicy) ThriftBytesPoolMaxAllocSizeOrDefault() int {
if p.ThriftBytesPoolMaxAllocSize != nil {
return *p.ThriftBytesPoolMaxAllocSize
}

return defaultThriftBytesPoolMaxAllocSize
}

// TypeOrDefault returns the configured pooling type if provided, or a default
// value otherwise.
func (p *PoolingPolicy) TypeOrDefault() PoolingType {
10 changes: 10 additions & 0 deletions src/cmd/services/m3dbnode/config/pooling_test.go
@@ -51,3 +51,13 @@ func TestContextPoolMaxFinalizerCapacityOrDefault(t *testing.T) {
cpp.MaxFinalizerCapacity = 10
require.Equal(t, 10, cpp.MaxFinalizerCapacityOrDefault())
}

func TestPoolingPolicyThriftBytesPoolMaxAllocSizeOrDefault(t *testing.T) {
policy := PoolingPolicy{}
require.Equal(t, defaultThriftBytesPoolMaxAllocSize,
policy.ThriftBytesPoolMaxAllocSizeOrDefault())

value := 42
policy.ThriftBytesPoolMaxAllocSize = &value
require.Equal(t, 42, policy.ThriftBytesPoolMaxAllocSizeOrDefault())
}
24 changes: 20 additions & 4 deletions src/dbnode/network/server/tchannelthrift/node/service.go
@@ -1163,6 +1163,7 @@ func (s *service) WriteBatchRaw(tctx thrift.Context, req *rpc.WriteBatchRawRequest
if err != nil {
return convert.ToRPCError(err)
}

// The lifecycle of the annotations is more involved than the rest of the data
// so we set the annotation pool put method as the finalization function and
// let the database take care of returning them to the pool.
@@ -1247,9 +1248,12 @@ func (s *service) WriteTaggedBatchRaw(tctx thrift.Context, req *rpc.WriteTaggedBatchRawRequest
if err != nil {
return convert.ToRPCError(err)
}
// The lifecycle of the annotations is more involved than the rest of the data
// so we set the annotation pool put method as the finalization function and
// let the database take care of returning them to the pool.

// The lifecycle of the encoded tags and annotations is more involved than
// the rest of the data so we set the encoded tags and annotation pool put
// calls as finalization functions and let the database take care of
// returning them to the pool.
batchWriter.SetFinalizeEncodedTagsFn(finalizeEncodedTagsFn)
batchWriter.SetFinalizeAnnotationFn(finalizeAnnotationFn)

for i, elem := range req.Elements {
@@ -1279,6 +1283,7 @@ func (s *service) WriteTaggedBatchRaw(tctx thrift.Context, req *rpc.WriteTaggedBatchRawRequest
i,
seriesID,
dec,
elem.EncodedTags,
xtime.FromNormalizedTime(elem.Datapoint.Timestamp, d),
elem.Datapoint.Value,
unit,
@@ -1764,7 +1769,11 @@ func (r *writeBatchPooledReq) Finalize() {
if r.writeTaggedReq != nil {
for _, elem := range r.writeTaggedReq.Elements {
apachethrift.BytesPoolPut(elem.ID)
apachethrift.BytesPoolPut(elem.EncodedTags)
// Ownership of the encoded tagts has been transferred to the BatchWriter
Contributor:
tagts -> tags

// so they will get returned to the pool automatically by the commitlog once
// it finishes writing them to disk via the finalization function that
// gets set on the WriteBatch.

// See comment above about not finalizing annotations here.
}
r.writeTaggedReq = nil
@@ -1874,6 +1883,13 @@ func (p *writeBatchPooledReqPool) Put(v *writeBatchPooledReq) {
p.pool.Put(v)
}

// finalizeEncodedTagsFn implements ts.FinalizeEncodedTagsFn because
// apachethrift.BytesPoolPut(b) returns a bool but ts.FinalizeEncodedTagsFn
// does not.
func finalizeEncodedTagsFn(b []byte) {
apachethrift.BytesPoolPut(b)
}

// finalizeAnnotationFn implements ts.FinalizeAnnotationFn because
// apachethrift.BytesPoolPut(b) returns a bool but ts.FinalizeAnnotationFn
// does not.
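To summarize the ownership handoff described in the comments above: the pooled request no longer returns encoded tags itself; the WriteBatch invokes the registered finalizer for each entry once the commit log has finished writing. A minimal sketch with simplified stand-in types (not the actual ts.WriteBatch implementation):

```go
package sketch

// FinalizeEncodedTagsFn mirrors the signature added in src/dbnode/ts/types.go.
type FinalizeEncodedTagsFn func(b []byte)

// writeBatch is a simplified stand-in for ts.WriteBatch.
type writeBatch struct {
	encodedTags           [][]byte
	finalizeEncodedTagsFn FinalizeEncodedTagsFn
}

// SetFinalizeEncodedTagsFn registers the callback that returns encoded tags
// to their pool, as WriteTaggedBatchRaw does with finalizeEncodedTagsFn.
func (b *writeBatch) SetFinalizeEncodedTagsFn(f FinalizeEncodedTagsFn) {
	b.finalizeEncodedTagsFn = f
}

// Finalize runs once the commit log has written the batch to disk; only then
// do the encoded tags go back to the thrift bytes pool.
func (b *writeBatch) Finalize() {
	if b.finalizeEncodedTagsFn != nil {
		for _, t := range b.encodedTags {
			b.finalizeEncodedTagsFn(t)
		}
	}
	b.encodedTags = nil
}
```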
45 changes: 45 additions & 0 deletions src/dbnode/persist/fs/commitlog/read_write_prop_test.go
@@ -33,6 +33,9 @@ import (
"testing"
"time"

"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/serialize"

"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
@@ -114,6 +117,7 @@ func TestCommitLogReadWrite(t *testing.T) {
write := seriesWrites.writes[seriesWrites.readPosition]

require.Equal(t, write.series.ID.String(), series.ID.String())
require.True(t, write.series.Tags.Equal(series.Tags))
require.Equal(t, write.series.Namespace.String(), series.Namespace.String())
require.Equal(t, write.series.Shard, series.Shard)
require.Equal(t, write.datapoint.Value, datapoint.Value)
@@ -555,22 +559,63 @@ func (w generatedWrite) String() string {

// generator for commit log write
func genWrite() gopter.Gen {
testTagEncodingPool := serialize.NewTagEncoderPool(serialize.NewTagEncoderOptions(),
pool.NewObjectPoolOptions().SetSize(1))
testTagEncodingPool.Init()

return gopter.CombineGens(
gen.Identifier(),
gen.TimeRange(time.Now(), 15*time.Minute),
gen.Float64(),
gen.Identifier(),
gen.UInt32(),
gen.Identifier(),
gen.Identifier(),
gen.Identifier(),
gen.Identifier(),
gen.Bool(),
).Map(func(val []interface{}) generatedWrite {
id := val[0].(string)
t := val[1].(time.Time)
v := val[2].(float64)
ns := val[3].(string)
shard := val[4].(uint32)
tags := map[string]string{
val[5].(string): val[6].(string),
val[7].(string): val[8].(string),
}
encodeTags := val[9].(bool)

var (
seriesTags ident.Tags
seriesEncodedTags []byte
)
for k, v := range tags {
seriesTags.Append(ident.Tag{
Name: ident.StringID(k),
Value: ident.StringID(v),
})
}

if encodeTags {
encoder := testTagEncodingPool.Get()
if err := encoder.Encode(ident.NewTagsIterator(seriesTags)); err != nil {
panic(err)
}
data, ok := encoder.Data()
if !ok {
panic("could not encode tags")
}

// Set encoded tags so the "fast" path is activated.
seriesEncodedTags = data.Bytes()
}

return generatedWrite{
series: ts.Series{
ID: ident.StringID(id),
Tags: seriesTags,
EncodedTags: seriesEncodedTags,
Namespace: ident.StringID(ns),
Shard: shard,
UniqueIndex: uniqueID(ns, id),
18 changes: 9 additions & 9 deletions src/dbnode/persist/fs/commitlog/writer.go
@@ -35,9 +35,9 @@ import (
"github.com/m3db/m3/src/dbnode/persist/fs/msgpack"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/x/os"
"github.com/m3db/m3/src/x/serialize"
"github.com/m3db/m3/src/x/ident"
xos "github.com/m3db/m3/src/x/os"
"github.com/m3db/m3/src/x/serialize"
xtime "github.com/m3db/m3/src/x/time"
)

@@ -203,13 +203,13 @@ func (w *writer) Write(

seen := w.seen.Test(uint(series.UniqueIndex))
if !seen {
var (
tags = series.Tags
encodedTags []byte
)

if tags.Values() != nil {
w.tagSliceIter.Reset(tags)
var encodedTags []byte
if series.EncodedTags != nil {
// If already serialized use the serialized tags.
encodedTags = series.EncodedTags
} else if series.Tags.Values() != nil {
// Otherwise serialize the tags.
w.tagSliceIter.Reset(series.Tags)
w.tagEncoder.Reset()
err := w.tagEncoder.Encode(w.tagSliceIter)
if err != nil {
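As a usage sketch of the fast path above: a caller that already holds serialized tags sets EncodedTags on the series, and Write uses them verbatim instead of re-encoding series.Tags. Field names follow this diff; the helper itself is hypothetical:

```go
package sketch

import (
	"github.com/m3db/m3/src/dbnode/ts"
	"github.com/m3db/m3/src/x/ident"
)

// newSeriesWithEncodedTags builds a ts.Series that activates the commit log
// writer's fast path: EncodedTags is non-nil, so the writer skips the
// tag-encoder round trip entirely.
func newSeriesWithEncodedTags(id, ns string, shard uint32, encodedTags []byte) ts.Series {
	return ts.Series{
		ID:          ident.StringID(id),
		Namespace:   ident.StringID(ns),
		Shard:       shard,
		EncodedTags: ts.EncodedTags(encodedTags),
	}
}
```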
4 changes: 4 additions & 0 deletions src/dbnode/server/server.go
@@ -81,6 +81,7 @@ import (
"github.com/m3db/m3/src/x/serialize"
xsync "github.com/m3db/m3/src/x/sync"

apachethrift "github.com/apache/thrift/lib/go/thrift"
"github.com/coreos/etcd/embed"
opentracing "github.com/opentracing/opentracing-go"
"github.com/uber-go/tally"
@@ -1147,6 +1148,9 @@ func withEncodingAndPoolingOptions(
iopts := opts.InstrumentOptions()
scope := opts.InstrumentOptions().MetricsScope()

// Set the max bytes pool byte slice alloc size for the thrift pooling.
apachethrift.SetMaxBytesPoolAlloc(policy.ThriftBytesPoolMaxAllocSizeOrDefault())

bytesPoolOpts := pool.NewObjectPoolOptions().
SetInstrumentOptions(iopts.SetMetricsScope(scope.SubScope("bytes-pool")))
checkedBytesPoolOpts := bytesPoolOpts.
9 changes: 5 additions & 4 deletions src/dbnode/storage/types.go
@@ -117,11 +117,12 @@ type Database interface {
// or WriteTaggedBatch.
//
// Note that when using the BatchWriter the caller owns the lifecycle of the series
// IDs and tag iterators (I.E) if they're being pooled its the callers responsibility
// to return them to the appropriate pool, but the annotations are owned by the
// IDs; if they're being pooled, it's the caller's responsibility to return them to the
// appropriate pool, but the encoded tags and annotations are owned by the
// ts.WriteBatch itself and will be finalized when the entire ts.WriteBatch is finalized
// due to their lifecycle being more complicated. Callers can still control the pooling
// of the annotations by using the SetFinalizeAnnotationFn on the WriteBatch itself.
// due to their lifecycle being more complicated.
// Callers can still control the pooling of the encoded tags and annotations by using
// the SetFinalizeEncodedTagsFn and SetFinalizeAnnotationFn on the WriteBatch itself.
BatchWriter(namespace ident.ID, batchSize int) (ts.BatchWriter, error)

// WriteBatch is the same as Write, but in batch.
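A sketch of the caller-side contract this comment describes: obtain a BatchWriter, then register finalizers that hand encoded tags and annotations back to the caller's pools once the batch is finalized. The pool interface here is a placeholder, not an m3 type:

```go
package sketch

import (
	"github.com/m3db/m3/src/dbnode/storage"
	"github.com/m3db/m3/src/dbnode/ts"
	"github.com/m3db/m3/src/x/ident"
)

// bytesPool is a placeholder for whatever pool the caller uses.
type bytesPool interface {
	Put(b []byte)
}

// newBatchWriter registers finalizers so encoded tags and annotations are
// returned to the caller's pools when the WriteBatch is finalized.
func newBatchWriter(
	db storage.Database,
	ns ident.ID,
	tagsPool, annPool bytesPool,
) (ts.BatchWriter, error) {
	w, err := db.BatchWriter(ns, 128)
	if err != nil {
		return nil, err
	}
	w.SetFinalizeEncodedTagsFn(func(b []byte) { tagsPool.Put(b) })
	w.SetFinalizeAnnotationFn(func(b []byte) { annPool.Put(b) })
	return w, nil
}
```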
20 changes: 17 additions & 3 deletions src/dbnode/ts/types.go
@@ -27,6 +27,10 @@ import (
xtime "github.com/m3db/m3/src/x/time"
)

// FinalizeEncodedTagsFn is a function that will be called for each encoded tags once
// the WriteBatch itself is finalized.
type FinalizeEncodedTagsFn func(b []byte)

// FinalizeAnnotationFn is a function that will be called for each annotation once
// the WriteBatch itself is finalized.
type FinalizeAnnotationFn func(b []byte)
@@ -72,9 +76,13 @@ type Series struct {
// ID is the series identifier.
ID ident.ID

// Tags are the series tags.
// Tags is the series tags.
Tags ident.Tags
Contributor:
Is this still used anywhere?

Collaborator (author):
Yeah, it's used on the read side and it was going to be very difficult to remove unfortunately.


// EncodedTags are the series encoded tags, if set then call sites can
// avoid needing to encode the tags from the series tags provided.
EncodedTags EncodedTags

// Shard is the shard the series belongs to.
Shard uint32
}
@@ -90,6 +98,9 @@ func (d Datapoint) Equal(x Datapoint) bool {
return d.Timestamp.Equal(x.Timestamp) && d.Value == x.Value
}

// EncodedTags represents the encoded tags for the series.
type EncodedTags []byte

// Annotation represents information used to annotate datapoints.
type Annotation []byte

@@ -120,17 +131,20 @@ type BatchWriter interface {
value float64,
unit xtime.Unit,
annotation []byte,
)
) error

AddTagged(
originalIndex int,
id ident.ID,
tags ident.TagIterator,
encodedTags EncodedTags,
timestamp time.Time,
value float64,
unit xtime.Unit,
annotation []byte,
)
) error

SetFinalizeEncodedTagsFn(f FinalizeEncodedTagsFn)

SetFinalizeAnnotationFn(f FinalizeAnnotationFn)
}
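Since Add and AddTagged now return errors per the interface change above, call sites must check them. A brief sketch of the updated AddTagged call shape (values are illustrative, and passing a nil tag iterator alongside encoded tags is an assumption, not confirmed by this diff):

```go
package sketch

import (
	"time"

	"github.com/m3db/m3/src/dbnode/ts"
	"github.com/m3db/m3/src/x/ident"
	xtime "github.com/m3db/m3/src/x/time"
)

// addTaggedWrite adds one tagged write, forwarding already-encoded tags and
// propagating the error that AddTagged now returns.
func addTaggedWrite(w ts.BatchWriter, idx int, id ident.ID, encodedTags ts.EncodedTags) error {
	return w.AddTagged(
		idx,
		id,
		nil, // tag iterator; assumed unused when encoded tags are supplied
		encodedTags,
		time.Now(),
		42.0,
		xtime.Second,
		nil, // no annotation
	)
}
```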