From 8ce9185921fd5225fa52189944103332df801238 Mon Sep 17 00:00:00 2001
From: Ryan Sammon <ryan.sammon@aiven.io>
Date: Thu, 28 Oct 2021 14:44:27 -0400
Subject: [PATCH] Add support for `last_over_time`

This `<aggregation>_over_time` function was added in
Prometheus v2.26.0.
---
 src/query/functions/temporal/aggregation.go   | 13 ++++++
 .../functions/temporal/aggregation_test.go    | 36 +++++++++++++++++
 src/query/functions/temporal/base.go          | 38 ++++++++++++------
 src/query/functions/temporal/base_test.go     | 14 +++++--
 src/query/parser/promql/matchers.go           |  2 +-
 src/query/parser/promql/parse_test.go         |  1 +
 .../compatibility/testdata/functions.test     | 40 +++++++++----------
 7 files changed, 108 insertions(+), 36 deletions(-)

diff --git a/src/query/functions/temporal/aggregation.go b/src/query/functions/temporal/aggregation.go
index 7a108db6f6..504076c162 100644
--- a/src/query/functions/temporal/aggregation.go
+++ b/src/query/functions/temporal/aggregation.go
@@ -52,6 +52,9 @@ const (
 	// StdVarType calculates the standard variance of all values in the specified interval.
 	StdVarType = "stdvar_over_time"
 
+	// LastType returns the most recent value in the specified interval.
+	LastType = "last_over_time"
+
 	// QuantileType calculates the φ-quantile (0 ≤ φ ≤ 1) of the values in the specified interval.
 	QuantileType = "quantile_over_time"
 )
@@ -67,6 +70,7 @@ var (
 		SumType:    sumOverTime,
 		StdDevType: stddevOverTime,
 		StdVarType: stdvarOverTime,
+		LastType:   lastOverTime,
 	}
 )
 
@@ -220,6 +224,15 @@ func stdvarOverTime(values []float64) float64 {
 	return aux / count
 }
 
+func lastOverTime(values []float64) float64 {
+	length := len(values)
+	if length == 0 {
+		return math.NaN()
+	}
+
+	return values[length-1]
+}
+
 func sumAndCount(values []float64) (float64, float64) {
 	sum := 0.0
 	count := 0.0
diff --git a/src/query/functions/temporal/aggregation_test.go b/src/query/functions/temporal/aggregation_test.go
index 9b2612e64f..ed1b515c9d 100644
--- a/src/query/functions/temporal/aggregation_test.go
+++ b/src/query/functions/temporal/aggregation_test.go
@@ -213,6 +213,42 @@ var aggregationTestCases = []testCase{
 			{nan, nan, nan, nan, nan, nan, nan, nan, nan, nan},
 		},
 	},
