Skip to content

Commit

Permalink
[query] Prom converter supporting value decrease tolerance (#3914)
Browse files Browse the repository at this point in the history
* [query] Prom converter supporting value decrease tolerance

* [query] Move prometheus convert options to query prometheus config (#3915)

Co-authored-by: Rob Skillington <[email protected]>
  • Loading branch information
linasm and robskillington authored Nov 10, 2021
1 parent 1b94908 commit e37f346
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/cmd/services/m3comparator/main/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ type seriesGen struct {
res time.Duration
}

// FetchCompressed fetches timeseries data based on a query.
// FetchCompressedResult fetches timeseries data based on a query.
func (q *querier) FetchCompressedResult(
ctx context.Context,
query *storage.FetchQuery,
Expand Down
37 changes: 37 additions & 0 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package config

import (
"errors"
"math"
"time"

etcdclient "github.com/m3db/m3/src/cluster/client/etcd"
Expand All @@ -43,6 +44,7 @@ import (
"github.com/m3db/m3/src/x/instrument"
xlog "github.com/m3db/m3/src/x/log"
"github.com/m3db/m3/src/x/opentracing"
xtime "github.com/m3db/m3/src/x/time"
)

// BackendStorageType is an enum for different backends.
Expand Down Expand Up @@ -355,6 +357,41 @@ type ConsolidationConfiguration struct {
// PrometheusQueryConfiguration is the prometheus query engine configuration.
type PrometheusQueryConfiguration struct {
	// MaxSamplesPerQuery is the limit on fetched samples per query.
	MaxSamplesPerQuery *int `yaml:"maxSamplesPerQuery"`

	// Convert configures Prometheus time series conversions.
	// Optional; when nil, default conversion options are used.
	Convert *PrometheusConvertConfiguration `yaml:"convert"`
}

// ConvertOptionsOrDefault creates storage.PromConvertOptions based on the given configuration.
func (c PrometheusQueryConfiguration) ConvertOptionsOrDefault() storage.PromConvertOptions {
	opts := storage.NewPromConvertOptions()

	convert := c.Convert
	if convert == nil {
		return opts
	}

	opts = opts.SetValueDecreaseTolerance(convert.ValueDecreaseTolerance)

	// Default to max time so that it's always applicable if value
	// decrease tolerance is non-zero.
	until := xtime.UnixNano(math.MaxInt64)
	if cutoff := convert.ValueDecreaseToleranceUntil; cutoff != nil {
		until = xtime.ToUnixNano(*cutoff)
	}

	return opts.SetValueDecreaseToleranceUntil(until)
}

// PrometheusConvertConfiguration configures Prometheus time series conversions.
type PrometheusConvertConfiguration struct {
	// ValueDecreaseTolerance allows for setting a specific amount of tolerance
	// to avoid returning a decrease if it's below a certain tolerance.
	// This is useful for applications that have precision issues emitting
	// monotonic increasing data and will accidentally make it seem like the
	// counter value decreases when it hasn't changed.
	// The tolerance is relative to the previous value (e.g. 0.0001 treats a
	// decrease of up to 0.01% as unchanged); zero disables the feature.
	ValueDecreaseTolerance float64 `yaml:"valueDecreaseTolerance"`

	// ValueDecreaseToleranceUntil allows for setting a time threshold on
	// which to apply the conditional value decrease threshold.
	// The cutoff is exclusive: datapoints at or after this timestamp are not
	// adjusted. When unset, the tolerance applies indefinitely.
	ValueDecreaseToleranceUntil *time.Time `yaml:"valueDecreaseToleranceUntil"`
}

// MaxSamplesPerQueryOrDefault returns the max samples per query or default.
Expand Down
5 changes: 4 additions & 1 deletion src/query/server/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ func Run(runOpts RunOptions) RunResult {
}
cfg.LookbackDuration = &lookbackDuration

promConvertOptions := cfg.Query.Prometheus.ConvertOptionsOrDefault()

readWorkerPool, writeWorkerPool, err := pools.BuildWorkerPools(
instrumentOptions,
cfg.ReadWorkerPool,
Expand All @@ -401,7 +403,8 @@ func Run(runOpts RunOptions) RunResult {
SetConsolidationFunc(consolidators.TakeLast).
SetReadWorkerPool(readWorkerPool).
SetWriteWorkerPool(writeWorkerPool).
SetSeriesConsolidationMatchOptions(matchOptions)
SetSeriesConsolidationMatchOptions(matchOptions).
SetPromConvertOptions(promConvertOptions)

if runOpts.ApplyCustomTSDBOptions != nil {
tsdbOpts, err = runOpts.ApplyCustomTSDBOptions(tsdbOpts, instrumentOptions)
Expand Down
29 changes: 28 additions & 1 deletion src/query/storage/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,21 @@

package storage

import "time"
import (
"time"

xtime "github.com/m3db/m3/src/x/time"
)

const (
defaultResolutionThresholdForCounterNormalization = time.Hour
)

// promConvertOptions is the concrete PromConvertOptions implementation.
// Setters are copy-on-write: each returns a modified copy, leaving the
// receiver untouched.
type promConvertOptions struct {
	// resolutionThresholdForCounterNormalization is the resolution starting
	// from which (inclusive) counter value normalization is performed.
	resolutionThresholdForCounterNormalization time.Duration

	// valueDecreaseTolerance is the relative tolerance applied to decreases
	// of decoded series values; zero disables the adjustment.
	valueDecreaseTolerance float64
	// valueDecreaseToleranceUntil is the timestamp (exclusive) until which
	// the tolerance applies.
	valueDecreaseToleranceUntil xtime.UnixNano
}

// NewPromConvertOptions builds a new PromConvertOptions with default values.
Expand All @@ -46,3 +53,23 @@ func (o *promConvertOptions) SetResolutionThresholdForCounterNormalization(value
func (o *promConvertOptions) ResolutionThresholdForCounterNormalization() time.Duration {
return o.resolutionThresholdForCounterNormalization
}

// SetValueDecreaseTolerance sets the relative tolerance against decoded
// time series value decrease, returning a modified copy of the options.
func (o *promConvertOptions) SetValueDecreaseTolerance(value float64) PromConvertOptions {
	// Copy-on-write so the receiver stays unchanged.
	cloned := *o
	cloned.valueDecreaseTolerance = value
	return &cloned
}

// ValueDecreaseTolerance returns the relative tolerance against decoded
// time series value decrease.
func (o *promConvertOptions) ValueDecreaseTolerance() float64 {
	return o.valueDecreaseTolerance
}

// SetValueDecreaseToleranceUntil sets the timestamp (exclusive) until which
// the value decrease tolerance applies, returning a modified copy.
func (o *promConvertOptions) SetValueDecreaseToleranceUntil(value xtime.UnixNano) PromConvertOptions {
	// Copy-on-write so the receiver stays unchanged.
	cloned := *o
	cloned.valueDecreaseToleranceUntil = value
	return &cloned
}

// ValueDecreaseToleranceUntil returns the timestamp (exclusive) until which
// the value decrease tolerance applies.
func (o *promConvertOptions) ValueDecreaseToleranceUntil() xtime.UnixNano {
	return o.valueDecreaseToleranceUntil
}
12 changes: 10 additions & 2 deletions src/query/storage/prom_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func iteratorToPromResult(
resolution = xtime.UnixNano(maxResolution)
resolutionThreshold = promConvertOptions.ResolutionThresholdForCounterNormalization()

valueDecreaseTolerance = promConvertOptions.ValueDecreaseTolerance()
valueDecreaseToleranceUntil = promConvertOptions.ValueDecreaseToleranceUntil()

firstDP = true
handleResets = false
annotationPayload annotation.Payload
Expand All @@ -61,6 +64,12 @@ func iteratorToPromResult(
for iter.Next() {
dp, _, _ := iter.Current()

if valueDecreaseTolerance > 0 && dp.TimestampNanos.Before(valueDecreaseToleranceUntil) {
if !firstDP && dp.Value < prevDP.Value && dp.Value > prevDP.Value*(1-valueDecreaseTolerance) {
dp.Value = prevDP.Value
}
}

if firstDP && maxResolution >= resolutionThreshold {
firstAnnotation := iter.FirstAnnotation()
if len(firstAnnotation) > 0 {
Expand All @@ -85,15 +94,14 @@ func iteratorToPromResult(
} else {
cumulativeSum += dp.Value - prevDP.Value
}

prevDP = dp
} else {
samples = append(samples, prompb.Sample{
Timestamp: TimeToPromTimestamp(dp.TimestampNanos),
Value: dp.Value,
})
}

prevDP = dp
firstDP = false
}

Expand Down
102 changes: 96 additions & 6 deletions src/query/storage/prom_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,10 @@ func TestDecodeIteratorsWithEmptySeries(t *testing.T) {
}

func TestSeriesIteratorsToPromResultNormalizeLowResCounters(t *testing.T) {
t0 := xtime.Now().Truncate(time.Hour)
var (
t0 = xtime.Now().Truncate(time.Hour)
opts = NewPromConvertOptions()
)

tests := []struct {
name string
Expand Down Expand Up @@ -419,18 +422,106 @@ func TestSeriesIteratorsToPromResultNormalizeLowResCounters(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testSeriesIteratorsToPromResultNormalize(
t, tt.isCounter, tt.maxResolution, tt.given, tt.want)
testSeriesIteratorsToPromResult(
t, tt.isCounter, tt.maxResolution, tt.given, tt.want, opts)
})
}
}

// TestSeriesIteratorsToPromResultValueDecreaseTolerance verifies that small
// value decreases within the configured relative tolerance are flattened to
// the previous value, but only for datapoints before the cutoff timestamp.
func TestSeriesIteratorsToPromResultValueDecreaseTolerance(t *testing.T) {
	start := xtime.Now().Truncate(time.Hour)

	for _, tc := range []struct {
		name      string
		given     []float64
		tolerance float64
		until     xtime.UnixNano
		want      []float64
	}{
		{
			name:      "no tolerance",
			given:     []float64{187.80131100000006, 187.801311, 187.80131100000006, 187.801311, 200, 199.99},
			tolerance: 0,
			until:     0,
			want:      []float64{187.80131100000006, 187.801311, 187.80131100000006, 187.801311, 200, 199.99},
		},
		{
			name:      "low tolerance",
			given:     []float64{187.80131100000006, 187.801311, 187.80131100000006, 187.801311, 200, 199.99},
			tolerance: 0.00000001,
			until:     start.Add(time.Hour),
			want:      []float64{187.80131100000006, 187.80131100000006, 187.80131100000006, 187.80131100000006, 200, 199.99},
		},
		{
			name:      "high tolerance",
			given:     []float64{187.80131100000006, 187.801311, 187.80131100000006, 187.801311, 200, 199.99},
			tolerance: 0.0001,
			until:     start.Add(time.Hour),
			want:      []float64{187.80131100000006, 187.80131100000006, 187.80131100000006, 187.80131100000006, 200, 200},
		},
		{
			name:      "tolerance expired",
			given:     []float64{200, 199.99, 200, 199.99, 200, 199.99},
			tolerance: 0.0001,
			until:     start,
			want:      []float64{200, 199.99, 200, 199.99, 200, 199.99},
		},
		{
			name:      "tolerance expires in the middle",
			given:     []float64{200, 199.99, 200, 199.99, 200, 199.99},
			tolerance: 0.0001,
			until:     start.Add(3 * time.Minute),
			want:      []float64{200, 200, 200, 199.99, 200, 199.99},
		},
	} {
		tc := tc // shadow for closure safety (pre-Go-1.22 loop semantics)
		t.Run(tc.name, func(t *testing.T) {
			testSeriesIteratorsToPromResultValueDecreaseTolerance(
				t, start, tc.given, tc.want, tc.tolerance, tc.until)
		})
	}
}

// testSeriesIteratorsToPromResultValueDecreaseTolerance builds datapoints
// spaced one minute apart starting at now, runs the conversion with the given
// tolerance settings, and verifies the resulting sample values.
func testSeriesIteratorsToPromResultValueDecreaseTolerance(
	t *testing.T,
	now xtime.UnixNano,
	input []float64,
	expectedOutput []float64,
	decreaseTolerance float64,
	toleranceUntil xtime.UnixNano,
) {
	given := make([]dts.Datapoint, len(input))
	for i := range input {
		given[i] = dts.Datapoint{
			TimestampNanos: now.Add(time.Duration(i) * time.Minute),
			Value:          input[i],
		}
	}

	want := make([]prompb.Sample, len(expectedOutput))
	for i := range expectedOutput {
		want[i] = prompb.Sample{
			Timestamp: ms(now.Add(time.Duration(i) * time.Minute)),
			Value:     expectedOutput[i],
		}
	}

	opts := NewPromConvertOptions().
		SetValueDecreaseTolerance(decreaseTolerance).
		SetValueDecreaseToleranceUntil(toleranceUntil)

	testSeriesIteratorsToPromResult(t, false, 0, given, want, opts)
}

func testSeriesIteratorsToPromResultNormalize(
func testSeriesIteratorsToPromResult(
t *testing.T,
isCounter bool,
maxResolution time.Duration,
given []dts.Datapoint,
want []prompb.Sample,
opts PromConvertOptions,
) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -480,9 +571,8 @@ func testSeriesIteratorsToPromResultNormalize(
fetchResult, err := consolidators.NewSeriesFetchResult(it, nil, fetchResultMetadata)
assert.NoError(t, err)

opts := models.NewTagOptions()
res, err := SeriesIteratorsToPromResult(
context.Background(), fetchResult, nil, opts, NewPromConvertOptions())
context.Background(), fetchResult, nil, models.NewTagOptions(), opts)
require.NoError(t, err)
verifyResult(t, want, res)
}
Expand Down
14 changes: 12 additions & 2 deletions src/query/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ const (
TypeRemoteDC
// TypeMultiDC is for storages that will aggregate multiple datacenters.
TypeMultiDC
// TypeDebug is for storages that are used for debugging purposes.
TypeDebug
)

// ErrorBehavior describes what this storage type should do on error. This is
Expand Down Expand Up @@ -367,4 +365,16 @@ type PromConvertOptions interface {
// ResolutionThresholdForCounterNormalization returns resolution
// starting from which (inclusive) a normalization of counter values is performed.
ResolutionThresholdForCounterNormalization() time.Duration

// SetValueDecreaseTolerance sets relative tolerance against decoded time series value decrease.
SetValueDecreaseTolerance(value float64) PromConvertOptions

// ValueDecreaseTolerance returns relative tolerance against decoded time series value decrease.
ValueDecreaseTolerance() float64

// SetValueDecreaseToleranceUntil sets the timestamp (exclusive) until which the tolerance applies.
SetValueDecreaseToleranceUntil(value xtime.UnixNano) PromConvertOptions

// ValueDecreaseToleranceUntil the timestamp (exclusive) until which the tolerance applies.
ValueDecreaseToleranceUntil() xtime.UnixNano
}

0 comments on commit e37f346

Please sign in to comment.