diff --git a/.gitignore b/.gitignore index 0d9653f78d1..7f3f51d4d76 100644 --- a/.gitignore +++ b/.gitignore @@ -69,3 +69,6 @@ integration/migration_data/ # goconvey config files *.goconvey + +# Ignore SourceGraph directory +.srclib-store/ diff --git a/CHANGELOG.md b/CHANGELOG.md index ffa3f7c8599..452e0663dcf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ With this release InfluxDB is moving to Go 1.5. - [#3876](https://github.com/influxdb/influxdb/pull/3876): Allow the following syntax in CQs: INTO "1hPolicy".:MEASUREMENT - [#3975](https://github.com/influxdb/influxdb/pull/3975): Add shard copy service - [#3986](https://github.com/influxdb/influxdb/pull/3986): Support sorting by time desc +- [#3930](https://github.com/influxdb/influxdb/pull/3930): Wire up TOP aggregate function - fixes [#1821](https://github.com/influxdb/influxdb/issues/1821) ### Bugfixes - [#3804](https://github.com/influxdb/influxdb/pull/3804): init.d script fixes, fixes issue 3803. 
diff --git a/cmd/influxd/run/server_helpers_test.go b/cmd/influxd/run/server_helpers_test.go index 67a2b2fecab..028dbe70e4b 100644 --- a/cmd/influxd/run/server_helpers_test.go +++ b/cmd/influxd/run/server_helpers_test.go @@ -333,6 +333,7 @@ func configureLogging(s *Server) { s.MetaStore.Logger = nullLogger s.TSDBStore.Logger = nullLogger s.HintedHandoff.SetLogger(nullLogger) + s.Monitor.SetLogger(nullLogger) for _, service := range s.Services { if service, ok := service.(logSetter); ok { service.SetLogger(nullLogger) diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index 65d926914d1..7e90e525efd 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -993,7 +993,7 @@ func TestServer_Query_Count(t *testing.T) { &Query{ name: "selecting count(*) should error", command: `SELECT count(*) FROM db0.rp0.cpu`, - exp: `{"results":[{"error":"expected field argument in count()"}]}`, + exp: `{"error":"error parsing query: expected field argument in count()"}`, }, }...) 
@@ -2229,6 +2229,193 @@ func TestServer_Query_Aggregates(t *testing.T) { } } +func TestServer_Query_AggregatesTopInt(t *testing.T) { + t.Parallel() + s := OpenServer(NewConfig(), "") + defer s.Close() + + if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil { + t.Fatal(err) + } + if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil { + t.Fatal(err) + } + + writes := []string{ + // cpu data with overlapping duplicate values + // hour 0 + fmt.Sprintf(`cpu,host=server01 value=2.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), + fmt.Sprintf(`cpu,host=server02 value=3.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:10Z").UnixNano()), + fmt.Sprintf(`cpu,host=server03 value=4.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:20Z").UnixNano()), + // hour 1 + fmt.Sprintf(`cpu,host=server04 value=5.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T01:00:00Z").UnixNano()), + fmt.Sprintf(`cpu,host=server05 value=7.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T01:00:10Z").UnixNano()), + fmt.Sprintf(`cpu,host=server06 value=6.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T01:00:20Z").UnixNano()), + // hour 2 + fmt.Sprintf(`cpu,host=server07 value=7.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T02:00:00Z").UnixNano()), + fmt.Sprintf(`cpu,host=server08 value=9.0 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T02:00:10Z").UnixNano()), + + // memory data + // hour 0 + fmt.Sprintf(`memory,host=a,service=redis value=1000i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), + fmt.Sprintf(`memory,host=b,service=mysql value=2000i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), + fmt.Sprintf(`memory,host=b,service=redis value=1500i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()), + // hour 1 + fmt.Sprintf(`memory,host=a,service=redis value=1001i %d`, mustParseTime(time.RFC3339Nano, 
"2000-01-01T01:00:00Z").UnixNano()), + fmt.Sprintf(`memory,host=b,service=mysql value=2001i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T01:00:00Z").UnixNano()), + fmt.Sprintf(`memory,host=b,service=redis value=1501i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T01:00:00Z").UnixNano()), + // hour 2 + fmt.Sprintf(`memory,host=a,service=redis value=1002i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T02:00:00Z").UnixNano()), + fmt.Sprintf(`memory,host=b,service=mysql value=2002i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T02:00:00Z").UnixNano()), + fmt.Sprintf(`memory,host=b,service=redis value=1502i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T02:00:00Z").UnixNano()), + } + + test := NewTest("db0", "rp0") + test.write = strings.Join(writes, "\n") + + test.addQueries([]*Query{ + &Query{ + name: "top - cpu", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT TOP(value, 1) FROM cpu`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T02:00:10Z",9]]}]}]}`, + }, + &Query{ + name: "top - cpu - 2 values", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT TOP(value, 2) FROM cpu`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T01:00:10Z",7],["2000-01-01T02:00:10Z",9]]}]}]}`, + }, + &Query{ + name: "top - cpu - 3 values - sorts on tie properly", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT TOP(value, 3) FROM cpu`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T01:00:10Z",7],["2000-01-01T02:00:00Z",7],["2000-01-01T02:00:10Z",9]]}]}]}`, + }, + &Query{ + name: "top - cpu - with tag", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT TOP(value, host, 2) FROM cpu`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top","host"],"values":[["2000-01-01T01:00:10Z",7,"server05"],["2000-01-01T02:00:10Z",9,"server08"]]}]}]}`, + }, + &Query{ + name: "top - cpu - 3 
values with limit 2", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT TOP(value, 3) FROM cpu limit 2`, + exp: `{"error":"error parsing query: limit (3) in top function can not be larger than the LIMIT (2) in the select statement"}`, + }, + &Query{ + name: "top - cpu - hourly", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT TOP(value, 1) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:00Z",4],["2000-01-01T01:00:00Z",7],["2000-01-01T02:00:00Z",9]]}]}]}`, + }, + &Query{ + name: "top - cpu - time specified - hourly", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT time, TOP(value, 1) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:20Z",4],["2000-01-01T01:00:10Z",7],["2000-01-01T02:00:10Z",9]]}]}]}`, + }, + &Query{ + name: "top - cpu - time specified (not first) - hourly", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT TOP(value, 1), time FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:20Z",4],["2000-01-01T01:00:10Z",7],["2000-01-01T02:00:10Z",9]]}]}]}`, + }, + &Query{ + name: "top - cpu - 2 values hourly", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT TOP(value, 2) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:00Z",4],["2000-01-01T00:00:00Z",3],["2000-01-01T01:00:00Z",7],["2000-01-01T01:00:00Z",6],["2000-01-01T02:00:00Z",9],["2000-01-01T02:00:00Z",7]]}]}]}`, + }, + &Query{ + name: "top - 
cpu - time specified - 2 values hourly", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT TOP(value, 2), time FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:10Z",3],["2000-01-01T00:00:20Z",4],["2000-01-01T01:00:10Z",7],["2000-01-01T01:00:20Z",6],["2000-01-01T02:00:00Z",7],["2000-01-01T02:00:10Z",9]]}]}]}`, + }, + &Query{ + name: "top - cpu - 3 values hourly - validates that a bucket can have less than limit if no values exist in that time bucket", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT TOP(value, 3) FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:00Z",4],["2000-01-01T00:00:00Z",3],["2000-01-01T00:00:00Z",2],["2000-01-01T01:00:00Z",7],["2000-01-01T01:00:00Z",6],["2000-01-01T01:00:00Z",5],["2000-01-01T02:00:00Z",9],["2000-01-01T02:00:00Z",7]]}]}]}`, + }, + &Query{ + name: "top - cpu - time specified - 3 values hourly - validates that a bucket can have less than limit if no values exist in that time bucket", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT TOP(value, 3), time FROM cpu where time >= '2000-01-01T00:00:00Z' and time <= '2000-01-01T02:00:10Z' group by time(1h)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","top"],"values":[["2000-01-01T00:00:00Z",2],["2000-01-01T00:00:10Z",3],["2000-01-01T00:00:20Z",4],["2000-01-01T01:00:00Z",5],["2000-01-01T01:00:10Z",7],["2000-01-01T01:00:20Z",6],["2000-01-01T02:00:00Z",7],["2000-01-01T02:00:10Z",9]]}]}]}`, + }, + &Query{ + name: "top - memory - 2 values, two tags", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT TOP(value, 2), host, service FROM memory`, + exp: 
`{"results":[{"series":[{"name":"memory","columns":["time","top","host","service"],"values":[["2000-01-01T01:00:00Z",2001,"b","mysql"],["2000-01-01T02:00:00Z",2002,"b","mysql"]]}]}]}`, + }, + &Query{ + name: "top - memory - host tag with limit 2", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT TOP(value, host, 2) FROM memory`, + exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","host"],"values":[["2000-01-01T02:00:00Z",2002,"b"],["2000-01-01T02:00:00Z",1002,"a"]]}]}]}`, + }, + &Query{ + name: "top - memory - host tag with limit 2, service tag in select", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT TOP(value, host, 2), service FROM memory`, + exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","host","service"],"values":[["2000-01-01T02:00:00Z",2002,"b","mysql"],["2000-01-01T02:00:00Z",1002,"a","redis"]]}]}]}`, + }, + &Query{ + name: "top - memory - service tag with limit 2, host tag in select", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT TOP(value, service, 2), host FROM memory`, + exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","service","host"],"values":[["2000-01-01T02:00:00Z",2002,"mysql","b"],["2000-01-01T02:00:00Z",1502,"redis","b"]]}]}]}`, + }, + &Query{ + name: "top - memory - host and service tag with limit 2", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT TOP(value, host, service, 2) FROM memory`, + exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","host","service"],"values":[["2000-01-01T02:00:00Z",2002,"b","mysql"],["2000-01-01T02:00:00Z",1502,"b","redis"]]}]}]}`, + }, + &Query{ + name: "top - memory - host tag with limit 2 with service tag in select", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT TOP(value, host, 2), service FROM memory`, + exp: 
`{"results":[{"series":[{"name":"memory","columns":["time","top","host","service"],"values":[["2000-01-01T02:00:00Z",2002,"b","mysql"],["2000-01-01T02:00:00Z",1002,"a","redis"]]}]}]}`, + }, + &Query{ + name: "top - memory - host and service tag with limit 3", + params: url.Values{"db": []string{"db0"}}, + command: `SELECT TOP(value, host, service, 3) FROM memory`, + exp: `{"results":[{"series":[{"name":"memory","columns":["time","top","host","service"],"values":[["2000-01-01T02:00:00Z",2002,"b","mysql"],["2000-01-01T02:00:00Z",1502,"b","redis"],["2000-01-01T02:00:00Z",1002,"a","redis"]]}]}]}`, + }, + + // TODO + // - Test that specifying fields or tags in the function will rewrite the query to expand them to the fields + // - Test that a field can be used in the top function + // - Test that asking for a field will come back before a tag if they have the same name for a tag and a field + // - Test that `select top(value, host, 2)` when there is only one value for `host` it will only bring back one value + // - Test that `select top(value, host, 4) from foo where time > now() - 1d and time < now() group by time(1h)` and host is unique in some time buckets that it returns only the unique ones, and not always 4 values + + }...) + + for i, query := range test.queries { + if i == 0 { + if err := test.init(s); err != nil { + t.Fatalf("test init failed: %s", err) + } + } + if query.skip { + t.Logf("SKIP: %s", query.name) + continue + } + if err := query.Execute(s); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + } +} + +// Test various aggregates when different series only have data for the same timestamp. 
func TestServer_Query_AggregatesIdenticalTime(t *testing.T) { t.Parallel() diff --git a/influxql/ast.go b/influxql/ast.go index d51face8ec6..873023f0a00 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -9,6 +9,8 @@ import ( "strconv" "strings" "time" + + "github.com/influxdb/influxdb/pkg/slices" ) // DataType represents the primitive data types available in InfluxQL. @@ -857,6 +859,48 @@ func (s *SelectStatement) RewriteDistinct() { } } +// ColumnNames will walk all fields and functions and return the appropriate field names for the select statement +// while maintaining order of the field names +func (s *SelectStatement) ColumnNames() []string { + // Always set the first column to be time, even if they didn't specify it + columnNames := []string{"time"} + + // First walk each field + for _, field := range s.Fields { + switch f := field.Expr.(type) { + case *Call: + if f.Name == "top" || f.Name == "bottom" { + if len(f.Args) == 2 { + columnNames = append(columnNames, f.Name) + continue + } + // We have a special case now where we have to add the column names for the fields TOP or BOTTOM asked for as well + columnNames = slices.Union(columnNames, f.Fields(), true) + continue + } + columnNames = append(columnNames, field.Name()) + default: + // time is always first, and we already added it, so ignore it if they asked for it anywhere else. + if field.Name() != "time" { + columnNames = append(columnNames, field.Name()) + } + } + } + + return columnNames +} + +// HasTimeFieldSpecified will walk all fields and determine if the user explicitly asked for time +// This is needed to determine re-write behaviors for functions like TOP and BOTTOM +func (s *SelectStatement) HasTimeFieldSpecified() bool { + for _, f := range s.Fields { + if f.Name() == "time" { + return true + } + } + return false +} + // String returns a string representation of the select statement. 
func (s *SelectStatement) String() string { var buf bytes.Buffer @@ -1029,40 +1073,96 @@ func (s *SelectStatement) validateFields() error { return nil } +// validSelectWithAggregate determines if a SELECT statement has the correct +// combination of aggregate functions combined with selected fields and tags +// Currently we don't have support for all aggregates, but aggregates that +// can be combined with fields/tags are: +// TOP, BOTTOM, MAX, MIN, FIRST, LAST +func (s *SelectStatement) validSelectWithAggregate(numAggregates int) error { + if numAggregates != 0 && numAggregates != len(s.Fields) { + return fmt.Errorf("mixing aggregate and non-aggregate queries is not supported") + } + return nil +} + func (s *SelectStatement) validateAggregates(tr targetRequirement) error { - // First, if 1 field is an aggregate, then all fields must be an aggregate. This is - // a explicit limitation of the current system. + // Currently most aggregates can be the ONLY thing in a select statement + // Others, like TOP/BOTTOM can mix aggregates and tags/fields numAggregates := 0 for _, f := range s.Fields { if _, ok := f.Expr.(*Call); ok { numAggregates++ } } - if numAggregates != 0 && numAggregates != len(s.Fields) { - return fmt.Errorf("mixing aggregate and non-aggregate queries is not supported") - } - // Secondly, determine if specific calls have at least one and only one argument for _, f := range s.Fields { - if c, ok := f.Expr.(*Call); ok { - switch c.Name { + switch expr := f.Expr.(type) { + case *Call: + switch expr.Name { case "derivative", "non_negative_derivative": - if min, max, got := 1, 2, len(c.Args); got > max || got < min { - return fmt.Errorf("invalid number of arguments for %s, expected at least %d but no more than %d, got %d", c.Name, min, max, got) + if err := s.validSelectWithAggregate(numAggregates); err != nil { + return err + } + if min, max, got := 1, 2, len(expr.Args); got > max || got < min { + return fmt.Errorf("invalid number of arguments for %s, 
expected at least %d but no more than %d, got %d", expr.Name, min, max, got) } case "percentile": - if exp, got := 2, len(c.Args); got != exp { - return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", c.Name, exp, got) + if err := s.validSelectWithAggregate(numAggregates); err != nil { + return err + } + if exp, got := 2, len(expr.Args); got != exp { + return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got) + } + _, ok := expr.Args[1].(*NumberLiteral) + if !ok { + return fmt.Errorf("expected float argument in percentile()") + } + case "top", "bottom": + if exp, got := 2, len(expr.Args); got < exp { + return fmt.Errorf("invalid number of arguments for %s, expected at least %d, got %d", expr.Name, exp, got) + } + if len(expr.Args) > 1 { + callLimit, ok := expr.Args[len(expr.Args)-1].(*NumberLiteral) + if !ok { + return fmt.Errorf("expected integer as last argument in %s(), found %s", expr.Name, expr.Args[len(expr.Args)-1]) + } + // Check if they asked for a limit smaller than what they passed into the call + if int64(callLimit.Val) > int64(s.Limit) && s.Limit != 0 { + return fmt.Errorf("limit (%d) in %s function can not be larger than the LIMIT (%d) in the select statement", int64(callLimit.Val), expr.Name, int64(s.Limit)) + } + + for _, v := range expr.Args[:len(expr.Args)-1] { + if _, ok := v.(*VarRef); !ok { + return fmt.Errorf("only fields or tags are allowed in %s(), found %s", expr.Name, v) + } + } } default: - if exp, got := 1, len(c.Args); got != exp { - return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", c.Name, exp, got) + if err := s.validSelectWithAggregate(numAggregates); err != nil { + return err + } + if exp, got := 1, len(expr.Args); got != exp { + return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got) + } + switch fc := expr.Args[0].(type) { + case *VarRef: + // do nothing + case *Call: + if fc.Name != "distinct" { + 
return fmt.Errorf("expected field argument in %s()", expr.Name) + } + case *Distinct: + if expr.Name != "count" { + return fmt.Errorf("expected field argument in %s()", expr.Name) + } + default: + return fmt.Errorf("expected field argument in %s()", expr.Name) } } } } - // Now, check that we have valid duration and where clauses for aggregates + // Check that we have valid duration and where clauses for aggregates // fetch the group by duration groupByDuration, _ := s.GroupByInterval() @@ -2241,6 +2341,33 @@ func (c *Call) String() string { return fmt.Sprintf("%s(%s)", c.Name, strings.Join(str, ", ")) } +// Fields will extract any field names from the call. Only specific calls support this. +func (c *Call) Fields() []string { + switch c.Name { + case "top", "bottom": + // maintain the order the user specified in the query + keyMap := make(map[string]struct{}) + keys := []string{} + for i, a := range c.Args { + if i == 0 { + // special case, first argument is always the name of the function regardless of the field name + keys = append(keys, c.Name) + continue + } + switch v := a.(type) { + case *VarRef: + if _, ok := keyMap[v.Val]; !ok { + keyMap[v.Val] = struct{}{} + keys = append(keys, v.Val) + } + } + } + return keys + default: + return []string{} + } +} + // Distinct represents a DISTINCT expression. 
type Distinct struct { // Identifier following DISTINCT diff --git a/influxql/ast_test.go b/influxql/ast_test.go index b58e7353b6f..ca28361118f 100644 --- a/influxql/ast_test.go +++ b/influxql/ast_test.go @@ -451,7 +451,7 @@ func TestSelectStatement_IsRawQuerySet(t *testing.T) { isRaw: false, }, { - stmt: "select mean(*) from foo group by *", + stmt: "select mean(value) from foo group by *", isRaw: false, }, } diff --git a/influxql/functions.go b/influxql/functions.go index 87119441bf6..b9774ff37b6 100644 --- a/influxql/functions.go +++ b/influxql/functions.go @@ -12,13 +12,14 @@ import ( "math" "math/rand" "sort" - "strings" ) // Iterator represents a forward-only iterator over a set of points. // These are used by the MapFunctions in this file type Iterator interface { Next() (time int64, value interface{}) + Tags() map[string]string + TMin() int64 } // MapFunc represents a function used for mapping over a sequential series of data. @@ -39,39 +40,6 @@ func InitializeMapFunc(c *Call) (MapFunc, error) { return MapRawQuery, nil } - // Ensure that there is either a single argument or if for percentile, two - if c.Name == "percentile" { - if len(c.Args) != 2 { - return nil, fmt.Errorf("expected two arguments for %s()", c.Name) - } - } else if strings.HasSuffix(c.Name, "derivative") { - // derivatives require a field name and optional duration - if len(c.Args) == 0 { - return nil, fmt.Errorf("expected field name argument for %s()", c.Name) - } - } else if len(c.Args) != 1 { - return nil, fmt.Errorf("expected one argument for %s()", c.Name) - } - - // derivative can take a nested aggregate function, everything else expects - // a variable reference as the first arg - if !strings.HasSuffix(c.Name, "derivative") { - // Ensure the argument is appropriate for the aggregate function. 
- switch fc := c.Args[0].(type) { - case *VarRef: - case *Distinct: - if c.Name != "count" { - return nil, fmt.Errorf("expected field argument in %s()", c.Name) - } - case *Call: - if fc.Name != "distinct" { - return nil, fmt.Errorf("expected field argument in %s()", c.Name) - } - default: - return nil, fmt.Errorf("expected field argument in %s()", c.Name) - } - } - // Retrieve map function by name. switch c.Name { case "count": @@ -104,11 +72,11 @@ func InitializeMapFunc(c *Call) (MapFunc, error) { return MapFirst, nil case "last": return MapLast, nil + case "top": + return func(itr Iterator) interface{} { + return MapTop(itr, c) + }, nil case "percentile": - _, ok := c.Args[1].(*NumberLiteral) - if !ok { - return nil, fmt.Errorf("expected float argument in percentile()") - } return MapEcho, nil case "derivative", "non_negative_derivative": // If the arg is another aggregate e.g. derivative(mean(value)), then @@ -156,16 +124,14 @@ func InitializeReduceFunc(c *Call) (ReduceFunc, error) { return ReduceFirst, nil case "last": return ReduceLast, nil + case "top": + return func(values []interface{}) interface{} { + return ReduceTop(values, c) + }, nil case "percentile": - if len(c.Args) != 2 { - return nil, fmt.Errorf("expected float argument in percentile()") - } - - lit, ok := c.Args[1].(*NumberLiteral) - if !ok { - return nil, fmt.Errorf("expected float argument in percentile()") - } - return ReducePercentile(lit.Val), nil + return func(values []interface{}) interface{} { + return ReducePercentile(values, c) + }, nil case "derivative", "non_negative_derivative": // If the arg is another aggregate e.g. 
derivative(mean(value)), then // use the map func for that nested aggregate @@ -204,7 +170,7 @@ func InitializeUnmarshaller(c *Call) (UnmarshalFunc, error) { }, nil case "distinct": return func(b []byte) (interface{}, error) { - var val distinctValues + var val interfaceValues err := json.Unmarshal(b, &val) return val, err }, nil @@ -253,12 +219,14 @@ func MapCount(itr Iterator) interface{} { return nil } -type distinctValues []interface{} +type interfaceValues []interface{} -func (d distinctValues) Len() int { return len(d) } -func (d distinctValues) Swap(i, j int) { d[i], d[j] = d[j], d[i] } -func (d distinctValues) Less(i, j int) bool { +func (d interfaceValues) Len() int { return len(d) } +func (d interfaceValues) Swap(i, j int) { d[i], d[j] = d[j], d[i] } +func (d interfaceValues) Less(i, j int) bool { // Sort by type if types match + + // Sort by float64/int64 first as that is the most likely match { d1, ok1 := d[i].(float64) d2, ok2 := d[j].(float64) @@ -267,6 +235,23 @@ func (d distinctValues) Less(i, j int) bool { } } + { + d1, ok1 := d[i].(int64) + d2, ok2 := d[j].(int64) + if ok1 && ok2 { + return d1 < d2 + } + } + + // Sort by every numeric type left + { + d1, ok1 := d[i].(float32) + d2, ok2 := d[j].(float32) + if ok1 && ok2 { + return d1 < d2 + } + } + { d1, ok1 := d[i].(uint64) d2, ok2 := d[j].(uint64) @@ -275,6 +260,54 @@ func (d distinctValues) Less(i, j int) bool { } } + { + d1, ok1 := d[i].(uint32) + d2, ok2 := d[j].(uint32) + if ok1 && ok2 { + return d1 < d2 + } + } + + { + d1, ok1 := d[i].(uint16) + d2, ok2 := d[j].(uint16) + if ok1 && ok2 { + return d1 < d2 + } + } + + { + d1, ok1 := d[i].(uint8) + d2, ok2 := d[j].(uint8) + if ok1 && ok2 { + return d1 < d2 + } + } + + { + d1, ok1 := d[i].(int32) + d2, ok2 := d[j].(int32) + if ok1 && ok2 { + return d1 < d2 + } + } + + { + d1, ok1 := d[i].(int16) + d2, ok2 := d[j].(int16) + if ok1 && ok2 { + return d1 < d2 + } + } + + { + d1, ok1 := d[i].(int8) + d2, ok2 := d[j].(int8) + if ok1 && ok2 { + return 
d1 < d2 + } + } + { d1, ok1 := d[i].(bool) d2, ok2 := d[j].(bool) @@ -303,16 +336,30 @@ func (d distinctValues) Less(i, j int) bool { switch v := val.(type) { case uint64: return intWeight, float64(v) + case uint32: + return intWeight, float64(v) + case uint16: + return intWeight, float64(v) + case uint8: + return intWeight, float64(v) case int64: return intWeight, float64(v) + case int32: + return intWeight, float64(v) + case int16: + return intWeight, float64(v) + case int8: + return intWeight, float64(v) case float64: - return floatWeight, v + return floatWeight, float64(v) + case float32: + return floatWeight, float64(v) case bool: return boolWeight, 0 case string: return stringWeight, 0 } - panic("unreachable code") + panic("interfaceValues.Less - unreachable code") } w1, n1 := infer(d[i]) @@ -338,7 +385,7 @@ func MapDistinct(itr Iterator) interface{} { return nil } - results := make(distinctValues, len(index)) + results := make(interfaceValues, len(index)) var i int for value, _ := range index { results[i] = value @@ -356,7 +403,7 @@ func ReduceDistinct(values []interface{}) interface{} { if v == nil { continue } - d, ok := v.(distinctValues) + d, ok := v.(interfaceValues) if !ok { msg := fmt.Sprintf("expected distinctValues, got: %T", v) panic(msg) @@ -367,7 +414,7 @@ func ReduceDistinct(values []interface{}) interface{} { } // convert map keys to an array - results := make(distinctValues, len(index)) + results := make(interfaceValues, len(index)) var i int for k, _ := range index { results[i] = k @@ -1035,6 +1082,414 @@ func ReduceLast(values []interface{}) interface{} { return nil } +type positionOut struct { + points PositionPoints + callArgs []string // ordered args in the call +} + +func (p *positionOut) lessKey(i, j int) bool { + t1, t2 := p.points[i].Tags, p.points[j].Tags + for _, k := range p.callArgs { + if t1[k] != t2[k] { + return t1[k] < t2[k] + } + } + return false +} + +func (p *positionOut) less(i, j int, sortFloat func(d1, d2 float64) bool, 
sortInt64 func(d1, d2 int64) bool, sortUint64 func(d1, d2 uint64) bool) bool { + // Sort by float64/int64 first as that is the most likely match + { + d1, ok1 := p.points[i].Value.(float64) + d2, ok2 := p.points[j].Value.(float64) + if ok1 && ok2 { + return sortFloat(d1, d2) + } + } + + { + d1, ok1 := p.points[i].Value.(int64) + d2, ok2 := p.points[j].Value.(int64) + if ok1 && ok2 { + return sortInt64(d1, d2) + } + } + + // Sort by every numeric type left + { + d1, ok1 := p.points[i].Value.(float32) + d2, ok2 := p.points[j].Value.(float32) + if ok1 && ok2 { + return sortFloat(float64(d1), float64(d2)) + } + } + + { + d1, ok1 := p.points[i].Value.(uint64) + d2, ok2 := p.points[j].Value.(uint64) + if ok1 && ok2 { + return sortUint64(d1, d2) + } + } + + { + d1, ok1 := p.points[i].Value.(uint32) + d2, ok2 := p.points[j].Value.(uint32) + if ok1 && ok2 { + return sortUint64(uint64(d1), uint64(d2)) + } + } + + { + d1, ok1 := p.points[i].Value.(uint16) + d2, ok2 := p.points[j].Value.(uint16) + if ok1 && ok2 { + return sortUint64(uint64(d1), uint64(d2)) + } + } + + { + d1, ok1 := p.points[i].Value.(uint8) + d2, ok2 := p.points[j].Value.(uint8) + if ok1 && ok2 { + return sortUint64(uint64(d1), uint64(d2)) + } + } + + { + d1, ok1 := p.points[i].Value.(int32) + d2, ok2 := p.points[j].Value.(int32) + if ok1 && ok2 { + return sortInt64(int64(d1), int64(d2)) + } + } + + { + d1, ok1 := p.points[i].Value.(int16) + d2, ok2 := p.points[j].Value.(int16) + if ok1 && ok2 { + return sortInt64(int64(d1), int64(d2)) + } + } + + { + d1, ok1 := p.points[i].Value.(int8) + d2, ok2 := p.points[j].Value.(int8) + if ok1 && ok2 { + return sortInt64(int64(d1), int64(d2)) + } + } + + { + d1, ok1 := p.points[i].Value.(bool) + d2, ok2 := p.points[j].Value.(bool) + if ok1 && ok2 { + return d1 == true && d2 == false + } + } + + { + d1, ok1 := p.points[i].Value.(string) + d2, ok2 := p.points[j].Value.(string) + if ok1 && ok2 { + return d1 < d2 + } + } + + // Types did not match, need to sort based on 
arbitrary weighting of type + const ( + intWeight = iota + floatWeight + boolWeight + stringWeight + ) + + infer := func(val interface{}) (int, float64) { + switch v := val.(type) { + case uint64: + return intWeight, float64(v) + case uint32: + return intWeight, float64(v) + case uint16: + return intWeight, float64(v) + case uint8: + return intWeight, float64(v) + case int64: + return intWeight, float64(v) + case int32: + return intWeight, float64(v) + case int16: + return intWeight, float64(v) + case int8: + return intWeight, float64(v) + case float64: + return floatWeight, float64(v) + case float32: + return floatWeight, float64(v) + case bool: + return boolWeight, 0 + case string: + return stringWeight, 0 + } + panic("interfaceValues.Less - unreachable code") + } + + w1, n1 := infer(p.points[i].Value) + w2, n2 := infer(p.points[j].Value) + + // If we had "numeric" data, use that for comparison + if (w1 == floatWeight || w1 == intWeight) && (w2 == floatWeight || w2 == intWeight) { + return sortFloat(n1, n2) + } + + return w1 < w2 + +} + +type PositionPoints []PositionPoint +type PositionPoint struct { + Time int64 + Value interface{} + Tags map[string]string +} + +type topMapOut struct { + positionOut +} + +func (t topMapOut) Len() int { return len(t.points) } +func (t topMapOut) Swap(i, j int) { t.points[i], t.points[j] = t.points[j], t.points[i] } +func (t topMapOut) Less(i, j int) bool { + sortFloat := func(d1, d2 float64) bool { + if d1 != d2 { + return d1 > d2 + } + k1, k2 := t.points[i].Time, t.points[j].Time + if k1 != k2 { + return k1 < k2 + } + return t.lessKey(i, j) + } + + sortInt64 := func(d1, d2 int64) bool { + if d1 != d2 { + return d1 > d2 + } + k1, k2 := t.points[i].Time, t.points[j].Time + if k1 != k2 { + return k1 < k2 + } + return t.lessKey(i, j) + } + + sortUint64 := func(d1, d2 uint64) bool { + if d1 != d2 { + return d1 > d2 + } + k1, k2 := t.points[i].Time, t.points[j].Time + if k1 != k2 { + return k1 < k2 + } + return t.lessKey(i, j) + } + 
return t.less(i, j, sortFloat, sortInt64, sortUint64) +} + +type topReduceOut struct { + positionOut +} + +func (t topReduceOut) Len() int { return len(t.points) } +func (t topReduceOut) Swap(i, j int) { t.points[i], t.points[j] = t.points[j], t.points[i] } +func (t topReduceOut) Less(i, j int) bool { + // Now sort by time first, not value + sortFloat := func(d1, d2 float64) bool { + k1, k2 := t.points[i].Time, t.points[j].Time + if k1 != k2 { + return k1 < k2 + } + if d1 != d2 { + return d1 > d2 + } + return t.lessKey(i, j) + } + + sortInt64 := func(d1, d2 int64) bool { + k1, k2 := t.points[i].Time, t.points[j].Time + if k1 != k2 { + return k1 < k2 + } + if d1 != d2 { + return d1 > d2 + } + return t.lessKey(i, j) + } + + sortUint64 := func(d1, d2 uint64) bool { + k1, k2 := t.points[i].Time, t.points[j].Time + if k1 != k2 { + return k1 < k2 + } + if d1 != d2 { + return d1 > d2 + } + return t.lessKey(i, j) + } + return t.less(i, j, sortFloat, sortInt64, sortUint64) +} + +// callArgs will get any additional field/tag names that may be needed to sort with +// it is important to maintain the order of these that they were asked for in the call +// for sorting purposes +func topCallArgs(c *Call) []string { + var names []string + for _, v := range c.Args[1 : len(c.Args)-1] { + if f, ok := v.(*VarRef); ok { + names = append(names, f.Val) + } + } + return names +} + +// MapTop emits the top data points for each group by interval +func MapTop(itr Iterator, c *Call) interface{} { + // Capture the limit if it was specified in the call + lit, _ := c.Args[len(c.Args)-1].(*NumberLiteral) + limit := int64(lit.Val) + + // Simple case where only value and limit are specified. 
+ if len(c.Args) == 2 { + out := positionOut{callArgs: topCallArgs(c)} + + for k, v := itr.Next(); k != -1; k, v = itr.Next() { + t := k + if bt := itr.TMin(); bt > -1 { + t = bt + } + out.points = append(out.points, PositionPoint{t, v, itr.Tags()}) + } + + // If we have more than we asked for, only send back the top values + if int64(len(out.points)) > limit { + sort.Sort(topMapOut{out}) + out.points = out.points[:limit] + } + if len(out.points) > 0 { + return out.points + } + return nil + } + // They specified tags in the call to get unique sets, so we need to map them as we accumulate them + outMap := make(map[string]positionOut) + + mapKey := func(args []string, fields map[string]interface{}, keys map[string]string) string { + key := "" + for _, a := range args { + if v, ok := fields[a]; ok { + key += a + ":" + fmt.Sprintf("%v", v) + "," + continue + } + if v, ok := keys[a]; ok { + key += a + ":" + v + "," + continue + } + } + return key + } + + for k, v := itr.Next(); k != -1; k, v = itr.Next() { + t := k + if bt := itr.TMin(); bt > -1 { + t = bt + } + callArgs := c.Fields() + tags := itr.Tags() + // TODO in the future we need to send in fields as well + // this will allow a user to query on both fields and tags + // fields will take the priority over tags if there is a name collision + key := mapKey(callArgs, nil, tags) + if out, ok := outMap[key]; ok { + out.points = append(out.points, PositionPoint{t, v, itr.Tags()}) + outMap[key] = out + } else { + out = positionOut{callArgs: topCallArgs(c)} + out.points = append(out.points, PositionPoint{t, v, itr.Tags()}) + outMap[key] = out + } + } + // Sort all the maps + for k, v := range outMap { + sort.Sort(topMapOut{v}) + outMap[k] = v + } + + slice := func(needed int64, m map[string]positionOut) PositionPoints { + points := PositionPoints{} + var collected int64 + for k, v := range m { + if len(v.points) > 0 { + points = append(points, v.points[0]) + v.points = v.points[1:] + m[k] = v + collected++ + } + } + o := 
positionOut{callArgs: topCallArgs(c), points: points} + sort.Sort(topMapOut{o}) + points = o.points + // If we got more than we needed, sort them and return the top + if collected > needed { + points = o.points[:needed] + } + + return points + } + + points := PositionPoints{} + var collected int64 + for collected < limit { + p := slice(limit-collected, outMap) + if len(p) == 0 { + break + } + points = append(points, p...) + collected += int64(len(p)) + } + if len(points) > 0 { + return points + } + return nil +} + +// ReduceTop computes the top values for each key. +func ReduceTop(values []interface{}, c *Call) interface{} { + lit, _ := c.Args[len(c.Args)-1].(*NumberLiteral) + limit := int64(lit.Val) + + out := positionOut{callArgs: topCallArgs(c)} + for _, v := range values { + if v == nil { + continue + } + o, _ := v.(PositionPoints) + out.points = append(out.points, o...) + } + + // Get the top of the top values + sort.Sort(topMapOut{out}) + // If we have more than we asked for, only send back the top values + if int64(len(out.points)) > limit { + out.points = out.points[:limit] + } + + // now we need to resort the tops by time + sort.Sort(topReduceOut{out}) + if len(out.points) > 0 { + return out.points + } + return nil +} + // MapEcho emits the data points for each group by interval func MapEcho(itr Iterator) interface{} { var values []interface{} @@ -1046,36 +1501,39 @@ func MapEcho(itr Iterator) interface{} { } // ReducePercentile computes the percentile of values for each key. 
-func ReducePercentile(percentile float64) ReduceFunc { - return func(values []interface{}) interface{} { - var allValues []float64 +func ReducePercentile(values []interface{}, c *Call) interface{} { + // Checks that this arg exists and is a valid type are done in the parsing validation + // and have test coverage there + lit, _ := c.Args[1].(*NumberLiteral) + percentile := lit.Val - for _, v := range values { - if v == nil { - continue - } + var allValues []float64 - vals := v.([]interface{}) - for _, v := range vals { - switch v.(type) { - case int64: - allValues = append(allValues, float64(v.(int64))) - case float64: - allValues = append(allValues, v.(float64)) - } - } + for _, v := range values { + if v == nil { + continue } - sort.Float64s(allValues) - length := len(allValues) - index := int(math.Floor(float64(length)*percentile/100.0+0.5)) - 1 - - if index < 0 || index >= len(allValues) { - return nil + vals := v.([]interface{}) + for _, v := range vals { + switch v.(type) { + case int64: + allValues = append(allValues, float64(v.(int64))) + case float64: + allValues = append(allValues, v.(float64)) + } } + } + + sort.Float64s(allValues) + length := len(allValues) + index := int(math.Floor(float64(length)*percentile/100.0+0.5)) - 1 - return allValues[index] + if index < 0 || index >= len(allValues) { + return nil } + + return allValues[index] } // IsNumeric returns whether a given aggregate can only be run on numeric fields. 
diff --git a/influxql/functions_test.go b/influxql/functions_test.go index 56303d8d9a9..d28f1aeac42 100644 --- a/influxql/functions_test.go +++ b/influxql/functions_test.go @@ -14,15 +14,24 @@ type point struct { seriesKey string time int64 value interface{} + tags map[string]string } type testIterator struct { - values []point + values []point + lastTags map[string]string + nextFunc func() (timestamp int64, value interface{}) + tagsFunc func() map[string]string + tMinFunc func() int64 } func (t *testIterator) Next() (timestamp int64, value interface{}) { + if t.nextFunc != nil { + return t.nextFunc() + } if len(t.values) > 0 { v := t.values[0] + t.lastTags = t.values[0].tags t.values = t.values[1:] return v.time, v.value } @@ -30,6 +39,20 @@ func (t *testIterator) Next() (timestamp int64, value interface{}) { return -1, nil } +func (t *testIterator) Tags() map[string]string { + if t.tagsFunc != nil { + return t.tagsFunc() + } + return t.lastTags +} + +func (t *testIterator) TMin() int64 { + if t.tMinFunc != nil { + return t.tMinFunc() + } + return -1 +} + func TestMapMeanNoValues(t *testing.T) { iter := &testIterator{} if got := MapMean(iter); got != nil { @@ -44,13 +67,13 @@ func TestMapMean(t *testing.T) { output *meanMapOutput }{ { // Single point - input: []point{point{"0", 1, 1.0}}, + input: []point{point{"0", 1, 1.0, nil}}, output: &meanMapOutput{1, 1, Float64Type}, }, { // Two points input: []point{ - point{"0", 1, 2.0}, - point{"0", 2, 8.0}, + point{"0", 1, 2.0, nil}, + point{"0", 2, 8.0, nil}, }, output: &meanMapOutput{2, 5.0, Float64Type}, }, @@ -71,55 +94,12 @@ func TestMapMean(t *testing.T) { } } } -func TestInitializeMapFuncPercentile(t *testing.T) { - // No args - c := &Call{ - Name: "percentile", - Args: []Expr{}, - } - _, err := InitializeMapFunc(c) - if err == nil { - t.Errorf("InitializeMapFunc(%v) expected error. 
got nil", c) - } - - if exp := "expected two arguments for percentile()"; err.Error() != exp { - t.Errorf("InitializeMapFunc(%v) mismatch. exp %v got %v", c, exp, err.Error()) - } - - // No percentile arg - c = &Call{ - Name: "percentile", - Args: []Expr{ - &VarRef{Val: "field1"}, - }, - } - - _, err = InitializeMapFunc(c) - if err == nil { - t.Errorf("InitializeMapFunc(%v) expected error. got nil", c) - } - - if exp := "expected two arguments for percentile()"; err.Error() != exp { - t.Errorf("InitializeMapFunc(%v) mismatch. exp %v got %v", c, exp, err.Error()) - } -} func TestInitializeMapFuncDerivative(t *testing.T) { for _, fn := range []string{"derivative", "non_negative_derivative"} { - // No args should fail - c := &Call{ - Name: fn, - Args: []Expr{}, - } - - _, err := InitializeMapFunc(c) - if err == nil { - t.Errorf("InitializeMapFunc(%v) expected error. got nil", c) - } - // Single field arg should return MapEcho - c = &Call{ + c := &Call{ Name: fn, Args: []Expr{ &VarRef{Val: " field1"}, @@ -127,7 +107,7 @@ func TestInitializeMapFuncDerivative(t *testing.T) { }, } - _, err = InitializeMapFunc(c) + _, err := InitializeMapFunc(c) if err != nil { t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err) } @@ -148,48 +128,14 @@ func TestInitializeMapFuncDerivative(t *testing.T) { } } -func TestInitializeReduceFuncPercentile(t *testing.T) { - // No args - c := &Call{ - Name: "percentile", - Args: []Expr{}, - } - _, err := InitializeReduceFunc(c) - if err == nil { - t.Errorf("InitializedReduceFunc(%v) expected error. got nil", c) - } - - if exp := "expected float argument in percentile()"; err.Error() != exp { - t.Errorf("InitializedReduceFunc(%v) mismatch. exp %v got %v", c, exp, err.Error()) - } - - // No percentile arg - c = &Call{ - Name: "percentile", - Args: []Expr{ - &VarRef{Val: "field1"}, - }, - } - - _, err = InitializeReduceFunc(c) - if err == nil { - t.Errorf("InitializedReduceFunc(%v) expected error. 
got nil", c) - } - - if exp := "expected float argument in percentile()"; err.Error() != exp { - t.Errorf("InitializedReduceFunc(%v) mismatch. exp %v got %v", c, exp, err.Error()) - } -} - func TestReducePercentileNil(t *testing.T) { - // ReducePercentile should ignore nil values when calculating the percentile - fn := ReducePercentile(100) input := []interface{}{ nil, } - got := fn(input) + // ReducePercentile should ignore nil values when calculating the percentile + got := ReducePercentile(input, &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 100}}}) if got != nil { t.Fatalf("ReducePercentile(100) returned wrong type. exp nil got %v", got) } @@ -212,16 +158,16 @@ func TestMapDistinct(t *testing.T) { iter := &testIterator{ values: []point{ - {seriesKey1, timeId1, uint64(1)}, - {seriesKey1, timeId2, uint64(1)}, - {seriesKey1, timeId3, "1"}, - {seriesKey2, timeId4, uint64(1)}, - {seriesKey2, timeId5, float64(1.0)}, - {seriesKey2, timeId6, "1"}, + {seriesKey1, timeId1, uint64(1), nil}, + {seriesKey1, timeId2, uint64(1), nil}, + {seriesKey1, timeId3, "1", nil}, + {seriesKey2, timeId4, uint64(1), nil}, + {seriesKey2, timeId5, float64(1.0), nil}, + {seriesKey2, timeId6, "1", nil}, }, } - values := MapDistinct(iter).(distinctValues) + values := MapDistinct(iter).(interfaceValues) if exp, got := 3, len(values); exp != got { t.Errorf("Wrong number of values. 
exp %v got %v", exp, got) @@ -229,7 +175,7 @@ func TestMapDistinct(t *testing.T) { sort.Sort(values) - exp := distinctValues{ + exp := interfaceValues{ uint64(1), float64(1), "1", @@ -253,7 +199,7 @@ func TestMapDistinctNil(t *testing.T) { } func TestReduceDistinct(t *testing.T) { - v1 := distinctValues{ + v1 := interfaceValues{ "2", "1", float64(2.0), @@ -264,7 +210,7 @@ func TestReduceDistinct(t *testing.T) { false, } - expect := distinctValues{ + expect := interfaceValues{ uint64(1), float64(1), uint64(2), @@ -301,11 +247,11 @@ func TestReduceDistinctNil(t *testing.T) { }, { name: "empty mappper (len 1)", - values: []interface{}{distinctValues{}}, + values: []interface{}{interfaceValues{}}, }, { name: "empty mappper (len 2)", - values: []interface{}{distinctValues{}, distinctValues{}}, + values: []interface{}{interfaceValues{}, interfaceValues{}}, }, } @@ -319,7 +265,7 @@ func TestReduceDistinctNil(t *testing.T) { } func Test_distinctValues_Sort(t *testing.T) { - values := distinctValues{ + values := interfaceValues{ "2", "1", float64(2.0), @@ -330,7 +276,7 @@ func Test_distinctValues_Sort(t *testing.T) { false, } - expect := distinctValues{ + expect := interfaceValues{ uint64(1), float64(1), uint64(2), @@ -366,13 +312,13 @@ func TestMapCountDistinct(t *testing.T) { iter := &testIterator{ values: []point{ - {seriesKey1, timeId1, uint64(1)}, - {seriesKey1, timeId2, uint64(1)}, - {seriesKey1, timeId3, "1"}, - {seriesKey2, timeId4, uint64(1)}, - {seriesKey2, timeId5, float64(1.0)}, - {seriesKey2, timeId6, "1"}, - {seriesKey2, timeId7, true}, + {seriesKey1, timeId1, uint64(1), nil}, + {seriesKey1, timeId2, uint64(1), nil}, + {seriesKey1, timeId3, "1", nil}, + {seriesKey2, timeId4, uint64(1), nil}, + {seriesKey2, timeId5, float64(1.0), nil}, + {seriesKey2, timeId6, "1", nil}, + {seriesKey2, timeId7, true, nil}, }, } @@ -532,3 +478,276 @@ func BenchmarkGetSortedRangeBySort(b *testing.B) { } benchGetSortedRangeResults = results } + +func TestMapTop(t *testing.T) { + 
tests := []struct { + name string + skip bool + iter *testIterator + exp positionOut + call *Call + }{ + { + name: "int64 - basic", + iter: &testIterator{ + values: []point{ + {"", 10, int64(99), map[string]string{"host": "a"}}, + {"", 10, int64(53), map[string]string{"host": "b"}}, + {"", 20, int64(88), map[string]string{"host": "a"}}, + }, + }, + exp: positionOut{ + points: PositionPoints{ + PositionPoint{10, int64(99), map[string]string{"host": "a"}}, + PositionPoint{20, int64(88), map[string]string{"host": "a"}}, + }, + }, + call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}}, + }, + { + name: "int64 - basic with tag", + iter: &testIterator{ + values: []point{ + {"", 10, int64(99), map[string]string{"host": "a"}}, + {"", 20, int64(53), map[string]string{"host": "b"}}, + {"", 30, int64(88), map[string]string{"host": "a"}}, + }, + }, + exp: positionOut{ + callArgs: []string{"host"}, + points: PositionPoints{ + PositionPoint{10, int64(99), map[string]string{"host": "a"}}, + PositionPoint{20, int64(53), map[string]string{"host": "b"}}, + }, + }, + call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &VarRef{Val: "host"}, &NumberLiteral{Val: 2}}}, + }, + { + name: "int64 - tie on value, resolve based on time", + iter: &testIterator{ + values: []point{ + {"", 20, int64(99), map[string]string{"host": "a"}}, + {"", 10, int64(53), map[string]string{"host": "a"}}, + {"", 10, int64(99), map[string]string{"host": "a"}}, + }, + }, + exp: positionOut{ + callArgs: []string{"host"}, + points: PositionPoints{ + PositionPoint{10, int64(99), map[string]string{"host": "a"}}, + PositionPoint{20, int64(99), map[string]string{"host": "a"}}, + }, + }, + call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &VarRef{Val: "host"}, &NumberLiteral{Val: 2}}}, + }, + { + name: "int64 - tie on value, time, resolve based on tags", + iter: &testIterator{ + values: []point{ + {"", 10, int64(99), map[string]string{"host": "b"}}, + {"", 10, 
int64(99), map[string]string{"host": "a"}}, + {"", 20, int64(88), map[string]string{"host": "a"}}, + }, + }, + exp: positionOut{ + callArgs: []string{"host"}, + points: PositionPoints{ + PositionPoint{10, int64(99), map[string]string{"host": "a"}}, + PositionPoint{10, int64(99), map[string]string{"host": "b"}}, + }, + }, + call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &VarRef{Val: "host"}, &NumberLiteral{Val: 2}}}, + }, + { + name: "mixed numerics - ints", + iter: &testIterator{ + values: []point{ + {"", 10, int64(99), map[string]string{"host": "a"}}, + {"", 10, int64(53), map[string]string{"host": "b"}}, + {"", 20, uint64(88), map[string]string{"host": "a"}}, + }, + }, + exp: positionOut{ + points: PositionPoints{ + PositionPoint{10, int64(99), map[string]string{"host": "a"}}, + PositionPoint{20, uint64(88), map[string]string{"host": "a"}}, + }, + }, + call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}}, + }, + { + name: "mixed numerics - ints & floats", + iter: &testIterator{ + values: []point{ + {"", 10, float64(99), map[string]string{"host": "a"}}, + {"", 10, int64(53), map[string]string{"host": "b"}}, + {"", 20, uint64(88), map[string]string{"host": "a"}}, + }, + }, + exp: positionOut{ + points: PositionPoints{ + PositionPoint{10, float64(99), map[string]string{"host": "a"}}, + PositionPoint{20, uint64(88), map[string]string{"host": "a"}}, + }, + }, + call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}}, + }, + { + name: "mixed numerics - ints, floats, & strings", + iter: &testIterator{ + values: []point{ + {"", 10, float64(99), map[string]string{"host": "a"}}, + {"", 10, int64(53), map[string]string{"host": "b"}}, + {"", 20, "88", map[string]string{"host": "a"}}, + }, + }, + exp: positionOut{ + points: PositionPoints{ + PositionPoint{10, float64(99), map[string]string{"host": "a"}}, + PositionPoint{10, int64(53), map[string]string{"host": "b"}}, + }, + }, + call: &Call{Name: 
"top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}}, + }, + { + name: "bools", + iter: &testIterator{ + values: []point{ + {"", 10, true, map[string]string{"host": "a"}}, + {"", 10, true, map[string]string{"host": "b"}}, + {"", 20, false, map[string]string{"host": "a"}}, + }, + }, + exp: positionOut{ + points: PositionPoints{ + PositionPoint{10, true, map[string]string{"host": "a"}}, + PositionPoint{10, true, map[string]string{"host": "b"}}, + }, + }, + call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}}, + }, + } + + for _, test := range tests { + if test.skip { + continue + } + values := MapTop(test.iter, test.call).(PositionPoints) + t.Logf("Test: %s", test.name) + if exp, got := len(test.exp.points), len(values); exp != got { + t.Errorf("Wrong number of values. exp %v got %v", exp, got) + } + if !reflect.DeepEqual(values, test.exp.points) { + t.Errorf("Wrong values. \nexp\n %v\ngot\n %v", spew.Sdump(test.exp.points), spew.Sdump(values)) + } + } +} + +func TestReduceTop(t *testing.T) { + tests := []struct { + name string + skip bool + values []interface{} + exp PositionPoints + call *Call + }{ + { + name: "int64 - single map", + values: []interface{}{ + PositionPoints{ + {10, int64(99), map[string]string{"host": "a"}}, + {10, int64(53), map[string]string{"host": "b"}}, + {20, int64(88), map[string]string{"host": "a"}}, + }, + }, + exp: PositionPoints{ + PositionPoint{10, int64(99), map[string]string{"host": "a"}}, + PositionPoint{20, int64(88), map[string]string{"host": "a"}}, + }, + call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}}, + }, + { + name: "int64 - double map", + values: []interface{}{ + PositionPoints{ + {10, int64(99), map[string]string{"host": "a"}}, + }, + PositionPoints{ + {10, int64(53), map[string]string{"host": "b"}}, + {20, int64(88), map[string]string{"host": "a"}}, + }, + }, + exp: PositionPoints{ + PositionPoint{10, int64(99), map[string]string{"host": 
"a"}}, + PositionPoint{20, int64(88), map[string]string{"host": "a"}}, + }, + call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}}, + }, + { + name: "int64 - double map with nil", + values: []interface{}{ + PositionPoints{ + {10, int64(99), map[string]string{"host": "a"}}, + {10, int64(53), map[string]string{"host": "b"}}, + {20, int64(88), map[string]string{"host": "a"}}, + }, + nil, + }, + exp: PositionPoints{ + PositionPoint{10, int64(99), map[string]string{"host": "a"}}, + PositionPoint{20, int64(88), map[string]string{"host": "a"}}, + }, + call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}}, + }, + { + name: "int64 - double map with non-matching tags and tag selected", + values: []interface{}{ + PositionPoints{ + {10, int64(99), map[string]string{"host": "a"}}, + {10, int64(53), map[string]string{"host": "b"}}, + {20, int64(88), map[string]string{}}, + }, + nil, + }, + exp: PositionPoints{ + PositionPoint{10, int64(99), map[string]string{"host": "a"}}, + PositionPoint{20, int64(88), map[string]string{}}, + }, + call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &VarRef{Val: "host"}, &NumberLiteral{Val: 2}}}, + }, + { + skip: true, + name: "int64 - double map with non-matching tags", + values: []interface{}{ + PositionPoints{ + {10, int64(99), map[string]string{"host": "a"}}, + {10, int64(53), map[string]string{"host": "b"}}, + {20, int64(88), map[string]string{}}, + }, + nil, + }, + exp: PositionPoints{ + PositionPoint{10, int64(99), map[string]string{"host": "a"}}, + PositionPoint{20, int64(55), map[string]string{"host": "b"}}, + }, + call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}}, + }, + } + + for _, test := range tests { + if test.skip { + continue + } + values := ReduceTop(test.values, test.call) + t.Logf("Test: %s", test.name) + if values != nil { + v, _ := values.(PositionPoints) + if exp, got := len(test.exp), len(v); exp != got { + 
t.Errorf("Wrong number of values. exp %v got %v", exp, got) + } + } + if !reflect.DeepEqual(values, test.exp) { + t.Errorf("Wrong values. \nexp\n %v\ngot\n %v", spew.Sdump(test.exp), spew.Sdump(values)) + } + } +} diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 2df5104d81e..80ed8950725 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -213,6 +213,65 @@ func TestParser_ParseStatement(t *testing.T) { }, }, + // select percentile statements + { + s: `select percentile("field1", 2.0) from cpu`, + stmt: &influxql.SelectStatement{ + IsRawQuery: false, + Fields: []*influxql.Field{ + {Expr: &influxql.Call{Name: "percentile", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2.0}}}}, + }, + Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}}, + }, + }, + + // select top statements + { + s: `select top("field1", 2) from cpu`, + stmt: &influxql.SelectStatement{ + IsRawQuery: false, + Fields: []*influxql.Field{ + {Expr: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}}, + }, + Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}}, + }, + }, + + { + s: `select top(field1, 2) from cpu`, + stmt: &influxql.SelectStatement{ + IsRawQuery: false, + Fields: []*influxql.Field{ + {Expr: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}}, + }, + Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}}, + }, + }, + + { + s: `select top(field1, 2), tag1 from cpu`, + stmt: &influxql.SelectStatement{ + IsRawQuery: false, + Fields: []*influxql.Field{ + {Expr: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}}, + {Expr: &influxql.VarRef{Val: "tag1"}}, + }, + Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}}, + }, + }, + + { + s: `select top(field1, tag1, 2), tag1 from cpu`, + stmt: 
&influxql.SelectStatement{ + IsRawQuery: false, + Fields: []*influxql.Field{ + {Expr: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "tag1"}, &influxql.NumberLiteral{Val: 2}}}}, + {Expr: &influxql.VarRef{Val: "tag1"}}, + }, + Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}}, + }, + }, + // select distinct statements { s: `select distinct(field1) from cpu`, @@ -1252,6 +1311,21 @@ func TestParser_ParseStatement(t *testing.T) { {s: `SELECT field1 FROM myseries GROUP`, err: `found EOF, expected BY at line 1, char 35`}, {s: `SELECT field1 FROM myseries LIMIT`, err: `found EOF, expected number at line 1, char 35`}, {s: `SELECT field1 FROM myseries LIMIT 10.5`, err: `fractional parts not allowed in LIMIT at line 1, char 35`}, + {s: `SELECT top() FROM myseries`, err: `invalid number of arguments for top, expected at least 2, got 0`}, + {s: `SELECT top(field1) FROM myseries`, err: `invalid number of arguments for top, expected at least 2, got 1`}, + {s: `SELECT top(field1,foo) FROM myseries`, err: `expected integer as last argument in top(), found foo`}, + {s: `SELECT top(field1,host,server,foo) FROM myseries`, err: `expected integer as last argument in top(), found foo`}, + {s: `SELECT top(field1,5,server,2) FROM myseries`, err: `only fields or tags are allowed in top(), found 5.000`}, + {s: `SELECT top(field1,max(foo),server,2) FROM myseries`, err: `only fields or tags are allowed in top(), found max(foo)`}, + {s: `SELECT bottom() FROM myseries`, err: `invalid number of arguments for bottom, expected at least 2, got 0`}, + {s: `SELECT bottom(field1) FROM myseries`, err: `invalid number of arguments for bottom, expected at least 2, got 1`}, + {s: `SELECT bottom(field1,foo) FROM myseries`, err: `expected integer as last argument in bottom(), found foo`}, + {s: `SELECT bottom(field1,host,server,foo) FROM myseries`, err: `expected integer as last argument in bottom(), found foo`}, + {s: `SELECT 
bottom(field1,5,server,2) FROM myseries`, err: `only fields or tags are allowed in bottom(), found 5.000`}, + {s: `SELECT bottom(field1,max(foo),server,2) FROM myseries`, err: `only fields or tags are allowed in bottom(), found max(foo)`}, + {s: `SELECT percentile() FROM myseries`, err: `invalid number of arguments for percentile, expected 2, got 0`}, + {s: `SELECT percentile(field1) FROM myseries`, err: `invalid number of arguments for percentile, expected 2, got 1`}, + {s: `SELECT percentile(field1, foo) FROM myseries`, err: `expected float argument in percentile()`}, {s: `SELECT field1 FROM myseries OFFSET`, err: `found EOF, expected number at line 1, char 36`}, {s: `SELECT field1 FROM myseries OFFSET 10.5`, err: `fractional parts not allowed in OFFSET at line 1, char 36`}, {s: `SELECT field1 FROM myseries ORDER`, err: `found EOF, expected BY at line 1, char 35`}, @@ -1268,7 +1342,6 @@ func TestParser_ParseStatement(t *testing.T) { {s: `SELECT field1 FROM 12`, err: `found 12, expected identifier at line 1, char 20`}, {s: `SELECT 1000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 FROM myseries`, err: `unable to parse number at line 1, char 8`}, {s: `SELECT 10.5h FROM myseries`, err: `found h, expected FROM at line 1, char 12`}, - {s: `SELECT derivative(field1), field1 FROM myseries`, err: `mixing aggregate and non-aggregate queries is not supported`}, {s: `SELECT distinct(field1), sum(field1) FROM myseries`, err: `aggregate function distinct() can not be combined with other functions or fields`}, {s: `SELECT 
distinct(field1), field2 FROM myseries`, err: `aggregate function distinct() can not be combined with other functions or fields`}, {s: `SELECT distinct(field1, field2) FROM myseries`, err: `distinct function can only have one argument`}, @@ -1279,8 +1352,12 @@ func TestParser_ParseStatement(t *testing.T) { {s: `SELECT count(distinct field1, field2) FROM myseries`, err: `count(distinct ) can only have one argument`}, {s: `select count(distinct(too, many, arguments)) from myseries`, err: `count(distinct ) can only have one argument`}, {s: `select count() from myseries`, err: `invalid number of arguments for count, expected 1, got 0`}, + {s: `SELECT derivative(), field1 FROM myseries`, err: `mixing aggregate and non-aggregate queries is not supported`}, {s: `select derivative() from myseries`, err: `invalid number of arguments for derivative, expected at least 1 but no more than 2, got 0`}, {s: `select derivative(mean(value), 1h, 3) from myseries`, err: `invalid number of arguments for derivative, expected at least 1 but no more than 2, got 3`}, + {s: `SELECT non_negative_derivative(), field1 FROM myseries`, err: `mixing aggregate and non-aggregate queries is not supported`}, + {s: `select non_negative_derivative() from myseries`, err: `invalid number of arguments for non_negative_derivative, expected at least 1 but no more than 2, got 0`}, + {s: `select non_negative_derivative(mean(value), 1h, 3) from myseries`, err: `invalid number of arguments for non_negative_derivative, expected at least 1 but no more than 2, got 3`}, {s: `SELECT field1 from myseries WHERE host =~ 'asd' LIMIT 1`, err: `found asd, expected regex at line 1, char 42`}, {s: `SELECT value > 2 FROM cpu`, err: `invalid operator > in SELECT clause at line 1, char 8; operator is intended for WHERE clause`}, {s: `SELECT value = 2 FROM cpu`, err: `invalid operator = in SELECT clause at line 1, char 8; operator is intended for WHERE clause`}, diff --git a/meta/statement_executor_test.go 
b/meta/statement_executor_test.go index a22610ea4a9..7ad290084aa 100644 --- a/meta/statement_executor_test.go +++ b/meta/statement_executor_test.go @@ -625,13 +625,13 @@ func TestStatementExecutor_ExecuteStatement_CreateContinuousQuery(t *testing.T) t.Fatalf("unexpected database: %s", database) } else if name != "cq0" { t.Fatalf("unexpected name: %s", name) - } else if query != `CREATE CONTINUOUS QUERY cq0 ON db0 BEGIN SELECT count(*) INTO db1 FROM db0 GROUP BY time(1h) END` { + } else if query != `CREATE CONTINUOUS QUERY cq0 ON db0 BEGIN SELECT count(field1) INTO db1 FROM db0 GROUP BY time(1h) END` { t.Fatalf("unexpected query: %s", query) } return nil } - stmt := influxql.MustParseStatement(`CREATE CONTINUOUS QUERY cq0 ON db0 BEGIN SELECT count(*) INTO db1 FROM db0 GROUP BY time(1h) END`) + stmt := influxql.MustParseStatement(`CREATE CONTINUOUS QUERY cq0 ON db0 BEGIN SELECT count(field1) INTO db1 FROM db0 GROUP BY time(1h) END`) if res := e.ExecuteStatement(stmt); res.Err != nil { t.Fatal(res.Err) } else if res.Series != nil { @@ -646,7 +646,7 @@ func TestStatementExecutor_ExecuteStatement_CreateContinuousQuery_Err(t *testing return errors.New("marker") } - stmt := influxql.MustParseStatement(`CREATE CONTINUOUS QUERY cq0 ON db0 BEGIN SELECT count(*) INTO db1 FROM db0 GROUP BY time(1h) END`) + stmt := influxql.MustParseStatement(`CREATE CONTINUOUS QUERY cq0 ON db0 BEGIN SELECT count(field1) INTO db1 FROM db0 GROUP BY time(1h) END`) if res := e.ExecuteStatement(stmt); res.Err == nil || res.Err.Error() != "marker" { t.Fatalf("unexpected error: %s", res.Err) } @@ -693,14 +693,14 @@ func TestStatementExecutor_ExecuteStatement_ShowContinuousQueries(t *testing.T) { Name: "db0", ContinuousQueries: []meta.ContinuousQueryInfo{ - {Name: "cq0", Query: "SELECT count(*) INTO db1 FROM db0"}, - {Name: "cq1", Query: "SELECT count(*) INTO db2 FROM db0"}, + {Name: "cq0", Query: "SELECT count(field1) INTO db1 FROM db0"}, + {Name: "cq1", Query: "SELECT count(field1) INTO db2 FROM 
db0"}, }, }, { Name: "db1", ContinuousQueries: []meta.ContinuousQueryInfo{ - {Name: "cq2", Query: "SELECT count(*) INTO db3 FROM db1"}, + {Name: "cq2", Query: "SELECT count(field1) INTO db3 FROM db1"}, }, }, }, nil @@ -714,15 +714,15 @@ func TestStatementExecutor_ExecuteStatement_ShowContinuousQueries(t *testing.T) Name: "db0", Columns: []string{"name", "query"}, Values: [][]interface{}{ - {"cq0", "SELECT count(*) INTO db1 FROM db0"}, - {"cq1", "SELECT count(*) INTO db2 FROM db0"}, + {"cq0", "SELECT count(field1) INTO db1 FROM db0"}, + {"cq1", "SELECT count(field1) INTO db2 FROM db0"}, }, }, { Name: "db1", Columns: []string{"name", "query"}, Values: [][]interface{}{ - {"cq2", "SELECT count(*) INTO db3 FROM db1"}, + {"cq2", "SELECT count(field1) INTO db3 FROM db1"}, }, }, }) { @@ -755,7 +755,7 @@ func TestStatementExecutor_ExecuteStatement_Unsupported(t *testing.T) { // Execute a SELECT statement. NewStatementExecutor().ExecuteStatement( - influxql.MustParseStatement(`SELECT count(*) FROM db0`), + influxql.MustParseStatement(`SELECT count(field1) FROM db0`), ) }() diff --git a/pkg/slices/strings.go b/pkg/slices/strings.go new file mode 100644 index 00000000000..16d6a13f748 --- /dev/null +++ b/pkg/slices/strings.go @@ -0,0 +1,37 @@ +package slices + +import "strings" + +func Union(setA, setB []string, ignoreCase bool) []string { + for _, b := range setB { + if ignoreCase { + if !ExistsIgnoreCase(setA, b) { + setA = append(setA, b) + } + continue + } + if !Exists(setA, b) { + setA = append(setA, b) + } + } + return setA +} + +func Exists(set []string, find string) bool { + for _, s := range set { + if s == find { + return true + } + } + return false +} + +func ExistsIgnoreCase(set []string, find string) bool { + find = strings.ToLower(find) + for _, s := range set { + if strings.ToLower(s) == find { + return true + } + } + return false +} diff --git a/tsdb/executor.go b/tsdb/executor.go index 5880f0a31cf..1ece6ccd60e 100644 --- a/tsdb/executor.go +++ 
b/tsdb/executor.go @@ -397,11 +397,7 @@ func (e *SelectExecutor) executeAggregate(out chan *influxql.Row) { } // Put together the rows to return, starting with columns. - columnNames := make([]string, len(e.stmt.Fields)+1) - columnNames[0] = "time" - for i, f := range e.stmt.Fields { - columnNames[i+1] = f.Name() - } + columnNames := e.stmt.ColumnNames() // Open the mappers. for _, m := range e.mappers { @@ -528,6 +524,12 @@ func (e *SelectExecutor) executeAggregate(out chan *influxql.Row) { } } + // Perform top/bottom unwraps + values, err = e.processTopBottom(values, columnNames) + if err != nil { + out <- &influxql.Row{Err: err} + } + // Perform any mathematics. values = processForMath(e.stmt.Fields, values) @@ -622,6 +624,67 @@ func (e *SelectExecutor) close() { } } +func (e *SelectExecutor) processTopBottom(results [][]interface{}, columnNames []string) ([][]interface{}, error) { + aggregates := e.stmt.FunctionCalls() + var call *influxql.Call + process := false + for _, c := range aggregates { + if c.Name == "top" || c.Name == "bottom" { + process = true + call = c + break + } + } + if !process { + return results, nil + } + var values [][]interface{} + + // Check if we have a group by, if not, rewrite the entire result by flattening it out + //if len(e.stmt.Dimensions) == 0 { + for _, vals := range results { + // start at 1 because the first value is always time + for j := 1; j < len(vals); j++ { + switch v := vals[j].(type) { + case influxql.PositionPoints: + tMin := vals[0].(time.Time) + for _, p := range v { + result := e.topBottomPointToQueryResult(p, tMin, call, columnNames) + values = append(values, result) + } + case nil: + continue + default: + return nil, fmt.Errorf("unrechable code - processTopBottom") + } + } + } + return values, nil +} + +func (e *SelectExecutor) topBottomPointToQueryResult(p influxql.PositionPoint, tMin time.Time, call *influxql.Call, columnNames []string) []interface{} { + tm := time.Unix(0, 
p.Time).UTC().Format(time.RFC3339Nano) + // If we didn't explicitly ask for time, and we have a group by, then use TMIN for the time returned + if len(e.stmt.Dimensions) > 0 && !e.stmt.HasTimeFieldSpecified() { + tm = tMin.UTC().Format(time.RFC3339Nano) + } + vals := []interface{}{tm} + for _, c := range columnNames { + if c == call.Name { + vals = append(vals, p.Value) + continue + } + // TODO in the future fields will also be available to us. + // we should always favor fields over tags if there is a name collision + + // look in the tags for a value + if t, ok := p.Tags[c]; ok { + vals = append(vals, t) + } + } + return vals +} + // limitedRowWriter accepts raw mapper values, and will emit those values as rows in chunks // of the given size. If the chunk size is 0, no chunking will be performed. In addition if // limit is reached, outstanding values will be emitted. If limit is zero, no limit is enforced. diff --git a/tsdb/mapper.go b/tsdb/mapper.go index 052bdafb25e..c7cc75e612c 100644 --- a/tsdb/mapper.go +++ b/tsdb/mapper.go @@ -343,7 +343,7 @@ func (lm *SelectMapper) nextChunkRaw() (interface{}, error) { } cursor := lm.cursors[lm.currCursorIndex] - k, v, t := cursor.Next(lm.queryTMin, lm.queryTMax, lm.selectFields, lm.whereFields) + k, v := cursor.Next(lm.queryTMin, lm.queryTMax, lm.selectFields, lm.whereFields) if v == nil { // Tagset cursor is empty, move to next one. lm.currCursorIndex++ @@ -363,7 +363,7 @@ func (lm *SelectMapper) nextChunkRaw() (interface{}, error) { cursorKey: cursor.key(), } } - value := &MapperValue{Time: k, Value: v, Tags: t} + value := &MapperValue{Time: k, Value: v, Tags: cursor.Tags()} output.Values = append(output.Values, value) if len(output.Values) == lm.chunkSize { return output, nil @@ -435,13 +435,29 @@ func (lm *SelectMapper) nextChunkAgg() (interface{}, error) { heap.Push(tsc.pointHeap, p) } // Wrap the tagset cursor so it implements the mapping functions interface.
- f := func() (time int64, value interface{}) { - k, v, _ := tsc.Next(qmin, tmax, []string{lm.fieldNames[i]}, lm.whereFields) + nextf := func() (time int64, value interface{}) { + k, v := tsc.Next(qmin, tmax, []string{lm.fieldNames[i]}, lm.whereFields) return k, v } + tagf := func() map[string]string { + return tsc.Tags() + } + + tminf := func() int64 { + if len(lm.selectStmt.Dimensions) == 0 { + return -1 + } + if !lm.selectStmt.HasTimeFieldSpecified() { + return tmin + } + return -1 + } + tagSetCursor := &aggTagSetCursor{ - nextFunc: f, + nextFunc: nextf, + tagsFunc: tagf, + tMinFunc: tminf, } // Execute the map function which walks the entire interval, and aggregates @@ -619,6 +635,8 @@ func (lm *SelectMapper) Close() { // by intervals. type aggTagSetCursor struct { nextFunc func() (time int64, value interface{}) + tagsFunc func() map[string]string + tMinFunc func() int64 } // Next returns the next value for the aggTagSetCursor. It implements the interface expected @@ -627,6 +645,16 @@ func (a *aggTagSetCursor) Next() (time int64, value interface{}) { return a.nextFunc() } +// Tags returns the current tags for the cursor +func (a *aggTagSetCursor) Tags() map[string]string { + return a.tagsFunc() +} + +// TMin returns the current floor time for the bucket being worked on +func (a *aggTagSetCursor) TMin() int64 { + return a.tMinFunc() +} + type pointHeapItem struct { timestamp int64 value []byte @@ -670,6 +698,7 @@ type tagSetCursor struct { tags map[string]string // Tag key-value pairs cursors []*seriesCursor // Underlying series cursors. decoder *FieldCodec // decoder for the raw data bytes + currentTags map[string]string // the current tags for the underlying series cursor in play // pointHeap is a min-heap, ordered by timestamp, that contains the next // point from each seriesCursor. 
Queries sometimes pull points from @@ -723,11 +752,11 @@ func (tsc *tagSetCursor) key() string { // Next returns the next matching series-key, timestamp byte slice and meta tags for the tagset. Filtering // is enforced on the values. If there is no matching value, then a nil result is returned. -func (tsc *tagSetCursor) Next(tmin, tmax int64, selectFields, whereFields []string) (int64, interface{}, map[string]string) { +func (tsc *tagSetCursor) Next(tmin, tmax int64, selectFields, whereFields []string) (int64, interface{}) { for { // If we're out of points, we're done. if tsc.pointHeap.Len() == 0 { - return -1, nil, nil + return -1, nil } // Grab the next point with the lowest timestamp. @@ -735,13 +764,16 @@ func (tsc *tagSetCursor) Next(tmin, tmax int64, selectFields, whereFields []stri // We're done if the point is outside the query's time range [tmin:tmax). if p.timestamp != tmin && (tmin > p.timestamp || p.timestamp >= tmax) { - return -1, nil, nil + return -1, nil } // Decode the raw point. value := tsc.decodeRawPoint(p, selectFields, whereFields) timestamp := p.timestamp - tags := p.cursor.tags + + // Keep track of the current tags for the series cursor so we can + // respond with them if asked + tsc.currentTags = p.cursor.tags // Advance the cursor nextKey, nextVal := p.cursor.Next() @@ -759,10 +791,16 @@ func (tsc *tagSetCursor) Next(tmin, tmax int64, selectFields, whereFields []stri continue } - return timestamp, value, tags + return timestamp, value } } +// Tags returns the current tags of the current cursor +// if there is no current cursor, it returns nil +func (tsc *tagSetCursor) Tags() map[string]string { + return tsc.currentTags +} + // decodeRawPoint decodes raw point data into field names & values and does WHERE filtering. func (tsc *tagSetCursor) decodeRawPoint(p *pointHeapItem, selectFields, whereFields []string) interface{} { if len(selectFields) > 1 {