diff --git a/src/query/graphite/native/builtin_functions.go b/src/query/graphite/native/builtin_functions.go index 2c89ffa7cc..4a904f4932 100644 --- a/src/query/graphite/native/builtin_functions.go +++ b/src/query/graphite/native/builtin_functions.go @@ -32,6 +32,7 @@ import ( "github.com/m3db/m3/src/query/graphite/common" "github.com/m3db/m3/src/query/graphite/errors" + "github.com/m3db/m3/src/query/graphite/graphite" "github.com/m3db/m3/src/query/graphite/ts" "github.com/m3db/m3/src/query/util" ) @@ -275,6 +276,48 @@ func delayValuesHelper(ctx *common.Context, series *ts.Series, steps int) ts.Val return output } +// timeSlice takes one metric or a wildcard metric, followed by a quoted string with the time to start the line and +// another quoted string with the time to end the line. The start and end times are inclusive. +// Useful for filtering out a part of a series of data from a wider range of data. +func timeSlice(ctx *common.Context, inputPath singlePathSpec, start string, end string) (ts.SeriesList, error) { + var ( + now = time.Now() + tzOffsetForAbsoluteTime time.Duration + ) + startTime, err := graphite.ParseTime(start, now, tzOffsetForAbsoluteTime) + if err != nil { + return ts.NewSeriesList(), err + } + endTime, err := graphite.ParseTime(end, now, tzOffsetForAbsoluteTime) + if err != nil { + return ts.NewSeriesList(), err + } + + input := ts.SeriesList(inputPath) + output := make([]*ts.Series, 0, input.Len()) + + for _, series := range input.Values { + stepDuration := time.Duration(series.MillisPerStep()) * time.Millisecond + truncatedValues := ts.NewValues(ctx, series.MillisPerStep(), series.Len()) + + currentTime := series.StartTime() + for i := 0; i < series.Len(); i++ { + equalOrAfterStart := currentTime.Equal(startTime) || currentTime.After(startTime) + beforeOrEqualEnd := currentTime.Before(endTime) || currentTime.Equal(endTime) + if equalOrAfterStart && beforeOrEqualEnd { + truncatedValues.SetValueAt(i, series.ValueAtTime(currentTime)) + } + currentTime = currentTime.Add(stepDuration) + } + + slicedSeries := ts.NewSeries(ctx, series.Name(), series.StartTime(), truncatedValues) + renamedSlicedSeries := slicedSeries.RenamedTo(fmt.Sprintf("timeSlice(%s, %s, %s)", slicedSeries.Name(), start, end)) + output = append(output, renamedSlicedSeries) + } + input.Values = output + return input, nil +} + // absolute returns the absolute value of each element in the series. func absolute(ctx *common.Context, input singlePathSpec) (ts.SeriesList, error) { return transform(ctx, input, @@ -2008,6 +2051,9 @@ func init() { MustRegisterFunction(timeShift).WithDefaultParams(map[uint8]interface{}{ 3: true, // resetEnd }) + MustRegisterFunction(timeSlice).WithDefaultParams(map[uint8]interface{}{ + 3: "now", // endTime + }) MustRegisterFunction(transformNull).WithDefaultParams(map[uint8]interface{}{ 2: 0.0, // defaultValue }) diff --git a/src/query/graphite/native/builtin_functions_test.go b/src/query/graphite/native/builtin_functions_test.go index 724c120eb3..8becf9fe8e 100644 --- a/src/query/graphite/native/builtin_functions_test.go +++ b/src/query/graphite/native/builtin_functions_test.go @@ -642,6 +642,43 @@ func testMovingFunction(t *testing.T, target, expectedName string, values, boots expected, res.Values) } +var ( + testGeneralFunctionStart = time.Now().Add(time.Minute * -11).Truncate(time.Minute) + testGeneralFunctionEnd = time.Now().Add(time.Minute * -3).Truncate(time.Minute) +) + +// testGeneralFunction is a copy of testMovingFunction but without any logic for bootstrapping values +func testGeneralFunction(t *testing.T, target, expectedName string, values, output []float64) { + ctx := common.NewTestContext() + defer ctx.Close() + + engine := NewEngine( + &common.MovingFunctionStorage{ + StepMillis: 60000, + Values: values, + }, + ) + phonyContext := common.NewContext(common.ContextOptions{ + Start: testGeneralFunctionStart, + End: testGeneralFunctionEnd, + Engine: engine, + }) + + expr, err := phonyContext.Engine.(*Engine).Compile(target) + require.NoError(t, err) + res, err := expr.Execute(phonyContext) + require.NoError(t, err) + var expected []common.TestSeries + if output != nil { + expectedSeries := common.TestSeries{ + Name: expectedName, + Data: output, + } + expected = append(expected, expectedSeries) + } + common.CompareOutputsAndExpected(t, 60000, testGeneralFunctionStart, expected, res.Values) +} + func TestMovingAverageSuccess(t *testing.T) { values := []float64{12.0, 19.0, -10.0, math.NaN(), 10.0} bootstrap := []float64{3.0, 4.0, 5.0} @@ -2895,6 +2932,13 @@ func testDelay(t *testing.T, target, expectedName string, values, output []float common.CompareOutputsAndExpected(t, 10000, testDelayStart, expected, res.Values) } +func TestTimeSlice(t *testing.T) { + values := []float64{math.NaN(),1.0,2.0,3.0,math.NaN(),5.0,6.0,math.NaN(),7.0,8.0,9.0} + expected := []float64{math.NaN(),math.NaN(),math.NaN(),3.0,math.NaN(),5.0,6.0,math.NaN(),7.0,math.NaN(),math.NaN()} + + testGeneralFunction(t, "timeSlice(foo.bar.baz, '-9min','-3min')", "timeSlice(foo.bar.baz, -9min, -3min)", values, expected) +} + func TestDashed(t *testing.T) { ctx := common.NewTestContext() defer ctx.Close() @@ -3046,6 +3090,7 @@ func TestFunctionsRegistered(t *testing.T) { "time", "timeFunction", "timeShift", + "timeSlice", "transformNull", "weightedAverage", }