Skip to content

Commit

Permalink
feat: hook up samples endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed May 31, 2024
1 parent eb84303 commit 33ead60
Show file tree
Hide file tree
Showing 13 changed files with 364 additions and 2 deletions.
1 change: 1 addition & 0 deletions pkg/loghttp/patterns.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ func ParsePatternsQuery(r *http.Request) (*logproto.QueryPatternsRequest, error)

return req, nil
}

35 changes: 35 additions & 0 deletions pkg/loghttp/samples.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package loghttp

import (
"net/http"

"github.com/grafana/loki/v3/pkg/logproto"
)

func ParseSamplesQuery(r *http.Request) (*logproto.QuerySamplesRequest, error) {
req := &logproto.QuerySamplesRequest{}

req.Query = query(r)
start, end, err := bounds(r)
if err != nil {
return nil, err
}
req.Start = start
req.End = end

calculatedStep, err := step(r, start, end)
if err != nil {
return nil, err
}
if calculatedStep <= 0 {
return nil, errZeroOrNegativeStep
}
// For safety, limit the number of returned points per timeseries.
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
if (req.End.Sub(req.Start) / calculatedStep) > 11000 {
return nil, errStepTooSmall
}
req.Step = calculatedStep.Milliseconds()

return req, nil
}
122 changes: 122 additions & 0 deletions pkg/loghttp/samples_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package loghttp

import (
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
)

func TestParseSamplesQuery(t *testing.T) {
t.Parallel()

tests := []struct {
name string
path string
want *logproto.QuerySamplesRequest
wantErr bool
}{
{
name: "should correctly parse valid params",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=5s",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (5 * time.Second).Milliseconds(),
},
},
{
name: "should default empty step param to sensible step for the range",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (14 * time.Second).Milliseconds(),
},
},
{
name: "should default start to zero for empty start param",
path: "/loki/api/v1/patterns?query={}&end=3600000000000",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(0, 0),
End: time.Unix(3600, 0),
Step: (14 * time.Second).Milliseconds(),
},
},
{
name: "should accept step with no units as seconds",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=10",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (10 * time.Second).Milliseconds(),
},
},
{
name: "should accept step as string duration in seconds",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=15s",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (15 * time.Second).Milliseconds(),
},
},
{
name: "should correctly parse long duration for step",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=10h",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(3600, 0),
Step: (10 * time.Hour).Milliseconds(),
},
},
{
name: "should reject negative step value",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=-5s",
want: nil,
wantErr: true,
},
{
name: "should reject very small step for big range",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=3600000000000&step=50ms",
want: nil,
wantErr: true,
},
{
name: "should accept very small step for small range",
path: "/loki/api/v1/patterns?query={}&start=100000000000&end=110000000000&step=50ms",
want: &logproto.QuerySamplesRequest{
Query: "{}",
Start: time.Unix(100, 0),
End: time.Unix(110, 0),
Step: (50 * time.Millisecond).Milliseconds(),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, tt.path, nil)
require.NoError(t, err)
err = req.ParseForm()
require.NoError(t, err)

got, err := ParseSamplesQuery(req)
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
assert.Equalf(t, tt.want, got, "Incorrect response from input path: %s", tt.path)
})
}
}
33 changes: 33 additions & 0 deletions pkg/logproto/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func FromMetricsToLabelAdapters(metric model.Metric) []LabelAdapter {
return result
}

func FromMetricsToLabels(metric model.Metric) labels.Labels {
return FromLabelAdaptersToLabels(FromMetricsToLabelAdapters(metric))
}

type byLabel []LabelAdapter

func (s byLabel) Len() int { return len(s) }
Expand Down Expand Up @@ -534,3 +538,32 @@ func (m *QueryPatternsRequest) LogToSpan(sp opentracing.Span) {
}
sp.LogFields(fields...)
}

func (m *QuerySamplesRequest) GetCachingOptions() (res definitions.CachingOptions) { return }

func (m *QuerySamplesRequest) WithStartEnd(start, end time.Time) definitions.Request {
clone := *m
clone.Start = start
clone.End = end
return &clone
}

func (m *QuerySamplesRequest) WithQuery(query string) definitions.Request {
clone := *m
clone.Query = query
return &clone
}

func (m *QuerySamplesRequest) WithStartEndForCache(start, end time.Time) resultscache.Request {
return m.WithStartEnd(start, end).(resultscache.Request)
}

func (m *QuerySamplesRequest) LogToSpan(sp opentracing.Span) {
fields := []otlog.Field{
otlog.String("query", m.GetQuery()),
otlog.String("start", m.Start.String()),
otlog.String("end", m.End.String()),
otlog.String("step", time.Duration(m.Step).String()),
}
sp.LogFields(fields...)
}
42 changes: 42 additions & 0 deletions pkg/logproto/extensions.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package logproto

