Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Add support for last_over_time #3884

Merged
merged 1 commit into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/query/functions/temporal/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ const (
// StdVarType calculates the standard variance of all values in the specified interval.
StdVarType = "stdvar_over_time"

// LastType returns the most recent value in the specified interval.
LastType = "last_over_time"

// QuantileType calculates the φ-quantile (0 ≤ φ ≤ 1) of the values in the specified interval.
QuantileType = "quantile_over_time"
)
Expand All @@ -67,6 +70,7 @@ var (
SumType: sumOverTime,
StdDevType: stddevOverTime,
StdVarType: stdvarOverTime,
LastType: lastOverTime,
}
)

Expand Down Expand Up @@ -220,6 +224,15 @@ func stdvarOverTime(values []float64) float64 {
return aux / count
}

func lastOverTime(values []float64) float64 {
length := len(values)
if length == 0 {
return math.NaN()
}

return values[length-1]
}

func sumAndCount(values []float64) (float64, float64) {
sum := 0.0
count := 0.0
Expand Down
36 changes: 36 additions & 0 deletions src/query/functions/temporal/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,42 @@ var aggregationTestCases = []testCase{
{nan, nan, nan, nan, nan, nan, nan, nan, nan, nan},
},
},
{
name: "last_over_time",
opType: LastType,
vals: [][]float64{
{nan, 1, 2, 3, 4, 0, 1, 2, 3, 4},
{5, 6, 7, 8, 9, 5, 6, 7, 8, 9},
},
expected: [][]float64{
{nan, 1, 2, 3, 4, 0, 1, 2, 3, 4},
{5, 6, 7, 8, 9, 5, 6, 7, 8, 9},
},
},
{
name: "last_over_time leading NaNs",
opType: LastType,
vals: [][]float64{
{nan, 1, nan, 3, nan, nan, 2, nan, nan, nan},
{5, nan, nan, nan, nan, nan, nan, 7, nan, nan},
},
expected: [][]float64{
{nan, 1, nan, 3, nan, nan, 2, nan, nan, nan},
{5, nan, nan, nan, nan, nan, nan, 7, nan, nan},
},
},
{
name: "last_over_time all NaNs",
opType: LastType,
vals: [][]float64{
{nan, nan, nan, nan, nan, nan, nan, nan, nan, nan},
{nan, nan, nan, nan, nan, nan, nan, nan, nan, nan},
},
expected: [][]float64{
{nan, nan, nan, nan, nan, nan, nan, nan, nan, nan},
{nan, nan, nan, nan, nan, nan, nan, nan, nan, nan},
},
},
{
name: "quantile_over_time",
opType: QuantileType,
Expand Down
38 changes: 26 additions & 12 deletions src/query/functions/temporal/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (c *baseNode) batchProcess(
idx = idx + batch.Size
p := c.makeProcessor.initialize(c.op.duration, c.transformOpts)
go func() {
err := parallelProcess(ctx, loopIndex, batch.Iter, builder, m, p, &mu)
err := parallelProcess(ctx, c.op.OpType(), loopIndex, batch.Iter, builder, m, p, &mu)
if err != nil {
mu.Lock()
// NB: this no-ops if the error is nil.
Expand All @@ -226,6 +226,7 @@ func (c *baseNode) batchProcess(

func parallelProcess(
ctx context.Context,
opType string,
idx int,
iter block.SeriesIter,
builder block.Builder,
Expand Down Expand Up @@ -275,10 +276,15 @@ func parallelProcess(
decodeDuration += stats.DecodeDuration
}

// rename series to exclude their __name__ tag as
// part of function processing.
seriesMeta.Tags = seriesMeta.Tags.WithoutName()
seriesMeta.Name = seriesMeta.Tags.ID()
// The last_over_time function acts like offset;
// thus, it should keep the metric name.
// For all other functions,
// rename series to exclude their __name__ tag as part of function processing.
if opType != LastType {
seriesMeta.Tags = seriesMeta.Tags.WithoutName()
seriesMeta.Name = seriesMeta.Tags.ID()
}

values = values[:0]
for i := 0; i < blockMeta.steps; i++ {
iterBounds := iterationBounds{
Expand Down Expand Up @@ -342,14 +348,22 @@ func (c *baseNode) singleProcess(
return nil, err
}

// The last_over_time function acts like offset;
// thus, it should keep the metric name.
// For all other functions,
// rename series to exclude their __name__ tag as part of function processing.
resultSeriesMeta := make([]block.SeriesMeta, 0, len(seriesIter.SeriesMeta()))
for _, m := range seriesIter.SeriesMeta() {
tags := m.Tags.WithoutName()
resultSeriesMeta = append(resultSeriesMeta, block.SeriesMeta{
Name: tags.ID(),
Tags: tags,
})
var resultSeriesMeta []block.SeriesMeta
if c.op.OpType() != LastType {
resultSeriesMeta = make([]block.SeriesMeta, 0, len(seriesIter.SeriesMeta()))
for _, m := range seriesIter.SeriesMeta() {
tags := m.Tags.WithoutName()
resultSeriesMeta = append(resultSeriesMeta, block.SeriesMeta{
Name: tags.ID(),
Tags: tags,
})
}
} else {
resultSeriesMeta = seriesIter.SeriesMeta()
}

meta := b.Meta()
Expand Down
14 changes: 11 additions & 3 deletions src/query/functions/temporal/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,17 @@ func testTemporalFunc(t *testing.T, opGen opGenerator, tests []testCase) {
Value: []byte("v2"),
}})}

// NB: name should be dropped from series tags, and the name
// should be the updated ID.
expectedSeriesMetas := []block.SeriesMeta{metaOne, metaTwo}
// The last_over_time function acts like offset;
// thus, it should keep the metric name.
// For all other functions,
// name should be dropped from series tags,
// and the name should be the updated ID.
var expectedSeriesMetas []block.SeriesMeta
if tt.opType != LastType {
expectedSeriesMetas = []block.SeriesMeta{metaOne, metaTwo}
} else {
expectedSeriesMetas = seriesMetas
}
require.Equal(t, expectedSeriesMetas, sink.Metas)
})
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/parser/promql/matchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func NewFunctionExpr(

case temporal.AvgType, temporal.CountType, temporal.MinType,
temporal.MaxType, temporal.SumType, temporal.StdDevType,
temporal.StdVarType:
temporal.StdVarType, temporal.LastType:
p, err = temporal.NewAggOp(argValues, name)
return p, true, err

Expand Down
1 change: 1 addition & 0 deletions src/query/parser/promql/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ var temporalParseTests = []struct {
{"sum_over_time(up[5m])", temporal.SumType},
{"stddev_over_time(up[5m])", temporal.StdDevType},
{"stdvar_over_time(up[5m])", temporal.StdVarType},
{"last_over_time(up[5m])", temporal.LastType},
{"quantile_over_time(0.2, up[5m])", temporal.QuantileType},
{"irate(up[5m])", temporal.IRateType},
{"idelta(up[5m])", temporal.IDeltaType},
Expand Down
40 changes: 20 additions & 20 deletions src/query/test/compatibility/testdata/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -593,26 +593,26 @@ load 10s
data{type="some_nan3"} NaN 0 1
data{type="only_nan"} NaN NaN NaN

# Failing with keepNaN feature. eval instant at 1m min_over_time(data[1m])
# {type="numbers"} 0
# {type="some_nan"} 0
# {type="some_nan2"} 1
# {type="some_nan3"} 0
# {type="only_nan"} NaN

# Failing with keepNaN feature. eval instant at 1m max_over_time(data[1m])
# {type="numbers"} 3
# {type="some_nan"} 2
# {type="some_nan2"} 2
# {type="some_nan3"} 1
# {type="only_nan"} NaN

#eval instant at 1m last_over_time(data[1m])
# data{type="numbers"} 3
# data{type="some_nan"} NaN
# data{type="some_nan2"} 1
# data{type="some_nan3"} 1
# data{type="only_nan"} NaN
eval instant at 1m min_over_time(data[1m])
{type="numbers"} 0
{type="some_nan"} 0
{type="some_nan2"} 1
{type="some_nan3"} 0
# Failing with keepNaN feature. {type="only_nan"} NaN

eval instant at 1m max_over_time(data[1m])
{type="numbers"} 3
{type="some_nan"} 2
{type="some_nan2"} 2
{type="some_nan3"} 1
# Failing with keepNaN feature. {type="only_nan"} NaN

eval instant at 1m last_over_time(data[1m])
data{type="numbers"} 3
data{type="some_nan2"} 1
data{type="some_nan3"} 1
# Failing with keepNaN feature. data{type="some_nan"} NaN
# Failing with keepNaN feature. data{type="only_nan"} NaN

clear

Expand Down