Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: enable selecting subset of services using rendezvous hashing #12862

Merged
merged 2 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/12862.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
api: enable setting ?choose parameter when querying services
shoenig marked this conversation as resolved.
Show resolved Hide resolved
```
5 changes: 4 additions & 1 deletion command/agent/service_registration_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ func (s *HTTPServer) ServiceRegistrationRequest(resp http.ResponseWriter, req *h
func (s *HTTPServer) serviceGetRequest(
resp http.ResponseWriter, req *http.Request, serviceName string) (interface{}, error) {

args := structs.ServiceRegistrationByNameRequest{ServiceName: serviceName}
args := structs.ServiceRegistrationByNameRequest{
ServiceName: serviceName,
Choose: req.URL.Query().Get("choose"),
}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
Expand Down
105 changes: 98 additions & 7 deletions command/agent/service_registration_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -157,6 +158,7 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
name string
}{
{
name: "delete by ID",
testFn: func(s *TestAgent) {

// Grab the state, so we can manipulate it and test against it.
Expand Down Expand Up @@ -186,9 +188,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.Nil(t, out)
require.NoError(t, err)
},
name: "delete by ID",
},
{
name: "get service by name",
testFn: func(s *TestAgent) {

// Grab the state, so we can manipulate it and test against it.
Expand All @@ -214,9 +216,99 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.NotZero(t, respW.Header().Get("X-Nomad-Index"))
require.Equal(t, serviceReg, obj.([]*structs.ServiceRegistration)[0])
},
name: "get service by name",
},
{
name: "get service using choose",
testFn: func(s *TestAgent) {
// Grab the state so we can manipulate and test against it.
testState := s.Agent.server.State()

err := testState.UpsertServiceRegistrations(
structs.MsgTypeTestSetup, 10,
[]*structs.ServiceRegistration{{
ID: "978d519a-46ad-fb04-966b-000000000001",
ServiceName: "redis",
Namespace: "default",
NodeID: "node1",
Datacenter: "dc1",
JobID: "job1",
AllocID: "8b83191f-cb29-e23a-d955-220b65ef676d",
Tags: nil,
Address: "10.0.0.1",
Port: 8080,
CreateIndex: 10,
ModifyIndex: 10,
}, {
ID: "978d519a-46ad-fb04-966b-000000000002",
ServiceName: "redis",
Namespace: "default",
NodeID: "node2",
Datacenter: "dc1",
JobID: "job1",
AllocID: "df6de93c-9376-a774-bcdf-3bd817e18078",
Tags: nil,
Address: "10.0.0.2",
Port: 8080,
CreateIndex: 10,
ModifyIndex: 10,
}, {
ID: "978d519a-46ad-fb04-966b-000000000003",
ServiceName: "redis",
Namespace: "default",
NodeID: "node3",
Datacenter: "dc1",
JobID: "job1",
AllocID: "df6de93c-9376-a774-bcdf-3bd817e18078",
Tags: nil,
Address: "10.0.0.3",
Port: 8080,
CreateIndex: 10,
ModifyIndex: 10,
}},
)
must.NoError(t, err)

// Build the HTTP request for 1 instance of the service, using key=abc123
req, err := http.NewRequest(http.MethodGet, "/v1/service/redis?choose=1|abc123", nil)
must.NoError(t, err)
respW := httptest.NewRecorder()

// Send the HTTP request.
obj, err := s.Server.ServiceRegistrationRequest(respW, req)
must.NoError(t, err)

// Check we got the correct type back.
services, ok := (obj).([]*structs.ServiceRegistration)
must.True(t, ok)

// Check we got the expected number of services back.
must.Len(t, 1, services)

// Build the HTTP request for 2 instances of the service, still using key=abc123
req2, err := http.NewRequest(http.MethodGet, "/v1/service/redis?choose=2|abc123", nil)
must.NoError(t, err)
respW2 := httptest.NewRecorder()

// Send the 2nd HTTP request.
obj2, err := s.Server.ServiceRegistrationRequest(respW2, req2)
must.NoError(t, err)

// Check we got the correct type back.
services2, ok := (obj2).([]*structs.ServiceRegistration)
must.True(t, ok)

// Check we got the expected number of services back.
must.Len(t, 2, services2)

// Check the first service is the same as the previous service.
must.Eq(t, services[0], services2[0])

// Check the second service is not the same as the first service.
must.NotEq(t, services2[0], services2[1])
},
},
{
name: "incorrect URI format",
testFn: func(s *TestAgent) {

// Build the HTTP request.
Expand All @@ -230,9 +322,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.Contains(t, err.Error(), "invalid URI")
require.Nil(t, obj)
},
name: "incorrect URI format",
},
{
name: "get service empty name",
testFn: func(s *TestAgent) {

// Build the HTTP request.
Expand All @@ -246,9 +338,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.Contains(t, err.Error(), "missing service name")
require.Nil(t, obj)
},
name: "get service empty name",
},
{
name: "get service incorrect method",
testFn: func(s *TestAgent) {

// Build the HTTP request.
Expand All @@ -262,9 +354,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.Contains(t, err.Error(), "Invalid method")
require.Nil(t, obj)
},
name: "get service incorrect method",
},
{
name: "delete service empty id",
testFn: func(s *TestAgent) {

// Build the HTTP request.
Expand All @@ -278,9 +370,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.Contains(t, err.Error(), "missing service id")
require.Nil(t, obj)
},
name: "delete service empty id",
},
{
name: "delete service incorrect method",
testFn: func(s *TestAgent) {

// Build the HTTP request.
Expand All @@ -294,7 +386,6 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) {
require.Contains(t, err.Error(), "Invalid method")
require.Nil(t, obj)
},
name: "delete service incorrect method",
},
}

Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/gosuri/uilive v0.0.4
github.com/grpc-ecosystem/go-grpc-middleware v1.2.1-0.20200228141219-3ce3d519df39
github.com/hashicorp/consul v1.7.8
github.com/hashicorp/consul-template v0.29.0
github.com/hashicorp/consul-template v0.29.1
github.com/hashicorp/consul/api v1.13.0
github.com/hashicorp/consul/sdk v0.8.0
github.com/hashicorp/cronexpr v1.1.1
Expand Down Expand Up @@ -117,7 +117,7 @@ require (
github.com/zclconf/go-cty-yaml v1.0.2
go.etcd.io/bbolt v1.3.5
go.uber.org/goleak v1.1.12
golang.org/x/crypto v0.0.0-20220517005047-85d78b3ac167
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d
golang.org/x/exp v0.0.0-20220609121020-a51bd0440498
golang.org/x/net v0.0.0-20220225172249-27dd8689420f
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
Expand Down Expand Up @@ -210,6 +210,7 @@ require (
github.com/hashicorp/go-secure-stdlib/reloadutil v0.1.1 // indirect
github.com/hashicorp/go-secure-stdlib/tlsutil v0.1.1 // indirect
github.com/hashicorp/mdns v1.0.4 // indirect
github.com/hashicorp/vault/api/auth/kubernetes v0.1.0 // indirect
github.com/hashicorp/vic v1.5.1-0.20190403131502-bbfe86ec9443 // indirect
github.com/huandu/xstrings v1.3.2 // indirect
github.com/imdario/mergo v0.3.12 // indirect
Expand Down
11 changes: 8 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -655,8 +655,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/consul v1.7.8 h1:hp308KxAf3zWoGuwp2e+0UUhrm6qHjeBQk3jCZ+bjcY=
github.com/hashicorp/consul v1.7.8/go.mod h1:urbfGaVZDmnXC6geg0LYPh/SRUk1E8nfmDHpz+Q0nLw=
github.com/hashicorp/consul-template v0.29.0 h1:rDmF3Wjqp5ztCq054MruzEpi9ArcyJ/Rp4eWrDhMldM=
github.com/hashicorp/consul-template v0.29.0/go.mod h1:p1A8Z6Mz7gbXu38SI1c9nt5ItBK7ACWZG4ZE1A5Tr2M=
github.com/hashicorp/consul-template v0.29.1 h1:icm/H7klHYlxpUoWqSmTIWaSLEfGqUJJBsZA/2JhTLU=
github.com/hashicorp/consul-template v0.29.1/go.mod h1:QIohwBuXlKXtsmGGQdWrISlUy4E6LFg5tLZyrw4MyoU=
github.com/hashicorp/consul/api v1.4.0/go.mod h1:xc8u05kyMa3Wjr9eEAsIAo3dg8+LywT5E/Cl7cNS5nU=
github.com/hashicorp/consul/api v1.13.0 h1:2hnLQ0GjQvw7f3O61jMO8gbasZviZTrt9R8WzgiirHc=
github.com/hashicorp/consul/api v1.13.0/go.mod h1:ZlVrynguJKcYr54zGaDbaL3fOvKC9m72FhPvA8T35KQ=
Expand Down Expand Up @@ -801,9 +801,13 @@ github.com/hashicorp/serf v0.9.6/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpT
github.com/hashicorp/serf v0.9.7 h1:hkdgbqizGQHuU5IPqYM1JdSMV8nKfpuOnZYXssk9muY=
github.com/hashicorp/serf v0.9.7/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpTwn9UV4=
github.com/hashicorp/vault/api v1.0.4/go.mod h1:gDcqh3WGcR1cpF5AJz/B1UFheUEneMoIospckxBxk6Q=
github.com/hashicorp/vault/api v1.3.0/go.mod h1:EabNQLI0VWbWoGlA+oBLC8PXmR9D60aUVgQGvangFWQ=
github.com/hashicorp/vault/api v1.4.1 h1:mWLfPT0RhxBitjKr6swieCEP2v5pp/M//t70S3kMLRo=
github.com/hashicorp/vault/api v1.4.1/go.mod h1:LkMdrZnWNrFaQyYYazWVn7KshilfDidgVBq6YiTq/bM=
github.com/hashicorp/vault/api/auth/kubernetes v0.1.0 h1:6BtyahbF4aQp8gg3ww0A/oIoqzbhpNP1spXU3nHE0n0=
github.com/hashicorp/vault/api/auth/kubernetes v0.1.0/go.mod h1:Pdgk78uIs0mgDOLvc3a+h/vYIT9rznw2sz+ucuH9024=
github.com/hashicorp/vault/sdk v0.1.13/go.mod h1:B+hVj7TpuQY1Y/GPbCpffmgd+tSEwvhkWnjtSYCaS2M=
github.com/hashicorp/vault/sdk v0.3.0/go.mod h1:aZ3fNuL5VNydQk8GcLJ2TV8YCRVvyaakYkhZRoVuhj0=
github.com/hashicorp/vault/sdk v0.4.1 h1:3SaHOJY687jY1fnB61PtL0cOkKItphrbLmux7T92HBo=
github.com/hashicorp/vault/sdk v0.4.1/go.mod h1:aZ3fNuL5VNydQk8GcLJ2TV8YCRVvyaakYkhZRoVuhj0=
github.com/hashicorp/vic v1.5.1-0.20190403131502-bbfe86ec9443 h1:O/pT5C1Q3mVXMyuqg7yuAWUg/jMZR1/0QTzTRdNR6Uw=
Expand Down Expand Up @@ -1329,8 +1333,9 @@ golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220517005047-85d78b3ac167 h1:O8uGbHCqlTp2P6QJSLmCojM4mN6UemYv8K+dCnmHmu0=
golang.org/x/crypto v0.0.0-20220517005047-85d78b3ac167/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down
79 changes: 78 additions & 1 deletion nomad/service_registration_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package nomad

import (
"net/http"
"sort"
"strconv"
"strings"
"time"

"github.com/armon/go-metrics"
Expand Down Expand Up @@ -423,6 +426,16 @@ func (s *ServiceRegistration) GetService(
http.StatusBadRequest, "failed to read result page: %v", err)
}

// Select which subset and the order of services to return if using ?choose
if args.Choose != "" {
chosen, chooseErr := s.choose(services, args.Choose)
if chooseErr != nil {
return structs.NewErrRPCCodedf(
http.StatusBadRequest, "failed to choose services: %v", chooseErr)
}
services = chosen
}

// Populate the reply.
reply.Services = services
reply.NextToken = nextToken
Expand All @@ -434,6 +447,70 @@ func (s *ServiceRegistration) GetService(
})
}

// choose uses rendezvous hashing to make a stable selection of a subset of services
// to return.
//
// parameter must in the form "<number>|<key>", where number is the number of services
// to select, and key is incorporated in the hashing function with each service -
// creating a unique yet consistent priority distribution pertaining to the requester.
// In practice (i.e. via consul-template), the key is the AllocID generating a request
// for upstream services.
//
// https://en.wikipedia.org/wiki/Rendezvous_hashing
// w := priority (i.e. hash value)
// h := hash function
// O := object - (i.e. requesting service - using key (allocID) as a proxy)
// S := site (i.e. destination service)
func (*ServiceRegistration) choose(services []*structs.ServiceRegistration, parameter string) ([]*structs.ServiceRegistration, error) {
// extract the number of services
tokens := strings.SplitN(parameter, "|", 2)
if len(tokens) != 2 {
return nil, structs.ErrMalformedChooseParameter
}
n, err := strconv.Atoi(tokens[0])
if err != nil {
return nil, structs.ErrMalformedChooseParameter
}
Comment on lines +471 to +473
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I already know the answer to this, but would it be worth logging the error content for cluster operators so this is available somewhere to lookup alongside the generic error return message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well the only intended user is template/consul-template which will always be correct, though I could see the benefit for someone writing their own API client. If we do start logging RPC failures though we should probably do it at all the err points for consistency


// extract the hash key
key := tokens[1]
if key == "" {
return nil, structs.ErrMalformedChooseParameter
}

// if there are fewer services than requested, go with the number of services
if l := len(services); l < n {
n = l
}

type pair struct {
hash string
service *structs.ServiceRegistration
}

// associate hash for each service
priorities := make([]*pair, len(services))
for i, service := range services {
priorities[i] = &pair{
hash: service.HashWith(key),
service: service,
}
}

// sort by the hash; creating random distribution of priority
sort.SliceStable(priorities, func(i, j int) bool {
return priorities[i].hash < priorities[j].hash
})

// choose top n services
chosen := make([]*structs.ServiceRegistration, n)
for i := 0; i < n; i++ {
chosen[i] = priorities[i].service
}

return chosen, nil
}

// handleMixedAuthEndpoint is a helper to handle auth on RPC endpoints that can
// either be called by Nomad nodes, or by external clients.
func (s *ServiceRegistration) handleMixedAuthEndpoint(args structs.QueryOptions, cap string) error {
Expand All @@ -451,7 +528,7 @@ func (s *ServiceRegistration) handleMixedAuthEndpoint(args structs.QueryOptions,
}
}
default:
// In the event we got any error other than notfound, consider this
// In the event we got any error other than ErrTokenNotFound, consider this
// terminal.
if err != structs.ErrTokenNotFound {
return err
Expand Down
Loading