diff --git a/src/cmd/services/m3comparator/main/querier.go b/src/cmd/services/m3comparator/main/querier.go index e18e7b546d..05bb7de229 100644 --- a/src/cmd/services/m3comparator/main/querier.go +++ b/src/cmd/services/m3comparator/main/querier.go @@ -135,7 +135,7 @@ type seriesGen struct { res time.Duration } -// FetchCompressed fetches timeseries data based on a query. +// FetchCompressedResult fetches timeseries data based on a query. func (q *querier) FetchCompressedResult( ctx context.Context, query *storage.FetchQuery, diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index e79460a98b..2a66488437 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -22,6 +22,7 @@ package config import ( "errors" + "math" "time" etcdclient "github.com/m3db/m3/src/cluster/client/etcd" @@ -43,6 +44,7 @@ import ( "github.com/m3db/m3/src/x/instrument" xlog "github.com/m3db/m3/src/x/log" "github.com/m3db/m3/src/x/opentracing" + xtime "github.com/m3db/m3/src/x/time" ) // BackendStorageType is an enum for different backends. @@ -355,6 +357,41 @@ type ConsolidationConfiguration struct { type PrometheusQueryConfiguration struct { // MaxSamplesPerQuery is the limit on fetched samples per query. MaxSamplesPerQuery *int `yaml:"maxSamplesPerQuery"` + + // Convert configures Prometheus time series conversions. + Convert *PrometheusConvertConfiguration `yaml:"convert"` +} + +// ConvertOptionsOrDefault creates storage.PromConvertOptions based on the given configuration. +func (c PrometheusQueryConfiguration) ConvertOptionsOrDefault() storage.PromConvertOptions { + opts := storage.NewPromConvertOptions() + if v := c.Convert; v != nil { + opts = opts.SetValueDecreaseTolerance(v.ValueDecreaseTolerance) + + // Default to max time so that it's always applicable if value + // decrease tolerance is non-zero. 
+ toleranceUntil := xtime.UnixNano(math.MaxInt64) + if value := v.ValueDecreaseToleranceUntil; value != nil { + toleranceUntil = xtime.ToUnixNano(*value) + } + opts = opts.SetValueDecreaseToleranceUntil(toleranceUntil) + } + + return opts +} + +// PrometheusConvertConfiguration configures Prometheus time series conversions. +type PrometheusConvertConfiguration struct { + // ValueDecreaseTolerance allows for setting a specific amount of tolerance + // to avoid returning a decrease if it's below a certain tolerance. + // This is useful for applications that have precision issues emitting + // monotonic increasing data and will accidentally make it seem like the + // counter value decreases when it hasn't changed. + ValueDecreaseTolerance float64 `yaml:"valueDecreaseTolerance"` + + // ValueDecreaseToleranceUntil allows for setting a time threshold on + // which to apply the conditional value decrease threshold. + ValueDecreaseToleranceUntil *time.Time `yaml:"valueDecreaseToleranceUntil"` } // MaxSamplesPerQueryOrDefault returns the max samples per query or default. diff --git a/src/query/server/query.go b/src/query/server/query.go index 523a6e30f1..ea8e43b021 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -380,6 +380,8 @@ func Run(runOpts RunOptions) RunResult { } cfg.LookbackDuration = &lookbackDuration + promConvertOptions := cfg.Query.Prometheus.ConvertOptionsOrDefault() + readWorkerPool, writeWorkerPool, err := pools.BuildWorkerPools( instrumentOptions, cfg.ReadWorkerPool, @@ -401,7 +403,8 @@ func Run(runOpts RunOptions) RunResult { SetConsolidationFunc(consolidators.TakeLast). SetReadWorkerPool(readWorkerPool). SetWriteWorkerPool(writeWorkerPool). - SetSeriesConsolidationMatchOptions(matchOptions) + SetSeriesConsolidationMatchOptions(matchOptions). 
+ SetPromConvertOptions(promConvertOptions) if runOpts.ApplyCustomTSDBOptions != nil { tsdbOpts, err = runOpts.ApplyCustomTSDBOptions(tsdbOpts, instrumentOptions) diff --git a/src/query/storage/options.go b/src/query/storage/options.go index 6b360b5f24..ea0237e9cf 100644 --- a/src/query/storage/options.go +++ b/src/query/storage/options.go @@ -20,7 +20,11 @@ package storage -import "time" +import ( + "time" + + xtime "github.com/m3db/m3/src/x/time" +) const ( defaultResolutionThresholdForCounterNormalization = time.Hour @@ -28,6 +32,9 @@ const ( type promConvertOptions struct { resolutionThresholdForCounterNormalization time.Duration + + valueDecreaseTolerance float64 + valueDecreaseToleranceUntil xtime.UnixNano } // NewPromConvertOptions builds a new PromConvertOptions with default values. @@ -46,3 +53,23 @@ func (o *promConvertOptions) SetResolutionThresholdForCounterNormalization(value func (o *promConvertOptions) ResolutionThresholdForCounterNormalization() time.Duration { return o.resolutionThresholdForCounterNormalization } + +func (o *promConvertOptions) SetValueDecreaseTolerance(value float64) PromConvertOptions { + opts := *o + opts.valueDecreaseTolerance = value + return &opts +} + +func (o *promConvertOptions) ValueDecreaseTolerance() float64 { + return o.valueDecreaseTolerance +} + +func (o *promConvertOptions) SetValueDecreaseToleranceUntil(value xtime.UnixNano) PromConvertOptions { + opts := *o + opts.valueDecreaseToleranceUntil = value + return &opts +} + +func (o *promConvertOptions) ValueDecreaseToleranceUntil() xtime.UnixNano { + return o.valueDecreaseToleranceUntil +} diff --git a/src/query/storage/prom_converter.go b/src/query/storage/prom_converter.go index 115929a3d0..132c1970bc 100644 --- a/src/query/storage/prom_converter.go +++ b/src/query/storage/prom_converter.go @@ -48,6 +48,9 @@ func iteratorToPromResult( resolution = xtime.UnixNano(maxResolution) resolutionThreshold = 
promConvertOptions.ResolutionThresholdForCounterNormalization() + valueDecreaseTolerance = promConvertOptions.ValueDecreaseTolerance() + valueDecreaseToleranceUntil = promConvertOptions.ValueDecreaseToleranceUntil() + firstDP = true handleResets = false annotationPayload annotation.Payload @@ -61,6 +64,12 @@ func iteratorToPromResult( for iter.Next() { dp, _, _ := iter.Current() + if valueDecreaseTolerance > 0 && dp.TimestampNanos.Before(valueDecreaseToleranceUntil) { + if !firstDP && dp.Value < prevDP.Value && dp.Value > prevDP.Value*(1-valueDecreaseTolerance) { + dp.Value = prevDP.Value + } + } + if firstDP && maxResolution >= resolutionThreshold { firstAnnotation := iter.FirstAnnotation() if len(firstAnnotation) > 0 { @@ -85,8 +94,6 @@ func iteratorToPromResult( } else { cumulativeSum += dp.Value - prevDP.Value } - - prevDP = dp } else { samples = append(samples, prompb.Sample{ Timestamp: TimeToPromTimestamp(dp.TimestampNanos), @@ -94,6 +101,7 @@ func iteratorToPromResult( }) } + prevDP = dp firstDP = false } diff --git a/src/query/storage/prom_converter_test.go b/src/query/storage/prom_converter_test.go index a5a166fba3..4bd639ba99 100644 --- a/src/query/storage/prom_converter_test.go +++ b/src/query/storage/prom_converter_test.go @@ -304,7 +304,10 @@ func TestDecodeIteratorsWithEmptySeries(t *testing.T) { } func TestSeriesIteratorsToPromResultNormalizeLowResCounters(t *testing.T) { - t0 := xtime.Now().Truncate(time.Hour) + var ( + t0 = xtime.Now().Truncate(time.Hour) + opts = NewPromConvertOptions() + ) tests := []struct { name string @@ -419,18 +422,106 @@ func TestSeriesIteratorsToPromResultNormalizeLowResCounters(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - testSeriesIteratorsToPromResultNormalize( - t, tt.isCounter, tt.maxResolution, tt.given, tt.want) + testSeriesIteratorsToPromResult( + t, tt.isCounter, tt.maxResolution, tt.given, tt.want, opts) + }) + } +} + +func 
TestSeriesIteratorsToPromResultValueDecreaseTolerance(t *testing.T) { + now := xtime.Now().Truncate(time.Hour) + + tests := []struct { + name string + given []float64 + tolerance float64 + until xtime.UnixNano + want []float64 + }{ + { + name: "no tolerance", + given: []float64{187.80131100000006, 187.801311, 187.80131100000006, 187.801311, 200, 199.99}, + tolerance: 0, + until: 0, + want: []float64{187.80131100000006, 187.801311, 187.80131100000006, 187.801311, 200, 199.99}, + }, + { + name: "low tolerance", + given: []float64{187.80131100000006, 187.801311, 187.80131100000006, 187.801311, 200, 199.99}, + tolerance: 0.00000001, + until: now.Add(time.Hour), + want: []float64{187.80131100000006, 187.80131100000006, 187.80131100000006, 187.80131100000006, 200, 199.99}, + }, + { + name: "high tolerance", + given: []float64{187.80131100000006, 187.801311, 187.80131100000006, 187.801311, 200, 199.99}, + tolerance: 0.0001, + until: now.Add(time.Hour), + want: []float64{187.80131100000006, 187.80131100000006, 187.80131100000006, 187.80131100000006, 200, 200}, + }, + { + name: "tolerance expired", + given: []float64{200, 199.99, 200, 199.99, 200, 199.99}, + tolerance: 0.0001, + until: now, + want: []float64{200, 199.99, 200, 199.99, 200, 199.99}, + }, + { + name: "tolerance expires in the middle", + given: []float64{200, 199.99, 200, 199.99, 200, 199.99}, + tolerance: 0.0001, + until: now.Add(3 * time.Minute), + want: []float64{200, 200, 200, 199.99, 200, 199.99}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testSeriesIteratorsToPromResultValueDecreaseTolerance( + t, now, tt.given, tt.want, tt.tolerance, tt.until) + }) + } +} + +func testSeriesIteratorsToPromResultValueDecreaseTolerance( + t *testing.T, + now xtime.UnixNano, + input []float64, + expectedOutput []float64, + decreaseTolerance float64, + toleranceUntil xtime.UnixNano, +) { + var ( + given = make([]dts.Datapoint, 0, len(input)) + want = make([]prompb.Sample, 0, 
len(expectedOutput)) + ) + for i, v := range input { + given = append(given, dts.Datapoint{ + TimestampNanos: now.Add(time.Duration(i) * time.Minute), + Value: v, }) } + for i, v := range expectedOutput { + want = append(want, prompb.Sample{ + Timestamp: ms(now.Add(time.Duration(i) * time.Minute)), + Value: v, + }) + } + + opts := NewPromConvertOptions(). + SetValueDecreaseTolerance(decreaseTolerance). + SetValueDecreaseToleranceUntil(toleranceUntil) + + testSeriesIteratorsToPromResult(t, false, 0, given, want, opts) } -func testSeriesIteratorsToPromResultNormalize( +func testSeriesIteratorsToPromResult( t *testing.T, isCounter bool, maxResolution time.Duration, given []dts.Datapoint, want []prompb.Sample, + opts PromConvertOptions, ) { ctrl := xtest.NewController(t) defer ctrl.Finish() @@ -480,9 +571,8 @@ func testSeriesIteratorsToPromResultNormalize( fetchResult, err := consolidators.NewSeriesFetchResult(it, nil, fetchResultMetadata) assert.NoError(t, err) - opts := models.NewTagOptions() res, err := SeriesIteratorsToPromResult( - context.Background(), fetchResult, nil, opts, NewPromConvertOptions()) + context.Background(), fetchResult, nil, models.NewTagOptions(), opts) require.NoError(t, err) verifyResult(t, want, res) } diff --git a/src/query/storage/types.go b/src/query/storage/types.go index 3d0a3f6733..27f8898a93 100644 --- a/src/query/storage/types.go +++ b/src/query/storage/types.go @@ -50,8 +50,6 @@ const ( TypeRemoteDC // TypeMultiDC is for storages that will aggregate multiple datacenters. TypeMultiDC - // TypeDebug is for storages that are used for debugging purposes. - TypeDebug ) // ErrorBehavior describes what this storage type should do on error. This is @@ -367,4 +365,16 @@ type PromConvertOptions interface { // ResolutionThresholdForCounterNormalization returns resolution // starting from which (inclusive) a normalization of counter values is performed. 
	ResolutionThresholdForCounterNormalization() time.Duration
+
+	// SetValueDecreaseTolerance sets relative tolerance against decoded time series value decrease.
+	SetValueDecreaseTolerance(value float64) PromConvertOptions
+
+	// ValueDecreaseTolerance returns relative tolerance against decoded time series value decrease.
+	ValueDecreaseTolerance() float64
+
+	// SetValueDecreaseToleranceUntil sets the timestamp (exclusive) until which the tolerance applies.
+	SetValueDecreaseToleranceUntil(value xtime.UnixNano) PromConvertOptions
+
+	// ValueDecreaseToleranceUntil returns the timestamp (exclusive) until which the tolerance applies.
+	ValueDecreaseToleranceUntil() xtime.UnixNano
 }