Skip to content

Commit

Permalink
Fix ruler query failure reporting (cortexproject#4335)
Browse files Browse the repository at this point in the history
* This patch tries to fix problem with user-errors reported as internal errors, and adds integration test for it.

Signed-off-by: Peter Štibraný <[email protected]>

* Allow passing custom error-wrapping function to ErrorTranslateQueryable.

Signed-off-by: Peter Štibraný <[email protected]>

* Wrap errors returned by Queryable to custom wrapper.

This allows us to distinguish between those errors and errors returned by PromQL engine, and react appropriately.

Signed-off-by: Peter Štibraný <[email protected]>

* Improve ruler test to check for more scenarios.

Signed-off-by: Peter Štibraný <[email protected]>

* CHANGELOG.md

Signed-off-by: Peter Štibraný <[email protected]>
Signed-off-by: Alvin Lin <[email protected]>
  • Loading branch information
pstibrany authored and alvinlin123 committed Jan 14, 2022
1 parent ff72ed6 commit 11a31e1
Show file tree
Hide file tree
Showing 9 changed files with 278 additions and 38 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## master / unreleased
* [FEATURE] Ruler: Add new `-ruler.query-stats-enabled` which when enabled will report the `cortex_ruler_query_seconds_total` as a per-user metric that tracks the sum of the wall time of executing queries in the ruler in seconds. #4317

* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4357
* [CHANGE] Update Go version to 1.16.6. #4362
* [CHANGE] Querier / ruler: Change `-querier.max-fetched-chunks-per-query` configuration to limit to maximum number of chunks that can be fetched in a single query. The number of chunks fetched by ingesters AND long-term storare combined should not exceed the value configured on `-querier.max-fetched-chunks-per-query`. #4260
* [CHANGE] Memberlist: the `memberlist_kv_store_value_bytes` has been removed due to values no longer being stored in-memory as encoded bytes. #4345
Expand All @@ -25,7 +25,7 @@
* [ENHANCEMENT] Memberlist: optimized receive path for processing ring state updates, to help reduce CPU utilization in large clusters. #4345
* [ENHANCEMENT] Memberlist: expose configuration of memberlist packet compression via `-memberlist.compression=enabled`. #4346
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336
* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4357
* [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335

## 1.10.0-rc.0 / 2021-06-28

Expand Down
12 changes: 8 additions & 4 deletions integration/e2e/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,22 @@ const (
)

// NewMinio returns minio server, used as a local replacement for S3.
func NewMinio(port int, bktName string) *e2e.HTTPService {
func NewMinio(port int, bktNames ...string) *e2e.HTTPService {
minioKESGithubContent := "https://raw.githubusercontent.com/minio/kes/master"
commands := []string{
"curl -sSL --tlsv1.2 -O '%s/root.key' -O '%s/root.cert'",
"mkdir -p /data/%s && minio server --address :%v --quiet /data",
fmt.Sprintf("curl -sSL --tlsv1.2 -O '%s/root.key' -O '%s/root.cert'", minioKESGithubContent, minioKESGithubContent),
}

for _, bkt := range bktNames {
commands = append(commands, fmt.Sprintf("mkdir -p /data/%s", bkt))
}
commands = append(commands, fmt.Sprintf("minio server --address :%v --quiet /data", port))

m := e2e.NewHTTPService(
fmt.Sprintf("minio-%v", port),
images.Minio,
// Create the "cortex" bucket before starting minio
e2e.NewCommandWithoutEntrypoint("sh", "-c", fmt.Sprintf(strings.Join(commands, " && "), minioKESGithubContent, minioKESGithubContent, bktName, port)),
e2e.NewCommandWithoutEntrypoint("sh", "-c", strings.Join(commands, " && ")),
e2e.NewHTTPReadinessProbe(port, "/minio/health/ready", 200, 200),
port,
)
Expand Down
5 changes: 5 additions & 0 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,11 @@ func (c *Client) SetRuleGroup(rulegroup rulefmt.RuleGroup, namespace string) err
}

defer res.Body.Close()

if res.StatusCode != 202 {
return fmt.Errorf("unexpected status code: %d", res.StatusCode)
}

return nil
}

Expand Down
160 changes: 160 additions & 0 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,166 @@ func TestRulerAlertmanagerTLS(t *testing.T) {
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_notifications_alertmanagers_discovered"}, e2e.WaitMissingMetrics))
}

func TestRulerMetricsForInvalidQueries(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Configure the ruler.
flags := mergeFlags(
BlocksStorageFlags(),
RulerFlags(false),
map[string]string{
// Since we're not going to run any rule (our only rule is invalid), we don't need the
// store-gateway to be configured to a valid address.
"-querier.store-gateway-addresses": "localhost:12345",
// Enable the bucket index so we can skip the initial bucket scan.
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
// Evaluate rules often, so that we don't need to wait for metrics to show up.
"-ruler.evaluation-interval": "2s",
"-ruler.poll-interval": "2s",
// No delay
"-ruler.evaluation-delay-duration": "0",

"-blocks-storage.tsdb.block-ranges-period": "1h",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.tsdb.retention-period": "2h",

// We run single ingester only, no replication.
"-distributor.replication-factor": "1",

// Very low limit so that ruler hits it.
"-querier.max-fetched-chunks-per-query": "5",
// We need this to make limit work.
"-ingester.stream-chunks-when-using-blocks": "true",
},
)

const namespace = "test"
const user = "user"

distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler))

// Wait until both the distributor and ruler have updated the ring. The querier will also watch
// the store-gateway ring if blocks sharding is enabled.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user)
require.NoError(t, err)

// Push some series to Cortex -- enough so that we can hit some limits.
for i := 0; i < 10; i++ {
series, _ := generateSeries("metric", time.Now(), prompb.Label{Name: "foo", Value: fmt.Sprintf("%d", i)})

res, err := c.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
}

totalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"})
require.NoError(t, err)

// Verify that user-failures don't increase cortex_ruler_queries_failed_total
for groupName, expression := range map[string]string{
// Syntactically correct expression (passes check in ruler), but failing because of invalid regex. This fails in PromQL engine.
"invalid_group": `label_replace(metric, "foo", "$1", "service", "[")`,

// This one fails in querier code, because of limits.
"too_many_chunks_group": `sum(metric)`,
} {
t.Run(groupName, func(t *testing.T) {
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, "rule", expression), namespace))
m := ruleGroupMatcher(user, namespace, groupName)

// Wait until ruler has loaded the group.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

// Wait until rule group has tried to evaluate the rule.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

// Verify that evaluation of the rule failed.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

// But these failures were not reported as "failed queries"
sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"})
require.NoError(t, err)
require.Equal(t, float64(0), sum[0])

// Delete rule before checkin "cortex_ruler_queries_total", as we want to reuse value for next test.
require.NoError(t, c.DeleteRuleGroup(namespace, groupName))

// Wait until ruler has unloaded the group. We don't use any matcher, so there should be no groups (in fact, metric disappears).
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_group_rules"}, e2e.SkipMissingMetrics))

// Check that cortex_ruler_queries_total went up since last test.
newTotalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"})
require.NoError(t, err)
require.Greater(t, newTotalQueries[0], totalQueries[0])

// Remember totalQueries for next test.
totalQueries = newTotalQueries
})
}

