Skip to content

Commit

Permalink
Fix derivative when there is a group by time() and fill
Browse files Browse the repository at this point in the history
Fix #334
  • Loading branch information
dgnorton committed Oct 21, 2014
1 parent 65e4136 commit 4038818
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 14 deletions.
2 changes: 2 additions & 0 deletions engine/aggregator_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,9 @@ func (self *AggregatorEngine) aggregateValuesForSeries(series *protocol.Series)
// update the state of the given group
node := seriesState.trie.GetNode(group)
var err error
log4go.Trace("Aggregating for group %v", group)
for idx, aggregator := range self.aggregators {
log4go.Trace("Aggregating value for %T for group %v and state %v", aggregator, group, node.states[idx])
node.states[idx], err = aggregator.AggregatePoint(node.states[idx], point)
if err != nil {
return false, err
Expand Down
30 changes: 27 additions & 3 deletions engine/aggregator_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ type DerivativeAggregatorState struct {

type DerivativeAggregator struct {
AbstractAggregator
duration *time.Duration // if it's group by time()
lastState *DerivativeAggregatorState
defaultValue *protocol.FieldValue
alias string
}
Expand Down Expand Up @@ -393,12 +395,27 @@ func (self *DerivativeAggregator) AggregatePoint(state interface{}, p *protocol.
s = &DerivativeAggregatorState{}
}

// starting a new bucket? (only for group by time())
if s != self.lastState && self.duration != nil {
// if there was a previous bucket, update its lastValue
if self.lastState != nil {
self.lastState.lastValue = newValue
}
// save the current state as the last
self.lastState = s
}

if s.firstValue == nil {
s.firstValue = newValue
return s, nil
}

s.lastValue = newValue
if self.duration == nil {
s.lastValue = newValue
} else {
s.lastValue = s.firstValue
}

return s, nil
}

Expand Down Expand Up @@ -444,13 +461,20 @@ func NewDerivativeAggregator(q *parser.SelectQuery, v *parser.Value, defaultValu
return nil, err
}

return &DerivativeAggregator{
da := &DerivativeAggregator{
AbstractAggregator: AbstractAggregator{
value: v.Elems[0],
},
defaultValue: wrappedDefaultValue,
alias: v.Alias,
}, nil
}

da.duration, _, err = q.GetGroupByClause().GetGroupByTime()
if err != nil {
return nil, err
}

return da, nil
}

//
Expand Down
74 changes: 63 additions & 11 deletions integration/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2309,10 +2309,10 @@ var (
[
{
"points": [
[310000, 400.0],
[300000, 30.0],
[120000, 20.0],
[60000, 10.0],
[310000, 15.0]
[60000, 5.0]
],
"name": "data",
"columns": ["time", "value"]
Expand Down Expand Up @@ -2340,7 +2340,7 @@ func (self *DataTestSuite) tstAggregateFill(tstData, aggregate, fill string, agg
// write test data to the database
self.client.WriteJsonData(tstData, c, influxdb.Millisecond)
// build the test query string
query := fmtFillQuery(aggregate, aggArgs, "data", fill)
query := fmtQuery(aggregate, aggArgs, "data", fill)
// run the query
series := self.client.RunQuery(query, c)
// check that we only got one result series
Expand All @@ -2357,12 +2357,17 @@ func (self *DataTestSuite) tstAggregateFill(tstData, aggregate, fill string, agg
}
}

func fmtFillQuery(aggregate string, aggArgs []interface{}, series, fill string) string {
func fmtQuery(aggregate string, aggArgs []interface{}, series, fill string) string {
args := "value"
for _, arg := range aggArgs {
args = fmt.Sprintf("%s, %v", args, arg)
}
return fmt.Sprintf("select %s(%s) from %s group by time(60s) fill(%s) where time > 60s and time < 320s", aggregate, args, series, fill)

if fill != "" {
return fmt.Sprintf("select %s(%s) from %s group by time(60s) fill(%s) where time > 60s and time < 320s", aggregate, args, series, fill)
}

return fmt.Sprintf("select %s(%s) from %s group by time(60s) where time > 60s and time < 320s", aggregate, args, series)
}

var emptyAggArgs []interface{}
Expand All @@ -2373,6 +2378,53 @@ type tv struct {
Value interface{}
}

// Test Derivative with consecutive buckets and filling later buckets
func (self *DataTestSuite) TestIssue334DerivativeConsecutiveBucketsFillLater(c *C) {
data := `
[
{
"name": "data",
"columns": ["time", "value"],
"points": [
[130000, 80.0],
[120000, 40.0],
[70000, 20.0],
[60000, 10.0]
]
}
]`
// the 120000 bucket includes the points 80.0, 40.0 and the new value from the next bucket 20.0
// the 60000 bucket includes two points only 20.0 and 10.0
expect := []tv{{300000.0, nil}, {240000.0, nil}, {180000.0, nil}, {120000.0, 1.0}, {60000.0, nil}}
self.tstAggregateFill(data, "derivative", "null", emptyAggArgs, expect, c)
}

// Test Derivative with non-consecutive buckets and filling in between
func (self *DataTestSuite) TestIssue334DerivativeNonConsecutiveBucketsFillBetween(c *C) {
data := `
[
{
"name": "data",
"columns": ["time", "value"],
"points": [
[250000, 320.0],
[240000, 90.0],
[130000, 80.0],
[120000, 40.0],
[70000, 20.0],
[60000, 10.0]
]
}
]`
// The 300000 bucket includes no points
// the 240000 bucket includes 320.0, 90.0 and 80.0 from the following bucket (240.0 / 120 = 2)
// the 180000 bucket includes no points
// the 120000 bucket includes 80.0, 40.0 and 20.0 from the next bucket (60.0 / 60 = 1)
// the 60000 bucket includes 20.0 and 10.0 (10 / 10 = 1)
expect := []tv{{300000.0, nil}, {240000.0, 2.0}, {180000.0, nil}, {120000.0, 1.0}, {60000.0, nil}}
self.tstAggregateFill(data, "derivative", "null", emptyAggArgs, expect, c)
}

// count aggregate filling with null
func (self *DataTestSuite) TestCountAggregateFillWithNull(c *C) {
expVals := []tv{{300000.0, 1.0}, {240000.0, nil}, {180000.0, nil}, {120000.0, 1.0}, {60000.0, 1.0}}
Expand Down Expand Up @@ -2537,32 +2589,32 @@ func (self *DataTestSuite) TestBottom10AggregateFillWith0(c *C) {

// derivative aggregate filling with null
func (self *DataTestSuite) TestDerivativeAggregateFillWithNull(c *C) {
expVals := []tv{{300000.0, -1.5}, {240000.0, nil}, {180000.0, nil}}
expVals := []tv{{300000.0, 2.0}, {240000.0, nil}, {180000.0, nil}, {120000.0, 0.25}}
self.tstAggregateFill(aggTstData2, "derivative", "null", emptyAggArgs, expVals, c)
}

// derivative aggregate filling with 0
func (self *DataTestSuite) TestDerivativeAggregateFillWith0(c *C) {
expVals := []tv{{300000.0, -1.5}, {240000.0, 0.0}, {180000.0, 0.0}}
expVals := []tv{{300000.0, 2.0}, {240000.0, 0.0}, {180000.0, 0.0}, {120000.0, 0.25}}
self.tstAggregateFill(aggTstData2, "derivative", "0", emptyAggArgs, expVals, c)
}

// difference aggregate filling with null
func (self *DataTestSuite) TestDifferenceAggregateFillWithNull(c *C) {
expVals := []tv{{300000.0, 15.0}, {240000.0, nil}, {180000.0, nil}, {120000.0, nil}, {60000.0, nil}}
expVals := []tv{{300000.0, -370.0}, {240000.0, nil}, {180000.0, nil}, {120000.0, nil}, {60000.0, nil}}
self.tstAggregateFill(aggTstData2, "difference", "null", emptyAggArgs, expVals, c)
}

// difference aggregate filling with 0
func (self *DataTestSuite) TestDifferenceAggregateFillWith0(c *C) {
expVals := []tv{{300000.0, 15.0}, {240000.0, 0.0}, {180000.0, 0.0}, {120000.0, 0.0}, {60000.0, 0.0}}
expVals := []tv{{300000.0, -370.0}, {240000.0, 0.0}, {180000.0, 0.0}, {120000.0, 0.0}, {60000.0, 0.0}}
self.tstAggregateFill(aggTstData2, "difference", "0", emptyAggArgs, expVals, c)
}

// histogram aggregate filling with null
func (self *DataTestSuite) TestHistogramAggregateFillWithNull(c *C) {
self.client.WriteJsonData(aggTstData2, c, influxdb.Millisecond)
series := self.client.RunQuery(fmtFillQuery("histogram", []interface{}{}, "data", "null"), c)
series := self.client.RunQuery(fmtQuery("histogram", []interface{}{}, "data", "null"), c)
c.Assert(len(series), Equals, 1)
maps := ToMap(series[0])
c.Assert(len(maps), Equals, 6)
Expand All @@ -2574,7 +2626,7 @@ func (self *DataTestSuite) TestHistogramAggregateFillWithNull(c *C) {
// histogram aggregate filling with 0
func (self *DataTestSuite) TestHistogramAggregateFillWith0(c *C) {
self.client.WriteJsonData(aggTstData2, c, influxdb.Millisecond)
series := self.client.RunQuery(fmtFillQuery("histogram", []interface{}{}, "data", "0"), c)
series := self.client.RunQuery(fmtQuery("histogram", []interface{}{}, "data", "0"), c)
c.Assert(len(series), Equals, 1)
maps := ToMap(series[0])
c.Assert(len(maps), Equals, 6)
Expand Down

0 comments on commit 4038818

Please sign in to comment.