Skip to content

Commit

Permalink
[query] Add Graphite sortByName natural and reverse params and generi…
Browse files Browse the repository at this point in the history
…c sortBy (#3367)
  • Loading branch information
robskillington authored Mar 17, 2021
1 parent b0e147d commit 4cd3435
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 105 deletions.
42 changes: 21 additions & 21 deletions src/query/graphite/native/aggregation_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func testAggregatedSeries(
}

// nil input -> nil output
for _, in := range [][]*ts.Series{nil, []*ts.Series{}} {
for _, in := range [][]*ts.Series{nil, {}} {
series, err := f(ctx, multiplePathSpecs(ts.SeriesList{
Values: in,
}))
Expand Down Expand Up @@ -287,7 +287,7 @@ func TestVariadicSumSeries(t *testing.T) {
}, block.ResultMetadata{
Exhaustive: false,
LocalOnly: false,
Warnings: []block.Warning{block.Warning{Name: "foo", Message: "bar"}},
Warnings: []block.Warning{{Name: "foo", Message: "bar"}},
}), nil
}
return nil, fmt.Errorf("unexpected query: %s", query)
Expand Down Expand Up @@ -449,14 +449,14 @@ func TestAverageSeriesWithWildcards(t *testing.T) {
defer ctx.Close()

input := []common.TestSeries{
common.TestSeries{"web.host-1.avg-response.value", []float64{70.0, 20.0, 30.0, 40.0, 50.0}},
common.TestSeries{"web.host-2.avg-response.value", []float64{20.0, 30.0, 40.0, 50.0, 60.0}},
common.TestSeries{"web.host-3.avg-response.value", []float64{30.0, 40.0, 80.0, 60.0, 70.0}},
common.TestSeries{"web.host-4.num-requests.value", []float64{10.0, 10.0, 15.0, 10.0, 15.0}},
{"web.host-1.avg-response.value", []float64{70.0, 20.0, 30.0, 40.0, 50.0}},
{"web.host-2.avg-response.value", []float64{20.0, 30.0, 40.0, 50.0, 60.0}},
{"web.host-3.avg-response.value", []float64{30.0, 40.0, 80.0, 60.0, 70.0}},
{"web.host-4.num-requests.value", []float64{10.0, 10.0, 15.0, 10.0, 15.0}},
}
expected := []common.TestSeries{
common.TestSeries{"web.avg-response", []float64{40.0, 30.0, 50.0, 50.0, 60.0}},
common.TestSeries{"web.num-requests", []float64{10.0, 10.0, 15.0, 10.0, 15.0}},
{"web.avg-response", []float64{40.0, 30.0, 50.0, 50.0, 60.0}},
{"web.num-requests", []float64{10.0, 10.0, 15.0, 10.0, 15.0}},
}

start := consolidationStartTime
Expand Down Expand Up @@ -503,7 +503,7 @@ func TestSumSeriesWithWildcards(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 2, len(outSeries.Values))

outSeries, _ = sortByName(ctx, singlePathSpec(outSeries))
outSeries, _ = sortByName(ctx, singlePathSpec(outSeries), false, false)

expectedOutputs := []struct {
name string
Expand Down Expand Up @@ -619,7 +619,7 @@ func TestApplyByNode(t *testing.T) {
require.NoError(t, err)
require.Equal(t, len(test.expectedResults), len(outSeries.Values))

outSeries, _ = sortByName(ctx, singlePathSpec(outSeries))
outSeries, _ = sortByName(ctx, singlePathSpec(outSeries), false, false)
common.CompareOutputsAndExpected(t, 60000, start, test.expectedResults, outSeries.Values)
}
}
Expand Down Expand Up @@ -657,7 +657,7 @@ func TestAggregateWithWildcards(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 2, len(outSeries.Values))

outSeries, _ = sortByName(ctx, singlePathSpec(outSeries))
outSeries, _ = sortByName(ctx, singlePathSpec(outSeries), false, false)

expectedOutputs := []struct {
name string
Expand Down Expand Up @@ -732,7 +732,7 @@ func TestGroupByNode(t *testing.T) {
require.NoError(t, err)
require.Equal(t, len(test.expectedResults), len(outSeries.Values))

outSeries, _ = sortByName(ctx, singlePathSpec(outSeries))
outSeries, _ = sortByName(ctx, singlePathSpec(outSeries), false, false)

for i, expected := range test.expectedResults {
series := outSeries.Values[i]
Expand Down Expand Up @@ -820,7 +820,7 @@ func TestGroupByNodes(t *testing.T) {
require.NoError(t, err)
require.Equal(t, len(test.expectedResults), len(outSeries.Values))

outSeries, _ = sortByName(ctx, singlePathSpec(outSeries))
outSeries, _ = sortByName(ctx, singlePathSpec(outSeries), false, false)

for i, expected := range test.expectedResults {
series := outSeries.Values[i]
Expand All @@ -837,17 +837,17 @@ func TestWeightedAverage(t *testing.T) {
defer ctx.Close()

means := []common.TestSeries{
common.TestSeries{"web.host-1.avg-response.mean", []float64{70.0, 20.0, 30.0, 0.0, 50.0}},
common.TestSeries{"web.host-2.avg-response.mean", []float64{20.0, 30.0, 40.0, 50.0, 60.0}},
common.TestSeries{"web.host-3.avg-response.mean", []float64{20.0, 30.0, 40.0, 50.0, 60.0}}, // no match
{Name: "web.host-1.avg-response.mean", Data: []float64{70.0, 20.0, 30.0, 0.0, 50.0}},
{Name: "web.host-2.avg-response.mean", Data: []float64{20.0, 30.0, 40.0, 50.0, 60.0}},
{Name: "web.host-3.avg-response.mean", Data: []float64{20.0, 30.0, 40.0, 50.0, 60.0}}, // no match
}
counts := []common.TestSeries{
common.TestSeries{"web.host-1.avg-response.count", []float64{1, 2, 3, 4, 5}},
common.TestSeries{"web.host-2.avg-response.count", []float64{10, 20, 30, 40, 50}},
common.TestSeries{"web.host-4.avg-response.count", []float64{10, 20, 30, 40, 50}}, // no match
{Name: "web.host-1.avg-response.count", Data: []float64{1, 2, 3, 4, 5}},
{Name: "web.host-2.avg-response.count", Data: []float64{10, 20, 30, 40, 50}},
{Name: "web.host-4.avg-response.count", Data: []float64{10, 20, 30, 40, 50}}, // no match
}
expected := []common.TestSeries{
common.TestSeries{"weightedAverage", []float64{24.5454, 29.0909, 39.0909, 45.4545, 59.0909}},
{Name: "weightedAverage", Data: []float64{24.5454, 29.0909, 39.0909, 45.4545, 59.0909}},
}

// normal series
Expand All @@ -866,7 +866,7 @@ func TestWeightedAverage(t *testing.T) {
output, err = weightedAverage(ctx, singlePathSpec(values), singlePathSpec(weights), 1)
require.NoError(t, err)
common.CompareOutputsAndExpected(t, step, start,
[]common.TestSeries{{"weightedAverage", means[0].Data}}, output.Values)
[]common.TestSeries{{Name: "weightedAverage", Data: means[0].Data}}, output.Values)

// different steps should lead to error -- not supported yet
values = ts.SeriesList{Values: generateSeriesList(ctx, start, means, step)}
Expand Down
50 changes: 47 additions & 3 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,52 @@ func joinPathExpr(series ts.SeriesList) string {
return strings.Join(joined, ",")
}

// sortBy allows for sorting by an aggregation function.
func sortBy(ctx *common.Context, series singlePathSpec, fn string, reverse bool) (ts.SeriesList, error) {
var (
result ts.SeriesList
err error
)
if strings.HasPrefix(fn, "min") {
result, err = lowest(ctx, series, len(series.Values), fn)
} else {
result, err = highest(ctx, series, len(series.Values), fn)
}
if err != nil {
return ts.SeriesList{}, err
}
if reverse {
reverseSeries(result.Values)
}
return result, nil
}

func reverseSeries(s []*ts.Series) {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
s[i], s[j] = s[j], s[i]
}
}

// sortByName sorts timeseries results by their names
func sortByName(_ *common.Context, series singlePathSpec) (ts.SeriesList, error) {
func sortByName(_ *common.Context, series singlePathSpec, natural, reverse bool) (ts.SeriesList, error) {
sorted := make([]*ts.Series, len(series.Values))
for i := range series.Values {
sorted[i] = series.Values[i]
}

var sortedBy sort.Interface
if natural {
sortedBy = ts.SeriesByNameAndNaturalNumbers(sorted)
} else {
sortedBy = ts.SeriesByName(sorted)
}

if reverse {
sortedBy = sort.Reverse(sortedBy)
}

// Use sort.Stable for deterministic output.
sort.Stable(ts.SeriesByName(sorted))
sort.Stable(sortedBy)

r := ts.SeriesList(series)
r.Values = sorted
Expand Down Expand Up @@ -2514,9 +2551,16 @@ func init() {
MustRegisterFunction(removeEmptySeries)
MustRegisterFunction(scale)
MustRegisterFunction(scaleToSeconds)
MustRegisterFunction(sortBy).WithDefaultParams(map[uint8]interface{}{
2: "average", // fn
3: false, // reverse
})
MustRegisterFunction(sortByMaxima)
MustRegisterFunction(sortByMinima)
MustRegisterFunction(sortByName)
MustRegisterFunction(sortByName).WithDefaultParams(map[uint8]interface{}{
2: false, // natural
3: false, // reverse
})
MustRegisterFunction(sortByTotal)
MustRegisterFunction(squareRoot)
MustRegisterFunction(stdev).WithDefaultParams(map[uint8]interface{}{
Expand Down
Loading

0 comments on commit 4cd3435

Please sign in to comment.