Merge branch 'master' into ss/default_metadatas
robskillington authored Jul 8, 2020
2 parents 3591d5d + daaa0c8 commit 617d723
Showing 11 changed files with 343 additions and 60 deletions.
5 changes: 5 additions & 0 deletions scripts/docker-integration-tests/carbon/test.sh
@@ -79,6 +79,11 @@ t=$(date +%s)
echo "foo.bar:baz.qux 42 $t" | nc 0.0.0.0 7204
ATTEMPTS=20 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff read_carbon 'foo.bar:*.*' 42

# Test writing and reading IDs with a single element.
t=$(date +%s)
echo "quail 42 $t" | nc 0.0.0.0 7204
ATTEMPTS=20 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff read_carbon 'quail' 42

t=$(date +%s)
echo "a 0 $t" | nc 0.0.0.0 7204
echo "a.bar 0 $t" | nc 0.0.0.0 7204
23 changes: 13 additions & 10 deletions src/dbnode/storage/index.go
@@ -1242,16 +1242,19 @@ func (i *nsIndex) AggregateQuery(
}
ctx.RegisterFinalizer(results)
// use appropriate fn to query underlying blocks.
// use block.Aggregate() for querying and set the query if required.
fn := i.execBlockAggregateQueryFn
isAllQuery := query.Equal(allQuery)
if !isAllQuery {
	if field, isFieldQuery := idx.FieldQuery(query.Query); isFieldQuery {
		aopts.FieldFilter = aopts.FieldFilter.AddIfMissing(field)
	} else {
		// Need to actually restrict whether we should return a term or not
		// based on running the actual query to resolve a postings list and
		// then seeing if that intersects the aggregated term postings list
		// at all.
		aopts.RestrictByQuery = &query
	}
}
aopts.FieldFilter = aopts.FieldFilter.SortAndDedupe()
results.Reset(i.nsMetadata.ID(), aopts)
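In plainer terms, the aggregate path now always goes through block.Aggregate(): a bare field query becomes a field filter, any other query is attached as a restriction, and only the all-query skips both. A hedged sketch of that decision as a hypothetical helper written as if it lived in the same package (not part of the commit; the allQuery shortcut is omitted):

// Hypothetical helper, for illustration only: it mirrors the decision made
// in nsIndex.AggregateQuery above.
func aggregateOptsForQuery(query Query) AggregateResultsOptions {
	aopts := AggregateResultsOptions{Type: AggregateTagNamesAndValues}
	if field, isFieldQuery := idx.FieldQuery(query.Query); isFieldQuery {
		// A bare field query can be answered from the aggregated terms alone.
		aopts.FieldFilter = aopts.FieldFilter.AddIfMissing(field)
	} else {
		// Any other query must be executed so its postings list can be
		// intersected with each aggregated term's postings list.
		aopts.RestrictByQuery = &query
	}
	aopts.FieldFilter = aopts.FieldFilter.SortAndDedupe()
	return aopts
}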
6 changes: 5 additions & 1 deletion src/dbnode/storage/index/block.go
@@ -356,6 +356,9 @@ func (b *block) executorWithRLock() (search.Executor, error) {
}

func (b *block) segmentsWithRLock() []segment.Segment {
// TODO: Also keep the lifetimes of the segments alive, i.e. don't let
// the segment refs taken here be operated on later, since the segments
// could be closed by the mutable segments container, etc.
numSegments := b.mutableSegments.Len()
for _, coldSeg := range b.coldMutableSegments {
numSegments += coldSeg.Len()
@@ -594,7 +597,8 @@ func (b *block) aggregateWithSpan(
aggOpts := results.AggregateResultsOptions()
iterateTerms := aggOpts.Type == AggregateTagNamesAndValues
iterateOpts := fieldsAndTermsIteratorOpts{
restrictByQuery: aggOpts.RestrictByQuery,
iterateTerms:    iterateTerms,
allowFn: func(field []byte) bool {
// skip any field names that we shouldn't allow.
if bytes.Equal(field, doc.IDReservedFieldName) {
149 changes: 119 additions & 30 deletions src/dbnode/storage/index/fields_terms_iterator.go
@@ -21,15 +21,25 @@
package index

import (
"errors"

"github.com/m3db/m3/src/m3ninx/index/segment"
"github.com/m3db/m3/src/m3ninx/postings"
"github.com/m3db/m3/src/m3ninx/postings/roaring"
xerrors "github.com/m3db/m3/src/x/errors"
pilosaroaring "github.com/m3dbx/pilosa/roaring"
)

var (
errUnpackBitmapFromPostingsList = errors.New("unable to unpack bitmap from postings list")
)

// fieldsAndTermsIteratorOpts configures the fieldsAndTermsIterator.
type fieldsAndTermsIteratorOpts struct {
restrictByQuery *Query
iterateTerms    bool
allowFn         allowFn
fieldIterFn     newFieldIterFn
}

func (o fieldsAndTermsIteratorOpts) allow(f []byte) bool {
@@ -59,9 +69,12 @@ type fieldsAndTermsIter struct {
termIter segment.TermsIterator

current struct {
field    []byte
term     []byte
postings postings.List
}

restrictByPostings *pilosaroaring.Bitmap
}

var (
@@ -91,11 +104,56 @@ func (fti *fieldsAndTermsIter) Reset(s segment.Segment, opts fieldsAndTermsItera
if s == nil {
return nil
}

fiter, err := fti.opts.newFieldIter(s)
if err != nil {
return err
}
fti.fieldIter = fiter

if opts.restrictByQuery == nil {
// No need to restrict results by query.
return nil
}

// If we need to restrict by query, run the query on the segment first.
var (
readerTryClose bool
)
reader, err := fti.seg.Reader()
if err != nil {
return err
}

defer func() {
if !readerTryClose {
reader.Close()
}
}()

searcher, err := opts.restrictByQuery.SearchQuery().Searcher()
if err != nil {
return err
}

pl, err := searcher.Search(reader)
if err != nil {
return err
}

// Hold onto the postings bitmap to intersect against on a per term basis.
bitmap, ok := roaring.BitmapFromPostingsList(pl)
if !ok {
return errUnpackBitmapFromPostingsList
}

readerTryClose = true
if err := reader.Close(); err != nil {
return err
}

fti.restrictByPostings = bitmap

return nil
}

@@ -121,47 +179,78 @@ func (fti *fieldsAndTermsIter) setNextField() bool {
func (fti *fieldsAndTermsIter) setNext() bool {
	// check if current field has another term
	if fti.termIter != nil {
		hasNextTerm, err := fti.nextTermsIterResult()
		if err != nil {
			fti.err = err
			return false
		}
		if hasNextTerm {
			return true
		}
	}

	// i.e. need to switch to next field
	for hasNextField := fti.setNextField(); hasNextField; hasNextField = fti.setNextField() {
		// and get next term for the field
		var err error
		fti.termIter, err = fti.seg.TermsIterable().Terms(fti.current.field)
		if err != nil {
			fti.err = err
			return false
		}

		hasNextTerm, err := fti.nextTermsIterResult()
		if err != nil {
			fti.err = err
			return false
		}
		if hasNextTerm {
			return true
		}
	}

	// Check field iterator did not encounter error.
	if err := fti.fieldIter.Err(); err != nil {
		fti.err = err
		return false
	}

	// No more fields.
	return false
}

func (fti *fieldsAndTermsIter) nextTermsIterResult() (bool, error) {
	for fti.termIter.Next() {
		fti.current.term, fti.current.postings = fti.termIter.Current()
		if fti.restrictByPostings == nil {
			// No restrictions.
			return true, nil
		}

		bitmap, ok := roaring.BitmapFromPostingsList(fti.current.postings)
		if !ok {
			return false, errUnpackBitmapFromPostingsList
		}

		// Check the term is part of at least some of the documents we're
		// restricted to providing results for based on intersection
		// count.
		// Note: IntersectionCount is significantly faster than intersecting and
		// counting results and also does not allocate.
		if n := fti.restrictByPostings.IntersectionCount(bitmap); n > 0 {
			// Matches, this is next result.
			return true, nil
		}
	}
	if err := fti.termIter.Err(); err != nil {
		return false, err
	}
	if err := fti.termIter.Close(); err != nil {
		return false, err
	}
	// Term iterator no longer relevant, no next.
	fti.termIter = nil
	return false, nil
}

func (fti *fieldsAndTermsIter) Next() bool {
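The IntersectionCount check in nextTermsIterResult is the heart of the restriction: a term survives only if its postings share at least one document with the restricting query's postings, and the intersection itself is never materialised. A small self-contained sketch with made-up document IDs (only IntersectionCount and the pilosa roaring import come from this diff):

package main

import (
	"fmt"

	pilosaroaring "github.com/m3dbx/pilosa/roaring"
)

func main() {
	// Postings of documents matched by the restricting query (IDs invented).
	restrict := pilosaroaring.NewBitmap(1, 5, 9)
	// Postings of documents containing a candidate term.
	term := pilosaroaring.NewBitmap(2, 5)

	// A non-zero intersection count is enough to keep the term; no
	// intermediate bitmap is allocated along the way.
	if n := restrict.IntersectionCount(term); n > 0 {
		fmt.Println("keep term, shared docs:", n)
	}
}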
@@ -28,8 +28,11 @@ import (
"testing"

"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/idx"
m3ninxindex "github.com/m3db/m3/src/m3ninx/index"
"github.com/m3db/m3/src/m3ninx/index/segment"
"github.com/m3db/m3/src/m3ninx/index/segment/fst"
"github.com/m3db/m3/src/m3ninx/index/segment/mem"
"github.com/m3db/m3/src/m3ninx/postings"
"github.com/m3db/m3/src/m3ninx/util"
xtest "github.com/m3db/m3/src/x/test"
@@ -174,6 +177,78 @@ func TestFieldsTermsIteratorEmptyTermInclude(t *testing.T) {
requireSlicesEqual(t, []pair{}, slice)
}

func TestFieldsTermsIteratorIterateTermsAndRestrictByQuery(t *testing.T) {
testDocs := []doc.Document{
doc.Document{
Fields: []doc.Field{
doc.Field{
Name: []byte("fruit"),
Value: []byte("banana"),
},
doc.Field{
Name: []byte("color"),
Value: []byte("yellow"),
},
},
},
doc.Document{
Fields: []doc.Field{
doc.Field{
Name: []byte("fruit"),
Value: []byte("apple"),
},
doc.Field{
Name: []byte("color"),
Value: []byte("red"),
},
},
},
doc.Document{
Fields: []doc.Field{
doc.Field{
Name: []byte("fruit"),
Value: []byte("pineapple"),
},
doc.Field{
Name: []byte("color"),
Value: []byte("yellow"),
},
},
},
}

seg, err := mem.NewSegment(0, mem.NewOptions())
require.NoError(t, err)

require.NoError(t, seg.InsertBatch(m3ninxindex.Batch{
Docs: testDocs,
AllowPartialUpdates: true,
}))

require.NoError(t, seg.Seal())

fruitRegexp, err := idx.NewRegexpQuery([]byte("fruit"), []byte("^.*apple$"))
require.NoError(t, err)

colorRegexp, err := idx.NewRegexpQuery([]byte("color"), []byte("^(red|yellow)$"))
require.NoError(t, err)

iter, err := newFieldsAndTermsIterator(seg, fieldsAndTermsIteratorOpts{
iterateTerms: true,
restrictByQuery: &Query{
Query: idx.NewConjunctionQuery(fruitRegexp, colorRegexp),
},
})
require.NoError(t, err)
slice := toSlice(t, iter)
requireSlicesEqual(t, []pair{
pair{"color", "red"},
pair{"color", "yellow"},
pair{"fruit", "apple"},
pair{"fruit", "pineapple"},
}, slice)
}

func newMockSegment(ctrl *gomock.Controller, tagValues map[string][]string) segment.Segment {
fields := make([]iterpoint, 0, len(tagValues))
for k := range tagValues {
4 changes: 4 additions & 0 deletions src/dbnode/storage/index/types.go
@@ -241,6 +241,10 @@ type AggregateResultsOptions struct {

// FieldFilter is an optional param to filter aggregate values.
FieldFilter AggregateFieldFilter

// RestrictByQuery is a query to restrict the set of documents that must
// be present for an aggregated term to be returned.
RestrictByQuery *Query
}

// AggregateResultsAllocator allocates AggregateResults types.
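As a caller-side usage sketch (not from the commit): aggregating tag values only for series that match a selector would populate the new field roughly like this. The regexp value is invented and an enclosing function returning an error is assumed.

// Hypothetical usage; assumes the m3ninx "idx" and dbnode storage "index"
// packages are imported.
selector, err := idx.NewRegexpQuery([]byte("city"), []byte("^new_.*$"))
if err != nil {
	return err
}
aggOpts := index.AggregateResultsOptions{
	Type: index.AggregateTagNamesAndValues,
	// Only aggregate terms from documents matching the selector.
	RestrictByQuery: &index.Query{Query: selector},
}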