From 7a024ed07408cfe3185b82005695b67c03286466 Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Wed, 3 Feb 2021 18:23:10 +0100
Subject: [PATCH] Streaming filter tags + case insensitive lookups for Service
Names
Will fix:
* https://github.com/hashicorp/consul/issues/9695
* https://github.com/hashicorp/consul/issues/9702
---
.changelog/9703.txt | 3 +
agent/consul/state/catalog_events.go | 4 +-
agent/dns_test.go | 143 ++++++++++++++++-----------
agent/rpcclient/health/health.go | 58 ++++++++++-
4 files changed, 146 insertions(+), 62 deletions(-)
create mode 100644 .changelog/9703.txt
diff --git a/.changelog/9703.txt b/.changelog/9703.txt
new file mode 100644
index 000000000000..cde68e58b3d1
--- /dev/null
+++ b/.changelog/9703.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+streaming: lookup in health properly handle case-sensitivity and perform filtering based on tags
+```
diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go
index 526eca3549ee..855d4a3cc15b 100644
--- a/agent/consul/state/catalog_events.go
+++ b/agent/consul/state/catalog_events.go
@@ -1,6 +1,8 @@
package state
import (
+ "strings"
+
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl"
@@ -42,7 +44,7 @@ func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool {
name = e.key
}
ns := e.Value.Service.EnterpriseMeta.GetNamespace()
- return (key == "" || key == name) && (namespace == "" || namespace == ns)
+ return (key == "" || strings.EqualFold(key, name)) && (namespace == "" || namespace == ns)
}
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
diff --git a/agent/dns_test.go b/agent/dns_test.go
index fdc70118f248..e7506db0f90b 100644
--- a/agent/dns_test.go
+++ b/agent/dns_test.go
@@ -3016,72 +3016,95 @@ func TestDNS_CaseInsensitiveServiceLookup(t *testing.T) {
}
t.Parallel()
- a := NewTestAgent(t, "")
- defer a.Shutdown()
- testrpc.WaitForLeader(t, a.RPC, "dc1")
+ tests := []struct {
+ name string
+ config string
+ }{
+ // UDP + EDNS
+ {"normal", ""},
+ {"cache", `dns_config{ allow_stale=true, max_stale="3h", use_cache=true, "cache_max_age"="3h"}`},
+ {"cache-with-streaming", `
+ rpc{
+ enable_streaming=true
+ }
+ use_streaming_backend=true
+ dns_config{ allow_stale=true, max_stale="3h", use_cache=true, "cache_max_age"="3h"}
+ `},
+ }
+ for _, tst := range tests {
+ t.Run(fmt.Sprintf("A lookup %v", tst.name), func(t *testing.T) {
+ a := NewTestAgent(t, tst.config)
+ defer a.Shutdown()
+ testrpc.WaitForLeader(t, a.RPC, "dc1")
- // Register a node with a service.
- {
- args := &structs.RegisterRequest{
- Datacenter: "dc1",
- Node: "foo",
- Address: "127.0.0.1",
- Service: &structs.NodeService{
- Service: "Db",
- Tags: []string{"Primary"},
- Port: 12345,
- },
- }
+ // Register a node with a service.
+ {
+ args := &structs.RegisterRequest{
+ Datacenter: "dc1",
+ Node: "foo",
+ Address: "127.0.0.1",
+ Service: &structs.NodeService{
+ Service: "Db",
+ Tags: []string{"Primary"},
+ Port: 12345,
+ },
+ }
- var out struct{}
- if err := a.RPC("Catalog.Register", args, &out); err != nil {
- t.Fatalf("err: %v", err)
- }
- }
+ var out struct{}
+ if err := a.RPC("Catalog.Register", args, &out); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ }
- // Register an equivalent prepared query, as well as a name.
- var id string
- {
- args := &structs.PreparedQueryRequest{
- Datacenter: "dc1",
- Op: structs.PreparedQueryCreate,
- Query: &structs.PreparedQuery{
- Name: "somequery",
- Service: structs.ServiceQuery{
- Service: "db",
- },
- },
- }
- if err := a.RPC("PreparedQuery.Apply", args, &id); err != nil {
- t.Fatalf("err: %v", err)
- }
- }
+ // Register an equivalent prepared query, as well as a name.
+ var id string
+ {
+ args := &structs.PreparedQueryRequest{
+ Datacenter: "dc1",
+ Op: structs.PreparedQueryCreate,
+ Query: &structs.PreparedQuery{
+ Name: "somequery",
+ Service: structs.ServiceQuery{
+ Service: "db",
+ },
+ },
+ }
+ if err := a.RPC("PreparedQuery.Apply", args, &id); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ }
- // Try some variations to make sure case doesn't matter.
- questions := []string{
- "primary.db.service.consul.",
- "pRIMARY.dB.service.consul.",
- "PRIMARY.dB.service.consul.",
- "db.service.consul.",
- "DB.service.consul.",
- "Db.service.consul.",
- "somequery.query.consul.",
- "SomeQuery.query.consul.",
- "SOMEQUERY.query.consul.",
- }
- for _, question := range questions {
- m := new(dns.Msg)
- m.SetQuestion(question, dns.TypeSRV)
+ // Try some variations to make sure case doesn't matter.
+ questions := []string{
+ "primary.Db.service.consul.",
+ "primary.db.service.consul.",
+ "pRIMARY.dB.service.consul.",
+ "PRIMARY.dB.service.consul.",
+ "db.service.consul.",
+ "DB.service.consul.",
+ "Db.service.consul.",
+ "somequery.query.consul.",
+ "SomeQuery.query.consul.",
+ "SOMEQUERY.query.consul.",
+ }
- c := new(dns.Client)
- in, _, err := c.Exchange(m, a.DNSAddr())
- if err != nil {
- t.Fatalf("err: %v", err)
- }
+ for _, question := range questions {
+ m := new(dns.Msg)
+ m.SetQuestion(question, dns.TypeSRV)
- if len(in.Answer) != 1 {
- t.Fatalf("empty lookup: %#v", in)
- }
+ c := new(dns.Client)
+ retry.Run(t, func(r *retry.R) {
+ in, _, err := c.Exchange(m, a.DNSAddr())
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ if len(in.Answer) != 1 {
+ t.Fatalf("empty lookup: %#v", in)
+ }
+ })
+ }
+ })
}
}
diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go
index 0118a363cd9c..e2c6bd63e3a7 100644
--- a/agent/rpcclient/health/health.go
+++ b/agent/rpcclient/health/health.go
@@ -2,6 +2,7 @@ package health
import (
"context"
+ "strings"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
@@ -49,6 +50,7 @@ func (c *Client) getServiceNodes(
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
var out structs.IndexedCheckServiceNodes
+ req.ServiceName = strings.ToLower(req.ServiceName)
if !req.QueryOptions.UseCache {
err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out)
return out, cache.ResultMeta{}, err
@@ -68,5 +70,59 @@ func (c *Client) getServiceNodes(
if !ok {
panic("wrong response type for cachetype.HealthServicesName")
}
- return *value, md, nil
+
+ return filterTags(value, req), md, nil
+}
+
+func filterTags(out *structs.IndexedCheckServiceNodes, req structs.ServiceSpecificRequest) structs.IndexedCheckServiceNodes {
+ if len(req.ServiceTags) == 0 || len(out.Nodes) == 0 {
+ return *out
+ }
+ tags := make([]string, 0, len(req.ServiceTags))
+ for _, r := range req.ServiceTags {
+ // DNS has the bad habit to setting [""] for ServiceTags
+ if r != "" {
+ tags = append(tags, strings.ToLower(r))
+ }
+ }
+ // No need to filter
+ if len(tags) == 0 {
+ return *out
+ }
+ results := make(structs.CheckServiceNodes, 0, len(out.Nodes))
+ for _, service := range out.Nodes {
+ svc := service.Service
+ if !serviceTagsFilter(svc, tags) {
+ results = append(results, service)
+ }
+ }
+ out.Nodes = results
+ return *out
+}
+
+// serviceTagsFilter return true if service does not contains all the given tags
+func serviceTagsFilter(sn *structs.NodeService, tags []string) bool {
+ for _, tag := range tags {
+ if serviceTagFilter(sn, tag) {
+ // If any one of the expected tags was not found, filter the service
+ return true
+ }
+ }
+
+ // If all tags were found, don't filter the service
+ return false
+}
+
+// serviceTagFilter returns true (should filter) if the given service node
+// doesn't contain the given tag.
+func serviceTagFilter(sn *structs.NodeService, tag string) bool {
+ // Look for the lower cased version of the tag.
+ for _, t := range sn.Tags {
+ if strings.ToLower(t) == tag {
+ return false
+ }
+ }
+
+ // If we didn't hit the tag above then we should filter.
+ return true
}