Skip to content

Commit

Permalink
Add HTTP handler for metrics querying (#3095)
Browse files Browse the repository at this point in the history
* Add HTTP handler

Signed-off-by: albertteoh <[email protected]>

* Address review comments

Signed-off-by: albertteoh <[email protected]>

* Better func name

Signed-off-by: albertteoh <[email protected]>

* Tidy up comments, functions and vars

Signed-off-by: albertteoh <[email protected]>

* Wrap error string

Signed-off-by: albertteoh <[email protected]>

* Address review comments

Signed-off-by: albertteoh <[email protected]>

* nit: Revert ordering of params

Signed-off-by: albertteoh <[email protected]>

* Improve comments

Signed-off-by: albertteoh <[email protected]>

* More correct metrics query BNF syntax

Signed-off-by: albertteoh <[email protected]>

* Address review comments

Signed-off-by: albertteoh <[email protected]>

* Apply suggestions from code review

Co-authored-by: Yuri Shkuro <[email protected]>
Signed-off-by: albertteoh <[email protected]>

* Fix tests

Signed-off-by: albertteoh <[email protected]>

Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
albertteoh and yurishkuro authored Jun 21, 2021
1 parent 13885e5 commit 681dd68
Show file tree
Hide file tree
Showing 9 changed files with 882 additions and 204 deletions.
14 changes: 7 additions & 7 deletions cmd/query/app/handler_archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestGetArchivedTrace_NotFound(t *testing.T) {
for _, tc := range []spanstore.Reader{nil, mockReader} {
archiveReader := tc // capture loop var
t.Run(fmt.Sprint(archiveReader), func(t *testing.T) {
withTestServer(t, func(ts *testServer) {
withTestServer(func(ts *testServer) {
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, spanstore.ErrTraceNotFound).Once()
var response structuredResponse
Expand All @@ -54,7 +54,7 @@ func TestGetArchivedTraceSuccess(t *testing.T) {
mockReader := &spanstoremocks.Reader{}
mockReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
withTestServer(t, func(ts *testServer) {
withTestServer(func(ts *testServer) {
// make main reader return NotFound
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, spanstore.ErrTraceNotFound).Once()
Expand All @@ -69,7 +69,7 @@ func TestGetArchivedTraceSuccess(t *testing.T) {

// Test failure in parsing trace ID.
func TestArchiveTrace_BadTraceID(t *testing.T) {
withTestServer(t, func(ts *testServer) {
withTestServer(func(ts *testServer) {
var response structuredResponse
err := postJSON(ts.server.URL+"/api/archive/badtraceid", []string{}, &response)
assert.Error(t, err)
Expand All @@ -83,7 +83,7 @@ func TestArchiveTrace_TraceNotFound(t *testing.T) {
Return(nil, spanstore.ErrTraceNotFound).Once()
mockWriter := &spanstoremocks.Writer{}
// Not actually going to write the trace, so no need to define mockWriter action
withTestServer(t, func(ts *testServer) {
withTestServer(func(ts *testServer) {
// make main reader return NotFound
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, spanstore.ErrTraceNotFound).Once()
Expand All @@ -94,7 +94,7 @@ func TestArchiveTrace_TraceNotFound(t *testing.T) {
}

func TestArchiveTrace_NoStorage(t *testing.T) {
withTestServer(t, func(ts *testServer) {
withTestServer(func(ts *testServer) {
var response structuredResponse
err := postJSON(ts.server.URL+"/api/archive/"+mockTraceID.String(), []string{}, &response)
assert.EqualError(t, err, `500 error from server: {"data":null,"total":0,"limit":0,"offset":0,"errors":[{"code":500,"msg":"archive span storage was not configured"}]}`+"\n")
Expand All @@ -105,7 +105,7 @@ func TestArchiveTrace_Success(t *testing.T) {
mockWriter := &spanstoremocks.Writer{}
mockWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")).
Return(nil).Times(2)
withTestServer(t, func(ts *testServer) {
withTestServer(func(ts *testServer) {
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
var response structuredResponse
Expand All @@ -118,7 +118,7 @@ func TestArchiveTrace_WriteErrors(t *testing.T) {
mockWriter := &spanstoremocks.Writer{}
mockWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")).
Return(errors.New("cannot save")).Times(2)
withTestServer(t, func(ts *testServer) {
withTestServer(func(ts *testServer) {
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
var response structuredResponse
Expand Down
28 changes: 14 additions & 14 deletions cmd/query/app/handler_deps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,14 @@ func TestFilterDependencies(t *testing.T) {
}

func TestGetDependenciesSuccess(t *testing.T) {
server, _, mock := initializeTestServer()
defer server.Close()
ts := initializeTestServer()
defer ts.server.Close()
expectedDependencies := []model.DependencyLink{{Parent: "killer", Child: "queen", CallCount: 12}}
endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier)
mock.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(expectedDependencies, nil).Times(1)
ts.dependencyReader.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(expectedDependencies, nil).Times(1)

var response structuredResponse
err := getJSON(server.URL+"/api/dependencies?endTs=1476374248550&service=queen", &response)
err := getJSON(ts.server.URL+"/api/dependencies?endTs=1476374248550&service=queen", &response)
assert.NotEmpty(t, response.Data)
data := response.Data.([]interface{})[0]
actual := data.(map[string]interface{})
Expand All @@ -326,30 +326,30 @@ func TestGetDependenciesSuccess(t *testing.T) {
}

func TestGetDependenciesCassandraFailure(t *testing.T) {
server, _, mock := initializeTestServer()
defer server.Close()
ts := initializeTestServer()
defer ts.server.Close()
endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier)
mock.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(nil, errStorage).Times(1)
ts.dependencyReader.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(nil, errStorage).Times(1)

var response structuredResponse
err := getJSON(server.URL+"/api/dependencies?endTs=1476374248550&service=testing", &response)
err := getJSON(ts.server.URL+"/api/dependencies?endTs=1476374248550&service=testing", &response)
assert.Error(t, err)
}

func TestGetDependenciesEndTimeParsingFailure(t *testing.T) {
server, _, _ := initializeTestServer()
defer server.Close()
ts := initializeTestServer()
defer ts.server.Close()

var response structuredResponse
err := getJSON(server.URL+"/api/dependencies?endTs=shazbot&service=testing", &response)
err := getJSON(ts.server.URL+"/api/dependencies?endTs=shazbot&service=testing", &response)
assert.Error(t, err)
}

func TestGetDependenciesLookbackParsingFailure(t *testing.T) {
server, _, _ := initializeTestServer()
defer server.Close()
ts := initializeTestServer()
defer ts.server.Close()

var response structuredResponse
err := getJSON(server.URL+"/api/dependencies?endTs=1476374248550&service=testing&lookback=shazbot", &response)
err := getJSON(ts.server.URL+"/api/dependencies?endTs=1476374248550&service=testing&lookback=shazbot", &response)
assert.Error(t, err)
}
9 changes: 9 additions & 0 deletions cmd/query/app/handler_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (

"github.com/opentracing/opentracing-go"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
)

// HandlerOption is a function that sets some option on the APIHandler
Expand Down Expand Up @@ -65,3 +67,10 @@ func (handlerOptions) Tracer(tracer opentracing.Tracer) HandlerOption {
apiHandler.tracer = tracer
}
}

// MetricsQueryService creates a HandlerOption that initializes MetricsQueryService.
func (handlerOptions) MetricsQueryService(mqs querysvc.MetricsQueryService) HandlerOption {
return func(apiHandler *APIHandler) {
apiHandler.metricsQueryService = mqs
}
}
146 changes: 100 additions & 46 deletions cmd/query/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ package app
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"strconv"
"time"

"github.com/gogo/protobuf/proto"
"github.com/gorilla/mux"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/opentracing/opentracing-go"
Expand All @@ -34,15 +36,23 @@ import (
uiconv "github.com/jaegertracing/jaeger/model/converter/json"
ui "github.com/jaegertracing/jaeger/model/json"
"github.com/jaegertracing/jaeger/pkg/multierror"
"github.com/jaegertracing/jaeger/plugin/metrics/disabled"
"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
traceIDParam = "traceID"
endTsParam = "endTs"
lookbackParam = "lookback"

defaultAPIPrefix = "api"
traceIDParam = "traceID"
endTsParam = "endTs"
lookbackParam = "lookback"
stepParam = "step"
rateParam = "ratePer"
quantileParam = "quantile"
groupByOperationParam = "groupByOperation"

defaultAPIPrefix = "api"
prettyPrintIndent = " "
)

// HTTPHandler handles http requests
Expand Down Expand Up @@ -71,12 +81,13 @@ func NewRouter() *mux.Router {

// APIHandler implements the query service public API by registering routes at httpPrefix
type APIHandler struct {
queryService *querysvc.QueryService
queryParser queryParser
basePath string
apiPrefix string
logger *zap.Logger
tracer opentracing.Tracer
queryService *querysvc.QueryService
metricsQueryService querysvc.MetricsQueryService
queryParser queryParser
basePath string
apiPrefix string
logger *zap.Logger
tracer opentracing.Tracer
}

// NewAPIHandler returns an APIHandler
Expand Down Expand Up @@ -115,6 +126,10 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
// TODO - remove this when UI catches up
aH.handleFunc(router, aH.getOperationsLegacy, "/services/{%s}/operations", serviceParam).Methods(http.MethodGet)
aH.handleFunc(router, aH.dependencies, "/dependencies").Methods(http.MethodGet)
aH.handleFunc(router, aH.latencies, "/metrics/latencies").Methods(http.MethodGet)
aH.handleFunc(router, aH.calls, "/metrics/calls").Methods(http.MethodGet)
aH.handleFunc(router, aH.errors, "/metrics/errors").Methods(http.MethodGet)
aH.handleFunc(router, aH.minStep, "/metrics/minstep").Methods(http.MethodGet)
}

func (aH *APIHandler) handleFunc(
Expand Down Expand Up @@ -176,7 +191,7 @@ func (aH *APIHandler) getOperationsLegacy(w http.ResponseWriter, r *http.Request
func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) {
service := r.FormValue(serviceParam)
if service == "" {
if aH.handleError(w, ErrServiceParameterRequired, http.StatusBadRequest) {
if aH.handleError(w, errServiceParameterRequired, http.StatusBadRequest) {
return
}
}
Expand Down Expand Up @@ -204,7 +219,7 @@ func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) {
}

func (aH *APIHandler) search(w http.ResponseWriter, r *http.Request) {
tQuery, err := aH.queryParser.parse(r)
tQuery, err := aH.queryParser.parseTraceQueryParams(r)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
Expand Down Expand Up @@ -259,31 +274,13 @@ func (aH *APIHandler) tracesByIDs(ctx context.Context, traceIDs []model.TraceID)
}

func (aH *APIHandler) dependencies(w http.ResponseWriter, r *http.Request) {
endTsMillis, err := strconv.ParseInt(r.FormValue(endTsParam), 10, 64)
if err != nil {
err = fmt.Errorf("unable to parse %s: %w", endTimeParam, err)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
}
var lookback time.Duration
if formValue := r.FormValue(lookbackParam); len(formValue) > 0 {
lookback, err = time.ParseDuration(formValue + "ms")
if err != nil {
err = fmt.Errorf("unable to parse %s: %w", lookbackParam, err)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
}
dqp, err := aH.queryParser.parseDependenciesQueryParams(r)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
service := r.FormValue(serviceParam)

if lookback == 0 {
lookback = defaultDependencyLookbackDuration
}
endTs := time.Unix(0, 0).Add(time.Duration(endTsMillis) * time.Millisecond)

dependencies, err := aH.queryService.GetDependencies(r.Context(), endTs, lookback)
dependencies, err := aH.queryService.GetDependencies(r.Context(), dqp.endTs, dqp.lookback)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
Expand All @@ -295,6 +292,60 @@ func (aH *APIHandler) dependencies(w http.ResponseWriter, r *http.Request) {
aH.writeJSON(w, r, &structuredRes)
}

func (aH *APIHandler) latencies(w http.ResponseWriter, r *http.Request) {
q, err := strconv.ParseFloat(r.FormValue(quantileParam), 64)
if err != nil {
aH.handleError(w, newParseError(err, quantileParam), http.StatusBadRequest)
return
}
aH.metrics(w, r, func(ctx context.Context, baseParams metricsstore.BaseQueryParameters) (*metrics.MetricFamily, error) {
return aH.metricsQueryService.GetLatencies(ctx, &metricsstore.LatenciesQueryParameters{
BaseQueryParameters: baseParams,
Quantile: q,
})
})
}

func (aH *APIHandler) calls(w http.ResponseWriter, r *http.Request) {
aH.metrics(w, r, func(ctx context.Context, baseParams metricsstore.BaseQueryParameters) (*metrics.MetricFamily, error) {
return aH.metricsQueryService.GetCallRates(ctx, &metricsstore.CallRateQueryParameters{
BaseQueryParameters: baseParams,
})
})
}

func (aH *APIHandler) errors(w http.ResponseWriter, r *http.Request) {
aH.metrics(w, r, func(ctx context.Context, baseParams metricsstore.BaseQueryParameters) (*metrics.MetricFamily, error) {
return aH.metricsQueryService.GetErrorRates(ctx, &metricsstore.ErrorRateQueryParameters{
BaseQueryParameters: baseParams,
})
})
}

func (aH *APIHandler) minStep(w http.ResponseWriter, r *http.Request) {
minStep, err := aH.metricsQueryService.GetMinStepDuration(r.Context(), &metricsstore.MinStepDurationQueryParameters{})
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}

structuredRes := structuredResponse{
Data: minStep.Milliseconds(),
}
aH.writeJSON(w, r, &structuredRes)
}

func (aH *APIHandler) metrics(w http.ResponseWriter, r *http.Request, getMetrics func(context.Context, metricsstore.BaseQueryParameters) (*metrics.MetricFamily, error)) {
requestParams, err := aH.queryParser.parseMetricsQueryParams(r)
if aH.handleError(w, err, http.StatusBadRequest) {
return
}
m, err := getMetrics(r.Context(), requestParams)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
aH.writeJSON(w, r, m)
}

func (aH *APIHandler) convertModelToUI(trace *model.Trace, adjust bool) (*ui.Trace, *structuredError) {
var errors []error
if adjust {
Expand Down Expand Up @@ -429,6 +480,9 @@ func (aH *APIHandler) handleError(w http.ResponseWriter, err error, statusCode i
if err == nil {
return false
}
if errors.Is(err, disabled.ErrDisabled) {
statusCode = http.StatusNotImplemented
}
if statusCode == http.StatusInternalServerError {
aH.logger.Error("HTTP handler, Internal Server Error", zap.Error(err))
}
Expand All @@ -446,19 +500,19 @@ func (aH *APIHandler) handleError(w http.ResponseWriter, err error, statusCode i
}

func (aH *APIHandler) writeJSON(w http.ResponseWriter, r *http.Request, response interface{}) {
marshall := json.Marshal
if prettyPrint := r.FormValue(prettyPrintParam); prettyPrint != "" && prettyPrint != "false" {
marshall = func(v interface{}) ([]byte, error) {
return json.MarshalIndent(v, "", " ")
}
}
resp, err := marshall(response)
if err != nil {
aH.handleError(w, fmt.Errorf("failed marshalling HTTP response to JSON: %w", err), http.StatusInternalServerError)
return
prettyPrintValue := r.FormValue(prettyPrintParam)
prettyPrint := prettyPrintValue != "" && prettyPrintValue != "false"

var marshal jsonMarshaler
switch response.(type) {
case proto.Message:
marshal = newProtoJSONMarshaler(prettyPrint)
default:
marshal = newStructJSONMarshaler(prettyPrint)
}

w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(resp); err != nil {
if err := marshal(w, response); err != nil {
aH.handleError(w, fmt.Errorf("failed writing HTTP response: %w", err), http.StatusInternalServerError)
}
}
Loading

0 comments on commit 681dd68

Please sign in to comment.