+	{
+		name:   "last_over_time",
+		opType: LastType,
+		vals: [][]float64{
+			{nan, 1, 2, 3, 4, 0, 1, 2, 3, 4},
+			{5, 6, 7, 8, 9, 5, 6, 7, 8, 9},
+		},
+		expected: [][]float64{
+			{nan, 1, 2, 3, 4, 0, 1, 2, 3, 4},
+			{5, 6, 7, 8, 9, 5, 6, 7, 8, 9},
+		},
+	},
+	{
+		name:   "last_over_time leading NaNs",
+		opType: LastType,
+		vals: [][]float64{
+			{nan, 1, nan, 3, nan, nan, 2, nan, nan, nan},
+			{5, nan, nan, nan, nan, nan, nan, 7, nan, nan},
+		},
+		expected: [][]float64{
+			{nan, 1, nan, 3, nan, nan, 2, nan, nan, nan},
+			{5, nan, nan, nan, nan, nan, nan, 7, nan, nan},
+		},
+	},
+	{
+		name:   "last_over_time all NaNs",
+		opType: LastType,
+		vals: [][]float64{
+			{nan, nan, nan, nan, nan, nan, nan, nan, nan, nan},
+			{nan, nan, nan, nan, nan, nan, nan, nan, nan, nan},
+		},
+		expected: [][]float64{
+			{nan, nan, nan, nan, nan, nan, nan, nan, nan, nan},
+			{nan, nan, nan, nan, nan, nan, nan, nan, nan, nan},
+		},
+	},
 	{
 		name:   "quantile_over_time",
 		opType: QuantileType,
diff --git a/src/query/functions/temporal/base.go b/src/query/functions/temporal/base.go
index 2fd50e9eb8..187b78fffc 100644
--- a/src/query/functions/temporal/base.go
+++ b/src/query/functions/temporal/base.go
@@ -208,7 +208,7 @@ func (c *baseNode) batchProcess(
 		idx = idx + batch.Size
 		p := c.makeProcessor.initialize(c.op.duration, c.transformOpts)
 		go func() {
-			err := parallelProcess(ctx, loopIndex, batch.Iter, builder, m, p, &mu)
+			err := parallelProcess(ctx, c.op.OpType(), loopIndex, batch.Iter, builder, m, p, &mu)
 			if err != nil {
 				mu.Lock()
 				// NB: this no-ops if the error is nil.
@@ -226,6 +226,7 @@ func (c *baseNode) batchProcess(
 
 func parallelProcess(
 	ctx context.Context,
+	opType string,
 	idx int,
 	iter block.SeriesIter,
 	builder block.Builder,
@@ -275,10 +276,15 @@ func parallelProcess(
 			decodeDuration += stats.DecodeDuration
 		}
 
-		// rename series to exclude their __name__ tag as
-		// part of function processing.
-		seriesMeta.Tags = seriesMeta.Tags.WithoutName()
-		seriesMeta.Name = seriesMeta.Tags.ID()
+		// The last_over_time function acts like offset;
+		// thus, it should keep the metric name.
+		// For all other functions,
+		// rename series to exclude their __name__ tag as part of function processing.
+		if opType != LastType {
+			seriesMeta.Tags = seriesMeta.Tags.WithoutName()
+			seriesMeta.Name = seriesMeta.Tags.ID()
+		}
+
 		values = values[:0]
 		for i := 0; i < blockMeta.steps; i++ {
 			iterBounds := iterationBounds{
@@ -342,14 +348,22 @@ func (c *baseNode) singleProcess(
 		return nil, err
 	}
 
+	// The last_over_time function acts like offset;
+	// thus, it should keep the metric name.
+	// For all other functions,
 	// rename series to exclude their __name__ tag as part of function processing.
-	resultSeriesMeta := make([]block.SeriesMeta, 0, len(seriesIter.SeriesMeta()))
-	for _, m := range seriesIter.SeriesMeta() {
-		tags := m.Tags.WithoutName()
-		resultSeriesMeta = append(resultSeriesMeta, block.SeriesMeta{
-			Name: tags.ID(),
-			Tags: tags,
-		})
+	var resultSeriesMeta []block.SeriesMeta
+	if c.op.OpType() != LastType {
+		resultSeriesMeta = make([]block.SeriesMeta, 0, len(seriesIter.SeriesMeta()))
+		for _, m := range seriesIter.SeriesMeta() {
+			tags := m.Tags.WithoutName()
+			resultSeriesMeta = append(resultSeriesMeta, block.SeriesMeta{
+				Name: tags.ID(),
+				Tags: tags,
+			})
+		}
+	} else {
+		resultSeriesMeta = seriesIter.SeriesMeta()
 	}
 
 	meta := b.Meta()
diff --git a/src/query/functions/temporal/base_test.go b/src/query/functions/temporal/base_test.go
index 00fa1732c5..d7de453f2e 100644
--- a/src/query/functions/temporal/base_test.go
+++ b/src/query/functions/temporal/base_test.go
@@ -151,9 +151,17 @@ func testTemporalFunc(t *testing.T, opGen opGenerator, tests []testCase) {
 						Value: []byte("v2"),
 					}})}
 
-				// NB: name should be dropped from series tags, and the name
-				// should be the updated ID.
-				expectedSeriesMetas := []block.SeriesMeta{metaOne, metaTwo}
+				// The last_over_time function acts like offset;
+				// thus, it should keep the metric name.
+				// For all other functions,
+				// name should be dropped from series tags,
+				// and the name should be the updated ID.
+				var expectedSeriesMetas []block.SeriesMeta
+				if tt.opType != LastType {
+					expectedSeriesMetas = []block.SeriesMeta{metaOne, metaTwo}
+				} else {
+					expectedSeriesMetas = seriesMetas
+				}
 				require.Equal(t, expectedSeriesMetas, sink.Metas)
 			})
 		}
diff --git a/src/query/parser/promql/matchers.go b/src/query/parser/promql/matchers.go
index abfd94818a..6bcf32113b 100644
--- a/src/query/parser/promql/matchers.go
+++ b/src/query/parser/promql/matchers.go
@@ -261,7 +261,7 @@ func NewFunctionExpr(
 
 	case temporal.AvgType, temporal.CountType, temporal.MinType,
 		temporal.MaxType, temporal.SumType, temporal.StdDevType,
-		temporal.StdVarType:
+		temporal.StdVarType, temporal.LastType:
 		p, err = temporal.NewAggOp(argValues, name)
 		return p, true, err
 
diff --git a/src/query/parser/promql/parse_test.go b/src/query/parser/promql/parse_test.go
index 9c9c4df9d9..0872a7a653 100644
--- a/src/query/parser/promql/parse_test.go
+++ b/src/query/parser/promql/parse_test.go
@@ -469,6 +469,7 @@ var temporalParseTests = []struct {
 	{"sum_over_time(up[5m])", temporal.SumType},
 	{"stddev_over_time(up[5m])", temporal.StdDevType},
 	{"stdvar_over_time(up[5m])", temporal.StdVarType},
+	{"last_over_time(up[5m])", temporal.LastType},
 	{"quantile_over_time(0.2, up[5m])", temporal.QuantileType},
 	{"irate(up[5m])", temporal.IRateType},
 	{"idelta(up[5m])", temporal.IDeltaType},
diff --git a/src/query/test/compatibility/testdata/functions.test b/src/query/test/compatibility/testdata/functions.test
index b289c81408..ea54af385b 100644
--- a/src/query/test/compatibility/testdata/functions.test
+++ b/src/query/test/compatibility/testdata/functions.test
@@ -593,26 +593,26 @@ load 10s
 	data{type="some_nan3"} NaN 0 1
 	data{type="only_nan"} NaN NaN NaN
 
-# Failing with keepNaN feature. eval instant at 1m min_over_time(data[1m])
-#	{type="numbers"} 0
-#	{type="some_nan"} 0
-#	{type="some_nan2"} 1
-#	{type="some_nan3"} 0
-#	{type="only_nan"} NaN
-
-# Failing with keepNaN feature. eval instant at 1m max_over_time(data[1m])
-#	{type="numbers"} 3
-#	{type="some_nan"} 2
-#	{type="some_nan2"} 2
-#	{type="some_nan3"} 1
-#	{type="only_nan"} NaN
-
-#eval instant at 1m last_over_time(data[1m])
-#	data{type="numbers"} 3
-#	data{type="some_nan"} NaN
-#	data{type="some_nan2"} 1
-#	data{type="some_nan3"} 1
-#	data{type="only_nan"} NaN
+eval instant at 1m min_over_time(data[1m])
+	{type="numbers"} 0
+	{type="some_nan"} 0
+	{type="some_nan2"} 1
+	{type="some_nan3"} 0
+	# Failing with keepNaN feature. {type="only_nan"} NaN
+
+eval instant at 1m max_over_time(data[1m])
+	{type="numbers"} 3
+	{type="some_nan"} 2
+	{type="some_nan2"} 2
+	{type="some_nan3"} 1
+	# Failing with keepNaN feature. {type="only_nan"} NaN
+
+eval instant at 1m last_over_time(data[1m])
+	data{type="numbers"} 3
+	data{type="some_nan2"} 1
+	data{type="some_nan3"} 1
+	# Failing with keepNaN feature. data{type="some_nan"} NaN
+	# Failing with keepNaN feature. data{type="only_nan"} NaN
 
 clear