Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of client/metadata: fix crasher caused by AllowStale = false into release/1.5.x #16576

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/16549.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
core: Fixed a bug where Dynamic Node Metadata requests could crash servers
```
6 changes: 0 additions & 6 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,6 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error {
// Note: for cluster topologies where API consumers don't have network access to
// Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid
// long pauses on this API call.
//
// BREAKING: This method will have the following signature in 1.6.0
// func (a *Allocations) Restart(allocID string, taskName string, allTasks bool, w *WriteOptions) (*WriteMeta, error) {
func (a *Allocations) Restart(alloc *Allocation, taskName string, q *QueryOptions) error {
req := AllocationRestartRequest{
TaskName: taskName,
Expand Down Expand Up @@ -223,9 +220,6 @@ type AllocStopResponse struct {
// Note: for cluster topologies where API consumers don't have network access to
// Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid
// long pauses on this API call.
//
// BREAKING: This method will have the following signature in 1.6.0
// func (a *Allocations) Signal(allocID string, task string, signal string, w *WriteOptions) (*WriteMeta, error) {
func (a *Allocations) Signal(alloc *Allocation, q *QueryOptions, task, signal string) error {
req := AllocSignalRequest{
Signal: signal,
Expand Down
30 changes: 27 additions & 3 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,9 +937,8 @@ func (c *Client) query(endpoint string, out any, q *QueryOptions) (*QueryMeta, e
return qm, nil
}

// putQuery is used to do a PUT request when doing a read against an endpoint
// and deserialize the response into an interface using standard Nomad
// conventions.
// putQuery is used to do a PUT request when doing a "write" to a Client RPC.
// Client RPCs must use QueryOptions to allow setting AllowStale=true.
func (c *Client) putQuery(endpoint string, in, out any, q *QueryOptions) (*QueryMeta, error) {
r, err := c.newRequest("PUT", endpoint)
if err != nil {
Expand Down Expand Up @@ -969,6 +968,31 @@ func (c *Client) put(endpoint string, in, out any, q *WriteOptions) (*WriteMeta,
return c.write(http.MethodPut, endpoint, in, out, q)
}

// postQuery is used to do a POST request when doing a "write" to a Client RPC.
// Client RPCs must use QueryOptions to allow setting AllowStale=true.
func (c *Client) postQuery(endpoint string, in, out any, q *QueryOptions) (*QueryMeta, error) {
r, err := c.newRequest("POST", endpoint)
if err != nil {
return nil, err
}
r.setQueryOptions(q)
r.obj = in
rtt, resp, err := requireOK(c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()

qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt

if err := decodeBody(resp, out); err != nil {
return nil, err
}
return qm, nil
}

// post is used to do a POST request against an endpoint and
// serialize/deserialized using the standard Nomad conventions.
func (c *Client) post(endpoint string, in, out any, q *WriteOptions) (*WriteMeta, error) {
Expand Down
4 changes: 2 additions & 2 deletions api/node_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ func (n *Nodes) Meta() *NodeMeta {

// Apply dynamic Node metadata updates to a Node. If NodeID is unset then Node
// receiving the request is modified.
func (n *NodeMeta) Apply(meta *NodeMetaApplyRequest, qo *WriteOptions) (*NodeMetaResponse, error) {
func (n *NodeMeta) Apply(meta *NodeMetaApplyRequest, qo *QueryOptions) (*NodeMetaResponse, error) {
var out NodeMetaResponse
_, err := n.client.post("/v1/client/metadata", meta, &out, qo)
_, err := n.client.postQuery("/v1/client/metadata", meta, &out, qo)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion command/agent/meta_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *HTTPServer) nodeMetaApply(resp http.ResponseWriter, req *http.Request)
return nil, CodedError(http.StatusBadRequest, err.Error())
}

s.parseWriteRequest(req, &args.WriteRequest)
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
parseNode(req, &args.NodeID)

// Determine the handler to use
Expand Down
8 changes: 8 additions & 0 deletions contributing/checklist-rpc-endpoint.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ Prefer adding a new message to changing any existing RPC messages.
upgraded, so use this to guard sending the new RPC, else send the old RPC
* Version must match the actual release version!

* [ ] If implementing a Client RPC...
* Use `QueryOptions` instead of `WriteRequest` in the Request struct as
`WriteRequest` is only for *Raft* writes.
* Set `QueryOptions.AllowStale = true` in the *Server* RPC forwarder to avoid
an infinite loop between leaders and followers when a Client RPC is
forwarded through a follower. See
https://github.com/hashicorp/nomad/issues/16517

## Docs

* [ ] Changelog
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ require (
github.com/mitchellh/go-glint v0.0.0-20210722152315-6515ceb4a127
github.com/mitchellh/go-homedir v1.1.0
github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b
github.com/mitchellh/go-testing-interface v1.14.1
github.com/mitchellh/go-testing-interface v1.14.2-0.20210821155943-2d9075ca8770
github.com/mitchellh/hashstructure v1.1.0
github.com/mitchellh/mapstructure v1.5.0
github.com/mitchellh/reflectwalk v1.0.2
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1091,8 +1091,9 @@ github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b h1:9+ke9YJ9KGWw5AN
github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b/go.mod h1:r1VsdOzOPt1ZSrGZWFoNhsAedKnEd6r9Np1+5blZCWk=
github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU=
github.com/mitchellh/go-testing-interface v1.14.1/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8=
github.com/mitchellh/go-testing-interface v1.14.2-0.20210821155943-2d9075ca8770 h1:drhDO54gdT/a15GBcMRmunZiNcLgPiFIJa23KzmcvcU=
github.com/mitchellh/go-testing-interface v1.14.2-0.20210821155943-2d9075ca8770/go.mod h1:SO/iHr6q2EzbqRApt+8/E9wqebTwQn5y+UlB04bxzo0=
github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0=
Expand Down
8 changes: 8 additions & 0 deletions nomad/client_meta_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func newNodeMetaEndpoint(srv *Server) *NodeMeta {
func (n *NodeMeta) Apply(args *structs.NodeMetaApplyRequest, reply *structs.NodeMetaResponse) error {
const method = "NodeMeta.Apply"

// Prevent infinite loop between leader and
// follower-with-the-target-node-connection.
args.QueryOptions.AllowStale = true

authErr := n.srv.Authenticate(nil, args)
if done, err := n.srv.forward(method, args, args, reply); done {
return err
Expand All @@ -48,6 +52,10 @@ func (n *NodeMeta) Apply(args *structs.NodeMetaApplyRequest, reply *structs.Node
func (n *NodeMeta) Read(args *structs.NodeSpecificRequest, reply *structs.NodeMetaResponse) error {
const method = "NodeMeta.Read"

// Prevent infinite loop between leader and
// follower-with-the-target-node-connection.
args.QueryOptions.AllowStale = true

authErr := n.srv.Authenticate(nil, args)
if done, err := n.srv.forward(method, args, args, reply); done {
return err
Expand Down
129 changes: 129 additions & 0 deletions nomad/client_meta_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package nomad

import (
"testing"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
)

// TestNodeMeta_Forward asserts that Client RPCs do not result in infinite
// loops. For example in a cluster with 1 Leader, 2 Followers, and a Node
// connected to Follower 1:
//
// If a NodeMeta.Apply RPC with AllowStale=false is received by Follower 1, it
// will honor AllowStale=false and forward the request to the Leader.
//
// The Leader will accept the RPC, notice that Follower 1 has a connection to
// the Node, and the Leader will send the request back to Follower 1.
//
// Follower 1, ever respectful of AllowStale=false, will forward it back to the
// Leader.
//
// The Leader, being unable to forward to the Node, will send it back to
// Follower 1.
//
// This argument will continue until one of the Servers runs out of memory or
// patience and stomps away in anger (crashes). Like any good argument the
// ending is never pretty as the Servers will suffer CPU starvation and
// potentially Raft flapping before anyone actually OOMs.
//
// See https://github.com/hashicorp/nomad/issues/16517 for details.
//
// If test fails it will do so spectacularly by consuming all available CPU and
// potentially all available memory. Running it in a VM or container is
// suggested.
func TestNodeMeta_Forward(t *testing.T) {
ci.Parallel(t)

servers := []*Server{}
for i := 0; i < 3; i++ {
s, cleanup := TestServer(t, func(c *Config) {
c.BootstrapExpect = 3
c.NumSchedulers = 0
})
t.Cleanup(cleanup)
servers = append(servers, s)
}

TestJoin(t, servers...)
leader := testutil.WaitForLeaders(t, servers[0].RPC, servers[1].RPC, servers[2].RPC)

followers := []string{}
for _, s := range servers {
if addr := s.config.RPCAddr.String(); addr != leader {
followers = append(followers, addr)
}
}
t.Logf("leader=%s followers=%q", leader, followers)

clients := []*client.Client{}
for i := 0; i < 4; i++ {
c, cleanup := client.TestClient(t, func(c *config.Config) {
// Clients will rebalance across all servers, but try to get them to use
// followers to ensure we don't hit the loop in #16517
c.Servers = followers
})
defer cleanup()
clients = append(clients, c)
}
for _, c := range clients {
testutil.WaitForClient(t, servers[0].RPC, c.NodeID(), c.Region())
}

agentRPCs := []func(string, any, any) error{}
nodeIDs := make([]string, 0, len(clients))

// Build list of agents and node IDs
for _, s := range servers {
agentRPCs = append(agentRPCs, s.RPC)
}

for _, c := range clients {
agentRPCs = append(agentRPCs, c.RPC)
nodeIDs = append(nodeIDs, c.NodeID())
}

region := clients[0].Region()

// Apply metadata to every client through every agent to ensure forwarding
// always works regardless of path taken.
for _, rpc := range agentRPCs {
for _, nodeID := range nodeIDs {
args := &structs.NodeMetaApplyRequest{
// Intentionally don't set QueryOptions.AllowStale to exercise #16517
QueryOptions: structs.QueryOptions{
Region: region,
},
NodeID: nodeID,
Meta: map[string]*string{"testing": pointer.Of("123")},
}
reply := &structs.NodeMetaResponse{}
must.NoError(t, rpc("NodeMeta.Apply", args, reply))
must.MapNotEmpty(t, reply.Meta)
}
}

for _, rpc := range agentRPCs {
for _, nodeID := range nodeIDs {
args := &structs.NodeSpecificRequest{
// Intentionally don't set QueryOptions.AllowStale to exercise #16517
QueryOptions: structs.QueryOptions{
Region: region,
},
NodeID: nodeID,
}
reply := &structs.NodeMetaResponse{}
must.NoError(t, rpc("NodeMeta.Read", args, reply))
must.MapNotEmpty(t, reply.Meta)
must.Eq(t, reply.Meta["testing"], "123")
must.MapNotEmpty(t, reply.Dynamic)
must.Eq(t, *reply.Dynamic["testing"], "123")
}
}
}
2 changes: 1 addition & 1 deletion nomad/structs/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (di *DriverInfo) HealthCheckEquals(other *DriverInfo) bool {

// NodeMetaApplyRequest is used to update Node metadata on Client agents.
type NodeMetaApplyRequest struct {
WriteRequest
QueryOptions // Client RPCs must use QueryOptions to set AllowStale=true

// NodeID is the node being targeted by this request (or the node
// receiving this request if NodeID is empty).
Expand Down
18 changes: 7 additions & 11 deletions nomad/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"math/rand"
"net"
"sync/atomic"
"testing"
"time"

"github.com/hashicorp/nomad/ci"
Expand All @@ -15,14 +14,15 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/version"
testing "github.com/mitchellh/go-testing-interface"
"github.com/shoenig/test/must"
)

var (
nodeNumber int32 = 0
)

func TestACLServer(t *testing.T, cb func(*Config)) (*Server, *structs.ACLToken, func()) {
func TestACLServer(t testing.T, cb func(*Config)) (*Server, *structs.ACLToken, func()) {
server, cleanup := TestServer(t, func(c *Config) {
c.ACLEnabled = true
if cb != nil {
Expand All @@ -37,13 +37,13 @@ func TestACLServer(t *testing.T, cb func(*Config)) (*Server, *structs.ACLToken,
return server, token, cleanup
}

func TestServer(t *testing.T, cb func(*Config)) (*Server, func()) {
func TestServer(t testing.T, cb func(*Config)) (*Server, func()) {
s, c, err := TestServerErr(t, cb)
must.NoError(t, err, must.Sprint("failed to start test server"))
return s, c
}

func TestServerErr(t *testing.T, cb func(*Config)) (*Server, func(), error) {
func TestServerErr(t testing.T, cb func(*Config)) (*Server, func(), error) {
// Setup the default settings
config := DefaultConfig()

Expand Down Expand Up @@ -150,19 +150,15 @@ func TestServerErr(t *testing.T, cb func(*Config)) (*Server, func(), error) {
return nil, nil, errors.New("unable to acquire ports for test server")
}

func TestJoin(t *testing.T, servers ...*Server) {
func TestJoin(t testing.T, servers ...*Server) {
for i := 0; i < len(servers)-1; i++ {
addr := fmt.Sprintf("127.0.0.1:%d",
servers[i].config.SerfConfig.MemberlistConfig.BindPort)

for j := i + 1; j < len(servers); j++ {
num, err := servers[j].Join([]string{addr})
if err != nil {
t.Fatalf("err: %v", err)
}
if num != 1 {
t.Fatalf("bad: %d", num)
}
must.NoError(t, err)
must.Eq(t, 1, num)
}
}
}
13 changes: 8 additions & 5 deletions testutil/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,16 @@ func WaitForLeader(t testing.TB, rpc rpcFn) {
})
}

// WaitForLeaders blocks until each serverRPC knows the leader.
func WaitForLeaders(t testing.TB, serverRPCs ...rpcFn) {
// WaitForLeaders blocks until each rpcs knows the leader.
func WaitForLeaders(t testing.TB, rpcs ...rpcFn) string {
t.Helper()

for i := 0; i < len(serverRPCs); i++ {
var leader string
for i := 0; i < len(rpcs); i++ {
ok := func() (bool, error) {
leader = ""
args := &structs.GenericRequest{}
var leader string
err := serverRPCs[i]("Status.Leader", args, &leader)
err := rpcs[i]("Status.Leader", args, &leader)
return leader != "", err
}
must.Wait(t, wait.InitialSuccess(
Expand All @@ -160,6 +161,8 @@ func WaitForLeaders(t testing.TB, serverRPCs ...rpcFn) {
wait.Gap(1*time.Second),
))
}

return leader
}

// WaitForClient blocks until the client can be found
Expand Down