From 681dd68b0e727395f3a2df65a3a059d70e7de884 Mon Sep 17 00:00:00 2001 From: Albert <26584478+albertteoh@users.noreply.github.com> Date: Mon, 21 Jun 2021 13:09:47 +1000 Subject: [PATCH] Add HTTP handler for metrics querying (#3095) * Add HTTP handler Signed-off-by: albertteoh * Address review comments Signed-off-by: albertteoh * Better func name Signed-off-by: albertteoh * Tidy up comments, functions and vars Signed-off-by: albertteoh * Wrap error string Signed-off-by: albertteoh * Address review comments Signed-off-by: albertteoh * nit: Revert ordering of params Signed-off-by: albertteoh * Improve comments Signed-off-by: albertteoh * More correct metrics query BNF syntax Signed-off-by: albertteoh * Address review comments Signed-off-by: albertteoh * Apply suggestions from code review Co-authored-by: Yuri Shkuro Signed-off-by: albertteoh * Fix tests Signed-off-by: albertteoh Co-authored-by: Yuri Shkuro --- cmd/query/app/handler_archive_test.go | 14 +- cmd/query/app/handler_deps_test.go | 28 +- cmd/query/app/handler_options.go | 9 + cmd/query/app/http_handler.go | 146 ++++++--- cmd/query/app/http_handler_test.go | 409 +++++++++++++++++++------- cmd/query/app/json_marshaler.go | 56 ++++ cmd/query/app/query_parser.go | 267 +++++++++++++++-- cmd/query/app/query_parser_test.go | 155 +++++++++- cmd/query/app/server.go | 2 +- 9 files changed, 882 insertions(+), 204 deletions(-) create mode 100644 cmd/query/app/json_marshaler.go diff --git a/cmd/query/app/handler_archive_test.go b/cmd/query/app/handler_archive_test.go index 6f2a99dabf9..a244f737050 100644 --- a/cmd/query/app/handler_archive_test.go +++ b/cmd/query/app/handler_archive_test.go @@ -36,7 +36,7 @@ func TestGetArchivedTrace_NotFound(t *testing.T) { for _, tc := range []spanstore.Reader{nil, mockReader} { archiveReader := tc // capture loop var t.Run(fmt.Sprint(archiveReader), func(t *testing.T) { - withTestServer(t, func(ts *testServer) { + withTestServer(func(ts *testServer) { ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(nil, spanstore.ErrTraceNotFound).Once() var response structuredResponse @@ -54,7 +54,7 @@ func TestGetArchivedTraceSuccess(t *testing.T) { mockReader := &spanstoremocks.Reader{} mockReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(mockTrace, nil).Once() - withTestServer(t, func(ts *testServer) { + withTestServer(func(ts *testServer) { // make main reader return NotFound ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(nil, spanstore.ErrTraceNotFound).Once() @@ -69,7 +69,7 @@ func TestGetArchivedTraceSuccess(t *testing.T) { // Test failure in parsing trace ID. func TestArchiveTrace_BadTraceID(t *testing.T) { - withTestServer(t, func(ts *testServer) { + withTestServer(func(ts *testServer) { var response structuredResponse err := postJSON(ts.server.URL+"/api/archive/badtraceid", []string{}, &response) assert.Error(t, err) @@ -83,7 +83,7 @@ func TestArchiveTrace_TraceNotFound(t *testing.T) { Return(nil, spanstore.ErrTraceNotFound).Once() mockWriter := &spanstoremocks.Writer{} // Not actually going to write the trace, so no need to define mockWriter action - withTestServer(t, func(ts *testServer) { + withTestServer(func(ts *testServer) { // make main reader return NotFound ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(nil, spanstore.ErrTraceNotFound).Once() @@ -94,7 +94,7 @@ func TestArchiveTrace_TraceNotFound(t *testing.T) { } func TestArchiveTrace_NoStorage(t *testing.T) { - withTestServer(t, func(ts *testServer) { + withTestServer(func(ts *testServer) { var response structuredResponse err := postJSON(ts.server.URL+"/api/archive/"+mockTraceID.String(), []string{}, &response) assert.EqualError(t, err, `500 error from server: {"data":null,"total":0,"limit":0,"offset":0,"errors":[{"code":500,"msg":"archive span storage was not configured"}]}`+"\n") @@ -105,7 +105,7 @@ func TestArchiveTrace_Success(t *testing.T) { mockWriter := &spanstoremocks.Writer{} mockWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")). Return(nil).Times(2) - withTestServer(t, func(ts *testServer) { + withTestServer(func(ts *testServer) { ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(mockTrace, nil).Once() var response structuredResponse @@ -118,7 +118,7 @@ func TestArchiveTrace_WriteErrors(t *testing.T) { mockWriter := &spanstoremocks.Writer{} mockWriter.On("WriteSpan", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*model.Span")). Return(errors.New("cannot save")).Times(2) - withTestServer(t, func(ts *testServer) { + withTestServer(func(ts *testServer) { ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(mockTrace, nil).Once() var response structuredResponse diff --git a/cmd/query/app/handler_deps_test.go b/cmd/query/app/handler_deps_test.go index 48ca9ccac6a..58449109680 100644 --- a/cmd/query/app/handler_deps_test.go +++ b/cmd/query/app/handler_deps_test.go @@ -308,14 +308,14 @@ func TestFilterDependencies(t *testing.T) { } func TestGetDependenciesSuccess(t *testing.T) { - server, _, mock := initializeTestServer() - defer server.Close() + ts := initializeTestServer() + defer ts.server.Close() expectedDependencies := []model.DependencyLink{{Parent: "killer", Child: "queen", CallCount: 12}} endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier) - mock.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(expectedDependencies, nil).Times(1) + ts.dependencyReader.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(expectedDependencies, nil).Times(1) var response structuredResponse - err := getJSON(server.URL+"/api/dependencies?endTs=1476374248550&service=queen", &response) + err := getJSON(ts.server.URL+"/api/dependencies?endTs=1476374248550&service=queen", &response) assert.NotEmpty(t, response.Data) data := response.Data.([]interface{})[0] actual := data.(map[string]interface{}) @@ -326,30 +326,30 @@ func TestGetDependenciesSuccess(t *testing.T) { } func TestGetDependenciesCassandraFailure(t *testing.T) { - server, _, mock := initializeTestServer() - defer server.Close() + ts := initializeTestServer() + defer ts.server.Close() endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier) - mock.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(nil, errStorage).Times(1) + ts.dependencyReader.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(nil, errStorage).Times(1) var response structuredResponse - err := getJSON(server.URL+"/api/dependencies?endTs=1476374248550&service=testing", &response) + err := getJSON(ts.server.URL+"/api/dependencies?endTs=1476374248550&service=testing", &response) assert.Error(t, err) } func TestGetDependenciesEndTimeParsingFailure(t *testing.T) { - server, _, _ := initializeTestServer() - defer server.Close() + ts := initializeTestServer() + defer ts.server.Close() var response structuredResponse - err := getJSON(server.URL+"/api/dependencies?endTs=shazbot&service=testing", &response) + err := getJSON(ts.server.URL+"/api/dependencies?endTs=shazbot&service=testing", &response) assert.Error(t, err) } func TestGetDependenciesLookbackParsingFailure(t *testing.T) { - server, _, _ := initializeTestServer() - defer server.Close() + ts := initializeTestServer() + defer ts.server.Close() var response structuredResponse - err := getJSON(server.URL+"/api/dependencies?endTs=1476374248550&service=testing&lookback=shazbot", &response) + err := getJSON(ts.server.URL+"/api/dependencies?endTs=1476374248550&service=testing&lookback=shazbot", &response) assert.Error(t, err) } diff --git a/cmd/query/app/handler_options.go b/cmd/query/app/handler_options.go index 6f164156910..7e562f4eb47 100644 --- a/cmd/query/app/handler_options.go +++ b/cmd/query/app/handler_options.go @@ -20,6 +20,8 @@ import ( "github.com/opentracing/opentracing-go" "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" ) // HandlerOption is a function that sets some option on the APIHandler @@ -65,3 +67,10 @@ func (handlerOptions) Tracer(tracer opentracing.Tracer) HandlerOption { apiHandler.tracer = tracer } } + +// MetricsQueryService creates a HandlerOption that initializes MetricsQueryService. +func (handlerOptions) MetricsQueryService(mqs querysvc.MetricsQueryService) HandlerOption { + return func(apiHandler *APIHandler) { + apiHandler.metricsQueryService = mqs + } +} diff --git a/cmd/query/app/http_handler.go b/cmd/query/app/http_handler.go index 04d3bea2a9b..6b3f098a8cd 100644 --- a/cmd/query/app/http_handler.go +++ b/cmd/query/app/http_handler.go @@ -18,12 +18,14 @@ package app import ( "context" "encoding/json" + "errors" "fmt" "net/http" "net/url" "strconv" "time" + "github.com/gogo/protobuf/proto" "github.com/gorilla/mux" "github.com/opentracing-contrib/go-stdlib/nethttp" "github.com/opentracing/opentracing-go" @@ -34,15 +36,23 @@ import ( uiconv "github.com/jaegertracing/jaeger/model/converter/json" ui "github.com/jaegertracing/jaeger/model/json" "github.com/jaegertracing/jaeger/pkg/multierror" + "github.com/jaegertracing/jaeger/plugin/metrics/disabled" + "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" + "github.com/jaegertracing/jaeger/storage/metricsstore" "github.com/jaegertracing/jaeger/storage/spanstore" ) const ( - traceIDParam = "traceID" - endTsParam = "endTs" - lookbackParam = "lookback" - - defaultAPIPrefix = "api" + traceIDParam = "traceID" + endTsParam = "endTs" + lookbackParam = "lookback" + stepParam = "step" + rateParam = "ratePer" + quantileParam = "quantile" + groupByOperationParam = "groupByOperation" + + defaultAPIPrefix = "api" + prettyPrintIndent = " " ) // HTTPHandler handles http requests @@ -71,12 +81,13 @@ func NewRouter() *mux.Router { // APIHandler implements the query service public API by registering routes at httpPrefix type APIHandler struct { - queryService *querysvc.QueryService - queryParser queryParser - basePath string - apiPrefix string - logger *zap.Logger - tracer opentracing.Tracer + queryService *querysvc.QueryService + metricsQueryService querysvc.MetricsQueryService + queryParser queryParser + basePath string + apiPrefix string + logger *zap.Logger + tracer opentracing.Tracer } // NewAPIHandler returns an APIHandler @@ -115,6 +126,10 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) { // TODO - remove this when UI catches up aH.handleFunc(router, aH.getOperationsLegacy, "/services/{%s}/operations", serviceParam).Methods(http.MethodGet) aH.handleFunc(router, aH.dependencies, "/dependencies").Methods(http.MethodGet) + aH.handleFunc(router, aH.latencies, "/metrics/latencies").Methods(http.MethodGet) + aH.handleFunc(router, aH.calls, "/metrics/calls").Methods(http.MethodGet) + aH.handleFunc(router, aH.errors, "/metrics/errors").Methods(http.MethodGet) + aH.handleFunc(router, aH.minStep, "/metrics/minstep").Methods(http.MethodGet) } func (aH *APIHandler) handleFunc( @@ -176,7 +191,7 @@ func (aH *APIHandler) getOperationsLegacy(w http.ResponseWriter, r *http.Request func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) { service := r.FormValue(serviceParam) if service == "" { - if aH.handleError(w, ErrServiceParameterRequired, http.StatusBadRequest) { + if aH.handleError(w, errServiceParameterRequired, http.StatusBadRequest) { return } } @@ -204,7 +219,7 @@ func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) { } func (aH *APIHandler) search(w http.ResponseWriter, r *http.Request) { - tQuery, err := aH.queryParser.parse(r) + tQuery, err := aH.queryParser.parseTraceQueryParams(r) if aH.handleError(w, err, http.StatusBadRequest) { return } @@ -259,31 +274,13 @@ func (aH *APIHandler) tracesByIDs(ctx context.Context, traceIDs []model.TraceID) } func (aH *APIHandler) dependencies(w http.ResponseWriter, r *http.Request) { - endTsMillis, err := strconv.ParseInt(r.FormValue(endTsParam), 10, 64) - if err != nil { - err = fmt.Errorf("unable to parse %s: %w", endTimeParam, err) - if aH.handleError(w, err, http.StatusBadRequest) { - return - } - } - var lookback time.Duration - if formValue := r.FormValue(lookbackParam); len(formValue) > 0 { - lookback, err = time.ParseDuration(formValue + "ms") - if err != nil { - err = fmt.Errorf("unable to parse %s: %w", lookbackParam, err) - if aH.handleError(w, err, http.StatusBadRequest) { - return - } - } + dqp, err := aH.queryParser.parseDependenciesQueryParams(r) + if aH.handleError(w, err, http.StatusBadRequest) { + return } service := r.FormValue(serviceParam) - if lookback == 0 { - lookback = defaultDependencyLookbackDuration - } - endTs := time.Unix(0, 0).Add(time.Duration(endTsMillis) * time.Millisecond) - - dependencies, err := aH.queryService.GetDependencies(r.Context(), endTs, lookback) + dependencies, err := aH.queryService.GetDependencies(r.Context(), dqp.endTs, dqp.lookback) if aH.handleError(w, err, http.StatusInternalServerError) { return } @@ -295,6 +292,60 @@ func (aH *APIHandler) dependencies(w http.ResponseWriter, r *http.Request) { aH.writeJSON(w, r, &structuredRes) } +func (aH *APIHandler) latencies(w http.ResponseWriter, r *http.Request) { + q, err := strconv.ParseFloat(r.FormValue(quantileParam), 64) + if err != nil { + aH.handleError(w, newParseError(err, quantileParam), http.StatusBadRequest) + return + } + aH.metrics(w, r, func(ctx context.Context, baseParams metricsstore.BaseQueryParameters) (*metrics.MetricFamily, error) { + return aH.metricsQueryService.GetLatencies(ctx, &metricsstore.LatenciesQueryParameters{ + BaseQueryParameters: baseParams, + Quantile: q, + }) + }) +} + +func (aH *APIHandler) calls(w http.ResponseWriter, r *http.Request) { + aH.metrics(w, r, func(ctx context.Context, baseParams metricsstore.BaseQueryParameters) (*metrics.MetricFamily, error) { + return aH.metricsQueryService.GetCallRates(ctx, &metricsstore.CallRateQueryParameters{ + BaseQueryParameters: baseParams, + }) + }) +} + +func (aH *APIHandler) errors(w http.ResponseWriter, r *http.Request) { + aH.metrics(w, r, func(ctx context.Context, baseParams metricsstore.BaseQueryParameters) (*metrics.MetricFamily, error) { + return aH.metricsQueryService.GetErrorRates(ctx, &metricsstore.ErrorRateQueryParameters{ + BaseQueryParameters: baseParams, + }) + }) +} + +func (aH *APIHandler) minStep(w http.ResponseWriter, r *http.Request) { + minStep, err := aH.metricsQueryService.GetMinStepDuration(r.Context(), &metricsstore.MinStepDurationQueryParameters{}) + if aH.handleError(w, err, http.StatusInternalServerError) { + return + } + + structuredRes := structuredResponse{ + Data: minStep.Milliseconds(), + } + aH.writeJSON(w, r, &structuredRes) +} + +func (aH *APIHandler) metrics(w http.ResponseWriter, r *http.Request, getMetrics func(context.Context, metricsstore.BaseQueryParameters) (*metrics.MetricFamily, error)) { + requestParams, err := aH.queryParser.parseMetricsQueryParams(r) + if aH.handleError(w, err, http.StatusBadRequest) { + return + } + m, err := getMetrics(r.Context(), requestParams) + if aH.handleError(w, err, http.StatusInternalServerError) { + return + } + aH.writeJSON(w, r, m) +} + func (aH *APIHandler) convertModelToUI(trace *model.Trace, adjust bool) (*ui.Trace, *structuredError) { var errors []error if adjust { @@ -429,6 +480,9 @@ func (aH *APIHandler) handleError(w http.ResponseWriter, err error, statusCode i if err == nil { return false } + if errors.Is(err, disabled.ErrDisabled) { + statusCode = http.StatusNotImplemented + } if statusCode == http.StatusInternalServerError { aH.logger.Error("HTTP handler, Internal Server Error", zap.Error(err)) } @@ -446,19 +500,19 @@ func (aH *APIHandler) handleError(w http.ResponseWriter, err error, statusCode i } func (aH *APIHandler) writeJSON(w http.ResponseWriter, r *http.Request, response interface{}) { - marshall := json.Marshal - if prettyPrint := r.FormValue(prettyPrintParam); prettyPrint != "" && prettyPrint != "false" { - marshall = func(v interface{}) ([]byte, error) { - return json.MarshalIndent(v, "", " ") - } - } - resp, err := marshall(response) - if err != nil { - aH.handleError(w, fmt.Errorf("failed marshalling HTTP response to JSON: %w", err), http.StatusInternalServerError) - return + prettyPrintValue := r.FormValue(prettyPrintParam) + prettyPrint := prettyPrintValue != "" && prettyPrintValue != "false" + + var marshal jsonMarshaler + switch response.(type) { + case proto.Message: + marshal = newProtoJSONMarshaler(prettyPrint) + default: + marshal = newStructJSONMarshaler(prettyPrint) } + w.Header().Set("Content-Type", "application/json") - if _, err := w.Write(resp); err != nil { + if err := marshal(w, response); err != nil { aH.handleError(w, fmt.Errorf("failed writing HTTP response: %w", err), http.StatusInternalServerError) } } diff --git a/cmd/query/app/http_handler_test.go b/cmd/query/app/http_handler_test.go index fc4d51642cf..6cf27fb821f 100644 --- a/cmd/query/app/http_handler_test.go +++ b/cmd/query/app/http_handler_test.go @@ -29,6 +29,8 @@ import ( "time" "github.com/gogo/protobuf/jsonpb" + "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/types" "github.com/stretchr/testify/assert" testHttp "github.com/stretchr/testify/http" "github.com/stretchr/testify/mock" @@ -41,7 +43,10 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/model/adjuster" ui "github.com/jaegertracing/jaeger/model/json" + "github.com/jaegertracing/jaeger/plugin/metrics/disabled" + "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" + metricsmocks "github.com/jaegertracing/jaeger/storage/metricsstore/mocks" "github.com/jaegertracing/jaeger/storage/spanstore" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" ) @@ -85,7 +90,7 @@ type structuredTraceResponse struct { Errors []structuredError `json:"errors"` } -func initializeTestServerWithHandler(queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) (*httptest.Server, *spanstoremocks.Reader, *depsmocks.Reader, *APIHandler) { +func initializeTestServerWithHandler(queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) *testServer { return initializeTestServerWithOptions( queryOptions, append( @@ -101,19 +106,23 @@ func initializeTestServerWithHandler(queryOptions querysvc.QueryServiceOptions, ) } -func initializeTestServerWithOptions(queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) (*httptest.Server, *spanstoremocks.Reader, *depsmocks.Reader, *APIHandler) { +func initializeTestServerWithOptions(queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) *testServer { readStorage := &spanstoremocks.Reader{} dependencyStorage := &depsmocks.Reader{} qs := querysvc.NewQueryService(readStorage, dependencyStorage, queryOptions) r := NewRouter() handler := NewAPIHandler(qs, options...) handler.RegisterRoutes(r) - return httptest.NewServer(r), readStorage, dependencyStorage, handler + return &testServer{ + server: httptest.NewServer(r), + spanReader: readStorage, + dependencyReader: dependencyStorage, + handler: handler, + } } -func initializeTestServer(options ...HandlerOption) (*httptest.Server, *spanstoremocks.Reader, *depsmocks.Reader) { - https, sr, dr, _ := initializeTestServerWithHandler(querysvc.QueryServiceOptions{}, options...) - return https, sr, dr +func initializeTestServer(options ...HandlerOption) *testServer { + return initializeTestServerWithHandler(querysvc.QueryServiceOptions{}, options...) } type testServer struct { @@ -123,26 +132,20 @@ type testServer struct { server *httptest.Server } -func withTestServer(t *testing.T, doTest func(s *testServer), queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) { - server, spanReader, depReader, handler := initializeTestServerWithOptions(queryOptions, options...) - s := &testServer{ - spanReader: spanReader, - dependencyReader: depReader, - handler: handler, - server: server, - } - defer server.Close() - doTest(s) +func withTestServer(doTest func(s *testServer), queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) { + ts := initializeTestServerWithOptions(queryOptions, options...) + defer ts.server.Close() + doTest(ts) } func TestGetTraceSuccess(t *testing.T) { - server, readMock, _ := initializeTestServer() - defer server.Close() - readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts := initializeTestServer() + defer ts.server.Close() + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(mockTrace, nil).Once() var response structuredResponse - err := getJSON(server.URL+`/api/traces/123456`, &response) + err := getJSON(ts.server.URL+`/api/traces/123456`, &response) assert.NoError(t, err) assert.Len(t, response.Errors, 0) } @@ -224,7 +227,7 @@ func TestWriteJSON(t *testing.T) { { name: "fail JSON marshal", data: struct{ Data float64 }{Data: math.Inf(1)}, - output: "{\"data\":null,\"total\":0,\"limit\":0,\"offset\":0,\"errors\":[{\"code\":500,\"msg\":\"failed marshalling HTTP response to JSON: json: unsupported value: +Inf\"}]}\n", + output: `failed marshalling HTTP response to JSON: json: unsupported value: +Inf`, }, { name: "fail http write", @@ -256,7 +259,7 @@ func TestWriteJSON(t *testing.T) { defer server.Close() out := get(server.URL + testCase.param) - assert.Equal(t, testCase.output, out) + assert.Contains(t, out, testCase.output) }) } } @@ -299,14 +302,14 @@ func TestGetTrace(t *testing.T) { jaegerTracer, jaegerCloser := jaeger.NewTracer("test", jaeger.NewConstSampler(true), reporter) defer jaegerCloser.Close() - server, readMock, _ := initializeTestServer(HandlerOptions.Tracer(jaegerTracer)) - defer server.Close() + ts := initializeTestServer(HandlerOptions.Tracer(jaegerTracer)) + defer ts.server.Close() - readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), model.NewTraceID(0, 0x123456abc)). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), model.NewTraceID(0, 0x123456abc)). Return(makeMockTrace(t), nil).Once() var response structuredResponse - err := getJSON(server.URL+`/api/traces/123456aBC`+testCase.suffix, &response) // trace ID in mixed lower/upper case + err := getJSON(ts.server.URL+`/api/traces/123456aBC`+testCase.suffix, &response) // trace ID in mixed lower/upper case assert.NoError(t, err) assert.Len(t, response.Errors, 0) @@ -321,75 +324,75 @@ func TestGetTrace(t *testing.T) { } func TestGetTraceDBFailure(t *testing.T) { - server, readMock, _ := initializeTestServer() - defer server.Close() - readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts := initializeTestServer() + defer ts.server.Close() + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(nil, errStorage).Once() var response structuredResponse - err := getJSON(server.URL+`/api/traces/123456`, &response) + err := getJSON(ts.server.URL+`/api/traces/123456`, &response) assert.Error(t, err) } func TestGetTraceNotFound(t *testing.T) { - server, readMock, _ := initializeTestServer() - defer server.Close() - readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts := initializeTestServer() + defer ts.server.Close() + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(nil, spanstore.ErrTraceNotFound).Once() var response structuredResponse - err := getJSON(server.URL+`/api/traces/123456`, &response) + err := getJSON(ts.server.URL+`/api/traces/123456`, &response) assert.EqualError(t, err, parsedError(404, "trace not found")) } func TestGetTraceAdjustmentFailure(t *testing.T) { - server, readMock, _, _ := initializeTestServerWithHandler( + ts := initializeTestServerWithHandler( querysvc.QueryServiceOptions{ Adjuster: adjuster.Func(func(trace *model.Trace) (*model.Trace, error) { return trace, errAdjustment }), }, ) - defer server.Close() - readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + defer ts.server.Close() + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(mockTrace, nil).Once() var response structuredResponse - err := getJSON(server.URL+`/api/traces/123456`, &response) + err := getJSON(ts.server.URL+`/api/traces/123456`, &response) assert.NoError(t, err) assert.Len(t, response.Errors, 1) assert.EqualValues(t, errAdjustment.Error(), response.Errors[0].Msg) } func TestGetTraceBadTraceID(t *testing.T) { - server, _, _ := initializeTestServer() - defer server.Close() + ts := initializeTestServer() + defer ts.server.Close() var response structuredResponse - err := getJSON(server.URL+`/api/traces/chumbawumba`, &response) + err := getJSON(ts.server.URL+`/api/traces/chumbawumba`, &response) assert.Error(t, err) } func TestSearchSuccess(t *testing.T) { - server, readMock, _ := initializeTestServer() - defer server.Close() - readMock.On("FindTraces", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceQueryParameters")). + ts := initializeTestServer() + defer ts.server.Close() + ts.spanReader.On("FindTraces", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceQueryParameters")). Return([]*model.Trace{mockTrace}, nil).Once() var response structuredResponse - err := getJSON(server.URL+`/api/traces?service=service&start=0&end=0&operation=operation&limit=200&minDuration=20ms`, &response) + err := getJSON(ts.server.URL+`/api/traces?service=service&start=0&end=0&operation=operation&limit=200&minDuration=20ms`, &response) assert.NoError(t, err) assert.Len(t, response.Errors, 0) } func TestSearchByTraceIDSuccess(t *testing.T) { - server, readMock, _ := initializeTestServer() - defer server.Close() - readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts := initializeTestServer() + defer ts.server.Close() + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(mockTrace, nil).Twice() var response structuredResponse - err := getJSON(server.URL+`/api/traces?traceID=1&traceID=2`, &response) + err := getJSON(ts.server.URL+`/api/traces?traceID=1&traceID=2`, &response) assert.NoError(t, err) assert.Len(t, response.Errors, 0) assert.Len(t, response.Data, 2) @@ -397,73 +400,73 @@ func TestSearchByTraceIDSuccess(t *testing.T) { func TestSearchByTraceIDSuccessWithArchive(t *testing.T) { archiveReadMock := &spanstoremocks.Reader{} - server, readMock, _, _ := initializeTestServerWithOptions(querysvc.QueryServiceOptions{ + ts := initializeTestServerWithOptions(querysvc.QueryServiceOptions{ ArchiveSpanReader: archiveReadMock, }) - defer server.Close() - readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + defer ts.server.Close() + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(nil, spanstore.ErrTraceNotFound).Twice() archiveReadMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(mockTrace, nil).Twice() var response structuredResponse - err := getJSON(server.URL+`/api/traces?traceID=1&traceID=2`, &response) + err := getJSON(ts.server.URL+`/api/traces?traceID=1&traceID=2`, &response) assert.NoError(t, err) assert.Len(t, response.Errors, 0) assert.Len(t, response.Data, 2) } func TestSearchByTraceIDNotFound(t *testing.T) { - server, readMock, _ := initializeTestServer() - defer server.Close() - readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts := initializeTestServer() + defer ts.server.Close() + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(nil, spanstore.ErrTraceNotFound).Once() var response structuredResponse - err := getJSON(server.URL+`/api/traces?traceID=1`, &response) + err := getJSON(ts.server.URL+`/api/traces?traceID=1`, &response) assert.NoError(t, err) assert.Len(t, response.Errors, 1) assert.Equal(t, structuredError{Msg: "trace not found", TraceID: ui.TraceID("0000000000000001")}, response.Errors[0]) } func TestSearchByTraceIDFailure(t *testing.T) { - server, readMock, _ := initializeTestServer() - defer server.Close() + ts := initializeTestServer() + defer ts.server.Close() whatsamattayou := "https://youtu.be/WrKFOCg13QQ" - readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). Return(nil, fmt.Errorf(whatsamattayou)).Once() var response structuredResponse - err := getJSON(server.URL+`/api/traces?traceID=1`, &response) + err := getJSON(ts.server.URL+`/api/traces?traceID=1`, &response) assert.EqualError(t, err, parsedError(500, whatsamattayou)) } func TestSearchModelConversionFailure(t *testing.T) { - server, readMock, _, _ := initializeTestServerWithOptions( + ts := initializeTestServerWithOptions( querysvc.QueryServiceOptions{ Adjuster: adjuster.Func(func(trace *model.Trace) (*model.Trace, error) { return trace, errAdjustment }), }, ) - defer server.Close() - readMock.On("FindTraces", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceQueryParameters")). + defer ts.server.Close() + ts.spanReader.On("FindTraces", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceQueryParameters")). Return([]*model.Trace{mockTrace}, nil).Once() var response structuredResponse - err := getJSON(server.URL+`/api/traces?service=service&start=0&end=0&operation=operation&limit=200&minDuration=20ms`, &response) + err := getJSON(ts.server.URL+`/api/traces?service=service&start=0&end=0&operation=operation&limit=200&minDuration=20ms`, &response) assert.NoError(t, err) assert.Len(t, response.Errors, 1) assert.EqualValues(t, errAdjustment.Error(), response.Errors[0].Msg) } func TestSearchDBFailure(t *testing.T) { - server, readMock, _ := initializeTestServer() - defer server.Close() - readMock.On("FindTraces", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceQueryParameters")). + ts := initializeTestServer() + defer ts.server.Close() + ts.spanReader.On("FindTraces", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceQueryParameters")). Return(nil, fmt.Errorf("whatsamattayou")).Once() var response structuredResponse - err := getJSON(server.URL+`/api/traces?service=service&start=0&end=0&operation=operation&limit=200&minDuration=20ms`, &response) + err := getJSON(ts.server.URL+`/api/traces?service=service&start=0&end=0&operation=operation&limit=200&minDuration=20ms`, &response) assert.EqualError(t, err, parsedError(500, "whatsamattayou")) } @@ -487,24 +490,24 @@ func TestSearchFailures(t *testing.T) { } func testIndividualSearchFailures(t *testing.T, urlStr, errMsg string) { - server, readMock, _ := initializeTestServer() - defer server.Close() - readMock.On("Query", mock.AnythingOfType("spanstore.TraceQueryParameters")). + ts := initializeTestServer() + defer ts.server.Close() + ts.spanReader.On("Query", mock.AnythingOfType("spanstore.TraceQueryParameters")). Return([]*model.Trace{}, nil).Once() var response structuredResponse - err := getJSON(server.URL+urlStr, &response) + err := getJSON(ts.server.URL+urlStr, &response) assert.EqualError(t, err, errMsg) } func TestGetServicesSuccess(t *testing.T) { - server, readMock, _ := initializeTestServer() - defer server.Close() + ts := initializeTestServer() + defer ts.server.Close() expectedServices := []string{"trifle", "bling"} - readMock.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil).Once() + ts.spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil).Once() var response structuredResponse - err := getJSON(server.URL+"/api/services", &response) + err := getJSON(ts.server.URL+"/api/services", &response) assert.NoError(t, err) actualServices := make([]string, len(expectedServices)) for i, s := range response.Data.([]interface{}) { @@ -514,23 +517,26 @@ func TestGetServicesSuccess(t *testing.T) { } func TestGetServicesStorageFailure(t *testing.T) { - server, readMock, _ := initializeTestServer() - defer server.Close() - readMock.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(nil, errStorage).Once() + ts := initializeTestServer() + defer ts.server.Close() + ts.spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(nil, errStorage).Once() var response structuredResponse - err := getJSON(server.URL+"/api/services", &response) + err := getJSON(ts.server.URL+"/api/services", &response) assert.Error(t, err) } func TestGetOperationsSuccess(t *testing.T) { - server, readMock, _ := initializeTestServer() - defer server.Close() + ts := initializeTestServer() + defer ts.server.Close() expectedOperations := []spanstore.Operation{{Name: ""}, {Name: "get", SpanKind: "server"}} - readMock.On( + ts.spanReader.On( "GetOperations", mock.AnythingOfType("*context.valueCtx"), - spanstore.OperationQueryParameters{ServiceName: "abc/trifle"}, + spanstore.OperationQueryParameters{ + ServiceName: "abc/trifle", + SpanKind: "server", + }, ).Return(expectedOperations, nil).Once() var response struct { @@ -541,7 +547,7 @@ func TestGetOperationsSuccess(t *testing.T) { Errors []structuredError `json:"errors"` } - err := getJSON(server.URL+"/api/operations?service=abc%2Ftrifle", &response) + err := getJSON(ts.server.URL+"/api/operations?service=abc%2Ftrifle&spanKind=server", &response) assert.NoError(t, err) assert.Equal(t, len(expectedOperations), len(response.Operations)) for i, op := range response.Operations { @@ -551,60 +557,257 @@ func TestGetOperationsSuccess(t *testing.T) { } func TestGetOperationsNoServiceName(t *testing.T) { - server, _, _ := initializeTestServer() - defer server.Close() + ts := initializeTestServer() + defer ts.server.Close() var response structuredResponse - err := getJSON(server.URL+"/api/operations", &response) + err := getJSON(ts.server.URL+"/api/operations", &response) assert.Error(t, err) } func TestGetOperationsStorageFailure(t *testing.T) { - server, readMock, _ := initializeTestServer() - defer server.Close() - readMock.On( + ts := initializeTestServer() + defer ts.server.Close() + ts.spanReader.On( "GetOperations", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.OperationQueryParameters")).Return(nil, errStorage).Once() var response structuredResponse - err := getJSON(server.URL+"/api/operations?service=trifle", &response) + err := getJSON(ts.server.URL+"/api/operations?service=trifle", &response) assert.Error(t, err) } func TestGetOperationsLegacySuccess(t *testing.T) { - server, readMock, _ := initializeTestServer() - defer server.Close() + ts := initializeTestServer() + defer ts.server.Close() expectedOperationNames := []string{"", "get"} expectedOperations := []spanstore.Operation{ {Name: ""}, {Name: "get", SpanKind: "server"}, {Name: "get", SpanKind: "client"}} - readMock.On( + ts.spanReader.On( "GetOperations", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.OperationQueryParameters")).Return(expectedOperations, nil).Once() var response structuredResponse - err := getJSON(server.URL+"/api/services/abc%2Ftrifle/operations", &response) + err := getJSON(ts.server.URL+"/api/services/abc%2Ftrifle/operations", &response) assert.NoError(t, err) assert.ElementsMatch(t, expectedOperationNames, response.Data.([]interface{})) } func TestGetOperationsLegacyStorageFailure(t *testing.T) { - server, readMock, _ := initializeTestServer() - defer server.Close() - readMock.On( + ts := initializeTestServer() + defer ts.server.Close() + ts.spanReader.On( "GetOperations", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.OperationQueryParameters")).Return(nil, errStorage).Once() var response structuredResponse - err := getJSON(server.URL+"/api/services/trifle/operations", &response) + err := getJSON(ts.server.URL+"/api/services/trifle/operations", &response) assert.Error(t, err) } +func TestGetMetricsSuccess(t *testing.T) { + mr := &metricsmocks.Reader{} + apiHandlerOptions := []HandlerOption{ + HandlerOptions.MetricsQueryService(mr), + } + ts := initializeTestServer(apiHandlerOptions...) + defer ts.server.Close() + expectedLabel := &metrics.Label{ + Name: "service_name", + Value: "emailservice", + } + expectedMetricPoint := &metrics.MetricPoint{ + Timestamp: &types.Timestamp{Seconds: time.Now().Unix()}, + Value: &metrics.MetricPoint_GaugeValue{ + GaugeValue: &metrics.GaugeValue{ + Value: &metrics.GaugeValue_DoubleValue{DoubleValue: 0.9}, + }, + }, + } + expectedMetricsQueryResponse := &metrics.MetricFamily{ + Name: "the metrics", + Type: metrics.MetricType_GAUGE, + Metrics: []*metrics.Metric{ + { + Labels: []*metrics.Label{expectedLabel}, + MetricPoints: []*metrics.MetricPoint{expectedMetricPoint}, + }, + }, + } + + for _, tc := range []struct { + name string + urlPath string + mockedQueryMethod string + mockedQueryMethodParamType string + }{ + { + name: "latencies", + urlPath: "/api/metrics/latencies?service=emailservice&quantile=0.95", + mockedQueryMethod: "GetLatencies", + mockedQueryMethodParamType: "*metricsstore.LatenciesQueryParameters", + }, + { + name: "call rates", + urlPath: "/api/metrics/calls?service=emailservice", + mockedQueryMethod: "GetCallRates", + mockedQueryMethodParamType: "*metricsstore.CallRateQueryParameters", + }, + { + name: "error rates", + urlPath: "/api/metrics/errors?service=emailservice", + mockedQueryMethod: "GetErrorRates", + mockedQueryMethodParamType: "*metricsstore.ErrorRateQueryParameters", + }, + { + name: "error rates with pretty print", + urlPath: "/api/metrics/errors?service=emailservice&prettyPrint=true", + mockedQueryMethod: "GetErrorRates", + mockedQueryMethodParamType: "*metricsstore.ErrorRateQueryParameters", + }, + { + name: "error rates with spanKinds", + urlPath: "/api/metrics/errors?service=emailservice&spanKind=client", + mockedQueryMethod: "GetErrorRates", + mockedQueryMethodParamType: "*metricsstore.ErrorRateQueryParameters", + }, + } { + t.Run(tc.name, func(t *testing.T) { + // Prepare + mr.On( + tc.mockedQueryMethod, + mock.AnythingOfType("*context.valueCtx"), + mock.AnythingOfType(tc.mockedQueryMethodParamType), + ).Return(expectedMetricsQueryResponse, nil).Once() + + // Test + var response metrics.MetricFamily + err := getJSON(ts.server.URL+tc.urlPath, &response) + + // Verify + require.NoError(t, err) + assert.Equal(t, expectedMetricsQueryResponse, &response) + }) + } +} + +func TestMetricsReaderError(t *testing.T) { + metricsReader := &metricsmocks.Reader{} + apiHandlerOptions := []HandlerOption{ + HandlerOptions.MetricsQueryService(metricsReader), + } + ts := initializeTestServer(apiHandlerOptions...) + defer ts.server.Close() + + for _, tc := range []struct { + name string + urlPath string + mockedQueryMethod string + mockedQueryMethodParamType string + mockedResponse interface{} + wantErrorMessage string + }{ + { + urlPath: "/api/metrics/calls?service=emailservice", + mockedQueryMethod: "GetCallRates", + mockedQueryMethodParamType: "*metricsstore.CallRateQueryParameters", + mockedResponse: nil, + wantErrorMessage: "error fetching call rates", + }, + { + urlPath: "/api/metrics/minstep", + mockedQueryMethod: "GetMinStepDuration", + mockedQueryMethodParamType: "*metricsstore.MinStepDurationQueryParameters", + mockedResponse: time.Duration(0), + wantErrorMessage: "error fetching min step duration", + }, + } { + t.Run(tc.wantErrorMessage, func(t *testing.T) { + // Prepare + metricsReader.On( + tc.mockedQueryMethod, + mock.AnythingOfType("*context.valueCtx"), + mock.AnythingOfType(tc.mockedQueryMethodParamType), + ).Return(tc.mockedResponse, fmt.Errorf(tc.wantErrorMessage)).Once() + + // Test + var response metrics.MetricFamily + err := getJSON(ts.server.URL+tc.urlPath, &response) + + // Verify + require.Error(t, err) + assert.Contains(t, err.Error(), tc.wantErrorMessage) + }) + } +} + +func TestMetricsQueryDisabled(t *testing.T) { + disabledReader, err := disabled.NewMetricsReader() + require.NoError(t, err) + + apiHandlerOptions := []HandlerOption{ + HandlerOptions.MetricsQueryService(disabledReader), + } + ts := initializeTestServer(apiHandlerOptions...) + defer ts.server.Close() + + for _, tc := range []struct { + name string + urlPath string + wantErrorMessage string + }{ + { + name: "metrics query disabled error returned when fetching latency metrics", + urlPath: "/api/metrics/latencies?service=emailservice&quantile=0.95", + wantErrorMessage: "metrics querying is currently disabled", + }, + { + name: "metrics query disabled error returned when fetching min step duration", + urlPath: "/api/metrics/minstep", + wantErrorMessage: "metrics querying is currently disabled", + }, + } { + t.Run(tc.name, func(t *testing.T) { + // Test + var response interface{} + err := getJSON(ts.server.URL+tc.urlPath, &response) + + // Verify + require.Error(t, err) + assert.Contains(t, err.Error(), tc.wantErrorMessage) + }) + } +} + +func TestGetMinStep(t *testing.T) { + metricsReader := &metricsmocks.Reader{} + apiHandlerOptions := []HandlerOption{ + HandlerOptions.MetricsQueryService(metricsReader), + } + ts := initializeTestServer(apiHandlerOptions...) + defer ts.server.Close() + // Prepare + metricsReader.On( + "GetMinStepDuration", + mock.AnythingOfType("*context.valueCtx"), + mock.AnythingOfType("*metricsstore.MinStepDurationQueryParameters"), + ).Return(5*time.Millisecond, nil).Once() + + // Test + var response structuredResponse + err := getJSON(ts.server.URL+"/api/metrics/minstep", &response) + + // Verify + require.NoError(t, err) + assert.Equal(t, float64(5), response.Data) +} + // getJSON fetches a JSON document from a server via HTTP GET func getJSON(url string, out interface{}) error { req, err := http.NewRequest("GET", url, nil) @@ -652,6 +855,10 @@ func execJSON(req *http.Request, out interface{}) error { return nil } + if protoMessage, ok := out.(proto.Message); ok { + unmarshaler := new(jsonpb.Unmarshaler) + return unmarshaler.Unmarshal(resp.Body, protoMessage) + } decoder := json.NewDecoder(resp.Body) return decoder.Decode(out) } diff --git a/cmd/query/app/json_marshaler.go b/cmd/query/app/json_marshaler.go new file mode 100644 index 00000000000..3601a141b8b --- /dev/null +++ b/cmd/query/app/json_marshaler.go @@ -0,0 +1,56 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "encoding/json" + "fmt" + "io" + + "github.com/gogo/protobuf/jsonpb" + "github.com/gogo/protobuf/proto" +) + +type jsonMarshaler = func(writer io.Writer, response interface{}) error + +// newProtoJSONMarshaler returns a protobuf-friendly JSON marshaler that knows how to handle protobuf-specific +// field types such as "oneof" as well as dealing with NaNs which are not supported by JSON. +func newProtoJSONMarshaler(prettyPrint bool) jsonMarshaler { + marshaler := new(jsonpb.Marshaler) + if prettyPrint { + marshaler.Indent = prettyPrintIndent + } + return func(w io.Writer, response interface{}) error { + return marshaler.Marshal(w, response.(proto.Message)) + } +} + +// newStructJSONMarshaler returns a marshaler that uses the built-in encoding/json package for marshaling basic structs to JSON. +func newStructJSONMarshaler(prettyPrint bool) jsonMarshaler { + marshaler := json.Marshal + if prettyPrint { + marshaler = func(v interface{}) ([]byte, error) { + return json.MarshalIndent(v, "", prettyPrintIndent) + } + } + return func(w io.Writer, response interface{}) error { + resp, err := marshaler(response) + if err != nil { + return fmt.Errorf("failed marshalling HTTP response to JSON: %w", err) + } + _, err = w.Write(resp) + return err + } +} diff --git a/cmd/query/app/query_parser.go b/cmd/query/app/query_parser.go index 1aa5b6a9cf3..fae8928a080 100644 --- a/cmd/query/app/query_parser.go +++ b/cmd/query/app/query_parser.go @@ -17,6 +17,7 @@ package app import ( "encoding/json" + "errors" "fmt" "net/http" "strconv" @@ -24,6 +25,8 @@ import ( "time" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" + "github.com/jaegertracing/jaeger/storage/metricsstore" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -46,22 +49,68 @@ const ( var ( errMaxDurationGreaterThanMin = fmt.Errorf("'%s' should be greater than '%s'", maxDurationParam, minDurationParam) - // ErrServiceParameterRequired occurs when no service name is defined - ErrServiceParameterRequired = fmt.Errorf("parameter '%s' is required", serviceParam) + // errServiceParameterRequired occurs when no service name is defined. + errServiceParameterRequired = fmt.Errorf("parameter '%s' is required", serviceParam) + + jaegerToOtelSpanKind = map[string]string{ + "unspecified": metrics.SpanKind_SPAN_KIND_UNSPECIFIED.String(), + "internal": metrics.SpanKind_SPAN_KIND_INTERNAL.String(), + "server": metrics.SpanKind_SPAN_KIND_SERVER.String(), + "client": metrics.SpanKind_SPAN_KIND_CLIENT.String(), + "producer": metrics.SpanKind_SPAN_KIND_PRODUCER.String(), + "consumer": metrics.SpanKind_SPAN_KIND_CONSUMER.String(), + } ) -// queryParser handles the parsing of query parameters for traces -type queryParser struct { - traceQueryLookbackDuration time.Duration - timeNow func() time.Time +type ( + // queryParser handles the parsing of query parameters for traces. + queryParser struct { + traceQueryLookbackDuration time.Duration + timeNow func() time.Time + } + + traceQueryParameters struct { + spanstore.TraceQueryParameters + traceIDs []model.TraceID + } + + dependenciesQueryParameters struct { + endTs time.Time + lookback time.Duration + } + + durationParser = func(s string) (time.Duration, error) +) + +func newDurationStringParser() durationParser { + return func(s string) (time.Duration, error) { + return time.ParseDuration(s) + } } -type traceQueryParameters struct { - spanstore.TraceQueryParameters - traceIDs []model.TraceID +func newDurationUnitsParser(units time.Duration) durationParser { + return func(s string) (time.Duration, error) { + i, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return 0, err + } + return time.Duration(i) * (units), nil + } } -// parse takes a request and constructs a model of parameters +// parseTraceQueryParams takes a request and constructs a model of parameters. +// +// Why start/end parameters are expressed in microseconds: +// Span searches operate on span latencies, which are expressed as microseconds in the data model, hence why +// support for high accuracy in search query parameters is required. +// Microsecond precision is a legacy artifact from zipkin origins where timestamps and durations +// are in microseconds (see: https://zipkin.io/pages/instrumenting.html). +// +// Why duration parameters are expressed as duration strings like "1ms": +// The search UI itself does not insist on exact units because it supports string like 1ms. +// Go makes parsing duration strings like "1ms" very easy, hence why parsing of such strings is +// deferred to the backend rather than Jaeger UI. +// // Trace query syntax: // query ::= param | param '&' query // param ::= service | operation | limit | start | end | minDuration | maxDuration | tag | tags @@ -76,15 +125,15 @@ type traceQueryParameters struct { // key := strValue // keyValue := strValue ':' strValue // tags :== 'tags=' jsonMap -func (p *queryParser) parse(r *http.Request) (*traceQueryParameters, error) { +func (p *queryParser) parseTraceQueryParams(r *http.Request) (*traceQueryParameters, error) { service := r.FormValue(serviceParam) operation := r.FormValue(operationParam) - startTime, err := p.parseTime(startTimeParam, r) + startTime, err := p.parseTime(r, startTimeParam, time.Microsecond) if err != nil { return nil, err } - endTime, err := p.parseTime(endTimeParam, r) + endTime, err := p.parseTime(r, endTimeParam, time.Microsecond) if err != nil { return nil, err } @@ -104,12 +153,13 @@ func (p *queryParser) parse(r *http.Request) (*traceQueryParameters, error) { limit = int(limitParsed) } - minDuration, err := p.parseDuration(minDurationParam, r) + parser := newDurationStringParser() + minDuration, err := parseDuration(r, minDurationParam, parser, 0) if err != nil { return nil, err } - maxDuration, err := p.parseDuration(maxDurationParam, r) + maxDuration, err := parseDuration(r, maxDurationParam, parser, 0) if err != nil { return nil, err } @@ -143,36 +193,185 @@ func (p *queryParser) parse(r *http.Request) (*traceQueryParameters, error) { return traceQuery, nil } -func (p *queryParser) parseTime(param string, r *http.Request) (time.Time, error) { - value := r.FormValue(param) - if value == "" { - if param == startTimeParam { +// parseDependenciesQueryParams takes a request and constructs a model of dependencies query parameters. +// +// The dependencies API does not operate on the latency space, instead its timestamps are just time range selections, +// and the typical backend granularity of those is on the order of 15min or more. As such, microseconds aren't +// useful in this domain and milliseconds are sufficient for both times and durations. +func (p *queryParser) parseDependenciesQueryParams(r *http.Request) (dqp dependenciesQueryParameters, err error) { + dqp.endTs, err = p.parseTime(r, endTsParam, time.Millisecond) + if err != nil { + return dqp, err + } + + dqp.lookback, err = parseDuration(r, lookbackParam, newDurationUnitsParser(time.Millisecond), defaultDependencyLookbackDuration) + return dqp, err +} + +// parseMetricsQueryParams takes a request and constructs a model of metrics query parameters. +// +// Why the API is designed using an end time (endTs) and lookback: +// The typical usage of the metrics APIs is to view the most recent metrics from now looking +// back a certain period of time, given the value of metrics generally degrades with time. As such, the API +// is also designed to mirror the user interface inputs. +// +// Why times are expressed as unix milliseconds: +// - The minimum step size for Prometheus-compliant metrics backends is 1ms, +// hence millisecond precision on times is sufficient. +// - The metrics API is designed with one primary client in mind, the Jaeger UI. As it is a React.js application, +// the maximum supported built-in time precision is milliseconds, making it a convenient precision to use for such a client. +// +// Why durations are expressed as unix milliseconds: +// - Given the endTs time is expressed as milliseconds, it follows that lookback durations should use the +// same time units to compute the start time. +// - As above, the minimum step size for Prometheus-compliant metrics backends is 1ms. +// - Other durations are in milliseconds to maintain consistency of units with other parameters in the metrics APIs. +// - As the primary client for the metrics API is the Jaeger UI, it is programmatically simpler to supply the +// integer representations of durations in milliseconds rather than the human-readable representation such as "1ms". +// +// Metrics query syntax: +// query ::= services , [ '&' optionalParams ] +// optionalParams := param | param '&' optionalParams +// param ::= groupByOperation | endTs | lookback | step | ratePer | spanKinds +// services ::= service | service '&' services +// service ::= 'service=' strValue +// groupByOperation ::= 'groupByOperation=' boolValue +// endTs ::= 'endTs=' intValue in unix milliseconds +// lookback ::= 'lookback=' intValue duration in milliseconds +// step ::= 'step=' intValue duration in milliseconds +// ratePer ::= 'ratePer=' intValue duration in milliseconds +// spanKinds ::= spanKind | spanKind '&' spanKinds +// spanKind ::= 'spanKind=' spanKindType +// spanKindType ::= "unspecified" | "internal" | "server" | "client" | "producer" | "consumer" +func (p *queryParser) parseMetricsQueryParams(r *http.Request) (bqp metricsstore.BaseQueryParameters, err error) { + query := r.URL.Query() + services, ok := query[serviceParam] + if !ok { + return bqp, newParseError(errors.New("please provide at least one service name"), serviceParam) + } + bqp.ServiceNames = services + + bqp.GroupByOperation, err = parseBool(r, groupByOperationParam) + if err != nil { + return bqp, err + } + bqp.SpanKinds, err = parseSpanKinds(r, spanKindParam, defaultMetricsSpanKinds) + if err != nil { + return bqp, err + } + endTs, err := p.parseTime(r, endTsParam, time.Millisecond) + if err != nil { + return bqp, err + } + parser := newDurationUnitsParser(time.Millisecond) + lookback, err := parseDuration(r, lookbackParam, parser, defaultMetricsQueryLookbackDuration) + if err != nil { + return bqp, err + } + step, err := parseDuration(r, stepParam, parser, defaultMetricsQueryStepDuration) + if err != nil { + return bqp, err + } + ratePer, err := parseDuration(r, rateParam, parser, defaultMetricsQueryRateDuration) + if err != nil { + return bqp, err + } + bqp.EndTime = &endTs + bqp.Lookback = &lookback + bqp.Step = &step + bqp.RatePer = &ratePer + return bqp, err +} + +// parseTime parses the time parameter of an HTTP request that is represented the number of "units" since epoch. +// If the time parameter is empty, the current time will be returned. +func (p *queryParser) parseTime(r *http.Request, paramName string, units time.Duration) (time.Time, error) { + formValue := r.FormValue(paramName) + if formValue == "" { + if paramName == startTimeParam { return p.timeNow().Add(-1 * p.traceQueryLookbackDuration), nil } return p.timeNow(), nil } - micros, err := strconv.ParseInt(value, 10, 64) + t, err := strconv.ParseInt(formValue, 10, 64) if err != nil { - return time.Time{}, err + return time.Time{}, newParseError(err, paramName) } - return time.Unix(0, 0).Add(time.Duration(micros) * time.Microsecond), nil + return time.Unix(0, 0).Add(time.Duration(t) * units), nil } -func (p *queryParser) parseDuration(durationParam string, r *http.Request) (time.Duration, error) { - durationInput := r.FormValue(durationParam) - if len(durationInput) > 0 { - duration, err := time.ParseDuration(durationInput) - if err != nil { - return 0, fmt.Errorf("cannot not parse %s: %w", durationParam, err) +// parseDuration parses the duration parameter of an HTTP request using the provided durationParser. +// If the duration parameter is empty, the given defaultDuration will be returned. +func parseDuration(r *http.Request, paramName string, parse durationParser, defaultDuration time.Duration) (time.Duration, error) { + formValue := r.FormValue(paramName) + if formValue == "" { + return defaultDuration, nil + } + d, err := parse(formValue) + if err != nil { + return 0, newParseError(err, paramName) + } + return d, nil +} + +func parseBool(r *http.Request, paramName string) (b bool, err error) { + formVal := r.FormValue(paramName) + if formVal == "" { + return false, nil + } + b, err = strconv.ParseBool(formVal) + if err != nil { + return b, newParseError(err, paramName) + } + return b, nil +} + +// parseSpanKindParam parses the input span kinds to filter for in the metrics query. +// +// Valid input span kinds include: +// - "unspecified": when no span kind specified in span. +// - "internal": internal operation within an application, instead of application boundaries. +// - "server": server-side handling span. +// - "client": outbound service call span. +// - "producer": producer sending a message to broker. +// - "consumer": consumer consuming a message from a broker. +// +// The output span kinds are the string representations from the OpenTelemetry model/proto/metrics/otelspankind.proto. +// That is, the following map to the above valid inputs: +// - "SPAN_KIND_UNSPECIFIED" +// - "SPAN_KIND_INTERNAL" +// - "SPAN_KIND_SERVER" +// - "SPAN_KIND_CLIENT" +// - "SPAN_KIND_PRODUCER" +// - "SPAN_KIND_CONSUMER" +func parseSpanKinds(r *http.Request, paramName string, defaultSpanKinds []string) ([]string, error) { + query := r.URL.Query() + jaegerSpanKinds, ok := query[paramName] + if !ok { + return defaultSpanKinds, nil + } + otelSpanKinds, err := mapSpanKindsToOpenTelemetry(jaegerSpanKinds) + if err != nil { + return defaultSpanKinds, newParseError(err, paramName) + } + return otelSpanKinds, nil +} + +func mapSpanKindsToOpenTelemetry(spanKinds []string) ([]string, error) { + otelSpanKinds := make([]string, len(spanKinds)) + for i, spanKind := range spanKinds { + if v, ok := jaegerToOtelSpanKind[spanKind]; ok { + otelSpanKinds[i] = v + } else { + return otelSpanKinds, fmt.Errorf("unsupported span kind: '%s'", spanKind) } - return duration, nil } - return 0, nil + return otelSpanKinds, nil } func (p *queryParser) validateQuery(traceQuery *traceQueryParameters) error { if len(traceQuery.traceIDs) == 0 && traceQuery.ServiceName == "" { - return ErrServiceParameterRequired + return errServiceParameterRequired } if traceQuery.DurationMin != 0 && traceQuery.DurationMax != 0 { if traceQuery.DurationMax < traceQuery.DurationMin { @@ -195,7 +394,7 @@ func (p *queryParser) parseTags(simpleTags []string, jsonTags []string) (map[str for _, tags := range jsonTags { var fromJSON map[string]string if err := json.Unmarshal([]byte(tags), &fromJSON); err != nil { - return nil, fmt.Errorf("malformed 'tags' parameter, cannot unmarshal JSON: %s", err) + return nil, fmt.Errorf("malformed 'tags' parameter, cannot unmarshal JSON: %w", err) } for k, v := range fromJSON { retMe[k] = v @@ -203,3 +402,7 @@ func (p *queryParser) parseTags(simpleTags []string, jsonTags []string) (map[str } return retMe, nil } + +func newParseError(err error, paramName string) error { + return fmt.Errorf("unable to parse param '%s': %w", paramName, err) +} diff --git a/cmd/query/app/query_parser_test.go b/cmd/query/app/query_parser_test.go index bcf01af6b35..1f617cd10ee 100644 --- a/cmd/query/app/query_parser_test.go +++ b/cmd/query/app/query_parser_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/require" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -44,8 +45,8 @@ func TestParseTraceQuery(t *testing.T) { {"x?service=service&start=string", errParseInt, nil}, {"x?service=service&end=string", errParseInt, nil}, {"x?service=service&limit=string", errParseInt, nil}, - {"x?service=service&start=0&end=0&operation=operation&limit=200&minDuration=20", `cannot not parse minDuration: time: missing unit in duration "?20"?$`, nil}, - {"x?service=service&start=0&end=0&operation=operation&limit=200&minDuration=20s&maxDuration=30", `cannot not parse maxDuration: time: missing unit in duration "?30"?$`, nil}, + {"x?service=service&start=0&end=0&operation=operation&limit=200&minDuration=20", `unable to parse param 'minDuration': time: missing unit in duration "?20"?$`, nil}, + {"x?service=service&start=0&end=0&operation=operation&limit=200&minDuration=20s&maxDuration=30", `unable to parse param 'maxDuration': time: missing unit in duration "?30"?$`, nil}, {"x?service=service&start=0&end=0&operation=operation&limit=200&tag=k:v&tag=x:y&tag=k&log=k:v&log=k", `malformed 'tag' parameter, expecting key:value, received: k`, nil}, {"x?service=service&start=0&end=0&operation=operation&limit=200&minDuration=25s&maxDuration=1s", `'maxDuration' should be greater than 'minDuration'`, nil}, {"x?service=service&start=0&end=0&operation=operation&limit=200&tag=k:v&tag=x:y", noErr, @@ -155,7 +156,7 @@ func TestParseTraceQuery(t *testing.T) { return timeNow }, } - actualQuery, err := parser.parse(request) + actualQuery, err := parser.parseTraceQueryParams(request) if test.errMsg == "" { assert.NoError(t, err) if !assert.Equal(t, test.expectedQuery, actualQuery) { @@ -171,3 +172,151 @@ func TestParseTraceQuery(t *testing.T) { }) } } + +func TestParseBool(t *testing.T) { + for _, tc := range []struct { + input string + want bool + }{ + {"t", true}, + {"true", true}, + {"TRUE", true}, + {"True", true}, + {"T", true}, + {"1", true}, + {"f", false}, + {"false", false}, + {"FALSE", false}, + {"False", false}, + {"F", false}, + {"0", false}, + } { + t.Run(tc.input, func(t *testing.T) { + request, err := http.NewRequest(http.MethodGet, "x?service=foo&groupByOperation="+tc.input, nil) + require.NoError(t, err) + timeNow := time.Now() + parser := &queryParser{ + timeNow: func() time.Time { + return timeNow + }, + } + mqp, err := parser.parseMetricsQueryParams(request) + require.NoError(t, err) + assert.Equal(t, tc.want, mqp.GroupByOperation) + }) + } +} + +func TestParseDuration(t *testing.T) { + request, err := http.NewRequest(http.MethodGet, "x?service=foo&step=1000", nil) + require.NoError(t, err) + parser := &queryParser{ + timeNow: func() time.Time { + return time.Now() + }, + } + mqp, err := parser.parseMetricsQueryParams(request) + require.NoError(t, err) + assert.Equal(t, time.Second, *mqp.Step) +} + +func TestParseRepeatedServices(t *testing.T) { + request, err := http.NewRequest(http.MethodGet, "x?service=foo&service=bar", nil) + require.NoError(t, err) + parser := &queryParser{ + timeNow: func() time.Time { + return time.Now() + }, + } + mqp, err := parser.parseMetricsQueryParams(request) + require.NoError(t, err) + assert.Equal(t, []string{"foo", "bar"}, mqp.ServiceNames) +} + +func TestParseRepeatedSpanKinds(t *testing.T) { + q := "x?service=foo&spanKind=unspecified&spanKind=internal&spanKind=server&spanKind=client&spanKind=producer&spanKind=consumer" + request, err := http.NewRequest(http.MethodGet, q, nil) + require.NoError(t, err) + parser := &queryParser{ + timeNow: time.Now, + } + mqp, err := parser.parseMetricsQueryParams(request) + require.NoError(t, err) + assert.Equal(t, []string{ + metrics.SpanKind_SPAN_KIND_UNSPECIFIED.String(), + metrics.SpanKind_SPAN_KIND_INTERNAL.String(), + metrics.SpanKind_SPAN_KIND_SERVER.String(), + metrics.SpanKind_SPAN_KIND_CLIENT.String(), + metrics.SpanKind_SPAN_KIND_PRODUCER.String(), + metrics.SpanKind_SPAN_KIND_CONSUMER.String(), + }, mqp.SpanKinds) +} + +func TestParameterErrors(t *testing.T) { + ts := initializeTestServer() + defer ts.server.Close() + + for _, tc := range []struct { + name string + urlPath string + mockedQueryMethod string + mockedQueryMethodParamType string + wantErrorMessage string + }{ + { + name: "missing services", + urlPath: "/api/metrics/calls", + wantErrorMessage: `unable to parse param 'service': please provide at least one service name`, + }, + { + name: "invalid group by operation", + urlPath: "/api/metrics/calls?service=emailservice&groupByOperation=foo", + wantErrorMessage: `unable to parse param 'groupByOperation': strconv.ParseBool: parsing \"foo\": invalid syntax`, + }, + { + name: "invalid span kinds", + urlPath: "/api/metrics/calls?service=emailservice&spanKind=foo", + wantErrorMessage: `unable to parse param 'spanKind': unsupported span kind: 'foo'`, + }, + { + name: "empty span kind", + urlPath: "/api/metrics/calls?service=emailservice&spanKind=", + wantErrorMessage: `unable to parse param 'spanKind': unsupported span kind: ''`, + }, + { + name: "invalid quantile parameter", + urlPath: "/api/metrics/latencies?service=emailservice&quantile=foo", + wantErrorMessage: `unable to parse param 'quantile': strconv.ParseFloat: parsing \"foo\": invalid syntax`, + }, + { + name: "invalid endTs parameter", + urlPath: "/api/metrics/calls?service=emailservice&endTs=foo", + wantErrorMessage: `unable to parse param 'endTs': strconv.ParseInt: parsing \"foo\": invalid syntax`, + }, + { + name: "invalid lookback parameter", + urlPath: "/api/metrics/calls?service=emailservice&lookback=foo", + wantErrorMessage: `unable to parse param 'lookback': strconv.ParseInt: parsing \"foo\": invalid syntax`, + }, + { + name: "invalid step parameter", + urlPath: "/api/metrics/calls?service=emailservice&step=foo", + wantErrorMessage: `unable to parse param 'step': strconv.ParseInt: parsing \"foo\": invalid syntax`, + }, + { + name: "invalid ratePer parameter", + urlPath: "/api/metrics/calls?service=emailservice&ratePer=foo", + wantErrorMessage: `unable to parse param 'ratePer': strconv.ParseInt: parsing \"foo\": invalid syntax`, + }, + } { + t.Run(tc.name, func(t *testing.T) { + // Test + var response metrics.MetricFamily + err := getJSON(ts.server.URL+tc.urlPath, &response) + + // Verify + require.Error(t, err) + assert.Contains(t, err.Error(), tc.wantErrorMessage) + }) + } +} diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index be9a562d7b7..3167091b5bc 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -126,10 +126,10 @@ func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc. } func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, queryOpts *QueryOptions, tracer opentracing.Tracer, logger *zap.Logger) (*http.Server, error) { - // TODO: Add HandlerOptions.MetricsQueryService apiHandlerOptions := []HandlerOption{ HandlerOptions.Logger(logger), HandlerOptions.Tracer(tracer), + HandlerOptions.MetricsQueryService(metricsQuerySvc), } apiHandler := NewAPIHandler(