Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming filter tags + case insensitive lookups for Service Names #9703

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/9703.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
streaming: lookup in health properly handle case-sensitivity and perform filtering based on tags
```
4 changes: 3 additions & 1 deletion agent/consul/state/catalog_events.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package state

import (
"strings"

memdb "github.com/hashicorp/go-memdb"

"github.com/hashicorp/consul/acl"
Expand Down Expand Up @@ -42,7 +44,7 @@ func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool {
name = e.key
}
ns := e.Value.Service.EnterpriseMeta.GetNamespace()
return (key == "" || key == name) && (namespace == "" || namespace == ns)
return (key == "" || strings.EqualFold(key, name)) && (namespace == "" || namespace == ns)
}

// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
Expand Down
143 changes: 83 additions & 60 deletions agent/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3016,72 +3016,95 @@ func TestDNS_CaseInsensitiveServiceLookup(t *testing.T) {
}

t.Parallel()
a := NewTestAgent(t, "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
tests := []struct {
name string
config string
}{
// UDP + EDNS
{"normal", ""},
{"cache", `dns_config{ allow_stale=true, max_stale="3h", use_cache=true, "cache_max_age"="3h"}`},
{"cache-with-streaming", `
rpc{
enable_streaming=true
}
use_streaming_backend=true
dns_config{ allow_stale=true, max_stale="3h", use_cache=true, "cache_max_age"="3h"}
`},
}
for _, tst := range tests {
t.Run(fmt.Sprintf("A lookup %v", tst.name), func(t *testing.T) {
a := NewTestAgent(t, tst.config)
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")

// Register a node with a service.
{
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "Db",
Tags: []string{"Primary"},
Port: 12345,
},
}
// Register a node with a service.
{
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "Db",
Tags: []string{"Primary"},
Port: 12345,
},
}

var out struct{}
if err := a.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
var out struct{}
if err := a.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
}

// Register an equivalent prepared query, as well as a name.
var id string
{
args := &structs.PreparedQueryRequest{
Datacenter: "dc1",
Op: structs.PreparedQueryCreate,
Query: &structs.PreparedQuery{
Name: "somequery",
Service: structs.ServiceQuery{
Service: "db",
},
},
}
if err := a.RPC("PreparedQuery.Apply", args, &id); err != nil {
t.Fatalf("err: %v", err)
}
}
// Register an equivalent prepared query, as well as a name.
var id string
{
args := &structs.PreparedQueryRequest{
Datacenter: "dc1",
Op: structs.PreparedQueryCreate,
Query: &structs.PreparedQuery{
Name: "somequery",
Service: structs.ServiceQuery{
Service: "db",
},
},
}
if err := a.RPC("PreparedQuery.Apply", args, &id); err != nil {
t.Fatalf("err: %v", err)
}
}

// Try some variations to make sure case doesn't matter.
questions := []string{
"primary.db.service.consul.",
"pRIMARY.dB.service.consul.",
"PRIMARY.dB.service.consul.",
"db.service.consul.",
"DB.service.consul.",
"Db.service.consul.",
"somequery.query.consul.",
"SomeQuery.query.consul.",
"SOMEQUERY.query.consul.",
}
for _, question := range questions {
m := new(dns.Msg)
m.SetQuestion(question, dns.TypeSRV)
// Try some variations to make sure case doesn't matter.
questions := []string{
"primary.Db.service.consul.",
"primary.db.service.consul.",
"pRIMARY.dB.service.consul.",
"PRIMARY.dB.service.consul.",
"db.service.consul.",
"DB.service.consul.",
"Db.service.consul.",
"somequery.query.consul.",
"SomeQuery.query.consul.",
"SOMEQUERY.query.consul.",
}

c := new(dns.Client)
in, _, err := c.Exchange(m, a.DNSAddr())
if err != nil {
t.Fatalf("err: %v", err)
}
for _, question := range questions {
m := new(dns.Msg)
m.SetQuestion(question, dns.TypeSRV)

if len(in.Answer) != 1 {
t.Fatalf("empty lookup: %#v", in)
}
c := new(dns.Client)
retry.Run(t, func(r *retry.R) {
in, _, err := c.Exchange(m, a.DNSAddr())
if err != nil {
t.Fatalf("err: %v", err)
}

if len(in.Answer) != 1 {
t.Fatalf("empty lookup: %#v", in)
}
})
}
})
}
}

Expand Down
58 changes: 57 additions & 1 deletion agent/rpcclient/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package health

import (
"context"
"strings"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
Expand Down Expand Up @@ -49,6 +50,7 @@ func (c *Client) getServiceNodes(
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
var out structs.IndexedCheckServiceNodes

req.ServiceName = strings.ToLower(req.ServiceName)
dnephin marked this conversation as resolved.
Show resolved Hide resolved
if !req.QueryOptions.UseCache {
err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out)
return out, cache.ResultMeta{}, err
Expand All @@ -68,5 +70,59 @@ func (c *Client) getServiceNodes(
if !ok {
panic("wrong response type for cachetype.HealthServicesName")
}
return *value, md, nil

return filterTags(value, req), md, nil
}

func filterTags(out *structs.IndexedCheckServiceNodes, req structs.ServiceSpecificRequest) structs.IndexedCheckServiceNodes {
if len(req.ServiceTags) == 0 || len(out.Nodes) == 0 {
return *out
}
tags := make([]string, 0, len(req.ServiceTags))
for _, r := range req.ServiceTags {
// DNS has the bad habit to setting [""] for ServiceTags
if r != "" {
tags = append(tags, strings.ToLower(r))
}
pierresouchay marked this conversation as resolved.
Show resolved Hide resolved
}
// No need to filter
if len(tags) == 0 {
return *out
}
results := make(structs.CheckServiceNodes, 0, len(out.Nodes))
for _, service := range out.Nodes {
svc := service.Service
if !serviceTagsFilter(svc, tags) {
results = append(results, service)
}
}
out.Nodes = results
return *out
}

// serviceTagsFilter return true if service does not contains all the given tags
func serviceTagsFilter(sn *structs.NodeService, tags []string) bool {
for _, tag := range tags {
if serviceTagFilter(sn, tag) {
// If any one of the expected tags was not found, filter the service
return true
}
}

// If all tags were found, don't filter the service
return false
}

// serviceTagFilter returns true (should filter) if the given service node
// doesn't contain the given tag.
func serviceTagFilter(sn *structs.NodeService, tag string) bool {
// Look for the lower cased version of the tag.
for _, t := range sn.Tags {
if strings.ToLower(t) == tag {
return false
}
}

// If we didn't hit the tag above then we should filter.
return true
}