diff --git a/Makefile b/Makefile index d234934883..bea68d9818 100644 --- a/Makefile +++ b/Makefile @@ -200,6 +200,8 @@ docker-integration-test: @./scripts/docker-integration-tests/setup.sh @./scripts/docker-integration-tests/simple/test.sh @./scripts/docker-integration-tests/prometheus/test.sh + # TODO(rartoul): Re-enable once the query P.R lands and we can fix this test. + # @./scripts/docker-integration-tests/carbon/test.sh .PHONY: site-build site-build: diff --git a/scripts/development/m3_stack/docker-compose.yml b/scripts/development/m3_stack/docker-compose.yml index 21fbbc3fb7..a4c7e63256 100644 --- a/scripts/development/m3_stack/docker-compose.yml +++ b/scripts/development/m3_stack/docker-compose.yml @@ -48,10 +48,12 @@ services: expose: - "7201" - "7203" + - "7204" - "7507" ports: - "0.0.0.0:7201:7201" - "0.0.0.0:7203:7203" + - "0.0.0.0:7204:7204" - "0.0.0.0:7507:7507" networks: - backend diff --git a/scripts/development/m3_stack/m3coordinator.yml b/scripts/development/m3_stack/m3coordinator.yml index cbaeddc5df..93a631b7ad 100644 --- a/scripts/development/m3_stack/m3coordinator.yml +++ b/scripts/development/m3_stack/m3coordinator.yml @@ -67,3 +67,6 @@ ingest: jitter: true handler: protobufEnabled: false + +carbon: + enabled: true diff --git a/scripts/development/m3_stack/start_m3.sh b/scripts/development/m3_stack/start_m3.sh index 0171586ac1..7cd4394a3d 100755 --- a/scripts/development/m3_stack/start_m3.sh +++ b/scripts/development/m3_stack/start_m3.sh @@ -9,7 +9,7 @@ if [[ "$FORCE_BUILD" = true ]] ; then DOCKER_ARGS="--build -d --renew-anon-volumes" fi -echo "Bringing up nodes in the backgorund with docker compose, remember to run ./stop.sh when done" +echo "Bringing up nodes in the background with docker compose, remember to run ./stop.sh when done" docker-compose -f docker-compose.yml up $DOCKER_ARGS m3coordinator01 docker-compose -f docker-compose.yml up $DOCKER_ARGS m3db_seed docker-compose -f docker-compose.yml up $DOCKER_ARGS prometheus01 diff --git 
a/scripts/docker-integration-tests/carbon/docker-compose.yml b/scripts/docker-integration-tests/carbon/docker-compose.yml new file mode 100644 index 0000000000..53a28f0b88 --- /dev/null +++ b/scripts/docker-integration-tests/carbon/docker-compose.yml @@ -0,0 +1,28 @@ +version: "3.5" +services: + dbnode01: + expose: + - "9000-9004" + - "2379-2380" + ports: + - "0.0.0.0:9000-9004:9000-9004" + - "0.0.0.0:2379-2380:2379-2380" + networks: + - backend + image: "m3dbnode_integration:${REVISION}" + coordinator01: + expose: + - "7201" + - "7203" + - "7204" + ports: + - "0.0.0.0:7201:7201" + - "0.0.0.0:7203:7203" + - "0.0.0.0:7204:7204" + networks: + - backend + image: "m3coordinator_integration:${REVISION}" + volumes: + - "./:/etc/m3coordinator/" +networks: + backend: diff --git a/scripts/docker-integration-tests/carbon/m3coordinator.yml b/scripts/docker-integration-tests/carbon/m3coordinator.yml new file mode 100644 index 0000000000..81e804d528 --- /dev/null +++ b/scripts/docker-integration-tests/carbon/m3coordinator.yml @@ -0,0 +1,54 @@ +listenAddress: + type: "config" + value: "0.0.0.0:7201" + +metrics: + scope: + prefix: "coordinator" + prometheus: + handlerPath: /metrics + listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3/issues/682 is resolved + sanitization: prometheus + samplingRate: 1.0 + extended: none + +clusters: + - namespaces: + - namespace: agg + type: aggregated + retention: 10h + resolution: 15s + - namespace: unagg + type: unaggregated + retention: 10m + client: + config: + service: + env: default_env + zone: embedded + service: m3db + cacheDir: /var/lib/m3kv + etcdClusters: + - zone: embedded + endpoints: + - dbnode01:2379 + writeConsistencyLevel: majority + readConsistencyLevel: unstrict_majority + writeTimeout: 10s + fetchTimeout: 15s + connectTimeout: 20s + writeRetry: + initialBackoff: 500ms + backoffFactor: 3 + maxRetries: 2 + jitter: true + fetchRetry: + initialBackoff: 500ms + backoffFactor: 2 + maxRetries: 3 + jitter: true + 
backgroundHealthCheckFailLimit: 4 + backgroundHealthCheckFailThrottleFactor: 0.5 + +carbon: + enabled: true diff --git a/scripts/docker-integration-tests/carbon/test.sh b/scripts/docker-integration-tests/carbon/test.sh new file mode 100755 index 0000000000..0fdf580c3a --- /dev/null +++ b/scripts/docker-integration-tests/carbon/test.sh @@ -0,0 +1,33 @@ +#!/usr/bin/env bash + +set -xe + +source $GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/common.sh +REVISION=$(git rev-parse HEAD) +COMPOSE_FILE=$GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/carbon/docker-compose.yml +export REVISION + +echo "Run m3dbnode and m3coordinator containers" +docker-compose -f ${COMPOSE_FILE} up -d dbnode01 +docker-compose -f ${COMPOSE_FILE} up -d coordinator01 + +# think of this as a defer func() in golang +function defer { + docker-compose -f ${COMPOSE_FILE} down || echo "unable to shutdown containers" # CI fails to stop all containers sometimes +} +trap defer EXIT + +setup_single_m3db_node + +echo "Writing out a carbon metric" +echo "foo.bar.baz 1 `date +%s`" | nc 0.0.0.0 7204 + +echo "Attempting to read carbon metric back" +function read_carbon { + end=$(date +%s) + start=$(($end-300)) + RESPONSE=$(curl -sSfg "http://localhost:7201/api/v1/query_range?start=$start&end=$end&step=10&query={__graphite0__='foo',__graphite1__='bar',__graphite2__='baz'}") + echo "$RESPONSE" | jq '.data.result[0].values[][1]=="1"' | grep -q "true" + return $? 
+} +ATTEMPTS=10 TIMEOUT=1 retry_with_backoff read_carbon diff --git a/scripts/docker-integration-tests/common.sh b/scripts/docker-integration-tests/common.sh index 0e22a0d0ef..cd56d071f5 100644 --- a/scripts/docker-integration-tests/common.sh +++ b/scripts/docker-integration-tests/common.sh @@ -36,3 +36,99 @@ function retry_with_backoff { return $exitCode } + +function setup_single_m3db_node { + wait_for_db_init +} + +function wait_for_db_init { + echo "Sleeping for a bit to ensure db up" + sleep 15 # TODO Replace sleeps with logic to determine when to proceed + + echo "Adding namespace" + curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/namespace -d '{ + "name": "agg", + "options": { + "bootstrapEnabled": true, + "flushEnabled": true, + "writesToCommitLog": true, + "cleanupEnabled": true, + "snapshotEnabled": true, + "repairEnabled": false, + "retentionOptions": { + "retentionPeriodDuration": "48h", + "blockSizeDuration": "2h", + "bufferFutureDuration": "10m", + "bufferPastDuration": "10m", + "blockDataExpiry": true, + "blockDataExpiryAfterNotAccessPeriodDuration": "5m" + }, + "indexOptions": { + "enabled": true, + "blockSizeDuration": "2h" + } + } + }' + + echo "Sleep until namespace is init'd" + ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq .registry.namespaces.agg.indexOptions.enabled)" == true ]' + + curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/namespace -d '{ + "name": "unagg", + "options": { + "bootstrapEnabled": true, + "flushEnabled": true, + "writesToCommitLog": true, + "cleanupEnabled": true, + "snapshotEnabled": true, + "repairEnabled": false, + "retentionOptions": { + "retentionPeriodDuration": "48h", + "blockSizeDuration": "2h", + "bufferFutureDuration": "10m", + "bufferPastDuration": "10m", + "blockDataExpiry": true, + "blockDataExpiryAfterNotAccessPeriodDuration": "5m" + }, + "indexOptions": { + "enabled": true, + "blockSizeDuration": "2h" + } + } + }' + + echo "Sleep until namespace is init'd" + ATTEMPTS=4 
TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq .registry.namespaces.unagg.indexOptions.enabled)" == true ]' + + echo "Placement initialization" + curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/placement/init -d '{ + "num_shards": 64, + "replication_factor": 1, + "instances": [ + { + "id": "m3db_local", + "isolation_group": "rack-a", + "zone": "embedded", + "weight": 1024, + "endpoint": "dbnode01:9000", + "hostname": "dbnode01", + "port": 9000 + } + ] + }' + + echo "Sleep until placement is init'd" + ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | jq .placement.instances.m3db_local.id)" == \"m3db_local\" ]' + + echo "Sleep until bootstrapped" + ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:9002/health | jq .bootstrapped)" == true ]' + + echo "Waiting until shards are marked as available" + ATTEMPTS=10 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | grep -c INITIALIZING)" -eq 0 ]' +} + diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index 2e7acd1b29..04362a8911 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -17,95 +17,7 @@ function defer { } trap defer EXIT - -echo "Sleeping for a bit to ensure db up" -sleep 15 # TODO Replace sleeps with logic to determine when to proceed - -echo "Adding namespace" -curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/namespace -d '{ - "name": "agg", - "options": { - "bootstrapEnabled": true, - "flushEnabled": true, - "writesToCommitLog": true, - "cleanupEnabled": true, - "snapshotEnabled": true, - "repairEnabled": false, - "retentionOptions": { - "retentionPeriodNanos": 172800000000000, - "blockSizeNanos": 7200000000000, - "bufferFutureNanos": 600000000000, - "bufferPastNanos": 600000000000, - "blockDataExpiry": true, - "blockDataExpiryAfterNotAccessPeriodNanos": 
300000000000 - }, - "indexOptions": { - "enabled": true, - "blockSizeNanos": 7200000000000 - } - } -}' - -echo "Sleep until namespace is init'd" -ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq .registry.namespaces.agg.indexOptions.enabled)" == true ]' - -curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/namespace -d '{ - "name": "unagg", - "options": { - "bootstrapEnabled": true, - "flushEnabled": true, - "writesToCommitLog": true, - "cleanupEnabled": true, - "snapshotEnabled": true, - "repairEnabled": false, - "retentionOptions": { - "retentionPeriodNanos": 172800000000000, - "blockSizeNanos": 7200000000000, - "bufferFutureNanos": 600000000000, - "bufferPastNanos": 600000000000, - "blockDataExpiry": true, - "blockDataExpiryAfterNotAccessPeriodNanos": 300000000000 - }, - "indexOptions": { - "enabled": true, - "blockSizeNanos": 7200000000000 - } - } -}' - -echo "Sleep until namespace is init'd" -ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq .registry.namespaces.unagg.indexOptions.enabled)" == true ]' - -echo "Placement initialization" -curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/placement/init -d '{ - "num_shards": 64, - "replication_factor": 1, - "instances": [ - { - "id": "m3db_local", - "isolation_group": "rack-a", - "zone": "embedded", - "weight": 1024, - "endpoint": "dbnode01:9000", - "hostname": "dbnode01", - "port": 9000 - } - ] -}' - -echo "Sleep until placement is init'd" -ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | jq .placement.instances.m3db_local.id)" == \"m3db_local\" ]' - -echo "Sleep until bootstrapped" -ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:9002/health | jq .bootstrapped)" == true ]' - -echo "Waiting until shards are marked as available" -ATTEMPTS=10 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | grep -c INITIALIZING)" -eq 0 ]' +setup_single_m3db_node 
echo "Start Prometheus containers" docker-compose -f ${COMPOSE_FILE} up -d prometheus01 diff --git a/scripts/docker-integration-tests/simple/test.sh b/scripts/docker-integration-tests/simple/test.sh index ade8f772a8..d8776a171a 100755 --- a/scripts/docker-integration-tests/simple/test.sh +++ b/scripts/docker-integration-tests/simple/test.sh @@ -24,70 +24,102 @@ if [ $? -ne 0 ]; then exit 1 fi +# TODO(rartoul): Rewrite this test to use a docker-compose file like the others so that we can share all the +# DB initialization logic with the setup_single_m3db_node command in common.sh like the other files. Right now +# we can't do that because this test doesn't use the docker-compose networking so we have to specify 127.0.0.1 +# as the endpoint in the placement instead of being able to use dbnode01. echo "Sleeping for a bit to ensure db up" -sleep 5 # TODO Replace sleeps with logic to determine when to proceed - -echo "Adding namespace" -curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/namespace -d '{ - "name": "default", - "options": { - "bootstrapEnabled": true, - "flushEnabled": true, - "writesToCommitLog": true, - "cleanupEnabled": true, - "snapshotEnabled": true, - "repairEnabled": false, - "retentionOptions": { - "retentionPeriodNanos": 172800000000000, - "blockSizeNanos": 7200000000000, - "bufferFutureNanos": 600000000000, - "bufferPastNanos": 600000000000, - "blockDataExpiry": true, - "blockDataExpiryAfterNotAccessPeriodNanos": 300000000000 - }, - "indexOptions": { - "enabled": true, - "blockSizeNanos": 7200000000000 +sleep 15 # TODO Replace sleeps with logic to determine when to proceed + + echo "Adding namespace" + curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/namespace -d '{ + "name": "agg", + "options": { + "bootstrapEnabled": true, + "flushEnabled": true, + "writesToCommitLog": true, + "cleanupEnabled": true, + "snapshotEnabled": true, + "repairEnabled": false, + "retentionOptions": { + "retentionPeriodDuration": "48h", + "blockSizeDuration": "2h", + 
"bufferFutureDuration": "10m", + "bufferPastDuration": "10m", + "blockDataExpiry": true, + "blockDataExpiryAfterNotAccessPeriodDuration": "5m" + }, + "indexOptions": { + "enabled": true, + "blockSizeDuration": "2h" + } } - } -}' - -echo "Sleep until namespace is init'd" -ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq .registry.namespaces.default.indexOptions.enabled)" == true ]' - -echo "Placement initialization" -curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/placement/init -d '{ - "num_shards": 64, - "replication_factor": 1, - "instances": [ - { - "id": "m3db_local", - "isolation_group": "rack-a", - "zone": "embedded", - "weight": 1024, - "endpoint": "127.0.0.1:9000", - "hostname": "127.0.0.1", - "port": 9000 - } - ] -}' - -echo "Sleep until placement is init'd" -ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | jq .placement.instances.m3db_local.id)" == \"m3db_local\" ]' - -echo "Sleep until bootstrapped" -ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:9002/health | jq .bootstrapped)" == true ]' - -echo "Waiting until shards are marked as available" -ATTEMPTS=10 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | grep -c INITIALIZING)" -eq 0 ]' + }' + + echo "Sleep until namespace is init'd" + ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq .registry.namespaces.agg.indexOptions.enabled)" == true ]' + + curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/namespace -d '{ + "name": "unagg", + "options": { + "bootstrapEnabled": true, + "flushEnabled": true, + "writesToCommitLog": true, + "cleanupEnabled": true, + "snapshotEnabled": true, + "repairEnabled": false, + "retentionOptions": { + "retentionPeriodDuration": "48h", + "blockSizeDuration": "2h", + "bufferFutureDuration": "10m", + "bufferPastDuration": "10m", + "blockDataExpiry": true, + "blockDataExpiryAfterNotAccessPeriodDuration": "5m" + }, 
+ "indexOptions": { + "enabled": true, + "blockSizeDuration": "2h" + } + } + }' + + echo "Sleep until namespace is init'd" + ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq .registry.namespaces.unagg.indexOptions.enabled)" == true ]' + + echo "Placement initialization" + curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/placement/init -d '{ + "num_shards": 64, + "replication_factor": 1, + "instances": [ + { + "id": "m3db_local", + "isolation_group": "rack-a", + "zone": "embedded", + "weight": 1024, + "endpoint": "127.0.0.1:9000", + "hostname": "127.0.0.1", + "port": 9000 + } + ] + }' + + echo "Sleep until placement is init'd" + ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | jq .placement.instances.m3db_local.id)" == \"m3db_local\" ]' + + echo "Sleep until bootstrapped" + ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:9002/health | jq .bootstrapped)" == true ]' + + echo "Waiting until shards are marked as available" + ATTEMPTS=10 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | grep -c INITIALIZING)" -eq 0 ]' echo "Write data" curl -vvvsS -X POST 0.0.0.0:9003/writetagged -d '{ - "namespace": "default", + "namespace": "unagg", "id": "foo", "tags": [ { @@ -107,7 +139,7 @@ curl -vvvsS -X POST 0.0.0.0:9003/writetagged -d '{ echo "Read data" queryResult=$(curl -sSf -X POST 0.0.0.0:9003/query -d '{ - "namespace": "default", + "namespace": "unagg", "query": { "regexp": { "field": "city", @@ -129,4 +161,4 @@ echo "Deleting placement" curl -vvvsSf -X DELETE 0.0.0.0:7201/api/v1/placement echo "Deleting namespace" -curl -vvvsSf -X DELETE 0.0.0.0:7201/api/v1/namespace/default +curl -vvvsSf -X DELETE 0.0.0.0:7201/api/v1/namespace/unagg diff --git a/src/cmd/services/m3coordinator/ingest/carbon/ingest.go b/src/cmd/services/m3coordinator/ingest/carbon/ingest.go new file mode 100644 index 0000000000..6db162ccbb --- /dev/null +++ 
b/src/cmd/services/m3coordinator/ingest/carbon/ingest.go @@ -0,0 +1,266 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package ingestcarbon + +import ( + "bytes" + "context" + "errors" + "fmt" + "net" + "sync" + "time" + + "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" + "github.com/m3db/m3/src/metrics/carbon" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/ts" + "github.com/m3db/m3x/instrument" + "github.com/m3db/m3x/log" + m3xserver "github.com/m3db/m3x/server" + xsync "github.com/m3db/m3x/sync" + xtime "github.com/m3db/m3x/time" + + "github.com/uber-go/tally" +) + +var ( + // Used for parsing carbon names into tags. 
+ carbonSeparatorByte = byte('.') + carbonSeparatorBytes = []byte{carbonSeparatorByte} + + // Number of pre-formatted key names to generate in the init() function. + numPreFormattedKeyNames = 100 + // Should never be modified after init(). + preFormattedKeyNames = [][]byte{} + + errCannotGenerateTagsFromEmptyName = errors.New("cannot generate tags from empty name") + errIOptsMustBeSet = errors.New("carbon ingester options: instrument options must be set") + errWorkerPoolMustBeSet = errors.New("carbon ingester options: worker pool must be set") +) + +// Options configures the ingester. +type Options struct { + InstrumentOptions instrument.Options + WorkerPool xsync.PooledWorkerPool + Timeout time.Duration +} + +// Validate validates the options struct. +func (o *Options) Validate() error { + if o.InstrumentOptions == nil { + return errIOptsMustBeSet + } + + if o.WorkerPool == nil { + return errWorkerPoolMustBeSet + } + + return nil +} + +// NewIngester returns an ingester for carbon metrics. +func NewIngester( + downsamplerAndWriter ingest.DownsamplerAndWriter, + opts Options, +) (m3xserver.Handler, error) { + err := opts.Validate() + if err != nil { + return nil, err + } + + return &ingester{ + downsamplerAndWriter: downsamplerAndWriter, + opts: opts, + logger: opts.InstrumentOptions.Logger(), + metrics: newCarbonIngesterMetrics( + opts.InstrumentOptions.MetricsScope()), + }, nil +} + +type ingester struct { + downsamplerAndWriter ingest.DownsamplerAndWriter + opts Options + logger log.Logger + metrics carbonIngesterMetrics +} + +func (i *ingester) Handle(conn net.Conn) { + var ( + wg = sync.WaitGroup{} + s = carbon.NewScanner(conn, i.opts.InstrumentOptions) + logger = i.opts.InstrumentOptions.Logger() + ) + + logger.Debug("handling new carbon ingestion connection") + for s.Scan() { + name, timestamp, value := s.Metric() + // TODO(rartoul): Pool. + // Copy name since scanner bytes are recycled. + name = append([]byte(nil), name...) 
+ + wg.Add(1) + i.opts.WorkerPool.Go(func() { + ok := i.write(name, timestamp, value) + if ok { + i.metrics.success.Inc(1) + } + wg.Done() + }) + + i.metrics.malformed.Inc(int64(s.MalformedCount)) + s.MalformedCount = 0 + } + + if err := s.Err(); err != nil { + logger.Errorf("encountered error during carbon ingestion when scanning connection: %s", err) + } + + logger.Debugf("waiting for outstanding carbon ingestion writes to complete") + wg.Wait() + logger.Debugf("all outstanding writes completed, shutting down carbon ingestion handler") + + // Don't close the connection, that is the server's responsibility. +} + +func (i *ingester) write(name []byte, timestamp time.Time, value float64) bool { + datapoints := []ts.Datapoint{{Timestamp: timestamp, Value: value}} + // TODO(rartoul): Pool. + tags, err := GenerateTagsFromName(name) + if err != nil { + i.logger.Errorf("err generating tags from carbon name: %s, err: %s", + string(name), err) + i.metrics.malformed.Inc(1) + return false + } + + var ( + ctx = context.Background() + cleanup func() + ) + if i.opts.Timeout > 0 { + ctx, cleanup = context.WithTimeout(ctx, i.opts.Timeout) + } + + err = i.downsamplerAndWriter.Write(ctx, tags, datapoints, xtime.Second) + if cleanup != nil { + cleanup() + } + + if err != nil { + i.logger.Errorf("err writing carbon metric: %s, err: %s", + string(name), err) + i.metrics.err.Inc(1) + return false + } + + return true +} + +func (i *ingester) Close() { + // We don't maintain any state in-between connections so there is nothing to do here. 
+} + +func newCarbonIngesterMetrics(m tally.Scope) carbonIngesterMetrics { + return carbonIngesterMetrics{ + success: m.Counter("success"), + err: m.Counter("error"), + malformed: m.Counter("malformed"), + } +} + +type carbonIngesterMetrics struct { + success tally.Counter + err tally.Counter + malformed tally.Counter +} + +// GenerateTagsFromName accepts a carbon metric name and blows it up into a list of +// key-value pair tags such that an input like: +// foo.bar.baz +// becomes +// __graphite0__:foo +// __graphite1__:bar +// __graphite2__:baz +func GenerateTagsFromName(name []byte) (models.Tags, error) { + if len(name) == 0 { + return models.Tags{}, errCannotGenerateTagsFromEmptyName + } + + var ( + numTags = bytes.Count(name, carbonSeparatorBytes) + 1 + tags = make([]models.Tag, 0, numTags) + ) + + startIdx := 0 + tagNum := 0 + for i, charByte := range name { + if charByte == carbonSeparatorByte { + if i+1 < len(name) && name[i+1] == carbonSeparatorByte { + return models.Tags{}, fmt.Errorf("carbon metric: %s has duplicate separator", string(name)) + } + + tags = append(tags, models.Tag{ + Name: getOrGenerateKeyName(tagNum), + Value: name[startIdx:i], + }) + startIdx = i + 1 + tagNum++ + } + } + + // Write out the final tag since the for loop above will miss anything + // after the final separator. Note, that we make sure that the final + // character in the name is not the separator because in that case there + // would be no additional tag to add. I.E if the input was: + // foo.bar.baz + // then the for loop would append foo and bar, but we would still need to + // append baz, however, if the input was: + // foo.bar.baz. + // then the for loop would have appended foo, bar, and baz already. 
+ if name[len(name)-1] != carbonSeparatorByte { + tags = append(tags, models.Tag{ + Name: getOrGenerateKeyName(tagNum), + Value: name[startIdx:], + }) + } + + return models.Tags{Tags: tags}, nil +} + +func getOrGenerateKeyName(idx int) []byte { + if idx < len(preFormattedKeyNames) { + return preFormattedKeyNames[idx] + } + + return []byte(fmt.Sprintf("__graphite%d__", idx)) +} + +func generateKeyName(idx int) []byte { + return []byte(fmt.Sprintf("__graphite%d__", idx)) +} + +func init() { + for i := 0; i < numPreFormattedKeyNames; i++ { + keyName := generateKeyName(i) + preFormattedKeyNames = append(preFormattedKeyNames, keyName) + } +} diff --git a/src/cmd/services/m3coordinator/ingest/carbon/ingest_benchmark_test.go b/src/cmd/services/m3coordinator/ingest/carbon/ingest_benchmark_test.go new file mode 100644 index 0000000000..082dba077c --- /dev/null +++ b/src/cmd/services/m3coordinator/ingest/carbon/ingest_benchmark_test.go @@ -0,0 +1,43 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package ingestcarbon + +import ( + "testing" + + "github.com/m3db/m3/src/query/models" +) + +var benchmarkGenerateTagsSink models.Tags + +func BenchmarkGenerateTagsFromName(b *testing.B) { + var ( + testName = []byte("foo.bar.baz.bax") + err error + ) + + for i := 0; i < b.N; i++ { + benchmarkGenerateTagsSink, err = GenerateTagsFromName(testName) + if err != nil { + panic(err) + } + } +} diff --git a/src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go b/src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go new file mode 100644 index 0000000000..4d36a617c4 --- /dev/null +++ b/src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go @@ -0,0 +1,261 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package ingestcarbon + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "math/rand" + "net" + "sort" + "sync" + "testing" + "time" + + "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/ts" + "github.com/m3db/m3x/instrument" + xsync "github.com/m3db/m3x/sync" + xtime "github.com/m3db/m3x/time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" +) + +const ( + // Keep this value large enough to catch issues like the ingester + // not copying the name. + numLinesInTestPacket = 10000 +) + +// Created by init(). +var ( + testMetrics = []testMetric{} + testPacket = []byte{} + + testOptions = Options{ + InstrumentOptions: instrument.NewOptions(), + WorkerPool: nil, // Set by init(). + } +) + +func TestIngesterHandleConn(t *testing.T) { + ctrl := gomock.NewController(t) + mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl) + + var ( + lock = sync.Mutex{} + + found = []testMetric{} + idx = 0 + ) + mockDownsamplerAndWriter.EXPECT(). + Write(gomock.Any(), gomock.Any(), gomock.Any(), xtime.Second).DoAndReturn(func( + _ context.Context, + tags models.Tags, + dp ts.Datapoints, + unit xtime.Unit, + ) interface{} { + lock.Lock() + found = append(found, testMetric{ + tags: tags, timestamp: int(dp[0].Timestamp.Unix()), value: dp[0].Value}) + + // Make 1 in 10 writes fail to test those paths. 
+ returnErr := idx%10 == 0 + idx++ + lock.Unlock() + + if returnErr { + return errors.New("some_error") + } + return nil + }).AnyTimes() + + byteConn := &byteConn{b: bytes.NewBuffer(testPacket)} + ingester, err := NewIngester(mockDownsamplerAndWriter, testOptions) + require.NoError(t, err) + ingester.Handle(byteConn) + + assertTestMetricsAreEqual(t, testMetrics, found) +} + +func TestGenerateTagsFromName(t *testing.T) { + testCases := []struct { + name string + expectedTags []models.Tag + expectedErr error + }{ + { + name: "foo", + expectedTags: []models.Tag{ + {Name: []byte("__graphite0__"), Value: []byte("foo")}, + }, + }, + { + name: "foo.bar.baz", + expectedTags: []models.Tag{ + {Name: []byte("__graphite0__"), Value: []byte("foo")}, + {Name: []byte("__graphite1__"), Value: []byte("bar")}, + {Name: []byte("__graphite2__"), Value: []byte("baz")}, + }, + }, + { + name: "foo.bar.baz.", + expectedTags: []models.Tag{ + {Name: []byte("__graphite0__"), Value: []byte("foo")}, + {Name: []byte("__graphite1__"), Value: []byte("bar")}, + {Name: []byte("__graphite2__"), Value: []byte("baz")}, + }, + }, + { + name: "foo..bar..baz..", + expectedErr: fmt.Errorf("carbon metric: foo..bar..baz.. has duplicate separator"), + }, + { + name: "foo.bar.baz..", + expectedErr: fmt.Errorf("carbon metric: foo.bar.baz.. has duplicate separator"), + }, + } + + for _, tc := range testCases { + tags, err := GenerateTagsFromName([]byte(tc.name)) + if tc.expectedErr != nil { + require.Equal(t, tc.expectedErr, err) + } else { + require.NoError(t, err) + } + require.Equal(t, tc.expectedTags, tags.Tags) + } +} + +// byteConn implements the net.Conn interface so that we can test the handler without +// going over the network. 
+type byteConn struct { + b io.Reader + closed bool +} + +func (b *byteConn) Read(buf []byte) (n int, err error) { + if !b.closed { + return b.b.Read(buf) + } + + return 0, io.EOF +} + +func (b *byteConn) Write(buf []byte) (n int, err error) { + panic("not_implemented") +} + +func (b *byteConn) Close() error { + b.closed = true + return nil +} + +func (b *byteConn) LocalAddr() net.Addr { + panic("not_implemented") +} + +func (b *byteConn) RemoteAddr() net.Addr { + panic("not_implemented") +} + +func (b *byteConn) SetDeadline(t time.Time) error { + panic("not_implemented") +} + +func (b *byteConn) SetReadDeadline(t time.Time) error { + panic("not_implemented") +} + +func (b *byteConn) SetWriteDeadline(t time.Time) error { + panic("not_implemented") +} + +type testMetric struct { + metric []byte + tags models.Tags + timestamp int + value float64 + isValid bool +} + +func assertTestMetricsAreEqual(t *testing.T, a, b []testMetric) { + require.Equal(t, len(a), len(b)) + + sort.Slice(b, func(i, j int) bool { + return b[i].timestamp < b[j].timestamp + }) + + for i, f := range b { + require.Equal(t, a[i].tags, f.tags) + require.Equal(t, a[i].timestamp, f.timestamp) + require.Equal(t, a[i].value, f.value) + } +} + +func init() { + var err error + testOptions.WorkerPool, err = xsync.NewPooledWorkerPool(16, xsync.NewPooledWorkerPoolOptions()) + if err != nil { + panic(err) + } + testOptions.WorkerPool.Init() + + for i := 0; i < numLinesInTestPacket; i++ { + var metric []byte + + if i%10 == 0 { + // Make 1 in 10 lines invalid to test the error paths. + if rand.Intn(2) == 0 { + // Invalid line altogether. + line := []byte(fmt.Sprintf("garbage line %d \n", i)) + testPacket = append(testPacket, line...) + continue + } else { + // Valid line, but invalid name (too many separators). + line := []byte(fmt.Sprintf("test..metric..%d %d %d\n", i, i, i)) + testPacket = append(testPacket, line...) 
+ continue + } + } + + metric = []byte(fmt.Sprintf("test.metric.%d", i)) + + tags, err := GenerateTagsFromName(metric) + if err != nil { + panic(err) + } + testMetrics = append(testMetrics, testMetric{ + metric: metric, + tags: tags, + timestamp: i, + value: float64(i), + }) + + line := []byte(fmt.Sprintf("%s %d %d\n", string(metric), i, i)) + testPacket = append(testPacket, line...) + } +} diff --git a/src/cmd/services/m3coordinator/ingest/config.go b/src/cmd/services/m3coordinator/ingest/m3msg/config.go similarity index 99% rename from src/cmd/services/m3coordinator/ingest/config.go rename to src/cmd/services/m3coordinator/ingest/m3msg/config.go index 715846ce92..3de2379432 100644 --- a/src/cmd/services/m3coordinator/ingest/config.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/config.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package ingest +package ingestm3msg import ( "github.com/m3db/m3/src/query/storage" diff --git a/src/cmd/services/m3coordinator/ingest/ingest.go b/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go similarity index 99% rename from src/cmd/services/m3coordinator/ingest/ingest.go rename to src/cmd/services/m3coordinator/ingest/m3msg/ingest.go index 998d45215e..863e83138c 100644 --- a/src/cmd/services/m3coordinator/ingest/ingest.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/ingest.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
-package ingest +package ingestm3msg import ( "context" diff --git a/src/cmd/services/m3coordinator/ingest/ingest_test.go b/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go similarity index 99% rename from src/cmd/services/m3coordinator/ingest/ingest_test.go rename to src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go index cb7b410c3b..ae1953820a 100644 --- a/src/cmd/services/m3coordinator/ingest/ingest_test.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package ingest +package ingestm3msg import ( "context" diff --git a/src/cmd/services/m3coordinator/ingest/write.go b/src/cmd/services/m3coordinator/ingest/write.go new file mode 100644 index 0000000000..9bdc169e3e --- /dev/null +++ b/src/cmd/services/m3coordinator/ingest/write.go @@ -0,0 +1,221 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package ingest + +import ( + "context" + "sync" + + "github.com/m3db/m3/src/cmd/services/m3coordinator/downsample" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/query/ts" + xerrors "github.com/m3db/m3x/errors" + xtime "github.com/m3db/m3x/time" +) + +// DownsampleAndWriteIter is an interface that can be implemented to use +// the WriteBatch method. +type DownsampleAndWriteIter interface { + Next() bool + Current() (models.Tags, ts.Datapoints, xtime.Unit) + Reset() error + Error() error +} + +// DownsamplerAndWriter is the interface for the downsamplerAndWriter which +// writes metrics to the downsampler as well as to storage in unaggregated form. +type DownsamplerAndWriter interface { + Write( + ctx context.Context, + tags models.Tags, + datapoints ts.Datapoints, + unit xtime.Unit, + ) error + + WriteBatch( + ctx context.Context, + iter DownsampleAndWriteIter, + ) error + + Storage() storage.Storage +} + +// downsamplerAndWriter encapsulates the logic for writing data to the downsampler, +// as well as in unaggregated form to storage. +type downsamplerAndWriter struct { + store storage.Storage + downsampler downsample.Downsampler +} + +// NewDownsamplerAndWriter creates a new downsampler and writer. 
+func NewDownsamplerAndWriter( + store storage.Storage, + downsampler downsample.Downsampler, +) DownsamplerAndWriter { + return &downsamplerAndWriter{ + store: store, + downsampler: downsampler, + } +} + +func (d *downsamplerAndWriter) Write( + ctx context.Context, + tags models.Tags, + datapoints ts.Datapoints, + unit xtime.Unit, +) error { + if d.downsampler != nil { + appender, err := d.downsampler.NewMetricsAppender() + if err != nil { + return err + } + + for _, tag := range tags.Tags { + appender.AddTag(tag.Name, tag.Value) + } + + samplesAppender, err := appender.SamplesAppender() + if err != nil { + return err + } + + for _, dp := range datapoints { + err := samplesAppender.AppendGaugeSample(dp.Value) + if err != nil { + return err + } + } + + appender.Finalize() + } + + if d.store != nil { + return d.store.Write(ctx, &storage.WriteQuery{ + Tags: tags, + Datapoints: datapoints, + Unit: unit, + Attributes: storage.Attributes{ + MetricsType: storage.UnaggregatedMetricsType, + }, + }) + + } + + return nil +} + +func (d *downsamplerAndWriter) WriteBatch( + ctx context.Context, + iter DownsampleAndWriteIter, +) error { + var ( + wg = sync.WaitGroup{} + multiErr xerrors.MultiError + errLock sync.Mutex + addError = func(err error) { + errLock.Lock() + multiErr = multiErr.Add(err) + errLock.Unlock() + } + ) + + if d.store != nil { + // Write unaggregated. Spin up all the background goroutines that make + // network requests before we do the synchronous work of writing to the + // downsampler. 
+ for iter.Next() { + wg.Add(1) + tags, datapoints, unit := iter.Current() + go func() { + err := d.store.Write(ctx, &storage.WriteQuery{ + Tags: tags, + Datapoints: datapoints, + Unit: unit, + Attributes: storage.Attributes{ + MetricsType: storage.UnaggregatedMetricsType, + }, + }) + if err != nil { + addError(err) + } + wg.Done() + }() + } + } + + // Iter does not need to be synchronized because even though we use it to spawn + // many goroutines above, the iteration is always synchronous. + resetErr := iter.Reset() + if resetErr != nil { + addError(resetErr) + } + + if d.downsampler != nil && resetErr == nil { + err := d.writeAggregatedBatch(iter) + if err != nil { + addError(err) + } + } + + wg.Wait() + return multiErr.LastError() +} + +func (d *downsamplerAndWriter) writeAggregatedBatch( + iter DownsampleAndWriteIter, +) error { + appender, err := d.downsampler.NewMetricsAppender() + if err != nil { + return err + } + + for iter.Next() { + appender.Reset() + tags, datapoints, _ := iter.Current() + for _, tag := range tags.Tags { + appender.AddTag(tag.Name, tag.Value) + } + + samplesAppender, err := appender.SamplesAppender() + if err != nil { + return err + } + + for _, dp := range datapoints { + err := samplesAppender.AppendGaugeSample(dp.Value) + if err != nil { + return err + } + } + } + appender.Finalize() + + if err := iter.Error(); err != nil { + return err + } + + return nil +} + +func (d *downsamplerAndWriter) Storage() storage.Storage { + return d.store +} diff --git a/src/cmd/services/m3coordinator/ingest/write_mock.go b/src/cmd/services/m3coordinator/ingest/write_mock.go new file mode 100644 index 0000000000..55ca2a6a58 --- /dev/null +++ b/src/cmd/services/m3coordinator/ingest/write_mock.go @@ -0,0 +1,102 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/m3db/m3/src/cmd/services/m3coordinator/ingest (interfaces: DownsamplerAndWriter) + +// Copyright (c) 2019 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Package ingest is a generated GoMock package. 
+package ingest + +import ( + "context" + "reflect" + + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/query/ts" + "github.com/m3db/m3x/time" + + "github.com/golang/mock/gomock" +) + +// MockDownsamplerAndWriter is a mock of DownsamplerAndWriter interface +type MockDownsamplerAndWriter struct { + ctrl *gomock.Controller + recorder *MockDownsamplerAndWriterMockRecorder +} + +// MockDownsamplerAndWriterMockRecorder is the mock recorder for MockDownsamplerAndWriter +type MockDownsamplerAndWriterMockRecorder struct { + mock *MockDownsamplerAndWriter +} + +// NewMockDownsamplerAndWriter creates a new mock instance +func NewMockDownsamplerAndWriter(ctrl *gomock.Controller) *MockDownsamplerAndWriter { + mock := &MockDownsamplerAndWriter{ctrl: ctrl} + mock.recorder = &MockDownsamplerAndWriterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockDownsamplerAndWriter) EXPECT() *MockDownsamplerAndWriterMockRecorder { + return m.recorder +} + +// Storage mocks base method +func (m *MockDownsamplerAndWriter) Storage() storage.Storage { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Storage") + ret0, _ := ret[0].(storage.Storage) + return ret0 +} + +// Storage indicates an expected call of Storage +func (mr *MockDownsamplerAndWriterMockRecorder) Storage() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Storage", reflect.TypeOf((*MockDownsamplerAndWriter)(nil).Storage)) +} + +// Write mocks base method +func (m *MockDownsamplerAndWriter) Write(arg0 context.Context, arg1 models.Tags, arg2 ts.Datapoints, arg3 time.Unit) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Write", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// Write indicates an expected call of Write +func (mr *MockDownsamplerAndWriterMockRecorder) Write(arg0, arg1, 
arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*MockDownsamplerAndWriter)(nil).Write), arg0, arg1, arg2, arg3) +} + +// WriteBatch mocks base method +func (m *MockDownsamplerAndWriter) WriteBatch(arg0 context.Context, arg1 DownsampleAndWriteIter) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WriteBatch", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// WriteBatch indicates an expected call of WriteBatch +func (mr *MockDownsamplerAndWriterMockRecorder) WriteBatch(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteBatch", reflect.TypeOf((*MockDownsamplerAndWriter)(nil).WriteBatch), arg0, arg1) +} diff --git a/src/cmd/services/m3coordinator/ingest/write_test.go b/src/cmd/services/m3coordinator/ingest/write_test.go new file mode 100644 index 0000000000..00aba1a045 --- /dev/null +++ b/src/cmd/services/m3coordinator/ingest/write_test.go @@ -0,0 +1,269 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package ingest + +import ( + "context" + "testing" + "time" + + "github.com/m3db/m3/src/cmd/services/m3coordinator/downsample" + "github.com/m3db/m3/src/dbnode/client" + "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/test/m3" + "github.com/m3db/m3/src/query/ts" + xtime "github.com/m3db/m3x/time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" +) + +var ( + testTags1 = models.Tags{Tags: []models.Tag{ + { + Name: []byte("test_1_key_1"), + Value: []byte("test_1_value_1"), + }, + { + Name: []byte("test_1_key_2"), + Value: []byte("test_1_value_2"), + }, + { + Name: []byte("test_1_key_3"), + Value: []byte("test_1_value_3"), + }, + }} + testTags2 = models.Tags{Tags: []models.Tag{ + { + Name: []byte("test_2_key_1"), + Value: []byte("test_2_value_1"), + }, + { + Name: []byte("test_2_key_2"), + Value: []byte("test_2_value_2"), + }, + { + Name: []byte("test_2_key_3"), + Value: []byte("test_2_value_3"), + }, + }} + + testDatapoints1 = []ts.Datapoint{ + { + Timestamp: time.Unix(0, 0), + Value: 0, + }, + { + Timestamp: time.Unix(0, 1), + Value: 1, + }, + { + Timestamp: time.Unix(0, 2), + Value: 2, + }, + } + testDatapoints2 = []ts.Datapoint{ + { + Timestamp: time.Unix(0, 3), + Value: 3, + }, + { + Timestamp: time.Unix(0, 4), + Value: 4, + }, + { + Timestamp: time.Unix(0, 5), + Value: 5, + }, + } + + testEntries = []testIterEntry{ + {tags: testTags1, datapoints: testDatapoints1}, + {tags: testTags2, datapoints: testDatapoints2}, + } +) + +type testIter struct { + idx int + entries []testIterEntry +} + +type testIterEntry struct { + tags models.Tags + datapoints []ts.Datapoint +} + +func 
newTestIter(entries []testIterEntry) *testIter { + return &testIter{ + idx: -1, + entries: entries, + } +} + +func (i *testIter) Next() bool { + i.idx++ + return i.idx < len(i.entries) +} + +func (i *testIter) Current() (models.Tags, ts.Datapoints, xtime.Unit) { + if len(i.entries) == 0 || i.idx < 0 || i.idx >= len(i.entries) { + return models.Tags{}, nil, 0 + } + + curr := i.entries[i.idx] + return curr.tags, curr.datapoints, xtime.Second +} + +func (i *testIter) Reset() error { + i.idx = -1 + return nil +} + +func (i *testIter) Error() error { + return nil +} + +func TestDownsampleAndWrite(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + downAndWrite, downsampler, session := newTestDownsamplerAndWriter(t, ctrl) + + var ( + mockSamplesAppender = downsample.NewMockSamplesAppender(ctrl) + mockMetricsAppender = downsample.NewMockMetricsAppender(ctrl) + ) + + mockMetricsAppender. + EXPECT(). + SamplesAppender(). + Return(mockSamplesAppender, nil) + for _, tag := range testTags1.Tags { + mockMetricsAppender.EXPECT().AddTag(tag.Name, tag.Value) + } + + for _, dp := range testDatapoints1 { + mockSamplesAppender.EXPECT().AppendGaugeSample(dp.Value) + } + downsampler.EXPECT().NewMetricsAppender().Return(mockMetricsAppender, nil) + + mockMetricsAppender.EXPECT().Finalize() + + for _, dp := range testDatapoints1 { + session.EXPECT().WriteTagged( + gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), dp.Value, gomock.Any(), gomock.Any()) + } + + err := downAndWrite.Write(context.Background(), testTags1, testDatapoints1, xtime.Second) + require.NoError(t, err) +} + +func TestDownsampleAndWriteNoDownsampler(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + downAndWrite, _, session := newTestDownsamplerAndWriter(t, ctrl) + downAndWrite.downsampler = nil + + for _, dp := range testDatapoints1 { + session.EXPECT().WriteTagged( + gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), dp.Value, gomock.Any(), gomock.Any()) + } 
+ + err := downAndWrite.Write(context.Background(), testTags1, testDatapoints1, xtime.Second) + require.NoError(t, err) +} + +func TestDownsampleAndWriteBatch(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + downAndWrite, downsampler, session := newTestDownsamplerAndWriter(t, ctrl) + + var ( + mockSamplesAppender = downsample.NewMockSamplesAppender(ctrl) + mockMetricsAppender = downsample.NewMockMetricsAppender(ctrl) + ) + + mockMetricsAppender. + EXPECT(). + SamplesAppender(). + Return(mockSamplesAppender, nil).Times(2) + for _, tag := range testTags1.Tags { + mockMetricsAppender.EXPECT().AddTag(tag.Name, tag.Value) + } + for _, dp := range testDatapoints1 { + mockSamplesAppender.EXPECT().AppendGaugeSample(dp.Value) + } + for _, tag := range testTags2.Tags { + mockMetricsAppender.EXPECT().AddTag(tag.Name, tag.Value) + } + for _, dp := range testDatapoints2 { + mockSamplesAppender.EXPECT().AppendGaugeSample(dp.Value) + } + downsampler.EXPECT().NewMetricsAppender().Return(mockMetricsAppender, nil) + + mockMetricsAppender.EXPECT().Reset().Times(2) + mockMetricsAppender.EXPECT().Finalize() + + for _, dp := range testDatapoints1 { + session.EXPECT().WriteTagged( + gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), dp.Value, gomock.Any(), gomock.Any()) + } + for _, dp := range testDatapoints2 { + session.EXPECT().WriteTagged( + gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), dp.Value, gomock.Any(), gomock.Any()) + } + + iter := newTestIter(testEntries) + err := downAndWrite.WriteBatch(context.Background(), iter) + require.NoError(t, err) +} + +func TestDownsampleAndWriteBatchNoDownsampler(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + downAndWrite, _, session := newTestDownsamplerAndWriter(t, ctrl) + downAndWrite.downsampler = nil + + for _, dp := range testDatapoints1 { + session.EXPECT().WriteTagged( + gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), dp.Value, gomock.Any(), gomock.Any()) + } + 
for _, dp := range testDatapoints2 { + session.EXPECT().WriteTagged( + gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), dp.Value, gomock.Any(), gomock.Any()) + } + + iter := newTestIter(testEntries) + err := downAndWrite.WriteBatch(context.Background(), iter) + require.NoError(t, err) +} + +func newTestDownsamplerAndWriter( + t *testing.T, + ctrl *gomock.Controller, +) (*downsamplerAndWriter, *downsample.MockDownsampler, *client.MockSession) { + storage, session := m3.NewStorageAndSession(t, ctrl) + downsampler := downsample.NewMockDownsampler(ctrl) + return NewDownsamplerAndWriter(storage, downsampler).(*downsamplerAndWriter), downsampler, session +} diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 762e56ed70..747e3e5637 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -25,7 +25,7 @@ import ( etcdclient "github.com/m3db/m3/src/cluster/client/etcd" "github.com/m3db/m3/src/cmd/services/m3coordinator/downsample" - "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" + "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest/m3msg" "github.com/m3db/m3/src/cmd/services/m3coordinator/server/m3msg" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage/m3" @@ -42,13 +42,20 @@ const ( GRPCStorageType BackendStorageType = "grpc" // M3DBStorageType is for m3db backend. M3DBStorageType BackendStorageType = "m3db" + + defaultCarbonIngesterListenAddress = "0.0.0.0:7204" ) -// defaultLimitsConfiguration is applied if `limits` isn't specified. -var defaultLimitsConfiguration = &LimitsConfiguration{ - // this is sufficient for 1 day span / 1s step, or 60 days with a 1m step. - MaxComputedDatapoints: 86400, -} +var ( + defaultCarbonIngesterTimeout = 5 * time.Second + defaultCarbonIngesterEnabled = true + + // defaultLimitsConfiguration is applied if `limits` isn't specified. 
+ defaultLimitsConfiguration = &LimitsConfiguration{ + // this is sufficient for 1 day span / 1s step, or 60 days with a 1m step. + MaxComputedDatapoints: 86400, + } +) // Configuration is the configuration for the query service. type Configuration struct { @@ -94,6 +101,9 @@ type Configuration struct { // Ingest is the ingest server. Ingest *IngestConfiguration `yaml:"ingest"` + // Carbon is the carbon configuration. + Carbon CarbonConfiguration `yaml:"carbon"` + // Limits specifies limits on per-query resource usage. Limits LimitsConfiguration `yaml:"limits"` } @@ -127,12 +137,55 @@ type LimitsConfiguration struct { // IngestConfiguration is the configuration for ingestion server. type IngestConfiguration struct { // Ingester is the configuration for storage based ingester. - Ingester ingest.Configuration `yaml:"ingester"` + Ingester ingestm3msg.Configuration `yaml:"ingester"` // M3Msg is the configuration for m3msg server. M3Msg m3msg.Configuration `yaml:"m3msg"` } +// CarbonConfiguration is the configuration for the carbon server. +type CarbonConfiguration struct { + Ingestion CarbonIngestionConfiguration +} + +// CarbonIngestionConfiguration is the configuration struct for carbon ingestion. +type CarbonIngestionConfiguration struct { + Enabled *bool `yaml:"enabled"` + MaxConcurrency int `yaml:"maxConcurrency"` + ListenAddress string `yaml:"listenAddress"` + Timeout *time.Duration `yaml:"timeout"` +} + +// EnabledOrDefault returns the configured value for Enabled, if set, or the default +// value otherwise. +func (c *CarbonIngestionConfiguration) EnabledOrDefault() bool { + if c.Enabled != nil { + return *c.Enabled + } + + return defaultCarbonIngesterEnabled +} + +// TimeoutOrDefault returns the configured value for Timeout, if set, or the default +// value otherwise. 
+func (c *CarbonIngestionConfiguration) TimeoutOrDefault() time.Duration { + if c.Timeout != nil { + return *c.Timeout + } + + return defaultCarbonIngesterTimeout +} + +// ListenAddressOrDefault returns the configured value for ListenAddress, if set, or the default +// value otherwise. +func (c *CarbonIngestionConfiguration) ListenAddressOrDefault() string { + if c.ListenAddress != "" { + return c.ListenAddress + } + + return defaultCarbonIngesterListenAddress +} + // LocalConfiguration is the local embedded configuration if running // coordinator embedded in the DB. type LocalConfiguration struct { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 53aacb42e2..51a72deb1c 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -630,6 +630,14 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( if err != nil { return shardResult, err } + defer func() { + err := reader.Close() + if err != nil { + s.log.Errorf( + "error closing reader for shard: %d and blockstart: %s and volume: %d, err: %s", + shard, blockStart.String(), mostRecentCompleteSnapshot.ID.VolumeIndex, err.Error()) + } + }() s.log.Debugf( "reading snapshot for shard: %d and blockStart: %s and volume: %d", diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go index 6781148d7f..afc2358c27 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go @@ -340,6 +340,7 @@ func TestItMergesSnapshotsAndCommitLogs(t *testing.T) { FileSetType: persist.FileSetSnapshotType, }).Return(nil).AnyTimes() mockReader.EXPECT().Entries().Return(1).AnyTimes() + mockReader.EXPECT().Close().Return(nil).AnyTimes() snapshotValues := 
[]testValue{ {foo, start.Add(1 * time.Minute), 1.0, xtime.Nanosecond, nil}, diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go index 768e1c1da5..c8968549b3 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go @@ -56,7 +56,7 @@ import ( "github.com/stretchr/testify/require" ) -const maxShards = 8192 +const maxShards = 1024 const blockSize = 2 * time.Hour var ( diff --git a/src/metrics/carbon/parser.go b/src/metrics/carbon/parser.go index 3dbe069b74..764f3587a2 100644 --- a/src/metrics/carbon/parser.go +++ b/src/metrics/carbon/parser.go @@ -31,6 +31,7 @@ import ( "time" "unicode/utf8" + "github.com/m3db/m3x/instrument" "github.com/m3db/m3x/unsafe" ) @@ -229,13 +230,15 @@ type Scanner struct { // The number of malformed metrics encountered. MalformedCount int + + iOpts instrument.Options } // NewScanner creates a new carbon scanner. -func NewScanner(r io.Reader) *Scanner { +func NewScanner(r io.Reader, iOpts instrument.Options) *Scanner { s := bufio.NewScanner(r) s.Split(bufio.ScanLines) - return &Scanner{scanner: s} + return &Scanner{scanner: s, iOpts: iOpts} } // Scan scans for the next carbon metric. Malformed metrics are skipped but counted. 
@@ -247,6 +250,9 @@ func (s *Scanner) Scan() bool { var err error if s.path, s.timestamp, s.value, err = Parse(s.scanner.Bytes()); err != nil { + s.iOpts.Logger().Errorf( + "error trying to scan malformed carbon line: %s, err: %s", + string(s.path), err.Error()) s.MalformedCount++ continue } @@ -277,7 +283,7 @@ func parseWordOffsets(b []byte) (int, int) { } } - valEnd := -1 + valEnd := valStart reachedEnd := true for i := valStart + 1; i < len(b); i++ { valEnd = i diff --git a/src/metrics/carbon/parser_test.go b/src/metrics/carbon/parser_test.go index 367487751d..d609ca7959 100644 --- a/src/metrics/carbon/parser_test.go +++ b/src/metrics/carbon/parser_test.go @@ -28,12 +28,16 @@ import ( "testing" "time" + "github.com/m3db/m3x/instrument" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) var ( nullTime time.Time + + testIOpts = instrument.NewOptions() ) type carbonLine struct { @@ -54,6 +58,8 @@ var testLines = []carbonLine{ "foo.bar.nan -NaN 1428951394"}, {"foo.bar.negative", -18000.00000, time.Unix(1429480924, 0), "foo.bar.negative -18000.000000 1429480924"}, + {"short", 1, time.Unix(1, 0), + "short 1 1"}, } func TestScannerMetric(t *testing.T) { @@ -62,7 +68,7 @@ func TestScannerMetric(t *testing.T) { fmt.Fprintf(&buf, "%s\n", line.line) } - s := NewScanner(&buf) + s := NewScanner(&buf, testIOpts) for _, line := range testLines { t.Run(line.path, func(t *testing.T) { require.True(t, s.Scan(), "could not parse to line %s, err: %v", line.line, s.Err()) diff --git a/src/query/api/v1/handler/prometheus/remote/write.go b/src/query/api/v1/handler/prometheus/remote/write.go index e56cf0df3d..1c89d7ec1c 100644 --- a/src/query/api/v1/handler/prometheus/remote/write.go +++ b/src/query/api/v1/handler/prometheus/remote/write.go @@ -24,17 +24,17 @@ import ( "context" "errors" "net/http" - "sync" - "github.com/m3db/m3/src/cmd/services/m3coordinator/downsample" + 
"github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/generated/proto/prompb" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/query/ts" "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/net/http" - xerrors "github.com/m3db/m3x/errors" + xtime "github.com/m3db/m3x/time" "github.com/golang/protobuf/proto" "github.com/uber-go/tally" @@ -50,33 +50,30 @@ const ( ) var ( - errNoStorageOrDownsampler = errors.New("no storage or downsampler set, requires at least one or both") + errNoDownsamplerAndWriter = errors.New("no ingest.DownsamplerAndWriter was set") ) // PromWriteHandler represents a handler for prometheus write endpoint. type PromWriteHandler struct { - store storage.Storage - downsampler downsample.Downsampler - promWriteMetrics promWriteMetrics - tagOptions models.TagOptions + downsamplerAndWriter ingest.DownsamplerAndWriter + promWriteMetrics promWriteMetrics + tagOptions models.TagOptions } // NewPromWriteHandler returns a new instance of handler. 
func NewPromWriteHandler( - store storage.Storage, - downsampler downsample.Downsampler, + downsamplerAndWriter ingest.DownsamplerAndWriter, tagOptions models.TagOptions, scope tally.Scope, ) (http.Handler, error) { - if store == nil && downsampler == nil { - return nil, errNoStorageOrDownsampler + if downsamplerAndWriter == nil { + return nil, errNoDownsamplerAndWriter } return &PromWriteHandler{ - store: store, - downsampler: downsampler, - promWriteMetrics: newPromWriteMetrics(scope), - tagOptions: tagOptions, + downsamplerAndWriter: downsamplerAndWriter, + promWriteMetrics: newPromWriteMetrics(scope), + tagOptions: tagOptions, }, nil } @@ -101,7 +98,9 @@ func (h *PromWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { xhttp.Error(w, rErr.Inner(), rErr.Code()) return } - if err := h.write(r.Context(), req); err != nil { + + err := h.write(r.Context(), req) + if err != nil { h.promWriteMetrics.writeErrorsServer.Inc(1) logging.WithContext(r.Context()).Error("Write error", zap.Any("err", err)) xhttp.Error(w, err, http.StatusInternalServerError) @@ -126,104 +125,53 @@ func (h *PromWriteHandler) parseRequest(r *http.Request) (*prompb.WriteRequest, } func (h *PromWriteHandler) write(ctx context.Context, r *prompb.WriteRequest) error { + iter := newPromTSIter(r.Timeseries, h.tagOptions) + return h.downsamplerAndWriter.WriteBatch(ctx, iter) +} + +func newPromTSIter(timeseries []*prompb.TimeSeries, tagOpts models.TagOptions) *promTSIter { + // Construct the tags and datapoints upfront so that if the iterator + // is reset, we don't have to generate them twice. 
var ( - wg sync.WaitGroup - writeUnaggErr error - writeAggErr error + tags = make([]models.Tags, 0, len(timeseries)) + datapoints = make([]ts.Datapoints, 0, len(timeseries)) ) - if h.downsampler != nil { - // If writing downsampled aggregations, write them async - wg.Add(1) - go func() { - writeAggErr = h.writeAggregated(ctx, r) - wg.Done() - }() + for _, promTS := range timeseries { + tags = append(tags, storage.PromLabelsToM3Tags(promTS.Labels, tagOpts)) + datapoints = append(datapoints, storage.PromSamplesToM3Datapoints(promTS.Samples)) } - if h.store != nil { - // Write the unaggregated points out, don't spawn goroutine - // so we reduce number of goroutines just a fraction - writeUnaggErr = h.writeUnaggregated(ctx, r) + return &promTSIter{ + idx: -1, + tags: tags, + datapoints: datapoints, } +} - if h.downsampler != nil { - // Wait for downsampling to finish if we wrote datapoints - // for aggregations - wg.Wait() - } +type promTSIter struct { + idx int + tags []models.Tags + datapoints []ts.Datapoints +} - var multiErr xerrors.MultiError - multiErr = multiErr.Add(writeUnaggErr) - multiErr = multiErr.Add(writeAggErr) - return multiErr.FinalError() +func (i *promTSIter) Next() bool { + i.idx++ + return i.idx < len(i.tags) } -func (h *PromWriteHandler) writeUnaggregated( - ctx context.Context, - r *prompb.WriteRequest, -) error { - var ( - wg sync.WaitGroup - errLock sync.Mutex - multiErr xerrors.MultiError - ) - for _, t := range r.Timeseries { - t := t // Capture for goroutine - - // TODO(r): Consider adding a worker pool to limit write - // request concurrency, instead of using the batch size - // of incoming request to determine concurrency (some level of control). 
- wg.Add(1) - go func() { - write := storage.PromWriteTSToM3(t, h.tagOptions) - write.Attributes = storage.Attributes{ - MetricsType: storage.UnaggregatedMetricsType, - } - - if err := h.store.Write(ctx, write); err != nil { - errLock.Lock() - multiErr = multiErr.Add(err) - errLock.Unlock() - } - - wg.Done() - }() +func (i *promTSIter) Current() (models.Tags, ts.Datapoints, xtime.Unit) { + if len(i.tags) == 0 || i.idx < 0 || i.idx >= len(i.tags) { + return models.Tags{}, nil, 0 } - wg.Wait() - return multiErr.LastError() + return i.tags[i.idx], i.datapoints[i.idx], xtime.Millisecond } -func (h *PromWriteHandler) writeAggregated( - _ context.Context, - r *prompb.WriteRequest, -) error { - metricsAppender, err := h.downsampler.NewMetricsAppender() - if err != nil { - return err - } - - multiErr := xerrors.NewMultiError() - for _, ts := range r.Timeseries { - metricsAppender.Reset() - for _, label := range ts.Labels { - metricsAppender.AddTag(label.Name, label.Value) - } - - samplesAppender, err := metricsAppender.SamplesAppender() - if err != nil { - multiErr = multiErr.Add(err) - continue - } - - for _, elem := range ts.Samples { - err := samplesAppender.AppendGaugeSample(elem.Value) - if err != nil { - multiErr = multiErr.Add(err) - } - } - } +func (i *promTSIter) Reset() error { + i.idx = -1 + return nil +} - metricsAppender.Finalize() - return multiErr.LastError() +func (i *promTSIter) Error() error { + return nil } diff --git a/src/query/api/v1/handler/prometheus/remote/write_test.go b/src/query/api/v1/handler/prometheus/remote/write_test.go index 05329c4e5a..83e35fe83a 100644 --- a/src/query/api/v1/handler/prometheus/remote/write_test.go +++ b/src/query/api/v1/handler/prometheus/remote/write_test.go @@ -27,9 +27,9 @@ import ( "testing" "time" + "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" "github.com/m3db/m3/src/dbnode/x/metrics" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/remote/test" - 
"github.com/m3db/m3/src/query/test/m3" "github.com/m3db/m3/src/query/util/logging" xclock "github.com/m3db/m3x/clock" @@ -40,10 +40,8 @@ import ( func TestPromWriteParsing(t *testing.T) { logging.InitWithCores(nil) - ctrl := gomock.NewController(t) - storage, _ := m3.NewStorageAndSession(t, ctrl) - promWrite := &PromWriteHandler{store: storage} + promWrite := &PromWriteHandler{} promReq := test.GeneratePromWriteRequest() promReqBody := test.GeneratePromWriteRequestBody(t, promReq) @@ -58,10 +56,10 @@ func TestPromWrite(t *testing.T) { logging.InitWithCores(nil) ctrl := gomock.NewController(t) - storage, session := m3.NewStorageAndSession(t, ctrl) - session.EXPECT().WriteTagged(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl) + mockDownsamplerAndWriter.EXPECT().WriteBatch(gomock.Any(), gomock.Any()) - promWrite := &PromWriteHandler{store: storage} + promWrite := &PromWriteHandler{downsamplerAndWriter: mockDownsamplerAndWriter} promReq := test.GeneratePromWriteRequest() promReqBody := test.GeneratePromWriteRequestBody(t, promReq) @@ -78,15 +76,18 @@ func TestWriteErrorMetricCount(t *testing.T) { logging.InitWithCores(nil) ctrl := gomock.NewController(t) - storage, session := m3.NewStorageAndSession(t, ctrl) - session.EXPECT().WriteTagged(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl) + mockDownsamplerAndWriter.EXPECT().WriteBatch(gomock.Any(), gomock.Any()) reporter := xmetrics.NewTestStatsReporter(xmetrics.NewTestStatsReporterOptions()) scope, closer := tally.NewRootScope(tally.ScopeOptions{Reporter: reporter}, time.Millisecond) defer closer.Close() writeMetrics := newPromWriteMetrics(scope) - promWrite := &PromWriteHandler{store: storage, promWriteMetrics: writeMetrics} + promWrite 
:= &PromWriteHandler{ + downsamplerAndWriter: mockDownsamplerAndWriter, + promWriteMetrics: writeMetrics, + } req, _ := http.NewRequest("POST", PromWriteURL, nil) promWrite.ServeHTTP(httptest.NewRecorder(), req) diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index 7be06d0003..d7121bb9e1 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -27,7 +27,7 @@ import ( "time" clusterclient "github.com/m3db/m3/src/cluster/client" - "github.com/m3db/m3/src/cmd/services/m3coordinator/downsample" + "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" dbconfig "github.com/m3db/m3/src/cmd/services/m3dbnode/config" "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/query/api/v1/handler" @@ -63,18 +63,18 @@ var ( // Handler represents an HTTP handler. type Handler struct { - router *mux.Router - handler http.Handler - storage storage.Storage - downsampler downsample.Downsampler - engine *executor.Engine - clusters m3.Clusters - clusterClient clusterclient.Client - config config.Configuration - embeddedDbCfg *dbconfig.DBConfiguration - scope tally.Scope - createdAt time.Time - tagOptions models.TagOptions + router *mux.Router + handler http.Handler + storage storage.Storage + downsamplerAndWriter ingest.DownsamplerAndWriter + engine *executor.Engine + clusters m3.Clusters + clusterClient clusterclient.Client + config config.Configuration + embeddedDbCfg *dbconfig.DBConfiguration + scope tally.Scope + createdAt time.Time + tagOptions models.TagOptions } // Router returns the http handler registered with all relevant routes for query. @@ -84,9 +84,8 @@ func (h *Handler) Router() http.Handler { // NewHandler returns a new instance of handler with routes. 
func NewHandler( - storage storage.Storage, + downsamplerAndWriter ingest.DownsamplerAndWriter, tagOptions models.TagOptions, - downsampler downsample.Downsampler, engine *executor.Engine, m3dbClusters m3.Clusters, clusterClient clusterclient.Client, @@ -105,18 +104,18 @@ func NewHandler( } h := &Handler{ - router: r, - handler: withMiddleware, - storage: storage, - downsampler: downsampler, - engine: engine, - clusters: m3dbClusters, - clusterClient: clusterClient, - config: cfg, - embeddedDbCfg: embeddedDbCfg, - scope: scope, - createdAt: time.Now(), - tagOptions: tagOptions, + router: r, + handler: withMiddleware, + storage: downsamplerAndWriter.Storage(), + downsamplerAndWriter: downsamplerAndWriter, + engine: engine, + clusters: m3dbClusters, + clusterClient: clusterClient, + config: cfg, + embeddedDbCfg: embeddedDbCfg, + scope: scope, + createdAt: time.Now(), + tagOptions: tagOptions, } return h, nil } @@ -133,8 +132,7 @@ func (h *Handler) RegisterRoutes() error { // Prometheus remote read/write endpoints promRemoteReadHandler := remote.NewPromReadHandler(h.engine, h.scope.Tagged(remoteSource)) promRemoteWriteHandler, err := remote.NewPromWriteHandler( - h.storage, - h.downsampler, + h.downsamplerAndWriter, h.tagOptions, h.scope.Tagged(remoteSource), ) diff --git a/src/query/api/v1/httpd/handler_test.go b/src/query/api/v1/httpd/handler_test.go index f9544b44a9..8304b176d1 100644 --- a/src/query/api/v1/httpd/handler_test.go +++ b/src/query/api/v1/httpd/handler_test.go @@ -27,6 +27,7 @@ import ( "testing" "time" + "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" "github.com/m3db/m3/src/cmd/services/m3query/config" m3json "github.com/m3db/m3/src/query/api/v1/handler/json" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/native" @@ -48,7 +49,8 @@ func makeTagOptions() models.TagOptions { } func setupHandler(store storage.Storage) (*Handler, error) { - return NewHandler(store, makeTagOptions(), nil, 
executor.NewEngine(store, tally.NewTestScope("test", nil)), nil, nil, + downsamplerAndWriter := ingest.NewDownsamplerAndWriter(store, nil) + return NewHandler(downsamplerAndWriter, makeTagOptions(), executor.NewEngine(store, tally.NewTestScope("test", nil)), nil, nil, config.Configuration{}, nil, tally.NewTestScope("", nil)) } diff --git a/src/query/generated/mocks/generate.go b/src/query/generated/mocks/generate.go index 85bc8843d3..d5f88dfec6 100644 --- a/src/query/generated/mocks/generate.go +++ b/src/query/generated/mocks/generate.go @@ -21,6 +21,7 @@ // mockgen rules for generating mocks for exported interfaces (reflection mode). //go:generate sh -c "mockgen -package=downsample $PACKAGE/src/cmd/services/m3coordinator/downsample Downsampler,MetricsAppender,SamplesAppender | genclean -pkg $PACKAGE/src/cmd/services/m3coordinator/downsample -out $GOPATH/src/$PACKAGE/src/cmd/services/m3coordinator/downsample/downsample_mock.go" //go:generate sh -c "mockgen -package=block -destination=$GOPATH/src/$PACKAGE/src/query/block/block_mock.go $PACKAGE/src/query/block Block,StepIter,SeriesIter,Builder,Step" +//go:generate sh -c "mockgen -package=ingest -destination=$GOPATH/src/$PACKAGE/src/cmd/services/m3coordinator/ingest/write_mock.go $PACKAGE/src/cmd/services/m3coordinator/ingest DownsamplerAndWriter" // mockgen rules for generating mocks for unexported interfaces (file mode). 
//go:generate sh -c "mockgen -package=m3ql -destination=$GOPATH/src/github.com/m3db/m3/src/query/parser/m3ql/types_mock.go -source=$GOPATH/src/github.com/m3db/m3/src/query/parser/m3ql/types.go" diff --git a/src/query/server/server.go b/src/query/server/server.go index 7958a0f1bd..f34d059b5e 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -33,6 +33,8 @@ import ( clusterclient "github.com/m3db/m3/src/cluster/client" etcdclient "github.com/m3db/m3/src/cluster/client/etcd" "github.com/m3db/m3/src/cmd/services/m3coordinator/downsample" + "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" + "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest/carbon" dbconfig "github.com/m3db/m3/src/cmd/services/m3dbnode/config" "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/dbnode/client" @@ -57,6 +59,7 @@ import ( "github.com/m3db/m3x/ident" "github.com/m3db/m3x/instrument" "github.com/m3db/m3x/pool" + xserver "github.com/m3db/m3x/server" xsync "github.com/m3db/m3x/sync" xtime "github.com/m3db/m3x/time" @@ -71,6 +74,8 @@ var ( Namespace: "default", Retention: 2 * 24 * time.Hour, } + + defaultCarbonIngesterWorkerPoolSize = 1024 ) type cleanupFn func() error @@ -204,7 +209,8 @@ func Run(runOpts RunOptions) { engine := executor.NewEngine(backendStorage, scope.SubScope("engine")) - handler, err := httpd.NewHandler(backendStorage, tagOptions, downsampler, engine, + promDownsamplerAndWriter := ingest.NewDownsamplerAndWriter(backendStorage, downsampler) + handler, err := httpd.NewHandler(promDownsamplerAndWriter, tagOptions, engine, m3dbClusters, clusterClient, cfg, runOpts.DBConfig, scope) if err != nil { logger.Fatal("unable to set up handlers", zap.Error(err)) @@ -236,7 +242,7 @@ func Run(runOpts RunOptions) { }() if cfg.Ingest != nil { - logger.Info("starting m3msg server ") + 
logger.Info("starting m3msg server") ingester, err := cfg.Ingest.Ingester.NewIngester(backendStorage, instrumentOptions) if err != nil { logger.Fatal("unable to create ingester", zap.Error(err)) @@ -244,7 +250,7 @@ func Run(runOpts RunOptions) { server, err := cfg.Ingest.M3Msg.NewServer( ingester.Ingest, - instrumentOptions.SetMetricsScope(scope.SubScope("m3msg")), + instrumentOptions.SetMetricsScope(scope.SubScope("ingest-m3msg")), ) if err != nil { @@ -261,6 +267,61 @@ func Run(runOpts RunOptions) { logger.Info("no m3msg server configured") } + carbonIngestConfig := cfg.Carbon.Ingestion + if carbonIngestConfig.EnabledOrDefault() { + logger.Info("carbon ingestion enabled") + + var ( + carbonIOpts = instrumentOptions.SetMetricsScope( + instrumentOptions.MetricsScope().SubScope("ingest-carbon")) + carbonWorkerPoolOpts xsync.PooledWorkerPoolOptions + carbonWorkerPoolSize int + ) + if carbonIngestConfig.MaxConcurrency > 0 { + // Use a bounded worker pool if they requested a specific maximum concurrency. + carbonWorkerPoolOpts = xsync.NewPooledWorkerPoolOptions(). + SetGrowOnDemand(false). + SetInstrumentOptions(carbonIOpts) + carbonWorkerPoolSize = carbonIngestConfig.MaxConcurrency + } else { + carbonWorkerPoolOpts = xsync.NewPooledWorkerPoolOptions(). + SetGrowOnDemand(true). + SetKillWorkerProbability(0.001) + carbonWorkerPoolSize = defaultCarbonIngesterWorkerPoolSize + } + workerPool, err := xsync.NewPooledWorkerPool(carbonWorkerPoolSize, carbonWorkerPoolOpts) + if err != nil { + logger.Fatal("unable to create worker pool for carbon ingester", zap.Error(err)) + } + workerPool.Init() + + // Create a new downsampler and writer because we don't want the carbon ingester to write + // any data unaggregated, so we pass nil for storage.Storage. 
+ carbonIngestDownsamplerAndWriter := ingest.NewDownsamplerAndWriter(nil, downsampler) + ingester, err := ingestcarbon.NewIngester(carbonIngestDownsamplerAndWriter, ingestcarbon.Options{ + InstrumentOptions: carbonIOpts, + WorkerPool: workerPool, + Timeout: carbonIngestConfig.TimeoutOrDefault(), + }) + if err != nil { + logger.Fatal("unable to create carbon ingester", zap.Error(err)) + } + + var ( + serverOpts = xserver.NewOptions().SetInstrumentOptions(carbonIOpts) + carbonListenAddress = carbonIngestConfig.ListenAddressOrDefault() + carbonServer = xserver.NewServer(carbonListenAddress, ingester, serverOpts) + ) + + logger.Info("starting carbon ingestion server", zap.String("listenAddress", carbonListenAddress)) + err = carbonServer.ListenAndServe() + if err != nil { + logger.Fatal("unable to start carbon ingestion server at listen address", + zap.String("listenAddress", carbonListenAddress), zap.Error(err)) + } + logger.Info("started carbon ingestion server", zap.String("listenAddress", carbonListenAddress)) + } + var interruptCh <-chan error = make(chan error) if runOpts.InterruptCh != nil { interruptCh = runOpts.InterruptCh diff --git a/src/query/server/server_test.go b/src/query/server/server_test.go index af0434ea97..02af237980 100644 --- a/src/query/server/server_test.go +++ b/src/query/server/server_test.go @@ -234,6 +234,10 @@ listenAddress: type: "config" value: "127.0.0.1:17201" +carbon: + ingestion: + listenAddress: "127.0.0.1:17204" + metrics: scope: prefix: "coordinator"