Skip to content

Commit

Permalink
fix: Correctly encode step when translating proto to http internally (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive authored Jun 7, 2024
1 parent 691b174 commit 740551b
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 3 deletions.
57 changes: 57 additions & 0 deletions pkg/logproto/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,33 @@ func (m *ShardsRequest) LogToSpan(sp opentracing.Span) {
sp.LogFields(fields...)
}

func (m *DetectedFieldsRequest) GetCachingOptions() (res definitions.CachingOptions) { return }

func (m *DetectedFieldsRequest) WithStartEnd(start, end time.Time) definitions.Request {
clone := *m
clone.Start = start
clone.End = end
return &clone
}

func (m *DetectedFieldsRequest) WithQuery(query string) definitions.Request {
clone := *m
clone.Query = query
return &clone
}

func (m *DetectedFieldsRequest) LogToSpan(sp opentracing.Span) {
fields := []otlog.Field{
otlog.String("query", m.GetQuery()),
otlog.String("start", m.Start.String()),
otlog.String("end", m.End.String()),
otlog.String("step", time.Duration(m.Step).String()),
otlog.String("field_limit", fmt.Sprintf("%d", m.FieldLimit)),
otlog.String("line_limit", fmt.Sprintf("%d", m.LineLimit)),
}
sp.LogFields(fields...)
}

func (m *QueryPatternsRequest) GetCachingOptions() (res definitions.CachingOptions) { return }

func (m *QueryPatternsRequest) WithStartEnd(start, end time.Time) definitions.Request {
Expand Down Expand Up @@ -534,3 +561,33 @@ func (m *QueryPatternsRequest) LogToSpan(sp opentracing.Span) {
}
sp.LogFields(fields...)
}

func (m *DetectedLabelsRequest) GetStep() int64 { return 0 }

func (m *DetectedLabelsRequest) GetCachingOptions() (res definitions.CachingOptions) { return }

func (m *DetectedLabelsRequest) WithStartEnd(start, end time.Time) definitions.Request {
clone := *m
clone.Start = start
clone.End = end
return &clone
}

func (m *DetectedLabelsRequest) WithQuery(query string) definitions.Request {
clone := *m
clone.Query = query
return &clone
}

func (m *DetectedLabelsRequest) WithStartEndForCache(start, end time.Time) resultscache.Request {
return m.WithStartEnd(start, end).(resultscache.Request)
}

func (m *DetectedLabelsRequest) LogToSpan(sp opentracing.Span) {
fields := []otlog.Field{
otlog.String("query", m.GetQuery()),
otlog.String("start", m.Start.String()),
otlog.String("end", m.End.String()),
}
sp.LogFields(fields...)
}
10 changes: 8 additions & 2 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,10 @@ func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*ht
"end": []string{fmt.Sprintf("%d", request.End.UnixNano())},
"line_limit": []string{fmt.Sprintf("%d", request.GetLineLimit())},
"field_limit": []string{fmt.Sprintf("%d", request.GetFieldLimit())},
"step": []string{fmt.Sprintf("%d", request.GetStep())},
}

if request.Step != 0 {
params["step"] = []string{fmt.Sprintf("%f", float64(request.Step)/float64(1e3))}
}

u := &url.URL{
Expand All @@ -940,7 +943,10 @@ func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*ht
"query": []string{request.GetQuery()},
"start": []string{fmt.Sprintf("%d", request.Start.UnixNano())},
"end": []string{fmt.Sprintf("%d", request.End.UnixNano())},
"step": []string{fmt.Sprintf("%d", request.GetStep())},
}

if request.Step != 0 {
params["step"] = []string{fmt.Sprintf("%f", float64(request.Step)/float64(1e3))}
}

u := &url.URL{
Expand Down
53 changes: 52 additions & 1 deletion pkg/querier/queryrange/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"net/http/httptest"
"net/url"
"strconv"
strings "strings"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -202,6 +202,57 @@ func Test_codec_EncodeDecodeRequest(t *testing.T) {
Step: 30 * 1e3, // step is expected in ms; default is 0 or no step
AggregateBy: "series",
}, false},
{"detected_fields", func() (*http.Request, error) {
return DefaultCodec.EncodeRequest(ctx, &DetectedFieldsRequest{
logproto.DetectedFieldsRequest{
Query: `{foo="bar"}`,
Start: start,
End: end,
Step: 30 * 1e3, // step is expected in ms; default is 0 or no step
LineLimit: 100,
FieldLimit: 100,
},
"/loki/api/v1/detected_fields",
})
}, &DetectedFieldsRequest{
logproto.DetectedFieldsRequest{
Query: `{foo="bar"}`,
Start: start,
End: end,
Step: 30 * 1e3, // step is expected in ms; default is 0 or no step
LineLimit: 100,
FieldLimit: 100,
},
"/loki/api/v1/detected_fields",
}, false},
{"patterns", func() (*http.Request, error) {
return DefaultCodec.EncodeRequest(ctx, &logproto.QueryPatternsRequest{
Start: start,
End: end,
Step: 30 * 1e3, // step is expected in ms
})
}, &logproto.QueryPatternsRequest{
Start: start,
End: end,
Step: 30 * 1e3, // step is expected in ms; default is 0 or no step
}, false},
{"detected_labels", func() (*http.Request, error) {
return DefaultCodec.EncodeRequest(ctx, &DetectedLabelsRequest{
"/loki/api/v1/detected_labels",
logproto.DetectedLabelsRequest{
Query: `{foo="bar"}`,
Start: start,
End: end,
},
})
}, &DetectedLabelsRequest{
"/loki/api/v1/detected_labels",
logproto.DetectedLabelsRequest{
Query: `{foo="bar"}`,
Start: start,
End: end,
},
}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down

0 comments on commit 740551b

Please sign in to comment.