Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: [k212] fix: special case the return values from a sharded first/last_over_time query #13667

Merged
merged 1 commit into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,6 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type())
}
}

return NewMergeLastOverTimeStepEvaluator(params, xs), nil
default:
return ev.defaultEvaluator.NewStepEvaluator(ctx, nextEvFactory, e, params)
Expand Down
60 changes: 59 additions & 1 deletion pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestMappingEquivalence(t *testing.T) {
regular := NewEngine(opts, q, NoLimits, log.NewNopLogger())
sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger())

t.Run(tc.query, func(t *testing.T) {
t.Run(tc.query+"_range", func(t *testing.T) {
params, err := NewLiteralParams(
tc.query,
start,
Expand Down Expand Up @@ -125,6 +125,46 @@ func TestMappingEquivalence(t *testing.T) {
require.Equal(t, res.Data, shardedRes.Data)
}
})
t.Run(tc.query+"_instant", func(t *testing.T) {
// for an instant query we set the start and end to the same timestamp
// plus set step and interval to 0
params, err := NewLiteralParams(
tc.query,
time.Unix(0, int64(rounds+1)),
time.Unix(0, int64(rounds+1)),
0,
0,
logproto.FORWARD,
uint32(limit),
nil,
nil,
)
require.NoError(t, err)
qry := regular.Query(params)
ctx := user.InjectOrgID(context.Background(), "fake")

strategy := NewPowerOfTwoStrategy(ConstantShards(shards))
mapper := NewShardMapper(strategy, nilShardMetrics, tc.shardAgg)
_, _, mapped, err := mapper.Parse(params.GetExpression())
require.NoError(t, err)

shardedQry := sharded.Query(ctx, ParamsWithExpressionOverride{
Params: params,
ExpressionOverride: mapped,
})

res, err := qry.Exec(ctx)
require.NoError(t, err)

shardedRes, err := shardedQry.Exec(ctx)
require.NoError(t, err)

if tc.approximate {
approximatelyEqualsVector(t, res.Data.(promql.Vector), shardedRes.Data.(promql.Vector)) //, tc.realtiveError)
} else {
require.Equal(t, res.Data, shardedRes.Data)
}
})
}
}

Expand Down Expand Up @@ -579,6 +619,24 @@ func approximatelyEquals(t *testing.T, as, bs promql.Matrix) {
}
}

// approximatelyEqualsVector ensures two responses are approximately equal,
// up to 6 decimals precision per sample
func approximatelyEqualsVector(t *testing.T, as, bs promql.Vector) {
require.Len(t, bs, len(as))

for i := 0; i < len(as); i++ {
a := as[i]
b := bs[i]
require.Equal(t, a.Metric, b.Metric)

aSample := a.F
aSample = math.Round(aSample*1e6) / 1e6
bSample := b.F
bSample = math.Round(bSample*1e6) / 1e6
require.Equalf(t, aSample, bSample, "metric %s differs from %s at %d", a.Metric, b.Metric, i)
}
}

func relativeError(t *testing.T, expected, actual promql.Matrix, alpha float64) {
require.Len(t, actual, len(expected))

Expand Down
68 changes: 44 additions & 24 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,11 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
case SampleVector:
maxSeriesCapture := func(id string) int { return q.limits.MaxQuerySeries(ctx, id) }
maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture)
return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries)
mfl := false
if rae, ok := expr.(*syntax.RangeAggregationExpr); ok && (rae.Operation == syntax.OpRangeTypeFirstWithTimestamp || rae.Operation == syntax.OpRangeTypeLastWithTimestamp) {
mfl = true
}
return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries, mfl)
case ProbabilisticQuantileVector:
return MergeQuantileSketchVector(next, vec, stepEvaluator, q.params)
default:
Expand All @@ -381,9 +385,31 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
return nil, errors.New("unexpected empty result")
}

func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) {
seriesIndex := map[uint64]*promql.Series{}
func vectorsToSeries(vec promql.Vector, sm map[uint64]promql.Series) {
for _, p := range vec {
var (
series promql.Series
hash = p.Metric.Hash()
ok bool
)

series, ok = sm[hash]
if !ok {
series = promql.Series{
Metric: p.Metric,
Floats: make([]promql.FPoint, 0, 1),
}
sm[hash] = series
}
series.Floats = append(series.Floats, promql.FPoint{
T: p.T,
F: p.F,
})
sm[hash] = series
}
}

func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int, mergeFirstLast bool) (promql_parser.Value, error) {
vec := promql.Vector{}
if next {
vec = r.SampleVector()
Expand All @@ -393,8 +419,21 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval
if len(vec) > maxSeries {
return nil, logqlmodel.NewSeriesLimitError(maxSeries)
}
seriesIndex := map[uint64]promql.Series{}

if GetRangeType(q.params) == InstantType {
// an instant query sharded first/last_over_time can return a single vector
if mergeFirstLast {
vectorsToSeries(vec, seriesIndex)
series := make([]promql.Series, 0, len(seriesIndex))
for _, s := range seriesIndex {
series = append(series, s)
}
result := promql.Matrix(series)
sort.Sort(result)
return result, stepEvaluator.Error()
}

sortByValue, err := Sortable(q.params)
if err != nil {
return nil, fmt.Errorf("fail to check Sortable, logql: %s ,err: %s", q.params.QueryString(), err)
Expand All @@ -412,26 +451,7 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval

for next {
vec = r.SampleVector()
for _, p := range vec {
var (
series *promql.Series
hash = p.Metric.Hash()
ok bool
)

series, ok = seriesIndex[hash]
if !ok {
series = &promql.Series{
Metric: p.Metric,
Floats: make([]promql.FPoint, 0, stepCount),
}
seriesIndex[hash] = series
}
series.Floats = append(series.Floats, promql.FPoint{
T: p.T,
F: p.F,
})
}
vectorsToSeries(vec, seriesIndex)
// as we slowly build the full query for each steps, make sure we don't go over the limit of unique series.
if len(seriesIndex) > maxSeries {
return nil, logqlmodel.NewSeriesLimitError(maxSeries)
Expand All @@ -444,7 +464,7 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval

series := make([]promql.Series, 0, len(seriesIndex))
for _, s := range seriesIndex {
series = append(series, *s)
series = append(series, s)
}
result := promql.Matrix(series)
sort.Sort(result)
Expand Down
7 changes: 5 additions & 2 deletions pkg/logql/first_last_over_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ type mergeOverTimeStepEvaluator struct {

// Next returns the first or last element within one step of each matrix.
func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) {
var vec promql.Vector
vec := promql.Vector{}

e.ts = e.ts.Add(e.step)
if e.ts.After(e.end) {
Expand All @@ -142,7 +142,6 @@ func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) {
// Merge other results
for i, m := range e.matrices {
for j, series := range m {

if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) {
continue
}
Expand Down Expand Up @@ -171,6 +170,10 @@ func (e *mergeOverTimeStepEvaluator) pop(r, s int) {

// inRange returns true if t is in step range of ts.
func (e *mergeOverTimeStepEvaluator) inRange(t, ts int64) bool {
// special case instant queries
if e.step.Milliseconds() == 0 {
return true
}
return (ts-e.step.Milliseconds()) <= t && t < ts
}

Expand Down
1 change: 0 additions & 1 deletion pkg/logql/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQu
if err != nil {
return nil, err
}

results = append(results, res)
}

Expand Down
Loading