diff --git a/src/integration/resources/coordinator_client.go b/src/integration/resources/coordinator_client.go index 5a58eeb2ee..b4567ba85c 100644 --- a/src/integration/resources/coordinator_client.go +++ b/src/integration/resources/coordinator_client.go @@ -29,6 +29,7 @@ import ( "io/ioutil" "net" "net/http" + "path" "strings" "time" @@ -317,7 +318,22 @@ func (c *CoordinatorClient) WaitForClusterReady() error { logger.Error("failed checking cluster readiness", zap.Error(err)) return err } - resp.Body.Close() + defer resp.Body.Close() + + if resp.StatusCode != 200 { + err = errors.New("non-200 status code received") + + body, rerr := ioutil.ReadAll(resp.Body) + if rerr != nil { + logger.Warn("failed parse response body", zap.Error(rerr)) + body = []byte("") + } + + logger.Error("failed to check cluster readiness", zap.Error(err), + zap.String("responseBody", string(body)), + ) + return err + } logger.Info("cluster ready to receive reads and writes") @@ -715,8 +731,8 @@ func (c *CoordinatorClient) query( } // InstantQuery runs an instant query with provided headers -func (c *CoordinatorClient) InstantQuery(req QueryRequest, headers map[string][]string) (model.Vector, error) { - queryStr := fmt.Sprintf("%s?query=%s", route.QueryURL, req.QueryExpr) +func (c *CoordinatorClient) InstantQuery(req QueryRequest, headers Headers) (model.Vector, error) { + queryStr := fmt.Sprintf("%s?query=%s", route.QueryURL, req.Query) if req.Time != nil { queryStr = fmt.Sprintf("%s&time=%d", queryStr, req.Time.Unix()) } @@ -745,21 +761,21 @@ type vectorResult struct { } // RangeQuery runs a range query with provided headers -func (c *CoordinatorClient) RangeQuery(req RangeQueryRequest, headers map[string][]string) (model.Matrix, error) { - if req.StartTime.IsZero() { - req.StartTime = time.Now() +func (c *CoordinatorClient) RangeQuery(req RangeQueryRequest, headers Headers) (model.Matrix, error) { + if req.Start.IsZero() { + req.Start = time.Now() } - if req.EndTime.IsZero() { - req.EndTime = time.Now() + if req.End.IsZero() { + req.End = time.Now() } if req.Step == 0 { req.Step = 15 * time.Second // default step is 15 seconds. } queryStr := fmt.Sprintf( "%s?query=%s&start=%d&end=%d&step=%f", - route.QueryRangeURL, req.QueryExpr, - req.StartTime.Unix(), - req.EndTime.Unix(), + route.QueryRangeURL, req.Query, + req.Start.Unix(), + req.End.Unix(), req.Step.Seconds(), ) @@ -776,6 +792,84 @@ func (c *CoordinatorClient) RangeQuery(req RangeQueryRequest, headers map[string return parsedResp.Data.Result, nil } +// LabelNames return matching label names based on the request. +func (c *CoordinatorClient) LabelNames( + req LabelNamesRequest, + headers Headers, +) (model.LabelNames, error) { + urlPathAndQuery := fmt.Sprintf("%s?%s", route.LabelNamesURL, req.String()) + resp, err := c.runQuery(urlPathAndQuery, headers) + if err != nil { + return nil, err + } + + var parsedResp labelResponse + if err := json.Unmarshal([]byte(resp), &parsedResp); err != nil { + return nil, err + } + + labelNames := make(model.LabelNames, 0, len(parsedResp.Data)) + for _, label := range parsedResp.Data { + labelNames = append(labelNames, model.LabelName(label)) + } + + return labelNames, nil +} + +// LabelValues return matching label values based on the request. +func (c *CoordinatorClient) LabelValues( + req LabelValuesRequest, + headers Headers, +) (model.LabelValues, error) { + urlPathAndQuery := fmt.Sprintf("%s?%s", + path.Join(route.Prefix, "label", req.LabelName, "values"), + req.String()) + resp, err := c.runQuery(urlPathAndQuery, headers) + if err != nil { + return nil, err + } + + var parsedResp labelResponse + if err := json.Unmarshal([]byte(resp), &parsedResp); err != nil { + return nil, err + } + + labelValues := make(model.LabelValues, 0, len(parsedResp.Data)) + for _, label := range parsedResp.Data { + labelValues = append(labelValues, model.LabelValue(label)) + } + + return labelValues, nil +} + +// Series returns matching series based on the request. +func (c *CoordinatorClient) Series( + req SeriesRequest, + headers Headers, +) ([]model.Metric, error) { + urlPathAndQuery := fmt.Sprintf("%s?%s", route.SeriesMatchURL, req.String()) + resp, err := c.runQuery(urlPathAndQuery, headers) + if err != nil { + return nil, err + } + + var parsedResp seriesResponse + if err := json.Unmarshal([]byte(resp), &parsedResp); err != nil { + return nil, err + } + + series := make([]model.Metric, 0, len(parsedResp.Data)) + for _, labels := range parsedResp.Data { + labelSet := make(model.LabelSet) + for name, val := range labels { + labelSet[model.LabelName(name)] = model.LabelValue(val) + } + series = append(series, model.Metric(labelSet)) + } + + return series, nil +} + type jsonRangeQueryResponse struct { Status string Data matrixResult @@ -786,6 +880,16 @@ type matrixResult struct { Result model.Matrix } +type labelResponse struct { + Status string + Data []string +} + +type seriesResponse struct { + Status string + Data []map[string]string +} + func (c *CoordinatorClient) runQuery( query string, headers map[string][]string, ) (string, error) { diff --git a/src/integration/resources/docker/coordinator.go b/src/integration/resources/docker/coordinator.go index ab0a5ef577..4649101e37 100644 --- a/src/integration/resources/docker/coordinator.go +++ b/src/integration/resources/docker/coordinator.go @@ -222,7 +222,9 @@ func (c *coordinator) ApplyKVUpdate(update string) error { } func (c *coordinator) RunQuery( - verifier resources.ResponseVerifier, query string, headers map[string][]string, + verifier resources.ResponseVerifier, + query string, + headers resources.Headers, ) error { if c.resource.closed { return errClosed @@ -233,7 +235,7 @@ func (c *coordinator) RunQuery( func (c *coordinator) InstantQuery( req resources.QueryRequest, - headers map[string][]string, + headers resources.Headers, ) (model.Vector, error) { if c.resource.closed { return nil, errClosed @@ -244,7 +246,7 @@ func (c *coordinator) InstantQuery( // RangeQuery runs a range query with provided headers func (c *coordinator) RangeQuery( req resources.RangeQueryRequest, - headers map[string][]string, + headers resources.Headers, ) (model.Matrix, error) { if c.resource.closed { return nil, errClosed @@ -252,6 +254,39 @@ func (c *coordinator) RangeQuery( return c.client.RangeQuery(req, headers) } +// LabelNames return matching label names based on the request. +func (c *coordinator) LabelNames( + req resources.LabelNamesRequest, + headers resources.Headers, +) (model.LabelNames, error) { + if c.resource.closed { + return nil, errClosed + } + return c.client.LabelNames(req, headers) +} + +// LabelValues returns matching label values based on the request. +func (c *coordinator) LabelValues( + req resources.LabelValuesRequest, + headers resources.Headers, +) (model.LabelValues, error) { + if c.resource.closed { + return nil, errClosed + } + return c.client.LabelValues(req, headers) +} + +// Series returns matching series based on the request. +func (c *coordinator) Series( + req resources.SeriesRequest, + headers resources.Headers, +) ([]model.Metric, error) { + if c.resource.closed { + return nil, errClosed + } + return c.client.Series(req, headers) +} + func (c *coordinator) Close() error { if c.resource.closed { return errClosed diff --git a/src/integration/resources/inprocess/aggregator_test.go b/src/integration/resources/inprocess/aggregator_test.go index da7323dcf9..41e412dd8c 100644 --- a/src/integration/resources/inprocess/aggregator_test.go +++ b/src/integration/resources/inprocess/aggregator_test.go @@ -291,11 +291,11 @@ func testAggMetrics(t *testing.T, coord resources.Coordinator) { return coord.WriteProm("cpu", map[string]string{"host": "host1"}, samples) })) - queryHeaders := map[string][]string{"M3-Metrics-Type": {"aggregated"}, "M3-Storage-Policy": {"10s:6h"}} + queryHeaders := resources.Headers{"M3-Metrics-Type": {"aggregated"}, "M3-Storage-Policy": {"10s:6h"}} // Instant Query require.NoError(t, resources.Retry(func() error { - result, err := coord.InstantQuery(resources.QueryRequest{QueryExpr: "cpu"}, queryHeaders) + result, err := coord.InstantQuery(resources.QueryRequest{Query: "cpu"}, queryHeaders) if err != nil { return err } @@ -312,9 +312,9 @@ func testAggMetrics(t *testing.T, coord resources.Coordinator) { require.NoError(t, resources.Retry(func() error { result, err := coord.RangeQuery( resources.RangeQueryRequest{ - QueryExpr: "cpu", - StartTime: time.Now().Add(-30 * time.Second), - EndTime: time.Now(), + Query: "cpu", + Start: time.Now().Add(-30 * time.Second), + End: time.Now(), Step: 1 * time.Second, }, queryHeaders, diff --git a/src/integration/resources/inprocess/coordinator.go b/src/integration/resources/inprocess/coordinator.go index 1d736401ea..69ea3abb31 100644 --- a/src/integration/resources/inprocess/coordinator.go +++ b/src/integration/resources/inprocess/coordinator.go @@ -418,7 +418,7 @@ func (c *Coordinator) WriteProm(name string, tags map[string]string, samples []p func (c *Coordinator) RunQuery( verifier resources.ResponseVerifier, query string, - headers map[string][]string, + headers resources.Headers, ) error { return c.client.RunQuery(verifier, query, headers) } @@ -426,7 +426,7 @@ func (c *Coordinator) RunQuery( // InstantQuery runs an instant query with provided headers func (c *Coordinator) InstantQuery( req resources.QueryRequest, - headers map[string][]string, + headers resources.Headers, ) (model.Vector, error) { return c.client.InstantQuery(req, headers) } @@ -434,11 +434,35 @@ func (c *Coordinator) InstantQuery( // RangeQuery runs a range query with provided headers func (c *Coordinator) RangeQuery( req resources.RangeQueryRequest, - headers map[string][]string, + headers resources.Headers, ) (model.Matrix, error) { return c.client.RangeQuery(req, headers) } +// LabelNames return matching label names based on the request. +func (c *Coordinator) LabelNames( + req resources.LabelNamesRequest, + headers resources.Headers, +) (model.LabelNames, error) { + return c.client.LabelNames(req, headers) +} + +// LabelValues returns matching label values based on the request. +func (c *Coordinator) LabelValues( + req resources.LabelValuesRequest, + headers resources.Headers, +) (model.LabelValues, error) { + return c.client.LabelValues(req, headers) +} + +// Series returns matching series based on the request. +func (c *Coordinator) Series( + req resources.SeriesRequest, + headers resources.Headers, +) ([]model.Metric, error) { + return c.client.Series(req, headers) +} + func updateCoordinatorConfig( cfg config.Configuration, opts CoordinatorOptions, diff --git a/src/integration/resources/inprocess/coordinator_test.go b/src/integration/resources/inprocess/coordinator_test.go index 14f1512940..e9bc11028d 100644 --- a/src/integration/resources/inprocess/coordinator_test.go +++ b/src/integration/resources/inprocess/coordinator_test.go @@ -30,7 +30,11 @@ import ( "github.com/m3db/m3/src/msg/generated/proto/topicpb" "github.com/m3db/m3/src/msg/topic" "github.com/m3db/m3/src/query/generated/proto/admin" + "github.com/m3db/m3/src/query/generated/proto/prompb" + "github.com/m3db/m3/src/query/storage" + xtime "github.com/m3db/m3/src/x/time" + "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -73,15 +77,49 @@ func TestNewEmbeddedCoordinatorNotStarted(t *testing.T) { require.Error(t, err) } -func TestM3msgTopicFunctions(t *testing.T) { - dbnode, err := NewDBNodeFromYAML(defaultDBNodeConfig, DBNodeOptions{}) +func TestCoordinatorAPIs(t *testing.T) { + _, coord, closer := setupNodeAndCoordinator(t) + defer closer() + + testM3msgTopicFunctions(t, coord) + testAggPlacementFunctions(t, coord) + testMetadataAPIs(t, coord) +} + +func testMetadataAPIs(t *testing.T, coordinator resources.Coordinator) { + err := coordinator.WriteProm("cpu", map[string]string{"pod": "foo-1234"}, []prompb.Sample{ + {Value: 1, Timestamp: storage.TimeToPromTimestamp(xtime.Now())}, + }) require.NoError(t, err) - coord, err := NewCoordinatorFromYAML(defaultCoordConfig, CoordinatorOptions{}) + names, err := coordinator.LabelNames(resources.LabelNamesRequest{}, nil) require.NoError(t, err) + require.Equal(t, model.LabelNames{ + "__name__", + "pod", + }, names) + + values, err := coordinator.LabelValues(resources.LabelValuesRequest{ + LabelName: "__name__", + }, nil) + require.NoError(t, err) + require.Equal(t, model.LabelValues{"cpu"}, values) - require.NoError(t, coord.WaitForNamespace("")) + series, err := coordinator.Series(resources.SeriesRequest{ + MetadataRequest: resources.MetadataRequest{ + Match: "cpu", + }, + }, nil) + require.NoError(t, err) + require.Equal(t, []model.Metric{ + { + "__name__": "cpu", + "pod": "foo-1234", + }, + }, series) +} +func testM3msgTopicFunctions(t *testing.T, coord resources.Coordinator) { // init an m3msg topic m3msgTopicOpts := resources.M3msgTopicOptions{ Zone: "embedded", @@ -134,9 +172,6 @@ func TestM3msgTopicFunctions(t *testing.T) { getResp, err := coord.GetM3msgTopic(m3msgTopicOpts) require.NoError(t, err) validateEqualTopicResp(t, expectedAddResp, getResp) - - assert.NoError(t, coord.Close()) - assert.NoError(t, dbnode.Close()) } func validateEqualTopicResp(t *testing.T, expected, actual admin.TopicGetResponse) { @@ -149,15 +184,7 @@ func validateEqualTopicResp(t *testing.T, expected, actual admin.TopicGetRespons require.Equal(t, t1, t2) } -func TestAggPlacementFunctions(t *testing.T) { - dbnode, err := NewDBNodeFromYAML(defaultDBNodeConfig, DBNodeOptions{}) - require.NoError(t, err) - - coord, err := NewCoordinatorFromYAML(defaultCoordConfig, CoordinatorOptions{}) - require.NoError(t, err) - - require.NoError(t, coord.WaitForNamespace("")) - +func testAggPlacementFunctions(t *testing.T, coord resources.Coordinator) { placementOpts := resources.PlacementRequestOptions{ Service: resources.ServiceTypeM3Aggregator, Env: "default_env", @@ -235,9 +262,6 @@ func TestAggPlacementFunctions(t *testing.T) { } _, err = coord.GetPlacement(wrongPlacementOpts) require.NotNil(t, err) - - assert.NoError(t, coord.Close()) - assert.NoError(t, dbnode.Close()) } func validateEqualAggPlacement(t *testing.T, expected, actual *placementpb.Placement) { diff --git a/src/integration/resources/inprocess/dbnode_test.go b/src/integration/resources/inprocess/dbnode_test.go index f870908e26..806bb0ec1e 100644 --- a/src/integration/resources/inprocess/dbnode_test.go +++ b/src/integration/resources/inprocess/dbnode_test.go @@ -52,7 +52,7 @@ func TestNewDBNodeNoSetup(t *testing.T) { } func TestDBNode(t *testing.T) { - dbnode, closer := setupNode(t) + dbnode, _, closer := setupNodeAndCoordinator(t) defer closer() testHealth(t, dbnode) @@ -168,7 +168,7 @@ func validateTag(t *testing.T, tag ident.Tag, name string, value string) { require.Equal(t, value, tag.Value.String()) } -func setupNode(t *testing.T) (resources.Node, func()) { +func setupNodeAndCoordinator(t *testing.T) (resources.Node, resources.Coordinator, func()) { dbnode, err := NewDBNodeFromYAML(defaultDBNodeConfig, DBNodeOptions{GenerateHostID: true}) require.NoError(t, err) @@ -190,9 +190,10 @@ func setupNode(t *testing.T) (resources.Node, func()) { }) require.NoError(t, err) - require.NoError(t, dbnode.WaitForBootstrap()) + require.NoError(t, coord.WaitForShardsReady()) + require.NoError(t, coord.WaitForClusterReady()) - return dbnode, func() { + return dbnode, coord, func() { assert.NoError(t, coord.Close()) assert.NoError(t, dbnode.Close()) } diff --git a/src/integration/resources/types.go b/src/integration/resources/types.go index 05e40b3dca..9fd8304ae4 100644 --- a/src/integration/resources/types.go +++ b/src/integration/resources/types.go @@ -23,6 +23,9 @@ package resources import ( + "fmt" + "strconv" + "strings" "sync" "time" @@ -41,6 +44,9 @@ type ResponseVerifier func(int, map[string][]string, string, error) error // GoalStateVerifier verifies that the given results are valid. type GoalStateVerifier func(string, error) error +// Headers represents http headers. +type Headers map[string][]string + // Coordinator is a wrapper for a coordinator. It provides a wrapper on HTTP // endpoints that expose cluster management APIs as well as read and write // endpoints for series data. @@ -57,11 +63,17 @@ type Coordinator interface { // WriteProm writes a prometheus metric. WriteProm(name string, tags map[string]string, samples []prompb.Sample) error // RunQuery runs the given query with a given verification function. - RunQuery(verifier ResponseVerifier, query string, headers map[string][]string) error + RunQuery(verifier ResponseVerifier, query string, headers Headers) error // InstantQuery runs an instant query with provided headers - InstantQuery(req QueryRequest, headers map[string][]string) (model.Vector, error) + InstantQuery(req QueryRequest, headers Headers) (model.Vector, error) // RangeQuery runs a range query with provided headers - RangeQuery(req RangeQueryRequest, headers map[string][]string) (model.Matrix, error) + RangeQuery(req RangeQueryRequest, headers Headers) (model.Matrix, error) + // LabelNames return matching label names based on the request. + LabelNames(req LabelNamesRequest, headers Headers) (model.LabelNames, error) + // LabelValues returns matching label values based on the request. + LabelValues(req LabelValuesRequest, headers Headers) (model.LabelValues, error) + // Series returns matching series based on the request. + Series(req SeriesRequest, headers Headers) ([]model.Metric, error) } // Admin is a wrapper for admin functions. @@ -262,20 +274,69 @@ func (a Aggregators) WaitForHealthy() error { // QueryRequest represents an instant query request type QueryRequest struct { - // QueryExpr is the Prometheus expression query string. - QueryExpr string + // Query is the Prometheus expression query string. + Query string // Time is the evaluation timestamp. It is optional. Time *time.Time } // RangeQueryRequest represents a range query request type RangeQueryRequest struct { - // QueryExpr is the Prometheus expression query string. - QueryExpr string - // StartTime is the start timestamp of the query range. The default value is time.Now(). - StartTime time.Time - // EndTime is the end timestamp of the query range. The default value is time.Now(). - EndTime time.Time + // Query is the Prometheus expression query string. + Query string + // Start is the start timestamp of the query range. The default value is time.Now(). + Start time.Time + // End is the end timestamp of the query range. The default value is time.Now(). + End time.Time // Step is the query resolution step width. It is default to 15 seconds. Step time.Duration } + +// MetadataRequest contains the parameters for making API requests related to metadata. +type MetadataRequest struct { + // Start is the start timestamp of labels to include. + Start time.Time + // End is the end timestamp of labels to include. + End time.Time + // Match is the series selector that selects series to read label names from. + Match string +} + +// LabelNamesRequest contains the parameters for making label names API calls. +type LabelNamesRequest struct { + MetadataRequest +} + +// LabelValuesRequest contains the parameters for making label values API calls. +type LabelValuesRequest struct { + MetadataRequest + + // LabelName is the name of the label to retrieve values for. + LabelName string +} + +// SeriesRequest contains the parameters for making series API calls. +type SeriesRequest struct { + MetadataRequest +} + +func (m *MetadataRequest) String() string { + var ( + start string + end string + parts []string + ) + if !m.Start.IsZero() { + start = strconv.Itoa(int(m.Start.Unix())) + parts = append(parts, fmt.Sprintf("start=%v", start)) + } + if !m.End.IsZero() { + end = strconv.Itoa(int(m.End.Unix())) + parts = append(parts, fmt.Sprintf("end=%v", end)) + } + if m.Match != "" { + parts = append(parts, fmt.Sprintf("match[]=%v", m.Match)) + } + + return strings.Join(parts, "&") +} diff --git a/src/query/api/v1/handler/prometheus/remote/match.go b/src/query/api/v1/handler/prometheus/remote/match.go index fbb4b32f2e..4cfd0cd0d0 100644 --- a/src/query/api/v1/handler/prometheus/remote/match.go +++ b/src/query/api/v1/handler/prometheus/remote/match.go @@ -27,7 +27,6 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/api/v1/options" - "github.com/m3db/m3/src/query/api/v1/route" "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser/promql" @@ -39,11 +38,6 @@ import ( "go.uber.org/zap" ) -const ( - // PromSeriesMatchURL is the url for remote prom series matcher handler. - PromSeriesMatchURL = route.Prefix + "/series" -) - // PromSeriesMatchHTTPMethods are the HTTP methods for this handler. var PromSeriesMatchHTTPMethods = []string{http.MethodGet, http.MethodPost} diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index 4dc65b9b96..42681c8ec5 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -47,6 +47,7 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler/topic" "github.com/m3db/m3/src/query/api/v1/middleware" "github.com/m3db/m3/src/query/api/v1/options" + "github.com/m3db/m3/src/query/api/v1/route" "github.com/m3db/m3/src/query/parser/promql" "github.com/m3db/m3/src/query/util/queryhttp" xdebug "github.com/m3db/m3/src/x/debug" @@ -323,7 +324,7 @@ func (h *Handler) RegisterRoutes() error { // Series match endpoints. if err := h.registry.Register(queryhttp.RegisterOptions{ - Path: remote.PromSeriesMatchURL, + Path: route.SeriesMatchURL, Handler: remote.NewPromSeriesMatchHandler(h.options), Methods: remote.PromSeriesMatchHTTPMethods, MiddlewareOverride: native.WithQueryParams, diff --git a/src/query/api/v1/route/prom.go b/src/query/api/v1/route/prom.go index 9832cb3d0a..7a3e0d6042 100644 --- a/src/query/api/v1/route/prom.go +++ b/src/query/api/v1/route/prom.go @@ -35,4 +35,7 @@ const ( // QueryURL return the url for the query endpoint. QueryURL = Prefix + "/query" + + // SeriesMatchURL is the url for remote prom series matcher handler. + SeriesMatchURL = Prefix + "/series" )