From 41c8370bbc550ecfbf2652872bbc7d91fa573c9e Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Thu, 16 Mar 2017 15:51:23 -0500 Subject: [PATCH] Fix fill(linear) when multiple series exist and there are null values When there were multiple series and anything other than the last series had any null values, the series would start using the first point from the next series to interpolate points. Interpolation should not cross between series. Now, the linear fill checks to make sure the next point is within the same series before using it to perform interpolation. --- CHANGELOG.md | 1 + influxql/iterator.gen.go | 6 +-- influxql/iterator.gen.go.tmpl | 3 +- influxql/select_test.go | 72 +++++++++++++++++++++++++++++++++++ 4 files changed, 76 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a30a459994..10e742465d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ - [#8091](https://github.com/influxdata/influxdb/issues/8091): Do not increment the continuous query statistic if no query is run. - [#8064](https://github.com/influxdata/influxdb/issues/8064): Forbid wildcards in binary expressions. +- [#8148](https://github.com/influxdata/influxdb/issues/8148): Fix fill(linear) when multiple series exist and there are null values. ## v1.2.2 [2017-03-14] diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index aa82c9104a1..acf2a8a5926 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -680,8 +680,7 @@ func (itr *floatFillIterator) Next() (*FloatPoint, error) { next, err := itr.input.peek() if err != nil { return nil, err - } - if next != nil { + } else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() { interval := int64(itr.opt.Interval.Duration) start := itr.window.time / interval p.Value = linearFloat(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value) @@ -3338,8 +3337,7 @@ func (itr *integerFillIterator) Next() (*IntegerPoint, error) { next, err := itr.input.peek() if err != nil { return nil, err - } - if next != nil { + } else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() { interval := int64(itr.opt.Interval.Duration) start := itr.window.time / interval p.Value = linearInteger(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value) diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index 5269fb9b28c..08e39457f9f 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -679,8 +679,7 @@ func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) { next, err := itr.input.peek() if err != nil { return nil, err - } - if next != nil { + } else if next != nil && next.Name == itr.window.name && next.Tags.ID() == itr.window.tags.ID() { interval := int64(itr.opt.Interval.Duration) start := itr.window.time / interval p.Value = linear{{$k.Name}}(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value) diff --git a/influxql/select_test.go b/influxql/select_test.go index d738207a158..62b7aafd2d5 100644 --- a/influxql/select_test.go +++ b/influxql/select_test.go @@ -1291,6 +1291,42 @@ func TestSelect_Fill_Linear_Float_Many(t *testing.T) { } } +func TestSelect_Fill_Linear_Float_MultipleSeries(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) { + if m.Name != "cpu" { + t.Fatalf("unexpected source: %s", m.Name) + } + return influxql.NewCallIterator(&FloatIterator{Points: []influxql.FloatPoint{ + {Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("host=B"), Time: 32 * Second, Value: 4}, + }}, opt) + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT mean(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:00Z' GROUP BY host, time(10s) fill(linear)`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 2, Aggregated: 1}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 40 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 50 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 20 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 30 * Second, Value: 4, Aggregated: 1}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 40 * Second, Nil: true}}, + {&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Nil: true}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + // Ensure a SELECT query with a fill(linear) statement can be executed for integers. func TestSelect_Fill_Linear_Integer_One(t *testing.T) { var ic IteratorCreator @@ -1354,6 +1390,42 @@ func TestSelect_Fill_Linear_Integer_Many(t *testing.T) { } } +func TestSelect_Fill_Linear_Integer_MultipleSeries(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(m *influxql.Measurement, opt influxql.IteratorOptions) (influxql.Iterator, error) { + if m.Name != "cpu" { + t.Fatalf("unexpected source: %s", m.Name) + } + return influxql.NewCallIterator(&IntegerIterator{Points: []influxql.IntegerPoint{ + {Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 2}, + {Name: "cpu", Tags: ParseTags("host=B"), Time: 32 * Second, Value: 4}, + }}, opt) + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT max(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:00Z' GROUP BY host, time(10s) fill(linear)`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Nil: true}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 2, Aggregated: 1}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20 * Second, Nil: true}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Nil: true}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 40 * Second, Nil: true}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 50 * Second, Nil: true}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 0 * Second, Nil: true}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 10 * Second, Nil: true}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 20 * Second, Nil: true}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 30 * Second, Value: 4, Aggregated: 1}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 40 * Second, Nil: true}}, + {&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=B"), Time: 50 * Second, Nil: true}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + // Ensure a SELECT stddev() query can be executed. func TestSelect_Stddev_Float(t *testing.T) { var ic IteratorCreator