diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index 6bf06d60a5..ec889cc244 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -391,6 +391,47 @@ function test_series { '[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_samples_total&start=-292273086-05-16T16:47:06Z&end=292277025-08-18T07:12:54.999999999Z" | jq -r ".data | length") -eq 1 ]]' } +function test_labels { + TAG_NAME_0="name_0" TAG_VALUE_0="value_0_1" \ + TAG_NAME_1="name_1" TAG_VALUE_1="value_1_1" \ + TAG_NAME_2="name_2" TAG_VALUE_2="value_2_1" \ + prometheus_remote_write \ + label_metric now 42.42 \ + true "Expected request to succeed" \ + 200 "Expected request to return status code 200" + + TAG_NAME_0="name_0" TAG_VALUE_0="value_0_2" \ + TAG_NAME_1="name_1" TAG_VALUE_1="value_1_2" \ + prometheus_remote_write \ + label_metric_2 now 42.42 \ + true "Expected request to succeed" \ + 200 "Expected request to return status code 200" + + # Test label search with match + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/labels" | jq -r "[.data[] | select(index(\"name_0\", \"name_1\", \"name_2\"))] | length") -eq 3 ]]' + + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/labels?match[]=label_metric" | jq -r ".data | length") -eq 4 ]]' + + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/labels?match[]=label_metric_2" | jq -r ".data | length") -eq 3 ]]' + + # Test label values search with match + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values" | jq -r ".data | length") -eq 2 ]]' # two values without a match + + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric" | jq -r ".data | length") -eq 1 ]]' + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric" | jq -r ".data[0]") = "value_1_1" ]]' + + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric_2" | jq -r ".data | length") -eq 1 ]]' + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric_2" | jq -r ".data[0]") = "value_1_2" ]]' +} + echo "Running readiness test" test_readiness @@ -409,6 +450,7 @@ test_prometheus_query_native_timeout test_query_restrict_tags test_prometheus_remote_write_map_tags test_series +test_labels echo "Running function correctness tests" test_correctness diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index c718b2fd54..5f47813827 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -630,7 +630,7 @@ func (b *block) aggregateWithSpan( } return aggOpts.FieldFilter.Allow(field) }, - fieldIterFn: func(r segment.Reader) (segment.FieldsIterator, error) { + fieldIterFn: func(r segment.Reader) (segment.FieldsPostingsListIterator, error) { // NB(prateek): we default to using the regular (FST) fields iterator // unless we have a predefined list of fields we know we need to restrict // our search to, in which case we iterate that list and check if known values @@ -642,7 +642,7 @@ func (b *block) aggregateWithSpan( // to this function is expected to have (FieldsFilter) pretty small. If that changes // in the future, we can revisit this. if len(aggOpts.FieldFilter) == 0 { - return r.Fields() + return r.FieldsPostingsList() } return newFilterFieldsIterator(r, aggOpts.FieldFilter) }, diff --git a/src/dbnode/storage/index/fields_terms_iterator.go b/src/dbnode/storage/index/fields_terms_iterator.go index 186c32d47b..44f6cc4685 100644 --- a/src/dbnode/storage/index/fields_terms_iterator.go +++ b/src/dbnode/storage/index/fields_terms_iterator.go @@ -23,11 +23,12 @@ package index import ( "errors" + pilosaroaring "github.com/m3dbx/pilosa/roaring" + "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/postings" "github.com/m3db/m3/src/m3ninx/postings/roaring" xerrors "github.com/m3db/m3/src/x/errors" - pilosaroaring "github.com/m3dbx/pilosa/roaring" ) var ( @@ -49,23 +50,23 @@ func (o fieldsAndTermsIteratorOpts) allow(f []byte) bool { return o.allowFn(f) } -func (o fieldsAndTermsIteratorOpts) newFieldIter(r segment.Reader) (segment.FieldsIterator, error) { +func (o fieldsAndTermsIteratorOpts) newFieldIter(r segment.Reader) (segment.FieldsPostingsListIterator, error) { if o.fieldIterFn == nil { - return r.Fields() + return r.FieldsPostingsList() } return o.fieldIterFn(r) } type allowFn func(field []byte) bool -type newFieldIterFn func(r segment.Reader) (segment.FieldsIterator, error) +type newFieldIterFn func(r segment.Reader) (segment.FieldsPostingsListIterator, error) type fieldsAndTermsIter struct { reader segment.Reader opts fieldsAndTermsIteratorOpts err error - fieldIter segment.FieldsIterator + fieldIter segment.FieldsPostingsListIterator termIter segment.TermsIterator current struct { @@ -145,10 +146,33 @@ func (fti *fieldsAndTermsIter) setNextField() bool { } for fieldIter.Next() { - field := fieldIter.Current() + field, pl := fieldIter.Current() if !fti.opts.allow(field) { continue } + if fti.restrictByPostings == nil { + // No restrictions. + fti.current.field = field + return true + } + + bitmap, ok := roaring.BitmapFromPostingsList(pl) + if !ok { + fti.err = errUnpackBitmapFromPostingsList + return false + } + + // Check field is part of at least some of the documents we're + // restricted to providing results for based on intersection + // count. + // Note: IntersectionCount is significantly faster than intersecting and + // counting results and also does not allocate. + if n := fti.restrictByPostings.IntersectionCount(bitmap); n < 1 { + // No match, not next result. + continue + } + + // Matches, this is next result. fti.current.field = field return true } @@ -213,7 +237,7 @@ func (fti *fieldsAndTermsIter) nextTermsIterResult() (bool, error) { return false, errUnpackBitmapFromPostingsList } - // Check term isn part of at least some of the documents we're + // Check term is part of at least some of the documents we're // restricted to providing results for based on intersection // count. // Note: IntersectionCount is significantly faster than intersecting and diff --git a/src/dbnode/storage/index/fields_terms_iterator_prop_test.go b/src/dbnode/storage/index/fields_terms_iterator_prop_test.go index 91224e7e60..75220065f5 100644 --- a/src/dbnode/storage/index/fields_terms_iterator_prop_test.go +++ b/src/dbnode/storage/index/fields_terms_iterator_prop_test.go @@ -35,6 +35,8 @@ import ( "github.com/leanovate/gopter" "github.com/leanovate/gopter/gen" "github.com/leanovate/gopter/prop" + "github.com/stretchr/testify/require" + "github.com/m3db/m3/src/m3ninx/index/segment" xtest "github.com/m3db/m3/src/x/test" ) @@ -62,7 +64,8 @@ func TestFieldsTermsIteratorPropertyTest(t *testing.T) { if err != nil { return false, err } - observed := toSlice(t, iter) + observed, err := toSlice(iter) + require.NoError(t, err) requireSlicesEqual(t, expected, observed) return true, nil }, @@ -97,7 +100,7 @@ func TestFieldsTermsIteratorPropertyTestNoPanic(t *testing.T) { if err != nil { return false, err } - toSlice(t, iter) + _, _ = toSlice(iter) return true, nil }, genIterableSegment(ctrl), @@ -149,9 +152,9 @@ func genIterableSegment(ctrl *gomock.Controller) gopter.Gen { r := segment.NewMockReader(ctrl) - fieldIterator := &stubFieldIterator{points: fields} + fieldsPostingsListIterator := &stubFieldsPostingsListIterator{points: fields} - r.EXPECT().Fields().Return(fieldIterator, nil).AnyTimes() + r.EXPECT().FieldsPostingsList().Return(fieldsPostingsListIterator, nil).AnyTimes() for f, values := range tagValues { sort.Slice(values, func(i, j int) bool { diff --git a/src/dbnode/storage/index/fields_terms_iterator_test.go b/src/dbnode/storage/index/fields_terms_iterator_test.go index 706d49d68e..243f5336ca 100644 --- a/src/dbnode/storage/index/fields_terms_iterator_test.go +++ b/src/dbnode/storage/index/fields_terms_iterator_test.go @@ -27,6 +27,9 @@ import ( "strings" "testing" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/idx" m3ninxindex "github.com/m3db/m3/src/m3ninx/index" @@ -34,11 +37,9 @@ import ( "github.com/m3db/m3/src/m3ninx/index/segment/fst" "github.com/m3db/m3/src/m3ninx/index/segment/mem" "github.com/m3db/m3/src/m3ninx/postings" + "github.com/m3db/m3/src/m3ninx/postings/roaring" "github.com/m3db/m3/src/m3ninx/util" xtest "github.com/m3db/m3/src/x/test" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/require" ) var ( @@ -91,7 +92,8 @@ func TestFieldsTermsIteratorReuse(t *testing.T) { }, }) require.NoError(t, err) - slice := toSlice(t, iter) + slice, err := toSlice(iter) + require.NoError(t, err) requireSlicesEqual(t, []pair{ {"d", "e"}, {"d", "f"}, @@ -106,7 +108,8 @@ func TestFieldsTermsIteratorReuse(t *testing.T) { }, }) require.NoError(t, err) - slice = toSlice(t, iter) + slice, err = toSlice(iter) + require.NoError(t, err) requireSlicesEqual(t, []pair{ {"a", "b"}, {"a", "c"}, @@ -135,7 +138,8 @@ func TestFieldsTermsIteratorSimpleSkip(t *testing.T) { }, }) require.NoError(t, err) - slice := toSlice(t, iter) + slice, err := toSlice(iter) + require.NoError(t, err) requireSlicesEqual(t, []pair{ {"d", "e"}, {"d", "f"}, @@ -159,7 +163,8 @@ func TestFieldsTermsIteratorTermsOnly(t *testing.T) { iter, err := newFieldsAndTermsIterator(reader, fieldsAndTermsIteratorOpts{}) require.NoError(t, err) - slice := toSlice(t, iter) + slice, err := toSlice(iter) + require.NoError(t, err) requireSlicesEqual(t, []pair{ {"a", ""}, {"d", ""}, @@ -173,25 +178,61 @@ func TestFieldsTermsIteratorEmptyTerm(t *testing.T) { ctrl := gomock.NewController(xtest.Reporter{T: t}) defer ctrl.Finish() - reader := newMockSegmentReader(ctrl, map[string][]string{ - "a": nil, + reader := newMockSegmentReader(ctrl, map[string]terms{ + "a": {}, }) iter, err := newFieldsAndTermsIterator(reader, fieldsAndTermsIteratorOpts{iterateTerms: false}) require.NoError(t, err) - slice := toSlice(t, iter) + slice, err := toSlice(iter) + require.NoError(t, err) requireSlicesEqual(t, []pair{{"a", ""}}, slice) } +func TestFieldsTermsIteratorRestrictByQueryFields(t *testing.T) { + ctrl := gomock.NewController(xtest.Reporter{T: t}) + defer ctrl.Finish() + + pl0 := roaring.NewPostingsList() + require.NoError(t, pl0.Insert(postings.ID(42))) + + pl1 := roaring.NewPostingsList() + require.NoError(t, pl1.Insert(postings.ID(1))) + + pl2 := roaring.NewPostingsList() + require.NoError(t, pl2.Insert(postings.ID(2))) + + reader := newMockSegmentReader(ctrl, map[string]terms{ + "foo": {values: []term{{value: "foo_0"}}, postings: pl0}, + "bar": {values: []term{{value: "bar_0"}}, postings: pl1}, + "baz": {values: []term{{value: "baz_0"}}, postings: pl2}, + }) + + // Simulate term query for "bar": + reader.EXPECT().MatchField([]byte("bar")).Return(pl1, nil) + + iter, err := newFieldsAndTermsIterator(reader, fieldsAndTermsIteratorOpts{ + iterateTerms: false, + restrictByQuery: &Query{ + Query: idx.NewFieldQuery([]byte("bar")), + }, + }) + require.NoError(t, err) + slice, err := toSlice(iter) + require.NoError(t, err) + requireSlicesEqual(t, []pair{{"bar", ""}}, slice) +} + func TestFieldsTermsIteratorEmptyTermInclude(t *testing.T) { ctrl := gomock.NewController(xtest.Reporter{T: t}) defer ctrl.Finish() - reader := newMockSegmentReader(ctrl, map[string][]string{ - "a": nil, + reader := newMockSegmentReader(ctrl, map[string]terms{ + "a": {}, }) iter, err := newFieldsAndTermsIterator(reader, fieldsAndTermsIteratorOpts{iterateTerms: true}) require.NoError(t, err) - slice := toSlice(t, iter) + slice, err := toSlice(iter) + require.NoError(t, err) requireSlicesEqual(t, []pair{}, slice) } @@ -261,7 +302,8 @@ func TestFieldsTermsIteratorIterateTermsAndRestrictByQuery(t *testing.T) { }, }) require.NoError(t, err) - slice := toSlice(t, iter) + slice, err := toSlice(iter) + require.NoError(t, err) requireSlicesEqual(t, []pair{ {"color", "red"}, {"color", "yellow"}, @@ -270,11 +312,22 @@ func TestFieldsTermsIteratorIterateTermsAndRestrictByQuery(t *testing.T) { }, slice) } -func newMockSegmentReader(ctrl *gomock.Controller, tagValues map[string][]string) segment.Reader { - fields := make([]iterpoint, 0, len(tagValues)) - for k := range tagValues { +type terms struct { + values []term + postings postings.List +} + +type term struct { + value string + postings postings.List +} + +func newMockSegmentReader(ctrl *gomock.Controller, termValues map[string]terms) *segment.MockReader { + fields := make([]iterpoint, 0, len(termValues)) + for field := range termValues { fields = append(fields, iterpoint{ - value: k, + value: field, + postings: termValues[field].postings, }) } sort.Slice(fields, func(i, j int) bool { @@ -282,17 +335,20 @@ func newMockSegmentReader(ctrl *gomock.Controller, tagValues map[string][]string }) r := segment.NewMockReader(ctrl) - fieldIterator := &stubFieldIterator{points: fields} + fieldsPostingsListIterator := &stubFieldsPostingsListIterator{points: fields} - r.EXPECT().Fields().Return(fieldIterator, nil).AnyTimes() + r.EXPECT().FieldsPostingsList().Return(fieldsPostingsListIterator, nil).AnyTimes() for _, f := range fields { - termValues := tagValues[f.value] - sort.Strings(termValues) + termValues := termValues[f.value].values + sort.Slice(termValues, func(i, j int) bool { + return termValues[i].value < termValues[j].value + }) terms := make([]iterpoint, 0, len(termValues)) for _, t := range termValues { terms = append(terms, iterpoint{ - value: t, + value: t.value, + postings: t.postings, }) } termIterator := &stubTermIterator{points: terms} @@ -302,6 +358,40 @@ func newMockSegmentReader(ctrl *gomock.Controller, tagValues map[string][]string return r } +type stubFieldsPostingsListIterator struct { + current iterpoint + points []iterpoint +} + +func (s *stubFieldsPostingsListIterator) Next() bool { + if len(s.points) == 0 { + return false + } + s.current = s.points[0] + s.points = s.points[1:] + return true +} + +func (s *stubFieldsPostingsListIterator) Current() ([]byte, postings.List) { + return []byte(s.current.value), s.current.postings +} + +func (s *stubFieldsPostingsListIterator) Err() error { + return s.current.err +} + +func (s *stubFieldsPostingsListIterator) Close() error { + if s.current.err != nil { + return s.current.err + } + for s.Next() { + if err := s.Err(); err != nil { + return err + } + } + return nil +} + type stubTermIterator struct { current iterpoint points []iterpoint @@ -317,7 +407,7 @@ func (s *stubTermIterator) Next() bool { } func (s *stubTermIterator) Current() ([]byte, postings.List) { - return []byte(s.current.value), nil + return []byte(s.current.value), s.current.postings } func (s *stubTermIterator) Err() error { @@ -371,8 +461,9 @@ func (s *stubFieldIterator) Close() error { } type iterpoint struct { - err error - value string + err error + value string + postings postings.List } type pair struct { @@ -429,7 +520,7 @@ func (s *fieldsTermsIterSetup) requireEquals(t *testing.T, iter fieldsAndTermsIt require.NoError(t, iter.Close()) } -func toSlice(t *testing.T, iter fieldsAndTermsIterator) []pair { +func toSlice(iter fieldsAndTermsIterator) ([]pair, error) { var pairs []pair for iter.Next() { n, v := iter.Current() @@ -441,7 +532,7 @@ func toSlice(t *testing.T, iter fieldsAndTermsIterator) []pair { Value: string(v), }) } - return pairs + return pairs, iter.Err() } func requireSlicesEqual(t *testing.T, a, b []pair) { diff --git a/src/dbnode/storage/index/filter_fields_iterator.go b/src/dbnode/storage/index/filter_fields_iterator.go index 96ac7570e2..7b24eb2ad9 100644 --- a/src/dbnode/storage/index/filter_fields_iterator.go +++ b/src/dbnode/storage/index/filter_fields_iterator.go @@ -21,9 +21,12 @@ package index import ( + "bytes" "errors" "github.com/m3db/m3/src/m3ninx/index/segment" + "github.com/m3db/m3/src/m3ninx/postings" + xerrors "github.com/m3db/m3/src/x/errors" ) var ( @@ -33,54 +36,71 @@ var ( func newFilterFieldsIterator( reader segment.Reader, fields AggregateFieldFilter, -) (segment.FieldsIterator, error) { +) (segment.FieldsPostingsListIterator, error) { if len(fields) == 0 { return nil, errNoFiltersSpecified } + fieldsIter, err := reader.FieldsPostingsList() + if err != nil { + return nil, err + } return &filterFieldsIterator{ reader: reader, + fieldsIter: fieldsIter, fields: fields, currentIdx: -1, }, nil } type filterFieldsIterator struct { - reader segment.Reader - fields AggregateFieldFilter + reader segment.Reader + fieldsIter segment.FieldsPostingsListIterator + fields AggregateFieldFilter err error currentIdx int } -var _ segment.FieldsIterator = &filterFieldsIterator{} +var _ segment.FieldsPostingsListIterator = &filterFieldsIterator{} func (f *filterFieldsIterator) Next() bool { if f.err != nil { return false } - f.currentIdx++ // required because we start at -1 - for f.currentIdx < len(f.fields) { - field := f.fields[f.currentIdx] + for f.fieldsIter.Next() { + field, _ := f.fieldsIter.Current() - ok, err := f.reader.ContainsField(field) - if err != nil { - f.err = err - return false + found := false + for _, f := range f.fields { + if bytes.Equal(field, f) { + found = true + break + } } - - // i.e. we found a field from the filter list contained in the segment. - if ok { + if found { return true } - - // the current field is unsuitable, so we skip to the next possiblity. - f.currentIdx++ } return false } -func (f *filterFieldsIterator) Current() []byte { return f.fields[f.currentIdx] } -func (f *filterFieldsIterator) Err() error { return f.err } -func (f *filterFieldsIterator) Close() error { return nil } +func (f *filterFieldsIterator) Current() ([]byte, postings.List) { + return f.fieldsIter.Current() +} + +func (f *filterFieldsIterator) Err() error { + return f.err +} + +func (f *filterFieldsIterator) Close() error { + multiErr := xerrors.NewMultiError() + if err := f.reader.Close(); err != nil { + multiErr = multiErr.Add(err) + } + if err := f.fieldsIter.Close(); err != nil { + multiErr = multiErr.Add(err) + } + return multiErr.FinalError() +} diff --git a/src/dbnode/storage/index/filter_fields_iterator_test.go b/src/dbnode/storage/index/filter_fields_iterator_test.go index 9e584011a4..db60a0e39d 100644 --- a/src/dbnode/storage/index/filter_fields_iterator_test.go +++ b/src/dbnode/storage/index/filter_fields_iterator_test.go @@ -26,7 +26,6 @@ import ( "github.com/m3db/m3/src/m3ninx/index/segment" xtest "github.com/m3db/m3/src/x/test" - "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" ) @@ -44,11 +43,12 @@ func TestNewFilterFieldsIteratorNoMatchesInSegment(t *testing.T) { defer ctrl.Finish() filters := AggregateFieldFilter{[]byte("a"), []byte("b")} - r := segment.NewMockReader(ctrl) - iter, err := newFilterFieldsIterator(r, filters) + reader := newMockSegmentReader(ctrl, map[string]terms{}) + reader.EXPECT().Close().Return(nil).Times(1) + + iter, err := newFilterFieldsIterator(reader, filters) require.NoError(t, err) - r.EXPECT().ContainsField(gomock.Any()).Return(false, nil).AnyTimes() require.False(t, iter.Next()) require.NoError(t, iter.Err()) require.NoError(t, iter.Close()) @@ -59,17 +59,15 @@ func TestNewFilterFieldsIteratorFirstMatch(t *testing.T) { defer ctrl.Finish() filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} - r := segment.NewMockReader(ctrl) - iter, err := newFilterFieldsIterator(r, filters) + reader := newMockSegmentReader(ctrl, map[string]terms{"a": {}}) + reader.EXPECT().Close().Return(nil).Times(1) + + iter, err := newFilterFieldsIterator(reader, filters) require.NoError(t, err) - gomock.InOrder( - r.EXPECT().ContainsField([]byte("a")).Return(true, nil), - r.EXPECT().ContainsField([]byte("b")).Return(false, nil), - r.EXPECT().ContainsField([]byte("c")).Return(false, nil), - ) require.True(t, iter.Next()) - require.Equal(t, "a", string(iter.Current())) + val, _ := iter.Current() + require.Equal(t, "a", string(val)) require.False(t, iter.Next()) require.NoError(t, iter.Err()) require.NoError(t, iter.Close()) @@ -80,17 +78,15 @@ func TestNewFilterFieldsIteratorMiddleMatch(t *testing.T) { defer ctrl.Finish() filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} - r := segment.NewMockReader(ctrl) - iter, err := newFilterFieldsIterator(r, filters) + reader := newMockSegmentReader(ctrl, map[string]terms{"d": {}, "b": {}, "e": {}}) + reader.EXPECT().Close().Return(nil).Times(1) + + iter, err := newFilterFieldsIterator(reader, filters) require.NoError(t, err) - gomock.InOrder( - r.EXPECT().ContainsField([]byte("a")).Return(false, nil), - r.EXPECT().ContainsField([]byte("b")).Return(true, nil), - r.EXPECT().ContainsField([]byte("c")).Return(false, nil), - ) require.True(t, iter.Next()) - require.Equal(t, "b", string(iter.Current())) + val, _ := iter.Current() + require.Equal(t, "b", string(val)) require.False(t, iter.Next()) require.NoError(t, iter.Err()) require.NoError(t, iter.Close()) @@ -101,17 +97,15 @@ func TestNewFilterFieldsIteratorEndMatch(t *testing.T) { defer ctrl.Finish() filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} - r := segment.NewMockReader(ctrl) - iter, err := newFilterFieldsIterator(r, filters) + reader := newMockSegmentReader(ctrl, map[string]terms{"d": {}, "e": {}, "c": {}}) + reader.EXPECT().Close().Return(nil).Times(1) + + iter, err := newFilterFieldsIterator(reader, filters) require.NoError(t, err) - gomock.InOrder( - r.EXPECT().ContainsField([]byte("a")).Return(false, nil), - r.EXPECT().ContainsField([]byte("b")).Return(false, nil), - r.EXPECT().ContainsField([]byte("c")).Return(true, nil), - ) require.True(t, iter.Next()) - require.Equal(t, "c", string(iter.Current())) + val, _ := iter.Current() + require.Equal(t, "c", string(val)) require.False(t, iter.Next()) require.NoError(t, iter.Err()) require.NoError(t, iter.Close()) @@ -122,21 +116,24 @@ func TestNewFilterFieldsIteratorAllMatch(t *testing.T) { defer ctrl.Finish() filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} - r := segment.NewMockReader(ctrl) - iter, err := newFilterFieldsIterator(r, filters) + reader := newMockSegmentReader(ctrl, map[string]terms{"a": {}, "b": {}, "c": {}}) + reader.EXPECT().Close().Return(nil).Times(1) + + iter, err := newFilterFieldsIterator(reader, filters) require.NoError(t, err) - gomock.InOrder( - r.EXPECT().ContainsField([]byte("a")).Return(true, nil), - r.EXPECT().ContainsField([]byte("b")).Return(true, nil), - r.EXPECT().ContainsField([]byte("c")).Return(true, nil), - ) require.True(t, iter.Next()) - require.Equal(t, "a", string(iter.Current())) + val, _ := iter.Current() + require.Equal(t, "a", string(val)) + require.True(t, iter.Next()) - require.Equal(t, "b", string(iter.Current())) + val, _ = iter.Current() + require.Equal(t, "b", string(val)) + require.True(t, iter.Next()) - require.Equal(t, "c", string(iter.Current())) + val, _ = iter.Current() + require.Equal(t, "c", string(val)) + require.False(t, iter.Next()) require.NoError(t, iter.Err()) require.NoError(t, iter.Close()) @@ -147,19 +144,20 @@ func TestNewFilterFieldsIteratorRandomMatch(t *testing.T) { defer ctrl.Finish() filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} - r := segment.NewMockReader(ctrl) - iter, err := newFilterFieldsIterator(r, filters) + reader := newMockSegmentReader(ctrl, map[string]terms{"a": {}, "c": {}}) + reader.EXPECT().Close().Return(nil).Times(1) + + iter, err := newFilterFieldsIterator(reader, filters) require.NoError(t, err) - gomock.InOrder( - r.EXPECT().ContainsField([]byte("a")).Return(true, nil), - r.EXPECT().ContainsField([]byte("b")).Return(false, nil), - r.EXPECT().ContainsField([]byte("c")).Return(true, nil), - ) require.True(t, iter.Next()) - require.Equal(t, "a", string(iter.Current())) + val, _ := iter.Current() + require.Equal(t, "a", string(val)) + require.True(t, iter.Next()) - require.Equal(t, "c", string(iter.Current())) + val, _ = iter.Current() + require.Equal(t, "c", string(val)) + require.False(t, iter.Next()) require.NoError(t, iter.Err()) require.NoError(t, iter.Close()) diff --git a/src/dbnode/storage/index/read_through_segment.go b/src/dbnode/storage/index/read_through_segment.go index 089fd546bc..38d5a6e4c4 100644 --- a/src/dbnode/storage/index/read_through_segment.go +++ b/src/dbnode/storage/index/read_through_segment.go @@ -287,6 +287,11 @@ func (s *readThroughSegmentReader) Fields() (segment.FieldsIterator, error) { return s.reader.Fields() } +// FieldsPostingsList is a pass through call. +func (s *readThroughSegmentReader) FieldsPostingsList() (segment.FieldsPostingsListIterator, error) { + return s.reader.FieldsPostingsList() +} + // ContainsField is a pass through call. func (s *readThroughSegmentReader) ContainsField(field []byte) (bool, error) { return s.reader.ContainsField(field) diff --git a/src/m3ninx/index/segment/fst/fst_terms_iterator.go b/src/m3ninx/index/segment/fst/fst_terms_iterator.go index 39327dc673..734ebbd911 100644 --- a/src/m3ninx/index/segment/fst/fst_terms_iterator.go +++ b/src/m3ninx/index/segment/fst/fst_terms_iterator.go @@ -31,6 +31,7 @@ type fstTermsIterOpts struct { seg *fsSegment fst *vellum.FST finalizeFST bool + fieldsFST bool } func (o fstTermsIterOpts) Close() error { diff --git a/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go b/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go index b0c2c25224..b01e998858 100644 --- a/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go +++ b/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go @@ -21,6 +21,7 @@ package fst import ( + "github.com/m3db/m3/src/m3ninx/generated/proto/fswriter" sgmt "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/postings" postingsroaring "github.com/m3db/m3/src/m3ninx/postings/roaring" @@ -92,8 +93,17 @@ func (f *fstTermsPostingsIter) Next() bool { currOffset := f.termsIter.CurrentOffset() f.seg.RLock() - f.err = f.seg.unmarshalPostingsListBitmapNotClosedMaybeFinalizedWithLock(f.bitmap, - currOffset) + if f.termsIter.opts.fieldsFST { + var fieldsData fswriter.FieldData + fieldsData, f.err = f.seg.unmarshalFieldDataNotClosedMaybeFinalizedWithRLock(currOffset) + currOffset = fieldsData.FieldPostingsListOffset + } + if f.err == nil { + // Only attempt if the previous unmarshal definitely succeeded + // if we are operating on a fields FST. + f.err = f.seg.unmarshalPostingsListBitmapNotClosedMaybeFinalizedWithLock(f.bitmap, + currOffset) + } f.seg.RUnlock() return f.err == nil diff --git a/src/m3ninx/index/segment/fst/segment.go b/src/m3ninx/index/segment/fst/segment.go index fd041df720..883a440e05 100644 --- a/src/m3ninx/index/segment/fst/segment.go +++ b/src/m3ninx/index/segment/fst/segment.go @@ -46,15 +46,17 @@ import ( ) var ( - errReaderClosed = errors.New("segment is closed") - errReaderFinalized = errors.New("segment is finalized") - errReaderNilRegexp = errors.New("nil regexp provided") - errUnsupportedMajorVersion = errors.New("unsupported major version") - errDocumentsDataUnset = errors.New("documents data bytes are not set") - errDocumentsIdxUnset = errors.New("documents index bytes are not set") - errPostingsDataUnset = errors.New("postings data bytes are not set") - errFSTTermsDataUnset = errors.New("fst terms data bytes are not set") - errFSTFieldsDataUnset = errors.New("fst fields data bytes are not set") + errReaderClosed = errors.New("segment is closed") + errReaderFinalized = errors.New("segment is finalized") + errReaderNilRegexp = errors.New("nil regexp provided") + errDocumentsDataUnset = errors.New("documents data bytes are not set") + errDocumentsIdxUnset = errors.New("documents index bytes are not set") + errPostingsDataUnset = errors.New("postings data bytes are not set") + errFSTTermsDataUnset = errors.New("fst terms data bytes are not set") + errFSTFieldsDataUnset = errors.New("fst fields data bytes are not set") + errUnsupportedFeatureFieldsPostingsList = errors.New( + "fst unsupported operation on old segment version: missing field postings list", + ) ) // SegmentData represent the collection of required parameters to construct a Segment. @@ -391,6 +393,23 @@ func (i *termsIterable) termsNotClosedMaybeFinalizedWithRLock( return i.postingsIter, nil } +func (i *termsIterable) fieldsNotClosedMaybeFinalizedWithRLock() (sgmt.FieldsPostingsListIterator, error) { + // NB(r): Not closed, but could be finalized (i.e. closed segment reader) + // calling match field after this segment is finalized. + if i.r.finalized { + return nil, errReaderFinalized + } + + i.fieldsIter.reset(fstTermsIterOpts{ + seg: i.r, + fst: i.r.fieldsFST, + finalizeFST: false, + fieldsFST: true, + }) + i.postingsIter.reset(i.r, i.fieldsIter) + return i.postingsIter, nil +} + func (r *fsSegment) UnmarshalPostingsListBitmap(b *pilosaroaring.Bitmap, offset uint64) error { r.RLock() defer r.RUnlock() @@ -438,18 +457,37 @@ func (r *fsSegment) matchFieldNotClosedMaybeFinalizedWithRLock( return r.opts.PostingsListPool().Get(), nil } - protoBytes, _, err := r.retrieveTermsBytesWithRLock(r.data.FSTTermsData.Bytes, termsFSTOffset) + fieldData, err := r.unmarshalFieldDataNotClosedMaybeFinalizedWithRLock(termsFSTOffset) if err != nil { return nil, err } + postingsOffset := fieldData.FieldPostingsListOffset + return r.retrievePostingsListWithRLock(postingsOffset) +} + +func (r *fsSegment) unmarshalFieldDataNotClosedMaybeFinalizedWithRLock( + fieldDataOffset uint64, +) (fswriter.FieldData, error) { + // NB(r): Not closed, but could be finalized (i.e. closed segment reader) + // calling match field after this segment is finalized. + if r.finalized { + return fswriter.FieldData{}, errReaderFinalized + } + if !r.data.Version.supportsFieldPostingsList() { + return fswriter.FieldData{}, errUnsupportedFeatureFieldsPostingsList + } + + protoBytes, _, err := r.retrieveTermsBytesWithRLock(r.data.FSTTermsData.Bytes, fieldDataOffset) + if err != nil { + return fswriter.FieldData{}, err + } + var fieldData fswriter.FieldData if err := fieldData.Unmarshal(protoBytes); err != nil { - return nil, err + return fswriter.FieldData{}, err } - - postingsOffset := fieldData.FieldPostingsListOffset - return r.retrievePostingsListWithRLock(postingsOffset) + return fieldData, nil } func (r *fsSegment) matchTermNotClosedMaybeFinalizedWithRLock( @@ -856,6 +894,12 @@ func (sr *fsSegmentReader) Fields() (sgmt.FieldsIterator, error) { return nil, errReaderClosed } + sr.fsSegment.RLock() + defer sr.fsSegment.RUnlock() + if sr.fsSegment.finalized { + return nil, errReaderFinalized + } + iter := newFSTTermsIter() iter.reset(fstTermsIterOpts{ seg: sr.fsSegment, @@ -879,6 +923,17 @@ func (sr *fsSegmentReader) ContainsField(field []byte) (bool, error) { return sr.fsSegment.fieldsFST.Contains(field) } +func (sr *fsSegmentReader) FieldsPostingsList() (sgmt.FieldsPostingsListIterator, error) { + if sr.closed { + return nil, errReaderClosed + } + fieldsIterable := newTermsIterable(sr.fsSegment) + sr.fsSegment.RLock() + iter, err := fieldsIterable.fieldsNotClosedMaybeFinalizedWithRLock() + sr.fsSegment.RUnlock() + return iter, err +} + func (sr *fsSegmentReader) Terms(field []byte) (sgmt.TermsIterator, error) { if sr.closed { return nil, errReaderClosed diff --git a/src/m3ninx/index/segment/fst/writer_reader_test.go b/src/m3ninx/index/segment/fst/writer_reader_test.go index eef2fbdf5f..50339643db 100644 --- a/src/m3ninx/index/segment/fst/writer_reader_test.go +++ b/src/m3ninx/index/segment/fst/writer_reader_test.go @@ -312,6 +312,35 @@ func TestPostingsListEqualForMatchField(t *testing.T) { }) } } + +func TestPostingsListEqualForMatchFieldWithFieldsPostingsList(t *testing.T) { + for _, test := range testDocuments { + t.Run(test.name, func(t *testing.T) { + for _, tc := range newTestCases(t, test.docs) { + tc := tc + t.Run(tc.name, func(t *testing.T) { + expSeg, obsSeg := tc.expected, tc.observed + expReader, err := expSeg.Reader() + require.NoError(t, err) + obsReader, err := obsSeg.Reader() + require.NoError(t, err) + + obsFieldsPostingsIter, err := obsReader.FieldsPostingsList() + require.NoError(t, err) + + for obsFieldsPostingsIter.Next() { + f, obsPl := obsFieldsPostingsIter.Current() + expPl, err := expReader.MatchField(f) + require.NoError(t, err) + require.True(t, expPl.Equal(obsPl), + fmt.Sprintf("field[%s] - [%v] != [%v]", string(f), pprintIter(expPl), pprintIter(obsPl))) + } + }) + } + }) + } +} + func TestPostingsListEqualForMatchTerm(t *testing.T) { for _, test := range testDocuments { t.Run(test.name, func(t *testing.T) { @@ -343,6 +372,38 @@ func TestPostingsListEqualForMatchTerm(t *testing.T) { } } +func TestPostingsListEqualForMatchTermWithFieldsPostingsList(t *testing.T) { + for _, test := range testDocuments { + t.Run(test.name, func(t *testing.T) { + memSeg, fstSeg := newTestSegments(t, test.docs) + memReader, err := memSeg.Reader() + require.NoError(t, err) + fstReader, err := fstSeg.Reader() + require.NoError(t, err) + + fstFieldsPostingsIter, err := fstReader.FieldsPostingsList() + require.NoError(t, err) + + for fstFieldsPostingsIter.Next() { + f, _ := fstFieldsPostingsIter.Current() + + memTermsIter, err := memSeg.Terms(f) + require.NoError(t, err) + memTerms := toTermPostings(t, memTermsIter) + + for term := range memTerms { + memPl, err := memReader.MatchTerm(f, []byte(term)) + require.NoError(t, err) + fstPl, err := fstReader.MatchTerm(f, []byte(term)) + require.NoError(t, err) + require.True(t, memPl.Equal(fstPl), + fmt.Sprintf("%s:%s - [%v] != [%v]", string(f), term, pprintIter(memPl), pprintIter(fstPl))) + } + } + }) + } +} + func TestPostingsListContainsID(t *testing.T) { for _, test := range testDocuments { t.Run(test.name, func(t *testing.T) { diff --git a/src/m3ninx/index/segment/mem/mem_mock.go b/src/m3ninx/index/segment/mem/mem_mock.go index a51c4f759a..b7f5c7a114 100644 --- a/src/m3ninx/index/segment/mem/mem_mock.go +++ b/src/m3ninx/index/segment/mem/mem_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/m3ninx/index/segment/mem (interfaces: ReadableSegment) -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -88,6 +88,21 @@ func (mr *MockReadableSegmentMockRecorder) Fields() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Fields", reflect.TypeOf((*MockReadableSegment)(nil).Fields)) } +// FieldsPostingsList mocks base method +func (m *MockReadableSegment) FieldsPostingsList() (segment.FieldsPostingsListIterator, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FieldsPostingsList") + ret0, _ := ret[0].(segment.FieldsPostingsListIterator) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FieldsPostingsList indicates an expected call of FieldsPostingsList +func (mr *MockReadableSegmentMockRecorder) FieldsPostingsList() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FieldsPostingsList", reflect.TypeOf((*MockReadableSegment)(nil).FieldsPostingsList)) +} + // Terms mocks base method func (m *MockReadableSegment) Terms(arg0 []byte) (segment.TermsIterator, error) { m.ctrl.T.Helper() diff --git a/src/m3ninx/index/segment/mem/reader.go b/src/m3ninx/index/segment/mem/reader.go index 998a184452..93a42824f9 100644 --- a/src/m3ninx/index/segment/mem/reader.go +++ b/src/m3ninx/index/segment/mem/reader.go @@ -70,6 +70,10 @@ func (r *reader) Terms(field []byte) (sgmt.TermsIterator, error) { return r.segment.Terms(field) } +func (r *reader) FieldsPostingsList() (sgmt.FieldsPostingsListIterator, error) { + return r.segment.FieldsPostingsList() +} + func (r *reader) MatchField(field []byte) (postings.List, error) { // falling back to regexp .* as this segment implementation is only used in tests. return r.MatchRegexp(field, index.DotStarCompiledRegex()) diff --git a/src/m3ninx/index/segment/mem/types.go b/src/m3ninx/index/segment/mem/types.go index 9a5268e0b3..30479fa85f 100644 --- a/src/m3ninx/index/segment/mem/types.go +++ b/src/m3ninx/index/segment/mem/types.go @@ -72,6 +72,7 @@ type ReadableSegment interface { Fields() (sgmt.FieldsIterator, error) ContainsField(field []byte) (bool, error) Terms(field []byte) (sgmt.TermsIterator, error) + FieldsPostingsList() (sgmt.FieldsPostingsListIterator, error) matchTerm(field, term []byte) (postings.List, error) matchRegexp(field []byte, compiled *re.Regexp) (postings.List, error) getDoc(id postings.ID) (doc.Metadata, error) diff --git a/src/m3ninx/index/segment/segment_mock.go b/src/m3ninx/index/segment/segment_mock.go index 7c86c01359..36b4e3efbe 100644 --- a/src/m3ninx/index/segment/segment_mock.go +++ b/src/m3ninx/index/segment/segment_mock.go @@ -360,6 +360,21 @@ func (mr *MockReaderMockRecorder) Terms(field interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Terms", reflect.TypeOf((*MockReader)(nil).Terms), field) } +// FieldsPostingsList mocks base method +func (m *MockReader) FieldsPostingsList() (FieldsPostingsListIterator, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FieldsPostingsList") + ret0, _ := ret[0].(FieldsPostingsListIterator) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FieldsPostingsList indicates an expected call of FieldsPostingsList +func (mr *MockReaderMockRecorder) FieldsPostingsList() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FieldsPostingsList", reflect.TypeOf((*MockReader)(nil).FieldsPostingsList)) +} + // ContainsField mocks base method func (m *MockReader) ContainsField(field []byte) (bool, error) { m.ctrl.T.Helper() diff --git a/src/m3ninx/index/segment/types.go b/src/m3ninx/index/segment/types.go index 0b176dc6de..61ba1d3cd3 100644 --- a/src/m3ninx/index/segment/types.go +++ b/src/m3ninx/index/segment/types.go @@ -69,6 +69,7 @@ type Reader interface { index.Reader FieldsIterable TermsIterable + FieldsPostingsListIterable // ContainsField returns a bool indicating if the Segment contains the provided field. ContainsField(field []byte) (bool, error) diff --git a/src/query/api/v1/handler/prometheus/common.go b/src/query/api/v1/handler/prometheus/common.go index 87859f9caf..f1a9ba13d7 100644 --- a/src/query/api/v1/handler/prometheus/common.go +++ b/src/query/api/v1/handler/prometheus/common.go @@ -211,7 +211,10 @@ func ParseSeriesMatchQuery( parseOpts xpromql.ParseOptions, tagOptions models.TagOptions, ) ([]*storage.FetchQuery, error) { - r.ParseForm() + if err := r.ParseForm(); err != nil { + return nil, xerrors.NewInvalidParamsError(err) + } + matcherValues := r.Form["match[]"] if len(matcherValues) == 0 { return nil, xerrors.NewInvalidParamsError(errors.ErrInvalidMatchers) @@ -222,30 +225,85 @@ func ParseSeriesMatchQuery( return nil, err } - queries := make([]*storage.FetchQuery, len(matcherValues)) - fn := parseOpts.MetricSelectorFn() - for i, s := range matcherValues { - promMatchers, err := fn(s) - if err != nil { - return nil, xerrors.NewInvalidParamsError(err) - } - - matchers, err := xpromql.LabelMatchersToModelMatcher(promMatchers, tagOptions) - if err != nil { - return nil, xerrors.NewInvalidParamsError(err) - } + matchers, ok, err := ParseMatch(r, parseOpts, tagOptions) + if err != nil { + return nil, err + } + if !ok { + return nil, xerrors.NewInvalidParamsError( + fmt.Errorf("need more than one matcher: expected>=1, actual=%d", len(matchers))) + } - queries[i] = &storage.FetchQuery{ - Raw: fmt.Sprintf("match[]=%s", s), - TagMatchers: matchers, + queries := make([]*storage.FetchQuery, 0, len(matcherValues)) + // nolint:gocritic + for _, m := range matchers { + queries = append(queries, &storage.FetchQuery{ + Raw: fmt.Sprintf("match[]=%s", m.Match), + TagMatchers: m.Matchers, Start: start, End: end, - } + }) } return queries, nil } +// ParsedMatch is a parsed matched. +type ParsedMatch struct { + Match string + Matchers models.Matchers +} + +// ParseMatch parses all match params from the GET request. +func ParseMatch( + r *http.Request, + parseOpts xpromql.ParseOptions, + tagOptions models.TagOptions, +) ([]ParsedMatch, bool, error) { + if err := r.ParseForm(); err != nil { + return nil, false, xerrors.NewInvalidParamsError(err) + } + + matcherValues := r.Form["match[]"] + if len(matcherValues) == 0 { + return nil, false, nil + } + + matchers := make([]ParsedMatch, 0, len(matcherValues)) + for _, str := range matcherValues { + m, err := parseMatch(parseOpts, tagOptions, str) + if err != nil { + return nil, false, err + } + matchers = append(matchers, ParsedMatch{ + Match: str, + Matchers: m, + }) + } + + return matchers, true, nil +} + +func parseMatch( + parseOpts xpromql.ParseOptions, + tagOptions models.TagOptions, + matcher string, +) (models.Matchers, error) { + fn := parseOpts.MetricSelectorFn() + + promMatchers, err := fn(matcher) + if err != nil { + return nil, xerrors.NewInvalidParamsError(err) + } + + matchers, err := xpromql.LabelMatchersToModelMatcher(promMatchers, tagOptions) + if err != nil { + return nil, xerrors.NewInvalidParamsError(err) + } + + return matchers, nil +} + func renderNameOnlyTagCompletionResultsJSON( w io.Writer, results []consolidators.CompletedTag, diff --git a/src/query/api/v1/handler/prometheus/common_test.go b/src/query/api/v1/handler/prometheus/common_test.go index b8479ae4ab..056aea1a9e 100644 --- a/src/query/api/v1/handler/prometheus/common_test.go +++ b/src/query/api/v1/handler/prometheus/common_test.go @@ -193,3 +193,88 @@ func TestParseStartAndEnd(t *testing.T) { }) } } + +// TestParseMatch tests the parsing / construction logic around ParseMatch(). +// matcher_test.go has more comprehensive testing on parsing details. +func TestParseMatch(t *testing.T) { + parseOpts := promql.NewParseOptions() + tagOpts := models.NewTagOptions() + + tests := []struct { + querystring string + exMatch []ParsedMatch + exErr bool + exEmpty bool + }{ + {exEmpty: true}, + { + querystring: "match[]=eq_label", + exMatch: []ParsedMatch{ + { + Match: "eq_label", + Matchers: models.Matchers{ + { + Type: models.MatchEqual, + Name: []byte("__name__"), + Value: []byte("eq_label"), + }, + }, + }, + }, + }, + {querystring: "match[]=illegal%match", exErr: true}, + } + + for _, tt := range tests { + t.Run(fmt.Sprintf("GET_%s", tt.querystring), func(t *testing.T) { + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, + fmt.Sprintf("/?%s", tt.querystring), nil) + require.NoError(t, err) + + parsedMatches, ok, err := ParseMatch(req, parseOpts, tagOpts) + + if tt.exErr { + require.Error(t, err) + require.False(t, ok) + require.Empty(t, parsedMatches) + return + } + + require.NoError(t, err) + if tt.exEmpty { + require.False(t, ok) + require.Empty(t, parsedMatches) + } else { + require.True(t, ok) + require.Equal(t, tt.exMatch, parsedMatches) + } + }) + } + + for _, tt := range tests { + t.Run(fmt.Sprintf("POST_%s", tt.querystring), func(t *testing.T) { + b := bytes.NewBuffer([]byte(tt.querystring)) + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, "/", b) + require.NoError(t, err) + req.Header.Add(xhttp.HeaderContentType, xhttp.ContentTypeFormURLEncoded) + + parsedMatches, ok, err := ParseMatch(req, parseOpts, tagOpts) + + if tt.exErr { + require.Error(t, err) + require.False(t, ok) + require.Empty(t, parsedMatches) + return + } + + require.NoError(t, err) + if tt.exEmpty { + require.False(t, ok) + require.Empty(t, parsedMatches) + } else { + require.True(t, ok) + require.Equal(t, tt.exMatch, parsedMatches) + } + }) + } +} diff --git a/src/query/api/v1/handler/prometheus/native/list_tags.go b/src/query/api/v1/handler/prometheus/native/list_tags.go index 23aa9268c8..33e77fad9a 100644 --- a/src/query/api/v1/handler/prometheus/native/list_tags.go +++ b/src/query/api/v1/handler/prometheus/native/list_tags.go @@ -22,6 +22,7 @@ package native import ( "context" + "fmt" "net/http" "github.com/m3db/m3/src/query/api/v1/handler" @@ -32,6 +33,7 @@ import ( "github.com/m3db/m3/src/query/parser/promql" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/util/logging" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" xhttp "github.com/m3db/m3/src/x/net/http" @@ -54,6 +56,7 @@ type ListTagsHandler struct { fetchOptionsBuilder handleroptions.FetchOptionsBuilder parseOpts promql.ParseOptions instrumentOpts instrument.Options + tagOpts models.TagOptions } // NewListTagsHandler returns a new instance of handler. @@ -63,6 +66,7 @@ func NewListTagsHandler(opts options.HandlerOptions) http.Handler { fetchOptionsBuilder: opts.FetchOptionsBuilder(), parseOpts: promql.NewParseOptions().SetNowFn(opts.NowFn()), instrumentOpts: opts.InstrumentOpts(), + tagOpts: opts.TagOptions(), } } @@ -77,9 +81,26 @@ func (h *ListTagsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + tagMatchers := models.Matchers{{Type: models.MatchAll}} + reqTagMatchers, ok, err := prometheus.ParseMatch(r, h.parseOpts, h.tagOpts) + if err != nil { + err = xerrors.NewInvalidParamsError(err) + xhttp.WriteError(w, err) + return + } + if ok { + if n := len(reqTagMatchers); n != 1 { + err = xerrors.NewInvalidParamsError(fmt.Errorf( + "only single tag matcher allowed: actual=%d", n)) + xhttp.WriteError(w, err) + return + } + tagMatchers = reqTagMatchers[0].Matchers + } + query := &storage.CompleteTagsQuery{ CompleteNameOnly: true, - TagMatchers: models.Matchers{{Type: models.MatchAll}}, + TagMatchers: tagMatchers, Start: start, End: end, } diff --git a/src/query/api/v1/handler/prometheus/native/list_tags_test.go b/src/query/api/v1/handler/prometheus/native/list_tags_test.go index f0e2e24c4b..a240f9535f 100644 --- a/src/query/api/v1/handler/prometheus/native/list_tags_test.go +++ b/src/query/api/v1/handler/prometheus/native/list_tags_test.go @@ -116,32 +116,63 @@ func testListTags(t *testing.T, meta block.ResultMetadata, header string) { opts := options.EmptyHandlerOptions(). SetStorage(store). SetFetchOptionsBuilder(fb). + SetTagOptions(models.NewTagOptions()). SetNowFn(nowFn) h := NewListTagsHandler(opts) for _, method := range []string{"GET", "POST"} { - matcher := &listTagsMatcher{start: time.Unix(0, 0), end: now} - store.EXPECT().CompleteTags(gomock.Any(), matcher, gomock.Any()). - Return(storeResult, nil) + testListTagsWithMatch(t, now, store, storeResult, method, header, h, false) + testListTagsWithMatch(t, now, store, storeResult, method, header, h, true) + } +} - req := httptest.NewRequest(method, "/labels", nil) - w := httptest.NewRecorder() +func testListTagsWithMatch( + t *testing.T, + now time.Time, + store *storage.MockStorage, + storeResult *consolidators.CompleteTagsResult, + method string, + header string, + h http.Handler, + withMatchOverride bool, +) { + tagMatcher := models.Matchers{{Type: models.MatchAll}} + target := "/labels" + if withMatchOverride { + tagMatcher = models.Matchers{{ + Type: models.MatchEqual, + Name: []byte("__name__"), + Value: []byte("testing"), + }} + target = "/labels?match[]=testing" + } - h.ServeHTTP(w, req) + matcher := &storage.CompleteTagsQuery{ + CompleteNameOnly: true, + TagMatchers: tagMatcher, + Start: time.Unix(0, 0), + End: now, + } + store.EXPECT().CompleteTags(gomock.Any(), gomock.Eq(matcher), gomock.Any()). + Return(storeResult, nil) - require.Equal(t, http.StatusOK, w.Result().StatusCode) + req := httptest.NewRequest(method, target, nil) + w := httptest.NewRecorder() - body := w.Result().Body - defer body.Close() + h.ServeHTTP(w, req) - r, err := ioutil.ReadAll(body) - require.NoError(t, err) + require.Equal(t, http.StatusOK, w.Result().StatusCode) // nolint:bodyclose - ex := `{"status":"success","data":["bar","baz","foo"]}` - require.Equal(t, ex, string(r)) + body := w.Result().Body + defer body.Close() // nolint:errcheck - actual := w.Header().Get(headers.LimitHeader) - assert.Equal(t, header, actual) - } + r, err := ioutil.ReadAll(body) + require.NoError(t, err) + + ex := `{"status":"success","data":["bar","baz","foo"]}` + require.Equal(t, ex, string(r)) + + actual := w.Header().Get(headers.LimitHeader) + assert.Equal(t, header, actual) } func TestListErrorTags(t *testing.T) { diff --git a/src/query/api/v1/handler/prometheus/remote/tag_values.go b/src/query/api/v1/handler/prometheus/remote/tag_values.go index 61c5935130..d5b1814756 100644 --- a/src/query/api/v1/handler/prometheus/remote/tag_values.go +++ b/src/query/api/v1/handler/prometheus/remote/tag_values.go @@ -21,7 +21,9 @@ package remote import ( + "bytes" "context" + "fmt" "net/http" "github.com/m3db/m3/src/query/api/v1/handler" @@ -34,6 +36,7 @@ import ( "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/storage/m3/consolidators" "github.com/m3db/m3/src/query/util/logging" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" xhttp "github.com/m3db/m3/src/x/net/http" @@ -59,6 +62,7 @@ type TagValuesHandler struct { fetchOptionsBuilder handleroptions.FetchOptionsBuilder parseOpts promql.ParseOptions instrumentOpts instrument.Options + tagOpts models.TagOptions } // TagValuesResponse is the response that gets returned to the user @@ -67,12 +71,13 @@ type TagValuesResponse struct { } // NewTagValuesHandler returns a new instance of handler. -func NewTagValuesHandler(options options.HandlerOptions) http.Handler { +func NewTagValuesHandler(opts options.HandlerOptions) http.Handler { return &TagValuesHandler{ - storage: options.Storage(), - fetchOptionsBuilder: options.FetchOptionsBuilder(), - parseOpts: promql.NewParseOptions().SetNowFn(options.NowFn()), - instrumentOpts: options.InstrumentOpts(), + storage: opts.Storage(), + fetchOptionsBuilder: opts.FetchOptionsBuilder(), + parseOpts: promql.NewParseOptions().SetNowFn(opts.NowFn()), + instrumentOpts: opts.InstrumentOpts(), + tagOpts: opts.TagOptions(), } } @@ -125,16 +130,39 @@ func (h *TagValuesHandler) parseTagValuesToQuery( } nameBytes := []byte(name) + + nameMatcher := models.Matcher{ + Type: models.MatchField, + Name: nameBytes, + } + tagMatchers := models.Matchers{nameMatcher} + reqTagMatchers, ok, err := prometheus.ParseMatch(r, h.parseOpts, h.tagOpts) + if err != nil { + return nil, xerrors.NewInvalidParamsError(err) + } + if ok { + if n := len(reqTagMatchers); n != 1 { + err := xerrors.NewInvalidParamsError(fmt.Errorf( + "only single tag matcher allowed: actual=%d", n)) + return nil, err + } + + reqTagMatcher := reqTagMatchers[0] + + //nolint:gocritic + for _, m := range reqTagMatcher.Matchers { + // add all matchers that don't match the default name matcher. + if m.Type != nameMatcher.Type || !bytes.Equal(m.Name, nameMatcher.Name) { + tagMatchers = append(tagMatchers, m) + } + } + } + return &storage.CompleteTagsQuery{ Start: start, End: end, CompleteNameOnly: false, FilterNameTags: [][]byte{nameBytes}, - TagMatchers: models.Matchers{ - models.Matcher{ - Type: models.MatchField, - Name: nameBytes, - }, - }, + TagMatchers: tagMatchers, }, nil } diff --git a/src/query/api/v1/handler/prometheus/remote/tag_values_test.go b/src/query/api/v1/handler/prometheus/remote/tag_values_test.go index cf617060ae..8fa1d17a40 100644 --- a/src/query/api/v1/handler/prometheus/remote/tag_values_test.go +++ b/src/query/api/v1/handler/prometheus/remote/tag_values_test.go @@ -111,6 +111,7 @@ func TestTagValues(t *testing.T) { opts := options.EmptyHandlerOptions(). SetStorage(store). SetNowFn(nowFn). + SetTagOptions(models.NewTagOptions()). SetFetchOptionsBuilder(fb) valueHandler := NewTagValuesHandler(opts) @@ -123,50 +124,84 @@ func TestTagValues(t *testing.T) { url := fmt.Sprintf("/label/{%s}/values", NameReplace) for _, tt := range names { - path := fmt.Sprintf("/label/%s/values?start=100", tt.name) - req, err := http.NewRequest("GET", path, nil) - if err != nil { - t.Fatal(err) - } + testTagValuesWithMatch(t, now, store, tt.name, url, valueHandler, false) + testTagValuesWithMatch(t, now, store, tt.name, url, valueHandler, true) + } +} - rr := httptest.NewRecorder() - router := mux.NewRouter() - matcher := &tagValuesMatcher{ - start: time.Unix(100, 0), - end: now, - filterTag: tt.name, +func testTagValuesWithMatch( + t *testing.T, + now time.Time, + store *storage.MockStorage, + name string, + url string, + valueHandler http.Handler, + withMatchOverride bool, +) { + path := fmt.Sprintf("/label/%s/values?start=100", name) + nameMatcher := models.Matcher{ + Type: models.MatchField, + Name: []byte(name), + } + matchers := models.Matchers{nameMatcher} + if withMatchOverride { + path = fmt.Sprintf("/label/%s/values?start=100&match[]=testing", name) + matchers = models.Matchers{ + nameMatcher, + { + Type: models.MatchEqual, + Name: []byte("__name__"), + Value: []byte("testing"), + }, } + } - storeResult := &consolidators.CompleteTagsResult{ - CompleteNameOnly: false, - CompletedTags: []consolidators.CompletedTag{ - { - Name: b(tt.name), - Values: bs("a", "b", "c", tt.name), - }, - }, - Metadata: block.ResultMetadata{ - Exhaustive: false, - Warnings: []block.Warning{{Name: "foo", Message: "bar"}}, + matcher := &storage.CompleteTagsQuery{ + Start: time.Unix(100, 0), + End: now, + CompleteNameOnly: false, + FilterNameTags: [][]byte{[]byte(name)}, + TagMatchers: matchers, + } + + // nolint:noctx + req, err := http.NewRequest("GET", path, nil) + if err != nil { + t.Fatal(err) + } + + rr := httptest.NewRecorder() + router := mux.NewRouter() + + storeResult := &consolidators.CompleteTagsResult{ + CompleteNameOnly: false, + CompletedTags: []consolidators.CompletedTag{ + { + Name: b(name), + Values: bs("a", "b", "c", name), }, - } + }, + Metadata: block.ResultMetadata{ + Exhaustive: false, + Warnings: []block.Warning{{Name: "foo", Message: "bar"}}, + }, + } - store.EXPECT().CompleteTags(gomock.Any(), matcher, gomock.Any()). - Return(storeResult, nil) + store.EXPECT().CompleteTags(gomock.Any(), gomock.Eq(matcher), gomock.Any()). + Return(storeResult, nil) - router.HandleFunc(url, valueHandler.ServeHTTP) - router.ServeHTTP(rr, req) + router.HandleFunc(url, valueHandler.ServeHTTP) + router.ServeHTTP(rr, req) - read, err := ioutil.ReadAll(rr.Body) - require.NoError(t, err) + read, err := ioutil.ReadAll(rr.Body) + require.NoError(t, err) - ex := fmt.Sprintf(`{"status":"success","data":["a","b","c","%s"]}`, tt.name) - assert.Equal(t, ex, string(read)) + ex := fmt.Sprintf(`{"status":"success","data":["a","b","c","%s"]}`, name) + assert.Equal(t, ex, string(read)) - warning := rr.Header().Get(headers.LimitHeader) - exWarn := fmt.Sprintf("%s,foo_bar", headers.LimitHeaderSeriesLimitApplied) - assert.Equal(t, exWarn, warning) - } + warning := rr.Header().Get(headers.LimitHeader) + exWarn := fmt.Sprintf("%s,foo_bar", headers.LimitHeaderSeriesLimitApplied) + assert.Equal(t, exWarn, warning) } func TestTagValueErrors(t *testing.T) { diff --git a/src/query/parser/promql/matchers.go b/src/query/parser/promql/matchers.go index c25e618a61..dec2c38168 100644 --- a/src/query/parser/promql/matchers.go +++ b/src/query/parser/promql/matchers.go @@ -419,7 +419,6 @@ func LabelMatchersToModelMatcher( ) (models.Matchers, error) { matchers := make(models.Matchers, 0, len(lMatchers)) for _, m := range lMatchers { - // here. matchType, err := promTypeToM3(m.Type) if err != nil { return nil, err