From 8e64e30405395d49d2996de82dc0ec6e25bf4951 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 11 Nov 2022 00:39:40 -0800 Subject: [PATCH] Stateless ruler restores alert state (#5230) * stateless ruler restores alert state Signed-off-by: Ben Ye * update e2e Signed-off-by: Ben Ye * update compatibility test Signed-off-by: Ben Ye * update changelog Signed-off-by: Ben Ye Signed-off-by: Ben Ye Co-authored-by: Ben Ye --- CHANGELOG.md | 1 + cmd/thanos/config.go | 3 + cmd/thanos/rule.go | 71 ++++++++-------- docs/components/rule.md | 13 +++ pkg/promclient/promclient.go | 23 +++++ pkg/rules/queryable.go | 150 +++++++++++++++++++++++++++++++++ test/e2e/compatibility_test.go | 93 ++++++++++++++++++-- test/e2e/e2ethanos/services.go | 35 ++++++-- test/e2e/rule_test.go | 135 +++++++++++++++++++++++++++++ 9 files changed, 476 insertions(+), 48 deletions(-) create mode 100644 pkg/rules/queryable.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 37abfc2758..2f97c7e253 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5844](https://github.com/thanos-io/thanos/pull/5844) Query Frontend: Fixes @ modifier time range when splitting queries by interval. - [#5854](https://github.com/thanos-io/thanos/pull/5854) Query Frontend: Handles `lookback_delta` param in query frontend. +- [#5230](https://github.com/thanos-io/thanos/pull/5230) Rule: Stateless ruler support restoring `for` state from query API servers. The query API servers should be able to access the remote write storage. ### Added diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index ec26713aec..48d459cec9 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -182,6 +182,7 @@ type queryConfig struct { dnsSDInterval time.Duration httpMethod string dnsSDResolver string + step time.Duration } func (qc *queryConfig) registerFlag(cmd extkingpin.FlagClause) *queryConfig { @@ -198,6 +199,8 @@ func (qc *queryConfig) registerFlag(cmd extkingpin.FlagClause) *queryConfig { Default("POST").EnumVar(&qc.httpMethod, "GET", "POST") cmd.Flag("query.sd-dns-resolver", "Resolver to use. Possible options: [golang, miekgdns]"). Default("golang").Hidden().StringVar(&qc.dnsSDResolver) + cmd.Flag("query.default-step", "Default range query step to use. This is only used in stateless Ruler and alert state restoration."). + Default("1s").DurationVar(&qc.step) return qc } diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 3e803be31d..0bd29b905d 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -85,12 +85,15 @@ type ruleConfig struct { rwConfig *extflag.PathOrContent - resendDelay time.Duration - evalInterval time.Duration - ruleFiles []string - objStoreConfig *extflag.PathOrContent - dataDir string - lset labels.Labels + resendDelay time.Duration + evalInterval time.Duration + outageTolerance time.Duration + forGracePeriod time.Duration + ruleFiles []string + objStoreConfig *extflag.PathOrContent + dataDir string + lset labels.Labels + ignoredLabelNames []string } func (rc *ruleConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -126,6 +129,12 @@ func registerRule(app *extkingpin.App) { Default("1m").DurationVar(&conf.resendDelay) cmd.Flag("eval-interval", "The default evaluation interval to use."). Default("1m").DurationVar(&conf.evalInterval) + cmd.Flag("for-outage-tolerance", "Max time to tolerate prometheus outage for restoring \"for\" state of alert."). 
+ Default("1h").DurationVar(&conf.outageTolerance) + cmd.Flag("for-grace-period", "Minimum duration between alert and restored \"for\" state. This is maintained only for alerts with configured \"for\" time greater than grace period."). + Default("10m").DurationVar(&conf.forGracePeriod) + cmd.Flag("restore-ignored-label", "Label names to be ignored when restoring alerts from the remote storage. This is only used in stateless mode."). + StringsVar(&conf.ignoredLabelNames) conf.rwConfig = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write configurations, that specify servers where samples should be sent to (see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). This automatically enables stateless mode for ruler and no series will be stored in the ruler's TSDB. If an empty config (or file) is provided, the flag is ignored and ruler is run with its own TSDB.", extflag.WithEnvSubstitution()) @@ -321,7 +330,10 @@ func runRule( extprom.WrapRegistererWithPrefix("thanos_rule_query_apis_", reg), dns.ResolverType(conf.query.dnsSDResolver), ) - var queryClients []*httpconfig.Client + var ( + queryClients []*httpconfig.Client + promClients []*promclient.Client + ) queryClientMetrics := extpromhttp.NewClientMetrics(extprom.WrapRegistererWith(prometheus.Labels{"client": "query"}, reg)) for _, cfg := range queryCfg { cfg.HTTPClientConfig.ClientMetrics = queryClientMetrics @@ -335,6 +347,7 @@ func runRule( return err } queryClients = append(queryClients, queryClient) + promClients = append(promClients, promclient.NewClient(queryClient, logger, "thanos-rule")) // Discover and resolve query addresses. addDiscoveryGroups(g, queryClient, conf.query.dnsSDInterval) } @@ -377,7 +390,10 @@ func runRule( } fanoutStore := storage.NewFanout(logger, agentDB, remoteStore) appendable = fanoutStore - queryable = fanoutStore + // Use a separate queryable to restore the ALERTS firing states. + // We cannot use remoteStore directly because it uses remote read for + // query. However, remote read is not implemented in Thanos Receiver. + queryable = thanosrules.NewPromClientsQueryable(logger, queryClients, promClients, conf.query.httpMethod, conf.query.step, conf.ignoredLabelNames) } else { tsdbDB, err = tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil) if err != nil { @@ -495,14 +511,16 @@ func runRule( reg, conf.dataDir, rules.ManagerOptions{ - NotifyFunc: notifyFunc, - Logger: logger, - Appendable: appendable, - ExternalURL: nil, - Queryable: queryable, - ResendDelay: conf.resendDelay, + NotifyFunc: notifyFunc, + Logger: logger, + Appendable: appendable, + ExternalURL: nil, + Queryable: queryable, + ResendDelay: conf.resendDelay, + OutageTolerance: conf.outageTolerance, + ForGracePeriod: conf.forGracePeriod, }, - queryFuncCreator(logger, queryClients, metrics.duplicatedQuery, metrics.ruleEvalWarnings, conf.query.httpMethod), + queryFuncCreator(logger, queryClients, promClients, metrics.duplicatedQuery, metrics.ruleEvalWarnings, conf.query.httpMethod), conf.lset, // In our case the querying URL is the external URL because in Prometheus // --web.external-url points to it i.e. 
it points at something where the user @@ -774,24 +792,10 @@ func labelsTSDBToProm(lset labels.Labels) (res labels.Labels) { return res } -func removeDuplicateQueryEndpoints(logger log.Logger, duplicatedQueriers prometheus.Counter, urls []*url.URL) []*url.URL { - set := make(map[string]struct{}) - deduplicated := make([]*url.URL, 0, len(urls)) - for _, u := range urls { - if _, ok := set[u.String()]; ok { - level.Warn(logger).Log("msg", "duplicate query address is provided", "addr", u.String()) - duplicatedQueriers.Inc() - continue - } - deduplicated = append(deduplicated, u) - set[u.String()] = struct{}{} - } - return deduplicated -} - func queryFuncCreator( logger log.Logger, queriers []*httpconfig.Client, + promClients []*promclient.Client, duplicatedQuery prometheus.Counter, ruleEvalWarnings *prometheus.CounterVec, httpMethod string, @@ -812,15 +816,10 @@ func queryFuncCreator( panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error()) } - promClients := make([]*promclient.Client, 0, len(queriers)) - for _, q := range queriers { - promClients = append(promClients, promclient.NewClient(q, logger, "thanos-rule")) - } - return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { for _, i := range rand.Perm(len(queriers)) { promClient := promClients[i] - endpoints := removeDuplicateQueryEndpoints(logger, duplicatedQuery, queriers[i].Endpoints()) + endpoints := thanosrules.RemoveDuplicateQueryEndpoints(logger, duplicatedQuery, queriers[i].Endpoints()) for _, i := range rand.Perm(len(endpoints)) { span, ctx := tracing.StartSpan(ctx, spanID) v, warns, err := promClient.PromqlQueryInstant(ctx, endpoints[i], q, t, promclient.QueryOptions{ diff --git a/docs/components/rule.md b/docs/components/rule.md index 1badd787e6..bef4f56b11 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -311,6 +311,12 @@ Flags: prefix for the regular Alertmanager API path. --data-dir="data/" data directory --eval-interval=1m The default evaluation interval to use. + --for-grace-period=10m Minimum duration between alert and restored + "for" state. This is maintained only for alerts + with configured "for" time greater than grace + period. + --for-outage-tolerance=1h Max time to tolerate prometheus outage for + restoring "for" state of alert. --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable @@ -385,6 +391,9 @@ Flags: https://thanos.io/tip/components/rule.md/#configuration. If defined, it takes precedence over the '--query' and '--query.sd-files' flags. + --query.default-step=1s Default range query step to use. This is + only used in stateless Ruler and alert state + restoration. --query.http-method=POST HTTP method to use when sending queries. Possible options: [GET, POST] --query.sd-dns-interval=30s @@ -429,6 +438,10 @@ Flags: https://thanos.io/tip/thanos/logging.md/#configuration --resend-delay=1m Minimum amount of time to wait before resending an alert to Alertmanager. + --restore-ignored-label=RESTORE-IGNORED-LABEL ... + Label names to be ignored when restoring alerts + from the remote storage. This is only used in + stateless mode. --rule-file=rules/ ... Rule files that should be used by rule manager. Can be in glob format (repeated). 
Note that rules are not automatically detected, diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index 22e48b41aa..748ebf189b 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -780,6 +780,29 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin return m.Data.Groups, nil } +// AlertsInGRPC returns the rules from Prometheus alerts API. It uses gRPC errors. +// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus. +func (c *Client) AlertsInGRPC(ctx context.Context, base *url.URL) ([]*rulespb.AlertInstance, error) { + u := *base + u.Path = path.Join(u.Path, "/api/v1/alerts") + + var m struct { + Data struct { + Alerts []*rulespb.AlertInstance `json:"alerts"` + } `json:"data"` + } + + if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_alerts HTTP[client]", &u, &m); err != nil { + return nil, err + } + + // Prometheus does not support PartialResponseStrategy, and probably would never do. Make it Abort by default. + for _, g := range m.Data.Alerts { + g.PartialResponseStrategy = storepb.PartialResponseStrategy_ABORT + } + return m.Data.Alerts, nil +} + // MetricMetadataInGRPC returns the metadata from Prometheus metric metadata API. It uses gRPC errors. func (c *Client) MetricMetadataInGRPC(ctx context.Context, base *url.URL, metric string, limit int) (map[string][]metadatapb.Meta, error) { u := *base diff --git a/pkg/rules/queryable.go b/pkg/rules/queryable.go new file mode 100644 index 0000000000..ccfc727b75 --- /dev/null +++ b/pkg/rules/queryable.go @@ -0,0 +1,150 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package rules + +import ( + "context" + "math/rand" + "net/url" + "strings" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + + "github.com/thanos-io/thanos/internal/cortex/querier/series" + "github.com/thanos-io/thanos/pkg/httpconfig" + "github.com/thanos-io/thanos/pkg/promclient" + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +type promClientsQueryable struct { + httpMethod string + step time.Duration + + logger log.Logger + promClients []*promclient.Client + queryClients []*httpconfig.Client + ignoredLabelNames []string + + duplicatedQuery prometheus.Counter +} +type promClientsQuerier struct { + ctx context.Context + mint, maxt int64 + step int64 + httpMethod string + + logger log.Logger + promClients []*promclient.Client + queryClients []*httpconfig.Client + restoreIgnoreLabels []string + + // We use a dummy counter here because the duplicated + // addresses are already tracked by rule evaluation part. + duplicatedQuery prometheus.Counter +} + +// NewPromClientsQueryable creates a queryable that queries queriers from Prometheus clients. 
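+// The returned storage.Queryable is used by the rules manager only to restore the "for" state of alerts (the ALERTS_FOR_STATE series); regular rule evaluation continues to go through the instant-query path.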
+func NewPromClientsQueryable(logger log.Logger, queryClients []*httpconfig.Client, promClients []*promclient.Client, + httpMethod string, step time.Duration, ignoredLabelNames []string) *promClientsQueryable { + return &promClientsQueryable{ + logger: logger, + queryClients: queryClients, + promClients: promClients, + duplicatedQuery: promauto.With(nil).NewCounter(prometheus.CounterOpts{}), + httpMethod: httpMethod, + step: step, + ignoredLabelNames: ignoredLabelNames, + } +} + +// Querier returns a new Querier for the given time range. +func (q *promClientsQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return &promClientsQuerier{ + ctx: ctx, + mint: mint, + maxt: maxt, + step: int64(q.step / time.Second), + httpMethod: q.httpMethod, + logger: q.logger, + queryClients: q.queryClients, + promClients: q.promClients, + restoreIgnoreLabels: q.ignoredLabelNames, + }, nil +} + +// Select implements storage.Querier interface. +func (q *promClientsQuerier) Select(_ bool, _ *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + query := storepb.PromMatchersToString(matchers...) + + for _, i := range rand.Perm(len(q.queryClients)) { + promClient := q.promClients[i] + endpoints := RemoveDuplicateQueryEndpoints(q.logger, q.duplicatedQuery, q.queryClients[i].Endpoints()) + for _, i := range rand.Perm(len(endpoints)) { + m, warns, err := promClient.QueryRange(q.ctx, endpoints[i], query, q.mint, q.maxt, q.step, promclient.QueryOptions{ + Deduplicate: true, + Method: q.httpMethod, + }) + + if err != nil { + level.Error(q.logger).Log("err", err, "query", q) + continue + } + if len(warns) > 0 { + level.Warn(q.logger).Log("warnings", strings.Join(warns, ", "), "query", q) + } + matrix := make([]*model.SampleStream, 0, m.Len()) + for _, metric := range m { + for _, label := range q.restoreIgnoreLabels { + delete(metric.Metric, model.LabelName(label)) + } + + matrix = append(matrix, &model.SampleStream{ + Metric: metric.Metric, + Values: metric.Values, + }) + } + + return series.MatrixToSeriesSet(matrix) + } + } + return storage.NoopSeriesSet() +} + +// LabelValues implements storage.LabelQuerier interface. +func (q *promClientsQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { + return nil, nil, nil +} + +// LabelNames implements storage.LabelQuerier interface. +func (q *promClientsQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { + return nil, nil, nil +} + +// Close implements storage.LabelQuerier interface. +func (q *promClientsQuerier) Close() error { + return nil +} + +// RemoveDuplicateQueryEndpoints removes duplicate endpoints from the list of urls. 
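+// Only the first occurrence of each address is kept; every duplicate is logged as a warning and counted on the provided counter.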
+func RemoveDuplicateQueryEndpoints(logger log.Logger, duplicatedQueriers prometheus.Counter, urls []*url.URL) []*url.URL { + set := make(map[string]struct{}) + deduplicated := make([]*url.URL, 0, len(urls)) + for _, u := range urls { + if _, ok := set[u.String()]; ok { + level.Warn(logger).Log("msg", "duplicate query address is provided", "addr", u.String()) + duplicatedQueriers.Inc() + continue + } + deduplicated = append(deduplicated, u) + set[u.String()] = struct{}{} + } + return deduplicated +} diff --git a/test/e2e/compatibility_test.go b/test/e2e/compatibility_test.go index 4a11b4f7f9..2599a50d30 100644 --- a/test/e2e/compatibility_test.go +++ b/test/e2e/compatibility_test.go @@ -21,8 +21,10 @@ import ( sdconfig "github.com/efficientgo/e2e/monitoring/promconfig/discovery/config" "github.com/efficientgo/e2e/monitoring/promconfig/discovery/targetgroup" e2eobs "github.com/efficientgo/e2e/observable" + common_cfg "github.com/prometheus/common/config" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" "github.com/thanos-io/thanos/pkg/alert" "github.com/thanos-io/thanos/pkg/httpconfig" @@ -180,12 +182,13 @@ func TestAlertCompliance(t *testing.T) { t.Skip("This is an interactive test, using https://github.com/prometheus/compliance/tree/main/alert_generator. This tool is not optimized for CI runs (e.g. it infinitely retries, takes 38 minutes)") t.Run("stateful ruler", func(t *testing.T) { - e, err := e2e.NewDockerEnvironment("alert-compatibility") + e, err := e2e.NewDockerEnvironment("alert-compat") testutil.Ok(t, err) t.Cleanup(e.Close) // Start receive + Querier. receive := e2ethanos.NewReceiveBuilder(e, "receive").WithIngestionEnabled().Init() + rwEndpoint := e2ethanos.RemoteWriteEndpoint(receive.InternalEndpoint("remote-write")) querierBuilder := e2ethanos.NewQuerierBuilder(e, "query") compliance := e.Runnable("alert_generator_compliance_tester").WithPorts(map[string]int{"http": 8080}).Init(e2e.StartOptions{ @@ -220,12 +223,89 @@ func TestAlertCompliance(t *testing.T) { }) query := querierBuilder. + WithStoreAddresses(receive.InternalEndpoint("grpc"), ruler.InternalEndpoint("grpc")). + // We deduplicate by this, since alert compatibility tool requires clean metric without labels + // attached by receivers. + WithReplicaLabels("receive", "tenant_id"). + Init() + testutil.Ok(t, e2e.StartAndWaitReady(receive, query, ruler, compliance)) + + // Pull rules.yaml: + { + var stdout bytes.Buffer + testutil.Ok(t, compliance.Exec(e2e.NewCommand("cat", "/rules.yaml"), e2e.WithExecOptionStdout(&stdout))) + testutil.Ok(t, os.MkdirAll(filepath.Join(ruler.Dir(), "rules"), os.ModePerm)) + testutil.Ok(t, os.WriteFile(filepath.Join(ruler.Dir(), "rules", "rules.yaml"), stdout.Bytes(), os.ModePerm)) + + // Reload ruler. 
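+            // The rules file only exists after the ruler has started, so trigger a reload through the ruler's /-/reload HTTP endpoint to make it re-read the rules directory.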
+ resp, err := http.Post("http://"+ruler.Endpoint("http")+"/-/reload", "", nil) + testutil.Ok(t, err) + defer func() { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + }() + testutil.Equals(t, http.StatusOK, resp.StatusCode) + } + alertCompatCfg := alertCompatConfig(rwEndpoint, query.InternalEndpoint("http"), ruler.InternalEndpoint("http")) + testutil.Ok(t, os.WriteFile(filepath.Join(compliance.Dir(), "test-thanos.yaml"), []byte(alertCompatCfg), os.ModePerm)) + + fmt.Println(alertCompatCfg) + + testutil.Ok(t, compliance.Exec(e2e.NewCommand( + "/alert_generator_compliance_tester", "-config-file", filepath.Join(compliance.InternalDir(), "test-thanos.yaml")), + )) + }) + + t.Run("stateless ruler", func(t *testing.T) { + e, err := e2e.NewDockerEnvironment("alert-compat") + testutil.Ok(t, err) + t.Cleanup(e.Close) + + // Start receive + Querier. + receive := e2ethanos.NewReceiveBuilder(e, "receive").WithIngestionEnabled().Init() + rwEndpoint := e2ethanos.RemoteWriteEndpoint(receive.InternalEndpoint("remote-write")) + rwURL := urlParse(t, rwEndpoint) + rFuture := e2ethanos.NewRulerBuilder(e, "1") + query := e2ethanos.NewQuerierBuilder(e, "query"). WithStoreAddresses(receive.InternalEndpoint("grpc")). - WithRuleAddresses(ruler.InternalEndpoint("grpc")). // We deduplicate by this, since alert compatibility tool requires clean metric without labels // attached by receivers. WithReplicaLabels("receive", "tenant_id"). Init() + + compliance := e.Runnable("alert_generator_compliance_tester").WithPorts(map[string]int{"http": 8080}).Init(e2e.StartOptions{ + Image: "alert_generator_compliance_tester:latest", + Command: e2e.NewCommandRunUntilStop(), + }) + + ruler := rFuture.WithAlertManagerConfig([]alert.AlertmanagerConfig{ + { + EndpointsConfig: httpconfig.EndpointsConfig{ + StaticAddresses: []string{compliance.InternalEndpoint("http")}, + Scheme: "http", + }, + Timeout: amTimeout, + APIVersion: alert.APIv1, + }, + }). + // Use default resend delay and eval interval, as the compliance spec requires this. + WithResendDelay("1m"). + WithEvalInterval("1m"). + WithReplicaLabel(""). + WithRestoreIgnoredLabels("tenant_id"). + InitStateless(filepath.Join(rFuture.InternalDir(), "rules"), []httpconfig.Config{ + { + EndpointsConfig: httpconfig.EndpointsConfig{ + StaticAddresses: []string{ + query.InternalEndpoint("http"), + }, + Scheme: "http", + }, + }, + }, []*config.RemoteWriteConfig{ + {URL: &common_cfg.URL{URL: rwURL}, Name: "thanos-receiver"}, + }) + testutil.Ok(t, e2e.StartAndWaitReady(receive, query, ruler, compliance)) // Pull rules.yaml: @@ -244,9 +324,10 @@ func TestAlertCompliance(t *testing.T) { }() testutil.Equals(t, http.StatusOK, resp.StatusCode) } - testutil.Ok(t, os.WriteFile(filepath.Join(compliance.Dir(), "test-thanos.yaml"), []byte(alertCompatConfig(receive, query)), os.ModePerm)) + alertCompatCfg := alertCompatConfig(rwEndpoint, query.InternalEndpoint("http"), query.InternalEndpoint("http")) + testutil.Ok(t, os.WriteFile(filepath.Join(compliance.Dir(), "test-thanos.yaml"), []byte(alertCompatCfg), os.ModePerm)) - fmt.Println(alertCompatConfig(receive, query)) + fmt.Println(alertCompatCfg) testutil.Ok(t, compliance.Exec(e2e.NewCommand( "/alert_generator_compliance_tester", "-config-file", filepath.Join(compliance.InternalDir(), "test-thanos.yaml")), @@ -255,14 +336,14 @@ func TestAlertCompliance(t *testing.T) { } // nolint (it's still used in skipped test). 
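+// alertCompatConfig renders the settings file for the alert_generator compliance tester. It now takes plain URL strings so the stateful and stateless runs can point remote write, the query API and the rules/alerts API at different components.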
-func alertCompatConfig(receive e2e.Runnable, query e2e.Runnable) string { +func alertCompatConfig(remoteWriteURL, queryURL, rulesURL string) string { return fmt.Sprintf(`settings: remote_write_url: '%s' query_base_url: 'http://%s' rules_and_alerts_api_base_url: 'http://%s' alert_reception_server_port: 8080 alert_message_parser: default -`, e2ethanos.RemoteWriteEndpoint(receive.InternalEndpoint("remote-write")), query.InternalEndpoint("http"), query.InternalEndpoint("http")) +`, remoteWriteURL, queryURL, rulesURL) } func newQueryFrontendRunnable(e e2e.Environment, name, downstreamURL string) *e2eobs.Observable { diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index c9f0403dec..258f8f21b4 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -601,11 +601,13 @@ type RulerBuilder struct { f e2e.FutureRunnable - amCfg []alert.AlertmanagerConfig - replicaLabel string - image string - resendDelay string - evalInterval string + amCfg []alert.AlertmanagerConfig + replicaLabel string + image string + resendDelay string + evalInterval string + forGracePeriod string + restoreIgnoredLabels []string } // NewRulerBuilder is a Ruler future that allows extra configuration before initialization. @@ -646,6 +648,16 @@ func (r *RulerBuilder) WithEvalInterval(evalInterval string) *RulerBuilder { return r } +func (r *RulerBuilder) WithForGracePeriod(forGracePeriod string) *RulerBuilder { + r.forGracePeriod = forGracePeriod + return r +} + +func (r *RulerBuilder) WithRestoreIgnoredLabels(labels ...string) *RulerBuilder { + r.restoreIgnoredLabels = labels + return r +} + func (r *RulerBuilder) InitTSDB(internalRuleDir string, queryCfg []httpconfig.Config) *e2emon.InstrumentedRunnable { return r.initRule(internalRuleDir, queryCfg, nil) } @@ -685,6 +697,7 @@ func (r *RulerBuilder) initRule(internalRuleDir string, queryCfg []httpconfig.Co "--query.config": string(queryCfgBytes), "--query.sd-dns-interval": "1s", "--resend-delay": "5s", + "--for-grace-period": "1s", } if r.replicaLabel != "" { ruleArgs["--label"] = fmt.Sprintf(`%s="%s"`, replicaLabel, r.replicaLabel) @@ -698,6 +711,10 @@ func (r *RulerBuilder) initRule(internalRuleDir string, queryCfg []httpconfig.Co ruleArgs["--eval-interval"] = r.evalInterval } + if r.forGracePeriod != "" { + ruleArgs["--for-grace-period"] = r.forGracePeriod + } + if remoteWriteCfg != nil { rwCfgBytes, err := yaml.Marshal(struct { RemoteWriteConfigs []*config.RemoteWriteConfig `yaml:"remote_write,omitempty"` @@ -708,9 +725,15 @@ func (r *RulerBuilder) initRule(internalRuleDir string, queryCfg []httpconfig.Co ruleArgs["--remote-write.config"] = string(rwCfgBytes) } + args := e2e.BuildArgs(ruleArgs) + + for _, label := range r.restoreIgnoredLabels { + args = append(args, "--restore-ignored-label="+label) + } + return e2emon.AsInstrumented(r.f.Init(wrapWithDefaults(e2e.StartOptions{ Image: r.image, - Command: e2e.NewCommand("rule", e2e.BuildArgs(ruleArgs)...), + Command: e2e.NewCommand("rule", args...), Readiness: e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200), })), "http") } diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index 8835e04d9a..2fcb24d83f 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -124,6 +124,22 @@ groups: - record: test_absent_metric expr: absent(nonexistent{job='thanos-receive'}) ` + + testAlertRuleHoldDuration = ` +groups: +- name: example_rule_hold_duration + interval: 1s + rules: + - alert: TestAlert_RuleHoldDuration + # It must be based on actual metric, otherwise 
call to StoreAPI would be not involved. + expr: absent(some_metric) + for: 2s + labels: + severity: page + annotations: + summary: "I always complain and allow partial response in query." +` + amTimeout = model.Duration(10 * time.Second) ) @@ -586,6 +602,125 @@ func TestRule_CanRemoteWriteData(t *testing.T) { }) } +func TestStatelessRulerAlertStateRestore(t *testing.T) { + t.Parallel() + + e, err := e2e.NewDockerEnvironment("stateless-state") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + t.Cleanup(cancel) + + am := e2ethanos.NewAlertmanager(e, "1") + testutil.Ok(t, e2e.StartAndWaitReady(am)) + + receiver := e2ethanos.NewReceiveBuilder(e, "1").WithIngestionEnabled().Init() + testutil.Ok(t, e2e.StartAndWaitReady(receiver)) + rwURL := urlParse(t, e2ethanos.RemoteWriteEndpoint(receiver.InternalEndpoint("remote-write"))) + + q := e2ethanos.NewQuerierBuilder(e, "1", receiver.InternalEndpoint("grpc")). + WithReplicaLabels("replica", "receive").Init() + testutil.Ok(t, e2e.StartAndWaitReady(q)) + rulesSubDir := "rules" + var rulers []*e2emon.InstrumentedRunnable + for i := 1; i <= 2; i++ { + rFuture := e2ethanos.NewRulerBuilder(e, fmt.Sprintf("%d", i)) + rulesPath := filepath.Join(rFuture.Dir(), rulesSubDir) + testutil.Ok(t, os.MkdirAll(rulesPath, os.ModePerm)) + for i, rule := range []string{testAlertRuleHoldDuration} { + createRuleFile(t, filepath.Join(rulesPath, fmt.Sprintf("rules-%d.yaml", i)), rule) + } + r := rFuture.WithAlertManagerConfig([]alert.AlertmanagerConfig{ + { + EndpointsConfig: httpconfig.EndpointsConfig{ + StaticAddresses: []string{ + am.InternalEndpoint("http"), + }, + Scheme: "http", + }, + Timeout: amTimeout, + APIVersion: alert.APIv1, + }, + }).WithForGracePeriod("500ms"). + WithRestoreIgnoredLabels("tenant_id"). + InitStateless(filepath.Join(rFuture.InternalDir(), rulesSubDir), []httpconfig.Config{ + { + EndpointsConfig: httpconfig.EndpointsConfig{ + StaticAddresses: []string{ + q.InternalEndpoint("http"), + }, + Scheme: "http", + }, + }, + }, []*config.RemoteWriteConfig{ + {URL: &common_cfg.URL{URL: rwURL}, Name: "thanos-receiver"}, + }) + rulers = append(rulers, r) + } + + // Start the ruler 1 first. + testutil.Ok(t, e2e.StartAndWaitReady(rulers[0])) + + // Wait until the alert firing and ALERTS_FOR_STATE + // series has been written to receiver successfully. + queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { + return "ALERTS_FOR_STATE" + }, time.Now, promclient.QueryOptions{ + Deduplicate: true, + }, []model.Metric{ + { + "__name__": "ALERTS_FOR_STATE", + "alertname": "TestAlert_RuleHoldDuration", + "severity": "page", + "tenant_id": "default-tenant", + }, + }) + + var alerts []*rulespb.AlertInstance + client := promclient.NewDefaultClient() + err = runutil.Retry(time.Second*1, ctx.Done(), func() error { + alerts, err = client.AlertsInGRPC(ctx, urlParse(t, "http://"+rulers[0].Endpoint("http"))) + testutil.Ok(t, err) + if len(alerts) > 0 { + if alerts[0].State == rulespb.AlertState_FIRING { + return nil + } + } + return fmt.Errorf("alert is not firing") + }) + testutil.Ok(t, err) + // Record the alert active time. + alertActiveAt := alerts[0].ActiveAt + testutil.Ok(t, rulers[0].Stop()) + + // Start the ruler 2 now and ruler 2 should be able + // to restore the firing alert state. + testutil.Ok(t, e2e.StartAndWaitReady(rulers[1])) + + // Wait for 4 rule evaluation iterations to make sure the alert state is restored. 
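+	// prometheus_rule_group_iterations_total increments once per rule-group evaluation round; the rules manager restores the "for" state only after its first evaluations, so waiting for a few rounds ensures the restoration has run.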
+ testutil.Ok(t, rulers[1].WaitSumMetricsWithOptions(e2emon.GreaterOrEqual(4), []string{"prometheus_rule_group_iterations_total"}, e2emon.WaitMissingMetrics())) + + // Wait until the alert is firing on the second ruler. + err = runutil.Retry(time.Second*1, ctx.Done(), func() error { + alerts, err = client.AlertsInGRPC(ctx, urlParse(t, "http://"+rulers[1].Endpoint("http"))) + testutil.Ok(t, err) + if len(alerts) > 0 { + if alerts[0].State == rulespb.AlertState_FIRING { + // If the second ruler's alert has the same ActiveAt time as the one recorded from the first ruler, + // the alert state was restored successfully. + if alertActiveAt.Unix() == alerts[0].ActiveAt.Unix() { + return nil + } else { + return fmt.Errorf("alert active time is not restored") + } + } + } + return fmt.Errorf("alert is not firing") + }) + testutil.Ok(t, err) +} + // TestRule_CanPersistWALData checks that in stateless mode, Thanos Ruler can persist rule evaluations // which couldn't be sent to the remote write endpoint (e.g because receiver isn't available). func TestRule_CanPersistWALData(t *testing.T) {
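To make the restoration behaviour concrete, the sketch below condenses the assertion that `TestStatelessRulerAlertStateRestore` performs, using only APIs added or exercised in this patch (`promclient.NewDefaultClient`, `AlertsInGRPC`, `rulespb.AlertState_FIRING`, `ActiveAt`). It is a minimal, hypothetical illustration rather than code from the patch; the ruler addresses and port are placeholder assumptions, and error handling is reduced to panics.

```go
package main

import (
	"context"
	"fmt"
	"net/url"

	"github.com/thanos-io/thanos/pkg/promclient"
	"github.com/thanos-io/thanos/pkg/rules/rulespb"
)

// firingActiveAt returns the ActiveAt timestamp (unix seconds) of the first
// firing alert reported by a ruler, using the AlertsInGRPC helper added in
// this patch.
func firingActiveAt(ctx context.Context, c *promclient.Client, ruler *url.URL) (int64, error) {
	alerts, err := c.AlertsInGRPC(ctx, ruler)
	if err != nil {
		return 0, err
	}
	for _, a := range alerts {
		if a.State == rulespb.AlertState_FIRING {
			return a.ActiveAt.Unix(), nil
		}
	}
	return 0, fmt.Errorf("no firing alert yet")
}

func main() {
	ctx := context.Background()
	c := promclient.NewDefaultClient()

	// Placeholder addresses: the ruler that originally fired the alert and the
	// stateless ruler that replaces it after a restart.
	oldRuler, _ := url.Parse("http://ruler-1:10902")
	newRuler, _ := url.Parse("http://ruler-2:10902")

	before, err := firingActiveAt(ctx, c, oldRuler)
	if err != nil {
		panic(err)
	}
	// ... stop ruler-1 and start ruler-2 here ...
	after, err := firingActiveAt(ctx, c, newRuler)
	if err != nil {
		panic(err)
	}
	fmt.Println("for-state restored:", before == after)
}
```

Comparing `ActiveAt` is the right signal because restoration does not simply re-fire the alert: the replacement ruler reads the `ALERTS_FOR_STATE` series back through the query API servers and reconstructs the original activation time, so the `for` countdown is not restarted from zero.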