Skip to content

Commit

Permalink
[query] Implemented the Graphite timeSlice function (#2581)
Browse files Browse the repository at this point in the history
  • Loading branch information
teddywahle authored Sep 8, 2020
1 parent 2be52d1 commit fbb59d9
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 0 deletions.
46 changes: 46 additions & 0 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/m3db/m3/src/query/graphite/common"
"github.com/m3db/m3/src/query/graphite/errors"
"github.com/m3db/m3/src/query/graphite/graphite"
"github.com/m3db/m3/src/query/graphite/ts"
"github.com/m3db/m3/src/query/util"
)
Expand Down Expand Up @@ -275,6 +276,48 @@ func delayValuesHelper(ctx *common.Context, series *ts.Series, steps int) ts.Val
return output
}

// timeSlice takes one metric or a wildcard metric, followed by a quoted string with the time to start the line and
// another quoted string with the time to end the line. The start and end times are inclusive.
// Useful for filtering out a part of a series of data from a wider range of data.
func timeSlice(ctx *common.Context, inputPath singlePathSpec, start string, end string) (ts.SeriesList, error) {
var (
now = time.Now()
tzOffsetForAbsoluteTime time.Duration
)
startTime, err := graphite.ParseTime(start, now, tzOffsetForAbsoluteTime)
if err != nil {
return ts.NewSeriesList(), err
}
endTime, err := graphite.ParseTime(end, now, tzOffsetForAbsoluteTime)
if err != nil {
return ts.NewSeriesList(), err
}

input := ts.SeriesList(inputPath)
output := make([]*ts.Series, 0, input.Len())

for _, series := range input.Values {
stepDuration := time.Duration(series.MillisPerStep()) * time.Millisecond
truncatedValues := ts.NewValues(ctx, series.MillisPerStep(), series.Len())

currentTime := series.StartTime()
for i := 0; i < series.Len(); i++ {
equalOrAfterStart := currentTime.Equal(startTime) || currentTime.After(startTime)
beforeOrEqualEnd := currentTime.Before(endTime) || currentTime.Equal(endTime)
if equalOrAfterStart && beforeOrEqualEnd {
truncatedValues.SetValueAt(i, series.ValueAtTime(currentTime))
}
currentTime = currentTime.Add(stepDuration)
}

slicedSeries := ts.NewSeries(ctx, series.Name(), series.StartTime(), truncatedValues)
renamedSlicedSeries := slicedSeries.RenamedTo(fmt.Sprintf("timeSlice(%s, %s, %s)", slicedSeries.Name(), start, end))
output = append(output, renamedSlicedSeries)
}
input.Values = output
return input, nil
}

// absolute returns the absolute value of each element in the series.
func absolute(ctx *common.Context, input singlePathSpec) (ts.SeriesList, error) {
return transform(ctx, input,
Expand Down Expand Up @@ -2008,6 +2051,9 @@ func init() {
MustRegisterFunction(timeShift).WithDefaultParams(map[uint8]interface{}{
3: true, // resetEnd
})
MustRegisterFunction(timeSlice).WithDefaultParams(map[uint8]interface{}{
3: "now", // endTime
})
MustRegisterFunction(transformNull).WithDefaultParams(map[uint8]interface{}{
2: 0.0, // defaultValue
})
Expand Down
45 changes: 45 additions & 0 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,43 @@ func testMovingFunction(t *testing.T, target, expectedName string, values, boots
expected, res.Values)
}

var (
testGeneralFunctionStart = time.Now().Add(time.Minute * -11).Truncate(time.Minute)
testGeneralFunctionEnd = time.Now().Add(time.Minute * -3).Truncate(time.Minute)
)

// testGeneralFunction is a copy of testMovingFunction but without any logic for bootstrapping values
func testGeneralFunction(t *testing.T, target, expectedName string, values, output []float64) {
ctx := common.NewTestContext()
defer ctx.Close()

engine := NewEngine(
&common.MovingFunctionStorage{
StepMillis: 60000,
Values: values,
},
)
phonyContext := common.NewContext(common.ContextOptions{
Start: testGeneralFunctionStart,
End: testGeneralFunctionEnd,
Engine: engine,
})

expr, err := phonyContext.Engine.(*Engine).Compile(target)
require.NoError(t, err)
res, err := expr.Execute(phonyContext)
require.NoError(t, err)
var expected []common.TestSeries
if output != nil {
expectedSeries := common.TestSeries{
Name: expectedName,
Data: output,
}
expected = append(expected, expectedSeries)
}
common.CompareOutputsAndExpected(t, 60000, testGeneralFunctionStart, expected, res.Values)
}

func TestMovingAverageSuccess(t *testing.T) {
values := []float64{12.0, 19.0, -10.0, math.NaN(), 10.0}
bootstrap := []float64{3.0, 4.0, 5.0}
Expand Down Expand Up @@ -2895,6 +2932,13 @@ func testDelay(t *testing.T, target, expectedName string, values, output []float
common.CompareOutputsAndExpected(t, 10000, testDelayStart, expected, res.Values)
}

func TestTimeSlice(t *testing.T) {
values := []float64{math.NaN(),1.0,2.0,3.0,math.NaN(),5.0,6.0,math.NaN(),7.0,8.0,9.0}
expected := []float64{math.NaN(),math.NaN(),math.NaN(),3.0,math.NaN(),5.0,6.0,math.NaN(),7.0,math.NaN(),math.NaN()}

testGeneralFunction(t, "timeSlice(foo.bar.baz, '-9min','-3min')", "timeSlice(foo.bar.baz, -9min, -3min)", values, expected)
}

func TestDashed(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()
Expand Down Expand Up @@ -3046,6 +3090,7 @@ func TestFunctionsRegistered(t *testing.T) {
"time",
"timeFunction",
"timeShift",
"timeSlice",
"transformNull",
"weightedAverage",
}
Expand Down

0 comments on commit fbb59d9

Please sign in to comment.