From 55aaf765823819805422884ef4ef85f4bae04b57 Mon Sep 17 00:00:00 2001 From: David Tsur Date: Mon, 4 Mar 2019 16:19:17 +0200 Subject: [PATCH] Rule component: Adding new API end point for rules and alerts in (#851) * Adding new API end point for rules and alerts * reusing methods in query api * Adding unitest for rule API (similar to prometheus rule/alert API end point unitest) * Adding SetCORS * Removing comments * Related to issue #850 --- benchmark/cmd/thanosbench/resources.go | 5 +- cmd/thanos/rule.go | 5 + pkg/query/api/v1.go | 124 ++++++------- pkg/query/api/v1_test.go | 12 +- pkg/rule/api/v1.go | 194 ++++++++++++++++++++ pkg/rule/api/v1_test.go | 240 +++++++++++++++++++++++++ 6 files changed, 510 insertions(+), 70 deletions(-) create mode 100644 pkg/rule/api/v1.go create mode 100644 pkg/rule/api/v1_test.go diff --git a/benchmark/cmd/thanosbench/resources.go b/benchmark/cmd/thanosbench/resources.go index 063c4e6607..f9da1c7ac0 100644 --- a/benchmark/cmd/thanosbench/resources.go +++ b/benchmark/cmd/thanosbench/resources.go @@ -12,6 +12,7 @@ import ( prom "github.com/prometheus/prometheus/config" "gopkg.in/yaml.v2" appsv1 "k8s.io/api/apps/v1" + "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -207,7 +208,7 @@ func createPrometheus(opts *opts, name string, bucket string) *appsv1.StatefulSe Name: name, Namespace: promNamespace, Labels: map[string]string{ - "app": name, + "app": name, "thanos-gossip-member": "true", }, } @@ -370,7 +371,7 @@ func createThanosQuery(opts *opts) (*v1.Service, *v1.Pod) { Name: "thanos-query", Namespace: thanosNamespace, Labels: map[string]string{ - "app": "thanos-query", + "app": "thanos-query", "thanos-gossip-member": "true", }, } diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 612e1cf93e..a3847105b4 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -10,6 +10,7 @@ import ( "net/url" "os" "os/signal" + "path" "path/filepath" "sort" "strconv" @@ -29,6 +30,7 @@ import ( "github.com/improbable-eng/thanos/pkg/extprom" "github.com/improbable-eng/thanos/pkg/objstore/client" "github.com/improbable-eng/thanos/pkg/promclient" + "github.com/improbable-eng/thanos/pkg/rule/api" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/shipper" "github.com/improbable-eng/thanos/pkg/store" @@ -565,6 +567,9 @@ func runRule( ui.NewRuleUI(logger, mgr, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix)) + api := v1.NewAPI(logger, mgr) + api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger) + mux := http.NewServeMux() registerMetrics(mux, reg) registerProfile(mux) diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index cf693dd2fb..61c1603b57 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -50,15 +50,15 @@ const ( statusError = "error" ) -type errorType string +type ErrorType string const ( - errorNone errorType = "" + errorNone ErrorType = "" errorTimeout = "timeout" errorCanceled = "canceled" errorExec = "execution" errorBadData = "bad_data" - errorInternal = "internal" + ErrorInternal = "internal" ) var corsHeaders = map[string]string{ @@ -68,31 +68,31 @@ var corsHeaders = map[string]string{ "Access-Control-Expose-Headers": "Date", } -type apiError struct { - typ errorType - err error +type ApiError struct { + Typ ErrorType + Err error } -func (e *apiError) Error() string { - return fmt.Sprintf("%s: %s", e.typ, e.err) +func (e *ApiError) Error() string { + return fmt.Sprintf("%s: %s", e.Typ, e.Err) } type response struct { Status status `json:"status"` Data interface{} `json:"data,omitempty"` - ErrorType errorType `json:"errorType,omitempty"` + ErrorType ErrorType `json:"ErrorType,omitempty"` Error string `json:"error,omitempty"` Warnings []string `json:"warnings,omitempty"` } // Enables cross-site script calls. -func setCORS(w http.ResponseWriter) { +func SetCORS(w http.ResponseWriter) { for h, v := range corsHeaders { w.Header().Set(h, v) } } -type apiFunc func(r *http.Request) (interface{}, []error, *apiError) +type ApiFunc func(r *http.Request) (interface{}, []error, *ApiError) // API can register a set of endpoints in a router and handle // them using the provided storage and query engine. @@ -151,13 +151,13 @@ func NewAPI( // Register the API's endpoints in the given router. func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log.Logger) { - instr := func(name string, f apiFunc) http.HandlerFunc { + instr := func(name string, f ApiFunc) http.HandlerFunc { hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - setCORS(w) + SetCORS(w) if data, warnings, err := f(r); err != nil { - respondError(w, err, data) + RespondError(w, err, data) } else if data != nil { - respond(w, data, warnings) + Respond(w, data, warnings) } else { w.WriteHeader(http.StatusNoContent) } @@ -183,7 +183,7 @@ type queryData struct { Warnings []error `json:"warnings,omitempty"` } -func (api *API) parseEnableDedupParam(r *http.Request) (enableDeduplication bool, _ *apiError) { +func (api *API) parseEnableDedupParam(r *http.Request) (enableDeduplication bool, _ *ApiError) { const dedupParam = "dedup" enableDeduplication = true @@ -191,13 +191,13 @@ func (api *API) parseEnableDedupParam(r *http.Request) (enableDeduplication bool var err error enableDeduplication, err = strconv.ParseBool(val) if err != nil { - return false, &apiError{errorBadData, errors.Wrapf(err, "'%s' parameter", dedupParam)} + return false, &ApiError{errorBadData, errors.Wrapf(err, "'%s' parameter", dedupParam)} } } return enableDeduplication, nil } -func (api *API) parseDownsamplingParam(r *http.Request, step time.Duration) (maxSourceResolution time.Duration, _ *apiError) { +func (api *API) parseDownsamplingParam(r *http.Request, step time.Duration) (maxSourceResolution time.Duration, _ *ApiError) { const maxSourceResolutionParam = "max_source_resolution" maxSourceResolution = 0 * time.Second @@ -209,18 +209,18 @@ func (api *API) parseDownsamplingParam(r *http.Request, step time.Duration) (max var err error maxSourceResolution, err = parseDuration(val) if err != nil { - return 0, &apiError{errorBadData, errors.Wrapf(err, "'%s' parameter", maxSourceResolutionParam)} + return 0, &ApiError{errorBadData, errors.Wrapf(err, "'%s' parameter", maxSourceResolutionParam)} } } if maxSourceResolution < 0 { - return 0, &apiError{errorBadData, errors.Errorf("negative '%s' is not accepted. Try a positive integer", maxSourceResolutionParam)} + return 0, &ApiError{errorBadData, errors.Errorf("negative '%s' is not accepted. Try a positive integer", maxSourceResolutionParam)} } return maxSourceResolution, nil } -func (api *API) parsePartialResponseParam(r *http.Request) (enablePartialResponse bool, _ *apiError) { +func (api *API) parsePartialResponseParam(r *http.Request) (enablePartialResponse bool, _ *ApiError) { const partialResponseParam = "partial_response" enablePartialResponse = api.enablePartialResponse @@ -228,23 +228,23 @@ func (api *API) parsePartialResponseParam(r *http.Request) (enablePartialRespons var err error enablePartialResponse, err = strconv.ParseBool(val) if err != nil { - return false, &apiError{errorBadData, errors.Wrapf(err, "'%s' parameter", partialResponseParam)} + return false, &ApiError{errorBadData, errors.Wrapf(err, "'%s' parameter", partialResponseParam)} } } return enablePartialResponse, nil } -func (api *API) options(r *http.Request) (interface{}, []error, *apiError) { +func (api *API) options(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, nil } -func (api *API) query(r *http.Request) (interface{}, []error, *apiError) { +func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) { var ts time.Time if t := r.FormValue("time"); t != "" { var err error ts, err = parseTime(t) if err != nil { - return nil, nil, &apiError{errorBadData, err} + return nil, nil, &ApiError{errorBadData, err} } } else { ts = api.now() @@ -255,7 +255,7 @@ func (api *API) query(r *http.Request) (interface{}, []error, *apiError) { var cancel context.CancelFunc timeout, err := parseDuration(to) if err != nil { - return nil, nil, &apiError{errorBadData, err} + return nil, nil, &ApiError{errorBadData, err} } ctx, cancel = context.WithTimeout(ctx, timeout) @@ -289,20 +289,20 @@ func (api *API) query(r *http.Request) (interface{}, []error, *apiError) { begin := api.now() qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, 0, enablePartialResponse, warningReporter), r.FormValue("query"), ts) if err != nil { - return nil, nil, &apiError{errorBadData, err} + return nil, nil, &ApiError{errorBadData, err} } res := qry.Exec(ctx) if res.Err != nil { switch res.Err.(type) { case promql.ErrQueryCanceled: - return nil, nil, &apiError{errorCanceled, res.Err} + return nil, nil, &ApiError{errorCanceled, res.Err} case promql.ErrQueryTimeout: - return nil, nil, &apiError{errorTimeout, res.Err} + return nil, nil, &ApiError{errorTimeout, res.Err} case promql.ErrStorage: - return nil, nil, &apiError{errorInternal, res.Err} + return nil, nil, &ApiError{ErrorInternal, res.Err} } - return nil, nil, &apiError{errorExec, res.Err} + return nil, nil, &ApiError{errorExec, res.Err} } api.instantQueryDuration.Observe(time.Since(begin).Seconds()) @@ -312,35 +312,35 @@ func (api *API) query(r *http.Request) (interface{}, []error, *apiError) { }, warnings, nil } -func (api *API) queryRange(r *http.Request) (interface{}, []error, *apiError) { +func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) { start, err := parseTime(r.FormValue("start")) if err != nil { - return nil, nil, &apiError{errorBadData, err} + return nil, nil, &ApiError{errorBadData, err} } end, err := parseTime(r.FormValue("end")) if err != nil { - return nil, nil, &apiError{errorBadData, err} + return nil, nil, &ApiError{errorBadData, err} } if end.Before(start) { err := errors.New("end timestamp must not be before start time") - return nil, nil, &apiError{errorBadData, err} + return nil, nil, &ApiError{errorBadData, err} } step, err := parseDuration(r.FormValue("step")) if err != nil { - return nil, nil, &apiError{errorBadData, errors.Wrap(err, "param step")} + return nil, nil, &ApiError{errorBadData, errors.Wrap(err, "param step")} } if step <= 0 { err := errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer") - return nil, nil, &apiError{errorBadData, err} + return nil, nil, &ApiError{errorBadData, err} } // For safety, limit the number of returned points per timeseries. // This is sufficient for 60s resolution for a week or 1h resolution for a year. if end.Sub(start)/step > 11000 { err := errors.Errorf("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") - return nil, nil, &apiError{errorBadData, err} + return nil, nil, &ApiError{errorBadData, err} } ctx := r.Context() @@ -348,7 +348,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *apiError) { var cancel context.CancelFunc timeout, err := parseDuration(to) if err != nil { - return nil, nil, &apiError{errorBadData, err} + return nil, nil, &ApiError{errorBadData, err} } ctx, cancel = context.WithTimeout(ctx, timeout) @@ -393,18 +393,18 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *apiError) { step, ) if err != nil { - return nil, nil, &apiError{errorBadData, err} + return nil, nil, &ApiError{errorBadData, err} } res := qry.Exec(ctx) if res.Err != nil { switch res.Err.(type) { case promql.ErrQueryCanceled: - return nil, nil, &apiError{errorCanceled, res.Err} + return nil, nil, &ApiError{errorCanceled, res.Err} case promql.ErrQueryTimeout: - return nil, nil, &apiError{errorTimeout, res.Err} + return nil, nil, &ApiError{errorTimeout, res.Err} } - return nil, nil, &apiError{errorExec, res.Err} + return nil, nil, &ApiError{errorExec, res.Err} } api.rangeQueryDuration.Observe(time.Since(begin).Seconds()) @@ -414,12 +414,12 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *apiError) { }, warnings, nil } -func (api *API) labelValues(r *http.Request) (interface{}, []error, *apiError) { +func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) { ctx := r.Context() name := route.Param(ctx, "name") if !model.LabelNameRE.MatchString(name) { - return nil, nil, &apiError{errorBadData, fmt.Errorf("invalid label name: %q", name)} + return nil, nil, &ApiError{errorBadData, fmt.Errorf("invalid label name: %q", name)} } enablePartialResponse, apiErr := api.parsePartialResponseParam(r) @@ -439,7 +439,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, []error, *apiError) { q, err := api.queryableCreate(true, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64) if err != nil { - return nil, nil, &apiError{errorExec, err} + return nil, nil, &ApiError{errorExec, err} } defer runutil.CloseWithLogOnErr(api.logger, q, "queryable labelValues") @@ -447,7 +447,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, []error, *apiError) { vals, err := q.LabelValues(name) if err != nil { - return nil, nil, &apiError{errorExec, err} + return nil, nil, &ApiError{errorExec, err} } return vals, warnings, nil @@ -458,13 +458,13 @@ var ( maxTime = time.Unix(math.MaxInt64/1000-62135596801, 999999999) ) -func (api *API) series(r *http.Request) (interface{}, []error, *apiError) { +func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) { if err := r.ParseForm(); err != nil { - return nil, nil, &apiError{errorInternal, errors.Wrap(err, "parse form")} + return nil, nil, &ApiError{ErrorInternal, errors.Wrap(err, "parse form")} } if len(r.Form["match[]"]) == 0 { - return nil, nil, &apiError{errorBadData, fmt.Errorf("no match[] parameter provided")} + return nil, nil, &ApiError{errorBadData, fmt.Errorf("no match[] parameter provided")} } var start time.Time @@ -472,7 +472,7 @@ func (api *API) series(r *http.Request) (interface{}, []error, *apiError) { var err error start, err = parseTime(t) if err != nil { - return nil, nil, &apiError{errorBadData, err} + return nil, nil, &ApiError{errorBadData, err} } } else { start = minTime @@ -483,7 +483,7 @@ func (api *API) series(r *http.Request) (interface{}, []error, *apiError) { var err error end, err = parseTime(t) if err != nil { - return nil, nil, &apiError{errorBadData, err} + return nil, nil, &ApiError{errorBadData, err} } } else { end = maxTime @@ -493,7 +493,7 @@ func (api *API) series(r *http.Request) (interface{}, []error, *apiError) { for _, s := range r.Form["match[]"] { matchers, err := promql.ParseMetricSelector(s) if err != nil { - return nil, nil, &apiError{errorBadData, err} + return nil, nil, &ApiError{errorBadData, err} } matcherSets = append(matcherSets, matchers) } @@ -521,7 +521,7 @@ func (api *API) series(r *http.Request) (interface{}, []error, *apiError) { // TODO(bwplotka): Support downsampling? q, err := api.queryableCreate(enableDedup, 0, enablePartialResponse, warningReporter).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { - return nil, nil, &apiError{errorExec, err} + return nil, nil, &ApiError{errorExec, err} } defer runutil.CloseWithLogOnErr(api.logger, q, "queryable series") @@ -529,7 +529,7 @@ func (api *API) series(r *http.Request) (interface{}, []error, *apiError) { for _, mset := range matcherSets { s, _, err := q.Select(&storage.SelectParams{}, mset...) if err != nil { - return nil, nil, &apiError{errorExec, err} + return nil, nil, &ApiError{errorExec, err} } sets = append(sets, s) } @@ -541,13 +541,13 @@ func (api *API) series(r *http.Request) (interface{}, []error, *apiError) { metrics = append(metrics, set.At().Labels()) } if set.Err() != nil { - return nil, nil, &apiError{errorExec, set.Err()} + return nil, nil, &ApiError{errorExec, set.Err()} } return metrics, warnings, nil } -func respond(w http.ResponseWriter, data interface{}, warnings []error) { +func Respond(w http.ResponseWriter, data interface{}, warnings []error) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -561,18 +561,18 @@ func respond(w http.ResponseWriter, data interface{}, warnings []error) { _ = json.NewEncoder(w).Encode(resp) } -func respondError(w http.ResponseWriter, apiErr *apiError, data interface{}) { +func RespondError(w http.ResponseWriter, apiErr *ApiError, data interface{}) { w.Header().Set("Content-Type", "application/json") var code int - switch apiErr.typ { + switch apiErr.Typ { case errorBadData: code = http.StatusBadRequest case errorExec: code = 422 case errorCanceled, errorTimeout: code = http.StatusServiceUnavailable - case errorInternal: + case ErrorInternal: code = http.StatusInternalServerError default: code = http.StatusInternalServerError @@ -581,8 +581,8 @@ func respondError(w http.ResponseWriter, apiErr *apiError, data interface{}) { _ = json.NewEncoder(w).Encode(&response{ Status: statusError, - ErrorType: apiErr.typ, - Error: apiErr.err.Error(), + ErrorType: apiErr.Typ, + Error: apiErr.Err.Error(), Data: data, }) } diff --git a/pkg/query/api/v1_test.go b/pkg/query/api/v1_test.go index bcf17907b0..ca5dee2013 100644 --- a/pkg/query/api/v1_test.go +++ b/pkg/query/api/v1_test.go @@ -77,11 +77,11 @@ func TestEndpoints(t *testing.T) { start := time.Unix(0, 0) var tests = []struct { - endpoint apiFunc + endpoint ApiFunc params map[string]string query url.Values response interface{} - errType errorType + errType ErrorType }{ { endpoint: api.query, @@ -425,8 +425,8 @@ func TestEndpoints(t *testing.T) { if test.errType == errorNone { t.Fatalf("Unexpected error: %s", apiErr) } - if test.errType != apiErr.typ { - t.Fatalf("Expected error of type %q but got type %q", test.errType, apiErr.typ) + if test.errType != apiErr.Typ { + t.Fatalf("Expected error of type %q but got type %q", test.errType, apiErr.Typ) } return } @@ -446,7 +446,7 @@ func TestEndpoints(t *testing.T) { func TestRespondSuccess(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - respond(w, "test", nil) + Respond(w, "test", nil) })) defer s.Close() @@ -483,7 +483,7 @@ func TestRespondSuccess(t *testing.T) { func TestRespondError(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - respondError(w, &apiError{errorTimeout, errors.New("message")}, "test") + RespondError(w, &ApiError{errorTimeout, errors.New("message")}, "test") })) defer s.Close() diff --git a/pkg/rule/api/v1.go b/pkg/rule/api/v1.go new file mode 100644 index 0000000000..be2dc5abf4 --- /dev/null +++ b/pkg/rule/api/v1.go @@ -0,0 +1,194 @@ +package v1 + +import ( + "fmt" + "net/http" + "time" + + "github.com/NYTimes/gziphandler" + qapi "github.com/improbable-eng/thanos/pkg/query/api" + "github.com/improbable-eng/thanos/pkg/tracing" + "github.com/prometheus/client_golang/prometheus" + + "github.com/go-kit/kit/log" + "github.com/opentracing/opentracing-go" + "github.com/prometheus/common/route" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/rules" +) + +type API struct { + logger log.Logger + now func() time.Time + rulesRetriever rulesRetriever +} + +func NewAPI( + logger log.Logger, + rr rulesRetriever, +) *API { + return &API{ + logger: logger, + now: time.Now, + rulesRetriever: rr, + } +} + +func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log.Logger) { + instr := func(name string, f qapi.ApiFunc) http.HandlerFunc { + hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + qapi.SetCORS(w) + if data, warnings, err := f(r); err != nil { + qapi.RespondError(w, err, data) + } else if data != nil { + qapi.Respond(w, data, warnings) + } else { + w.WriteHeader(http.StatusNoContent) + } + }) + return prometheus.InstrumentHandler(name, tracing.HTTPMiddleware(tracer, name, logger, gziphandler.GzipHandler(hf))) + } + + r.Get("/alerts", instr("alerts", api.alerts)) + r.Get("/rules", instr("rules", api.rules)) + +} + +type rulesRetriever interface { + RuleGroups() []*rules.Group + AlertingRules() []*rules.AlertingRule +} + +func (api *API) rules(r *http.Request) (interface{}, []error, *qapi.ApiError) { + ruleGroups := api.rulesRetriever.RuleGroups() + res := &RuleDiscovery{RuleGroups: make([]*RuleGroup, len(ruleGroups))} + for i, grp := range ruleGroups { + apiRuleGroup := &RuleGroup{ + Name: grp.Name(), + File: grp.File(), + Interval: grp.Interval().Seconds(), + Rules: []rule{}, + } + + for _, r := range grp.Rules() { + var enrichedRule rule + + lastError := "" + if r.LastError() != nil { + lastError = r.LastError().Error() + } + + switch rule := r.(type) { + case *rules.AlertingRule: + enrichedRule = alertingRule{ + Name: rule.Name(), + Query: rule.Query().String(), + Duration: rule.Duration().Seconds(), + Labels: rule.Labels(), + Annotations: rule.Annotations(), + Alerts: rulesAlertsToAPIAlerts(rule.ActiveAlerts()), + Health: rule.Health(), + LastError: lastError, + Type: "alerting", + } + case *rules.RecordingRule: + enrichedRule = recordingRule{ + Name: rule.Name(), + Query: rule.Query().String(), + Labels: rule.Labels(), + Health: rule.Health(), + LastError: lastError, + Type: "recording", + } + default: + err := fmt.Errorf("failed to assert type of rule '%v'", rule.Name()) + return nil, nil, &qapi.ApiError{qapi.ErrorInternal, err} + } + + apiRuleGroup.Rules = append(apiRuleGroup.Rules, enrichedRule) + } + res.RuleGroups[i] = apiRuleGroup + } + return res, nil, nil +} + +func (api *API) alerts(r *http.Request) (interface{}, []error, *qapi.ApiError) { + alertingRules := api.rulesRetriever.AlertingRules() + alerts := []*Alert{} + + for _, alertingRule := range alertingRules { + alerts = append( + alerts, + rulesAlertsToAPIAlerts(alertingRule.ActiveAlerts())..., + ) + } + + res := &AlertDiscovery{Alerts: alerts} + + return res, nil, nil +} + +type AlertDiscovery struct { + Alerts []*Alert `json:"alerts"` +} + +type Alert struct { + Labels labels.Labels `json:"labels"` + Annotations labels.Labels `json:"annotations"` + State string `json:"state"` + ActiveAt *time.Time `json:"activeAt,omitempty"` + Value float64 `json:"value"` +} + +func rulesAlertsToAPIAlerts(rulesAlerts []*rules.Alert) []*Alert { + apiAlerts := make([]*Alert, len(rulesAlerts)) + for i, ruleAlert := range rulesAlerts { + apiAlerts[i] = &Alert{ + Labels: ruleAlert.Labels, + Annotations: ruleAlert.Annotations, + State: ruleAlert.State.String(), + ActiveAt: &ruleAlert.ActiveAt, + Value: ruleAlert.Value, + } + } + + return apiAlerts +} + +type RuleDiscovery struct { + RuleGroups []*RuleGroup `json:"groups"` +} + +type RuleGroup struct { + Name string `json:"name"` + File string `json:"file"` + // In order to preserve rule ordering, while exposing type (alerting or recording) + // specific properties, both alerting and recording rules are exposed in the + // same array. + Rules []rule `json:"rules"` + Interval float64 `json:"interval"` +} + +type rule interface{} + +type alertingRule struct { + Name string `json:"name"` + Query string `json:"query"` + Duration float64 `json:"duration"` + Labels labels.Labels `json:"labels"` + Annotations labels.Labels `json:"annotations"` + Alerts []*Alert `json:"alerts"` + Health rules.RuleHealth `json:"health"` + LastError string `json:"lastError,omitempty"` + Type string `json:"type"` +} + +type recordingRule struct { + Name string `json:"name"` + Query string `json:"query"` + Labels labels.Labels `json:"labels,omitempty"` + Health rules.RuleHealth `json:"health"` + LastError string `json:"lastError,omitempty"` + // Type of a recordingRule is always "recording". + Type string `json:"type"` +} diff --git a/pkg/rule/api/v1_test.go b/pkg/rule/api/v1_test.go new file mode 100644 index 0000000000..af4034bd8d --- /dev/null +++ b/pkg/rule/api/v1_test.go @@ -0,0 +1,240 @@ +package v1 + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "reflect" + "testing" + "time" + + "github.com/go-kit/kit/log" + qapi "github.com/improbable-eng/thanos/pkg/query/api" + "github.com/prometheus/common/route" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/rules" + "github.com/prometheus/prometheus/util/testutil" +) + +type rulesRetrieverMock struct { + testing *testing.T +} + +func (m rulesRetrieverMock) RuleGroups() []*rules.Group { + var ar rulesRetrieverMock + arules := ar.AlertingRules() + storage := testutil.NewStorage(m.testing) + //defer storage.Close() + + engineOpts := promql.EngineOpts{ + Logger: nil, + Reg: nil, + MaxConcurrent: 10, + MaxSamples: 10, + Timeout: 100 * time.Second, + } + + engine := promql.NewEngine(engineOpts) + opts := &rules.ManagerOptions{ + QueryFunc: rules.EngineQueryFunc(engine, storage), + Appendable: storage, + Context: context.Background(), + Logger: log.NewNopLogger(), + } + + var r []rules.Rule + + for _, alertrule := range arules { + r = append(r, alertrule) + } + + recordingExpr, err := promql.ParseExpr(`vector(1)`) + if err != nil { + m.testing.Fatalf("unable to parse alert expression: %s", err) + } + recordingRule := rules.NewRecordingRule("recording-rule-1", recordingExpr, labels.Labels{}) + r = append(r, recordingRule) + + group := rules.NewGroup("grp", "/path/to/file", time.Second, r, false, opts) + return []*rules.Group{group} +} + +func (m rulesRetrieverMock) AlertingRules() []*rules.AlertingRule { + expr1, err := promql.ParseExpr(`absent(test_metric3) != 1`) + if err != nil { + m.testing.Fatalf("unable to parse alert expression: %s", err) + } + expr2, err := promql.ParseExpr(`up == 1`) + if err != nil { + m.testing.Fatalf("Unable to parse alert expression: %s", err) + } + + rule1 := rules.NewAlertingRule( + "test_metric3", + expr1, + time.Second, + labels.Labels{}, + labels.Labels{}, + true, + log.NewNopLogger(), + ) + rule2 := rules.NewAlertingRule( + "test_metric4", + expr2, + time.Second, + labels.Labels{}, + labels.Labels{}, + true, + log.NewNopLogger(), + ) + var r []*rules.AlertingRule + r = append(r, rule1) + r = append(r, rule2) + return r +} + +func TestEndpoints(t *testing.T) { + suite, err := promql.NewTest(t, ` + load 1m + test_metric1{foo="bar"} 0+100x100 + test_metric1{foo="boo"} 1+0x100 + test_metric2{foo="boo"} 1+0x100 + `) + if err != nil { + t.Fatal(err) + } + defer suite.Close() + + if err := suite.Run(); err != nil { + t.Fatal(err) + } + + var algr rulesRetrieverMock + algr.testing = t + algr.AlertingRules() + algr.RuleGroups() + + t.Run("local", func(t *testing.T) { + var algr rulesRetrieverMock + algr.testing = t + algr.AlertingRules() + algr.RuleGroups() + api := NewAPI(nil, algr) + testEndpoints(t, api) + }) +} + +func testEndpoints(t *testing.T, api *API) { + + type test struct { + endpoint qapi.ApiFunc + params map[string]string + query url.Values + response interface{} + errType qapi.ErrorType + } + var tests = []test{ + { + endpoint: api.rules, + response: &RuleDiscovery{ + RuleGroups: []*RuleGroup{ + { + Name: "grp", + File: "/path/to/file", + Interval: 1, + Rules: []rule{ + alertingRule{ + Name: "test_metric3", + Query: "absent(test_metric3) != 1", + Duration: 1, + Labels: labels.Labels{}, + Annotations: labels.Labels{}, + Alerts: []*Alert{}, + Health: "unknown", + Type: "alerting", + }, + alertingRule{ + Name: "test_metric4", + Query: "up == 1", + Duration: 1, + Labels: labels.Labels{}, + Annotations: labels.Labels{}, + Alerts: []*Alert{}, + Health: "unknown", + Type: "alerting", + }, + recordingRule{ + Name: "recording-rule-1", + Query: "vector(1)", + Labels: labels.Labels{}, + Health: "unknown", + Type: "recording", + }, + }, + }, + }, + }, + }, + } + + methods := func(f qapi.ApiFunc) []string { + return []string{http.MethodGet} + } + + request := func(m string, q url.Values) (*http.Request, error) { + return http.NewRequest(m, fmt.Sprintf("http://example.com?%s", q.Encode()), nil) + } + for i, test := range tests { + for _, method := range methods(test.endpoint) { + // Build a context with the correct request params. + ctx := context.Background() + for p, v := range test.params { + ctx = route.WithParam(ctx, p, v) + } + t.Logf("run %d\t%s\t%q", i, method, test.query.Encode()) + + req, err := request(method, test.query) + if err != nil { + t.Fatal(err) + } + endpoint, errors, apiError := test.endpoint(req.WithContext(ctx)) + + if errors != nil { + t.Fatalf("Unexpected errors: %s", errors) + return + } + assertAPIError(t, apiError) + assertAPIResponse(t, endpoint, test.response) + } + } +} + +func assertAPIError(t *testing.T, got *qapi.ApiError) { + if got != nil { + t.Fatalf("Unexpected error: %s", got) + return + } +} + +func assertAPIResponse(t *testing.T, got interface{}, exp interface{}) { + if !reflect.DeepEqual(exp, got) { + respJSON, err := json.Marshal(got) + if err != nil { + t.Fatalf("failed to marshal response as JSON: %v", err.Error()) + } + + expectedRespJSON, err := json.Marshal(exp) + if err != nil { + t.Fatalf("failed to marshal expected response as JSON: %v", err.Error()) + } + + t.Fatalf( + "Response does not match, expected:\n%+v\ngot:\n%+v", + string(expectedRespJSON), + string(respJSON), + ) + } +}