Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Implemented the Graphite highest and lowest functions #2623

Merged
merged 16 commits into from
Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 58 additions & 29 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func delay(
func delayValuesHelper(ctx *common.Context, series *ts.Series, steps int) ts.Values {
output := ts.NewValues(ctx, series.MillisPerStep(), series.Len())
for i := steps; i < series.Len(); i++ {
output.SetValueAt(i, series.ValueAt(i - steps))
output.SetValueAt(i, series.ValueAt(i-steps))
}
return output
}
Expand Down Expand Up @@ -562,36 +562,52 @@ func takeByFunction(input singlePathSpec, n int, sr ts.SeriesReducer, sort ts.Di
return common.Head(r, n)
}

func getReducer(f string) (ts.SeriesReducer, error) {
sa := ts.SeriesReducerApproach(f)
r, ok := sa.SafeReducer()
if !ok {
return r, errors.NewInvalidParamsError(fmt.Errorf("invalid function %s", f))
}
return r, nil
}

// highest takes one metric or a wildcard seriesList followed by an integer N and an aggregation function.
// Out of all metrics passed, draws only the N metrics with the highest
// aggregated value over the time period specified.
func highest(_ *common.Context, input singlePathSpec, n int, f string) (ts.SeriesList, error) {
reducer, err := getReducer(f)
if err != nil {
return ts.NewSeriesList(), err
}
return takeByFunction(input, n, reducer, ts.Descending)
}

// highestSum takes one metric or a wildcard seriesList followed by an integer
// n. Out of all metrics passed, draws only the N metrics with the highest
// total value in the time period specified.
func highestSum(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerSum.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func highestSum(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "sum")
}

// highestMax takes one metric or a wildcard seriesList followed by an integer
// n. Out of all metrics passed, draws only the N metrics with the highest
// maximum value in the time period specified.
func highestMax(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerMax.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func highestMax(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "max")
}

// highestCurrent takes one metric or a wildcard seriesList followed by an
// integer n. Out of all metrics passed, draws only the N metrics with the
// highest value at the end of the time period specified.
func highestCurrent(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerLast.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func highestCurrent(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "current")
}

// highestAverage takes one metric or a wildcard seriesList followed by an
// integer n. Out of all metrics passed, draws only the top N metrics with the
// highest average value for the time period specified.
func highestAverage(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerAvg.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func highestAverage(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "average")
}

// fallbackSeries takes one metric or a wildcard seriesList, and a second fallback metric.
Expand All @@ -608,33 +624,41 @@ func fallbackSeries(_ *common.Context, input singlePathSpec, fallback singlePath
// N. Draws the N most deviant metrics. To find the deviants, the standard
// deviation (sigma) of each series is taken and ranked. The top N standard
// deviations are returned.
func mostDeviant(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerStdDev.Reducer()
return takeByFunction(input, n, sr, ts.Descending)
func mostDeviant(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return highest(ctx, input, n, "stddev")
}

// lowest takes one metric or a wildcard seriesList followed by an integer N and an aggregation function.
// Out of all metrics passed, draws only the N metrics with the lowest
// aggregated value over the time period specified.
func lowest(_ *common.Context, input singlePathSpec, n int, f string) (ts.SeriesList, error) {
reducer, err := getReducer(f)
if err != nil {
return ts.NewSeriesList(), err
}
return takeByFunction(input, n, reducer, ts.Ascending)
}

// lowestAverage takes one metric or a wildcard seriesList followed by an
// integer n. Out of all metrics passed, draws only the top N metrics with the
// lowest average value for the time period specified.
func lowestAverage(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerAvg.Reducer()
return takeByFunction(input, n, sr, ts.Ascending)
func lowestAverage(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return lowest(ctx, input, n, "average")
}

// lowestCurrent takes one metric or a wildcard seriesList followed by an
// integer n. Out of all metrics passed, draws only the N metrics with the
// lowest value at the end of the time period specified.
func lowestCurrent(_ *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
sr := ts.SeriesReducerLast.Reducer()
return takeByFunction(input, n, sr, ts.Ascending)
func lowestCurrent(ctx *common.Context, input singlePathSpec, n int) (ts.SeriesList, error) {
return lowest(ctx, input, n, "current")
}

// windowSizeFunc calculates window size for moving average calculation
type windowSizeFunc func(stepSize int) int

type windowSizeParsed struct {
deltaValue time.Duration
stringValue string
deltaValue time.Duration
stringValue string
windowSizeFunc windowSizeFunc
}

Expand Down Expand Up @@ -841,7 +865,7 @@ func exponentialMovingAverage(ctx *common.Context, input singlePathSpec, windowS
curr := bootstrap.ValueAt(i + offset)
if !math.IsNaN(curr) {
// formula: ema(current) = constant * (Current Value) + (1 - constant) * ema(previous)
ema = emaConstant * curr + (1 - emaConstant) * ema
ema = emaConstant*curr + (1-emaConstant)*ema
vals.SetValueAt(i, ema)
} else {
vals.SetValueAt(i, math.NaN())
Expand Down Expand Up @@ -1798,8 +1822,6 @@ func movingMinHelper(window []float64, vals ts.MutableValues, windowPoints int,
}
}



func newMovingBinaryTransform(
ctx *common.Context,
input singlePathSpec,
Expand Down Expand Up @@ -1830,7 +1852,7 @@ func newMovingBinaryTransform(

bootstrapStartTime, bootstrapEndTime := ctx.StartTime.Add(-interval), ctx.StartTime
return &binaryContextShifter{
ContextShiftFunc: contextShiftingFn,
ContextShiftFunc: contextShiftingFn,
BinaryTransformer: func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) {
bootstrapList, err := combineBootstrapWithOriginal(ctx,
bootstrapStartTime, bootstrapEndTime,
Expand Down Expand Up @@ -1910,7 +1932,6 @@ func movingMin(ctx *common.Context, input singlePathSpec, windowSize genericInte
return newMovingBinaryTransform(ctx, input, windowSize, "movingMin", movingMinHelper)
}


// legendValue takes one metric or a wildcard seriesList and a string in quotes.
// Appends a value to the metric name in the legend. Currently one or several of:
// "last", "avg", "total", "min", "max".
Expand Down Expand Up @@ -2136,6 +2157,10 @@ func init() {
MustRegisterFunction(group)
MustRegisterFunction(groupByNode)
MustRegisterFunction(groupByNodes)
MustRegisterFunction(highest).WithDefaultParams(map[uint8]interface{}{
2: 1, // n,
3: "average", // f
})
MustRegisterFunction(highestAverage)
MustRegisterFunction(highestCurrent)
MustRegisterFunction(highestMax)
Expand All @@ -2154,6 +2179,10 @@ func init() {
MustRegisterFunction(logarithm).WithDefaultParams(map[uint8]interface{}{
2: 10, // base
})
MustRegisterFunction(lowest).WithDefaultParams(map[uint8]interface{}{
2: 1, // n,
3: "average", // f
})
MustRegisterFunction(lowestAverage)
MustRegisterFunction(lowestCurrent)
MustRegisterFunction(maxSeries)
Expand Down
Loading