diff --git a/Makefile b/Makefile
index 294e8aea64..852c3b5e08 100644
--- a/Makefile
+++ b/Makefile
@@ -76,7 +76,8 @@ TOOLS := \
 	clone_fileset \
 	dtest \
 	verify_commitlogs \
-	verify_index_files
+	verify_index_files \
+	carbon_load

 .PHONY: setup
 setup:
diff --git a/docs/integrations/graphite.md b/docs/integrations/graphite.md
index ea3b84b989..f5e0df8c91 100644
--- a/docs/integrations/graphite.md
+++ b/docs/integrations/graphite.md
@@ -135,4 +135,4 @@ This will make the carbon ingestion emit logs for every step that is taking. *No
 - p95
 - p99
 - p999
-- p9999
\ No newline at end of file
+- p9999
diff --git a/scripts/development/m3_stack/docker-compose.yml b/scripts/development/m3_stack/docker-compose.yml
index a4c7e63256..272802ae1a 100644
--- a/scripts/development/m3_stack/docker-compose.yml
+++ b/scripts/development/m3_stack/docker-compose.yml
@@ -11,6 +11,14 @@ services:
       - "./m3dbnode.yml:/etc/m3dbnode/m3dbnode.yml"
     environment:
       - M3DB_HOST_ID=m3db_seed
+    expose:
+      - "9002"
+      - "9003"
+      - "9004"
+    ports:
+      - "0.0.0.0:9002:9002"
+      - "0.0.0.0:9003:9003"
+      - "0.0.0.0:9004:9004"
   m3db_data01:
     networks:
       - backend
diff --git a/src/cmd/services/m3coordinator/ingest/carbon/ingest.go b/src/cmd/services/m3coordinator/ingest/carbon/ingest.go
index afdf3d4993..2862767b5a 100644
--- a/src/cmd/services/m3coordinator/ingest/carbon/ingest.go
+++ b/src/cmd/services/m3coordinator/ingest/carbon/ingest.go
@@ -41,6 +41,7 @@ import (
 	"github.com/m3db/m3/src/query/ts"
 	"github.com/m3db/m3x/instrument"
 	"github.com/m3db/m3x/log"
+	"github.com/m3db/m3x/pool"
 	m3xserver "github.com/m3db/m3x/server"
 	xsync "github.com/m3db/m3x/sync"
 	xtime "github.com/m3db/m3x/time"
@@ -48,6 +49,12 @@ import (
 	"github.com/uber-go/tally"
 )

+const (
+	maxResourcePoolNameSize = 1024
+	maxPooledTagsSize       = 16
+	defaultResourcePoolSize = 4096
+)
+
 var (
 	// Used for parsing carbon names into tags.
 	carbonSeparatorByte  = byte('.')
@@ -63,7 +70,6 @@ type Options struct {
 	Debug             bool
 	InstrumentOptions instrument.Options
 	WorkerPool        xsync.PooledWorkerPool
-	Timeout           time.Duration
 }

 // CarbonIngesterRules contains the carbon ingestion rules.
@@ -106,6 +112,21 @@ func NewIngester(
 		return nil, err
 	}

+	poolOpts := pool.NewObjectPoolOptions().
+		SetInstrumentOptions(opts.InstrumentOptions).
+		SetRefillLowWatermark(0).
+		SetRefillHighWatermark(0).
+		SetSize(defaultResourcePoolSize)
+
+	resourcePool := pool.NewObjectPool(poolOpts)
+	resourcePool.Init(func() interface{} {
+		return &lineResources{
+			name:       make([]byte, 0, maxResourcePoolNameSize),
+			datapoints: make([]ts.Datapoint, 1, 1),
+			tags:       make([]models.Tag, 0, maxPooledTagsSize),
+		}
+	})
+
 	return &ingester{
 		downsamplerAndWriter: downsamplerAndWriter,
 		opts:                 opts,
@@ -115,6 +136,8 @@ func NewIngester(
 			opts.InstrumentOptions.MetricsScope()),

 		rules: compiledRules,
+
+		lineResourcesPool: resourcePool,
 	}, nil
 }
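For readers unfamiliar with m3x object pools: the allocator registered with `Init` pre-sizes every pooled `lineResources`, so steady-state ingestion performs no per-line allocation. A rough stdlib analogue of the Get/reuse/Put cycle (illustrative only: `sync.Pool` is GC-managed, while the m3x pool above is a fixed set of 4096 pre-allocated objects):

```go
package main

import "sync"

type lineResources struct {
	name []byte
}

var linePool = sync.Pool{
	New: func() interface{} {
		return &lineResources{name: make([]byte, 0, 1024)}
	},
}

func process(raw []byte) {
	res := linePool.Get().(*lineResources)
	res.name = append(res.name[:0], raw...) // reuse the backing array
	// ... handle the line ...
	linePool.Put(res)
}

func main() {
	process([]byte("foo.bar.baz"))
}
```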
@@ -126,10 +149,16 @@ type ingester struct {
 	tagOpts models.TagOptions

 	rules []ruleAndRegex
+
+	lineResourcesPool pool.ObjectPool
 }

 func (i *ingester) Handle(conn net.Conn) {
 	var (
+		// Interfaces require a context to be passed, but the M3DB client already has
+		// timeouts built in, and allocating a new context per write is expensive, so
+		// we always pass the same background context and rely on the client timeouts.
+		ctx    = context.Background()
 		wg     = sync.WaitGroup{}
 		s      = carbon.NewScanner(conn, i.opts.InstrumentOptions)
 		logger = i.opts.InstrumentOptions.Logger()
@@ -138,16 +167,20 @@ func (i *ingester) Handle(conn net.Conn) {
 	logger.Debug("handling new carbon ingestion connection")
 	for s.Scan() {
 		name, timestamp, value := s.Metric()
-		// TODO(rartoul): Pool.
+
+		resources := i.getLineResources()
 		// Copy name since scanner bytes are recycled.
-		name = append([]byte(nil), name...)
+		resources.name = append(resources.name[:0], name...)

 		wg.Add(1)
 		i.opts.WorkerPool.Go(func() {
-			ok := i.write(name, timestamp, value)
+			ok := i.write(ctx, resources, timestamp, value)
 			if ok {
 				i.metrics.success.Inc(1)
 			}
+			// The contract is that after the DownsamplerAndWriter returns, any resources
+			// that it needed to hold onto have already been copied.
+			i.putLineResources(resources)
 			wg.Done()
 		})
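For context on what the scanner yields: carbon's plaintext protocol is one `metric.path value timestamp` triple per line, and `s.Metric()` hands back those three fields already parsed. A minimal sketch of a line on the wire (the metric name is invented):

```go
package main

import "fmt"

func main() {
	line := fmt.Sprintf("%s %f %d\n", "servers.foo01.cpu.load", 0.92, int64(1549048578))
	fmt.Print(line) // servers.foo01.cpu.load 0.920000 1549048578
}
```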
@@ -166,7 +199,12 @@ func (i *ingester) Handle(conn net.Conn) {
 	// Don't close the connection, that is the server's responsibility.
 }

-func (i *ingester) write(name []byte, timestamp time.Time, value float64) bool {
+func (i *ingester) write(
+	ctx context.Context,
+	resources *lineResources,
+	timestamp time.Time,
+	value float64,
+) bool {
 	downsampleAndStoragePolicies := ingest.WriteOptions{
 		// Set both of these overrides to true to indicate that only the exact mapping
 		// rules and storage policies that we provide should be used and that all
@@ -175,8 +213,9 @@ func (i *ingester) write(
 		DownsampleOverride: true,
 		WriteOverride:      true,
 	}
+
 	for _, rule := range i.rules {
-		if rule.rule.Pattern == graphite.MatchAllPattern || rule.regexp.Match(name) {
+		if rule.rule.Pattern == graphite.MatchAllPattern || rule.regexp.Match(resources.name) {
 			// Each rule should only have either mapping rules or storage policies so
 			// one of these should be a no-op.
 			downsampleAndStoragePolicies.DownsampleMappingRules = rule.mappingRules
@@ -185,7 +224,7 @@ func (i *ingester) write(
 			if i.opts.Debug {
 				i.logger.Infof(
 					"carbon metric: %s matched by pattern: %s with mapping rules: %#v and storage policies: %#v",
-					string(name), string(rule.rule.Pattern), rule.mappingRules, rule.storagePolicies)
+					string(resources.name), string(rule.rule.Pattern), rule.mappingRules, rule.storagePolicies)
 			}
 			// Break because we only want to apply one rule per metric based on which
 			// ever one matches first.
@@ -197,44 +236,32 @@ func (i *ingester) write(
 		len(downsampleAndStoragePolicies.WriteStoragePolicies) == 0 {
 		// Nothing to do if none of the policies matched.
 		if i.opts.Debug {
-			i.logger.Infof("no rules matched carbon metric: %s, skipping", string(name))
+			i.logger.Infof("no rules matched carbon metric: %s, skipping", string(resources.name))
 		}
 		return false
 	}

-	datapoints := []ts.Datapoint{{Timestamp: timestamp, Value: value}}
-	// TODO(rartoul): Pool.
-	tags, err := GenerateTagsFromName(name, i.tagOpts)
+	resources.datapoints[0] = ts.Datapoint{Timestamp: timestamp, Value: value}
+	tags, err := GenerateTagsFromNameIntoSlice(resources.name, i.tagOpts, resources.tags)
 	if err != nil {
 		i.logger.Errorf("err generating tags from carbon name: %s, err: %s",
-			string(name), err)
+			string(resources.name), err)
 		i.metrics.malformed.Inc(1)
 		return false
 	}

-	var (
-		ctx     = context.Background()
-		cleanup func()
-	)
-	if i.opts.Timeout > 0 {
-		ctx, cleanup = context.WithTimeout(ctx, i.opts.Timeout)
-	}
-
 	err = i.downsamplerAndWriter.Write(
-		ctx, tags, datapoints, xtime.Second, downsampleAndStoragePolicies)
-	if cleanup != nil {
-		cleanup()
-	}
+		ctx, tags, resources.datapoints, xtime.Second, downsampleAndStoragePolicies)

 	if err != nil {
 		i.logger.Errorf("err writing carbon metric: %s, err: %s",
-			string(name), err)
+			string(resources.name), err)
 		i.metrics.err.Inc(1)
 		return false
 	}

 	if i.opts.Debug {
-		i.logger.Infof("successfully wrote carbon metric: %s", string(name))
+		i.logger.Infof("successfully wrote carbon metric: %s", string(resources.name))
 	}
 	return true
 }
@@ -267,15 +294,36 @@ type carbonIngesterMetrics struct {
 func GenerateTagsFromName(
 	name []byte,
 	opts models.TagOptions,
+) (models.Tags, error) {
+	return generateTagsFromName(name, opts, nil)
+}
+
+// GenerateTagsFromNameIntoSlice does the same thing as GenerateTagsFromName except
+// it allows the caller to provide the slice into which the tags are appended.
+func GenerateTagsFromNameIntoSlice(
+	name []byte,
+	opts models.TagOptions,
+	tags []models.Tag,
+) (models.Tags, error) {
+	return generateTagsFromName(name, opts, tags)
+}
+
+func generateTagsFromName(
+	name []byte,
+	opts models.TagOptions,
+	tags []models.Tag,
 ) (models.Tags, error) {
 	if len(name) == 0 {
 		return models.EmptyTags(), errCannotGenerateTagsFromEmptyName
 	}

-	var (
-		numTags = bytes.Count(name, carbonSeparatorBytes) + 1
-		tags    = make([]models.Tag, 0, numTags)
-	)
+	numTags := bytes.Count(name, carbonSeparatorBytes) + 1
+
+	if cap(tags) >= numTags {
+		tags = tags[:0]
+	} else {
+		tags = make([]models.Tag, 0, numTags)
+	}

 	startIdx := 0
 	tagNum := 0
@@ -311,7 +359,7 @@ func GenerateTagsFromName(
 		})
 	}

-	return models.NewTags(numTags, opts).AddTags(tags), nil
+	return models.Tags{Opts: opts, Tags: tags}, nil
 }

 // Compile all the carbon ingestion rules into regexp so that we can
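A quick usage sketch of the slice-reusing variant, written as if inside the same package (the `__g0__`-style tag names are an assumption about M3's generated graphite tag scheme, which lives outside this hunk):

```go
func exampleReusedTagSlice(opts models.TagOptions) (models.Tags, error) {
	// Reuse one scratch slice across many lines; generateTagsFromName only
	// allocates when a name has more components than the slice has capacity.
	scratch := make([]models.Tag, 0, 16)

	// "foo.bar.baz" contains two separators, so numTags == 3 and the scratch
	// capacity is reused. The result is roughly {__g0__: foo, __g1__: bar,
	// __g2__: baz} under the assumed naming scheme.
	return GenerateTagsFromNameIntoSlice([]byte("foo.bar.baz"), opts, scratch)
}
```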
@@ -356,6 +404,38 @@ func compileRules(rules CarbonIngesterRules) ([]ruleAndRegex, error) {
 	return compiledRules, nil
 }

+func (i *ingester) getLineResources() *lineResources {
+	return i.lineResourcesPool.Get().(*lineResources)
+}
+
+func (i *ingester) putLineResources(l *lineResources) {
+	tooLargeForPool := cap(l.name) > maxResourcePoolNameSize ||
+		len(l.datapoints) > 1 || // We always write one datapoint at a time.
+		cap(l.datapoints) > 1 ||
+		cap(l.tags) > maxPooledTagsSize
+
+	if tooLargeForPool {
+		return
+	}
+
+	// Reset.
+	l.name = l.name[:0]
+	l.datapoints[0] = ts.Datapoint{}
+	for i := range l.tags {
+		// Free pointers.
+		l.tags[i] = models.Tag{}
+	}
+	l.tags = l.tags[:0]
+
+	i.lineResourcesPool.Put(l)
+}
+
+type lineResources struct {
+	name       []byte
+	datapoints []ts.Datapoint
+	tags       []models.Tag
+}
+
 type ruleAndRegex struct {
 	rule   config.CarbonIngesterRuleConfiguration
 	regexp *regexp.Regexp
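The size guard in `putLineResources` keeps one pathological line from pinning a large buffer in the pool forever. A sketch of the behavior, written as if inside the same package (the oversized name is invented):

```go
func exampleOversizedLine(i *ingester) {
	r := i.getLineResources()

	// A 2KiB metric name grows r.name past maxResourcePoolNameSize (1024)...
	longName := bytes.Repeat([]byte("x"), 2048)
	r.name = append(r.name[:0], longName...)

	// ...so putLineResources drops the object instead of pooling it, and the
	// GC reclaims the oversized backing array.
	i.putLineResources(r)
}
```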
diff --git a/src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go b/src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go
index 2bd2e22887..9b80be7c00 100644
--- a/src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go
+++ b/src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go
@@ -206,8 +206,9 @@ func TestIngesterHandleConn(t *testing.T) {
 			overrides ingest.WriteOptions,
 		) interface{} {
 			lock.Lock()
+			// Clone tags because they (and their underlying bytes) are pooled.
 			found = append(found, testMetric{
-				tags: tags, timestamp: int(dp[0].Timestamp.Unix()), value: dp[0].Value})
+				tags: tags.Clone(), timestamp: int(dp[0].Timestamp.Unix()), value: dp[0].Value})

 			// Make 1 in 10 writes fail to test those paths.
 			returnErr := idx%10 == 0
@@ -245,8 +246,9 @@ func TestIngesterHonorsPatterns(t *testing.T) {
 			writeOpts ingest.WriteOptions,
 		) interface{} {
 			lock.Lock()
+			// Clone tags because they (and their underlying bytes) are pooled.
 			found = append(found, testMetric{
-				tags: tags, timestamp: int(dp[0].Timestamp.Unix()), value: dp[0].Value})
+				tags: tags.Clone(), timestamp: int(dp[0].Timestamp.Unix()), value: dp[0].Value})
 			lock.Unlock()

 			// Use panic's instead of require/assert because those don't behave properly when the assertion
diff --git a/src/cmd/services/m3coordinator/ingest/write.go b/src/cmd/services/m3coordinator/ingest/write.go
index bb2baf3a0a..bfe5a531d9 100644
--- a/src/cmd/services/m3coordinator/ingest/write.go
+++ b/src/cmd/services/m3coordinator/ingest/write.go
@@ -30,6 +30,7 @@ import (
 	"github.com/m3db/m3/src/query/storage"
 	"github.com/m3db/m3/src/query/ts"
 	xerrors "github.com/m3db/m3x/errors"
+	xsync "github.com/m3db/m3x/sync"
 	xtime "github.com/m3db/m3x/time"
 )

@@ -77,16 +78,19 @@ type WriteOptions struct {
 type downsamplerAndWriter struct {
 	store       storage.Storage
 	downsampler downsample.Downsampler
+	workerPool  xsync.PooledWorkerPool
 }

 // NewDownsamplerAndWriter creates a new downsampler and writer.
 func NewDownsamplerAndWriter(
 	store storage.Storage,
 	downsampler downsample.Downsampler,
+	workerPool xsync.PooledWorkerPool,
 ) DownsamplerAndWriter {
 	return &downsamplerAndWriter{
 		store:       store,
 		downsampler: downsampler,
+		workerPool:  workerPool,
 	}
 }

@@ -124,6 +128,8 @@ func (d *downsamplerAndWriter) maybeWriteDownsampler(
 		shouldDownsample = downsamplerExists && (useDefaultMappingRules || downsampleOverride)
 	)
 	if shouldDownsample {
+		// TODO(rartoul): MetricsAppender has a Finalize() method, but it does not actually reuse many
+		// resources. If we can pool this properly we can get a nice speedup.
 		appender, err := d.downsampler.NewMetricsAppender()
 		if err != nil {
 			return err
@@ -195,9 +201,10 @@ func (d *downsamplerAndWriter) maybeWriteStorage(
 	)

 	for _, p := range overrides.WriteStoragePolicies {
+		p := p // Capture for goroutine.
+
 		wg.Add(1)
-		// TODO(rartoul): Benchmark using a pooled worker pool here.
-		go func(p policy.StoragePolicy) {
+		d.workerPool.Go(func() {
 			err := d.store.Write(ctx, &storage.WriteQuery{
 				Tags:       tags,
 				Datapoints: datapoints,
@@ -215,7 +222,7 @@ func (d *downsamplerAndWriter) maybeWriteStorage(
 				errLock.Unlock()
 			}
 			wg.Done()
-		}(p)
+		})
 	}

 	wg.Wait()
@@ -244,7 +251,7 @@ func (d *downsamplerAndWriter) WriteBatch(
 	for iter.Next() {
 		wg.Add(1)
 		tags, datapoints, unit := iter.Current()
-		go func() {
+		d.workerPool.Go(func() {
 			err := d.store.Write(ctx, &storage.WriteQuery{
 				Tags:       tags,
 				Datapoints: datapoints,
@@ -257,7 +264,7 @@ func (d *downsamplerAndWriter) WriteBatch(
 				addError(err)
 			}
 			wg.Done()
-		}()
+		})
 	}
 }
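The `p := p` shadowing above is the classic Go loop-variable capture fix: in the Go versions this code targets (pre-1.22), a `range` loop reuses a single variable, so every closure handed to the worker pool could observe the final policy rather than its own. A minimal repro:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for _, p := range []string{"1m:40d", "5m:100d"} {
		p := p // Remove this line and (pre-Go 1.22) both goroutines may print "5m:100d".
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(p)
		}()
	}
	wg.Wait()
}
```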
diff --git a/src/cmd/services/m3coordinator/ingest/write_test.go b/src/cmd/services/m3coordinator/ingest/write_test.go
index 2b512208b8..da958afea8 100644
--- a/src/cmd/services/m3coordinator/ingest/write_test.go
+++ b/src/cmd/services/m3coordinator/ingest/write_test.go
@@ -22,6 +22,7 @@ package ingest

 import (
 	"context"
+	"fmt"
 	"testing"
 	"time"

@@ -34,6 +35,7 @@ import (
 	testm3 "github.com/m3db/m3/src/query/test/m3"
 	"github.com/m3db/m3/src/query/ts"
 	"github.com/m3db/m3x/ident"
+	xsync "github.com/m3db/m3x/sync"
 	xtime "github.com/m3db/m3x/time"

 	"github.com/golang/mock/gomock"
@@ -41,6 +43,9 @@ import (
 )

 var (
+	// Created by init().
+	testWorkerPool xsync.PooledWorkerPool
+
 	testTags1 = models.NewTags(3, nil).AddTags(
 		[]models.Tag{
 			{
@@ -409,7 +414,7 @@ func newTestDownsamplerAndWriter(
 ) (*downsamplerAndWriter, *downsample.MockDownsampler, *client.MockSession) {
 	storage, session := testm3.NewStorageAndSession(t, ctrl)
 	downsampler := downsample.NewMockDownsampler(ctrl)
-	return NewDownsamplerAndWriter(storage, downsampler).(*downsamplerAndWriter), downsampler, session
+	return NewDownsamplerAndWriter(storage, downsampler, testWorkerPool).(*downsamplerAndWriter), downsampler, session
 }

 func newTestDownsamplerAndWriterWithAggregatedNamespace(
@@ -420,5 +425,20 @@ func newTestDownsamplerAndWriterWithAggregatedNamespace(
 	storage, session := testm3.NewStorageAndSessionWithAggregatedNamespaces(
 		t, ctrl, aggregatedNamespaces)
 	downsampler := downsample.NewMockDownsampler(ctrl)
-	return NewDownsamplerAndWriter(storage, downsampler).(*downsamplerAndWriter), downsampler, session
+	return NewDownsamplerAndWriter(storage, downsampler, testWorkerPool).(*downsamplerAndWriter), downsampler, session
+}
+
+func init() {
+	var err error
+	testWorkerPool, err = xsync.NewPooledWorkerPool(
+		16,
+		xsync.NewPooledWorkerPoolOptions().
+			SetGrowOnDemand(true),
+	)
+
+	if err != nil {
+		panic(fmt.Sprintf("unable to create pooled worker pool: %v", err))
+	}
+
+	testWorkerPool.Init()
 }
diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go
index 2274b07d56..81278bd9a5 100644
--- a/src/cmd/services/m3query/config/config.go
+++ b/src/cmd/services/m3query/config/config.go
@@ -156,7 +156,6 @@ type CarbonIngesterConfiguration struct {
 	Debug          bool                              `yaml:"debug"`
 	ListenAddress  string                            `yaml:"listenAddress"`
 	MaxConcurrency int                               `yaml:"maxConcurrency"`
-	WriteTimeout   *time.Duration                    `yaml:"writeTimeout"`
 	Rules          []CarbonIngesterRuleConfiguration `yaml:"rules"`
 }

@@ -243,16 +242,6 @@ type CarbonIngesterStoragePolicyConfiguration struct {
 	Retention  time.Duration `yaml:"retention" validate:"nonzero"`
 }

-// WriteTimeoutOrDefault returns the configured value for the write timeout,
-// if set, or the default value otherwise.
-func (c *CarbonIngesterConfiguration) WriteTimeoutOrDefault() time.Duration {
-	if c.WriteTimeout != nil {
-		return *c.WriteTimeout
-	}
-
-	return defaultCarbonIngesterWriteTimeout
-}
-
 // LocalConfiguration is the local embedded configuration if running
 // coordinator embedded in the DB.
 type LocalConfiguration struct {
diff --git a/src/cmd/tools/carbon_load/README.md b/src/cmd/tools/carbon_load/README.md
new file mode 100644
index 0000000000..1fb0961519
--- /dev/null
+++ b/src/cmd/tools/carbon_load/README.md
@@ -0,0 +1,29 @@
+# carbon_load
+
+`carbon_load` is a tool to generate load on a carbon ingestion port.
+
+# Usage
+```
+$ git clone git@github.com:m3db/m3.git
+$ make carbon_load
+$ ./bin/carbon_load -h
+
+# example usage
+# ./carbon_load \
+    -target="0.0.0.0:7204" \
+    -numWorkers="20" \
+    -cardinality=1000 \
+    -name="local.random" \
+    -qps=1000 \
+    -duration="30s"
+```
+
+# Benchmarking Carbon Ingestion
+
+The easiest way to benchmark carbon ingestion end-to-end is to:
+
+1. Use `start_m3` to get an entire M3 stack running, including an aggregated namespace and a carbon ingestion port.
+
+2. Run this tool with the target pointed at the exposed carbon ingestion port on the `start_m3` stack.
+
+3. Use `pprof` to take CPU and heap profiles of `m3coordinator` while the `carbon_load` tool is running.
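To see what those flags translate to inside the tool (the arithmetic mirrors `main.go` below): the target QPS is split evenly across workers, and each worker sleeps between sends to hold its share. A worked example with the flags shown above:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// -qps=1000 split across -numWorkers=20 leaves 50 writes/sec per worker,
	// i.e. one write roughly every 20ms per connection.
	targetQPSPerWorker := 1000 / 20
	idealTimeBetweenSend := time.Duration(int(time.Second) / targetQPSPerWorker)
	fmt.Println(targetQPSPerWorker, idealTimeBetweenSend) // 50 20ms
}
```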
+ conn, err := net.Dial("tcp", target) + if err != nil { + fmt.Printf("dial error: %v, reconnecting in one second\n", err) + time.Sleep(1 * time.Second) + continue + } + + for err == nil { + select { + case <-closeCh: + return numSuccess, numError + default: + } + + var ( + now = time.Now() + randIdx = rng.Int63n(int64(len(metrics))) + metric = metrics[randIdx] + ) + + _, err = fmt.Fprintf(conn, metricFmt, metric, rng.Float32(), int64(now.Unix())) + numSuccess++ + + timeSinceSend := time.Now().Sub(now) + if timeSinceSend < idealTimeBetweenSend { + time.Sleep(idealTimeBetweenSend - timeSinceSend) + } + } + + numError++ + } +} + +func main() { + var ( + target = flag.String("target", "0.0.0.0:7204", "Target host port") + numWorkers = flag.Int("numWorkers", 20, "Number of concurrent connections") + numMetrics = flag.Int("cardinality", 1000, "Cardinality of metrics") + metric = flag.String("name", "local.random", "The metric you send will be [name].[0..1024]") + targetQPS = flag.Int("qps", 1000, "Target QPS") + duration = flag.Duration("duration", 10*time.Second, "Duration of test") + ) + + flag.Parse() + if len(*target) == 0 { + flag.Usage() + os.Exit(-1) + } + + metrics := make([]string, 0, *numMetrics) + for i := 0; i < *numMetrics; i++ { + metrics = append(metrics, fmt.Sprintf("%s.%d", *metric, i)) + } + + var ( + targetQPSPerWorker = *targetQPS / *numWorkers + wg sync.WaitGroup + lock sync.Mutex + numSuccess int + numError int + closeCh = make(chan struct{}) + ) + for n := 0; n < *numWorkers; n++ { + wg.Add(1) + go func() { + nSuccess, nError := startWorker( + *target, metrics, targetQPSPerWorker, closeCh) + + lock.Lock() + numSuccess += nSuccess + numError += nError + lock.Unlock() + + wg.Done() + }() + } + + start := time.Now() + go func() { + time.Sleep(*duration) + + fmt.Println("beginning shutdown...") + close(closeCh) + }() + + wg.Wait() + + durationOfBench := time.Now().Sub(start).Seconds() + fmt.Println("average QPS: ", float64(numSuccess)/durationOfBench) +} diff --git a/src/m3ninx/index/segment/mem/options.go b/src/m3ninx/index/segment/mem/options.go index 448230526b..2fca52520e 100644 --- a/src/m3ninx/index/segment/mem/options.go +++ b/src/m3ninx/index/segment/mem/options.go @@ -34,9 +34,9 @@ const ( defaultBytesArrayPoolCapacity = 1024 // This pool is used in a single-threaded manner. defaultBytesArrayPoolSize = 1 - // 2^24 * 16 bytes (byte slice pointer) * 2 (Golang G.C) ~= + // 2<<23 * 16 bytes (byte slice pointer) * 2 (Golang G.C) ~= // 0.5 GiB max memory usage. - defaultBytesArrayPoolMaxArrayCapacity = 2 ^ 24 + defaultBytesArrayPoolMaxArrayCapacity = 2 << 23 ) // Options is a collection of knobs for an in-memory segment. diff --git a/src/metrics/carbon/parser.go b/src/metrics/carbon/parser.go index 764f3587a2..17b438d7a0 100644 --- a/src/metrics/carbon/parser.go +++ b/src/metrics/carbon/parser.go @@ -44,6 +44,9 @@ const ( intBitSize = 64 floatBitSize = 64 intBase = 10 + + initScannerBufferSize = 2 << 15 // ~ 65KiB + maxScannerBufferSize = 2 << 17 // ~ 0.25iB ) var ( @@ -237,6 +240,13 @@ type Scanner struct { // NewScanner creates a new carbon scanner. func NewScanner(r io.Reader, iOpts instrument.Options) *Scanner { s := bufio.NewScanner(r) + + // Force the scanner to use a large buffer upfront to reduce the number of + // syscalls that occur if the io.Reader is backed by something that requires + // I/O (like a TCP connection). + // TODO(rartoul): Make this configurable. 
diff --git a/src/m3ninx/index/segment/mem/options.go b/src/m3ninx/index/segment/mem/options.go
index 448230526b..2fca52520e 100644
--- a/src/m3ninx/index/segment/mem/options.go
+++ b/src/m3ninx/index/segment/mem/options.go
@@ -34,9 +34,9 @@ const (
 	defaultBytesArrayPoolCapacity = 1024
 	// This pool is used in a single-threaded manner.
 	defaultBytesArrayPoolSize = 1
-	// 2^24 * 16 bytes (byte slice pointer) * 2 (Golang G.C) ~=
+	// 2<<23 * 16 bytes (byte slice pointer) * 2 (Golang G.C) ~=
 	// 0.5 GiB max memory usage.
-	defaultBytesArrayPoolMaxArrayCapacity = 2 ^ 24
+	defaultBytesArrayPoolMaxArrayCapacity = 2 << 23
 )

 // Options is a collection of knobs for an in-memory segment.
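This fix is subtler than it looks: `^` is bitwise XOR in Go, not exponentiation, so the old constant `2 ^ 24` evaluated to 26 rather than the intended 2^24. A quick check of the constant and the memory estimate in the comment:

```go
package main

import "fmt"

func main() {
	fmt.Println(2 ^ 24)  // 26: XOR, what the old constant actually was
	fmt.Println(2 << 23) // 16777216: equal to 1<<24, the intended cap

	// Memory estimate from the comment: 2^24 slice headers at ~16 bytes each,
	// doubled for GC headroom.
	const entries = 2 << 23
	fmt.Println(entries * 16 * 2) // 536870912 bytes, ~0.5 GiB
}
```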
diff --git a/src/metrics/carbon/parser.go b/src/metrics/carbon/parser.go
index 764f3587a2..17b438d7a0 100644
--- a/src/metrics/carbon/parser.go
+++ b/src/metrics/carbon/parser.go
@@ -44,6 +44,9 @@ const (
 	intBitSize   = 64
 	floatBitSize = 64
 	intBase      = 10
+
+	initScannerBufferSize = 2 << 15 // ~ 64 KiB
+	maxScannerBufferSize  = 2 << 17 // ~ 256 KiB
 )

 var (
@@ -237,6 +240,13 @@ type Scanner struct {
 // NewScanner creates a new carbon scanner.
 func NewScanner(r io.Reader, iOpts instrument.Options) *Scanner {
 	s := bufio.NewScanner(r)
+
+	// Force the scanner to use a large buffer upfront to reduce the number of
+	// syscalls that occur if the io.Reader is backed by something that requires
+	// I/O (like a TCP connection).
+	// TODO(rartoul): Make this configurable.
+	s.Buffer(make([]byte, 0, initScannerBufferSize), maxScannerBufferSize)
+
 	s.Split(bufio.ScanLines)
 	return &Scanner{scanner: s, iOpts: iOpts}
 }
diff --git a/src/query/api/v1/httpd/handler_test.go b/src/query/api/v1/httpd/handler_test.go
index 8304b176d1..555c82edeb 100644
--- a/src/query/api/v1/httpd/handler_test.go
+++ b/src/query/api/v1/httpd/handler_test.go
@@ -22,6 +22,7 @@ package httpd

 import (
 	"encoding/json"
+	"fmt"
 	"net/http"
 	"net/http/httptest"
 	"testing"
@@ -37,6 +38,7 @@ import (
 	"github.com/m3db/m3/src/query/storage"
 	"github.com/m3db/m3/src/query/test/m3"
 	"github.com/m3db/m3/src/query/util/logging"
+	xsync "github.com/m3db/m3x/sync"

 	"github.com/golang/mock/gomock"
 	"github.com/stretchr/testify/assert"
@@ -44,12 +46,17 @@ import (
 	"github.com/uber-go/tally"
 )

+var (
+	// Created by init().
+	testWorkerPool xsync.PooledWorkerPool
+)
+
 func makeTagOptions() models.TagOptions {
 	return models.NewTagOptions().SetMetricName([]byte("some_name"))
 }

 func setupHandler(store storage.Storage) (*Handler, error) {
-	downsamplerAndWriter := ingest.NewDownsamplerAndWriter(store, nil)
+	downsamplerAndWriter := ingest.NewDownsamplerAndWriter(store, nil, testWorkerPool)
 	return NewHandler(downsamplerAndWriter, makeTagOptions(), executor.NewEngine(store,
 		tally.NewTestScope("test", nil)), nil, nil, config.Configuration{}, nil, tally.NewTestScope("", nil))
 }
@@ -213,3 +220,18 @@ func TestCORSMiddleware(t *testing.T) {
 	assert.Equal(t, "hello!", res.Body.String())
 	assert.Equal(t, "*", res.Header().Get("Access-Control-Allow-Origin"))
 }
+
+func init() {
+	var err error
+	testWorkerPool, err = xsync.NewPooledWorkerPool(
+		16,
+		xsync.NewPooledWorkerPoolOptions().
+			SetGrowOnDemand(true),
+	)
+
+	if err != nil {
+		panic(fmt.Sprintf("unable to create pooled worker pool: %v", err))
+	}
+
+	testWorkerPool.Init()
+}
diff --git a/src/query/server/server.go b/src/query/server/server.go
index dc7d426a07..ee5397a3a2 100644
--- a/src/query/server/server.go
+++ b/src/query/server/server.go
@@ -77,7 +77,8 @@ var (
 		Retention:  2 * 24 * time.Hour,
 	}

-	defaultCarbonIngesterWorkerPoolSize = 1024
+	defaultDownsamplerAndWriterWorkerPoolSize = 1024
+	defaultCarbonIngesterWorkerPoolSize       = 1024
 )

 type cleanupFn func() error
@@ -219,7 +220,11 @@ func Run(runOpts RunOptions) {

 	engine := executor.NewEngine(backendStorage, scope.SubScope("engine"))

-	downsamplerAndWriter := ingest.NewDownsamplerAndWriter(backendStorage, downsampler)
+	downsamplerAndWriter, err := newDownsamplerAndWriter(backendStorage, downsampler)
+	if err != nil {
+		logger.Fatal("unable to create new downsampler and writer", zap.Error(err))
+	}
+
 	handler, err := httpd.NewHandler(downsamplerAndWriter, tagOptions, engine,
 		m3dbClusters, clusterClient, cfg, runOpts.DBConfig, scope)
 	if err != nil {
@@ -802,7 +807,6 @@ func startCarbonIngestion(
 		Debug:             ingesterCfg.Debug,
 		InstrumentOptions: carbonIOpts,
 		WorkerPool:        workerPool,
-		Timeout:           ingesterCfg.WriteTimeoutOrDefault(),
 	})
 	if err != nil {
 		logger.Fatal("unable to create carbon ingester", zap.Error(err))
@@ -826,3 +830,19 @@ func startCarbonIngestion(
 	}
 	logger.Info("started carbon ingestion server", zap.String("listenAddress", carbonListenAddress))
 }
+
+func newDownsamplerAndWriter(storage storage.Storage, downsampler downsample.Downsampler) (ingest.DownsamplerAndWriter, error) {
+	// Make sure the downsampler and writer gets its own PooledWorkerPool that is not shared with
+	// any other codepaths, because PooledWorkerPools can deadlock if used recursively.
+	downAndWriterWorkerPoolOpts := xsync.NewPooledWorkerPoolOptions().
+		SetGrowOnDemand(true).
+		SetKillWorkerProbability(0.001)
+	downAndWriteWorkerPool, err := xsync.NewPooledWorkerPool(
+		defaultDownsamplerAndWriterWorkerPoolSize, downAndWriterWorkerPoolOpts)
+	if err != nil {
+		return nil, err
+	}
+	downAndWriteWorkerPool.Init()
+
+	return ingest.NewDownsamplerAndWriter(storage, downsampler, downAndWriteWorkerPool), nil
+}
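To make the deadlock warning concrete, here is a sketch of the recursive-use hazard, assuming (as the comment implies) that Go() can block until a worker frees up when the pool is not grow-on-demand:

```go
package main

import xsync "github.com/m3db/m3x/sync"

func main() {
	// Hypothetical misuse with a pool of size 1: the outer task occupies the
	// only worker while waiting on the inner task, and the inner task needs
	// that same worker before it can run.
	pool, _ := xsync.NewPooledWorkerPool(1, xsync.NewPooledWorkerPoolOptions())
	pool.Init()

	done := make(chan struct{})
	pool.Go(func() {
		pool.Go(func() { close(done) }) // can block forever waiting for a worker
		<-done                          // outer task never returns: deadlock
	})
	<-done
}
```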