From 492f76d0bb4922eb09f3e3d15b1d191bc8b65377 Mon Sep 17 00:00:00 2001 From: Evan Yin Date: Thu, 3 Sep 2020 06:45:44 -0700 Subject: [PATCH] [m3query] Add graphite function support - groupByNodes (#2579) --- .../graphite/native/aggregation_functions.go | 49 ++++++++++++ .../native/aggregation_functions_test.go | 79 +++++++++++++++++++ .../graphite/native/builtin_functions.go | 1 + .../graphite/native/builtin_functions_test.go | 1 + 4 files changed, 130 insertions(+) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index d79d8d419c..9d2c1a2c2c 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -247,6 +247,55 @@ func groupByNode(ctx *common.Context, series singlePathSpec, node int, fname str metaSeries[key] = append(metaSeries[key], s) } + return applyFnToMetaSeries(ctx, series, metaSeries, fname) +} + +// Takes a serieslist and maps a callback to subgroups within as defined by multiple nodes +// +// &target=groupByNodes(ganglia.server*.*.cpu.load*,"sum",1,4) +// +// Would return multiple series which are each the result of applying the “sum” aggregation to groups joined on the +// nodes’ list (0 indexed) resulting in a list of targets like +// +// sumSeries(ganglia.server1.*.cpu.load5),sumSeries(ganglia.server1.*.cpu.load10),sumSeries(ganglia.server1.*.cpu.load15), +// sumSeries(ganglia.server2.*.cpu.load5),sumSeries(ganglia.server2.*.cpu.load10),sumSeries(ganglia.server2.*.cpu.load15),... +// +// NOTE: if len(nodes) = 0, aggregate all series into 1 series. +func groupByNodes(ctx *common.Context, series singlePathSpec, fname string, nodes ...int) (ts.SeriesList, error) { + metaSeries := make(map[string][]*ts.Series) + + nodeLen := len(nodes) + if nodeLen == 0 { + key := "*" // put into single group, not ideal, but more graphite-ish. + for _, s := range series.Values { + metaSeries[key] = append(metaSeries[key], s) + } + } else { + for _, s := range series.Values { + parts := strings.Split(s.Name(), ".") + + var keys []string + for _, n := range nodes { + if n < 0 { + n = len(parts) + n + } + + if n >= len(parts) || n < 0 { + err := errors.NewInvalidParamsError(fmt.Errorf("could not group %s by nodes %v; not enough parts", s.Name(), nodes)) + return ts.NewSeriesList(), err + } + + keys = append(keys, parts[n]) + } + key := strings.Join(keys, ".") + metaSeries[key] = append(metaSeries[key], s) + } + } + + return applyFnToMetaSeries(ctx, series, metaSeries, fname) +} + +func applyFnToMetaSeries(ctx *common.Context, series singlePathSpec, metaSeries map[string][]*ts.Series, fname string) (ts.SeriesList, error) { if fname == "" { fname = "sum" } diff --git a/src/query/graphite/native/aggregation_functions_test.go b/src/query/graphite/native/aggregation_functions_test.go index 60bf009a73..fae525aa30 100644 --- a/src/query/graphite/native/aggregation_functions_test.go +++ b/src/query/graphite/native/aggregation_functions_test.go @@ -401,6 +401,85 @@ func TestGroupByNode(t *testing.T) { } } +func TestGroupByNodes(t *testing.T) { + var ( + start, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:41:19 GMT") + end, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:43:19 GMT") + ctx = common.NewContext(common.ContextOptions{Start: start, End: end}) + inputs = []*ts.Series{ + ts.NewSeries(ctx, "servers.foo-1.pod1.status.500", start, + ts.NewConstantValues(ctx, 2, 12, 10000)), + ts.NewSeries(ctx, "servers.foo-2.pod1.status.500", start, + ts.NewConstantValues(ctx, 4, 12, 10000)), + ts.NewSeries(ctx, "servers.foo-3.pod1.status.500", start, + ts.NewConstantValues(ctx, 6, 12, 10000)), + ts.NewSeries(ctx, "servers.foo-1.pod2.status.500", start, + ts.NewConstantValues(ctx, 8, 12, 10000)), + ts.NewSeries(ctx, "servers.foo-2.pod2.status.500", start, + ts.NewConstantValues(ctx, 10, 12, 10000)), + + ts.NewSeries(ctx, "servers.foo-1.pod1.status.400", start, + ts.NewConstantValues(ctx, 20, 12, 10000)), + ts.NewSeries(ctx, "servers.foo-2.pod1.status.400", start, + ts.NewConstantValues(ctx, 30, 12, 10000)), + ts.NewSeries(ctx, "servers.foo-3.pod2.status.400", start, + ts.NewConstantValues(ctx, 40, 12, 10000)), + } + ) + defer ctx.Close() + + type result struct { + name string + sumOfVals float64 + } + + tests := []struct { + fname string + nodes []int + expectedResults []result + }{ + {"avg", []int{2, 4}, []result{ // test normal group by nodes + {"pod1.400", ((20 + 30) / 2) * 12}, + {"pod1.500", ((2 + 4 + 6) / 3) * 12}, + {"pod2.400", (40 / 1) * 12}, + {"pod2.500", ((8 + 10) / 2) * 12}, + }}, + {"max", []int{2, 4}, []result{ // test with different function + {"pod1.400", 30 * 12}, + {"pod1.500", 6 * 12}, + {"pod2.400", 40 * 12}, + {"pod2.500", 10 * 12}, + }}, + {"min", []int{2, -1}, []result{ // test negative index handling + {"pod1.400", 20 * 12}, + {"pod1.500", 2 * 12}, + {"pod2.400", 40 * 12}, + {"pod2.500", 8 * 12}, + }}, + {"sum", []int{}, []result{ // test empty slice handing. + {"*", (2 + 4 + 6 + 8 + 10 + 20 + 30 + 40) * 12}, + }}, + } + + for _, test := range tests { + outSeries, err := groupByNodes(ctx, singlePathSpec{ + Values: inputs, + }, test.fname, test.nodes...) + require.NoError(t, err) + require.Equal(t, len(test.expectedResults), len(outSeries.Values)) + + outSeries, _ = sortByName(ctx, singlePathSpec(outSeries)) + + for i, expected := range test.expectedResults { + series := outSeries.Values[i] + assert.Equal(t, expected.name, series.Name(), + "wrong name for %v %s (%d)", test.nodes, test.fname, i) + assert.Equal(t, expected.sumOfVals, series.SafeSum(), + "wrong result for %v %s (%d)", test.nodes, test.fname, i) + } + } +} + func TestWeightedAverage(t *testing.T) { ctx, _ := newConsolidationTestSeries() defer ctx.Close() diff --git a/src/query/graphite/native/builtin_functions.go b/src/query/graphite/native/builtin_functions.go index 89092bd88d..a8541e7c0f 100644 --- a/src/query/graphite/native/builtin_functions.go +++ b/src/query/graphite/native/builtin_functions.go @@ -1892,6 +1892,7 @@ func init() { MustRegisterFunction(fallbackSeries) MustRegisterFunction(group) MustRegisterFunction(groupByNode) + MustRegisterFunction(groupByNodes) MustRegisterFunction(highestAverage) MustRegisterFunction(highestCurrent) MustRegisterFunction(highestMax) diff --git a/src/query/graphite/native/builtin_functions_test.go b/src/query/graphite/native/builtin_functions_test.go index 777b192120..29013c5c92 100644 --- a/src/query/graphite/native/builtin_functions_test.go +++ b/src/query/graphite/native/builtin_functions_test.go @@ -2934,6 +2934,7 @@ func TestFunctionsRegistered(t *testing.T) { "fallbackSeries", "group", "groupByNode", + "groupByNodes", "highestAverage", "highestCurrent", "highestMax",