Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve carbon ingestion performance #1327

Merged
merged 20 commits into from
Jan 30, 2019
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ TOOLS := \
clone_fileset \
dtest \
verify_commitlogs \
verify_index_files
verify_index_files \
carbon_load

.PHONY: setup
setup:
Expand Down
2 changes: 1 addition & 1 deletion docs/integrations/graphite.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,4 @@ This will make the carbon ingestion emit logs for every step that it takes. *No
- p95
- p99
- p999
- p9999
- p9999
8 changes: 8 additions & 0 deletions scripts/development/m3_stack/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ services:
- "./m3dbnode.yml:/etc/m3dbnode/m3dbnode.yml"
environment:
- M3DB_HOST_ID=m3db_seed
expose:
- "9002"
- "9003"
- "9004"
ports:
- "0.0.0.0:9002:9002"
- "0.0.0.0:9003:9003"
- "0.0.0.0:9004:9004"
richardartoul marked this conversation as resolved.
Show resolved Hide resolved
m3db_data01:
networks:
- backend
Expand Down
142 changes: 111 additions & 31 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,20 @@ import (
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3x/instrument"
"github.com/m3db/m3x/log"
"github.com/m3db/m3x/pool"
m3xserver "github.com/m3db/m3x/server"
xsync "github.com/m3db/m3x/sync"
xtime "github.com/m3db/m3x/time"

"github.com/uber-go/tally"
)

const (
maxResourcePoolNameSize = 1024
maxPooledTagsSize = 16
defaultResourcePoolSize = 4096
)

var (
// Used for parsing carbon names into tags.
carbonSeparatorByte = byte('.')
Expand All @@ -63,7 +70,6 @@ type Options struct {
Debug bool
InstrumentOptions instrument.Options
WorkerPool xsync.PooledWorkerPool
Timeout time.Duration
}

// CarbonIngesterRules contains the carbon ingestion rules.
Expand Down Expand Up @@ -106,6 +112,21 @@ func NewIngester(
return nil, err
}

poolOpts := pool.NewObjectPoolOptions().
SetInstrumentOptions(opts.InstrumentOptions).
SetRefillLowWatermark(0).
SetRefillHighWatermark(0).
SetSize(defaultResourcePoolSize)

resourcePool := pool.NewObjectPool(poolOpts)
resourcePool.Init(func() interface{} {
return &lineResources{
name: make([]byte, 0, maxResourcePoolNameSize),
datapoints: make([]ts.Datapoint, 1, 1),
tags: make([]models.Tag, 0, maxPooledTagsSize),
}
})

return &ingester{
downsamplerAndWriter: downsamplerAndWriter,
opts: opts,
Expand All @@ -115,6 +136,8 @@ func NewIngester(
opts.InstrumentOptions.MetricsScope()),

rules: compiledRules,

lineResourcesPool: resourcePool,
}, nil
}

Expand All @@ -126,10 +149,16 @@ type ingester struct {
tagOpts models.TagOptions

rules []ruleAndRegex

lineResourcesPool pool.ObjectPool
}

