Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor][query] Propagate RawTraces flag to query service #6438

Merged
merged 17 commits into from
Dec 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions cmd/query/app/apiv3/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ func (h *Handler) GetTrace(request *api_v3.GetTraceRequest, stream api_v3.QueryS
return fmt.Errorf("malform trace ID: %w", err)
}

query := spanstore.GetTraceParameters{
TraceID: traceID,
StartTime: request.GetStartTime(),
EndTime: request.GetEndTime(),
query := querysvc.GetTraceParameters{
GetTraceParameters: spanstore.GetTraceParameters{
TraceID: traceID,
StartTime: request.GetStartTime(),
EndTime: request.GetEndTime(),
},
RawTraces: request.GetRawTraces(),
}
trace, err := h.QueryService.GetTrace(stream.Context(), query)
if err != nil {
Expand Down Expand Up @@ -66,11 +69,14 @@ func (h *Handler) internalFindTraces(
return errors.New("start time min and max are required parameters")
}

queryParams := &spanstore.TraceQueryParameters{
ServiceName: query.GetServiceName(),
OperationName: query.GetOperationName(),
Tags: query.GetAttributes(),
NumTraces: int(query.GetSearchDepth()),
queryParams := &querysvc.TraceQueryParameters{
TraceQueryParameters: spanstore.TraceQueryParameters{
ServiceName: query.GetServiceName(),
OperationName: query.GetOperationName(),
Tags: query.GetAttributes(),
NumTraces: int(query.GetSearchDepth()),
},
RawTraces: query.GetRawTraces(),
}
if ts := query.GetStartTimeMin(); !ts.IsZero() {
queryParams.StartTimeMin = ts
Expand Down
54 changes: 37 additions & 17 deletions cmd/query/app/apiv3/http_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@
)

const (
paramTraceID = "trace_id" // get trace by ID
paramStartTime = "start_time"
paramEndTime = "end_time"
paramServiceName = "query.service_name" // find traces
paramOperationName = "query.operation_name"
paramTimeMin = "query.start_time_min"
paramTimeMax = "query.start_time_max"
paramNumTraces = "query.num_traces"
paramDurationMin = "query.duration_min"
paramDurationMax = "query.duration_max"
paramTraceID = "trace_id" // get trace by ID
paramStartTime = "start_time"
paramEndTime = "end_time"
paramRawTraces = "raw_traces"
paramServiceName = "query.service_name" // find traces
paramOperationName = "query.operation_name"
paramTimeMin = "query.start_time_min"
paramTimeMax = "query.start_time_max"
paramNumTraces = "query.num_traces"
paramDurationMin = "query.duration_min"
paramDurationMax = "query.duration_max"
paramQueryRawTraces = "query.raw_traces"

routeGetTrace = "/api/v3/traces/{" + paramTraceID + "}"
routeFindTraces = "/api/v3/traces"
Expand Down Expand Up @@ -135,8 +137,10 @@
if h.tryParamError(w, err, paramTraceID) {
return
}
request := spanstore.GetTraceParameters{
TraceID: traceID,
request := querysvc.GetTraceParameters{
GetTraceParameters: spanstore.GetTraceParameters{
TraceID: traceID,
},
}
http_query := r.URL.Query()
startTime := http_query.Get(paramStartTime)
Expand All @@ -155,6 +159,13 @@
}
request.EndTime = timeParsed.UTC()
}
if r := http_query.Get(paramRawTraces); r != "" {
rawTraces, err := strconv.ParseBool(r)
if h.tryParamError(w, err, paramRawTraces) {
return
}
request.RawTraces = rawTraces

Check warning on line 167 in cmd/query/app/apiv3/http_gateway.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/apiv3/http_gateway.go#L167

Added line #L167 was not covered by tests
}
trc, err := h.QueryService.GetTrace(r.Context(), request)
if h.tryHandleError(w, err, http.StatusInternalServerError) {
return
Expand All @@ -180,11 +191,13 @@
h.returnSpans(spans, w)
}

func (h *HTTPGateway) parseFindTracesQuery(q url.Values, w http.ResponseWriter) (*spanstore.TraceQueryParameters, bool) {
queryParams := &spanstore.TraceQueryParameters{
ServiceName: q.Get(paramServiceName),
OperationName: q.Get(paramOperationName),
Tags: nil, // most curiously not supported by grpc-gateway
func (h *HTTPGateway) parseFindTracesQuery(q url.Values, w http.ResponseWriter) (*querysvc.TraceQueryParameters, bool) {
queryParams := &querysvc.TraceQueryParameters{
TraceQueryParameters: spanstore.TraceQueryParameters{
ServiceName: q.Get(paramServiceName),
OperationName: q.Get(paramOperationName),
Tags: nil, // most curiously not supported by grpc-gateway
},
}

timeMin := q.Get(paramTimeMin)
Expand Down Expand Up @@ -227,6 +240,13 @@
}
queryParams.DurationMax = dur
}
if r := q.Get(paramQueryRawTraces); r != "" {
rawTraces, err := strconv.ParseBool(r)
if h.tryParamError(w, err, paramQueryRawTraces) {
return nil, true
}
queryParams.RawTraces = rawTraces

Check warning on line 248 in cmd/query/app/apiv3/http_gateway.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/apiv3/http_gateway.go#L248

Added line #L248 was not covered by tests
}
return queryParams, false
}

Expand Down
16 changes: 16 additions & 0 deletions cmd/query/app/apiv3/http_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ func TestHTTPGatewayGetTraceMalformedInputErrors(t *testing.T) {
requestUrl: "/api/v3/traces/123?end_time=xyz",
expectedError: "malformed parameter end_time",
},
{
name: "TestGetTraceWithInvalidRawTraces",
requestUrl: "/api/v3/traces/123?raw_traces=foobar",
expectedError: "malformed parameter raw_traces",
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -227,6 +232,7 @@ func mockFindQueries() (url.Values, *spanstore.TraceQueryParameters) {
func TestHTTPGatewayFindTracesErrors(t *testing.T) {
goodTimeV := time.Now()
goodTime := goodTimeV.Format(time.RFC3339Nano)
goodDuration := "1s"
timeRangeErr := fmt.Sprintf("%s and %s are required", paramTimeMin, paramTimeMax)
testCases := []struct {
name string
Expand Down Expand Up @@ -272,6 +278,16 @@ func TestHTTPGatewayFindTracesErrors(t *testing.T) {
params: map[string]string{paramTimeMin: goodTime, paramTimeMax: goodTime, paramDurationMax: "NaN"},
expErr: paramDurationMax,
},
{
name: "bad raw traces",
params: map[string]string{
paramTimeMin: goodTime,
paramTimeMax: goodTime,
paramDurationMax: goodDuration,
paramQueryRawTraces: "foobar",
},
expErr: paramQueryRawTraces,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down
33 changes: 20 additions & 13 deletions cmd/query/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,13 @@ func (g *GRPCHandler) GetTrace(r *api_v2.GetTraceRequest, stream api_v2.QuerySer
if r.TraceID == (model.TraceID{}) {
return errUninitializedTraceID
}
query := spanstore.GetTraceParameters{
TraceID: r.TraceID,
StartTime: r.StartTime,
EndTime: r.EndTime,
query := querysvc.GetTraceParameters{
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
GetTraceParameters: spanstore.GetTraceParameters{
TraceID: r.TraceID,
StartTime: r.StartTime,
EndTime: r.EndTime,
},
RawTraces: r.RawTraces,
}
trace, err := g.queryService.GetTrace(stream.Context(), query)
if errors.Is(err, spanstore.ErrTraceNotFound) {
Expand All @@ -119,6 +122,7 @@ func (g *GRPCHandler) ArchiveTrace(ctx context.Context, r *api_v2.ArchiveTraceRe
StartTime: r.StartTime,
EndTime: r.EndTime,
}

err := g.queryService.ArchiveTrace(ctx, query)
if errors.Is(err, spanstore.ErrTraceNotFound) {
g.logger.Warn(msgTraceNotFound, zap.Stringer("id", r.TraceID), zap.Error(err))
Expand All @@ -141,15 +145,18 @@ func (g *GRPCHandler) FindTraces(r *api_v2.FindTracesRequest, stream api_v2.Quer
if query == nil {
return status.Errorf(codes.InvalidArgument, "missing query")
}
queryParams := spanstore.TraceQueryParameters{
ServiceName: query.ServiceName,
OperationName: query.OperationName,
Tags: query.Tags,
StartTimeMin: query.StartTimeMin,
StartTimeMax: query.StartTimeMax,
DurationMin: query.DurationMin,
DurationMax: query.DurationMax,
NumTraces: int(query.SearchDepth),
queryParams := querysvc.TraceQueryParameters{
TraceQueryParameters: spanstore.TraceQueryParameters{
ServiceName: query.ServiceName,
OperationName: query.OperationName,
Tags: query.Tags,
StartTimeMin: query.StartTimeMin,
StartTimeMax: query.StartTimeMax,
DurationMin: query.DurationMin,
DurationMax: query.DurationMax,
NumTraces: int(query.SearchDepth),
},
RawTraces: query.RawTraces,
}
traces, err := g.queryService.FindTraces(stream.Context(), &queryParams)
if err != nil {
Expand Down
43 changes: 30 additions & 13 deletions cmd/query/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,7 @@ func (aH *APIHandler) search(w http.ResponseWriter, r *http.Request) {
if len(tQuery.traceIDs) > 0 {
tracesFromStorage, uiErrors, err = aH.tracesByIDs(
r.Context(),
tQuery.traceIDs,
tQuery.StartTimeMin,
tQuery.StartTimeMax,
tQuery,
)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
Expand Down Expand Up @@ -264,14 +262,17 @@ func (aH *APIHandler) tracesToResponse(traces []*model.Trace, adjust bool, uiErr
}
}

func (aH *APIHandler) tracesByIDs(ctx context.Context, traceIDs []model.TraceID, startTime time.Time, endTime time.Time) ([]*model.Trace, []structuredError, error) {
func (aH *APIHandler) tracesByIDs(ctx context.Context, traceQuery *traceQueryParameters) ([]*model.Trace, []structuredError, error) {
var traceErrors []structuredError
retMe := make([]*model.Trace, 0, len(traceIDs))
for _, traceID := range traceIDs {
query := spanstore.GetTraceParameters{
TraceID: traceID,
StartTime: startTime,
EndTime: endTime,
retMe := make([]*model.Trace, 0, len(traceQuery.traceIDs))
for _, traceID := range traceQuery.traceIDs {
query := querysvc.GetTraceParameters{
GetTraceParameters: spanstore.GetTraceParameters{
TraceID: traceID,
StartTime: traceQuery.StartTimeMin,
EndTime: traceQuery.StartTimeMax,
},
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
RawTraces: traceQuery.RawTraces,
}
if trc, err := aH.queryService.GetTrace(ctx, query); err != nil {
if !errors.Is(err, spanstore.ErrTraceNotFound) {
Expand Down Expand Up @@ -428,8 +429,19 @@ func (aH *APIHandler) parseMicroseconds(w http.ResponseWriter, r *http.Request,
return time.Time{}, true
}

func (aH *APIHandler) parseGetTraceParameters(w http.ResponseWriter, r *http.Request) (spanstore.GetTraceParameters, bool) {
query := spanstore.GetTraceParameters{}
func (aH *APIHandler) parseBool(w http.ResponseWriter, r *http.Request, boolKey string) (value bool, isValid bool) {
if boolString := r.FormValue(boolKey); boolString != "" {
b, err := parseBool(r, boolKey)
if aH.handleError(w, err, http.StatusBadRequest) {
return false, false
}
return b, true
}
return false, true
}

func (aH *APIHandler) parseGetTraceParameters(w http.ResponseWriter, r *http.Request) (querysvc.GetTraceParameters, bool) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side note: this function is poorly designed. A better design is query_parser.parseTraceQueryParams, which simply returns errors. The aH.handleError should really be used only rarely, directly in the handler function.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to address this in a future PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's one of these nice to have clean-ups, I'd rather have it as good-first-issue for someone other than you spending time on it.

query := querysvc.GetTraceParameters{}
traceID, ok := aH.parseTraceID(w, r)
if !ok {
return query, false
Expand All @@ -442,9 +454,14 @@ func (aH *APIHandler) parseGetTraceParameters(w http.ResponseWriter, r *http.Req
if !ok {
return query, false
}
raw, ok := aH.parseBool(w, r, rawParam)
if !ok {
return query, false
}
query.TraceID = traceID
query.StartTime = startTime
query.EndTime = endTime
query.RawTraces = raw
return query, true
}

Expand Down Expand Up @@ -485,7 +502,7 @@ func (aH *APIHandler) archiveTrace(w http.ResponseWriter, r *http.Request) {
}

// QueryService.ArchiveTrace can now archive this traceID.
err := aH.queryService.ArchiveTrace(r.Context(), query)
err := aH.queryService.ArchiveTrace(r.Context(), query.GetTraceParameters)
if errors.Is(err, spanstore.ErrTraceNotFound) {
aH.handleError(w, err, http.StatusNotFound)
return
Expand Down
37 changes: 37 additions & 0 deletions cmd/query/app/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,43 @@ func TestGetTraceBadTimeWindow(t *testing.T) {
}
}

func TestGetTraceWithRawTracesParameter(t *testing.T) {
// TODO: extend the test cases to ensure raw traces are obtained
// when the flag is set once the differentiating logic has been implemented
tests := []struct {
rawTraces bool
}{
{
rawTraces: true,
},
{
rawTraces: false,
},
}
for _, test := range tests {
t.Run(fmt.Sprintf("rawTraces=%v", test.rawTraces), func(t *testing.T) {
ts := initializeTestServer(t)
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), spanstore.GetTraceParameters{
TraceID: mockTraceID,
}).Return(mockTrace, nil).Once()

var response structuredResponse
err := getJSON(fmt.Sprintf("%s/api/traces/%s?raw=%v", ts.server.URL, mockTraceID.String(), test.rawTraces), &response)
require.NoError(t, err)
assert.Empty(t, response.Errors)
})
}
}

func TestGetTraceBadRawTracesFlag(t *testing.T) {
ts := initializeTestServer(t)
var response structuredResponse
err := getJSON(ts.server.URL+`/api/traces/123456?raw=foobar`, &response)
require.Error(t, err)
require.ErrorContains(t, err, "400 error from server")
require.ErrorContains(t, err, "unable to parse param 'raw'")
}

func TestSearchSuccess(t *testing.T) {
ts := initializeTestServer(t)
ts.spanReader.On("FindTraces", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceQueryParameters")).
Expand Down
Loading
Loading