Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default Prometheus query range route for Grafana integration #877

Merged
merged 11 commits into from
Sep 5, 2018
2 changes: 1 addition & 1 deletion docs/query_engine/api/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

* **URL**

/prom/native/read
/query_range

* **Method:**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ echo "Sleep for 30 seconds to let the remote write endpoint generate some data"

sleep 30

# Ensure Prometheus can proxy a Prometheus query
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you also want to test m3query directly ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do, I'm going to do this in a follow up change if that's ok - its not trivial.

[ "$(curl -sSf localhost:9090/api/v1/query?query=prometheus_remote_storage_succeeded_samples_total | jq .data.result[].value[1])" != '"0"' ]


docker-compose -f docker-compose.yml down || echo "unable to shutdown containers" # CI fails to stop all containers sometimes
15 changes: 8 additions & 7 deletions src/cmd/services/m3query/scripts/prom-m3-diff.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ m3port="localhost:7201"
promport="localhost:9090"
curl -fsS $promport/status > /dev/null || { echo "Prom port not open"; exit 1; }
curl -fsS $m3port/health > /dev/null || { echo "M3Query port not open"; exit 1; }
m3command="$m3port/api/v1/prom/native/read?start=$start&end=$end&step=$step&debug=true --data-urlencode target=$target"
promcommand="$promport/api/v1/query_range?start=$start&end=$end&step=$step --data-urlencode query=$target"
echo $m3command
echo $promcommand
curl -G $m3command > m3out
curl -G $promcommand > promout
jq ".[]|.tags,.datapoints" m3out > m3result
queryurl="/api/v1/query_range?start=$start&end=$end&step=$step --data-urlencode query=$target"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sweet!

