Skip to content

Commit

Permalink
[query] Implemented the Graphite applyByNode function (#2654)
Browse files Browse the repository at this point in the history
  • Loading branch information
teddywahle authored Oct 5, 2020
1 parent 08117d2 commit 84f5b98
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 8 deletions.
10 changes: 9 additions & 1 deletion src/query/graphite/common/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@ import (

// QueryEngine is the generic engine interface for resolving Graphite
// queries against a storage backend.
type QueryEngine interface {
	// FetchByQuery retrieves one or more time series based on a query.
	FetchByQuery(
		ctx context.Context,
		query string,
		options storage.FetchOptions,
	) (*storage.FetchResult, error)

	// Storage returns the engine's storage object.
	Storage() storage.Storage
}

// The Engine for running queries
Expand All @@ -46,11 +51,14 @@ func NewEngine(storage storage.Storage) *Engine {
}
}

// FetchByQuery retrieves one or more time series based on a query,
// delegating directly to the engine's underlying storage.
func (e *Engine) FetchByQuery(
	ctx context.Context,
	query string,
	options storage.FetchOptions,
) (*storage.FetchResult, error) {
	return e.storage.FetchByQuery(ctx, query, options)
}

// Storage returns the engine's underlying storage object.
func (e *Engine) Storage() storage.Storage {
	return e.storage
}
153 changes: 148 additions & 5 deletions src/query/graphite/native/aggregation_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ package native
import (
"fmt"
"math"
"runtime"
"sort"
"strings"
"sync"

"github.com/m3db/m3/src/query/graphite/common"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/graphite/common"
"github.com/m3db/m3/src/query/graphite/errors"
"github.com/m3db/m3/src/query/graphite/ts"
xerrors "github.com/m3db/m3/src/x/errors"
)

func wrapPathExpr(wrapper string, series ts.SeriesList) string {
Expand Down Expand Up @@ -111,7 +115,7 @@ func divideSeriesHelper(ctx *common.Context, dividendSeries, divisorSeries *ts.S
vals.SetValueAt(i, value)
}
}

// The individual series will be named divideSeries(X, X), even if it is generated by divideSeriesLists
// Based on Graphite source code (link below)
// https://github.com/graphite-project/graphite-web/blob/17a34e7966f7a46eded30c2362765c74eea899cb/webapp/graphite/render/functions.py#L901
Expand All @@ -137,7 +141,7 @@ func divideSeries(ctx *common.Context, dividendSeriesList, divisorSeriesList sin
divisorSeries := divisorSeriesList.Values[0]
results := make([]*ts.Series, len(dividendSeriesList.Values))
for idx, dividendSeries := range dividendSeriesList.Values {
metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata)
metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata)
quotientSeries, err := divideSeriesHelper(ctx, dividendSeries, divisorSeries, metadata)
if err != nil {
return ts.NewSeriesList(), err
Expand All @@ -150,7 +154,6 @@ func divideSeries(ctx *common.Context, dividendSeriesList, divisorSeriesList sin
return r, nil
}


// divideSeriesLists divides one series list by another series list
func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesList singlePathSpec) (ts.SeriesList, error) {
if len(dividendSeriesList.Values) != len(divisorSeriesList.Values) {
Expand All @@ -161,7 +164,7 @@ func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesLis
results := make([]*ts.Series, len(dividendSeriesList.Values))
for idx, dividendSeries := range dividendSeriesList.Values {
divisorSeries := divisorSeriesList.Values[idx]
metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata)
metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata)
quotientSeries, err := divideSeriesHelper(ctx, dividendSeries, divisorSeries, metadata)
if err != nil {
return ts.NewSeriesList(), err
Expand Down Expand Up @@ -278,6 +281,146 @@ func combineSeriesWithWildcards(
return r, nil
}

// chunkArrayHelper splits a slice into at most numChunks contiguous chunks of
// near-equal size (the final chunk may be shorter). A non-positive numChunks
// is clamped to 1; without the clamp the ceiling division below would panic
// with a divide-by-zero (e.g. when the caller computes concurrency as
// runtime.NumCPU()/2 on a single-CPU host).
func chunkArrayHelper(slice []string, numChunks int) [][]string {
	if numChunks < 1 {
		numChunks = 1
	}

	divided := make([][]string, 0, numChunks)

	// Ceiling division so every element is covered.
	chunkSize := (len(slice) + numChunks - 1) / numChunks

	for i := 0; i < len(slice); i += chunkSize {
		end := i + chunkSize
		if end > len(slice) {
			end = len(slice)
		}

		divided = append(divided, slice[i:end])
	}

	return divided
}

// evaluateTarget compiles the given Graphite target expression against a
// fresh engine backed by the context's storage, then executes it and returns
// the resulting series list.
func evaluateTarget(ctx *common.Context, target string) (ts.SeriesList, error) {
	expr, err := NewEngine(ctx.Engine.Storage()).Compile(target)
	if err != nil {
		return ts.NewSeriesList(), err
	}
	return expr.Execute(ctx)
}

/*
applyByNode takes a seriesList and applies some complicated function (described by a string), replacing templates with unique
prefixes of keys from the seriesList (the key is all nodes up to the index given as `nodeNum`).
If the `newName` parameter is provided, the name of the resulting series will be given by that parameter, with any
"%" characters replaced by the unique prefix.
Example:
`applyByNode(servers.*.disk.bytes_free,1,"divideSeries(%.disk.bytes_free,sumSeries(%.disk.bytes_*))")`
Would find all series which match `servers.*.disk.bytes_free`, then trim them down to unique series up to the node
given by nodeNum, then fill them into the template function provided (replacing % by the prefixes).
Additional Examples:
Given keys of
- `stats.counts.haproxy.web.2XX`
- `stats.counts.haproxy.web.3XX`
- `stats.counts.haproxy.web.5XX`
- `stats.counts.haproxy.microservice.2XX`
- `stats.counts.haproxy.microservice.3XX`
- `stats.counts.haproxy.microservice.5XX`
The following will return the rate of 5XX's per service:
`applyByNode(stats.counts.haproxy.*.*XX, 3, "asPercent(%.5XX, sumSeries(%.*XX))", "%.pct_5XX")`
The output series would have keys `stats.counts.haproxy.web.pct_5XX` and `stats.counts.haproxy.microservice.pct_5XX`.
*/
func applyByNode(ctx *common.Context, seriesList singlePathSpec, nodeNum int, templateFunction string, newName string) (ts.SeriesList, error) {
	// Collect the unique prefixes (all nodes up to nodeNum); the map is used
	// as a set.
	prefixMap := map[string]struct{}{}
	for _, series := range seriesList.Values {
		var (
			name = series.Name()

			partsSeen int
			prefix    string
		)

		// Find the (nodeNum+1)-th '.'; everything before it is the prefix.
		for i, c := range name {
			if c == '.' {
				partsSeen++
				if partsSeen == nodeNum+1 {
					prefix = name[:i]
					break
				}
			}
		}

		// Series with too few nodes contribute no prefix.
		if len(prefix) == 0 {
			continue
		}

		prefixMap[prefix] = struct{}{}
	}

	// Transform to a sorted slice for deterministic evaluation order.
	prefixes := make([]string, 0, len(prefixMap))
	for p := range prefixMap {
		prefixes = append(prefixes, p)
	}
	sort.Strings(prefixes)

	var (
		mu       sync.Mutex
		wg       sync.WaitGroup
		multiErr xerrors.MultiError

		output         = make([]*ts.Series, 0, len(prefixes))
		maxConcurrency = runtime.NumCPU() / 2
	)
	// On a single-CPU host NumCPU()/2 truncates to zero, which would cause a
	// divide-by-zero inside chunkArrayHelper; always run at least one worker.
	if maxConcurrency < 1 {
		maxConcurrency = 1
	}
	for _, prefixChunk := range chunkArrayHelper(prefixes, maxConcurrency) {
		// Safe to read multiErr without the lock here: wg.Wait() below
		// guarantees no goroutine from the previous chunk is still running.
		if err := multiErr.LastError(); err != nil {
			return ts.NewSeriesList(), err
		}

		for _, prefix := range prefixChunk {
			// Shadow the range variable: the goroutine below outlives this
			// iteration, and without the copy every goroutine in the chunk
			// would observe the last prefix (pre-Go 1.22 loop semantics).
			prefix := prefix
			newTarget := strings.ReplaceAll(templateFunction, "%", prefix)
			wg.Add(1)
			go func() {
				defer wg.Done()
				resultSeriesList, err := evaluateTarget(ctx, newTarget)

				if err != nil {
					mu.Lock()
					multiErr = multiErr.Add(err)
					mu.Unlock()
					return
				}

				mu.Lock()
				for _, resultSeries := range resultSeriesList.Values {
					if newName != "" {
						resultSeries = resultSeries.RenamedTo(strings.ReplaceAll(newName, "%", prefix))
					}
					resultSeries.Specification = prefix
					output = append(output, resultSeries)
				}
				mu.Unlock()
			}()
		}
		wg.Wait()
	}

	// The loop-top check above only sees failures from *previous* chunks;
	// without this final check, errors raised by the last chunk would be
	// silently dropped.
	if err := multiErr.LastError(); err != nil {
		return ts.NewSeriesList(), err
	}

	r := ts.NewSeriesList()
	r.Values = output
	return r, nil
}

// groupByNode takes a serieslist and maps a callback to subgroups within as defined by a common node
//
// &target=groupByNode(foo.by-function.*.*.cpu.load5,2,"sumSeries")
Expand Down
112 changes: 110 additions & 2 deletions src/query/graphite/native/aggregation_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import (
"github.com/m3db/m3/src/query/graphite/context"
"github.com/m3db/m3/src/query/graphite/storage"
"github.com/m3db/m3/src/query/graphite/ts"

xgomock "github.com/m3db/m3/src/x/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/golang/mock/gomock"
)

var (
Expand Down Expand Up @@ -140,6 +142,8 @@ type mockEngine struct {
query string,
options storage.FetchOptions,
) (*storage.FetchResult, error)

storage storage.Storage
}

func (e mockEngine) FetchByQuery(
Expand All @@ -150,6 +154,10 @@ func (e mockEngine) FetchByQuery(
return e.fn(ctx, query, opts)
}

// Storage returns the mock engine's storage field. The mockEngine struct
// declares a storage field, so return it (nil when unset — identical to the
// previous hard-coded nil for all existing tests) rather than ignoring the
// field, keeping the mock consistent with the real Engine implementation.
func (e mockEngine) Storage() storage.Storage {
	return e.storage
}

func TestVariadicSumSeries(t *testing.T) {
expr, err := Compile("sumSeries(foo.bar.*, foo.baz.*)")
require.NoError(t, err)
Expand Down Expand Up @@ -314,7 +322,6 @@ func TestDivideSeriesLists(t *testing.T) {
require.Error(t, err)
}


func TestAverageSeriesWithWildcards(t *testing.T) {
ctx, _ := newConsolidationTestSeries()
defer ctx.Close()
Expand Down Expand Up @@ -391,6 +398,107 @@ func TestSumSeriesWithWildcards(t *testing.T) {
}
}

// TestApplyByNode exercises applyByNode end-to-end against a mock storage:
// one case uses the default naming (template text substituted into the series
// name) and one renames outputs via the newName template parameter.
func TestApplyByNode(t *testing.T) {
	var (
		ctrl          = xgomock.NewController(t)
		store         = storage.NewMockStorage(ctrl)
		engine        = NewEngine(store)
		start, _      = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:41:19 GMT")
		end, _        = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:43:19 GMT")
		ctx           = common.NewContext(common.ContextOptions{Start: start, End: end, Engine: engine})
		millisPerStep = 60000
		inputs        = []*ts.Series{
			ts.NewSeries(ctx, "servers.s1.disk.bytes_used", start,
				common.NewTestSeriesValues(ctx, millisPerStep, []float64{10, 20, 30})),
			ts.NewSeries(ctx, "servers.s1.disk.bytes_free", start,
				common.NewTestSeriesValues(ctx, millisPerStep, []float64{90, 80, 70})),
			ts.NewSeries(ctx, "servers.s2.disk.bytes_used", start,
				common.NewTestSeriesValues(ctx, millisPerStep, []float64{1, 2, 3})),
			ts.NewSeries(ctx, "servers.s2.disk.bytes_free", start,
				common.NewTestSeriesValues(ctx, millisPerStep, []float64{99, 98, 97})),
		}
	)

	defer ctrl.Finish()
	defer ctx.Close()

	// Each expectation is hit twice (.Times(2)) because both test cases below
	// run the same template queries against the store.
	store.EXPECT().FetchByQuery(gomock.Any(), "servers.s1.disk.bytes_used", gomock.Any()).Return(
		&storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "servers.s1.disk.bytes_used", start,
			common.NewTestSeriesValues(ctx, 60000, []float64{10, 20, 30}))}}, nil).Times(2)

	store.EXPECT().FetchByQuery(gomock.Any(), "servers.s1.disk.bytes_*", gomock.Any()).Return(
		&storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "servers.s1.disk.bytes_free", start,
			common.NewTestSeriesValues(ctx, 60000, []float64{90, 80, 70})),
			ts.NewSeries(ctx, "servers.s1.disk.bytes_used", start,
				common.NewTestSeriesValues(ctx, 60000, []float64{10, 20, 30}))}}, nil).Times(2)

	store.EXPECT().FetchByQuery(gomock.Any(), "servers.s2.disk.bytes_used", gomock.Any()).Return(
		&storage.FetchResult{SeriesList: []*ts.Series{ts.NewSeries(ctx, "servers.s2.disk.bytes_used", start,
			common.NewTestSeriesValues(ctx, 60000, []float64{1, 2, 3}))}}, nil).Times(2)

	store.EXPECT().FetchByQuery(gomock.Any(), "servers.s2.disk.bytes_*", gomock.Any()).Return(
		&storage.FetchResult{SeriesList: []*ts.Series{
			ts.NewSeries(ctx, "servers.s2.disk.bytes_free", start,
				common.NewTestSeriesValues(ctx, 60000, []float64{99, 98, 97})),
			ts.NewSeries(ctx, "servers.s2.disk.bytes_used", start,
				common.NewTestSeriesValues(ctx, 60000, []float64{1, 2, 3}))}}, nil).Times(2)

	tests := []struct {
		nodeNum          int
		templateFunction string
		newName          string
		expectedResults  []common.TestSeries
	}{
		// Default naming: output series names are the substituted template.
		{
			nodeNum:          1,
			templateFunction: "divideSeries(%.disk.bytes_used, sumSeries(%.disk.bytes_*))",
			newName:          "",
			expectedResults: []common.TestSeries{
				{
					Name: "divideSeries(servers.s1.disk.bytes_used,sumSeries(servers.s1.disk.bytes_*))",
					Data: []float64{0.10, 0.20, 0.30},
				},
				{
					Name: "divideSeries(servers.s2.disk.bytes_used,sumSeries(servers.s2.disk.bytes_*))",
					Data: []float64{0.01, 0.02, 0.03},
				},
			},
		},
		// newName provided: "%" in newName is replaced by each prefix.
		{
			nodeNum:          1,
			templateFunction: "divideSeries(%.disk.bytes_used, sumSeries(%.disk.bytes_*))",
			newName:          "%.disk.pct_used",
			expectedResults: []common.TestSeries{
				{
					Name: "servers.s1.disk.pct_used",
					Data: []float64{0.10, 0.20, 0.30},
				},
				{
					Name: "servers.s2.disk.pct_used",
					Data: []float64{0.01, 0.02, 0.03},
				},
			},
		},
	}

	for _, test := range tests {
		outSeries, err := applyByNode(
			ctx,
			singlePathSpec{
				Values: inputs,
			},
			test.nodeNum,
			test.templateFunction,
			test.newName,
		)
		require.NoError(t, err)
		require.Equal(t, len(test.expectedResults), len(outSeries.Values))

		// applyByNode output order is not guaranteed; sort before comparing.
		outSeries, _ = sortByName(ctx, singlePathSpec(outSeries))
		common.CompareOutputsAndExpected(t, 60000, start, test.expectedResults, outSeries.Values)
	}
}

func TestAggregateWithWildcards(t *testing.T) {
var (
start, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:41:19 GMT")
Expand Down
3 changes: 3 additions & 0 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2262,6 +2262,9 @@ func init() {
MustRegisterFunction(aliasByMetric)
MustRegisterFunction(aliasByNode)
MustRegisterFunction(aliasSub)
MustRegisterFunction(applyByNode).WithDefaultParams(map[uint8]interface{}{
4: "", // newName
})
MustRegisterFunction(asPercent).WithDefaultParams(map[uint8]interface{}{
2: []*ts.Series(nil), // total
})
Expand Down
Loading

0 comments on commit 84f5b98

Please sign in to comment.