Make query sharding deterministic (#707)
* Make query sharding deterministic

Query sharding was executing queries concurrently and appending their
results in no particular order. Unfortunately, floating-point addition
is not associative: given the float numbers a = 0.03298 and b = 0.09894,
the sum a+a+b differs from a+b+a.

We can't fix float arithmetic, but we can at least make the result
deterministic, so that weird query results are easier to debug.
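
As a standalone illustration (an editor's sketch, not part of this commit), the order dependence is easy to reproduce in Go:

package main

import "fmt"

func main() {
	a, b := 0.03298, 0.09894
	// Addition evaluates left to right: a+a+b is (a+a)+b while a+b+a is
	// (a+b)+a. Float64 rounding differs between the two, so the sums differ.
	fmt.Println(a+a+b == a+b+a) // false
	fmt.Printf("a+a+b = %.20f\n", a+a+b)
	fmt.Printf("a+b+a = %.20f\n", a+b+a)
}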

Signed-off-by: Oleg Zaytsev <[email protected]>

* Update CHANGELOG.md

Signed-off-by: Oleg Zaytsev <[email protected]>

* Update comment in pkg/querier/queryrange/sharded_queryable.go

Co-authored-by: Mauro Stettler <[email protected]>

* Add conditional comment in labelsForShardsGenerator

Signed-off-by: Oleg Zaytsev <[email protected]>

* Comment why no mutex is needed

Signed-off-by: Oleg Zaytsev <[email protected]>

Co-authored-by: Mauro Stettler <[email protected]>
colega and replay authored Jan 12, 2022
1 parent 802c0f3 commit 739c7a4
Showing 5 changed files with 162 additions and 80 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -81,7 +81,7 @@
* `-blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout` default from `20m` to `1h`
* [CHANGE] Distributor: removed the `-distributor.shard-by-all-labels` configuration option. It is now assumed to be true. #698
* [FEATURE] Query Frontend: Add `cortex_query_fetched_chunks_total` per-user counter to expose the number of chunks fetched as part of queries. This metric can be enabled with the `-frontend.query-stats-enabled` flag (or its respective YAML config option `query_stats_enabled`). #31
* [FEATURE] Query Frontend: Add experimental querysharding for the blocks storage (instant and range queries). You can now enable querysharding for blocks storage (`-store.engine=blocks`) by setting `-query-frontend.parallelize-shardable-queries` to `true`. The following additional config and exported metrics have been added. #79 #80 #100 #124 #140 #148 #150 #151 #153 #154 #155 #156 #157 #158 #159 #160 #163 #169 #172 #196 #205 #225 #226 #227 #228 #230 #235 #240 #239 #246 #244 #319 #330 #371 #385 #400 #458 #586 #630 #660
* [FEATURE] Query Frontend: Add experimental querysharding for the blocks storage (instant and range queries). You can now enable querysharding for blocks storage (`-store.engine=blocks`) by setting `-query-frontend.parallelize-shardable-queries` to `true`. The following additional config and exported metrics have been added. #79 #80 #100 #124 #140 #148 #150 #151 #153 #154 #155 #156 #157 #158 #159 #160 #163 #169 #172 #196 #205 #225 #226 #227 #228 #230 #235 #240 #239 #246 #244 #319 #330 #371 #385 #400 #458 #586 #630 #660 #707
* New config options:
* `-frontend.query-sharding-total-shards`: The amount of shards to use when doing parallelisation via query sharding.
* `-frontend.query-sharding-max-sharded-queries`: The max number of sharded queries that can be run for a given received query. 0 to disable limit.
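As an editor's illustration of the options above (flag names are taken from this changelog entry; the values are arbitrary examples, not recommendations), enabling query sharding could look like:

-query-frontend.parallelize-shardable-queries=true
-frontend.query-sharding-total-shards=16
-frontend.query-sharding-max-sharded-queries=128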
123 changes: 101 additions & 22 deletions pkg/querier/queryrange/querysharding_test.go
@@ -97,6 +97,7 @@ func approximatelyEquals(t *testing.T, a, b *PrometheusResponse) {
}
}
}

func TestQueryShardingCorrectness(t *testing.T) {
var (
numSeries = 1000
@@ -477,11 +478,7 @@ func TestQueryShardingCorrectness(t *testing.T) {
}

// Create a queryable on the fixtures.
queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return &querierMock{
series: series,
}, nil
})
queryable := storageSeriesQueryable(series)

for testName, testData := range tests {
// Change scope to ensure it works fine when test cases are executed concurrently.
@@ -600,6 +597,83 @@ func (b byLabels) Less(i, j int) bool {
) < 0
}

func TestQueryshardingDeterminism(t *testing.T) {
const shards = 16

// These are "evil" floats found in production which are the result of a rate of 1 and 3 requests per 1m5s.
// We push them as a gauge here to simplify the test scenario.
const (
evilFloatA = 0.03298
evilFloatB = 0.09894
)
require.NotEqualf(t,
evilFloatA+evilFloatA+evilFloatB,
evilFloatA+evilFloatB+evilFloatA,
"This test is based on the fact that given a=%f and b=%f, then a+a+b != a+b+a. If that is not true, this test is not testing anything.", evilFloatA, evilFloatB,
)

var (
from = time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)
step = 30 * time.Second
to = from.Add(step)
)

labelsForShard := labelsForShardsGenerator(labels.FromStrings(labels.MetricName, "metric"), shards)
storageSeries := []*promql.StorageSeries{
newSeries(labelsForShard(0), from, to, step, constant(evilFloatA)),
newSeries(labelsForShard(1), from, to, step, constant(evilFloatA)),
newSeries(labelsForShard(2), from, to, step, constant(evilFloatB)),
}

shardingware := newQueryShardingMiddleware(log.NewNopLogger(), newEngine(), mockLimits{totalShards: shards}, prometheus.NewPedanticRegistry())
downstream := &downstreamHandler{engine: newEngine(), queryable: storageSeriesQueryable(storageSeries)}

req := &PrometheusInstantQueryRequest{
Path: "/query",
Time: to.UnixMilli(),
Query: `sum(metric)`,
}

var lastVal float64
for i := 0; i <= 100; i++ {
shardedRes, err := shardingware.Wrap(downstream).Do(user.InjectOrgID(context.Background(), "test"), req)
require.NoError(t, err)

shardedPrometheusRes := shardedRes.(*PrometheusResponse)

sampleStreams, err := responseToSamples(shardedPrometheusRes)
require.NoError(t, err)

require.Lenf(t, sampleStreams, 1, "There should be 1 sample stream (query %d)", i)
require.Lenf(t, sampleStreams[0].Samples, 1, "There should be 1 sample in the first stream (query %d)", i)
val := sampleStreams[0].Samples[0].Value

if i > 0 {
require.Equalf(t, lastVal, val, "Value differs on query %d", i)
}
lastVal = val
}
}

// labelsForShardsGenerator returns a function that provides labels.Labels for the requested shard.
// Successive calls on a single generator instance produce different label sets, even for the same shard.
func labelsForShardsGenerator(base labels.Labels, shards uint64) func(shard uint64) labels.Labels {
i := 0
return func(shard uint64) labels.Labels {
for {
i++
ls := make(labels.Labels, len(base)+1)
copy(ls, base)
ls[len(ls)-1] = labels.Label{Name: "__test_shard_adjuster__", Value: fmt.Sprintf("adjusted to be %s by %d", sharding.FormatShardIDLabelValue(shard, shards), i)}
sort.Sort(ls)
// If this label value makes this label combination fall into the desired shard, return it; otherwise keep trying.
if ls.Hash()%shards == shard {
return ls
}
}
}
}

// TestQuerySharding_FunctionCorrectness is the old test that probably at some point inspired TestQueryShardingCorrectness;
// we keep it here since it adds more test cases.
func TestQuerySharding_FunctionCorrectness(t *testing.T) {
@@ -700,17 +774,13 @@ func TestQuerySharding_FunctionCorrectness(t *testing.T) {
const numShards = 4
for _, query := range mkQueries(tc.tpl, tc.fn, tc.rangeQuery, tc.args) {
t.Run(query, func(t *testing.T) {
queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return &querierMock{
series: []*promql.StorageSeries{
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "barr"}}, start.Add(-lookbackDelta), end, step, factor(5)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "bazz"}}, start.Add(-lookbackDelta), end, step, factor(7)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "buzz"}}, start.Add(-lookbackDelta), end, step, factor(12)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "bozz"}}, start.Add(-lookbackDelta), end, step, factor(11)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "buzz"}}, start.Add(-lookbackDelta), end, step, factor(8)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "bazz"}}, start.Add(-lookbackDelta), end, step, arithmeticSequence(10)),
},
}, nil
queryable := storageSeriesQueryable([]*promql.StorageSeries{
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "barr"}}, start.Add(-lookbackDelta), end, step, factor(5)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "bazz"}}, start.Add(-lookbackDelta), end, step, factor(7)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "buzz"}}, start.Add(-lookbackDelta), end, step, factor(12)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "bozz"}}, start.Add(-lookbackDelta), end, step, factor(11)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "buzz"}}, start.Add(-lookbackDelta), end, step, factor(8)),
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "bazz"}}, start.Add(-lookbackDelta), end, step, arithmeticSequence(10)),
})

req := &PrometheusRangeQueryRequest{
@@ -1050,12 +1120,8 @@ func TestQuerySharding_ShouldReturnErrorInCorrectFormat(t *testing.T) {
queryableErr = storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return nil, errors.New("fatal queryable error")
})
queryable = storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return &querierMock{
series: []*promql.StorageSeries{
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}}, start.Add(-lookbackDelta), end, step, factor(5)),
},
}, nil
queryable = storageSeriesQueryable([]*promql.StorageSeries{
newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}}, start.Add(-lookbackDelta), end, step, factor(5)),
})
queryableSlow = newMockShardedQueryable(
2,
@@ -1465,6 +1531,12 @@ func (h *downstreamHandler) Do(ctx context.Context, r Request) (Response, error) {
}, nil
}

func storageSeriesQueryable(series []*promql.StorageSeries) storage.Queryable {
return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return &querierMock{series: series}, nil
})
}

type querierMock struct {
series []*promql.StorageSeries
}
@@ -1625,6 +1697,13 @@ func stale(from, to time.Time, wrap generator) generator {
}
}

// constant returns a generator that generates a constant value
func constant(value float64) generator {
return func(ts int64) float64 {
return value
}
}

type seriesIteratorMock struct {
idx int
series []*promql.StorageSeries
108 changes: 58 additions & 50 deletions pkg/querier/queryrange/sharded_queryable.go
@@ -107,15 +107,12 @@ func (q *shardedQuerier) Select(_ bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
// handleEmbeddedQueries concurrently executes the provided queries through the downstream handler.
// The returned storage.SeriesSet contains sorted series.
func (q *shardedQuerier) handleEmbeddedQueries(queries []string, hints *storage.SelectHints) storage.SeriesSet {
var (
jobs = concurrency.CreateJobsFromStrings(queries)
streamsMx sync.Mutex
streams []SampleStream
)
streams := make([][]SampleStream, len(queries))

// Concurrently run each query. It breaks and cancels each worker context on first error.
err := concurrency.ForEach(q.ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error {
resp, err := q.handler.Do(ctx, q.req.WithQuery(job.(string)))
err := concurrency.ForEach(q.ctx, createJobIndexes(len(queries)), len(queries), func(ctx context.Context, job interface{}) error {
idx := job.(int)
resp, err := q.handler.Do(ctx, q.req.WithQuery(queries[idx]))
if err != nil {
return err
}
@@ -124,13 +121,9 @@ func (q *shardedQuerier) handleEmbeddedQueries(queries []string, hints *storage.SelectHints) storage.SeriesSet {
if err != nil {
return err
}
streams[idx] = resStreams // No mutex is needed since each job writes its own index. This is like writing separate variables.

q.responseHeaders.mergeHeaders(resp.(*PrometheusResponse).Headers)

streamsMx.Lock()
streams = append(streams, resStreams...)
streamsMx.Unlock()

return nil
})

@@ -156,6 +149,14 @@ func (q *shardedQuerier) Close() error {
return nil
}

func createJobIndexes(l int) []interface{} {
jobs := make([]interface{}, l)
for j := 0; j < l; j++ {
jobs[j] = j
}
return jobs
}
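
For illustration only (an editor's sketch, not part of this diff): writing to disjoint indexes of a pre-sized slice from concurrent goroutines is race-free and preserves the input order, which is exactly the property handleEmbeddedQueries relies on above.

package main

import (
	"fmt"
	"sync"
)

func main() {
	inputs := []string{"q0", "q1", "q2", "q3"}
	results := make([]string, len(inputs)) // one slot per job, sized up front
	var wg sync.WaitGroup
	for i := range inputs {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			// Each goroutine writes only results[idx]; no two goroutines share
			// a memory location, so no mutex is needed and order is preserved.
			results[idx] = inputs[idx] + "-done"
		}(i)
	}
	wg.Wait()
	fmt.Println(results) // always [q0-done q1-done q2-done q3-done]
}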

type responseHeadersTracker struct {
headersMx sync.Mutex
headers map[string][]string
Expand Down Expand Up @@ -199,9 +200,14 @@ func (t *responseHeadersTracker) getHeaders() []*PrometheusResponseHeader {
// results.
//
// The returned storage.SeriesSet series is sorted.
func newSeriesSetFromEmbeddedQueriesResults(results []SampleStream, hints *storage.SelectHints) storage.SeriesSet {
func newSeriesSetFromEmbeddedQueriesResults(results [][]SampleStream, hints *storage.SelectHints) storage.SeriesSet {
totalLen := 0
for _, r := range results {
totalLen += len(r)
}

var (
set = make([]storage.Series, 0, len(results))
set = make([]storage.Series, 0, totalLen)
step int64
)

@@ -210,48 +216,50 @@ func newSeriesSetFromEmbeddedQueriesResults(results []SampleStream, hints *storage.SelectHints) storage.SeriesSet {
step = hints.Step
}

for _, stream := range results {
for _, result := range results {
for _, stream := range result {
// We add an extra 10 items to account for some stale markers that could be injected.
// We're trading a lower chance of reallocation in case stale markers are added for a
// slightly higher memory utilisation.
samples := make([]model.SamplePair, 0, len(stream.Samples)+10)

for idx, sample := range stream.Samples {
// When an embedded query is executed by the PromQL engine, any stale marker in the time-series
// data is used by the engine to stop applying the lookback delta, but the stale marker is removed
// from the query results. The result of embedded queries, which we are processing in this function,
// is then used as input to run an outer query in the PromQL engine. This data will not contain
// the stale marker (because it has been removed when running the embedded query) but we still need
// the PromQL engine to not apply the lookback delta when there are gaps in the embedded queries
// results. For this reason, here we inject a stale marker at the beginning of each gap in the
// embedded queries results.
if step > 0 && idx > 0 && sample.TimestampMs > stream.Samples[idx-1].TimestampMs+step {
samples = append(samples, model.SamplePair{
Timestamp: model.Time(stream.Samples[idx-1].TimestampMs + step),
Value: model.SampleValue(math.Float64frombits(value.StaleNaN)),
})
}

samples = append(samples, model.SamplePair{
Timestamp: model.Time(sample.TimestampMs),
Value: model.SampleValue(sample.Value),
})
}

// In case the embedded query processed series which all ended before the end of the query time range,
// we don't want the outer query to apply the lookback at the end of the embedded query results. To keep it
// simple, it's safe to always add an extra stale marker at the end of the query results.
//
// This could result in an extra sample (stale marker) after the end of the query time range, but that's
// not a problem when running the outer query because it will just be discarded.
if len(samples) > 0 && step > 0 {
samples = append(samples, model.SamplePair{
Timestamp: samples[len(samples)-1].Timestamp + model.Time(step),
Value: model.SampleValue(math.Float64frombits(value.StaleNaN)),
})
}

set = append(set, series.NewConcreteSeries(mimirpb.FromLabelAdaptersToLabels(stream.Labels), samples))
}
}

return series.NewConcreteSeriesSet(set)
}
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/sharded_queryable_test.go
@@ -360,7 +360,7 @@ func TestNewSeriesSetFromEmbeddedQueriesResults(t *testing.T) {

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
set := newSeriesSetFromEmbeddedQueriesResults(testData.input, testData.hints)
set := newSeriesSetFromEmbeddedQueriesResults([][]SampleStream{testData.input}, testData.hints)
actual, err := seriesSetToSampleStreams(set)
require.NoError(t, err)
assertEqualSampleStream(t, testData.expected, actual)
7 changes: 1 addition & 6 deletions pkg/querier/queryrange/split_and_cache_test.go
@@ -25,7 +25,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/middleware"
Expand Down Expand Up @@ -558,11 +557,7 @@ func TestSplitAndCacheMiddleware_ResultsCacheFuzzy(t *testing.T) {
}

// Create a queryable on the fixtures.
queryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return &querierMock{
series: series,
}, nil
})
queryable := storageSeriesQueryable(series)

// Create a downstream handler serving range queries based on the provided queryable.
downstream := &downstreamHandler{
