From bdead318632dea05177bec4b7b8dfa385eebbfa0 Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Tue, 3 May 2022 15:21:12 -0500 Subject: [PATCH 1/2] api: enable selecting subset of services using rendezvous hashing This PR adds the 'choose' query parameter to the '/v1/service/' endpoint. The value of 'choose' is in the form '|', number is the number of desired services and key is a value unique but consistent to the requester (e.g. allocID). Folks aren't really expected to use this API directly, but rather through consul-template which will soon be getting a new helper function making use of this query parameter. Example, curl 'localhost:4646/v1/service/redis?choose=2|abc123' Note: consul-templte v0.29.1 includes the necessary nomadServices functionality. --- .changelog/12862.txt | 3 + .../agent/service_registration_endpoint.go | 5 +- .../service_registration_endpoint_test.go | 105 ++++++++- go.mod | 5 +- go.sum | 11 +- nomad/service_registration_endpoint.go | 79 ++++++- nomad/service_registration_endpoint_test.go | 221 +++++++++++++++++- nomad/structs/errors.go | 2 + nomad/structs/service_registration.go | 22 ++ nomad/structs/service_registration_test.go | 25 ++ website/content/api-docs/services.mdx | 7 + 11 files changed, 470 insertions(+), 15 deletions(-) create mode 100644 .changelog/12862.txt diff --git a/.changelog/12862.txt b/.changelog/12862.txt new file mode 100644 index 00000000000..182a8b42dfd --- /dev/null +++ b/.changelog/12862.txt @@ -0,0 +1,3 @@ +```release-note:improvement +api: enable setting ?choose parameter when querying services +``` diff --git a/command/agent/service_registration_endpoint.go b/command/agent/service_registration_endpoint.go index 9107f800697..3d6151bea83 100644 --- a/command/agent/service_registration_endpoint.go +++ b/command/agent/service_registration_endpoint.go @@ -90,7 +90,10 @@ func (s *HTTPServer) ServiceRegistrationRequest(resp http.ResponseWriter, req *h func (s *HTTPServer) serviceGetRequest( resp http.ResponseWriter, req *http.Request, serviceName string) (interface{}, error) { - args := structs.ServiceRegistrationByNameRequest{ServiceName: serviceName} + args := structs.ServiceRegistrationByNameRequest{ + ServiceName: serviceName, + Choose: req.URL.Query().Get("choose"), + } if s.parse(resp, req, &args.Region, &args.QueryOptions) { return nil, nil } diff --git a/command/agent/service_registration_endpoint_test.go b/command/agent/service_registration_endpoint_test.go index d6e6954377f..489c6fa983b 100644 --- a/command/agent/service_registration_endpoint_test.go +++ b/command/agent/service_registration_endpoint_test.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -157,6 +158,7 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) { name string }{ { + name: "delete by ID", testFn: func(s *TestAgent) { // Grab the state, so we can manipulate it and test against it. @@ -186,9 +188,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) { require.Nil(t, out) require.NoError(t, err) }, - name: "delete by ID", }, { + name: "get service by name", testFn: func(s *TestAgent) { // Grab the state, so we can manipulate it and test against it. @@ -214,9 +216,99 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) { require.NotZero(t, respW.Header().Get("X-Nomad-Index")) require.Equal(t, serviceReg, obj.([]*structs.ServiceRegistration)[0]) }, - name: "get service by name", }, { + name: "get service using choose", + testFn: func(s *TestAgent) { + // Grab the state so we can manipulate and test against it. + testState := s.Agent.server.State() + + err := testState.UpsertServiceRegistrations( + structs.MsgTypeTestSetup, 10, + []*structs.ServiceRegistration{{ + ID: "978d519a-46ad-fb04-966b-000000000001", + ServiceName: "redis", + Namespace: "default", + NodeID: "node1", + Datacenter: "dc1", + JobID: "job1", + AllocID: "8b83191f-cb29-e23a-d955-220b65ef676d", + Tags: nil, + Address: "10.0.0.1", + Port: 8080, + CreateIndex: 10, + ModifyIndex: 10, + }, { + ID: "978d519a-46ad-fb04-966b-000000000002", + ServiceName: "redis", + Namespace: "default", + NodeID: "node2", + Datacenter: "dc1", + JobID: "job1", + AllocID: "df6de93c-9376-a774-bcdf-3bd817e18078", + Tags: nil, + Address: "10.0.0.2", + Port: 8080, + CreateIndex: 10, + ModifyIndex: 10, + }, { + ID: "978d519a-46ad-fb04-966b-000000000003", + ServiceName: "redis", + Namespace: "default", + NodeID: "node3", + Datacenter: "dc1", + JobID: "job1", + AllocID: "df6de93c-9376-a774-bcdf-3bd817e18078", + Tags: nil, + Address: "10.0.0.3", + Port: 8080, + CreateIndex: 10, + ModifyIndex: 10, + }}, + ) + must.NoError(t, err) + + // Build the HTTP request for 1 instance of the service, using key=abc123 + req, err := http.NewRequest(http.MethodGet, "/v1/service/redis?choose=1|abc123", nil) + must.NoError(t, err) + respW := httptest.NewRecorder() + + // Send the HTTP request. + obj, err := s.Server.ServiceRegistrationRequest(respW, req) + must.NoError(t, err) + + // Check we got the correct type back. + services, ok := (obj).([]*structs.ServiceRegistration) + must.True(t, ok) + + // Check we got the expected number of services back. + must.Len(t, 1, services) + + // Build the HTTP request for 2 instances of the service, still using key=abc123 + req2, err := http.NewRequest(http.MethodGet, "/v1/service/redis?choose=2|abc123", nil) + must.NoError(t, err) + respW2 := httptest.NewRecorder() + + // Send the 2nd HTTP request. + obj2, err := s.Server.ServiceRegistrationRequest(respW2, req2) + must.NoError(t, err) + + // Check we got the correct type back. + services2, ok := (obj2).([]*structs.ServiceRegistration) + must.True(t, ok) + + // Check we got the expected number of services back. + must.Len(t, 2, services2) + + // Check the first service is the same as the previous service. + must.Eq(t, services[0], services2[0]) + + // Check the second service is not the same as the first service. + must.NotEq(t, services2[0], services2[1]) + }, + }, + { + name: "incorrect URI format", testFn: func(s *TestAgent) { // Build the HTTP request. @@ -230,9 +322,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) { require.Contains(t, err.Error(), "invalid URI") require.Nil(t, obj) }, - name: "incorrect URI format", }, { + name: "get service empty name", testFn: func(s *TestAgent) { // Build the HTTP request. @@ -246,9 +338,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) { require.Contains(t, err.Error(), "missing service name") require.Nil(t, obj) }, - name: "get service empty name", }, { + name: "get service incorrect method", testFn: func(s *TestAgent) { // Build the HTTP request. @@ -262,9 +354,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) { require.Contains(t, err.Error(), "Invalid method") require.Nil(t, obj) }, - name: "get service incorrect method", }, { + name: "delete service empty id", testFn: func(s *TestAgent) { // Build the HTTP request. @@ -278,9 +370,9 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) { require.Contains(t, err.Error(), "missing service id") require.Nil(t, obj) }, - name: "delete service empty id", }, { + name: "delete service incorrect method", testFn: func(s *TestAgent) { // Build the HTTP request. @@ -294,7 +386,6 @@ func TestHTTPServer_ServiceRegistrationRequest(t *testing.T) { require.Contains(t, err.Error(), "Invalid method") require.Nil(t, obj) }, - name: "delete service incorrect method", }, } diff --git a/go.mod b/go.mod index 571c0fa69d0..07da08d9248 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/gosuri/uilive v0.0.4 github.com/grpc-ecosystem/go-grpc-middleware v1.2.1-0.20200228141219-3ce3d519df39 github.com/hashicorp/consul v1.7.8 - github.com/hashicorp/consul-template v0.29.0 + github.com/hashicorp/consul-template v0.29.1 github.com/hashicorp/consul/api v1.13.0 github.com/hashicorp/consul/sdk v0.8.0 github.com/hashicorp/cronexpr v1.1.1 @@ -117,7 +117,7 @@ require ( github.com/zclconf/go-cty-yaml v1.0.2 go.etcd.io/bbolt v1.3.5 go.uber.org/goleak v1.1.12 - golang.org/x/crypto v0.0.0-20220517005047-85d78b3ac167 + golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d golang.org/x/exp v0.0.0-20220609121020-a51bd0440498 golang.org/x/net v0.0.0-20220225172249-27dd8689420f golang.org/x/sync v0.0.0-20210220032951-036812b2e83c @@ -210,6 +210,7 @@ require ( github.com/hashicorp/go-secure-stdlib/reloadutil v0.1.1 // indirect github.com/hashicorp/go-secure-stdlib/tlsutil v0.1.1 // indirect github.com/hashicorp/mdns v1.0.4 // indirect + github.com/hashicorp/vault/api/auth/kubernetes v0.1.0 // indirect github.com/hashicorp/vic v1.5.1-0.20190403131502-bbfe86ec9443 // indirect github.com/huandu/xstrings v1.3.2 // indirect github.com/imdario/mergo v0.3.12 // indirect diff --git a/go.sum b/go.sum index d2eab222ec4..cf7ba380363 100644 --- a/go.sum +++ b/go.sum @@ -655,8 +655,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/consul v1.7.8 h1:hp308KxAf3zWoGuwp2e+0UUhrm6qHjeBQk3jCZ+bjcY= github.com/hashicorp/consul v1.7.8/go.mod h1:urbfGaVZDmnXC6geg0LYPh/SRUk1E8nfmDHpz+Q0nLw= -github.com/hashicorp/consul-template v0.29.0 h1:rDmF3Wjqp5ztCq054MruzEpi9ArcyJ/Rp4eWrDhMldM= -github.com/hashicorp/consul-template v0.29.0/go.mod h1:p1A8Z6Mz7gbXu38SI1c9nt5ItBK7ACWZG4ZE1A5Tr2M= +github.com/hashicorp/consul-template v0.29.1 h1:icm/H7klHYlxpUoWqSmTIWaSLEfGqUJJBsZA/2JhTLU= +github.com/hashicorp/consul-template v0.29.1/go.mod h1:QIohwBuXlKXtsmGGQdWrISlUy4E6LFg5tLZyrw4MyoU= github.com/hashicorp/consul/api v1.4.0/go.mod h1:xc8u05kyMa3Wjr9eEAsIAo3dg8+LywT5E/Cl7cNS5nU= github.com/hashicorp/consul/api v1.13.0 h1:2hnLQ0GjQvw7f3O61jMO8gbasZviZTrt9R8WzgiirHc= github.com/hashicorp/consul/api v1.13.0/go.mod h1:ZlVrynguJKcYr54zGaDbaL3fOvKC9m72FhPvA8T35KQ= @@ -801,9 +801,13 @@ github.com/hashicorp/serf v0.9.6/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpT github.com/hashicorp/serf v0.9.7 h1:hkdgbqizGQHuU5IPqYM1JdSMV8nKfpuOnZYXssk9muY= github.com/hashicorp/serf v0.9.7/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpTwn9UV4= github.com/hashicorp/vault/api v1.0.4/go.mod h1:gDcqh3WGcR1cpF5AJz/B1UFheUEneMoIospckxBxk6Q= +github.com/hashicorp/vault/api v1.3.0/go.mod h1:EabNQLI0VWbWoGlA+oBLC8PXmR9D60aUVgQGvangFWQ= github.com/hashicorp/vault/api v1.4.1 h1:mWLfPT0RhxBitjKr6swieCEP2v5pp/M//t70S3kMLRo= github.com/hashicorp/vault/api v1.4.1/go.mod h1:LkMdrZnWNrFaQyYYazWVn7KshilfDidgVBq6YiTq/bM= +github.com/hashicorp/vault/api/auth/kubernetes v0.1.0 h1:6BtyahbF4aQp8gg3ww0A/oIoqzbhpNP1spXU3nHE0n0= +github.com/hashicorp/vault/api/auth/kubernetes v0.1.0/go.mod h1:Pdgk78uIs0mgDOLvc3a+h/vYIT9rznw2sz+ucuH9024= github.com/hashicorp/vault/sdk v0.1.13/go.mod h1:B+hVj7TpuQY1Y/GPbCpffmgd+tSEwvhkWnjtSYCaS2M= +github.com/hashicorp/vault/sdk v0.3.0/go.mod h1:aZ3fNuL5VNydQk8GcLJ2TV8YCRVvyaakYkhZRoVuhj0= github.com/hashicorp/vault/sdk v0.4.1 h1:3SaHOJY687jY1fnB61PtL0cOkKItphrbLmux7T92HBo= github.com/hashicorp/vault/sdk v0.4.1/go.mod h1:aZ3fNuL5VNydQk8GcLJ2TV8YCRVvyaakYkhZRoVuhj0= github.com/hashicorp/vic v1.5.1-0.20190403131502-bbfe86ec9443 h1:O/pT5C1Q3mVXMyuqg7yuAWUg/jMZR1/0QTzTRdNR6Uw= @@ -1329,8 +1333,9 @@ golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20220517005047-85d78b3ac167 h1:O8uGbHCqlTp2P6QJSLmCojM4mN6UemYv8K+dCnmHmu0= golang.org/x/crypto v0.0.0-20220517005047-85d78b3ac167/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= diff --git a/nomad/service_registration_endpoint.go b/nomad/service_registration_endpoint.go index 3684fc46074..f417a8c8518 100644 --- a/nomad/service_registration_endpoint.go +++ b/nomad/service_registration_endpoint.go @@ -2,6 +2,9 @@ package nomad import ( "net/http" + "sort" + "strconv" + "strings" "time" "github.com/armon/go-metrics" @@ -423,6 +426,16 @@ func (s *ServiceRegistration) GetService( http.StatusBadRequest, "failed to read result page: %v", err) } + // Select which subset and the order of services to return if using ?choose + if args.Choose != "" { + chosen, chooseErr := s.choose(services, args.Choose) + if chooseErr != nil { + return structs.NewErrRPCCodedf( + http.StatusBadRequest, "failed to choose services: %v", chooseErr) + } + services = chosen + } + // Populate the reply. reply.Services = services reply.NextToken = nextToken @@ -434,6 +447,70 @@ func (s *ServiceRegistration) GetService( }) } +// choose uses rendezvous hashing to make a stable selection of a subset of services +// to return. +// +// parameter must in the form "|", where number is the number of services +// to select, and key is incorporated in the hashing function with each service - +// creating a unique yet consistent priority distribution pertaining to the requester. +// In practice (i.e. via consul-template), the key is the AllocID generating a request +// for upstream services. +// +// https://en.wikipedia.org/wiki/Rendezvous_hashing +// w := priority (i.e. hash value) +// h := hash function +// O := object - (i.e. requesting service - using key (allocID) as a proxy) +// S := site (i.e. destination service) +func (*ServiceRegistration) choose(services []*structs.ServiceRegistration, parameter string) ([]*structs.ServiceRegistration, error) { + // extract the number of services + tokens := strings.SplitN(parameter, "|", 2) + if len(tokens) != 2 { + return nil, structs.ErrMalformedChooseParameter + } + n, err := strconv.Atoi(tokens[0]) + if err != nil { + return nil, structs.ErrMalformedChooseParameter + } + + // extract the hash key + key := tokens[1] + if key == "" { + return nil, structs.ErrMalformedChooseParameter + } + + // if there are fewer services than requested, go with the number of services + if l := len(services); l < n { + n = l + } + + type pair struct { + hash string + service *structs.ServiceRegistration + } + + // associate hash for each service + priorities := make([]*pair, len(services)) + for i, service := range services { + priorities[i] = &pair{ + hash: service.HashWith(key), + service: service, + } + } + + // sort by the hash; creating random distribution of priority + sort.SliceStable(priorities, func(i, j int) bool { + return priorities[i].hash < priorities[j].hash + }) + + // choose top n services + chosen := make([]*structs.ServiceRegistration, n) + for i := 0; i < n; i++ { + chosen[i] = priorities[i].service + } + + return chosen, nil +} + // handleMixedAuthEndpoint is a helper to handle auth on RPC endpoints that can // either be called by Nomad nodes, or by external clients. func (s *ServiceRegistration) handleMixedAuthEndpoint(args structs.QueryOptions, cap string) error { @@ -451,7 +528,7 @@ func (s *ServiceRegistration) handleMixedAuthEndpoint(args structs.QueryOptions, } } default: - // In the event we got any error other than notfound, consider this + // In the event we got any error other than ErrTokenNotFound, consider this // terminal. if err != structs.ErrTokenNotFound { return err diff --git a/nomad/service_registration_endpoint_test.go b/nomad/service_registration_endpoint_test.go index 164f4d8a16f..34c76eed6c7 100644 --- a/nomad/service_registration_endpoint_test.go +++ b/nomad/service_registration_endpoint_test.go @@ -5,12 +5,13 @@ import ( "testing" "github.com/hashicorp/go-memdb" - msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -1174,6 +1175,148 @@ func TestServiceRegistration_GetService(t *testing.T) { }, name: "filtering and pagination", }, + { + name: "choose 2 of 3", + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // insert 3 instances of service s1 + nodeID, jobID, allocID := "node_id", "job_id", "alloc_id" + services := []*structs.ServiceRegistration{ + { + ID: "id_1", + Namespace: "default", + ServiceName: "s1", + NodeID: nodeID, + Datacenter: "dc1", + JobID: jobID, + AllocID: allocID, + Tags: []string{"tag1"}, + Address: "10.0.0.1", + Port: 9001, + CreateIndex: 101, + ModifyIndex: 201, + }, + { + ID: "id_2", + Namespace: "default", + ServiceName: "s1", + NodeID: nodeID, + Datacenter: "dc1", + JobID: jobID, + AllocID: allocID, + Tags: []string{"tag2"}, + Address: "10.0.0.2", + Port: 9002, + CreateIndex: 102, + ModifyIndex: 202, + }, + { + ID: "id_3", + Namespace: "default", + ServiceName: "s1", + NodeID: nodeID, + Datacenter: "dc1", + JobID: jobID, + AllocID: allocID, + Tags: []string{"tag3"}, + Address: "10.0.0.3", + Port: 9003, + CreateIndex: 103, + ModifyIndex: 103, + }, + } + must.NoError(t, s.fsm.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services)) + + serviceRegReq := &structs.ServiceRegistrationByNameRequest{ + ServiceName: "s1", + Choose: "2|abc123", // select 2 in consistent order + QueryOptions: structs.QueryOptions{ + Namespace: structs.DefaultNamespace, + Region: DefaultRegion, + }, + } + var serviceRegResp structs.ServiceRegistrationByNameResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationGetServiceRPCMethod, serviceRegReq, &serviceRegResp) + must.NoError(t, err) + + result := serviceRegResp.Services + + must.Len(t, 2, result) + must.Eq(t, "10.0.0.3", result[0].Address) + must.Eq(t, "10.0.0.2", result[1].Address) + }, + }, + { + name: "choose 3 of 2", // gracefully handle requesting too many + serverFn: func(t *testing.T) (*Server, *structs.ACLToken, func()) { + server, cleanup := TestServer(t, nil) + return server, nil, cleanup + }, + testFn: func(t *testing.T, s *Server, _ *structs.ACLToken) { + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // insert 2 instances of service s1 + nodeID, jobID, allocID := "node_id", "job_id", "alloc_id" + services := []*structs.ServiceRegistration{ + { + ID: "id_1", + Namespace: "default", + ServiceName: "s1", + NodeID: nodeID, + Datacenter: "dc1", + JobID: jobID, + AllocID: allocID, + Tags: []string{"tag1"}, + Address: "10.0.0.1", + Port: 9001, + CreateIndex: 101, + ModifyIndex: 201, + }, + { + ID: "id_2", + Namespace: "default", + ServiceName: "s1", + NodeID: nodeID, + Datacenter: "dc1", + JobID: jobID, + AllocID: allocID, + Tags: []string{"tag2"}, + Address: "10.0.0.2", + Port: 9002, + CreateIndex: 102, + ModifyIndex: 202, + }, + } + must.NoError(t, s.fsm.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 10, services)) + + serviceRegReq := &structs.ServiceRegistrationByNameRequest{ + ServiceName: "s1", + Choose: "3|abc123", // select 3 in consistent order (though there are only 2 total) + QueryOptions: structs.QueryOptions{ + Namespace: structs.DefaultNamespace, + Region: DefaultRegion, + }, + } + var serviceRegResp structs.ServiceRegistrationByNameResponse + err := msgpackrpc.CallWithCodec( + codec, structs.ServiceRegistrationGetServiceRPCMethod, serviceRegReq, &serviceRegResp) + must.NoError(t, err) + + result := serviceRegResp.Services + + must.Len(t, 2, result) + must.Eq(t, "10.0.0.2", result[0].Address) + must.Eq(t, "10.0.0.1", result[1].Address) + }, + }, } for _, tc := range testCases { @@ -1184,3 +1327,79 @@ func TestServiceRegistration_GetService(t *testing.T) { }) } } + +func TestServiceRegistration_chooseErr(t *testing.T) { + ci.Parallel(t) + + sr := (*ServiceRegistration)(nil) + try := func(input []*structs.ServiceRegistration, parameter string) { + result, err := sr.choose(input, parameter) + must.Empty(t, result) + must.ErrorIs(t, err, structs.ErrMalformedChooseParameter) + } + + regs := []*structs.ServiceRegistration{ + {ID: "abc001", ServiceName: "s1"}, + {ID: "abc002", ServiceName: "s2"}, + {ID: "abc003", ServiceName: "s3"}, + } + + try(regs, "") + try(regs, "1|") + try(regs, "|abc") + try(regs, "a|abc") +} + +func TestServiceRegistration_choose(t *testing.T) { + ci.Parallel(t) + + sr := (*ServiceRegistration)(nil) + try := func(input, exp []*structs.ServiceRegistration, parameter string) { + result, err := sr.choose(input, parameter) + must.NoError(t, err) + must.Eq(t, exp, result) + } + + // zero services + try(nil, []*structs.ServiceRegistration{}, "1|aaa") + try(nil, []*structs.ServiceRegistration{}, "2|aaa") + + // some unique services + regs := []*structs.ServiceRegistration{ + {ID: "abc001", ServiceName: "s1"}, + {ID: "abc002", ServiceName: "s1"}, + {ID: "abc003", ServiceName: "s1"}, + } + + // same key, increasing n -> maintains order (n=1) + try(regs, []*structs.ServiceRegistration{ + {ID: "abc002", ServiceName: "s1"}, + }, "1|aaa") + + // same key, increasing n -> maintains order (n=2) + try(regs, []*structs.ServiceRegistration{ + {ID: "abc002", ServiceName: "s1"}, + {ID: "abc003", ServiceName: "s1"}, + }, "2|aaa") + + // same key, increasing n -> maintains order (n=3) + try(regs, []*structs.ServiceRegistration{ + {ID: "abc002", ServiceName: "s1"}, + {ID: "abc003", ServiceName: "s1"}, + {ID: "abc001", ServiceName: "s1"}, + }, "3|aaa") + + // unique key -> different orders + try(regs, []*structs.ServiceRegistration{ + {ID: "abc001", ServiceName: "s1"}, + {ID: "abc002", ServiceName: "s1"}, + {ID: "abc003", ServiceName: "s1"}, + }, "3|bbb") + + // another key -> another order + try(regs, []*structs.ServiceRegistration{ + {ID: "abc002", ServiceName: "s1"}, + {ID: "abc003", ServiceName: "s1"}, + {ID: "abc001", ServiceName: "s1"}, + }, "3|ccc") +} diff --git a/nomad/structs/errors.go b/nomad/structs/errors.go index 69747a40e2b..b98d814761f 100644 --- a/nomad/structs/errors.go +++ b/nomad/structs/errors.go @@ -20,6 +20,7 @@ const ( errNodeLacksRpc = "Node does not support RPC; requires 0.8 or later" errMissingAllocID = "Missing allocation ID" errIncompatibleFiltering = "Filter expression cannot be used with other filter parameters" + errMalformedChooseParameter = "Parameter for choose must be in form '|'" // Prefix based errors that are used to check if the error is of a given // type. These errors should be created with the associated constructor. @@ -55,6 +56,7 @@ var ( ErrNodeLacksRpc = errors.New(errNodeLacksRpc) ErrMissingAllocID = errors.New(errMissingAllocID) ErrIncompatibleFiltering = errors.New(errIncompatibleFiltering) + ErrMalformedChooseParameter = errors.New(errMalformedChooseParameter) ErrUnknownNode = errors.New(ErrUnknownNodePrefix) diff --git a/nomad/structs/service_registration.go b/nomad/structs/service_registration.go index 31554fe5d8f..9929e83fc7c 100644 --- a/nomad/structs/service_registration.go +++ b/nomad/structs/service_registration.go @@ -1,6 +1,8 @@ package structs import ( + "crypto/md5" + "encoding/binary" "fmt" "github.com/hashicorp/nomad/helper" @@ -171,6 +173,25 @@ func (s *ServiceRegistration) GetNamespace() string { return s.Namespace } +// HashWith generates a unique value representative of s based on the contents of s. +func (s *ServiceRegistration) HashWith(key string) string { + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(s.Port)) + + sum := md5.New() + sum.Write(buf) + sum.Write([]byte(s.AllocID)) + sum.Write([]byte(s.ID)) + sum.Write([]byte(s.Namespace)) + sum.Write([]byte(s.Address)) + sum.Write([]byte(s.ServiceName)) + for _, tag := range s.Tags { + sum.Write([]byte(tag)) + } + sum.Write([]byte(key)) + return fmt.Sprintf("%x", sum.Sum(nil)) +} + // ServiceRegistrationUpsertRequest is the request object used to upsert one or // more service registrations. type ServiceRegistrationUpsertRequest struct { @@ -245,6 +266,7 @@ type ServiceRegistrationStub struct { // of services matching a specific name. type ServiceRegistrationByNameRequest struct { ServiceName string + Choose string // stable selection of n services QueryOptions } diff --git a/nomad/structs/service_registration_test.go b/nomad/structs/service_registration_test.go index da9c0773bc9..cfa8e3bb23d 100644 --- a/nomad/structs/service_registration_test.go +++ b/nomad/structs/service_registration_test.go @@ -3,6 +3,7 @@ package structs import ( "testing" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -447,3 +448,27 @@ func TestServiceRegistrationByNameRequest_StaleReadSupport(t *testing.T) { req := &ServiceRegistrationByNameRequest{} require.True(t, req.IsRead()) } + +func TestServiceRegistration_HashWith(t *testing.T) { + a := ServiceRegistration{ + Address: "10.0.0.1", + Port: 9999, + } + + // same service, same key -> same hash + must.Eq(t, a.HashWith("aaa"), a.HashWith("aaa")) + + // same service, different key -> different hash + must.NotEq(t, a.HashWith("aaa"), a.HashWith("bbb")) + + b := ServiceRegistration{ + Address: "10.0.0.2", + Port: 9998, + } + + // different service, same key -> different hash + must.NotEq(t, a.HashWith("aaa"), b.HashWith("aaa")) + + // different service, different key -> different hash + must.NotEq(t, a.HashWith("aaa"), b.HashWith("bbb")) +} diff --git a/website/content/api-docs/services.mdx b/website/content/api-docs/services.mdx index 465a2ce9aee..7905d3cb99d 100644 --- a/website/content/api-docs/services.mdx +++ b/website/content/api-docs/services.mdx @@ -94,6 +94,11 @@ The table below shows this endpoint's support for used to filter the results. Consider using pagination or a query parameter to reduce resource used to serve the request. +- `choose` `(string: "")` - Specifies the number of services to return and a hash + key. Must be in the form `|`. Nomad uses [rendezvous hashing][hash] to deliver + consistent results for a given key, and stable results when the number of services + changes. + ### Sample Request ```shell-session @@ -173,3 +178,5 @@ $ curl \ --request DELETE \ https://localhost:4646/v1/service/example-cache-redis/_nomad-task-ba731da0-6df9-9858-ef23-806e9758a899-redis-example-cache-redis-db ``` + +[hash]: https://en.wikipedia.org/wiki/Rendezvous_hashing \ No newline at end of file From 4733db4e14fde43c12d7de39af71617e5907bcc3 Mon Sep 17 00:00:00 2001 From: Seth Hoenig Date: Thu, 30 Jun 2022 15:10:47 -0500 Subject: [PATCH 2/2] cl: fixup changelog comment Co-authored-by: James Rasell --- .changelog/12862.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changelog/12862.txt b/.changelog/12862.txt index 182a8b42dfd..ec238e923f6 100644 --- a/.changelog/12862.txt +++ b/.changelog/12862.txt @@ -1,3 +1,3 @@ ```release-note:improvement -api: enable setting ?choose parameter when querying services +api: enable setting `?choose` parameter when querying services ```