m3url="$m3port/$queryurl"
promurl="$promport/$queryurl"
echo $m3url
echo $promurl
curl -G $m3url > m3out
curl -G $promurl > promout
jq ".data.result|.[]|.metric,.values" m3out > m3result
jq ".data.result|.[]|.metric,.values" promout > promresult
echo "M3 file size" $(stat -f%z m3result)
echo "Prom file size" $(stat -f%z promresult)
70 changes: 47 additions & 23 deletions src/query/api/v1/handler/prometheus/native/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
const (
endParam = "end"
startParam = "start"
targetParam = "target"
queryParam = "query"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this going to work fine for the prometheus remote read end point?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wait, this is the native end point, not the remote read one. Never mind!

stepParam = "step"
debugParam = "debug"
endExclusiveParam = "end-exclusive"
Expand All @@ -61,11 +61,22 @@ func parseTime(r *http.Request, key string) (time.Time, error) {

// nolint: unparam
func parseDuration(r *http.Request, key string) (time.Duration, error) {
if d := r.FormValue(key); d != "" {
return time.ParseDuration(d)
str := r.FormValue(key)
if str == "" {
return 0, errors.ErrNotFound
}

return 0, errors.ErrNotFound
value, err := time.ParseDuration(str)
if err == nil {
return value, nil
}

// Try parsing as an integer value specifying seconds, the Prometheus default
if seconds, intErr := strconv.ParseInt(str, 10, 64); intErr == nil {
return time.Duration(seconds) * time.Second, nil
}

return 0, err
}

// parseParams parses all params from the GET request
Expand Down Expand Up @@ -98,11 +109,11 @@ func parseParams(r *http.Request) (models.RequestParams, *handler.ParseError) {
}
params.Step = step

target, err := parseTarget(r)
query, err := parseQuery(r)
if err != nil {
return params, handler.NewParseError(fmt.Errorf(formatErrStr, targetParam, err), http.StatusBadRequest)
return params, handler.NewParseError(fmt.Errorf(formatErrStr, queryParam, err), http.StatusBadRequest)
}
params.Target = target
params.Query = query

// Skip debug if unable to parse debug param
debugVal := r.FormValue(debugParam)
Expand All @@ -129,47 +140,57 @@ func parseParams(r *http.Request) (models.RequestParams, *handler.ParseError) {
return params, nil
}

func parseTarget(r *http.Request) (string, error) {
targetQueries, ok := r.URL.Query()[targetParam]
if !ok || len(targetQueries) == 0 || targetQueries[0] == "" {
return "", errors.ErrNoTargetFound
func parseQuery(r *http.Request) (string, error) {
queries, ok := r.URL.Query()[queryParam]
if !ok || len(queries) == 0 || queries[0] == "" {
return "", errors.ErrNoQueryFound
}

// TODO: currently, we only support one target at a time
if len(targetQueries) > 1 {
if len(queries) > 1 {
return "", errors.ErrBatchQuery
}

return targetQueries[0], nil
return queries[0], nil
}

func renderResultsJSON(w io.Writer, series []*ts.Series, params models.RequestParams) {
startIdx := 0
jw := json.NewWriter(w)
jw.BeginObject()

jw.BeginObjectField("status")
jw.WriteString("success")

jw.BeginObjectField("data")
jw.BeginObject()

jw.BeginObjectField("resultType")
jw.WriteString("matrix")

jw.BeginObjectField("result")
jw.BeginArray()
for _, s := range series {
jw.BeginObject()
jw.BeginObjectField("target")
jw.WriteString(s.Name())

jw.BeginObjectField("tags")
jw.BeginObjectField("metric")
jw.BeginObject()
for _, t := range s.Tags {
jw.BeginObjectField(t.Name)
jw.WriteString(t.Value)
}
jw.EndObject()

jw.BeginObjectField("datapoints")
jw.BeginObjectField("values")
jw.BeginArray()
vals := s.Values()
for i := startIdx; i < s.Len(); i++ {
length := s.Len()
for i := 0; i < length; i++ {
dp := vals.DatapointAt(i)
// Skip points before the query boundary. Ideal place to adjust these would be at the result node but that would make it inefficient
// since we would need to create another block just for the sake of restricting the bounds.
// Each series have the same start time so we just need to calculate the correct startIdx once
// NB(r): Removing the optimization of computing startIdx once just in case our assumptions are wrong,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit odd that you're seeing more points after deleting this. Might bear further investigation to see where our assumptions go bad

// we can always add this optimization back later. Without this code I see datapoints more often.
if dp.Timestamp.Before(params.Start) {
startIdx = i + 1
continue
}

Expand All @@ -183,11 +204,14 @@ func renderResultsJSON(w io.Writer, series []*ts.Series, params models.RequestPa
fixedStep, ok := s.Values().(ts.FixedResolutionMutableValues)
if ok {
jw.BeginObjectField("step_size_ms")
jw.WriteInt(int(util.DurationToMS(fixedStep.MillisPerStep())))
jw.WriteInt(int(fixedStep.Resolution() / time.Millisecond))
jw.EndObject()
}
}

jw.EndArray()

jw.EndObject()

jw.EndObject()
jw.Close()
}
110 changes: 107 additions & 3 deletions src/query/api/v1/handler/prometheus/native/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,17 @@
package native

import (
"bytes"
"encoding/json"
"net/http"
"net/url"
"testing"
"time"

xtest "github.com/m3db/m3/src/dbnode/x/test"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/ts"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -37,7 +43,7 @@ const (
func defaultParams() url.Values {
vals := url.Values{}
now := time.Now()
vals.Add(targetParam, promQuery)
vals.Add(queryParam, promQuery)
vals.Add(startParam, now.Format(time.RFC3339))
vals.Add(endParam, string(now.Add(time.Hour).Format(time.RFC3339)))
vals.Add(stepParam, (time.Duration(10) * time.Second).String())
Expand All @@ -50,7 +56,7 @@ func TestParamParsing(t *testing.T) {

r, err := parseParams(req)
require.Nil(t, err, "unable to parse request")
require.Equal(t, promQuery, r.Target)
require.Equal(t, promQuery, r.Query)
}

func TestInvalidStart(t *testing.T) {
Expand All @@ -66,11 +72,109 @@ func TestInvalidStart(t *testing.T) {
func TestInvalidTarget(t *testing.T) {
req, _ := http.NewRequest("GET", PromReadURL, nil)
vals := defaultParams()
vals.Del(targetParam)
vals.Del(queryParam)
req.URL.RawQuery = vals.Encode()

p, err := parseParams(req)
require.NotNil(t, err, "unable to parse request")
assert.NotNil(t, p.Start)
require.Equal(t, err.Code(), http.StatusBadRequest)
}

func TestParseDuration(t *testing.T) {
r, err := http.NewRequest(http.MethodGet, "/foo?step=10s", nil)
require.NoError(t, err)
v, err := parseDuration(r, stepParam)
require.NoError(t, err)
assert.Equal(t, 10*time.Second, v)
}

func TestParseDurationParsesIntAsSeconds(t *testing.T) {
r, err := http.NewRequest(http.MethodGet, "/foo?step=30", nil)
require.NoError(t, err)
v, err := parseDuration(r, stepParam)
require.NoError(t, err)
assert.Equal(t, 30*time.Second, v)
}

func TestParseDurationError(t *testing.T) {
r, err := http.NewRequest(http.MethodGet, "/foo?step=bar10", nil)
require.NoError(t, err)
_, err = parseDuration(r, stepParam)
assert.Error(t, err)
}

func TestRenderResultsJSON(t *testing.T) {
start := time.Unix(1535948880, 0)

buffer := bytes.NewBuffer(nil)
params := models.RequestParams{}
series := []*ts.Series{
ts.NewSeries("foo", ts.NewFixedStepValues(10*time.Second, 2, 1, start), models.Tags{
models.Tag{Name: "bar", Value: "baz"},
models.Tag{Name: "qux", Value: "qaz"},
}),
ts.NewSeries("bar", ts.NewFixedStepValues(10*time.Second, 2, 2, start), models.Tags{
models.Tag{Name: "baz", Value: "bar"},
models.Tag{Name: "qaz", Value: "qux"},
}),
}

renderResultsJSON(buffer, series, params)

expected := mustPrettyJSON(t, `
{
"status": "success",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {
"bar": "baz",
"qux": "qaz"
},
"values": [
[
1535948880,
"1"
],
[
1535948890,
"1"
]
],
"step_size_ms": 10000
},
{
"metric": {
"baz": "bar",
"qaz": "qux"
},
"values": [
[
1535948880,
"2"
],
[
1535948890,
"2"
]
],
"step_size_ms": 10000
}
]
}
}
`)
actual := mustPrettyJSON(t, buffer.String())
assert.Equal(t, expected, actual, xtest.Diff(expected, actual))
}

func mustPrettyJSON(t *testing.T, str string) string {
var unmarshalled map[string]interface{}
err := json.Unmarshal([]byte(str), &unmarshalled)
require.NoError(t, err)
pretty, err := json.MarshalIndent(unmarshalled, "", " ")
require.NoError(t, err)
return string(pretty)
}
8 changes: 5 additions & 3 deletions src/query/api/v1/handler/prometheus/native/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ import (
)

const (
// PromReadURL is the url for native prom read handler
PromReadURL = handler.RoutePrefixV1 + "/prom/native/read"
// PromReadURL is the url for native prom read handler, this matches the
// default URL for the query range endpoint found on a Prometheus server
PromReadURL = handler.RoutePrefixV1 + "/query_range"

// PromReadHTTPMethod is the HTTP method used with this resource.
PromReadHTTPMethod = http.MethodGet
Expand Down Expand Up @@ -96,6 +97,7 @@ func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

// TODO: Support multiple result types
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "*")
renderResultsJSON(w, result, params)
}

Expand All @@ -109,7 +111,7 @@ func (h *PromReadHandler) read(reqCtx context.Context, w http.ResponseWriter, pa
opts.AbortCh = abortCh

// TODO: Capture timing
parser, err := promql.Parse(params.Target)
parser, err := promql.Parse(params.Query)
if err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions src/query/api/v1/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func (h *Handler) RegisterRoutes() error {
h.Router.HandleFunc(openapi.URL, logged(&openapi.DocHandler{}).ServeHTTP).Methods(openapi.HTTPMethod)
h.Router.PathPrefix(openapi.StaticURLPrefix).Handler(logged(openapi.StaticHandler()))

// Prometheus remote read/write endpoints
promRemoteReadHandler := remote.NewPromReadHandler(h.engine, h.scope.Tagged(remoteSource))
promRemoteWriteHandler, err := remote.NewPromWriteHandler(h.storage, nil, h.scope.Tagged(remoteSource))
if err != nil {
Expand All @@ -121,6 +122,8 @@ func (h *Handler) RegisterRoutes() error {
h.Router.HandleFunc(remote.PromReadURL, logged(promRemoteReadHandler).ServeHTTP).Methods(remote.PromReadHTTPMethod)
h.Router.HandleFunc(remote.PromWriteURL, logged(promRemoteWriteHandler).ServeHTTP).Methods(remote.PromWriteHTTPMethod)
h.Router.HandleFunc(native.PromReadURL, logged(native.NewPromReadHandler(h.engine)).ServeHTTP).Methods(native.PromReadHTTPMethod)

// Native M3 search and write endpoints
h.Router.HandleFunc(handler.SearchURL, logged(handler.NewSearchHandler(h.storage)).ServeHTTP).Methods(handler.SearchHTTPMethod)
h.Router.HandleFunc(m3json.WriteJSONURL, logged(m3json.NewWriteJSONHandler(h.storage)).ServeHTTP).Methods(m3json.JSONWriteHTTPMethod)

Expand Down
6 changes: 3 additions & 3 deletions src/query/errors/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ var (
// ErrHeaderNotFound is returned when a header is not found
ErrHeaderNotFound = errors.New("header not found")
// ErrBatchQuery is returned when a batch query is found
ErrBatchQuery = errors.New("batch queries are currently not supported")
// ErrNoTargetFound is returned when a target is not found
ErrNoTargetFound = errors.New("no target found")
ErrBatchQuery = errors.New("batch queries are currently not supported")
// ErrNoQueryFound is returned when a target is not found
ErrNoQueryFound = errors.New("no query found")
)
2 changes: 1 addition & 1 deletion src/query/models/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type RequestParams struct {
Now time.Time
Timeout time.Duration
Step time.Duration
Target string
Query string
Debug bool
IncludeEnd bool
}
Expand Down
Loading