diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index f28eaddde3f1a..615ddd0f3f47d 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -599,7 +599,6 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type()) } } - return NewMergeLastOverTimeStepEvaluator(params, xs), nil default: return ev.defaultEvaluator.NewStepEvaluator(ctx, nextEvFactory, e, params) diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index ae313ea1fc48f..b1082f3d6ad76 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -89,7 +89,7 @@ func TestMappingEquivalence(t *testing.T) { regular := NewEngine(opts, q, NoLimits, log.NewNopLogger()) sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger()) - t.Run(tc.query, func(t *testing.T) { + t.Run(tc.query+"_range", func(t *testing.T) { params, err := NewLiteralParams( tc.query, start, @@ -125,6 +125,46 @@ func TestMappingEquivalence(t *testing.T) { require.Equal(t, res.Data, shardedRes.Data) } }) + t.Run(tc.query+"_instant", func(t *testing.T) { + // for an instant query we set the start and end to the same timestamp + // plus set step and interval to 0 + params, err := NewLiteralParams( + tc.query, + time.Unix(0, int64(rounds+1)), + time.Unix(0, int64(rounds+1)), + 0, + 0, + logproto.FORWARD, + uint32(limit), + nil, + nil, + ) + require.NoError(t, err) + qry := regular.Query(params) + ctx := user.InjectOrgID(context.Background(), "fake") + + strategy := NewPowerOfTwoStrategy(ConstantShards(shards)) + mapper := NewShardMapper(strategy, nilShardMetrics, tc.shardAgg) + _, _, mapped, err := mapper.Parse(params.GetExpression()) + require.NoError(t, err) + + shardedQry := sharded.Query(ctx, ParamsWithExpressionOverride{ + Params: params, + ExpressionOverride: mapped, + }) + + res, err := qry.Exec(ctx) + require.NoError(t, err) + + shardedRes, err := shardedQry.Exec(ctx) + require.NoError(t, err) + + if tc.approximate { + approximatelyEqualsVector(t, res.Data.(promql.Vector), shardedRes.Data.(promql.Vector)) //, tc.realtiveError) + } else { + require.Equal(t, res.Data, shardedRes.Data) + } + }) } } @@ -579,6 +619,24 @@ func approximatelyEquals(t *testing.T, as, bs promql.Matrix) { } } +// approximatelyEqualsVector ensures two responses are approximately equal, +// up to 6 decimals precision per sample +func approximatelyEqualsVector(t *testing.T, as, bs promql.Vector) { + require.Len(t, bs, len(as)) + + for i := 0; i < len(as); i++ { + a := as[i] + b := bs[i] + require.Equal(t, a.Metric, b.Metric) + + aSample := a.F + aSample = math.Round(aSample*1e6) / 1e6 + bSample := b.F + bSample = math.Round(bSample*1e6) / 1e6 + require.Equalf(t, aSample, bSample, "metric %s differs from %s at %d", a.Metric, b.Metric, i) + } +} + func relativeError(t *testing.T, expected, actual promql.Matrix, alpha float64) { require.Len(t, actual, len(expected)) diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 0a26520b673c7..294fe81457de2 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -371,7 +371,11 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_ case SampleVector: maxSeriesCapture := func(id string) int { return q.limits.MaxQuerySeries(ctx, id) } maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture) - return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries) + mfl := false + if rae, ok := expr.(*syntax.RangeAggregationExpr); ok && (rae.Operation == syntax.OpRangeTypeFirstWithTimestamp || rae.Operation == syntax.OpRangeTypeLastWithTimestamp) { + mfl = true + } + return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries, mfl) case ProbabilisticQuantileVector: return MergeQuantileSketchVector(next, vec, stepEvaluator, q.params) default: @@ -381,9 +385,31 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_ return nil, errors.New("unexpected empty result") } -func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) { - seriesIndex := map[uint64]*promql.Series{} +func vectorsToSeries(vec promql.Vector, sm map[uint64]promql.Series) { + for _, p := range vec { + var ( + series promql.Series + hash = p.Metric.Hash() + ok bool + ) + + series, ok = sm[hash] + if !ok { + series = promql.Series{ + Metric: p.Metric, + Floats: make([]promql.FPoint, 0, 1), + } + sm[hash] = series + } + series.Floats = append(series.Floats, promql.FPoint{ + T: p.T, + F: p.F, + }) + sm[hash] = series + } +} +func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int, mergeFirstLast bool) (promql_parser.Value, error) { vec := promql.Vector{} if next { vec = r.SampleVector() @@ -393,8 +419,21 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval if len(vec) > maxSeries { return nil, logqlmodel.NewSeriesLimitError(maxSeries) } + seriesIndex := map[uint64]promql.Series{} if GetRangeType(q.params) == InstantType { + // an instant query sharded first/last_over_time can return a single vector + if mergeFirstLast { + vectorsToSeries(vec, seriesIndex) + series := make([]promql.Series, 0, len(seriesIndex)) + for _, s := range seriesIndex { + series = append(series, s) + } + result := promql.Matrix(series) + sort.Sort(result) + return result, stepEvaluator.Error() + } + sortByValue, err := Sortable(q.params) if err != nil { return nil, fmt.Errorf("fail to check Sortable, logql: %s ,err: %s", q.params.QueryString(), err) @@ -412,26 +451,7 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval for next { vec = r.SampleVector() - for _, p := range vec { - var ( - series *promql.Series - hash = p.Metric.Hash() - ok bool - ) - - series, ok = seriesIndex[hash] - if !ok { - series = &promql.Series{ - Metric: p.Metric, - Floats: make([]promql.FPoint, 0, stepCount), - } - seriesIndex[hash] = series - } - series.Floats = append(series.Floats, promql.FPoint{ - T: p.T, - F: p.F, - }) - } + vectorsToSeries(vec, seriesIndex) // as we slowly build the full query for each steps, make sure we don't go over the limit of unique series. if len(seriesIndex) > maxSeries { return nil, logqlmodel.NewSeriesLimitError(maxSeries) @@ -444,7 +464,7 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval series := make([]promql.Series, 0, len(seriesIndex)) for _, s := range seriesIndex { - series = append(series, *s) + series = append(series, s) } result := promql.Matrix(series) sort.Sort(result) diff --git a/pkg/logql/first_last_over_time.go b/pkg/logql/first_last_over_time.go index 6d0329cacf8d2..4b6bbde55173c 100644 --- a/pkg/logql/first_last_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -131,7 +131,7 @@ type mergeOverTimeStepEvaluator struct { // Next returns the first or last element within one step of each matrix. func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) { - var vec promql.Vector + vec := promql.Vector{} e.ts = e.ts.Add(e.step) if e.ts.After(e.end) { @@ -142,7 +142,6 @@ func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) { // Merge other results for i, m := range e.matrices { for j, series := range m { - if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) { continue } @@ -171,6 +170,10 @@ func (e *mergeOverTimeStepEvaluator) pop(r, s int) { // inRange returns true if t is in step range of ts. func (e *mergeOverTimeStepEvaluator) inRange(t, ts int64) bool { + // special case instant queries + if e.step.Milliseconds() == 0 { + return true + } return (ts-e.step.Milliseconds()) <= t && t < ts } diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index 7c9d3233f4299..61a60e44f80fa 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -234,7 +234,6 @@ func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQu if err != nil { return nil, err } - results = append(results, res) }