Skip to content

Commit

Permalink
Merge pull request #8081 from influxdata/js-8045-subqueries-with-cond…
Browse files Browse the repository at this point in the history
…itions

Refactor the subquery code and fix outer condition queries
  • Loading branch information
jsternberg authored May 1, 2017
2 parents 8655042 + df30a4d commit 9bd7fce
Show file tree
Hide file tree
Showing 8 changed files with 504 additions and 386 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio
- [#8266](https://github.com/influxdata/influxdb/issues/8266): top() and bottom() now returns the time for every point.
- [#8315](https://github.com/influxdata/influxdb/issues/8315): Remove default upper time bound on DELETE queries.
- [#8066](https://github.com/influxdata/influxdb/issues/8066): Fix LIMIT and OFFSET for certain aggregate queries.
- [#8045](https://github.com/influxdata/influxdb/issues/8045): Refactor the subquery code and fix outer condition queries.

## v1.2.3 [unreleased]

Expand Down
256 changes: 256 additions & 0 deletions influxql/iterator.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2736,6 +2736,70 @@ type floatDedupeIterator struct {
m map[string]struct{} // lookup of points already sent
}

type floatIteratorMapper struct {
e *Emitter
buf []interface{}
driver IteratorMap // which iterator to use for the primary value, can be nil
fields []IteratorMap // which iterator to use for an aux field
point FloatPoint
}

func newFloatIteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *floatIteratorMapper {
e := NewEmitter(itrs, opt.Ascending, 0)
e.OmitTime = true
return &floatIteratorMapper{
e: e,
buf: make([]interface{}, len(itrs)),
driver: driver,
fields: fields,
point: FloatPoint{
Aux: make([]interface{}, len(fields)),
},
}
}

func (itr *floatIteratorMapper) Next() (*FloatPoint, error) {
t, name, tags, err := itr.e.loadBuf()
if err != nil || t == ZeroTime {
return nil, err
}
itr.point.Time = t
itr.point.Name = name
itr.point.Tags = tags

itr.e.readInto(t, name, tags, itr.buf)
if itr.driver != nil {
if v := itr.driver.Value(tags, itr.buf); v != nil {
if v, ok := v.(float64); ok {
itr.point.Value = v
itr.point.Nil = false
} else {
itr.point.Value = 0
itr.point.Nil = true
}
} else {
itr.point.Value = 0
itr.point.Nil = true
}
}
for i, f := range itr.fields {
itr.point.Aux[i] = f.Value(tags, itr.buf)
}
return &itr.point, nil
}

func (itr *floatIteratorMapper) Stats() IteratorStats {
stats := IteratorStats{}
for _, itr := range itr.e.itrs {
stats.Add(itr.Stats())
}
return stats
}

func (itr *floatIteratorMapper) Close() error {
return itr.e.Close()
}

type floatFilterIterator struct {
input FloatIterator
cond Expr
Expand Down Expand Up @@ -5585,6 +5649,70 @@ type integerDedupeIterator struct {
m map[string]struct{} // lookup of points already sent
}

type integerIteratorMapper struct {
e *Emitter
buf []interface{}
driver IteratorMap // which iterator to use for the primary value, can be nil
fields []IteratorMap // which iterator to use for an aux field
point IntegerPoint
}

func newIntegerIteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *integerIteratorMapper {
e := NewEmitter(itrs, opt.Ascending, 0)
e.OmitTime = true
return &integerIteratorMapper{
e: e,
buf: make([]interface{}, len(itrs)),
driver: driver,
fields: fields,
point: IntegerPoint{
Aux: make([]interface{}, len(fields)),
},
}
}

func (itr *integerIteratorMapper) Next() (*IntegerPoint, error) {
t, name, tags, err := itr.e.loadBuf()
if err != nil || t == ZeroTime {
return nil, err
}
itr.point.Time = t
itr.point.Name = name
itr.point.Tags = tags

itr.e.readInto(t, name, tags, itr.buf)
if itr.driver != nil {
if v := itr.driver.Value(tags, itr.buf); v != nil {
if v, ok := v.(int64); ok {
itr.point.Value = v
itr.point.Nil = false
} else {
itr.point.Value = 0
itr.point.Nil = true
}
} else {
itr.point.Value = 0
itr.point.Nil = true
}
}
for i, f := range itr.fields {
itr.point.Aux[i] = f.Value(tags, itr.buf)
}
return &itr.point, nil
}

func (itr *integerIteratorMapper) Stats() IteratorStats {
stats := IteratorStats{}
for _, itr := range itr.e.itrs {
stats.Add(itr.Stats())
}
return stats
}

func (itr *integerIteratorMapper) Close() error {
return itr.e.Close()
}

type integerFilterIterator struct {
input IntegerIterator
cond Expr
Expand Down Expand Up @@ -8420,6 +8548,70 @@ type stringDedupeIterator struct {
m map[string]struct{} // lookup of points already sent
}

type stringIteratorMapper struct {
e *Emitter
buf []interface{}
driver IteratorMap // which iterator to use for the primary value, can be nil
fields []IteratorMap // which iterator to use for an aux field
point StringPoint
}

func newStringIteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *stringIteratorMapper {
e := NewEmitter(itrs, opt.Ascending, 0)
e.OmitTime = true
return &stringIteratorMapper{
e: e,
buf: make([]interface{}, len(itrs)),
driver: driver,
fields: fields,
point: StringPoint{
Aux: make([]interface{}, len(fields)),
},
}
}

func (itr *stringIteratorMapper) Next() (*StringPoint, error) {
t, name, tags, err := itr.e.loadBuf()
if err != nil || t == ZeroTime {
return nil, err
}
itr.point.Time = t
itr.point.Name = name
itr.point.Tags = tags

itr.e.readInto(t, name, tags, itr.buf)
if itr.driver != nil {
if v := itr.driver.Value(tags, itr.buf); v != nil {
if v, ok := v.(string); ok {
itr.point.Value = v
itr.point.Nil = false
} else {
itr.point.Value = ""
itr.point.Nil = true
}
} else {
itr.point.Value = ""
itr.point.Nil = true
}
}
for i, f := range itr.fields {
itr.point.Aux[i] = f.Value(tags, itr.buf)
}
return &itr.point, nil
}

func (itr *stringIteratorMapper) Stats() IteratorStats {
stats := IteratorStats{}
for _, itr := range itr.e.itrs {
stats.Add(itr.Stats())
}
return stats
}

func (itr *stringIteratorMapper) Close() error {
return itr.e.Close()
}

type stringFilterIterator struct {
input StringIterator
cond Expr
Expand Down Expand Up @@ -11255,6 +11447,70 @@ type booleanDedupeIterator struct {
m map[string]struct{} // lookup of points already sent
}

type booleanIteratorMapper struct {
e *Emitter
buf []interface{}
driver IteratorMap // which iterator to use for the primary value, can be nil
fields []IteratorMap // which iterator to use for an aux field
point BooleanPoint
}

func newBooleanIteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *booleanIteratorMapper {
e := NewEmitter(itrs, opt.Ascending, 0)
e.OmitTime = true
return &booleanIteratorMapper{
e: e,
buf: make([]interface{}, len(itrs)),
driver: driver,
fields: fields,
point: BooleanPoint{
Aux: make([]interface{}, len(fields)),
},
}
}

func (itr *booleanIteratorMapper) Next() (*BooleanPoint, error) {
t, name, tags, err := itr.e.loadBuf()
if err != nil || t == ZeroTime {
return nil, err
}
itr.point.Time = t
itr.point.Name = name
itr.point.Tags = tags

itr.e.readInto(t, name, tags, itr.buf)
if itr.driver != nil {
if v := itr.driver.Value(tags, itr.buf); v != nil {
if v, ok := v.(bool); ok {
itr.point.Value = v
itr.point.Nil = false
} else {
itr.point.Value = false
itr.point.Nil = true
}
} else {
itr.point.Value = false
itr.point.Nil = true
}
}
for i, f := range itr.fields {
itr.point.Aux[i] = f.Value(tags, itr.buf)
}
return &itr.point, nil
}

func (itr *booleanIteratorMapper) Stats() IteratorStats {
stats := IteratorStats{}
for _, itr := range itr.e.itrs {
stats.Add(itr.Stats())
}
return stats
}

func (itr *booleanIteratorMapper) Close() error {
return itr.e.Close()
}

type booleanFilterIterator struct {
input BooleanIterator
cond Expr
Expand Down
64 changes: 64 additions & 0 deletions influxql/iterator.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1503,6 +1503,70 @@ type {{$k.name}}DedupeIterator struct {
m map[string]struct{} // lookup of points already sent
}

type {{$k.name}}IteratorMapper struct {
e *Emitter
buf []interface{}
driver IteratorMap // which iterator to use for the primary value, can be nil
fields []IteratorMap // which iterator to use for an aux field
point {{$k.Name}}Point
}

func new{{$k.Name}}IteratorMapper(itrs []Iterator, driver IteratorMap, fields []IteratorMap, opt IteratorOptions) *{{$k.name}}IteratorMapper {
e := NewEmitter(itrs, opt.Ascending, 0)
e.OmitTime = true
return &{{$k.name}}IteratorMapper{
e: e,
buf: make([]interface{}, len(itrs)),
driver: driver,
fields: fields,
point: {{$k.Name}}Point{
Aux: make([]interface{}, len(fields)),
},
}
}

func (itr *{{$k.name}}IteratorMapper) Next() (*{{$k.Name}}Point, error) {
t, name, tags, err := itr.e.loadBuf()
if err != nil || t == ZeroTime {
return nil, err
}
itr.point.Time = t
itr.point.Name = name
itr.point.Tags = tags

itr.e.readInto(t, name, tags, itr.buf)
if itr.driver != nil {
if v := itr.driver.Value(tags, itr.buf); v != nil {
if v, ok := v.({{$k.Type}}); ok {
itr.point.Value = v
itr.point.Nil = false
} else {
itr.point.Value = {{$k.Nil}}
itr.point.Nil = true
}
} else {
itr.point.Value = {{$k.Nil}}
itr.point.Nil = true
}
}
for i, f := range itr.fields {
itr.point.Aux[i] = f.Value(tags, itr.buf)
}
return &itr.point, nil
}

func (itr *{{$k.name}}IteratorMapper) Stats() IteratorStats {
stats := IteratorStats{}
for _, itr := range itr.e.itrs {
stats.Add(itr.Stats())
}
return stats
}

func (itr *{{$k.name}}IteratorMapper) Close() error {
return itr.e.Close()
}

type {{$k.name}}FilterIterator struct {
input {{$k.Name}}Iterator
cond Expr
Expand Down
Loading

0 comments on commit 9bd7fce

Please sign in to comment.