diff --git a/Makefile b/Makefile index bea68d9818..294e8aea64 100644 --- a/Makefile +++ b/Makefile @@ -200,8 +200,7 @@ docker-integration-test: @./scripts/docker-integration-tests/setup.sh @./scripts/docker-integration-tests/simple/test.sh @./scripts/docker-integration-tests/prometheus/test.sh - # TODO(rartoul): Re-enable once the query P.R lands and we can fix this test. - # @./scripts/docker-integration-tests/carbon/test.sh + @./scripts/docker-integration-tests/carbon/test.sh .PHONY: site-build site-build: diff --git a/scripts/development/m3_stack/m3coordinator.yml b/scripts/development/m3_stack/m3coordinator.yml index ec9d90f462..af712a8dff 100644 --- a/scripts/development/m3_stack/m3coordinator.yml +++ b/scripts/development/m3_stack/m3coordinator.yml @@ -61,10 +61,13 @@ ingest: logSampleRate: 0.01 m3msg: server: - listenAddress: 0.0.0.0:7507 + listenAddress: "0.0.0.0:7507" retry: maxBackoff: 10s jitter: true handler: protobufEnabled: true +carbon: + ingester: + listenAddress: "0.0.0.0:7204" diff --git a/scripts/docker-integration-tests/carbon/m3coordinator.yml b/scripts/docker-integration-tests/carbon/m3coordinator.yml index 81e804d528..ea892d0e89 100644 --- a/scripts/docker-integration-tests/carbon/m3coordinator.yml +++ b/scripts/docker-integration-tests/carbon/m3coordinator.yml @@ -51,4 +51,5 @@ clusters: backgroundHealthCheckFailThrottleFactor: 0.5 carbon: - enabled: true + ingester: + listenAddress: "0.0.0.0:7204" diff --git a/scripts/docker-integration-tests/carbon/test.sh b/scripts/docker-integration-tests/carbon/test.sh index 769ffada25..19204784fd 100755 --- a/scripts/docker-integration-tests/carbon/test.sh +++ b/scripts/docker-integration-tests/carbon/test.sh @@ -20,14 +20,14 @@ trap defer EXIT setup_single_m3db_node echo "Writing out a carbon metric" -echo "foo.bar.baz 1 `date +%s`" | nc 0.0.0.0 7204 +echo "foo.bar.baz 42 `date +%s`" | nc 0.0.0.0 7204 echo "Attempting to read carbon metric back" function read_carbon { end=$(date +%s) - start=$(($end-3000)) - RESPONSE=$(curl -sSfg "http://localhost:7201/api/v1/query_range?start=$start&end=$end&step=10&query={__g0__='foo',__g1__='bar',__g2__='baz'}") - echo "$RESPONSE" | jq '.data.result[0].values[][1]=="1"' | grep -q "true" + start=$(($end-1000)) + RESPONSE=$(curl -sSfg "http://localhost:7201/api/v1/graphite/render?target=foo.bar.*&from=$start&until=$end") + test "$(echo "$RESPONSE" | jq ".[0].datapoints | .[][0] | select(. != null)" | tail -n 1)" = "42" return $? } ATTEMPTS=10 TIMEOUT=1 retry_with_backoff read_carbon diff --git a/scripts/docker-integration-tests/common.sh b/scripts/docker-integration-tests/common.sh index cd56d071f5..3c4d133bad 100644 --- a/scripts/docker-integration-tests/common.sh +++ b/scripts/docker-integration-tests/common.sh @@ -42,8 +42,9 @@ function setup_single_m3db_node { } function wait_for_db_init { - echo "Sleeping for a bit to ensure db up" - sleep 15 # TODO Replace sleeps with logic to determine when to proceed + echo "Wait for API to be available" + ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \ + '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq ".namespaces | length")" == "0" ]' echo "Adding namespace" curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/namespace -d '{ @@ -70,7 +71,7 @@ function wait_for_db_init { } }' - echo "Sleep until namespace is init'd" + echo "Wait until namespace is init'd" ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq .registry.namespaces.agg.indexOptions.enabled)" == true ]' diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 747e3e5637..d71f1178c1 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -47,8 +47,7 @@ const ( ) var ( - defaultCarbonIngesterTimeout = 5 * time.Second - defaultCarbonIngesterEnabled = true + defaultCarbonIngesterWriteTimeout = 15 * time.Second // defaultLimitsConfiguration is applied if `limits` isn't specified. defaultLimitsConfiguration = &LimitsConfiguration{ @@ -102,7 +101,7 @@ type Configuration struct { Ingest *IngestConfiguration `yaml:"ingest"` // Carbon is the carbon configuration. - Carbon CarbonConfiguration `yaml:"carbon"` + Carbon *CarbonConfiguration `yaml:"carbon"` // Limits specifies limits on per-query resource usage. Limits LimitsConfiguration `yaml:"limits"` @@ -145,45 +144,24 @@ type IngestConfiguration struct { // CarbonConfiguration is the configuration for the carbon server. type CarbonConfiguration struct { - Ingestion CarbonIngestionConfiguration + Ingester *CarbonIngesterConfiguration `yaml:"ingester"` } -// CarbonIngestionConfiguration is the configuration struct for carbon ingestion. -type CarbonIngestionConfiguration struct { - Enabled *bool `yaml:"enabled"` - MaxConcurrency int `yaml:"maxConcurrency"` +// CarbonIngesterConfiguration is the configuration struct for carbon ingestion. +type CarbonIngesterConfiguration struct { ListenAddress string `yaml:"listenAddress"` - Timeout *time.Duration `yaml:"timeout"` -} - -// EnabledOrDefault returns the configured value for Enabled, if set, or the default -// value otherwise. -func (c *CarbonIngestionConfiguration) EnabledOrDefault() bool { - if c.Enabled != nil { - return *c.Enabled - } - - return defaultCarbonIngesterEnabled -} - -// TimeoutOrDefault returns the configured value for Timeout, if set, or the default -// value otherwise. -func (c *CarbonIngestionConfiguration) TimeoutOrDefault() time.Duration { - if c.Timeout != nil { - return *c.Timeout - } - - return defaultCarbonIngesterTimeout + MaxConcurrency int `yaml:"maxConcurrency"` + WriteTimeout *time.Duration `yaml:"writeTimeout"` } -// ListenAddressOrDefault returns the configured value for ListenAddress, if set, or the default -// value otherwise. -func (c *CarbonIngestionConfiguration) ListenAddressOrDefault() string { - if c.ListenAddress != "" { - return c.ListenAddress +// WriteTimeoutOrDefault returns the configured value for the write timeout, +// if set, or the default value otherwise. +func (c *CarbonIngesterConfiguration) WriteTimeoutOrDefault() time.Duration { + if c.WriteTimeout != nil { + return *c.WriteTimeout } - return defaultCarbonIngesterListenAddress + return defaultCarbonIngesterWriteTimeout } // LocalConfiguration is the local embedded configuration if running diff --git a/src/query/server/server.go b/src/query/server/server.go index f34d059b5e..87a6ecaf97 100644 --- a/src/query/server/server.go +++ b/src/query/server/server.go @@ -27,6 +27,7 @@ import ( "net/http" "os" "os/signal" + "strings" "syscall" "time" @@ -267,8 +268,8 @@ func Run(runOpts RunOptions) { logger.Info("no m3msg server configured") } - carbonIngestConfig := cfg.Carbon.Ingestion - if carbonIngestConfig.EnabledOrDefault() { + if cfg.Carbon != nil && cfg.Carbon.Ingester != nil { + ingesterCfg := cfg.Carbon.Ingester logger.Info("carbon ingestion enabled") var ( @@ -277,12 +278,12 @@ func Run(runOpts RunOptions) { carbonWorkerPoolOpts xsync.PooledWorkerPoolOptions carbonWorkerPoolSize int ) - if carbonIngestConfig.MaxConcurrency > 0 { + if ingesterCfg.MaxConcurrency > 0 { // Use a bounded worker pool if they requested a specific maximum concurrency. carbonWorkerPoolOpts = xsync.NewPooledWorkerPoolOptions(). SetGrowOnDemand(false). SetInstrumentOptions(carbonIOpts) - carbonWorkerPoolSize = carbonIngestConfig.MaxConcurrency + carbonWorkerPoolSize = ingesterCfg.MaxConcurrency } else { carbonWorkerPoolOpts = xsync.NewPooledWorkerPoolOptions(). SetGrowOnDemand(true). @@ -301,7 +302,7 @@ func Run(runOpts RunOptions) { ingester, err := ingestcarbon.NewIngester(carbonIngestDownsamplerAndWriter, ingestcarbon.Options{ InstrumentOptions: carbonIOpts, WorkerPool: workerPool, - Timeout: carbonIngestConfig.TimeoutOrDefault(), + Timeout: ingesterCfg.WriteTimeoutOrDefault(), }) if err != nil { logger.Fatal("unable to create carbon ingester", zap.Error(err)) @@ -309,9 +310,12 @@ func Run(runOpts RunOptions) { var ( serverOpts = xserver.NewOptions().SetInstrumentOptions(carbonIOpts) - carbonListenAddress = carbonIngestConfig.ListenAddressOrDefault() + carbonListenAddress = ingesterCfg.ListenAddress carbonServer = xserver.NewServer(carbonListenAddress, ingester, serverOpts) ) + if strings.TrimSpace(carbonListenAddress) == "" { + logger.Fatal("no listen address specified for carbon ingester") + } logger.Info("starting carbon ingestion server", zap.String("listenAddress", carbonListenAddress)) err = carbonServer.ListenAndServe() diff --git a/src/query/server/server_test.go b/src/query/server/server_test.go index 02af237980..b0c1bb90bd 100644 --- a/src/query/server/server_test.go +++ b/src/query/server/server_test.go @@ -235,7 +235,7 @@ listenAddress: value: "127.0.0.1:17201" carbon: - ingestion: + ingester: listenAddress: "127.0.0.1:17204" metrics: