Skip to content

Commit

Permalink
Streaming filter tags + case insensitive lookups for Service Names
Browse files Browse the repository at this point in the history
Will fix:
 * #9695
 * #9702
  • Loading branch information
pierresouchay committed Feb 4, 2021
1 parent 0d1301c commit 7a024ed
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 62 deletions.
3 changes: 3 additions & 0 deletions .changelog/9703.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
streaming: lookup in health properly handle case-sensitivity and perform filtering based on tags
```
4 changes: 3 additions & 1 deletion agent/consul/state/catalog_events.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package state

import (
"strings"

memdb "github.com/hashicorp/go-memdb"

"github.com/hashicorp/consul/acl"
Expand Down Expand Up @@ -42,7 +44,7 @@ func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool {
name = e.key
}
ns := e.Value.Service.EnterpriseMeta.GetNamespace()
return (key == "" || key == name) && (namespace == "" || namespace == ns)
return (key == "" || strings.EqualFold(key, name)) && (namespace == "" || namespace == ns)
}

// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
Expand Down
143 changes: 83 additions & 60 deletions agent/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3016,72 +3016,95 @@ func TestDNS_CaseInsensitiveServiceLookup(t *testing.T) {
}

t.Parallel()
a := NewTestAgent(t, "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
tests := []struct {
name string
config string
}{
// UDP + EDNS
{"normal", ""},
{"cache", `dns_config{ allow_stale=true, max_stale="3h", use_cache=true, "cache_max_age"="3h"}`},
{"cache-with-streaming", `
rpc{
enable_streaming=true
}
use_streaming_backend=true
dns_config{ allow_stale=true, max_stale="3h", use_cache=true, "cache_max_age"="3h"}
`},
}
for _, tst := range tests {
t.Run(fmt.Sprintf("A lookup %v", tst.name), func(t *testing.T) {
a := NewTestAgent(t, tst.config)
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")

// Register a node with a service.
{
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "Db",
Tags: []string{"Primary"},
Port: 12345,
},
}
// Register a node with a service.
{
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "Db",
Tags: []string{"Primary"},
Port: 12345,
},
}

var out struct{}
if err := a.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
var out struct{}
if err := a.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
}

// Register an equivalent prepared query, as well as a name.
var id string
{
args := &structs.PreparedQueryRequest{
Datacenter: "dc1",
Op: structs.PreparedQueryCreate,
Query: &structs.PreparedQuery{
Name: "somequery",
Service: structs.ServiceQuery{
Service: "db",
},
},
}
if err := a.RPC("PreparedQuery.Apply", args, &id); err != nil {
t.Fatalf("err: %v", err)
}
}
// Register an equivalent prepared query, as well as a name.
var id string
{
args := &structs.PreparedQueryRequest{
Datacenter: "dc1",
Op: structs.PreparedQueryCreate,
Query: &structs.PreparedQuery{
Name: "somequery",
Service: structs.ServiceQuery{
Service: "db",
},
},
}
if err := a.RPC("PreparedQuery.Apply", args, &id); err != nil {
t.Fatalf("err: %v", err)
}
}

// Try some variations to make sure case doesn't matter.
questions := []string{
"primary.db.service.consul.",
"pRIMARY.dB.service.consul.",
"PRIMARY.dB.service.consul.",
"db.service.consul.",
"DB.service.consul.",
"Db.service.consul.",
"somequery.query.consul.",
"SomeQuery.query.consul.",
"SOMEQUERY.query.consul.",
}
for _, question := range questions {
m := new(dns.Msg)
m.SetQuestion(question, dns.TypeSRV)
// Try some variations to make sure case doesn't matter.
questions := []string{
"primary.Db.service.consul.",
"primary.db.service.consul.",
"pRIMARY.dB.service.consul.",
"PRIMARY.dB.service.consul.",
"db.service.consul.",
"DB.service.consul.",
"Db.service.consul.",
"somequery.query.consul.",
"SomeQuery.query.consul.",
"SOMEQUERY.query.consul.",
}

c := new(dns.Client)
in, _, err := c.Exchange(m, a.DNSAddr())
if err != nil {
t.Fatalf("err: %v", err)
}
for _, question := range questions {
m := new(dns.Msg)
m.SetQuestion(question, dns.TypeSRV)

if len(in.Answer) != 1 {
t.Fatalf("empty lookup: %#v", in)
}
c := new(dns.Client)
retry.Run(t, func(r *retry.R) {
in, _, err := c.Exchange(m, a.DNSAddr())
if err != nil {
t.Fatalf("err: %v", err)
}

if len(in.Answer) != 1 {
t.Fatalf("empty lookup: %#v", in)
}
})
}
})
}
}

Expand Down
58 changes: 57 additions & 1 deletion agent/rpcclient/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package health

import (
"context"
"strings"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
Expand Down Expand Up @@ -49,6 +50,7 @@ func (c *Client) getServiceNodes(
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
var out structs.IndexedCheckServiceNodes

req.ServiceName = strings.ToLower(req.ServiceName)
if !req.QueryOptions.UseCache {
err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out)
return out, cache.ResultMeta{}, err
Expand All @@ -68,5 +70,59 @@ func (c *Client) getServiceNodes(
if !ok {
panic("wrong response type for cachetype.HealthServicesName")
}
return *value, md, nil

return filterTags(value, req), md, nil
}

func filterTags(out *structs.IndexedCheckServiceNodes, req structs.ServiceSpecificRequest) structs.IndexedCheckServiceNodes {
if len(req.ServiceTags) == 0 || len(out.Nodes) == 0 {
return *out
}
tags := make([]string, 0, len(req.ServiceTags))
for _, r := range req.ServiceTags {
// DNS has the bad habit to setting [""] for ServiceTags
if r != "" {
tags = append(tags, strings.ToLower(r))
}
}
// No need to filter
if len(tags) == 0 {
return *out
}
results := make(structs.CheckServiceNodes, 0, len(out.Nodes))
for _, service := range out.Nodes {
svc := service.Service
if !serviceTagsFilter(svc, tags) {
results = append(results, service)
}
}
out.Nodes = results
return *out
}

// serviceTagsFilter return true if service does not contains all the given tags
func serviceTagsFilter(sn *structs.NodeService, tags []string) bool {
for _, tag := range tags {
if serviceTagFilter(sn, tag) {
// If any one of the expected tags was not found, filter the service
return true
}
}

// If all tags were found, don't filter the service
return false
}

// serviceTagFilter returns true (should filter) if the given service node
// doesn't contain the given tag.
func serviceTagFilter(sn *structs.NodeService, tag string) bool {
// Look for the lower cased version of the tag.
for _, t := range sn.Tags {
if strings.ToLower(t) == tag {
return false
}
}

// If we didn't hit the tag above then we should filter.
return true
}

0 comments on commit 7a024ed

Please sign in to comment.