// Now let's upload a non-failing rule, and make sure that it works.
t.Run("real_error", func(t *testing.T) {
const groupName = "good_rule"
const expression = `sum(metric{foo=~"1|2"})`

require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, "rule", expression), namespace))
m := ruleGroupMatcher(user, namespace, groupName)

// Wait until ruler has loaded the group.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

// Wait until rule group has tried to evaluate the rule, and succeeded.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

// Still no failures.
sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"})
require.NoError(t, err)
require.Equal(t, float64(0), sum[0])

// Now let's stop ingester, and recheck metrics. This should increase cortex_ruler_queries_failed_total failures.
require.NoError(t, s.Stop(ingester))

// We should start getting "real" failures now.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_failed_total"}))
})
}

func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher {
return labels.MustNewMatcher(labels.MatchEqual, "rule_group", fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName))
}

func ruleGroupWithRule(groupName string, ruleName string, expression string) rulefmt.RuleGroup {
// Prepare rule group with invalid rule.
var recordNode = yaml.Node{}
var exprNode = yaml.Node{}

recordNode.SetString(ruleName)
exprNode.SetString(expression)

return rulefmt.RuleGroup{
Name: groupName,
Interval: 10,
Rules: []rulefmt.RuleNode{{
Record: recordNode,
Expr: exprNode,
}},
}
}

