Skip to content

Commit

Permalink
feat: aggregated metric volume queries
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney committed Oct 7, 2024
1 parent f1425b6 commit 0e97b99
Show file tree
Hide file tree
Showing 11 changed files with 1,014 additions and 256 deletions.
61 changes: 37 additions & 24 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,12 +565,13 @@ func NewVolumeInstantQueryWithDefaults(matchers string) *logproto.VolumeRequest
}

type VolumeInstantQuery struct {
Start time.Time
End time.Time
Query string
Limit uint32
TargetLabels []string
AggregateBy string
Start time.Time
End time.Time
Query string
Limit uint32
TargetLabels []string
AggregateBy string
AggregatedMetrics bool
}

func ParseVolumeInstantQuery(r *http.Request) (*VolumeInstantQuery, error) {
Expand All @@ -589,11 +590,14 @@ func ParseVolumeInstantQuery(r *http.Request) (*VolumeInstantQuery, error) {
return nil, err
}

aggregatedMetrics := volumeAggregatedMetrics(r)

svInstantQuery := VolumeInstantQuery{
Query: result.Query,
Limit: result.Limit,
TargetLabels: targetLabels(r),
AggregateBy: aggregateBy,
Query: result.Query,
Limit: result.Limit,
TargetLabels: targetLabels(r),
AggregateBy: aggregateBy,
AggregatedMetrics: aggregatedMetrics,
}

svInstantQuery.Start, svInstantQuery.End, err = bounds(r)
Expand All @@ -609,13 +613,14 @@ func ParseVolumeInstantQuery(r *http.Request) (*VolumeInstantQuery, error) {
}

type VolumeRangeQuery struct {
Start time.Time
End time.Time
Step time.Duration
Query string
Limit uint32
TargetLabels []string
AggregateBy string
Start time.Time
End time.Time
Step time.Duration
Query string
Limit uint32
TargetLabels []string
AggregateBy string
AggregatedMetrics bool
}

func ParseVolumeRangeQuery(r *http.Request) (*VolumeRangeQuery, error) {
Expand All @@ -634,14 +639,17 @@ func ParseVolumeRangeQuery(r *http.Request) (*VolumeRangeQuery, error) {
return nil, err
}

aggregatedMetrics := volumeAggregatedMetrics(r)

return &VolumeRangeQuery{
Start: result.Start,
End: result.End,
Step: result.Step,
Query: result.Query,
Limit: result.Limit,
TargetLabels: targetLabels(r),
AggregateBy: aggregateBy,
Start: result.Start,
End: result.End,
Step: result.Step,
Query: result.Query,
Limit: result.Limit,
TargetLabels: targetLabels(r),
AggregateBy: aggregateBy,
AggregatedMetrics: aggregatedMetrics,
}, nil
}

Expand Down Expand Up @@ -726,3 +734,8 @@ func volumeAggregateBy(r *http.Request) (string, error) {

return "", errors.New("invalid aggregation option")
}

func volumeAggregatedMetrics(r *http.Request) bool {
l := r.Form.Get("aggregatedMetrics")
return l == "true"
}
95 changes: 74 additions & 21 deletions pkg/loghttp/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,13 @@ func Test_QueryResponseUnmarshal(t *testing.T) {
}

func Test_ParseVolumeInstantQuery(t *testing.T) {
url := `?query={foo="bar"}` +
`&start=2017-06-10T21:42:24.760738998Z` +
`&end=2017-07-10T21:42:24.760738998Z` +
`&limit=1000` +
`&targetLabels=foo,bar`
req := &http.Request{
URL: mustParseURL(`?query={foo="bar"}` +
`&start=2017-06-10T21:42:24.760738998Z` +
`&end=2017-07-10T21:42:24.760738998Z` +
`&limit=1000` +
`&targetLabels=foo,bar`,
),
URL: mustParseURL(url),
}

err := req.ParseForm()
Expand All @@ -319,7 +319,7 @@ func Test_ParseVolumeInstantQuery(t *testing.T) {
require.Equal(t, expected, actual)

t.Run("aggregate by", func(t *testing.T) {
url := `?query={foo="bar"}` +
url = `?query={foo="bar"}` +
`&start=2017-06-10T21:42:24.760738998Z` +
`&end=2017-07-10T21:42:24.760738998Z` +
`&limit=1000` +
Expand Down Expand Up @@ -348,17 +348,47 @@ func Test_ParseVolumeInstantQuery(t *testing.T) {
require.EqualError(t, err, "invalid aggregation option")
})
})

t.Run("aggregated metrics", func(t *testing.T) {
req := &http.Request{URL: mustParseURL(url)}

err := req.ParseForm()
require.NoError(t, err)

actual, err = ParseVolumeInstantQuery(req)
require.NoError(t, err)

require.False(t, actual.AggregatedMetrics)

req = &http.Request{URL: mustParseURL(url + `&aggregatedMetrics=true`)}
err = req.ParseForm()
require.NoError(t, err)

actual, err = ParseVolumeInstantQuery(req)
require.NoError(t, err)

require.True(t, actual.AggregatedMetrics)

req = &http.Request{URL: mustParseURL(url + `&aggregatedMetrics=false`)}
err = req.ParseForm()
require.NoError(t, err)

actual, err = ParseVolumeInstantQuery(req)
require.NoError(t, err)

require.False(t, actual.AggregatedMetrics)
})
}

func Test_ParseVolumeRangeQuery(t *testing.T) {
url := `?query={foo="bar"}` +
`&start=2017-06-10T21:42:24.760738998Z` +
`&end=2017-07-10T21:42:24.760738998Z` +
`&limit=1000` +
`&step=3600` +
`&targetLabels=foo,bar`
req := &http.Request{
URL: mustParseURL(`?query={foo="bar"}` +
`&start=2017-06-10T21:42:24.760738998Z` +
`&end=2017-07-10T21:42:24.760738998Z` +
`&limit=1000` +
`&step=3600` +
`&targetLabels=foo,bar`,
),
URL: mustParseURL(url),
}

err := req.ParseForm()
Expand All @@ -379,13 +409,6 @@ func Test_ParseVolumeRangeQuery(t *testing.T) {
require.Equal(t, expected, actual)

t.Run("aggregate by", func(t *testing.T) {
url := `?query={foo="bar"}` +
`&start=2017-06-10T21:42:24.760738998Z` +
`&end=2017-07-10T21:42:24.760738998Z` +
`&limit=1000` +
`&step=3600` +
`&targetLabels=foo,bar`

t.Run("labels", func(t *testing.T) {
req := &http.Request{URL: mustParseURL(url + `&aggregateBy=labels`)}

Expand All @@ -407,5 +430,35 @@ func Test_ParseVolumeRangeQuery(t *testing.T) {
_, err = ParseVolumeRangeQuery(req)
require.EqualError(t, err, "invalid aggregation option")
})

t.Run("aggregated metrics", func(t *testing.T) {
req := &http.Request{URL: mustParseURL(url)}

err := req.ParseForm()
require.NoError(t, err)

actual, err = ParseVolumeRangeQuery(req)
require.NoError(t, err)

require.False(t, actual.AggregatedMetrics)

req = &http.Request{URL: mustParseURL(url + `&aggregatedMetrics=true`)}
err = req.ParseForm()
require.NoError(t, err)

actual, err = ParseVolumeRangeQuery(req)
require.NoError(t, err)

require.True(t, actual.AggregatedMetrics)

req = &http.Request{URL: mustParseURL(url + `&aggregatedMetrics=false`)}
err = req.ParseForm()
require.NoError(t, err)

actual, err = ParseVolumeRangeQuery(req)
require.NoError(t, err)

require.False(t, actual.AggregatedMetrics)
})
})
}
Loading

0 comments on commit 0e97b99

Please sign in to comment.