From 59663999d434fa593d7987d669f49cdb49db2719 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Tue, 7 Jul 2020 16:05:25 -0400 Subject: [PATCH 1/3] [dbnode] Always use block aggregate query for aggregate queries and intersect term results with query postings list (#2441) --- src/dbnode/storage/index.go | 23 +-- src/dbnode/storage/index/block.go | 6 +- .../storage/index/fields_terms_iterator.go | 149 ++++++++++++++---- src/dbnode/storage/index/types.go | 4 + src/dbnode/storage/index_block_test.go | 10 +- 5 files changed, 146 insertions(+), 46 deletions(-) diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index e3a3b0e4b8..e95d26190b 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -1242,16 +1242,19 @@ func (i *nsIndex) AggregateQuery( } ctx.RegisterFinalizer(results) // use appropriate fn to query underlying blocks. - // default to block.Query() - fn := i.execBlockQueryFn - // use block.Aggregate() when possible - if query.Equal(allQuery) { - fn = i.execBlockAggregateQueryFn - } - field, isField := idx.FieldQuery(query.Query) - if isField { - fn = i.execBlockAggregateQueryFn - aopts.FieldFilter = aopts.FieldFilter.AddIfMissing(field) + // use block.Aggregate() for querying and set the query if required. + fn := i.execBlockAggregateQueryFn + isAllQuery := query.Equal(allQuery) + if !isAllQuery { + if field, isFieldQuery := idx.FieldQuery(query.Query); isFieldQuery { + aopts.FieldFilter = aopts.FieldFilter.AddIfMissing(field) + } else { + // Need to actually restrict whether we should return a term or not + // based on running the actual query to resolve a postings list and + // then seeing if that intersects the aggregated term postings list + // at all. + aopts.RestrictByQuery = &query + } } aopts.FieldFilter = aopts.FieldFilter.SortAndDedupe() results.Reset(i.nsMetadata.ID(), aopts) diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 1c1a7cd603..cbaee2fffc 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -356,6 +356,9 @@ func (b *block) executorWithRLock() (search.Executor, error) { } func (b *block) segmentsWithRLock() []segment.Segment { + // TODO: Also keep the lifetimes of the segments alive, i.e. + // don't let the segments taken ref to here be operated on since + // they could be closed by mutable segments container, etc. numSegments := b.mutableSegments.Len() for _, coldSeg := range b.coldMutableSegments { numSegments += coldSeg.Len() @@ -594,7 +597,8 @@ func (b *block) aggregateWithSpan( aggOpts := results.AggregateResultsOptions() iterateTerms := aggOpts.Type == AggregateTagNamesAndValues iterateOpts := fieldsAndTermsIteratorOpts{ - iterateTerms: iterateTerms, + restrictByQuery: aggOpts.RestrictByQuery, + iterateTerms: iterateTerms, allowFn: func(field []byte) bool { // skip any field names that we shouldn't allow. if bytes.Equal(field, doc.IDReservedFieldName) { diff --git a/src/dbnode/storage/index/fields_terms_iterator.go b/src/dbnode/storage/index/fields_terms_iterator.go index 517e0f2c2b..3dfd11ebbd 100644 --- a/src/dbnode/storage/index/fields_terms_iterator.go +++ b/src/dbnode/storage/index/fields_terms_iterator.go @@ -21,15 +21,25 @@ package index import ( + "errors" + "github.com/m3db/m3/src/m3ninx/index/segment" + "github.com/m3db/m3/src/m3ninx/postings" + "github.com/m3db/m3/src/m3ninx/postings/roaring" xerrors "github.com/m3db/m3/src/x/errors" + pilosaroaring "github.com/m3dbx/pilosa/roaring" +) + +var ( + errUnpackBitmapFromPostingsList = errors.New("unable to unpack bitmap from postings list") ) // fieldsAndTermsIteratorOpts configures the fieldsAndTermsIterator. type fieldsAndTermsIteratorOpts struct { - iterateTerms bool - allowFn allowFn - fieldIterFn newFieldIterFn + restrictByQuery *Query + iterateTerms bool + allowFn allowFn + fieldIterFn newFieldIterFn } func (o fieldsAndTermsIteratorOpts) allow(f []byte) bool { @@ -59,9 +69,12 @@ type fieldsAndTermsIter struct { termIter segment.TermsIterator current struct { - field []byte - term []byte + field []byte + term []byte + postings postings.List } + + restrictByPostings *pilosaroaring.Bitmap } var ( @@ -91,11 +104,56 @@ func (fti *fieldsAndTermsIter) Reset(s segment.Segment, opts fieldsAndTermsItera if s == nil { return nil } + fiter, err := fti.opts.newFieldIter(s) if err != nil { return err } fti.fieldIter = fiter + + if opts.restrictByQuery == nil { + // No need to restrict results by query. + return nil + } + + // If need to restrict by query, run the query on the segment first. + var ( + readerTryClose bool + ) + reader, err := fti.seg.Reader() + if err != nil { + return err + } + + defer func() { + if !readerTryClose { + reader.Close() + } + }() + + searcher, err := opts.restrictByQuery.SearchQuery().Searcher() + if err != nil { + return err + } + + pl, err := searcher.Search(reader) + if err != nil { + return err + } + + // Hold onto the postings bitmap to intersect against on a per term basis. + bitmap, ok := roaring.BitmapFromPostingsList(pl) + if !ok { + return errUnpackBitmapFromPostingsList + } + + readerTryClose = true + if err := reader.Close(); err != nil { + return err + } + + fti.restrictByPostings = bitmap + return nil } @@ -121,47 +179,78 @@ func (fti *fieldsAndTermsIter) setNextField() bool { func (fti *fieldsAndTermsIter) setNext() bool { // check if current field has another term if fti.termIter != nil { - if fti.termIter.Next() { - fti.current.term, _ = fti.termIter.Current() + hasNextTerm, err := fti.nextTermsIterResult() + if err != nil { + fti.err = err + return false + } + if hasNextTerm { return true } - if err := fti.termIter.Err(); err != nil { + } + + // i.e. need to switch to next field + for hasNextField := fti.setNextField(); hasNextField; hasNextField = fti.setNextField() { + // and get next term for the field + var err error + fti.termIter, err = fti.seg.TermsIterable().Terms(fti.current.field) + if err != nil { fti.err = err return false } - if err := fti.termIter.Close(); err != nil { + + hasNextTerm, err := fti.nextTermsIterResult() + if err != nil { fti.err = err return false } + if hasNextTerm { + return true + } } - // i.e. need to switch to next field - hasNext := fti.setNextField() - if !hasNext { - return false - } - - // and get next term for the field - termsIter, err := fti.seg.TermsIterable().Terms(fti.current.field) - if err != nil { + // Check field iterator did not encounter error. + if err := fti.fieldIter.Err(); err != nil { fti.err = err return false } - fti.termIter = termsIter - hasNext = fti.termIter.Next() - if !hasNext { - if fti.fieldIter.Err(); err != nil { - fti.err = err - return false + // No more fields. + return false +} + +func (fti *fieldsAndTermsIter) nextTermsIterResult() (bool, error) { + for fti.termIter.Next() { + fti.current.term, fti.current.postings = fti.termIter.Current() + if fti.restrictByPostings == nil { + // No restrictions. + return true, nil + } + + bitmap, ok := roaring.BitmapFromPostingsList(fti.current.postings) + if !ok { + return false, errUnpackBitmapFromPostingsList } - fti.termIter = nil - // i.e. no more terms for this field, should try the next one - return fti.setNext() - } - fti.current.term, _ = fti.termIter.Current() - return true + // Check term isn part of at least some of the documents we're + // restricted to providing results for based on intersection + // count. + // Note: IntersectionCount is significantly faster than intersecting and + // counting results and also does not allocate. + if n := fti.restrictByPostings.IntersectionCount(bitmap); n > 0 { + // Matches, this is next result. + return true, nil + } + } + if err := fti.termIter.Err(); err != nil { + return false, err + } + if err := fti.termIter.Close(); err != nil { + return false, err + } + // Term iterator no longer relevant, no next. + fti.termIter = nil + return false, nil } func (fti *fieldsAndTermsIter) Next() bool { diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index d32425c66e..394740ff85 100644 --- a/src/dbnode/storage/index/types.go +++ b/src/dbnode/storage/index/types.go @@ -241,6 +241,10 @@ type AggregateResultsOptions struct { // FieldFilter is an optional param to filter aggregate values. FieldFilter AggregateFieldFilter + + // RestrictByQuery is a query to restrict the set of documents that must + // be present for an aggregated term to be returned. + RestrictByQuery *Query } // AggregateResultsAllocator allocates AggregateResults types. diff --git a/src/dbnode/storage/index_block_test.go b/src/dbnode/storage/index_block_test.go index e487d2c48c..fad3bf2073 100644 --- a/src/dbnode/storage/index_block_test.go +++ b/src/dbnode/storage/index_block_test.go @@ -1014,7 +1014,7 @@ func TestNamespaceIndexBlockAggregateQuery(t *testing.T) { } aggOpts := index.AggregationOptions{QueryOptions: qOpts} - b0.EXPECT().Query(gomock.Any(), gomock.Any(), q, qOpts, gomock.Any(), gomock.Any()).Return(true, nil) + b0.EXPECT().Aggregate(gomock.Any(), gomock.Any(), qOpts, gomock.Any(), gomock.Any()).Return(true, nil) result, err := idx.AggregateQuery(ctx, q, aggOpts) require.NoError(t, err) require.True(t, result.Exhaustive) @@ -1026,8 +1026,8 @@ func TestNamespaceIndexBlockAggregateQuery(t *testing.T) { RequireExhaustive: test.requireExhaustive, } aggOpts = index.AggregationOptions{QueryOptions: qOpts} - b0.EXPECT().Query(gomock.Any(), gomock.Any(), q, qOpts, gomock.Any(), gomock.Any()).Return(true, nil) - b1.EXPECT().Query(gomock.Any(), gomock.Any(), q, qOpts, gomock.Any(), gomock.Any()).Return(true, nil) + b0.EXPECT().Aggregate(gomock.Any(), gomock.Any(), qOpts, gomock.Any(), gomock.Any()).Return(true, nil) + b1.EXPECT().Aggregate(gomock.Any(), gomock.Any(), qOpts, gomock.Any(), gomock.Any()).Return(true, nil) result, err = idx.AggregateQuery(ctx, q, aggOpts) require.NoError(t, err) require.True(t, result.Exhaustive) @@ -1038,7 +1038,7 @@ func TestNamespaceIndexBlockAggregateQuery(t *testing.T) { EndExclusive: t0.Add(time.Minute), RequireExhaustive: test.requireExhaustive, } - b0.EXPECT().Query(gomock.Any(), gomock.Any(), q, qOpts, gomock.Any(), gomock.Any()).Return(false, nil) + b0.EXPECT().Aggregate(gomock.Any(), gomock.Any(), qOpts, gomock.Any(), gomock.Any()).Return(false, nil) aggOpts = index.AggregationOptions{QueryOptions: qOpts} result, err = idx.AggregateQuery(ctx, q, aggOpts) if test.requireExhaustive { @@ -1151,7 +1151,7 @@ func TestNamespaceIndexBlockAggregateQueryReleasingContext(t *testing.T) { gomock.InOrder( mockPool.EXPECT().Get().Return(stubResult), - b0.EXPECT().Query(ctx, gomock.Any(), q, qOpts, gomock.Any(), gomock.Any()).Return(true, nil), + b0.EXPECT().Aggregate(ctx, gomock.Any(), qOpts, gomock.Any(), gomock.Any()).Return(true, nil), mockPool.EXPECT().Put(stubResult), ) _, err = idx.AggregateQuery(ctx, q, aggOpts) From 78a9858da4220aa5892342d2ecbc7da4f96110f4 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Tue, 7 Jul 2020 20:11:01 -0400 Subject: [PATCH 2/3] [dbnode] Add TestFieldsTermsIteratorIterateTermsAndRestrictByQuery (#2449) --- ....go => fields_terms_iterator_prop_test.go} | 0 ..._test.go => fields_terms_iterator_test.go} | 75 +++++++++++++++++++ 2 files changed, 75 insertions(+) rename src/dbnode/storage/index/{field_terms_iterator_prop_test.go => fields_terms_iterator_prop_test.go} (100%) rename src/dbnode/storage/index/{field_terms_iterator_test.go => fields_terms_iterator_test.go} (84%) diff --git a/src/dbnode/storage/index/field_terms_iterator_prop_test.go b/src/dbnode/storage/index/fields_terms_iterator_prop_test.go similarity index 100% rename from src/dbnode/storage/index/field_terms_iterator_prop_test.go rename to src/dbnode/storage/index/fields_terms_iterator_prop_test.go diff --git a/src/dbnode/storage/index/field_terms_iterator_test.go b/src/dbnode/storage/index/fields_terms_iterator_test.go similarity index 84% rename from src/dbnode/storage/index/field_terms_iterator_test.go rename to src/dbnode/storage/index/fields_terms_iterator_test.go index 31ebfc39bb..ad1efa8169 100644 --- a/src/dbnode/storage/index/field_terms_iterator_test.go +++ b/src/dbnode/storage/index/fields_terms_iterator_test.go @@ -28,8 +28,11 @@ import ( "testing" "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/m3ninx/idx" + m3ninxindex "github.com/m3db/m3/src/m3ninx/index" "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/index/segment/fst" + "github.com/m3db/m3/src/m3ninx/index/segment/mem" "github.com/m3db/m3/src/m3ninx/postings" "github.com/m3db/m3/src/m3ninx/util" xtest "github.com/m3db/m3/src/x/test" @@ -174,6 +177,78 @@ func TestFieldsTermsIteratorEmptyTermInclude(t *testing.T) { requireSlicesEqual(t, []pair{}, slice) } +func TestFieldsTermsIteratorIterateTermsAndRestrictByQuery(t *testing.T) { + testDocs := []doc.Document{ + doc.Document{ + Fields: []doc.Field{ + doc.Field{ + Name: []byte("fruit"), + Value: []byte("banana"), + }, + doc.Field{ + Name: []byte("color"), + Value: []byte("yellow"), + }, + }, + }, + doc.Document{ + Fields: []doc.Field{ + doc.Field{ + Name: []byte("fruit"), + Value: []byte("apple"), + }, + doc.Field{ + Name: []byte("color"), + Value: []byte("red"), + }, + }, + }, + doc.Document{ + Fields: []doc.Field{ + doc.Field{ + Name: []byte("fruit"), + Value: []byte("pineapple"), + }, + doc.Field{ + Name: []byte("color"), + Value: []byte("yellow"), + }, + }, + }, + } + + seg, err := mem.NewSegment(0, mem.NewOptions()) + require.NoError(t, err) + + require.NoError(t, seg.InsertBatch(m3ninxindex.Batch{ + Docs: testDocs, + AllowPartialUpdates: true, + })) + + require.NoError(t, seg.Seal()) + + fruitRegexp, err := idx.NewRegexpQuery([]byte("fruit"), []byte("^.*apple$")) + require.NoError(t, err) + + colorRegexp, err := idx.NewRegexpQuery([]byte("color"), []byte("^(red|yellow)$")) + require.NoError(t, err) + + iter, err := newFieldsAndTermsIterator(seg, fieldsAndTermsIteratorOpts{ + iterateTerms: true, + restrictByQuery: &Query{ + Query: idx.NewConjunctionQuery(fruitRegexp, colorRegexp), + }, + }) + require.NoError(t, err) + slice := toSlice(t, iter) + requireSlicesEqual(t, []pair{ + pair{"color", "red"}, + pair{"color", "yellow"}, + pair{"fruit", "apple"}, + pair{"fruit", "pineapple"}, + }, slice) +} + func newMockSegment(ctrl *gomock.Controller, tagValues map[string][]string) segment.Segment { fields := make([]iterpoint, 0, len(tagValues)) for k := range tagValues { From daaa0c8ab65fcf5245448db29114cc2b44b03645 Mon Sep 17 00:00:00 2001 From: arnikola Date: Tue, 7 Jul 2020 22:30:53 -0400 Subject: [PATCH 3/3] [query] Carbon fetch for metrics with no separator (#2450) --- .../docker-integration-tests/carbon/test.sh | 5 + src/query/graphite/lexer/lexer.go | 3 + src/query/graphite/native/compiler.go | 91 ++++++++++++++++--- src/query/graphite/native/compiler_test.go | 37 ++++++++ 4 files changed, 122 insertions(+), 14 deletions(-) diff --git a/scripts/docker-integration-tests/carbon/test.sh b/scripts/docker-integration-tests/carbon/test.sh index d9cb74aa94..d88c88a534 100755 --- a/scripts/docker-integration-tests/carbon/test.sh +++ b/scripts/docker-integration-tests/carbon/test.sh @@ -79,6 +79,11 @@ t=$(date +%s) echo "foo.bar:baz.qux 42 $t" | nc 0.0.0.0 7204 ATTEMPTS=20 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff read_carbon 'foo.bar:*.*' 42 +# Test writing and reading IDs with a single element. +t=$(date +%s) +echo "quail 42 $t" | nc 0.0.0.0 7204 +ATTEMPTS=20 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff read_carbon 'quail' 42 + t=$(date +%s) echo "a 0 $t" | nc 0.0.0.0 7204 echo "a.bar 0 $t" | nc 0.0.0.0 7204 diff --git a/src/query/graphite/lexer/lexer.go b/src/query/graphite/lexer/lexer.go index ddd3f7ee76..2f4597db58 100644 --- a/src/query/graphite/lexer/lexer.go +++ b/src/query/graphite/lexer/lexer.go @@ -103,6 +103,9 @@ type Token struct { value string } +// MustMakeToken is a test function for creating a Token.MustMakeToken. +func MustMakeToken(value string) *Token { return &Token{value: value} } + // TokenType returns the type of token consumed. func (t Token) TokenType() TokenType { return t.tokenType diff --git a/src/query/graphite/native/compiler.go b/src/query/graphite/native/compiler.go index 9469ce4015..616bb21dd0 100644 --- a/src/query/graphite/native/compiler.go +++ b/src/query/graphite/native/compiler.go @@ -39,7 +39,8 @@ func compile(input string) (Expression, error) { lex, tokens := lexer.NewLexer(input, booleanLiterals) go lex.Run() - c := compiler{input: input, tokens: tokens} + lookforward := newTokenLookforward(tokens) + c := compiler{input: input, tokens: lookforward} expr, err := c.compileExpression() // Exhaust all tokens until closed or else lexer won't close @@ -49,15 +50,54 @@ func compile(input string) (Expression, error) { return expr, err } +type tokenLookforward struct { + lookforward *lexer.Token + tokens chan *lexer.Token +} + +func newTokenLookforward(tokens chan *lexer.Token) *tokenLookforward { + return &tokenLookforward{ + tokens: tokens, + } +} + +// get advances the lexer tokens. +func (l *tokenLookforward) get() *lexer.Token { + if token := l.lookforward; token != nil { + l.lookforward = nil + return token + } + + if token, ok := <-l.tokens; ok { + return token + } + + return nil +} + +func (l *tokenLookforward) peek() (*lexer.Token, bool) { + if l.lookforward != nil { + return l.lookforward, true + } + + token, ok := <-l.tokens + if !ok { + return nil, false + } + + l.lookforward = token + return token, true +} + // A compiler converts an input string into an executable Expression type compiler struct { input string - tokens chan *lexer.Token + tokens *tokenLookforward } // compileExpression compiles a top level expression func (c *compiler) compileExpression() (Expression, error) { - token := <-c.tokens + token := c.tokens.get() if token == nil { return noopExpression{}, nil } @@ -69,31 +109,54 @@ func (c *compiler) compileExpression() (Expression, error) { case lexer.Identifier: fc, err := c.compileFunctionCall(token.Value(), nil) + fetchCandidate := false if err != nil { - return nil, err + _, fnNotFound := err.(errFuncNotFound) + if fnNotFound && c.canCompileAsFetch(token.Value()) { + fetchCandidate = true + expr = newFetchExpression(token.Value()) + } else { + return nil, err + } } - expr, err = newFuncExpression(fc) - if err != nil { - return nil, err + if !fetchCandidate { + expr, err = newFuncExpression(fc) + if err != nil { + return nil, err + } } default: return nil, c.errorf("unexpected value %s", token.Value()) } - if token := <-c.tokens; token != nil { + if token := c.tokens.get(); token != nil { return nil, c.errorf("extra data %s", token.Value()) } return expr, nil } +// canCompileAsFetch attempts to see if the given term is a non-delimited +// carbon metric; no dots, without any trailing parentheses. +func (c *compiler) canCompileAsFetch(fname string) bool { + if nextToken, hasNext := c.tokens.peek(); hasNext { + return nextToken.TokenType() != lexer.LParenthesis + } + + return true +} + +type errFuncNotFound struct{ err error } + +func (e errFuncNotFound) Error() string { return e.err.Error() } + // compileFunctionCall compiles a function call func (c *compiler) compileFunctionCall(fname string, nextToken *lexer.Token) (*functionCall, error) { fn := findFunction(fname) if fn == nil { - return nil, c.errorf("could not find function named %s", fname) + return nil, errFuncNotFound{c.errorf("could not find function named %s", fname)} } if nextToken != nil { @@ -158,7 +221,7 @@ func (c *compiler) compileFunctionCall(fname string, nextToken *lexer.Token) (*f // compileArg parses and compiles a single argument func (c *compiler) compileArg(fname string, index int, reflectType reflect.Type) (arg funcArg, foundRParen bool, err error) { - token := <-c.tokens + token := c.tokens.get() if token == nil { return nil, false, c.errorf("unexpected eof while parsing %s", fname) } @@ -173,7 +236,7 @@ func (c *compiler) compileArg(fname string, index int, fname, token.Value()) } - if token = <-c.tokens; token == nil { + if token = c.tokens.get(); token == nil { return nil, false, c.errorf("unexpected eof while parsing %s", fname) } } @@ -219,13 +282,13 @@ func (c *compiler) convertTokenToArg(token *lexer.Token, reflectType reflect.Typ currentToken := token.Value() // handle named arguments - nextToken := <-c.tokens + nextToken := c.tokens.get() if nextToken == nil { return nil, c.errorf("unexpected eof, %s should be followed by = or (", currentToken) } if nextToken.TokenType() == lexer.Equal { // TODO: check if currentToken matches the expected parameter name - tokenAfterNext := <-c.tokens + tokenAfterNext := c.tokens.get() if tokenAfterNext == nil { return nil, c.errorf("unexpected eof, named argument %s should be followed by its value", currentToken) } @@ -240,7 +303,7 @@ func (c *compiler) convertTokenToArg(token *lexer.Token, reflectType reflect.Typ // expectToken reads the next token and confirms it is the expected type before returning it func (c *compiler) expectToken(expectedType lexer.TokenType) (*lexer.Token, error) { - token := <-c.tokens + token := c.tokens.get() if token == nil { return nil, c.errorf("expected %v but encountered eof", expectedType) } diff --git a/src/query/graphite/native/compiler_test.go b/src/query/graphite/native/compiler_test.go index 6149cd0408..61dc7b4cc8 100644 --- a/src/query/graphite/native/compiler_test.go +++ b/src/query/graphite/native/compiler_test.go @@ -26,6 +26,7 @@ import ( "testing" "github.com/m3db/m3/src/query/graphite/common" + "github.com/m3db/m3/src/query/graphite/lexer" xtest "github.com/m3db/m3/src/query/graphite/testing" "github.com/m3db/m3/src/query/graphite/ts" @@ -56,6 +57,7 @@ func TestCompile1(t *testing.T) { tests := []testCompile{ {"", noopExpression{}}, + {"foobar", newFetchExpression("foobar")}, {"foo.bar.{a,b,c}.baz-*.stat[0-9]", newFetchExpression("foo.bar.{a,b,c}.baz-*.stat[0-9]")}, {"noArgs()", &funcExpression{&functionCall{f: noArgs}}}, @@ -290,6 +292,8 @@ type testCompilerError struct { func TestCompileErrors(t *testing.T) { tests := []testCompilerError{ {"hello()", "top-level functions must return timeseries data"}, + {"foobar(", "invalid expression 'foobar(': could not find function named foobar"}, + {"foobar()", "invalid expression 'foobar()': could not find function named foobar"}, {"sortByName(foo.*.zed)junk", "invalid expression 'sortByName(foo.*.zed)junk': " + "extra data junk"}, {"aliasByNode(", @@ -435,7 +439,40 @@ func TestExtractFetchExpressions(t *testing.T) { require.NoError(t, err) assert.Equal(t, test.targets, targets, test.expr) } +} + +func TestTokenLookforward(t *testing.T) { + tokenVals := []string{"a", "b", "c"} + tokens := make(chan *lexer.Token) + go func() { + for _, v := range tokenVals { + tokens <- lexer.MustMakeToken(v) + } + + close(tokens) + }() + + lookforward := newTokenLookforward(tokens) + token := lookforward.get() + assert.Equal(t, "a", token.Value()) + + // assert that peek does not iterate token. + token, found := lookforward.peek() + assert.True(t, found) + assert.Equal(t, "b", token.Value()) + token, found = lookforward.peek() + assert.True(t, found) + assert.Equal(t, "b", token.Value()) + + // assert that next get after peek will iterate forward. + token = lookforward.get() + assert.Equal(t, "b", token.Value()) + token = lookforward.get() + assert.Equal(t, "c", token.Value()) + // assert peek is empty once channel is closed. + _, found = lookforward.peek() + assert.False(t, found) } func init() {