func createTestRuleGroup(t *testing.T) rulefmt.RuleGroup {
t.Helper()

Expand Down
2 changes: 1 addition & 1 deletion pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func NewQuerierHandler(

api := v1.NewAPI(
engine,
querier.NewErrorTranslateQueryable(queryable), // Translate errors to errors expected by API.
querier.NewErrorTranslateSampleAndChunkQueryable(queryable), // Translate errors to errors expected by API.
nil, // No remote write support.
exemplarQueryable,
func(context.Context) v1.TargetRetriever { return &querier.DummyTargetRetriever{} },
Expand Down
72 changes: 52 additions & 20 deletions pkg/querier/error_translate_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,72 +69,103 @@ func TranslateToPromqlAPIError(err error) error {
}
}

func NewErrorTranslateQueryable(q storage.SampleAndChunkQueryable) storage.SampleAndChunkQueryable {
return errorTranslateQueryable{q}
// ErrTranslateFn is used to translate or wrap error before returning it by functions in
// storage.SampleAndChunkQueryable interface.
// Input error may be nil.
type ErrTranslateFn func(err error) error

func NewErrorTranslateQueryable(q storage.Queryable) storage.Queryable {
return NewErrorTranslateQueryableWithFn(q, TranslateToPromqlAPIError)
}

func NewErrorTranslateQueryableWithFn(q storage.Queryable, fn ErrTranslateFn) storage.Queryable {
return errorTranslateQueryable{q: q, fn: fn}
}

func NewErrorTranslateSampleAndChunkQueryable(q storage.SampleAndChunkQueryable) storage.SampleAndChunkQueryable {
return NewErrorTranslateSampleAndChunkQueryableWithFn(q, TranslateToPromqlAPIError)
}

func NewErrorTranslateSampleAndChunkQueryableWithFn(q storage.SampleAndChunkQueryable, fn ErrTranslateFn) storage.SampleAndChunkQueryable {
return errorTranslateSampleAndChunkQueryable{q: q, fn: fn}
}

type errorTranslateQueryable struct {
q storage.SampleAndChunkQueryable
q storage.Queryable
fn ErrTranslateFn
}

func (e errorTranslateQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
q, err := e.q.Querier(ctx, mint, maxt)
return errorTranslateQuerier{q: q}, TranslateToPromqlAPIError(err)
return errorTranslateQuerier{q: q, fn: e.fn}, e.fn(err)
}

type errorTranslateSampleAndChunkQueryable struct {
q storage.SampleAndChunkQueryable
fn ErrTranslateFn
}

func (e errorTranslateSampleAndChunkQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
q, err := e.q.Querier(ctx, mint, maxt)
return errorTranslateQuerier{q: q, fn: e.fn}, e.fn(err)
}

func (e errorTranslateQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
func (e errorTranslateSampleAndChunkQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
q, err := e.q.ChunkQuerier(ctx, mint, maxt)
return errorTranslateChunkQuerier{q: q}, TranslateToPromqlAPIError(err)
return errorTranslateChunkQuerier{q: q, fn: e.fn}, e.fn(err)
}

type errorTranslateQuerier struct {
q storage.Querier
q storage.Querier
fn ErrTranslateFn
}

func (e errorTranslateQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelValues(name, matchers...)
return values, warnings, TranslateToPromqlAPIError(err)
return values, warnings, e.fn(err)
}

func (e errorTranslateQuerier) LabelNames() ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelNames()
return values, warnings, TranslateToPromqlAPIError(err)
return values, warnings, e.fn(err)
}

func (e errorTranslateQuerier) Close() error {
return TranslateToPromqlAPIError(e.q.Close())
return e.fn(e.q.Close())
}

func (e errorTranslateQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
s := e.q.Select(sortSeries, hints, matchers...)
return errorTranslateSeriesSet{s}
return errorTranslateSeriesSet{s: s, fn: e.fn}
}

type errorTranslateChunkQuerier struct {
q storage.ChunkQuerier
q storage.ChunkQuerier
fn ErrTranslateFn
}

func (e errorTranslateChunkQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelValues(name, matchers...)
return values, warnings, TranslateToPromqlAPIError(err)
return values, warnings, e.fn(err)
}

func (e errorTranslateChunkQuerier) LabelNames() ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelNames()
return values, warnings, TranslateToPromqlAPIError(err)
return values, warnings, e.fn(err)
}

func (e errorTranslateChunkQuerier) Close() error {
return TranslateToPromqlAPIError(e.q.Close())
return e.fn(e.q.Close())
}

func (e errorTranslateChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
s := e.q.Select(sortSeries, hints, matchers...)
return errorTranslateChunkSeriesSet{s}
return errorTranslateChunkSeriesSet{s: s, fn: e.fn}
}

type errorTranslateSeriesSet struct {
s storage.SeriesSet
s storage.SeriesSet
fn ErrTranslateFn
}

func (e errorTranslateSeriesSet) Next() bool {
Expand All @@ -146,15 +177,16 @@ func (e errorTranslateSeriesSet) At() storage.Series {
}

func (e errorTranslateSeriesSet) Err() error {
return TranslateToPromqlAPIError(e.s.Err())
return e.fn(e.s.Err())
}

func (e errorTranslateSeriesSet) Warnings() storage.Warnings {
return e.s.Warnings()
}

type errorTranslateChunkSeriesSet struct {
s storage.ChunkSeriesSet
s storage.ChunkSeriesSet
fn ErrTranslateFn
}

func (e errorTranslateChunkSeriesSet) Next() bool {
Expand All @@ -166,7 +198,7 @@ func (e errorTranslateChunkSeriesSet) At() storage.ChunkSeries {
}

func (e errorTranslateChunkSeriesSet) Err() error {
return TranslateToPromqlAPIError(e.s.Err())
return e.fn(e.s.Err())
}

func (e errorTranslateChunkSeriesSet) Warnings() storage.Warnings {
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/error_translate_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestApiStatusCodes(t *testing.T) {
"error from seriesset": errorTestQueryable{q: errorTestQuerier{s: errorTestSeriesSet{err: tc.err}}},
} {
t.Run(fmt.Sprintf("%s/%d", k, ix), func(t *testing.T) {
r := createPrometheusAPI(errorTranslateQueryable{q: q})
r := createPrometheusAPI(NewErrorTranslateSampleAndChunkQueryable(q))
rec := httptest.NewRecorder()

req := httptest.NewRequest("GET", "/api/v1/query?query=up", nil)
Expand Down
Loading

0 comments on commit 11a31e1

Please sign in to comment.