Skip to content

Commit

Permalink
optimize SHOW TAG VALUES
Browse files Browse the repository at this point in the history
This commit optimizes `SHOW TAG VALUES` so that it avoids the
`SELECT` query engine execution and iterator creation. There
are also optimizations to reduce individual memory allocations
and to reduce in-memory heap size by only operating on one
measurement at a time.

Execution time has been reduce to approximately 900ms for
500,000 rows. This is about 2µs per row. Of this time,
approximately 1µs is spent retrieving and sorting the row
and 1µs is spent encoding into JSON and writing to the
response body.
  • Loading branch information
benbjohnson committed Jun 6, 2016
1 parent 7da0638 commit 1b94cd2
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 137 deletions.
148 changes: 148 additions & 0 deletions coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"sort"
"strconv"
"strings"
"time"

"github.com/influxdata/influxdb"
Expand Down Expand Up @@ -403,6 +404,12 @@ func (e *StatementExecutor) executeSetPasswordUserStatement(q *influxql.SetPassw
}

func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatement, ctx *influxql.ExecutionContext) error {
// Handle SHOW TAG VALUES separately so it can be optimized.
// https://github.com/influxdata/influxdb/issues/6233
if source, ok := stmt.Sources[0].(*influxql.Measurement); ok && source.Name == "_tags" {
return e.executeShowTagValues(stmt, ctx)
}

// It is important to "stamp" this time so that everywhere we evaluate `now()` in the statement is EXACTLY the same `now`
now := time.Now().UTC()
opt := influxql.SelectOptions{InterruptCh: ctx.InterruptCh}
Expand Down Expand Up @@ -597,6 +604,130 @@ func (e *StatementExecutor) iteratorCreator(stmt *influxql.SelectStatement, opt
return e.TSDBStore.IteratorCreator(shards)
}

func (e *StatementExecutor) executeShowTagValues(stmt *influxql.SelectStatement, ctx *influxql.ExecutionContext) error {
if stmt.Condition == nil {
return errors.New("a condition is required")
}

source := stmt.Sources[0].(*influxql.Measurement)
index := e.TSDBStore.DatabaseIndex(source.Database)
if index == nil {
ctx.Results <- &influxql.Result{StatementID: ctx.StatementID, Series: make([]*models.Row, 0)}
return nil
}

measurementExpr := influxql.CloneExpr(stmt.Condition)
measurementExpr = influxql.Reduce(influxql.RewriteExpr(measurementExpr, func(e influxql.Expr) influxql.Expr {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok || tag.Val != "_name" {
return nil
}
}
}
return e
}), nil)

mms, ok, err := index.MeasurementsByExpr(measurementExpr)
if err != nil {
return err
} else if !ok {
mms = index.Measurements()
sort.Sort(mms)
}

// If there are no measurements, return immediately.
if len(mms) == 0 {
ctx.Results <- &influxql.Result{StatementID: ctx.StatementID, Series: make([]*models.Row, 0)}
return nil
}

filterExpr := influxql.CloneExpr(stmt.Condition)
filterExpr = influxql.Reduce(influxql.RewriteExpr(filterExpr, func(e influxql.Expr) influxql.Expr {
switch e := e.(type) {
case *influxql.BinaryExpr:
switch e.Op {
case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX:
tag, ok := e.LHS.(*influxql.VarRef)
if !ok || strings.HasPrefix(tag.Val, "_") {
return nil
}
}
}
return e
}), nil)

var emitted bool
columns := stmt.ColumnNames()
for _, mm := range mms {
ids, err := mm.SeriesIDsAllOrByExpr(filterExpr)
if err != nil {
return err
}
ss := mm.SeriesByIDSlice(ids)

// Determine a list of keys from condition.
keySet, ok, err := mm.TagKeysByExpr(stmt.Condition)
if err != nil {
return err
}

// Loop over all keys for each series.
m := make(map[keyValue]struct{}, len(ss))
for _, series := range ss {
for key, value := range series.Tags {
if !ok {
// nop
} else if _, exists := keySet[key]; !exists {
continue
}
m[keyValue{key, value}] = struct{}{}
}
}

// Move to next series if no key/values match.
if len(m) == 0 {
continue
}

// Sort key/value set.
a := make([]keyValue, 0, len(m))
for kv := range m {
a = append(a, kv)
}
sort.Sort(keyValues(a))

// Convert to result values.
slab := make([]interface{}, len(a)*2)
values := make([][]interface{}, len(a))
for i, elem := range a {
slab[i*2], slab[i*2+1] = elem.key, elem.value
values[i] = slab[i*2 : i*2+2]
}

// Send result to client.
ctx.Results <- &influxql.Result{
StatementID: ctx.StatementID,
Series: []*models.Row{&models.Row{
Name: mm.Name,
Columns: columns,
Values: values,
}},
}
emitted = true
}

// Always emit at least one row.
if !emitted {
ctx.Results <- &influxql.Result{StatementID: ctx.StatementID, Series: make([]*models.Row, 0)}
}

return nil
}

func (e *StatementExecutor) executeShowContinuousQueriesStatement(stmt *influxql.ShowContinuousQueriesStatement) (models.Rows, error) {
dis := e.MetaClient.Databases()

Expand Down Expand Up @@ -1008,6 +1139,7 @@ type TSDBStore interface {
DeleteRetentionPolicy(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteShard(id uint64) error
DatabaseIndex(name string) *tsdb.DatabaseIndex
IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error)
ShardIteratorCreator(id uint64) influxql.IteratorCreator
}
Expand Down Expand Up @@ -1105,3 +1237,19 @@ type uint64Slice []uint64
func (a uint64Slice) Len() int { return len(a) }
func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }

type keyValue struct {
key, value string
}

type keyValues []keyValue

func (a keyValues) Len() int { return len(a) }
func (a keyValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a keyValues) Less(i, j int) bool {
ki, kj := a[i].key, a[j].key
if ki == kj {
return a[i].value < a[j].value
}
return ki < kj
}
6 changes: 6 additions & 0 deletions coordinator/statement_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
)

const (
Expand Down Expand Up @@ -200,6 +201,7 @@ type TSDBStore struct {
DeleteRetentionPolicyFn func(database, name string) error
DeleteShardFn func(id uint64) error
DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error
DatabaseIndexFn func(name string) *tsdb.DatabaseIndex
ShardIteratorCreatorFn func(id uint64) influxql.IteratorCreator
}

Expand Down Expand Up @@ -267,6 +269,10 @@ func (s *TSDBStore) ShardIteratorCreator(id uint64) influxql.IteratorCreator {
return s.ShardIteratorCreatorFn(id)
}

func (s *TSDBStore) DatabaseIndex(name string) *tsdb.DatabaseIndex {
return s.DatabaseIndexFn(name)
}

// MustParseQuery parses s into a query. Panic on error.
func MustParseQuery(s string) *influxql.Query {
q, err := influxql.ParseQuery(s)
Expand Down
35 changes: 29 additions & 6 deletions tsdb/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,18 @@ func (d *DatabaseIndex) TagsForSeries(key string) map[string]string {
return ss.Tags
}

// measurementsByExpr takes an expression containing only tags and returns a
// MeasurementsByExpr takes an expression containing only tags and returns a
// list of matching *Measurement. The bool return argument returns if the
// expression was a measurement expression. It is used to differentiate a list
// of no measurements because all measurements were filtered out (when the bool
// is true) against when there are no measurements because the expression
// wasn't evaluated (when the bool is false).
func (d *DatabaseIndex) MeasurementsByExpr(expr influxql.Expr) (Measurements, bool, error) {
d.mu.RLock()
defer d.mu.RUnlock()
return d.measurementsByExpr(expr)
}

func (d *DatabaseIndex) measurementsByExpr(expr influxql.Expr) (Measurements, bool, error) {
if expr == nil {
return nil, false, nil
Expand Down Expand Up @@ -538,6 +544,17 @@ func (m *Measurement) SeriesByID(id uint64) *Series {
return m.seriesByID[id]
}

// SeriesByIDSlice returns a list of series by identifiers.
func (m *Measurement) SeriesByIDSlice(ids []uint64) []*Series {
m.mu.RLock()
defer m.mu.RUnlock()
a := make([]*Series, len(ids))
for i, id := range ids {
a[i] = m.seriesByID[id]
}
return a
}

// AppendSeriesKeysByID appends keys for a list of series ids to a buffer.
func (m *Measurement) AppendSeriesKeysByID(dst []string, ids []uint64) []string {
m.mu.RLock()
Expand Down Expand Up @@ -1121,8 +1138,14 @@ func expandExprWithValues(expr influxql.Expr, keys []string, tagExprs []tagExpr,
return exprs
}

// seriesIDsAllOrByExpr walks an expressions for matching series IDs
// SeriesIDsAllOrByExpr walks an expressions for matching series IDs
// or, if no expressions is given, returns all series IDs for the measurement.
func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) {
m.mu.RLock()
defer m.mu.RUnlock()
return m.seriesIDsAllOrByExpr(expr)
}

func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) {
// If no expression given or the measurement has no series,
// we can take just return the ids or nil accordingly.
Expand All @@ -1142,7 +1165,7 @@ func (m *Measurement) seriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error
}

// tagKeysByExpr extracts the tag keys wanted by the expression.
func (m *Measurement) tagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) {
func (m *Measurement) TagKeysByExpr(expr influxql.Expr) (stringSet, bool, error) {
switch e := expr.(type) {
case *influxql.BinaryExpr:
switch e.Op {
Expand Down Expand Up @@ -1175,12 +1198,12 @@ func (m *Measurement) tagKeysByExpr(expr influxql.Expr) (stringSet, bool, error)
}
return m.tagKeysByFilter(tf.Op, tf.Value, tf.Regex), true, nil
case influxql.AND, influxql.OR:
lhsKeys, lhsOk, err := m.tagKeysByExpr(e.LHS)
lhsKeys, lhsOk, err := m.TagKeysByExpr(e.LHS)
if err != nil {
return nil, false, err
}

rhsKeys, rhsOk, err := m.tagKeysByExpr(e.RHS)
rhsKeys, rhsOk, err := m.TagKeysByExpr(e.RHS)
if err != nil {
return nil, false, err
}
Expand All @@ -1201,7 +1224,7 @@ func (m *Measurement) tagKeysByExpr(expr influxql.Expr) (stringSet, bool, error)
return nil, false, fmt.Errorf("invalid operator")
}
case *influxql.ParenExpr:
return m.tagKeysByExpr(e.Expr)
return m.TagKeysByExpr(e.Expr)
}
return nil, false, fmt.Errorf("%#v", expr)
}
Expand Down
Loading

0 comments on commit 1b94cd2

Please sign in to comment.