Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming filter tags + case insensitive lookups for Service Names #9703

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/9703.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
streaming: lookup in health properly handle case-sensitivity and perform filtering based on tags and node-meta
```
95 changes: 87 additions & 8 deletions agent/cache-types/streaming_health_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"reflect"
"sort"
"strings"
"time"

"github.com/hashicorp/go-bexpr"
Expand Down Expand Up @@ -82,7 +83,7 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
return req
}

materializer, err := newMaterializer(c.deps, newReqFn, srvReq.Filter)
materializer, err := newMaterializer(c.deps, newReqFn, srvReq)
if err != nil {
return cache.FetchResult{}, err
}
Expand All @@ -100,9 +101,9 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
func newMaterializer(
deps MaterializerDeps,
newRequestFn func(uint64) pbsubscribe.SubscribeRequest,
filter string,
req *structs.ServiceSpecificRequest,
) (*submatview.Materializer, error) {
view, err := newHealthView(filter)
view, err := newHealthView(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -139,8 +140,8 @@ func (s *streamingHealthState) Fetch(opts cache.FetchOptions) (cache.FetchResult
return result, err
}

func newHealthView(filterExpr string) (*healthView, error) {
fe, err := newFilterEvaluator(filterExpr)
func newHealthView(req *structs.ServiceSpecificRequest) (*healthView, error) {
fe, err := newFilterEvaluator(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -192,11 +193,44 @@ type filterEvaluator interface {
Evaluate(datum interface{}) (bool, error)
}

func newFilterEvaluator(expr string) (filterEvaluator, error) {
if expr == "" {
func newFilterEvaluator(req *structs.ServiceSpecificRequest) (filterEvaluator, error) {
var evaluators []filterEvaluator

typ := reflect.TypeOf(structs.CheckServiceNode{})
if req.Filter != "" {
e, err := bexpr.CreateEvaluatorForType(req.Filter, nil, typ)
if err != nil {
return nil, err
}
evaluators = append(evaluators, e)
}

if req.ServiceTag != "" {
// Handle backwards compat with old field
req.ServiceTags = []string{req.ServiceTag}
}

if req.TagFilter && len(req.ServiceTags) > 0 {
evaluators = append(evaluators, serviceTagEvaluator{tags: req.ServiceTags})
}

for key, value := range req.NodeMetaFilters {
expr := fmt.Sprintf(`"%s" in Node.Meta.%s`, value, key)
e, err := bexpr.CreateEvaluatorForType(expr, nil, typ)
if err != nil {
return nil, err
}
evaluators = append(evaluators, e)
}

switch len(evaluators) {
case 0:
return noopFilterEvaluator{}, nil
case 1:
return evaluators[0], nil
default:
return &multiFilterEvaluator{evaluators: evaluators}, nil
}
return bexpr.CreateEvaluatorForType(expr, nil, reflect.TypeOf(structs.CheckServiceNode{}))
}

// noopFilterEvaluator may be used in place of a bexpr.Evaluator. The Evaluate
Expand All @@ -207,6 +241,20 @@ func (noopFilterEvaluator) Evaluate(_ interface{}) (bool, error) {
return true, nil
}

type multiFilterEvaluator struct {
evaluators []filterEvaluator
}

func (m multiFilterEvaluator) Evaluate(data interface{}) (bool, error) {
for _, e := range m.evaluators {
match, err := e.Evaluate(data)
if !match || err != nil {
return match, err
}
}
return true, nil
}

// sortCheckServiceNodes sorts the results to match memdb semantics
// Sort results by Node.Node, if 2 instances match, order by Service.ID
// Will allow result to be stable sorted and match queries without cache
Expand Down Expand Up @@ -240,3 +288,34 @@ func (s *healthView) Result(index uint64) (interface{}, error) {
func (s *healthView) Reset() {
s.state = make(map[string]structs.CheckServiceNode)
}

// serviceTagEvaluator implements the filterEvaluator to perform filtering
// by service tags. bexpr can not be used at this time, because the filtering
// must be case insensitive for backwards compatibility. In the future this
// may be replaced with bexpr once case insensitive support is added.
type serviceTagEvaluator struct {
tags []string
}

func (m serviceTagEvaluator) Evaluate(data interface{}) (bool, error) {
csn, ok := data.(structs.CheckServiceNode)
if !ok {
return false, fmt.Errorf("unexpected type %T for structs.CheckServiceNode filter", data)
}
for _, tag := range m.tags {
if !serviceHasTag(csn.Service, tag) {
// If any one of the expected tags was not found, filter the service
return false, nil
}
}
return true, nil
}

func serviceHasTag(sn *structs.NodeService, tag string) bool {
for _, t := range sn.Tags {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure the answer is no, but do we ever need to handle the backcompat case where sn.Tag is set? I think not since this would have come from Consul's state store and every CSN from a version recent enough to support streaming would only use the plural field right?

Copy link
Contributor

@dnephin dnephin Feb 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if strings.EqualFold(t, tag) {
return true
}
}
return false
}
208 changes: 208 additions & 0 deletions agent/cache-types/streaming_health_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,3 +568,211 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.FailNow()
}
}

func TestNewFilterEvaluator(t *testing.T) {
type testCase struct {
name string
req structs.ServiceSpecificRequest
data structs.CheckServiceNode
expected bool
}

fn := func(t *testing.T, tc testCase) {
e, err := newFilterEvaluator(&tc.req)
require.NoError(t, err)
actual, err := e.Evaluate(tc.data)
require.NoError(t, err)
require.Equal(t, tc.expected, actual)
}

var testCases = []testCase{
{
name: "single ServiceTags match",
req: structs.ServiceSpecificRequest{
ServiceTags: []string{"match"},
TagFilter: true,
},
data: structs.CheckServiceNode{
Service: &structs.NodeService{
Tags: []string{"extra", "match"},
},
},
expected: true,
},
{
name: "single deprecated ServiceTag match",
req: structs.ServiceSpecificRequest{
ServiceTag: "match",
TagFilter: true,
},
data: structs.CheckServiceNode{
Service: &structs.NodeService{
Tags: []string{"extra", "match"},
},
},
expected: true,
},
{
name: "single ServiceTags mismatch",
req: structs.ServiceSpecificRequest{
ServiceTags: []string{"other"},
TagFilter: true,
},
data: structs.CheckServiceNode{
Service: &structs.NodeService{
Tags: []string{"extra", "match"},
},
},
expected: false,
},
{
name: "multiple ServiceTags match",
req: structs.ServiceSpecificRequest{
ServiceTags: []string{"match", "second"},
TagFilter: true,
},
data: structs.CheckServiceNode{
Service: &structs.NodeService{
Tags: []string{"extra", "match", "second"},
},
},
expected: true,
},
{
name: "multiple ServiceTags mismatch",
req: structs.ServiceSpecificRequest{
ServiceTags: []string{"match", "not"},
TagFilter: true,
},
data: structs.CheckServiceNode{
Service: &structs.NodeService{
Tags: []string{"extra", "match"},
},
},
expected: false,
},
{
name: "single NodeMetaFilter match",
req: structs.ServiceSpecificRequest{
NodeMetaFilters: map[string]string{"meta1": "match"},
},
data: structs.CheckServiceNode{
Node: &structs.Node{
Meta: map[string]string{
"meta1": "match",
"extra": "some",
},
},
},
expected: true,
},
{
name: "single NodeMetaFilter mismatch",
req: structs.ServiceSpecificRequest{
NodeMetaFilters: map[string]string{
"meta1": "match",
},
},
data: structs.CheckServiceNode{
Node: &structs.Node{
Meta: map[string]string{
"meta1": "other",
"extra": "some",
},
},
},
expected: false,
},
{
name: "multiple NodeMetaFilter match",
req: structs.ServiceSpecificRequest{
NodeMetaFilters: map[string]string{"meta1": "match", "meta2": "a"},
},
data: structs.CheckServiceNode{
Node: &structs.Node{
Meta: map[string]string{
"meta1": "match",
"meta2": "a",
"extra": "some",
},
},
},
expected: true,
},
{
name: "multiple NodeMetaFilter mismatch",
req: structs.ServiceSpecificRequest{
NodeMetaFilters: map[string]string{
"meta1": "match",
"meta2": "beta",
},
},
data: structs.CheckServiceNode{
Node: &structs.Node{
Meta: map[string]string{
"meta1": "other",
"meta2": "gamma",
},
},
},
expected: false,
},
{
name: "QueryOptions.Filter match",
req: structs.ServiceSpecificRequest{
QueryOptions: structs.QueryOptions{
Filter: `Node.Node == "node3"`,
},
},
data: structs.CheckServiceNode{
Node: &structs.Node{Node: "node3"},
},
expected: true,
},
{
name: "QueryOptions.Filter mismatch",
req: structs.ServiceSpecificRequest{
QueryOptions: structs.QueryOptions{
Filter: `Node.Node == "node2"`,
},
},
data: structs.CheckServiceNode{
Node: &structs.Node{Node: "node3"},
},
expected: false,
},
{
name: "all match",
req: structs.ServiceSpecificRequest{
QueryOptions: structs.QueryOptions{
Filter: `Node.Node == "node3"`,
},
ServiceTags: []string{"tag1", "tag2"},
NodeMetaFilters: map[string]string{
"meta1": "match1",
"meta2": "match2",
},
},
data: structs.CheckServiceNode{
Node: &structs.Node{
Node: "node3",
Meta: map[string]string{
"meta1": "match1",
"meta2": "match2",
"extra": "other",
},
},
Service: &structs.NodeService{
Tags: []string{"tag1", "tag2", "extra"},
},
},
expected: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fn(t, tc)
})
}
}
4 changes: 3 additions & 1 deletion agent/consul/state/catalog_events.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package state

import (
"strings"

memdb "github.com/hashicorp/go-memdb"

"github.com/hashicorp/consul/acl"
Expand Down Expand Up @@ -42,7 +44,7 @@ func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool {
name = e.key
}
ns := e.Value.Service.EnterpriseMeta.GetNamespace()
return (key == "" || key == name) && (namespace == "" || namespace == ns)
return (key == "" || strings.EqualFold(key, name)) && (namespace == "" || namespace == ns)
}

// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
Expand Down
6 changes: 5 additions & 1 deletion agent/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -1171,12 +1171,16 @@ func (d *DNSServer) trimDNSResponse(cfg *dnsConfig, network string, req, resp *d

// lookupServiceNodes returns nodes with a given service.
func (d *DNSServer) lookupServiceNodes(cfg *dnsConfig, lookup serviceLookup) (structs.IndexedCheckServiceNodes, error) {
serviceTags := []string{}
if lookup.Tag != "" {
serviceTags = []string{lookup.Tag}
}
args := structs.ServiceSpecificRequest{
Connect: lookup.Connect,
Ingress: lookup.Ingress,
Datacenter: lookup.Datacenter,
ServiceName: lookup.Service,
ServiceTags: []string{lookup.Tag},
ServiceTags: serviceTags,
TagFilter: lookup.Tag != "",
QueryOptions: structs.QueryOptions{
Token: d.agent.tokens.UserToken(),
Expand Down
Loading