import (
"encoding/json"
"sort"
"strings"
"sync/atomic" //lint:ignore faillint we can't use go.uber.org/atomic with a protobuf struct without wrapping it.

"github.com/buger/jsonparser"
"github.com/cespare/xxhash/v2"
"github.com/dustin/go-humanize"
jsoniter "github.com/json-iterator/go"
Expand Down Expand Up @@ -192,3 +194,43 @@ func (m *ShardsResponse) Merge(other *ShardsResponse) {
func NewPatternSeries(pattern string, samples []*PatternSample) *PatternSeries {
return &PatternSeries{Pattern: pattern, Samples: samples}
}

// UnmarshalJSON implements the json.Unmarshaler interface.
// QuerySamplesResponse json representation is different from the proto
func (r *QuerySamplesResponse) UnmarshalJSON(data []byte) error {
return jsonparser.ObjectEach(
data,
func(key, value []byte, dataType jsonparser.ValueType, offset int) error {
if string(key) == "data" {
var m []model.SampleStream
if err := json.Unmarshal(value, &m); err != nil {
return err
}
series := make([]Series, len(m))

for i, s := range m {
lbls := FromMetricsToLabels(s.Metric)

newSeries := Series{
Labels: s.Metric.String(),
StreamHash: lbls.Hash(),
Samples: make([]Sample, len(s.Values)),
}

for j, samplePair := range s.Values {
newSeries.Samples[j] = Sample{
Timestamp: samplePair.Timestamp.UnixNano(),
Value: float64(samplePair.Value),
}
}

series[i] = newSeries
}

r.Series = series
}

return nil
},
)
}
56 changes: 56 additions & 0 deletions pkg/logproto/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package logproto
import (
"testing"

"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -74,3 +75,58 @@ func TestQueryPatternsResponse_UnmarshalJSON(t *testing.T) {
require.Nil(t, err)
require.Equal(t, expectedSeries, r.Series)
}

func TestQuerySamplesResponse_UnmarshalJSON(t *testing.T) {
mockData := []byte(`{
"status": "success",
"data": [{
"metric": {
"foo": "bar"
},
"values": [
[0.001, "1"],
[0.002, "2"]
]
},
{
"metric": {
"foo": "baz",
"bar": "qux"
},
"values": [
[0.003, "3"],
[0.004, "4"]
]
}]
}`)

lbls1, err := syntax.ParseLabels(`{foo="bar"}`)
require.NoError(t, err)
lbls2, err := syntax.ParseLabels(`{bar="qux", foo="baz"}`)
require.NoError(t, err)

expectedSamples := []Series{
{
Labels: lbls1.String(),
Samples: []Sample{
{Timestamp: 1e6, Value: 1}, // 1ms after epoch in ns
{Timestamp: 2e6, Value: 2}, // 2ms after epoch in ns
},
StreamHash: lbls1.Hash(),
},
{
Labels: lbls2.String(),
Samples: []Sample{
{Timestamp: 3e6, Value: 3}, // 3ms after epoch in ns
{Timestamp: 4e6, Value: 4}, // 4ms after epoch in ns
},
StreamHash: lbls2.Hash(),
},
}

r := &QuerySamplesResponse{}
err = r.UnmarshalJSON(mockData)

require.Nil(t, err)
require.Equal(t, expectedSamples, r.Series)
}
2 changes: 2 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
router.Path("/loki/api/v1/index/volume").Methods("GET", "POST").Handler(volumeHTTPMiddleware.Wrap(httpHandler))
router.Path("/loki/api/v1/index/volume_range").Methods("GET", "POST").Handler(volumeRangeHTTPMiddleware.Wrap(httpHandler))
router.Path("/loki/api/v1/patterns").Methods("GET", "POST").Handler(httpHandler)
router.Path("/loki/api/v1/explore/query_range").Methods("GET", "POST").Handler(httpHandler)

router.Path("/api/prom/query").Methods("GET", "POST").Handler(
middleware.Merge(
Expand Down Expand Up @@ -1105,6 +1106,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
t.Server.HTTP.Path("/loki/api/v1/label/{name}/values").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/series").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/patterns").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/explore/query_range").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/detected_labels").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/detected_fields").Methods("GET", "POST").Handler(frontendHandler)
t.Server.HTTP.Path("/loki/api/v1/index/stats").Methods("GET", "POST").Handler(frontendHandler)
Expand Down
8 changes: 8 additions & 0 deletions pkg/querier/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ func (h *Handler) Do(ctx context.Context, req queryrangebase.Request) (queryrang
}

return &queryrange.DetectedLabelsResponse{Response: result}, nil
case *logproto.QuerySamplesRequest:
result, err := h.api.SamplesHandler(ctx, concrete)
if err != nil {
return nil, err
}
return &queryrange.QuerySamplesResponse{
Response: result,
}, nil
default:
return nil, fmt.Errorf("unsupported query type %T", req)
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,19 @@ func (q *QuerierAPI) PatternsHandler(ctx context.Context, req *logproto.QueryPat
return resp, nil
}

func (q *QuerierAPI) SamplesHandler(ctx context.Context, req *logproto.QuerySamplesRequest) (*logproto.QuerySamplesResponse, error) {
resp, err := q.querier.SelectMetricSamples(ctx, req)
if err != nil {
return nil, err
}
if resp == nil { // Some stores don't implement this
return &logproto.QuerySamplesResponse{
Series: []logproto.Series{},
}, nil
}
return resp, nil
}

func (q *QuerierAPI) validateMaxEntriesLimits(ctx context.Context, expr syntax.Expr, limit uint32) error {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type Querier interface {
DetectedFields(ctx context.Context, req *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error)
Patterns(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error)
DetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error)
SelectMetricSamples(ctx context.Context, req *logproto.QuerySamplesRequest) (*logproto.QueryPatternsResponse, error)
SelectMetricSamples(ctx context.Context, req *logproto.QuerySamplesRequest) (*logproto.QuerySamplesResponse, error)
}

type Limits querier_limits.Limits
Expand Down
Loading

0 comments on commit 33ead60

Please sign in to comment.