func (i *ingester) Handle(conn net.Conn) {
var (
// Interfaces require a context be passed, but M3DB client already has timeouts
// built in and allocating a new context each time is expensive so we just pass
// the same context always and rely on M3DB client timeouts.
ctx = context.Background()
wg = sync.WaitGroup{}
s = carbon.NewScanner(conn, i.opts.InstrumentOptions)
logger = i.opts.InstrumentOptions.Logger()
Expand All @@ -138,16 +167,20 @@ func (i *ingester) Handle(conn net.Conn) {
logger.Debug("handling new carbon ingestion connection")
for s.Scan() {
name, timestamp, value := s.Metric()
// TODO(rartoul): Pool.

resources := i.getLineResources()
// Copy name since scanner bytes are recycled.
name = append([]byte(nil), name...)
resources.name = append(resources.name[:0], name...)

wg.Add(1)
i.opts.WorkerPool.Go(func() {
ok := i.write(name, timestamp, value)
ok := i.write(ctx, resources, timestamp, value)
if ok {
i.metrics.success.Inc(1)
}
// The contract is that after the DownsamplerAndWriter returns, any resources
// that it needed to hold onto have already been copied.
i.putLineResources(resources)
wg.Done()
})

Expand All @@ -166,7 +199,12 @@ func (i *ingester) Handle(conn net.Conn) {
// Don't close the connection, that is the server's responsibility.
}

func (i *ingester) write(name []byte, timestamp time.Time, value float64) bool {
func (i *ingester) write(
ctx context.Context,
resources *lineResources,
timestamp time.Time,
value float64,
) bool {
downsampleAndStoragePolicies := ingest.WriteOptions{
// Set both of these overrides to true to indicate that only the exact mapping
// rules and storage policies that we provide should be used and that all
Expand All @@ -175,8 +213,9 @@ func (i *ingester) write(name []byte, timestamp time.Time, value float64) bool {
DownsampleOverride: true,
WriteOverride: true,
}

for _, rule := range i.rules {
if rule.rule.Pattern == graphite.MatchAllPattern || rule.regexp.Match(name) {
if rule.rule.Pattern == graphite.MatchAllPattern || rule.regexp.Match(resources.name) {
// Each rule should only have either mapping rules or storage policies so
// one of these should be a no-op.
downsampleAndStoragePolicies.DownsampleMappingRules = rule.mappingRules
Expand All @@ -185,7 +224,7 @@ func (i *ingester) write(name []byte, timestamp time.Time, value float64) bool {
if i.opts.Debug {
i.logger.Infof(
"carbon metric: %s matched by pattern: %s with mapping rules: %#v and storage policies: %#v",
string(name), string(rule.rule.Pattern), rule.mappingRules, rule.storagePolicies)
string(resources.name), string(rule.rule.Pattern), rule.mappingRules, rule.storagePolicies)
}
// Break because we only want to apply one rule per metric based on which
// ever one matches first.
Expand All @@ -197,44 +236,32 @@ func (i *ingester) write(name []byte, timestamp time.Time, value float64) bool {
len(downsampleAndStoragePolicies.WriteStoragePolicies) == 0 {
// Nothing to do if none of the policies matched.
if i.opts.Debug {
i.logger.Infof("no rules matched carbon metric: %s, skipping", string(name))
i.logger.Infof("no rules matched carbon metric: %s, skipping", string(resources.name))
}
return false
}

datapoints := []ts.Datapoint{{Timestamp: timestamp, Value: value}}
// TODO(rartoul): Pool.
tags, err := GenerateTagsFromName(name, i.tagOpts)
resources.datapoints[0] = ts.Datapoint{Timestamp: timestamp, Value: value}
tags, err := GenerateTagsFromNameIntoSlice(resources.name, i.tagOpts, resources.tags)
if err != nil {
i.logger.Errorf("err generating tags from carbon name: %s, err: %s",
string(name), err)
string(resources.name), err)
i.metrics.malformed.Inc(1)
return false
}

var (
ctx = context.Background()
cleanup func()
)
if i.opts.Timeout > 0 {
ctx, cleanup = context.WithTimeout(ctx, i.opts.Timeout)
}

err = i.downsamplerAndWriter.Write(
ctx, tags, datapoints, xtime.Second, downsampleAndStoragePolicies)
if cleanup != nil {
cleanup()
}
ctx, tags, resources.datapoints, xtime.Second, downsampleAndStoragePolicies)

if err != nil {
i.logger.Errorf("err writing carbon metric: %s, err: %s",
string(name), err)
string(resources.name), err)
i.metrics.err.Inc(1)
return false
}

if i.opts.Debug {
i.logger.Infof("successfully wrote carbon metric: %s", string(name))
i.logger.Infof("successfully wrote carbon metric: %s", string(resources.name))
}
return true
}
Expand Down Expand Up @@ -267,15 +294,36 @@ type carbonIngesterMetrics struct {
func GenerateTagsFromName(
	name []byte,
	opts models.TagOptions,
) (models.Tags, error) {
	// Passing a nil destination slice makes the shared helper allocate its
	// own tag storage; callers that want to reuse a pooled slice should use
	// GenerateTagsFromNameIntoSlice instead.
	var dst []models.Tag
	return generateTagsFromName(name, opts, dst)
}

// GenerateTagsFromNameIntoSlice behaves exactly like GenerateTagsFromName,
// but lets the caller supply the slice that generated tags are appended
// into, so a pooled slice with sufficient capacity avoids an allocation.
func GenerateTagsFromNameIntoSlice(
	name []byte,
	opts models.TagOptions,
	tags []models.Tag,
) (models.Tags, error) {
	result, err := generateTagsFromName(name, opts, tags)
	return result, err
}

func generateTagsFromName(
name []byte,
opts models.TagOptions,
tags []models.Tag,
) (models.Tags, error) {
if len(name) == 0 {
return models.EmptyTags(), errCannotGenerateTagsFromEmptyName
}

var (
numTags = bytes.Count(name, carbonSeparatorBytes) + 1
tags = make([]models.Tag, 0, numTags)
)
numTags := bytes.Count(name, carbonSeparatorBytes) + 1

if cap(tags) >= numTags {
tags = tags[:0]
} else {
tags = make([]models.Tag, 0, numTags)
}

startIdx := 0
tagNum := 0
Expand Down Expand Up @@ -311,7 +359,7 @@ func GenerateTagsFromName(
})
}

return models.NewTags(numTags, opts).AddTags(tags), nil
return models.Tags{Opts: opts, Tags: tags}, nil
}

// Compile all the carbon ingestion rules into regexp so that we can
Expand Down Expand Up @@ -356,6 +404,38 @@ func compileRules(rules CarbonIngesterRules) ([]ruleAndRegex, error) {
return compiledRules, nil
}

// getLineResources grabs a reusable per-line scratch object from the pool.
func (i *ingester) getLineResources() *lineResources {
	pooled := i.lineResourcesPool.Get()
	return pooled.(*lineResources)
}

func (i *ingester) putLineResources(l *lineResources) {
tooLargeForPool := cap(l.name) > maxResourcePoolNameSize ||
len(l.datapoints) > 1 || // We always write one datapoint at a time.
cap(l.datapoints) > 1 ||
cap(l.tags) > maxPooledTagsSize

if tooLargeForPool {
return
}

// Reset.
l.name = l.name[:0]
l.datapoints[0] = ts.Datapoint{}
for i := range l.tags {
// Free pointers.
l.tags[i] = models.Tag{}
}
l.tags = l.tags[:0]
richardartoul marked this conversation as resolved.
Show resolved Hide resolved

i.lineResourcesPool.Put(l)
}

// lineResources bundles the per-line scratch buffers that are pooled and
// reused across carbon lines to avoid allocating on every ingested metric.
type lineResources struct {
	// name holds a copy of the metric name (the scanner's bytes are recycled).
	name       []byte
	// datapoints is kept at length/capacity 1: one datapoint per carbon line.
	datapoints []ts.Datapoint
	// tags is scratch space for tags generated from the metric name.
	tags       []models.Tag
}

type ruleAndRegex struct {
rule config.CarbonIngesterRuleConfiguration
regexp *regexp.Regexp
Expand Down
6 changes: 4 additions & 2 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,9 @@ func TestIngesterHandleConn(t *testing.T) {
overrides ingest.WriteOptions,
) interface{} {
lock.Lock()
// Clone tags because they (and their underlying bytes) are pooled.
found = append(found, testMetric{
tags: tags, timestamp: int(dp[0].Timestamp.Unix()), value: dp[0].Value})
tags: tags.Clone(), timestamp: int(dp[0].Timestamp.Unix()), value: dp[0].Value})

// Make 1 in 10 writes fail to test those paths.
returnErr := idx%10 == 0
Expand Down Expand Up @@ -245,8 +246,9 @@ func TestIngesterHonorsPatterns(t *testing.T) {
writeOpts ingest.WriteOptions,
) interface{} {
lock.Lock()
// Clone tags because they (and their underlying bytes) are pooled.
found = append(found, testMetric{
tags: tags, timestamp: int(dp[0].Timestamp.Unix()), value: dp[0].Value})
tags: tags.Clone(), timestamp: int(dp[0].Timestamp.Unix()), value: dp[0].Value})
lock.Unlock()

// Use panic's instead of require/assert because those don't behave properly when the assertion
Expand Down
Loading