Skip to content

Commit

Permalink
[m3query] Add graphite function support - groupByNodes (#2579)
Browse files Browse the repository at this point in the history
  • Loading branch information
yyin-sc authored Sep 3, 2020
1 parent 69b8421 commit 492f76d
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 0 deletions.
49 changes: 49 additions & 0 deletions src/query/graphite/native/aggregation_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,55 @@ func groupByNode(ctx *common.Context, series singlePathSpec, node int, fname str
metaSeries[key] = append(metaSeries[key], s)
}

return applyFnToMetaSeries(ctx, series, metaSeries, fname)
}

// Takes a serieslist and maps a callback to subgroups within as defined by multiple nodes
//
// &target=groupByNodes(ganglia.server*.*.cpu.load*,"sum",1,4)
//
// Would return multiple series which are each the result of applying the “sum” aggregation to groups joined on the
// nodes’ list (0 indexed) resulting in a list of targets like
//
// sumSeries(ganglia.server1.*.cpu.load5),sumSeries(ganglia.server1.*.cpu.load10),sumSeries(ganglia.server1.*.cpu.load15),
// sumSeries(ganglia.server2.*.cpu.load5),sumSeries(ganglia.server2.*.cpu.load10),sumSeries(ganglia.server2.*.cpu.load15),...
//
// NOTE: if len(nodes) = 0, aggregate all series into 1 series.
func groupByNodes(ctx *common.Context, series singlePathSpec, fname string, nodes ...int) (ts.SeriesList, error) {
metaSeries := make(map[string][]*ts.Series)

nodeLen := len(nodes)
if nodeLen == 0 {
key := "*" // put into single group, not ideal, but more graphite-ish.
for _, s := range series.Values {
metaSeries[key] = append(metaSeries[key], s)
}
} else {
for _, s := range series.Values {
parts := strings.Split(s.Name(), ".")

var keys []string
for _, n := range nodes {
if n < 0 {
n = len(parts) + n
}

if n >= len(parts) || n < 0 {
err := errors.NewInvalidParamsError(fmt.Errorf("could not group %s by nodes %v; not enough parts", s.Name(), nodes))
return ts.NewSeriesList(), err
}

keys = append(keys, parts[n])
}
key := strings.Join(keys, ".")
metaSeries[key] = append(metaSeries[key], s)
}
}

return applyFnToMetaSeries(ctx, series, metaSeries, fname)
}

func applyFnToMetaSeries(ctx *common.Context, series singlePathSpec, metaSeries map[string][]*ts.Series, fname string) (ts.SeriesList, error) {
if fname == "" {
fname = "sum"
}
Expand Down
79 changes: 79 additions & 0 deletions src/query/graphite/native/aggregation_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,85 @@ func TestGroupByNode(t *testing.T) {
}
}

func TestGroupByNodes(t *testing.T) {
var (
start, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:41:19 GMT")
end, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:43:19 GMT")
ctx = common.NewContext(common.ContextOptions{Start: start, End: end})
inputs = []*ts.Series{
ts.NewSeries(ctx, "servers.foo-1.pod1.status.500", start,
ts.NewConstantValues(ctx, 2, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-2.pod1.status.500", start,
ts.NewConstantValues(ctx, 4, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-3.pod1.status.500", start,
ts.NewConstantValues(ctx, 6, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-1.pod2.status.500", start,
ts.NewConstantValues(ctx, 8, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-2.pod2.status.500", start,
ts.NewConstantValues(ctx, 10, 12, 10000)),

ts.NewSeries(ctx, "servers.foo-1.pod1.status.400", start,
ts.NewConstantValues(ctx, 20, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-2.pod1.status.400", start,
ts.NewConstantValues(ctx, 30, 12, 10000)),
ts.NewSeries(ctx, "servers.foo-3.pod2.status.400", start,
ts.NewConstantValues(ctx, 40, 12, 10000)),
}
)
defer ctx.Close()

type result struct {
name string
sumOfVals float64
}

tests := []struct {
fname string
nodes []int
expectedResults []result
}{
{"avg", []int{2, 4}, []result{ // test normal group by nodes
{"pod1.400", ((20 + 30) / 2) * 12},
{"pod1.500", ((2 + 4 + 6) / 3) * 12},
{"pod2.400", (40 / 1) * 12},
{"pod2.500", ((8 + 10) / 2) * 12},
}},
{"max", []int{2, 4}, []result{ // test with different function
{"pod1.400", 30 * 12},
{"pod1.500", 6 * 12},
{"pod2.400", 40 * 12},
{"pod2.500", 10 * 12},
}},
{"min", []int{2, -1}, []result{ // test negative index handling
{"pod1.400", 20 * 12},
{"pod1.500", 2 * 12},
{"pod2.400", 40 * 12},
{"pod2.500", 8 * 12},
}},
{"sum", []int{}, []result{ // test empty slice handing.
{"*", (2 + 4 + 6 + 8 + 10 + 20 + 30 + 40) * 12},
}},
}

for _, test := range tests {
outSeries, err := groupByNodes(ctx, singlePathSpec{
Values: inputs,
}, test.fname, test.nodes...)
require.NoError(t, err)
require.Equal(t, len(test.expectedResults), len(outSeries.Values))

outSeries, _ = sortByName(ctx, singlePathSpec(outSeries))

for i, expected := range test.expectedResults {
series := outSeries.Values[i]
assert.Equal(t, expected.name, series.Name(),
"wrong name for %v %s (%d)", test.nodes, test.fname, i)
assert.Equal(t, expected.sumOfVals, series.SafeSum(),
"wrong result for %v %s (%d)", test.nodes, test.fname, i)
}
}
}

func TestWeightedAverage(t *testing.T) {
ctx, _ := newConsolidationTestSeries()
defer ctx.Close()
Expand Down
1 change: 1 addition & 0 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1892,6 +1892,7 @@ func init() {
MustRegisterFunction(fallbackSeries)
MustRegisterFunction(group)
MustRegisterFunction(groupByNode)
MustRegisterFunction(groupByNodes)
MustRegisterFunction(highestAverage)
MustRegisterFunction(highestCurrent)
MustRegisterFunction(highestMax)
Expand Down
1 change: 1 addition & 0 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2934,6 +2934,7 @@ func TestFunctionsRegistered(t *testing.T) {
"fallbackSeries",
"group",
"groupByNode",
"groupByNodes",
"highestAverage",
"highestCurrent",
"highestMax",
Expand Down

0 comments on commit 492f76d

Please sign in to comment.