diff --git a/src/aggregator/server/m3msg/server.go b/src/aggregator/server/m3msg/server.go index 924254d313..bf98e4850e 100644 --- a/src/aggregator/server/m3msg/server.go +++ b/src/aggregator/server/m3msg/server.go @@ -21,9 +21,7 @@ package m3msg import ( - "errors" "fmt" - "io" "github.com/m3db/m3/src/aggregator/aggregator" "github.com/m3db/m3/src/metrics/encoding" @@ -35,11 +33,6 @@ import ( "go.uber.org/zap" ) -type server struct { - aggregator aggregator.Aggregator - logger *zap.Logger -} - // NewServer creates a new M3Msg server. func NewServer( address string, @@ -49,49 +42,43 @@ func NewServer( if err := opts.Validate(); err != nil { return nil, err } - - s := &server{ - aggregator: aggregator, - logger: opts.InstrumentOptions().Logger(), + newMessageProcessor := func() consumer.MessageProcessor { + // construct a new messageProcessor per consumer so the internal protos can be reused across messages on the + // same connection. + return &messageProcessor{ + aggregator: aggregator, + logger: opts.InstrumentOptions().Logger(), + } } - - handler := consumer.NewConsumerHandler(s.Consume, opts.ConsumerOptions()) + handler := consumer.NewMessageHandler(newMessageProcessor, opts.ConsumerOptions()) return xserver.NewServer(address, handler, opts.ServerOptions()), nil } -func (s *server) Consume(c consumer.Consumer) { - var ( - pb = &metricpb.MetricWithMetadatas{} - union = &encoding.UnaggregatedMessageUnion{} - ) - for { - msg, err := c.Message() - if err != nil { - if !errors.Is(err, io.EOF) { - s.logger.Error("could not read message", zap.Error(err)) - } - break - } +type messageProcessor struct { + pb metricpb.MetricWithMetadatas + union encoding.UnaggregatedMessageUnion + aggregator aggregator.Aggregator + logger *zap.Logger +} - // Reset and reuse the protobuf message for unpacking. - protobuf.ReuseMetricWithMetadatasProto(pb) - if err = s.handleMessage(pb, union, msg); err != nil { - s.logger.Error("could not process message", - zap.Error(err), - zap.Uint64("shard", msg.ShardID()), - zap.String("proto", pb.String())) - } +func (m *messageProcessor) Process(msg consumer.Message) { + if err := m.handleMessage(&m.pb, &m.union, msg); err != nil { + m.logger.Error("could not process message", + zap.Error(err), + zap.Uint64("shard", msg.ShardID()), + zap.String("proto", m.pb.String())) } - c.Close() } -func (s *server) handleMessage( +func (m *messageProcessor) handleMessage( pb *metricpb.MetricWithMetadatas, union *encoding.UnaggregatedMessageUnion, msg consumer.Message, ) error { defer msg.Ack() + // Reset and reuse the protobuf message for unpacking. + protobuf.ReuseMetricWithMetadatasProto(&m.pb) // Unmarshal the message. 
if err := pb.Unmarshal(msg.Bytes()); err != nil { return err @@ -104,27 +91,27 @@ func (s *server) handleMessage( return err } u := union.CounterWithMetadatas.ToUnion() - return s.aggregator.AddUntimed(u, union.CounterWithMetadatas.StagedMetadatas) + return m.aggregator.AddUntimed(u, union.CounterWithMetadatas.StagedMetadatas) case metricpb.MetricWithMetadatas_BATCH_TIMER_WITH_METADATAS: err := union.BatchTimerWithMetadatas.FromProto(pb.BatchTimerWithMetadatas) if err != nil { return err } u := union.BatchTimerWithMetadatas.ToUnion() - return s.aggregator.AddUntimed(u, union.BatchTimerWithMetadatas.StagedMetadatas) + return m.aggregator.AddUntimed(u, union.BatchTimerWithMetadatas.StagedMetadatas) case metricpb.MetricWithMetadatas_GAUGE_WITH_METADATAS: err := union.GaugeWithMetadatas.FromProto(pb.GaugeWithMetadatas) if err != nil { return err } u := union.GaugeWithMetadatas.ToUnion() - return s.aggregator.AddUntimed(u, union.GaugeWithMetadatas.StagedMetadatas) + return m.aggregator.AddUntimed(u, union.GaugeWithMetadatas.StagedMetadatas) case metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA: err := union.ForwardedMetricWithMetadata.FromProto(pb.ForwardedMetricWithMetadata) if err != nil { return err } - return s.aggregator.AddForwarded( + return m.aggregator.AddForwarded( union.ForwardedMetricWithMetadata.ForwardedMetric, union.ForwardedMetricWithMetadata.ForwardMetadata) case metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATA: @@ -132,7 +119,7 @@ func (s *server) handleMessage( if err != nil { return err } - return s.aggregator.AddTimed( + return m.aggregator.AddTimed( union.TimedMetricWithMetadata.Metric, union.TimedMetricWithMetadata.TimedMetadata) case metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATAS: @@ -140,10 +127,12 @@ func (s *server) handleMessage( if err != nil { return err } - return s.aggregator.AddTimedWithStagedMetadatas( + return m.aggregator.AddTimedWithStagedMetadatas( union.TimedMetricWithMetadatas.Metric, union.TimedMetricWithMetadatas.StagedMetadatas) default: return fmt.Errorf("unrecognized message type: %v", pb.Type) } } + +func (m *messageProcessor) Close() {} diff --git a/src/cmd/services/m3comparator/main/querier.go b/src/cmd/services/m3comparator/main/querier.go index e18e7b546d..05bb7de229 100644 --- a/src/cmd/services/m3comparator/main/querier.go +++ b/src/cmd/services/m3comparator/main/querier.go @@ -135,7 +135,7 @@ type seriesGen struct { res time.Duration } -// FetchCompressed fetches timeseries data based on a query. +// FetchCompressedResult fetches timeseries data based on a query. func (q *querier) FetchCompressedResult( ctx context.Context, query *storage.FetchQuery, diff --git a/src/cmd/services/m3coordinator/server/m3msg/config.go b/src/cmd/services/m3coordinator/server/m3msg/config.go index 953f62a2d0..a37119cc97 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/config.go +++ b/src/cmd/services/m3coordinator/server/m3msg/config.go @@ -86,7 +86,7 @@ func (c handlerConfiguration) newHandler( ProtobufDecoderPoolOptions: c.ProtobufDecoderPool.NewObjectPoolOptions(iOpts), BlockholePolicies: c.BlackholePolicies, }) - return consumer.NewMessageHandler(p, cOpts), nil + return consumer.NewMessageHandler(consumer.SingletonMessageProcessor(p), cOpts), nil } // NewOptions creates handler options. 
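Reviewer note: the two hunks above are two sides of the same refactor. The aggregator now builds a MessageProcessor per connection, so the reused proto and union scratch fields need no locking, while the coordinator wraps its single processor in consumer.SingletonMessageProcessor to keep sharing it across connections. A minimal, self-contained sketch of the pattern; Message, MessageProcessor, and the processor type here are simplified stand-ins for the real src/msg/consumer API:

package main

import "fmt"

// Message and MessageProcessor are simplified stand-ins for the interfaces
// in src/msg/consumer.
type Message interface{ Bytes() []byte }

type MessageProcessor interface {
	Process(m Message)
	Close()
}

// NewMessageProcessorFn mirrors the factory type added in this diff: one
// MessageProcessor is constructed per connection.
type NewMessageProcessorFn func() MessageProcessor

// processor keeps per-connection scratch state. Messages on a connection are
// processed serially, so reusing buf across messages needs no locking.
type processor struct{ buf []byte }

func (p *processor) Process(m Message) {
	p.buf = append(p.buf[:0], m.Bytes()...) // reuse the scratch buffer
	fmt.Printf("processed %d bytes\n", len(p.buf))
}

func (p *processor) Close() {}

// singleton mirrors SingletonMessageProcessor: every connection shares the
// one processor, which must then be safe for concurrent use.
func singleton(p MessageProcessor) NewMessageProcessorFn {
	return func() MessageProcessor { return p }
}

type rawMessage []byte

func (r rawMessage) Bytes() []byte { return r }

func main() {
	newPerConn := NewMessageProcessorFn(func() MessageProcessor { return &processor{} })
	p1, p2 := newPerConn(), newPerConn() // one instance per connection
	p1.Process(rawMessage("a"))
	p2.Process(rawMessage("bb"))

	shared := singleton(&processor{})
	fmt.Println(shared() == shared()) // true: connections share one instance
}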
diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go index f11de18ff5..b15cf8af9b 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go @@ -67,7 +67,7 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) { s := server.NewServer( "a", - consumer.NewMessageHandler(newProtobufProcessor(hOpts), opts), + consumer.NewMessageHandler(consumer.SingletonMessageProcessor(newProtobufProcessor(hOpts)), opts), server.NewOptions(), ) s.Serve(l) @@ -150,7 +150,7 @@ func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) { s := server.NewServer( "a", - consumer.NewMessageHandler(newProtobufProcessor(hOpts), opts), + consumer.NewMessageHandler(consumer.SingletonMessageProcessor(newProtobufProcessor(hOpts)), opts), server.NewOptions(), ) s.Serve(l) diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index e79460a98b..2a66488437 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -22,6 +22,7 @@ package config import ( "errors" + "math" "time" etcdclient "github.com/m3db/m3/src/cluster/client/etcd" @@ -43,6 +44,7 @@ import ( "github.com/m3db/m3/src/x/instrument" xlog "github.com/m3db/m3/src/x/log" "github.com/m3db/m3/src/x/opentracing" + xtime "github.com/m3db/m3/src/x/time" ) // BackendStorageType is an enum for different backends. @@ -355,6 +357,41 @@ type ConsolidationConfiguration struct { type PrometheusQueryConfiguration struct { // MaxSamplesPerQuery is the limit on fetched samples per query. MaxSamplesPerQuery *int `yaml:"maxSamplesPerQuery"` + + // Convert configures Prometheus time series conversions. + Convert *PrometheusConvertConfiguration `yaml:"convert"` +} + +// ConvertOptionsOrDefault creates storage.PromConvertOptions based on the given configuration. +func (c PrometheusQueryConfiguration) ConvertOptionsOrDefault() storage.PromConvertOptions { + opts := storage.NewPromConvertOptions() + if v := c.Convert; v != nil { + opts = opts.SetValueDecreaseTolerance(v.ValueDecreaseTolerance) + + // Default to max time so that it's always applicable if value + // decrease tolerance is non-zero. + toleranceUntil := xtime.UnixNano(math.MaxInt64) + if value := v.ValueDecreaseToleranceUntil; value != nil { + toleranceUntil = xtime.ToUnixNano(*value) + } + opts = opts.SetValueDecreaseToleranceUntil(toleranceUntil) + } + + return opts +} + +// PrometheusConvertConfiguration configures Prometheus time series conversions. +type PrometheusConvertConfiguration struct { + // ValueDecreaseTolerance sets a relative tolerance below which a decrease + // in a series value is ignored, reporting the previous value instead. + // This is useful for applications that have precision issues emitting + // monotonically increasing data, which can accidentally make it seem like + // a counter decreased when it actually hasn't changed. + ValueDecreaseTolerance float64 `yaml:"valueDecreaseTolerance"` + + // ValueDecreaseToleranceUntil sets the timestamp (exclusive) until which + // the value decrease tolerance applies. + ValueDecreaseToleranceUntil *time.Time `yaml:"valueDecreaseToleranceUntil"` } // MaxSamplesPerQueryOrDefault returns the max samples per query or default.
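Reviewer note: one subtlety in ConvertOptionsOrDefault above is the cutoff default. If convert.valueDecreaseTolerance is set but valueDecreaseToleranceUntil is omitted, the cutoff becomes the maximum UnixNano value, so the tolerance applies to every datapoint. A standalone sketch of just that defaulting rule, using simplified stand-in types (convertConfig and toleranceUntilNanos are illustrative, not the real config API):

package main

import (
	"fmt"
	"math"
	"time"
)

// convertConfig mirrors PrometheusConvertConfiguration in simplified form.
type convertConfig struct {
	ValueDecreaseTolerance      float64
	ValueDecreaseToleranceUntil *time.Time
}

// toleranceUntilNanos applies the same default as ConvertOptionsOrDefault:
// with no explicit cutoff, use the maximum representable time so a non-zero
// tolerance is always in effect.
func toleranceUntilNanos(c convertConfig) int64 {
	if c.ValueDecreaseToleranceUntil != nil {
		return c.ValueDecreaseToleranceUntil.UnixNano()
	}
	return math.MaxInt64
}

func main() {
	openEnded := convertConfig{ValueDecreaseTolerance: 1e-8}
	cutoff := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
	bounded := convertConfig{ValueDecreaseTolerance: 1e-8, ValueDecreaseToleranceUntil: &cutoff}

	fmt.Println(toleranceUntilNanos(openEnded)) // 9223372036854775807: never expires
	fmt.Println(toleranceUntilNanos(bounded))   // 1672531200000000000: expires at cutoff
}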
diff --git a/src/msg/consumer/consumer.go b/src/msg/consumer/consumer.go index 50a398488b..f880b67ec7 100644 --- a/src/msg/consumer/consumer.go +++ b/src/msg/consumer/consumer.go @@ -66,7 +66,7 @@ func (l *listener) Accept() (Consumer, error) { return nil, err } - return newConsumer(conn, l.msgPool, l.opts, l.m), nil + return newConsumer(conn, l.msgPool, l.opts, l.m, NewNoOpMessageProcessor), nil } type metrics struct { @@ -97,11 +97,12 @@ type consumer struct { w xio.ResettableWriter conn net.Conn - ackPb msgpb.Ack - closed bool - doneCh chan struct{} - wg sync.WaitGroup - m metrics + ackPb msgpb.Ack + closed bool + doneCh chan struct{} + wg sync.WaitGroup + m metrics + messageProcessor MessageProcessor } func newConsumer( @@ -109,6 +110,7 @@ func newConsumer( mPool *messagePool, opts Options, m metrics, + newMessageProcessorFn NewMessageProcessorFn, ) *consumer { var ( wOpts = xio.ResettableWriterOptions{ @@ -126,11 +128,12 @@ func newConsumer( decoder: proto.NewDecoder( conn, opts.DecoderOptions(), opts.ConnectionReadBufferSize(), ), - w: writerFn(newConnWithTimeout(conn, opts.ConnectionWriteTimeout(), time.Now), wOpts), - conn: conn, - closed: false, - doneCh: make(chan struct{}), - m: m, + w: writerFn(newConnWithTimeout(conn, opts.ConnectionWriteTimeout(), time.Now), wOpts), + conn: conn, + closed: false, + doneCh: make(chan struct{}), + m: m, + messageProcessor: newMessageProcessorFn(), } } @@ -141,6 +144,9 @@ func (c *consumer) Init() { c.wg.Done() }() } +func (c *consumer) process(m Message) { + c.messageProcessor.Process(m) +} func (c *consumer) Message() (Message, error) { m := c.mPool.Get() @@ -230,6 +236,7 @@ func (c *consumer) Close() { close(c.doneCh) c.wg.Wait() c.conn.Close() + c.messageProcessor.Close() } type message struct { diff --git a/src/msg/consumer/handlers.go b/src/msg/consumer/handlers.go index 353799e259..572767a078 100644 --- a/src/msg/consumer/handlers.go +++ b/src/msg/consumer/handlers.go @@ -29,54 +29,27 @@ import ( "go.uber.org/zap" ) -type consumerHandler struct { - opts Options - mPool *messagePool - consumeFn ConsumeFn - m metrics -} - -// NewConsumerHandler creates a new server handler with consumerFn. -func NewConsumerHandler(consumeFn ConsumeFn, opts Options) server.Handler { - mPool := newMessagePool(opts.MessagePoolOptions()) - mPool.Init() - return &consumerHandler{ - consumeFn: consumeFn, - opts: opts, - mPool: mPool, - m: newConsumerMetrics(opts.InstrumentOptions().MetricsScope()), - } -} - -func (h *consumerHandler) Handle(conn net.Conn) { - c := newConsumer(conn, h.mPool, h.opts, h.m) - c.Init() - h.consumeFn(c) -} - -func (h *consumerHandler) Close() {} - type messageHandler struct { - opts Options - mPool *messagePool - mp MessageProcessor - m metrics + opts Options + mPool *messagePool + newMessageProcessorFn NewMessageProcessorFn + m metrics } // NewMessageHandler creates a new server handler that constructs a MessageProcessor per connection using newMessageProcessorFn.
-func NewMessageHandler(mp MessageProcessor, opts Options) server.Handler { +func NewMessageHandler(newMessageProcessorFn NewMessageProcessorFn, opts Options) server.Handler { mPool := newMessagePool(opts.MessagePoolOptions()) mPool.Init() return &messageHandler{ - mp: mp, - opts: opts, - mPool: mPool, - m: newConsumerMetrics(opts.InstrumentOptions().MetricsScope()), + newMessageProcessorFn: newMessageProcessorFn, + opts: opts, + mPool: mPool, + m: newConsumerMetrics(opts.InstrumentOptions().MetricsScope()), } } func (h *messageHandler) Handle(conn net.Conn) { - c := newConsumer(conn, h.mPool, h.opts, h.m) + c := newConsumer(conn, h.mPool, h.opts, h.m, h.newMessageProcessorFn) c.Init() var ( msgErr error @@ -87,7 +60,7 @@ func (h *messageHandler) Handle(conn net.Conn) { if msgErr != nil { break } - h.mp.Process(msg) + c.process(msg) } if msgErr != nil && msgErr != io.EOF { h.opts.InstrumentOptions().Logger().With(zap.Error(msgErr)).Error("could not read message from consumer") @@ -95,4 +68,4 @@ func (h *messageHandler) Handle(conn net.Conn) { c.Close() } -func (h *messageHandler) Close() { h.mp.Close() } +func (h *messageHandler) Close() {} diff --git a/src/msg/consumer/handlers_test.go b/src/msg/consumer/handlers_test.go index 63ae802c3c..1c7f9d99c5 100644 --- a/src/msg/consumer/handlers_test.go +++ b/src/msg/consumer/handlers_test.go @@ -28,6 +28,7 @@ import ( "github.com/m3db/m3/src/msg/generated/proto/msgpb" "github.com/m3db/m3/src/msg/protocol/proto" "github.com/m3db/m3/src/x/server" + xtest "github.com/m3db/m3/src/x/test" "github.com/fortytw2/leaktest" "github.com/golang/mock/gomock" @@ -59,7 +60,7 @@ func TestServerWithMessageFn(t *testing.T) { l, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - s := server.NewServer("a", NewMessageHandler(p, opts), server.NewOptions()) + s := server.NewServer("a", NewMessageHandler(SingletonMessageProcessor(p), opts), server.NewOptions()) s.Serve(l) conn, err := net.Dial("tcp", l.Addr().String()) @@ -88,57 +89,57 @@ func TestServerWithMessageFn(t *testing.T) { s.Close() } -func TestServerWithConsumeFn(t *testing.T) { +func TestServerMessageDifferentConnections(t *testing.T) { defer leaktest.Check(t)() - var ( - count = 0 - bytes []byte - closed bool - wg sync.WaitGroup - ) - consumeFn := func(c Consumer) { - for { - count++ - m, err := c.Message() - if err != nil { - break - } - bytes = m.Bytes() - m.Ack() - wg.Done() - } - c.Close() - closed = true - } + ctrl := xtest.NewController(t) + defer ctrl.Finish() l, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) + var wg sync.WaitGroup + wg.Add(2) + handleMessage := func(m Message) { + wg.Done() + } + + mp1 := NewMockMessageProcessor(ctrl) + mp2 := NewMockMessageProcessor(ctrl) + mp1.EXPECT().Process(gomock.Any()).Do(handleMessage) + mp1.EXPECT().Close() + mp2.EXPECT().Process(gomock.Any()).Do(handleMessage) + mp2.EXPECT().Close() + // Set a large ack buffer size to make sure the background go routine // can flush it. 
opts := testOptions().SetAckBufferSize(100) - s := server.NewServer("a", NewConsumerHandler(consumeFn, opts), server.NewOptions()) - require.NoError(t, err) - s.Serve(l) + first := true + var mu sync.Mutex + newMessageProcessor := func() MessageProcessor { + mu.Lock() + defer mu.Unlock() + if first { + first = false + return mp1 + } + return mp2 + } - conn, err := net.Dial("tcp", l.Addr().String()) + s := server.NewServer("a", NewMessageHandler(newMessageProcessor, opts), server.NewOptions()) require.NoError(t, err) + require.NoError(t, s.Serve(l)) - wg.Add(1) - err = produce(conn, &testMsg1) + conn1, err := net.Dial("tcp", l.Addr().String()) + require.NoError(t, err) + conn2, err := net.Dial("tcp", l.Addr().String()) require.NoError(t, err) - wg.Wait() - require.Equal(t, testMsg1.Value, bytes) - - var ack msgpb.Ack - testDecoder := proto.NewDecoder(conn, opts.DecoderOptions(), 10) - err = testDecoder.Decode(&ack) + err = produce(conn1, &testMsg1) + require.NoError(t, err) + err = produce(conn2, &testMsg1) require.NoError(t, err) - require.Equal(t, 1, len(ack.Metadata)) - require.Equal(t, testMsg1.Metadata, ack.Metadata[0]) + wg.Wait() s.Close() - require.True(t, closed) } diff --git a/src/msg/consumer/types.go b/src/msg/consumer/types.go index 79b7c243b1..bdd1ea3f06 100644 --- a/src/msg/consumer/types.go +++ b/src/msg/consumer/types.go @@ -129,6 +129,28 @@ type MessageProcessor interface { Close() } +// NewMessageProcessorFn creates a new MessageProcessor scoped to a single connection. Messages are processed serially +// on each connection. +type NewMessageProcessorFn func() MessageProcessor + +// SingletonMessageProcessor uses the same MessageProcessor for all connections. +func SingletonMessageProcessor(p MessageProcessor) NewMessageProcessorFn { + return func() MessageProcessor { + return p + } +} + +// NewNoOpMessageProcessor creates a new MessageProcessor that does nothing. +func NewNoOpMessageProcessor() MessageProcessor { + return &noOpMessageProcessor{} +} + +type noOpMessageProcessor struct{} + +func (n noOpMessageProcessor) Process(Message) {} + +func (n noOpMessageProcessor) Close() {} + // ConsumeFn processes the consumer. This is useful when the user wants to reuse // resources across messages received on the same consumer, or wants finer // control over how messages are read from the consumer. diff --git a/src/query/server/query.go b/src/query/server/query.go index 523a6e30f1..ea8e43b021 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -380,6 +380,8 @@ func Run(runOpts RunOptions) RunResult { } cfg.LookbackDuration = &lookbackDuration + promConvertOptions := cfg.Query.Prometheus.ConvertOptionsOrDefault() + readWorkerPool, writeWorkerPool, err := pools.BuildWorkerPools( instrumentOptions, cfg.ReadWorkerPool, @@ -401,7 +403,8 @@ SetConsolidationFunc(consolidators.TakeLast). SetReadWorkerPool(readWorkerPool). SetWriteWorkerPool(writeWorkerPool). - SetSeriesConsolidationMatchOptions(matchOptions) + SetSeriesConsolidationMatchOptions(matchOptions).
+ SetPromConvertOptions(promConvertOptions) if runOpts.ApplyCustomTSDBOptions != nil { tsdbOpts, err = runOpts.ApplyCustomTSDBOptions(tsdbOpts, instrumentOptions) diff --git a/src/query/storage/options.go b/src/query/storage/options.go index 6b360b5f24..ea0237e9cf 100644 --- a/src/query/storage/options.go +++ b/src/query/storage/options.go @@ -20,7 +20,11 @@ package storage -import "time" +import ( + "time" + + xtime "github.com/m3db/m3/src/x/time" +) const ( defaultResolutionThresholdForCounterNormalization = time.Hour @@ -28,6 +32,9 @@ const ( type promConvertOptions struct { resolutionThresholdForCounterNormalization time.Duration + + valueDecreaseTolerance float64 + valueDecreaseToleranceUntil xtime.UnixNano } // NewPromConvertOptions builds a new PromConvertOptions with default values. @@ -46,3 +53,23 @@ func (o *promConvertOptions) SetResolutionThresholdForCounterNormalization(value func (o *promConvertOptions) ResolutionThresholdForCounterNormalization() time.Duration { return o.resolutionThresholdForCounterNormalization } + +func (o *promConvertOptions) SetValueDecreaseTolerance(value float64) PromConvertOptions { + opts := *o + opts.valueDecreaseTolerance = value + return &opts +} + +func (o *promConvertOptions) ValueDecreaseTolerance() float64 { + return o.valueDecreaseTolerance +} + +func (o *promConvertOptions) SetValueDecreaseToleranceUntil(value xtime.UnixNano) PromConvertOptions { + opts := *o + opts.valueDecreaseToleranceUntil = value + return &opts +} + +func (o *promConvertOptions) ValueDecreaseToleranceUntil() xtime.UnixNano { + return o.valueDecreaseToleranceUntil +} diff --git a/src/query/storage/prom_converter.go b/src/query/storage/prom_converter.go index 115929a3d0..132c1970bc 100644 --- a/src/query/storage/prom_converter.go +++ b/src/query/storage/prom_converter.go @@ -48,6 +48,9 @@ func iteratorToPromResult( resolution = xtime.UnixNano(maxResolution) resolutionThreshold = promConvertOptions.ResolutionThresholdForCounterNormalization() + valueDecreaseTolerance = promConvertOptions.ValueDecreaseTolerance() + valueDecreaseToleranceUntil = promConvertOptions.ValueDecreaseToleranceUntil() + firstDP = true handleResets = false annotationPayload annotation.Payload @@ -61,6 +64,12 @@ func iteratorToPromResult( for iter.Next() { dp, _, _ := iter.Current() + if valueDecreaseTolerance > 0 && dp.TimestampNanos.Before(valueDecreaseToleranceUntil) { + if !firstDP && dp.Value < prevDP.Value && dp.Value > prevDP.Value*(1-valueDecreaseTolerance) { + dp.Value = prevDP.Value + } + } + if firstDP && maxResolution >= resolutionThreshold { firstAnnotation := iter.FirstAnnotation() if len(firstAnnotation) > 0 { @@ -85,8 +94,6 @@ func iteratorToPromResult( } else { cumulativeSum += dp.Value - prevDP.Value } - - prevDP = dp } else { samples = append(samples, prompb.Sample{ Timestamp: TimeToPromTimestamp(dp.TimestampNanos), @@ -94,6 +101,7 @@ func iteratorToPromResult( }) } + prevDP = dp firstDP = false } diff --git a/src/query/storage/prom_converter_test.go b/src/query/storage/prom_converter_test.go index a5a166fba3..4bd639ba99 100644 --- a/src/query/storage/prom_converter_test.go +++ b/src/query/storage/prom_converter_test.go @@ -304,7 +304,10 @@ func TestDecodeIteratorsWithEmptySeries(t *testing.T) { } func TestSeriesIteratorsToPromResultNormalizeLowResCounters(t *testing.T) { - t0 := xtime.Now().Truncate(time.Hour) + var ( + t0 = xtime.Now().Truncate(time.Hour) + opts = NewPromConvertOptions() + ) tests := []struct { name string @@ -419,18 +422,106 @@ func 
TestSeriesIteratorsToPromResultNormalizeLowResCounters(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - testSeriesIteratorsToPromResultNormalize( - t, tt.isCounter, tt.maxResolution, tt.given, tt.want) + testSeriesIteratorsToPromResult( + t, tt.isCounter, tt.maxResolution, tt.given, tt.want, opts) + }) + } +} + +func TestSeriesIteratorsToPromResultValueDecreaseTolerance(t *testing.T) { + now := xtime.Now().Truncate(time.Hour) + + tests := []struct { + name string + given []float64 + tolerance float64 + until xtime.UnixNano + want []float64 + }{ + { + name: "no tolerance", + given: []float64{187.80131100000006, 187.801311, 187.80131100000006, 187.801311, 200, 199.99}, + tolerance: 0, + until: 0, + want: []float64{187.80131100000006, 187.801311, 187.80131100000006, 187.801311, 200, 199.99}, + }, + { + name: "low tolerance", + given: []float64{187.80131100000006, 187.801311, 187.80131100000006, 187.801311, 200, 199.99}, + tolerance: 0.00000001, + until: now.Add(time.Hour), + want: []float64{187.80131100000006, 187.80131100000006, 187.80131100000006, 187.80131100000006, 200, 199.99}, + }, + { + name: "high tolerance", + given: []float64{187.80131100000006, 187.801311, 187.80131100000006, 187.801311, 200, 199.99}, + tolerance: 0.0001, + until: now.Add(time.Hour), + want: []float64{187.80131100000006, 187.80131100000006, 187.80131100000006, 187.80131100000006, 200, 200}, + }, + { + name: "tolerance expired", + given: []float64{200, 199.99, 200, 199.99, 200, 199.99}, + tolerance: 0.0001, + until: now, + want: []float64{200, 199.99, 200, 199.99, 200, 199.99}, + }, + { + name: "tolerance expires in the middle", + given: []float64{200, 199.99, 200, 199.99, 200, 199.99}, + tolerance: 0.0001, + until: now.Add(3 * time.Minute), + want: []float64{200, 200, 200, 199.99, 200, 199.99}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testSeriesIteratorsToPromResultValueDecreaseTolerance( + t, now, tt.given, tt.want, tt.tolerance, tt.until) + }) + } +} + +func testSeriesIteratorsToPromResultValueDecreaseTolerance( + t *testing.T, + now xtime.UnixNano, + input []float64, + expectedOutput []float64, + decreaseTolerance float64, + toleranceUntil xtime.UnixNano, +) { + var ( + given = make([]dts.Datapoint, 0, len(input)) + want = make([]prompb.Sample, 0, len(expectedOutput)) + ) + for i, v := range input { + given = append(given, dts.Datapoint{ + TimestampNanos: now.Add(time.Duration(i) * time.Minute), + Value: v, }) } + for i, v := range expectedOutput { + want = append(want, prompb.Sample{ + Timestamp: ms(now.Add(time.Duration(i) * time.Minute)), + Value: v, + }) + } + + opts := NewPromConvertOptions(). + SetValueDecreaseTolerance(decreaseTolerance). 
+ SetValueDecreaseToleranceUntil(toleranceUntil) + + testSeriesIteratorsToPromResult(t, false, 0, given, want, opts) } -func testSeriesIteratorsToPromResultNormalize( +func testSeriesIteratorsToPromResult( t *testing.T, isCounter bool, maxResolution time.Duration, given []dts.Datapoint, want []prompb.Sample, + opts PromConvertOptions, ) { ctrl := xtest.NewController(t) defer ctrl.Finish() @@ -480,9 +571,8 @@ func testSeriesIteratorsToPromResultNormalize( fetchResult, err := consolidators.NewSeriesFetchResult(it, nil, fetchResultMetadata) assert.NoError(t, err) - opts := models.NewTagOptions() res, err := SeriesIteratorsToPromResult( - context.Background(), fetchResult, nil, opts, NewPromConvertOptions()) + context.Background(), fetchResult, nil, models.NewTagOptions(), opts) require.NoError(t, err) verifyResult(t, want, res) } diff --git a/src/query/storage/types.go b/src/query/storage/types.go index 3d0a3f6733..27f8898a93 100644 --- a/src/query/storage/types.go +++ b/src/query/storage/types.go @@ -50,8 +50,6 @@ const ( TypeRemoteDC // TypeMultiDC is for storages that will aggregate multiple datacenters. TypeMultiDC - // TypeDebug is for storages that are used for debugging purposes. - TypeDebug ) // ErrorBehavior describes what this storage type should do on error. This is @@ -367,4 +365,16 @@ type PromConvertOptions interface { // ResolutionThresholdForCounterNormalization returns resolution // starting from which (inclusive) a normalization of counter values is performed. ResolutionThresholdForCounterNormalization() time.Duration + + // SetValueDecreaseTolerance sets the relative tolerance for decreases in decoded time series values. + SetValueDecreaseTolerance(value float64) PromConvertOptions + + // ValueDecreaseTolerance returns the relative tolerance for decreases in decoded time series values. + ValueDecreaseTolerance() float64 + + // SetValueDecreaseToleranceUntil sets the timestamp (exclusive) until which the tolerance applies. + SetValueDecreaseToleranceUntil(value xtime.UnixNano) PromConvertOptions + + // ValueDecreaseToleranceUntil returns the timestamp (exclusive) until which the tolerance applies. + ValueDecreaseToleranceUntil() xtime.UnixNano }
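Reviewer note: the prom_converter.go change is easier to check in isolation. A sample that drops below the previous (possibly already clamped) value by less than the relative tolerance, and whose timestamp is strictly before the cutoff, is snapped back to the previous value; larger drops pass through as real resets. A minimal sketch of that rule; clampDecreases is a hypothetical helper, not the converter's real API:

package main

import "fmt"

// clampDecreases applies the value-decrease tolerance to a series of
// (timestampNanos, value) samples. A decrease smaller than the relative
// tolerance is treated as floating-point noise and replaced with the
// previous value, but only for timestamps strictly before until.
func clampDecreases(ts []int64, vals []float64, tolerance float64, until int64) []float64 {
	out := make([]float64, len(vals))
	for i, v := range vals {
		if i > 0 && tolerance > 0 && ts[i] < until &&
			v < out[i-1] && v > out[i-1]*(1-tolerance) {
			v = out[i-1] // within tolerance: assume the value did not decrease
		}
		out[i] = v
	}
	return out
}

func main() {
	ts := []int64{0, 1, 2, 3}
	vals := []float64{187.80131100000006, 187.801311, 200, 199.99}
	fmt.Println(clampDecreases(ts, vals, 1e-8, 10))
	// [187.80131100000006 187.80131100000006 200 199.99]
	// The tiny dip is clamped; the 200 -> 199.99 drop exceeds the tolerance.
}

As in the diff, the comparison is made against the clamped previous value: prevDP is assigned after dp.Value may have been overwritten, so a run of noisy samples stays pinned to the first value rather than drifting downward.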