diff --git a/src/cmd/services/m3coordinator/ingest/carbon/ingest.go b/src/cmd/services/m3coordinator/ingest/carbon/ingest.go index 26261b3189..bee0f7dc5f 100644 --- a/src/cmd/services/m3coordinator/ingest/carbon/ingest.go +++ b/src/cmd/services/m3coordinator/ingest/carbon/ingest.go @@ -127,17 +127,20 @@ func NewIngester( } }) + scope := opts.InstrumentOptions.MetricsScope() + metrics, err := newCarbonIngesterMetrics(scope) + if err != nil { + return nil, err + } + return &ingester{ downsamplerAndWriter: downsamplerAndWriter, opts: opts, logger: opts.InstrumentOptions.Logger(), tagOpts: tagOpts, - metrics: newCarbonIngesterMetrics( - opts.InstrumentOptions.MetricsScope()), - - rules: compiledRules, - - lineResourcesPool: resourcePool, + metrics: metrics, + rules: compiledRules, + lineResourcesPool: resourcePool, }, nil } @@ -166,6 +169,7 @@ func (i *ingester) Handle(conn net.Conn) { logger.Debug("handling new carbon ingestion connection") for s.Scan() { + received := time.Now() name, timestamp, value := s.Metric() resources := i.getLineResources() @@ -178,6 +182,19 @@ func (i *ingester) Handle(conn net.Conn) { if ok { i.metrics.success.Inc(1) } + + now := time.Now() + + // Always record age regardless of success/failure since + // sometimes errors can be due to how old the metrics are + // and not recording age would obscure this visibility from + // the metrics of how fresh/old the incoming metrics are. + age := now.Sub(timestamp) + i.metrics.ingestLatency.RecordDuration(age) + + // Also record write latency (not relative to metric timestamp). + i.metrics.writeLatency.RecordDuration(now.Sub(received)) + // The contract is that after the DownsamplerAndWriter returns, any resources // that it needed to hold onto have already been copied. i.putLineResources(resources) @@ -283,10 +300,8 @@ func (i *ingester) writeWithOptions( return err } - err = i.downsamplerAndWriter.Write( - ctx, tags, resources.datapoints, xtime.Second, nil, opts, - ) - + err = i.downsamplerAndWriter.Write(ctx, tags, resources.datapoints, + xtime.Second, nil, opts) if err != nil { i.logger.Error("err writing carbon metric", zap.String("name", string(resources.name)), zap.Error(err)) @@ -301,18 +316,26 @@ func (i *ingester) Close() { // We don't maintain any state in-between connections so there is nothing to do here. } -func newCarbonIngesterMetrics(m tally.Scope) carbonIngesterMetrics { - return carbonIngesterMetrics{ - success: m.Counter("success"), - err: m.Counter("error"), - malformed: m.Counter("malformed"), - } +type carbonIngesterMetrics struct { + success tally.Counter + err tally.Counter + malformed tally.Counter + ingestLatency tally.Histogram + writeLatency tally.Histogram } -type carbonIngesterMetrics struct { - success tally.Counter - err tally.Counter - malformed tally.Counter +func newCarbonIngesterMetrics(scope tally.Scope) (carbonIngesterMetrics, error) { + buckets, err := ingest.NewLatencyBuckets() + if err != nil { + return carbonIngesterMetrics{}, err + } + return carbonIngesterMetrics{ + success: scope.Counter("success"), + err: scope.Counter("error"), + malformed: scope.Counter("malformed"), + writeLatency: scope.SubScope("write").Histogram("latency", buckets.WriteLatencyBuckets), + ingestLatency: scope.SubScope("ingest").Histogram("latency", buckets.IngestLatencyBuckets), + }, nil } // GenerateTagsFromName accepts a carbon metric name and blows it up into a list of diff --git a/src/cmd/services/m3coordinator/ingest/metrics.go b/src/cmd/services/m3coordinator/ingest/metrics.go new file mode 100644 index 0000000000..ad514fc117 --- /dev/null +++ b/src/cmd/services/m3coordinator/ingest/metrics.go @@ -0,0 +1,90 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package ingest + +import ( + "time" + + "github.com/uber-go/tally" +) + +// LatencyBuckets are a set of latency buckets useful for measuring things. +type LatencyBuckets struct { + WriteLatencyBuckets tally.DurationBuckets + IngestLatencyBuckets tally.DurationBuckets +} + +// NewLatencyBuckets returns write and ingest latency buckets useful for +// measuring ingest latency (i.e. time from datapoint/sample created to time +// ingested) and write latency (i.e. time from received a sample from remote +// source to completion of that write locally). +func NewLatencyBuckets() (LatencyBuckets, error) { + upTo1sBuckets, err := tally.LinearDurationBuckets(0, 100*time.Millisecond, 10) + if err != nil { + return LatencyBuckets{}, err + } + + upTo10sBuckets, err := tally.LinearDurationBuckets(time.Second, 500*time.Millisecond, 18) + if err != nil { + return LatencyBuckets{}, err + } + + upTo60sBuckets, err := tally.LinearDurationBuckets(10*time.Second, 5*time.Second, 11) + if err != nil { + return LatencyBuckets{}, err + } + + upTo60mBuckets, err := tally.LinearDurationBuckets(0, 5*time.Minute, 12) + if err != nil { + return LatencyBuckets{}, err + } + upTo60mBuckets = upTo60mBuckets[1:] // Remove the first 0s to get 5 min aligned buckets + + upTo6hBuckets, err := tally.LinearDurationBuckets(time.Hour, 30*time.Minute, 12) + if err != nil { + return LatencyBuckets{}, err + } + + upTo24hBuckets, err := tally.LinearDurationBuckets(6*time.Hour, time.Hour, 19) + if err != nil { + return LatencyBuckets{}, err + } + upTo24hBuckets = upTo24hBuckets[1:] // Remove the first 6h to get 1 hour aligned buckets + + var writeLatencyBuckets tally.DurationBuckets + writeLatencyBuckets = append(writeLatencyBuckets, upTo1sBuckets...) + writeLatencyBuckets = append(writeLatencyBuckets, upTo10sBuckets...) + writeLatencyBuckets = append(writeLatencyBuckets, upTo60sBuckets...) + writeLatencyBuckets = append(writeLatencyBuckets, upTo60mBuckets...) + + var ingestLatencyBuckets tally.DurationBuckets + ingestLatencyBuckets = append(ingestLatencyBuckets, upTo1sBuckets...) + ingestLatencyBuckets = append(ingestLatencyBuckets, upTo10sBuckets...) + ingestLatencyBuckets = append(ingestLatencyBuckets, upTo60sBuckets...) + ingestLatencyBuckets = append(ingestLatencyBuckets, upTo60mBuckets...) + ingestLatencyBuckets = append(ingestLatencyBuckets, upTo6hBuckets...) + ingestLatencyBuckets = append(ingestLatencyBuckets, upTo24hBuckets...) + + return LatencyBuckets{ + WriteLatencyBuckets: writeLatencyBuckets, + IngestLatencyBuckets: ingestLatencyBuckets, + }, nil +} diff --git a/src/cmd/services/m3coordinator/ingest/metrics_test.go b/src/cmd/services/m3coordinator/ingest/metrics_test.go new file mode 100644 index 0000000000..c2f5543e42 --- /dev/null +++ b/src/cmd/services/m3coordinator/ingest/metrics_test.go @@ -0,0 +1,51 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package ingest + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestLatencyBuckets(t *testing.T) { + buckets, err := NewLatencyBuckets() + require.NoError(t, err) + + // NB(r): Bucket length is tested just to sanity check how many buckets we are creating + require.Equal(t, 50, len(buckets.WriteLatencyBuckets.AsDurations())) + + // NB(r): Bucket values are tested to sanity check they look right + // nolint: lll + expected := "[0s 100ms 200ms 300ms 400ms 500ms 600ms 700ms 800ms 900ms 1s 1.5s 2s 2.5s 3s 3.5s 4s 4.5s 5s 5.5s 6s 6.5s 7s 7.5s 8s 8.5s 9s 9.5s 10s 15s 20s 25s 30s 35s 40s 45s 50s 55s 1m0s 5m0s 10m0s 15m0s 20m0s 25m0s 30m0s 35m0s 40m0s 45m0s 50m0s 55m0s]" + actual := fmt.Sprintf("%v", buckets.WriteLatencyBuckets.AsDurations()) + require.Equal(t, expected, actual) + + // NB(r): Bucket length is tested just to sanity check how many buckets we are creating + require.Equal(t, 80, len(buckets.IngestLatencyBuckets.AsDurations())) + + // NB(r): Bucket values are tested to sanity check they look right + // nolint: lll + expected = "[0s 100ms 200ms 300ms 400ms 500ms 600ms 700ms 800ms 900ms 1s 1.5s 2s 2.5s 3s 3.5s 4s 4.5s 5s 5.5s 6s 6.5s 7s 7.5s 8s 8.5s 9s 9.5s 10s 15s 20s 25s 30s 35s 40s 45s 50s 55s 1m0s 5m0s 10m0s 15m0s 20m0s 25m0s 30m0s 35m0s 40m0s 45m0s 50m0s 55m0s 1h0m0s 1h30m0s 2h0m0s 2h30m0s 3h0m0s 3h30m0s 4h0m0s 4h30m0s 5h0m0s 5h30m0s 6h0m0s 6h30m0s 7h0m0s 8h0m0s 9h0m0s 10h0m0s 11h0m0s 12h0m0s 13h0m0s 14h0m0s 15h0m0s 16h0m0s 17h0m0s 18h0m0s 19h0m0s 20h0m0s 21h0m0s 22h0m0s 23h0m0s 24h0m0s]" + actual = fmt.Sprintf("%v", buckets.IngestLatencyBuckets.AsDurations()) + require.Equal(t, expected, actual) +} diff --git a/src/query/api/v1/handler/prometheus/remote/write.go b/src/query/api/v1/handler/prometheus/remote/write.go index d1ecebc7d4..3e4fc0c549 100644 --- a/src/query/api/v1/handler/prometheus/remote/write.go +++ b/src/query/api/v1/handler/prometheus/remote/write.go @@ -195,63 +195,22 @@ type promWriteMetrics struct { } func newPromWriteMetrics(scope tally.Scope) (promWriteMetrics, error) { - upTo1sBuckets, err := tally.LinearDurationBuckets(0, 100*time.Millisecond, 10) + buckets, err := ingest.NewLatencyBuckets() if err != nil { return promWriteMetrics{}, err } - - upTo10sBuckets, err := tally.LinearDurationBuckets(time.Second, 500*time.Millisecond, 18) - if err != nil { - return promWriteMetrics{}, err - } - - upTo60sBuckets, err := tally.LinearDurationBuckets(10*time.Second, 5*time.Second, 11) - if err != nil { - return promWriteMetrics{}, err - } - - upTo60mBuckets, err := tally.LinearDurationBuckets(0, 5*time.Minute, 12) - if err != nil { - return promWriteMetrics{}, err - } - upTo60mBuckets = upTo60mBuckets[1:] // Remove the first 0s to get 5 min aligned buckets - - upTo6hBuckets, err := tally.LinearDurationBuckets(time.Hour, 30*time.Minute, 12) - if err != nil { - return promWriteMetrics{}, err - } - - upTo24hBuckets, err := tally.LinearDurationBuckets(6*time.Hour, time.Hour, 19) - if err != nil { - return promWriteMetrics{}, err - } - upTo24hBuckets = upTo24hBuckets[1:] // Remove the first 6h to get 1 hour aligned buckets - - var writeLatencyBuckets tally.DurationBuckets - writeLatencyBuckets = append(writeLatencyBuckets, upTo1sBuckets...) - writeLatencyBuckets = append(writeLatencyBuckets, upTo10sBuckets...) - writeLatencyBuckets = append(writeLatencyBuckets, upTo60sBuckets...) - writeLatencyBuckets = append(writeLatencyBuckets, upTo60mBuckets...) - - var ingestLatencyBuckets tally.DurationBuckets - ingestLatencyBuckets = append(ingestLatencyBuckets, upTo1sBuckets...) - ingestLatencyBuckets = append(ingestLatencyBuckets, upTo10sBuckets...) - ingestLatencyBuckets = append(ingestLatencyBuckets, upTo60sBuckets...) - ingestLatencyBuckets = append(ingestLatencyBuckets, upTo60mBuckets...) - ingestLatencyBuckets = append(ingestLatencyBuckets, upTo6hBuckets...) - ingestLatencyBuckets = append(ingestLatencyBuckets, upTo24hBuckets...) return promWriteMetrics{ writeSuccess: scope.SubScope("write").Counter("success"), writeErrorsServer: scope.SubScope("write").Tagged(map[string]string{"code": "5XX"}).Counter("errors"), writeErrorsClient: scope.SubScope("write").Tagged(map[string]string{"code": "4XX"}).Counter("errors"), - writeBatchLatency: scope.SubScope("write").Histogram("batch-latency", writeLatencyBuckets), - writeBatchLatencyBuckets: writeLatencyBuckets, - ingestLatency: scope.SubScope("ingest").Histogram("latency", ingestLatencyBuckets), - ingestLatencyBuckets: ingestLatencyBuckets, + writeBatchLatency: scope.SubScope("write").Histogram("batch-latency", buckets.WriteLatencyBuckets), + writeBatchLatencyBuckets: buckets.WriteLatencyBuckets, + ingestLatency: scope.SubScope("ingest").Histogram("latency", buckets.IngestLatencyBuckets), + ingestLatencyBuckets: buckets.IngestLatencyBuckets, forwardSuccess: scope.SubScope("forward").Counter("success"), forwardErrors: scope.SubScope("forward").Counter("errors"), forwardDropped: scope.SubScope("forward").Counter("dropped"), - forwardLatency: scope.SubScope("forward").Histogram("latency", writeLatencyBuckets), + forwardLatency: scope.SubScope("forward").Histogram("latency", buckets.WriteLatencyBuckets), }, nil }