Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Add prefix label matcher.
Browse files Browse the repository at this point in the history
Implement labels.PrefixMatcher and use interface conversion in querier
to optimize label tuples search.

[unit-tests]: Fix bug and populate label index for mock index.

Signed-off-by: Dmitry Ilyevsky <[email protected]>
  • Loading branch information
dilyevsky committed Jul 22, 2017
1 parent 1378338 commit 37194b7
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 16 deletions.
24 changes: 23 additions & 1 deletion labels/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@

package labels

import "regexp"
import (
"regexp"
"strings"
)

// Selector holds constraints for matching against a label set.
type Selector []Matcher
Expand Down Expand Up @@ -84,3 +87,22 @@ func (m *notMatcher) Matches(v string) bool { return !m.Matcher.Matches(v) }
func Not(m Matcher) Matcher {
return &notMatcher{m}
}

// PrefixMatcher implements Matcher for labels which values matches prefix.
type PrefixMatcher struct {
name, prefix string
}

// NewPrefixMatcher returns new Matcher for label name matching prefix.
func NewPrefixMatcher(name, prefix string) Matcher {
return &PrefixMatcher{name: name, prefix: prefix}
}

// Name implements Matcher interface.
func (m *PrefixMatcher) Name() string { return m.name }

// Prefix returns matching prefix.
func (m *PrefixMatcher) Prefix() string { return m.prefix }

// Matches implements Matcher interface.
func (m *PrefixMatcher) Matches(v string) bool { return strings.HasPrefix(v, m.prefix) }
51 changes: 44 additions & 7 deletions querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package tsdb

import (
"fmt"
"sort"
"strings"

"github.com/prometheus/tsdb/chunks"
Expand Down Expand Up @@ -220,6 +221,37 @@ func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
return p, absent
}

// tuplesByPrefix uses binary search to find prefix matches within ts.
func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error) {
var outErr error
tslen := ts.Len()
i := sort.Search(tslen, func(i int) bool {
vs, err := ts.At(i)
if err != nil {
outErr = fmt.Errorf("Failed to read tuple %d/%d: %v", i, tslen, err)
return true
}
val := vs[0]
l := len(m.Prefix())
if l > len(vs) {
l = len(val)
}
return val[:l] >= m.Prefix()
})
if outErr != nil {
return nil, outErr
}
var matches []string
for ; i < tslen; i++ {
vs, err := ts.At(i)
if err != nil || !m.Matches(vs[0]) {
return matches, err
}
matches = append(matches, vs[0])
}
return matches, nil
}

func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
// Fast-path for equal matching.
if em, ok := m.(*labels.EqualMatcher); ok {
Expand All @@ -230,22 +262,27 @@ func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
return it
}

// TODO(fabxc): use interface upgrading to provide fast solution
// for prefix matches. Tuples are lexicographically sorted.
tpls, err := r.index.LabelValues(m.Name())
if err != nil {
return errPostings{err: err}
}

var res []string

for i := 0; i < tpls.Len(); i++ {
vals, err := tpls.At(i)
if pm, ok := m.(*labels.PrefixMatcher); ok {
res, err = tuplesByPrefix(pm, tpls)
if err != nil {
return errPostings{err: err}
}
if m.Matches(vals[0]) {
res = append(res, vals[0])

} else {
for i := 0; i < tpls.Len(); i++ {
vals, err := tpls.At(i)
if err != nil {
return errPostings{err: err}
}
if m.Matches(vals[0]) {
res = append(res, vals[0])
}
}
}

Expand Down
89 changes: 81 additions & 8 deletions querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func createIdxChkReaders(tc []struct {

postings := &memPostings{m: make(map[term][]uint32, 512)}
chkReader := mockChunkReader(make(map[uint64]chunks.Chunk))
lblIdx := make(map[string]stringset)
mi := newMockIndex()

for i, s := range tc {
Expand All @@ -253,16 +254,28 @@ func createIdxChkReaders(tc []struct {
chkReader[ref] = chunk
}

mi.AddSeries(uint32(i), labels.FromMap(s.lset), metas...)
ls := labels.FromMap(s.lset)
mi.AddSeries(uint32(i), ls, metas...)

postings.add(uint32(i), term{})
for _, l := range labels.FromMap(s.lset) {
for _, l := range ls {
postings.add(uint32(i), term{l.Name, l.Value})

vs, present := lblIdx[l.Name]
if !present {
vs = stringset{}
lblIdx[l.Name] = vs
}
vs.set(l.Value)
}
}

for l, vs := range lblIdx {
mi.WriteLabelIndex([]string{l}, vs.slice())
}

for tm := range postings.m {
mi.WritePostings(tm.name, tm.name, postings.get(tm))
mi.WritePostings(tm.name, tm.value, postings.get(tm))
}

return mi, chkReader
Expand Down Expand Up @@ -334,6 +347,47 @@ func TestBlockQuerier(t *testing.T) {
},
},
},
{
lset: map[string]string{
"p": "abcd",
"x": "xyz",
},
chunks: [][]sample{
{
{1, 2}, {2, 3}, {3, 4},
},
{
{5, 2}, {6, 3}, {7, 4},
},
},
},
{
lset: map[string]string{
"a": "ab",
"p": "abce",
},
chunks: [][]sample{
{
{1, 1}, {2, 2}, {3, 3},
},
{
{5, 3}, {6, 6},
},
},
},
{
lset: map[string]string{
"p": "xyz",
},
chunks: [][]sample{
{
{1, 1}, {2, 2}, {3, 3},
},
{
{4, 4}, {5, 5}, {6, 6},
},
},
},
},

queries: []query{
Expand Down Expand Up @@ -373,11 +427,30 @@ func TestBlockQuerier(t *testing.T) {
),
}),
},
{
mint: 2,
maxt: 6,
ms: []labels.Matcher{labels.NewPrefixMatcher("p", "abc")},
exp: newListSeriesSet([]Series{
newSeries(map[string]string{
"p": "abcd",
"x": "xyz",
},
[]sample{{2, 3}, {3, 4}, {5, 2}, {6, 3}},
),
newSeries(map[string]string{
"a": "ab",
"p": "abce",
},
[]sample{{2, 2}, {3, 3}, {5, 3}, {6, 6}},
),
}),
},
},
}

Outer:
for _, c := range cases.queries {
for i, c := range cases.queries {
ir, cr := createIdxChkReaders(cases.data)
querier := &blockQuerier{
index: ir,
Expand All @@ -392,21 +465,21 @@ Outer:

for {
eok, rok := c.exp.Next(), res.Next()
require.Equal(t, eok, rok, "next")
require.Equal(t, eok, rok, "%d: next", i)

if !eok {
continue Outer
}
sexp := c.exp.At()
sres := res.At()

require.Equal(t, sexp.Labels(), sres.Labels(), "labels")
require.Equal(t, sexp.Labels(), sres.Labels(), "%d: labels", i)

smplExp, errExp := expandSeriesIterator(sexp.Iterator())
smplRes, errRes := expandSeriesIterator(sres.Iterator())

require.Equal(t, errExp, errRes, "samples error")
require.Equal(t, smplExp, smplRes, "samples")
require.Equal(t, errExp, errRes, "%d: samples error", i)
require.Equal(t, smplExp, smplRes, "%d: samples", i)
}
}

Expand Down

0 comments on commit 37194b7

Please sign in to comment.