Skip to content

Commit

Permalink
[Streaming] Properly filters node-meta queries on health
Browse files Browse the repository at this point in the history
This wil fix hashicorp#9730
  • Loading branch information
pierresouchay committed Feb 8, 2021
1 parent fbb47f0 commit 8034ec1
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 40 deletions.
115 changes: 76 additions & 39 deletions agent/health_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http/httptest"
"net/url"
"reflect"
"strconv"
"testing"

"github.com/hashicorp/consul/agent/structs"
Expand Down Expand Up @@ -685,54 +686,90 @@ func TestHealthServiceNodes(t *testing.T) {

func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")

req, _ := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1&node-meta=somekey:somevalue", nil)
resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
tests := []struct {
name string
config string
}{
{"normal", ""},
{"cache-with-streaming", `
rpc{
enable_streaming=true
}
use_streaming_backend=true
`},
}
for _, tst := range tests {
t.Run(tst.name, func(t *testing.T) {

assertIndex(t, resp)
a := NewTestAgent(t, tst.config)
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")

// Should be a non-nil empty list
nodes := obj.(structs.CheckServiceNodes)
if nodes == nil || len(nodes) != 0 {
t.Fatalf("bad: %v", obj)
}
req, _ := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1&node-meta=somekey:somevalue", nil)
resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}

args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.1",
NodeMeta: map[string]string{"somekey": "somevalue"},
Service: &structs.NodeService{
ID: "test",
Service: "test",
},
}
assertIndex(t, resp)

var out struct{}
if err := a.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
cIndex, err := strconv.ParseUint(resp.Header().Get("X-Consul-Index"), 10, 64)
require.NoError(t, err)

req, _ = http.NewRequest("GET", "/v1/health/service/test?dc=dc1&node-meta=somekey:somevalue", nil)
resp = httptest.NewRecorder()
obj, err = a.srv.HealthServiceNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should be a non-nil empty list
nodes := obj.(structs.CheckServiceNodes)
if nodes == nil || len(nodes) != 0 {
t.Fatalf("bad: %v", obj)
}

assertIndex(t, resp)
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.1",
NodeMeta: map[string]string{"somekey": "somevalue"},
Service: &structs.NodeService{
ID: "test",
Service: "test",
},
}

// Should be a non-nil empty list for checks
nodes = obj.(structs.CheckServiceNodes)
if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 {
t.Fatalf("bad: %v", obj)
var out struct{}
if err := a.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}

args = &structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar2",
Address: "127.0.0.1",
NodeMeta: map[string]string{"somekey": "othervalue"},
Service: &structs.NodeService{
ID: "test2",
Service: "test",
},
}

if err := a.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}

req, _ = http.NewRequest("GET", fmt.Sprintf("/v1/health/service/test?dc=dc1&node-meta=somekey:somevalue&index=%d&wait=10ms", cIndex), nil)
resp = httptest.NewRecorder()
obj, err = a.srv.HealthServiceNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}

assertIndex(t, resp)

// Should be a non-nil empty list for checks
nodes = obj.(structs.CheckServiceNodes)
if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 {
t.Fatalf("bad: %v", obj)
}
})
}
}

Expand Down
17 changes: 16 additions & 1 deletion agent/rpcclient/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (c *Client) getServiceNodes(
panic("wrong response type for cachetype.HealthServicesName")
}

return filterTags(value, req), md, nil
return filterTags(filterNodeMeta(value, req), req), md, nil
}

func filterTags(out *structs.IndexedCheckServiceNodes, req structs.ServiceSpecificRequest) structs.IndexedCheckServiceNodes {
Expand Down Expand Up @@ -126,3 +126,18 @@ func serviceTagFilter(sn *structs.NodeService, tag string) bool {
// If we didn't hit the tag above then we should filter.
return true
}

func filterNodeMeta(out *structs.IndexedCheckServiceNodes, req structs.ServiceSpecificRequest) *structs.IndexedCheckServiceNodes {
if len(req.NodeMetaFilters) == 0 || len(out.Nodes) == 0 {
return out
}
results := make(structs.CheckServiceNodes, 0, len(out.Nodes))
for _, service := range out.Nodes {
serviceNode := service.Node
if structs.SatisfiesMetaFilters(serviceNode.Meta, req.NodeMetaFilters) {
results = append(results, service)
}
}
out.Nodes = results
return out
}

0 comments on commit 8034ec1

Please sign in to comment.