Skip to content

Commit

Permalink
Merge pull request #9703 from pierresouchay/streaming_tags_and_case_i…
Browse files Browse the repository at this point in the history
…nsensitive

Streaming filter tags + case insensitive lookups for Service Names
  • Loading branch information
dnephin authored Feb 26, 2021
2 parents b678eab + d5cc206 commit 5c8a631
Show file tree
Hide file tree
Showing 9 changed files with 467 additions and 110 deletions.
3 changes: 3 additions & 0 deletions .changelog/9703.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
streaming: lookup in health properly handle case-sensitivity and perform filtering based on tags and node-meta
```
95 changes: 87 additions & 8 deletions agent/cache-types/streaming_health_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"reflect"
"sort"
"strings"
"time"

"github.com/hashicorp/go-bexpr"
Expand Down Expand Up @@ -82,7 +83,7 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
return req
}

materializer, err := newMaterializer(c.deps, newReqFn, srvReq.Filter)
materializer, err := newMaterializer(c.deps, newReqFn, srvReq)
if err != nil {
return cache.FetchResult{}, err
}
Expand All @@ -100,9 +101,9 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
func newMaterializer(
deps MaterializerDeps,
newRequestFn func(uint64) pbsubscribe.SubscribeRequest,
filter string,
req *structs.ServiceSpecificRequest,
) (*submatview.Materializer, error) {
view, err := newHealthView(filter)
view, err := newHealthView(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -139,8 +140,8 @@ func (s *streamingHealthState) Fetch(opts cache.FetchOptions) (cache.FetchResult
return result, err
}

func newHealthView(filterExpr string) (*healthView, error) {
fe, err := newFilterEvaluator(filterExpr)
func newHealthView(req *structs.ServiceSpecificRequest) (*healthView, error) {
fe, err := newFilterEvaluator(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -192,11 +193,44 @@ type filterEvaluator interface {
Evaluate(datum interface{}) (bool, error)
}

func newFilterEvaluator(expr string) (filterEvaluator, error) {
if expr == "" {
func newFilterEvaluator(req *structs.ServiceSpecificRequest) (filterEvaluator, error) {
var evaluators []filterEvaluator

typ := reflect.TypeOf(structs.CheckServiceNode{})
if req.Filter != "" {
e, err := bexpr.CreateEvaluatorForType(req.Filter, nil, typ)
if err != nil {
return nil, err
}
evaluators = append(evaluators, e)
}

if req.ServiceTag != "" {
// Handle backwards compat with old field
req.ServiceTags = []string{req.ServiceTag}
}

if req.TagFilter && len(req.ServiceTags) > 0 {
evaluators = append(evaluators, serviceTagEvaluator{tags: req.ServiceTags})
}

for key, value := range req.NodeMetaFilters {
expr := fmt.Sprintf(`"%s" in Node.Meta.%s`, value, key)
e, err := bexpr.CreateEvaluatorForType(expr, nil, typ)
if err != nil {
return nil, err
}
evaluators = append(evaluators, e)
}

switch len(evaluators) {
case 0:
return noopFilterEvaluator{}, nil
case 1:
return evaluators[0], nil
default:
return &multiFilterEvaluator{evaluators: evaluators}, nil
}
return bexpr.CreateEvaluatorForType(expr, nil, reflect.TypeOf(structs.CheckServiceNode{}))
}

// noopFilterEvaluator may be used in place of a bexpr.Evaluator. The Evaluate
Expand All @@ -207,6 +241,20 @@ func (noopFilterEvaluator) Evaluate(_ interface{}) (bool, error) {
return true, nil
}

type multiFilterEvaluator struct {
evaluators []filterEvaluator
}

func (m multiFilterEvaluator) Evaluate(data interface{}) (bool, error) {
for _, e := range m.evaluators {
match, err := e.Evaluate(data)
if !match || err != nil {
return match, err
}
}
return true, nil
}

// sortCheckServiceNodes sorts the results to match memdb semantics
// Sort results by Node.Node, if 2 instances match, order by Service.ID
// Will allow result to be stable sorted and match queries without cache
Expand Down Expand Up @@ -240,3 +288,34 @@ func (s *healthView) Result(index uint64) (interface{}, error) {
func (s *healthView) Reset() {
s.state = make(map[string]structs.CheckServiceNode)
}

// serviceTagEvaluator implements the filterEvaluator to perform filtering
// by service tags. bexpr can not be used at this time, because the filtering
// must be case insensitive for backwards compatibility. In the future this
// may be replaced with bexpr once case insensitive support is added.
type serviceTagEvaluator struct {
tags []string
}

func (m serviceTagEvaluator) Evaluate(data interface{}) (bool, error) {
csn, ok := data.(structs.CheckServiceNode)
if !ok {
return false, fmt.Errorf("unexpected type %T for structs.CheckServiceNode filter", data)
}
for _, tag := range m.tags {
if !serviceHasTag(csn.Service, tag) {
// If any one of the expected tags was not found, filter the service
return false, nil
}
}
return true, nil
}

func serviceHasTag(sn *structs.NodeService, tag string) bool {
for _, t := range sn.Tags {
if strings.EqualFold(t, tag) {
return true
}
}
return false
}
208 changes: 208 additions & 0 deletions agent/cache-types/streaming_health_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,3 +568,211 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.FailNow()
}
}

func TestNewFilterEvaluator(t *testing.T) {
type testCase struct {
name string
req structs.ServiceSpecificRequest
data structs.CheckServiceNode
expected bool
}

fn := func(t *testing.T, tc testCase) {
e, err := newFilterEvaluator(&tc.req)
require.NoError(t, err)
actual, err := e.Evaluate(tc.data)
require.NoError(t, err)
require.Equal(t, tc.expected, actual)
}

var testCases = []testCase{
{
name: "single ServiceTags match",
req: structs.ServiceSpecificRequest{
ServiceTags: []string{"match"},
TagFilter: true,
},
data: structs.CheckServiceNode{
Service: &structs.NodeService{
Tags: []string{"extra", "match"},
},
},
expected: true,
},
{
name: "single deprecated ServiceTag match",
req: structs.ServiceSpecificRequest{
ServiceTag: "match",
TagFilter: true,
},
data: structs.CheckServiceNode{
Service: &structs.NodeService{
Tags: []string{"extra", "match"},
},
},
expected: true,
},
{
name: "single ServiceTags mismatch",
req: structs.ServiceSpecificRequest{
ServiceTags: []string{"other"},
TagFilter: true,
},
data: structs.CheckServiceNode{
Service: &structs.NodeService{
Tags: []string{"extra", "match"},
},
},
expected: false,
},
{
name: "multiple ServiceTags match",
req: structs.ServiceSpecificRequest{
ServiceTags: []string{"match", "second"},
TagFilter: true,
},
data: structs.CheckServiceNode{
Service: &structs.NodeService{
Tags: []string{"extra", "match", "second"},
},
},
expected: true,
},
{
name: "multiple ServiceTags mismatch",
req: structs.ServiceSpecificRequest{
ServiceTags: []string{"match", "not"},
TagFilter: true,
},
data: structs.CheckServiceNode{
Service: &structs.NodeService{
Tags: []string{"extra", "match"},
},
},
expected: false,
},
{
name: "single NodeMetaFilter match",
req: structs.ServiceSpecificRequest{
NodeMetaFilters: map[string]string{"meta1": "match"},
},
data: structs.CheckServiceNode{
Node: &structs.Node{
Meta: map[string]string{
"meta1": "match",
"extra": "some",
},
},
},
expected: true,
},
{
name: "single NodeMetaFilter mismatch",
req: structs.ServiceSpecificRequest{
NodeMetaFilters: map[string]string{
"meta1": "match",
},
},
data: structs.CheckServiceNode{
Node: &structs.Node{
Meta: map[string]string{
"meta1": "other",
"extra": "some",
},
},
},
expected: false,
},
{
name: "multiple NodeMetaFilter match",
req: structs.ServiceSpecificRequest{
NodeMetaFilters: map[string]string{"meta1": "match", "meta2": "a"},
},
data: structs.CheckServiceNode{
Node: &structs.Node{
Meta: map[string]string{
"meta1": "match",
"meta2": "a",
"extra": "some",
},
},
},
expected: true,
},
{
name: "multiple NodeMetaFilter mismatch",
req: structs.ServiceSpecificRequest{
NodeMetaFilters: map[string]string{
"meta1": "match",
"meta2": "beta",
},
},
data: structs.CheckServiceNode{
Node: &structs.Node{
Meta: map[string]string{
"meta1": "other",
"meta2": "gamma",
},
},
},
expected: false,
},
{
name: "QueryOptions.Filter match",
req: structs.ServiceSpecificRequest{
QueryOptions: structs.QueryOptions{
Filter: `Node.Node == "node3"`,
},
},
data: structs.CheckServiceNode{
Node: &structs.Node{Node: "node3"},
},
expected: true,
},
{
name: "QueryOptions.Filter mismatch",
req: structs.ServiceSpecificRequest{
QueryOptions: structs.QueryOptions{
Filter: `Node.Node == "node2"`,
},
},
data: structs.CheckServiceNode{
Node: &structs.Node{Node: "node3"},
},
expected: false,
},
{
name: "all match",
req: structs.ServiceSpecificRequest{
QueryOptions: structs.QueryOptions{
Filter: `Node.Node == "node3"`,
},
ServiceTags: []string{"tag1", "tag2"},
NodeMetaFilters: map[string]string{
"meta1": "match1",
"meta2": "match2",
},
},
data: structs.CheckServiceNode{
Node: &structs.Node{
Node: "node3",
Meta: map[string]string{
"meta1": "match1",
"meta2": "match2",
"extra": "other",
},
},
Service: &structs.NodeService{
Tags: []string{"tag1", "tag2", "extra"},
},
},
expected: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fn(t, tc)
})
}
}
4 changes: 3 additions & 1 deletion agent/consul/state/catalog_events.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package state

import (
"strings"

memdb "github.com/hashicorp/go-memdb"

"github.com/hashicorp/consul/acl"
Expand Down Expand Up @@ -42,7 +44,7 @@ func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool {
name = e.key
}
ns := e.Value.Service.EnterpriseMeta.GetNamespace()
return (key == "" || key == name) && (namespace == "" || namespace == ns)
return (key == "" || strings.EqualFold(key, name)) && (namespace == "" || namespace == ns)
}

// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
Expand Down
6 changes: 5 additions & 1 deletion agent/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -1171,12 +1171,16 @@ func (d *DNSServer) trimDNSResponse(cfg *dnsConfig, network string, req, resp *d

// lookupServiceNodes returns nodes with a given service.
func (d *DNSServer) lookupServiceNodes(cfg *dnsConfig, lookup serviceLookup) (structs.IndexedCheckServiceNodes, error) {
serviceTags := []string{}
if lookup.Tag != "" {
serviceTags = []string{lookup.Tag}
}
args := structs.ServiceSpecificRequest{
Connect: lookup.Connect,
Ingress: lookup.Ingress,
Datacenter: lookup.Datacenter,
ServiceName: lookup.Service,
ServiceTags: []string{lookup.Tag},
ServiceTags: serviceTags,
TagFilter: lookup.Tag != "",
QueryOptions: structs.QueryOptions{
Token: d.agent.tokens.UserToken(),
Expand Down
Loading

0 comments on commit 5c8a631

Please sign in to comment.