Skip to content

Commit

Permalink
Improve carbon ingestion performance (#1327)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardartoul authored Jan 30, 2019
1 parent 0b9891b commit 876ae85
Show file tree
Hide file tree
Showing 14 changed files with 393 additions and 59 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ TOOLS := \
clone_fileset \
dtest \
verify_commitlogs \
verify_index_files
verify_index_files \
carbon_load

.PHONY: setup
setup:
Expand Down
2 changes: 1 addition & 1 deletion docs/integrations/graphite.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,4 @@ This will make the carbon ingestion emit logs for every step that is taking. *No
- p95
- p99
- p999
- p9999
- p9999
8 changes: 8 additions & 0 deletions scripts/development/m3_stack/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ services:
- "./m3dbnode.yml:/etc/m3dbnode/m3dbnode.yml"
environment:
- M3DB_HOST_ID=m3db_seed
expose:
- "9002"
- "9003"
- "9004"
ports:
- "0.0.0.0:9002:9002"
- "0.0.0.0:9003:9003"
- "0.0.0.0:9004:9004"
m3db_data01:
networks:
- backend
Expand Down
142 changes: 111 additions & 31 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,20 @@ import (
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3x/instrument"
"github.com/m3db/m3x/log"
"github.com/m3db/m3x/pool"
m3xserver "github.com/m3db/m3x/server"
xsync "github.com/m3db/m3x/sync"
xtime "github.com/m3db/m3x/time"

"github.com/uber-go/tally"
)

const (
// maxResourcePoolNameSize is the initial capacity of the pooled name buffer;
// putLineResources drops any lineResources whose name buffer has grown
// beyond this capacity instead of returning it to the pool.
maxResourcePoolNameSize = 1024
// maxPooledTagsSize is the initial capacity of the pooled tags slice;
// putLineResources drops any lineResources whose tags slice has a larger
// capacity than this.
maxPooledTagsSize = 16
// defaultResourcePoolSize is the size configured on the lineResources
// object pool created in NewIngester.
defaultResourcePoolSize = 4096
)

var (
// Used for parsing carbon names into tags.
carbonSeparatorByte = byte('.')
Expand All @@ -63,7 +70,6 @@ type Options struct {
Debug bool
InstrumentOptions instrument.Options
WorkerPool xsync.PooledWorkerPool
Timeout time.Duration
}

// CarbonIngesterRules contains the carbon ingestion rules.
Expand Down Expand Up @@ -106,6 +112,21 @@ func NewIngester(
return nil, err
}

poolOpts := pool.NewObjectPoolOptions().
SetInstrumentOptions(opts.InstrumentOptions).
SetRefillLowWatermark(0).
SetRefillHighWatermark(0).
SetSize(defaultResourcePoolSize)

resourcePool := pool.NewObjectPool(poolOpts)
resourcePool.Init(func() interface{} {
return &lineResources{
name: make([]byte, 0, maxResourcePoolNameSize),
datapoints: make([]ts.Datapoint, 1, 1),
tags: make([]models.Tag, 0, maxPooledTagsSize),
}
})

return &ingester{
downsamplerAndWriter: downsamplerAndWriter,
opts: opts,
Expand All @@ -115,6 +136,8 @@ func NewIngester(
opts.InstrumentOptions.MetricsScope()),

rules: compiledRules,

lineResourcesPool: resourcePool,
}, nil
}

Expand All @@ -126,10 +149,16 @@ type ingester struct {
tagOpts models.TagOptions

rules []ruleAndRegex

lineResourcesPool pool.ObjectPool
}

func (i *ingester) Handle(conn net.Conn) {
var (
// Interfaces require a context be passed, but M3DB client already has timeouts
// built in and allocating a new context each time is expensive so we just pass
// the same context always and rely on M3DB client timeouts.
ctx = context.Background()
wg = sync.WaitGroup{}
s = carbon.NewScanner(conn, i.opts.InstrumentOptions)
logger = i.opts.InstrumentOptions.Logger()
Expand All @@ -138,16 +167,20 @@ func (i *ingester) Handle(conn net.Conn) {
logger.Debug("handling new carbon ingestion connection")
for s.Scan() {
name, timestamp, value := s.Metric()
// TODO(rartoul): Pool.

resources := i.getLineResources()
// Copy name since scanner bytes are recycled.
name = append([]byte(nil), name...)
resources.name = append(resources.name[:0], name...)

wg.Add(1)
i.opts.WorkerPool.Go(func() {
ok := i.write(name, timestamp, value)
ok := i.write(ctx, resources, timestamp, value)
if ok {
i.metrics.success.Inc(1)
}
// The contract is that after the DownsamplerAndWriter returns, any resources
// that it needed to hold onto have already been copied.
i.putLineResources(resources)
wg.Done()
})

Expand All @@ -166,7 +199,12 @@ func (i *ingester) Handle(conn net.Conn) {
// Don't close the connection, that is the server's responsibility.
}

func (i *ingester) write(name []byte, timestamp time.Time, value float64) bool {
func (i *ingester) write(
ctx context.Context,
resources *lineResources,
timestamp time.Time,
value float64,
) bool {
downsampleAndStoragePolicies := ingest.WriteOptions{
// Set both of these overrides to true to indicate that only the exact mapping
// rules and storage policies that we provide should be used and that all
Expand All @@ -175,8 +213,9 @@ func (i *ingester) write(name []byte, timestamp time.Time, value float64) bool {
DownsampleOverride: true,
WriteOverride: true,
}

for _, rule := range i.rules {
if rule.rule.Pattern == graphite.MatchAllPattern || rule.regexp.Match(name) {
if rule.rule.Pattern == graphite.MatchAllPattern || rule.regexp.Match(resources.name) {
// Each rule should only have either mapping rules or storage policies so
// one of these should be a no-op.
downsampleAndStoragePolicies.DownsampleMappingRules = rule.mappingRules
Expand All @@ -185,7 +224,7 @@ func (i *ingester) write(name []byte, timestamp time.Time, value float64) bool {
if i.opts.Debug {
i.logger.Infof(
"carbon metric: %s matched by pattern: %s with mapping rules: %#v and storage policies: %#v",
string(name), string(rule.rule.Pattern), rule.mappingRules, rule.storagePolicies)
string(resources.name), string(rule.rule.Pattern), rule.mappingRules, rule.storagePolicies)
}
// Break because we only want to apply one rule per metric based on which
// ever one matches first.
Expand All @@ -197,44 +236,32 @@ func (i *ingester) write(name []byte, timestamp time.Time, value float64) bool {
len(downsampleAndStoragePolicies.WriteStoragePolicies) == 0 {
// Nothing to do if none of the policies matched.
if i.opts.Debug {
i.logger.Infof("no rules matched carbon metric: %s, skipping", string(name))
i.logger.Infof("no rules matched carbon metric: %s, skipping", string(resources.name))
}
return false
}

datapoints := []ts.Datapoint{{Timestamp: timestamp, Value: value}}
// TODO(rartoul): Pool.
tags, err := GenerateTagsFromName(name, i.tagOpts)
resources.datapoints[0] = ts.Datapoint{Timestamp: timestamp, Value: value}
tags, err := GenerateTagsFromNameIntoSlice(resources.name, i.tagOpts, resources.tags)
if err != nil {
i.logger.Errorf("err generating tags from carbon name: %s, err: %s",
string(name), err)
string(resources.name), err)
i.metrics.malformed.Inc(1)
return false
}

var (
ctx = context.Background()
cleanup func()
)
if i.opts.Timeout > 0 {
ctx, cleanup = context.WithTimeout(ctx, i.opts.Timeout)
}

err = i.downsamplerAndWriter.Write(
ctx, tags, datapoints, xtime.Second, downsampleAndStoragePolicies)
if cleanup != nil {
cleanup()
}
ctx, tags, resources.datapoints, xtime.Second, downsampleAndStoragePolicies)

if err != nil {
i.logger.Errorf("err writing carbon metric: %s, err: %s",
string(name), err)
string(resources.name), err)
i.metrics.err.Inc(1)
return false
}

if i.opts.Debug {
i.logger.Infof("successfully wrote carbon metric: %s", string(name))
i.logger.Infof("successfully wrote carbon metric: %s", string(resources.name))
}
return true
}
Expand Down Expand Up @@ -267,15 +294,36 @@ type carbonIngesterMetrics struct {
func GenerateTagsFromName(
	name []byte,
	opts models.TagOptions,
) (models.Tags, error) {
	// No caller-provided scratch space: a nil slice has zero capacity, so the
	// shared implementation always allocates a fresh tag slice for the result.
	var noScratch []models.Tag
	generated, err := generateTagsFromName(name, opts, noScratch)
	return generated, err
}

// GenerateTagsFromNameIntoSlice behaves like GenerateTagsFromName, but lets the
// caller supply the slice that the generated tags are appended into. When the
// supplied slice has enough capacity for every tag, the returned Tags reuse its
// backing array (starting from length zero); otherwise a new slice is allocated
// and the supplied one is left untouched.
func GenerateTagsFromNameIntoSlice(
	name []byte,
	opts models.TagOptions,
	tags []models.Tag,
) (models.Tags, error) {
	generated, err := generateTagsFromName(name, opts, tags)
	return generated, err
}

func generateTagsFromName(
name []byte,
opts models.TagOptions,
tags []models.Tag,
) (models.Tags, error) {
if len(name) == 0 {
return models.EmptyTags(), errCannotGenerateTagsFromEmptyName
}

var (
numTags = bytes.Count(name, carbonSeparatorBytes) + 1
tags = make([]models.Tag, 0, numTags)
)
numTags := bytes.Count(name, carbonSeparatorBytes) + 1

if cap(tags) >= numTags {
tags = tags[:0]
} else {
tags = make([]models.Tag, 0, numTags)
}

startIdx := 0
tagNum := 0
Expand Down Expand Up @@ -311,7 +359,7 @@ func GenerateTagsFromName(
})
}

return models.NewTags(numTags, opts).AddTags(tags), nil
return models.Tags{Opts: opts, Tags: tags}, nil
}

// Compile all the carbon ingestion rules into regexp so that we can
Expand Down Expand Up @@ -356,6 +404,38 @@ func compileRules(rules CarbonIngesterRules) ([]ruleAndRegex, error) {
return compiledRules, nil
}

// getLineResources fetches a lineResources from the ingester's object pool.
// Every object handed out must later be passed to putLineResources.
func (i *ingester) getLineResources() *lineResources {
	pooled := i.lineResourcesPool.Get()
	return pooled.(*lineResources)
}

// putLineResources resets l and returns it to the ingester's object pool so it
// can be reused for a subsequent carbon line.
//
// Objects whose buffers have grown past the pooled sizes (or whose datapoints
// slice no longer holds exactly one element) are dropped rather than pooled, so
// that a single oversized line cannot permanently inflate the pool's memory
// footprint.
func (i *ingester) putLineResources(l *lineResources) {
	tooLargeForPool := cap(l.name) > maxResourcePoolNameSize ||
		len(l.datapoints) != 1 || // We always write one datapoint at a time.
		cap(l.datapoints) > 1 ||
		cap(l.tags) > maxPooledTagsSize
	if tooLargeForPool {
		return
	}

	// Reset.
	l.name = l.name[:0]
	l.datapoints[0] = ts.Datapoint{}

	// Free pointers. The tags generated for a line are appended into the
	// backing array of l.tags, but the grown slice header is never stored back
	// on l, so len(l.tags) stays zero. Clear the whole capacity to make sure no
	// stale tag (and the byte slices it references) survives in the pool.
	pooledTags := l.tags[:cap(l.tags)]
	for idx := range pooledTags {
		pooledTags[idx] = models.Tag{}
	}
	l.tags = l.tags[:0]

	i.lineResourcesPool.Put(l)
}

// lineResources groups the per-line buffers that the ingester pools so that
// handling a carbon line does not need to allocate them each time.
type lineResources struct {
// name holds the ingester's own copy of the metric name, since the bytes
// returned by the scanner are recycled.
name []byte
// datapoints is a one-element slice that is overwritten with the line's
// timestamp and value before being passed to the writer.
datapoints []ts.Datapoint
// tags is the scratch slice handed to GenerateTagsFromNameIntoSlice.
tags []models.Tag
}

type ruleAndRegex struct {
rule config.CarbonIngesterRuleConfiguration
regexp *regexp.Regexp
Expand Down
6 changes: 4 additions & 2 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,9 @@ func TestIngesterHandleConn(t *testing.T) {
overrides ingest.WriteOptions,
) interface{} {
lock.Lock()
// Clone tags because they (and their underlying bytes) are pooled.
found = append(found, testMetric{
tags: tags, timestamp: int(dp[0].Timestamp.Unix()), value: dp[0].Value})
tags: tags.Clone(), timestamp: int(dp[0].Timestamp.Unix()), value: dp[0].Value})

// Make 1 in 10 writes fail to test those paths.
returnErr := idx%10 == 0
Expand Down Expand Up @@ -245,8 +246,9 @@ func TestIngesterHonorsPatterns(t *testing.T) {
writeOpts ingest.WriteOptions,
) interface{} {
lock.Lock()
// Clone tags because they (and their underlying bytes) are pooled.
found = append(found, testMetric{
tags: tags, timestamp: int(dp[0].Timestamp.Unix()), value: dp[0].Value})
tags: tags.Clone(), timestamp: int(dp[0].Timestamp.Unix()), value: dp[0].Value})
lock.Unlock()

// Use panic's instead of require/assert because those don't behave properly when the assertion
Expand Down
Loading

0 comments on commit 876ae85

Please sign in to comment.