Skip to content

Commit

Permalink
client/metadata: fix crasher caused by AllowStale = false
Browse files Browse the repository at this point in the history
Fixes #16517

Given a 3 Server cluster with at least 1 Client connected to Follower 1:

If a NodeMeta.{Apply,Read} for the Client request is received by
Follower 1 with `AllowStale = false` the Follower will forward the
request to the Leader.

The Leader, not being connected to the target Client, will forward the
RPC to Follower 1.

Follower 1, seeing AllowStale=false, will forward the request to the
Leader.

The Leader, not being connected to... well hoppefully you get the
picture: an infinite loop occurs.
  • Loading branch information
schmichael committed Mar 17, 2023
1 parent 1cfa95e commit 9897e86
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 13 deletions.
6 changes: 0 additions & 6 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,6 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error {
// Note: for cluster topologies where API consumers don't have network access to
// Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid
// long pauses on this API call.
//
// BREAKING: This method will have the following signature in 1.6.0
// func (a *Allocations) Restart(allocID string, taskName string, allTasks bool, w *WriteOptions) (*WriteMeta, error) {
func (a *Allocations) Restart(alloc *Allocation, taskName string, q *QueryOptions) error {
req := AllocationRestartRequest{
TaskName: taskName,
Expand Down Expand Up @@ -223,9 +220,6 @@ type AllocStopResponse struct {
// Note: for cluster topologies where API consumers don't have network access to
// Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid
// long pauses on this API call.
//
// BREAKING: This method will have the following signature in 1.6.0
// func (a *Allocations) Signal(allocID string, task string, signal string, w *WriteOptions) (*WriteMeta, error) {
func (a *Allocations) Signal(alloc *Allocation, q *QueryOptions, task, signal string) error {
req := AllocSignalRequest{
Signal: signal,
Expand Down
30 changes: 27 additions & 3 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,9 +937,8 @@ func (c *Client) query(endpoint string, out any, q *QueryOptions) (*QueryMeta, e
return qm, nil
}

// putQuery is used to do a PUT request when doing a read against an endpoint
// and deserialize the response into an interface using standard Nomad
// conventions.
// putQuery is used to do a PUT request when doing a "write" to a Client RPC.
// Client RPCs must use QueryOptions to allow setting AllowStale=true.
func (c *Client) putQuery(endpoint string, in, out any, q *QueryOptions) (*QueryMeta, error) {
r, err := c.newRequest("PUT", endpoint)
if err != nil {
Expand Down Expand Up @@ -969,6 +968,31 @@ func (c *Client) put(endpoint string, in, out any, q *WriteOptions) (*WriteMeta,
return c.write(http.MethodPut, endpoint, in, out, q)
}

// postQuery is used to do a POST request when doing a "write" to a Client RPC.
// Client RPCs must use QueryOptions to allow setting AllowStale=true.
func (c *Client) postQuery(endpoint string, in, out any, q *QueryOptions) (*QueryMeta, error) {
r, err := c.newRequest("POST", endpoint)
if err != nil {
return nil, err
}
r.setQueryOptions(q)
r.obj = in
rtt, resp, err := requireOK(c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()

qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt

if err := decodeBody(resp, out); err != nil {
return nil, err
}
return qm, nil
}

// post is used to do a POST request against an endpoint and
// serialize/deserialized using the standard Nomad conventions.
func (c *Client) post(endpoint string, in, out any, q *WriteOptions) (*WriteMeta, error) {
Expand Down
4 changes: 2 additions & 2 deletions api/node_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ func (n *Nodes) Meta() *NodeMeta {

// Apply dynamic Node metadata updates to a Node. If NodeID is unset then Node
// receiving the request is modified.
func (n *NodeMeta) Apply(meta *NodeMetaApplyRequest, qo *WriteOptions) (*NodeMetaResponse, error) {
func (n *NodeMeta) Apply(meta *NodeMetaApplyRequest, qo *QueryOptions) (*NodeMetaResponse, error) {
var out NodeMetaResponse
_, err := n.client.post("/v1/client/metadata", meta, &out, qo)
_, err := n.client.postQuery("/v1/client/metadata", meta, &out, qo)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion command/agent/meta_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *HTTPServer) nodeMetaApply(resp http.ResponseWriter, req *http.Request)
return nil, CodedError(http.StatusBadRequest, err.Error())
}

s.parseWriteRequest(req, &args.WriteRequest)
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
parseNode(req, &args.NodeID)

// Determine the handler to use
Expand Down
8 changes: 8 additions & 0 deletions contributing/checklist-rpc-endpoint.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ Prefer adding a new message to changing any existing RPC messages.
upgraded, so use this to guard sending the new RPC, else send the old RPC
* Version must match the actual release version!

* [ ] If implementing a Client RPC...
* Use `QueryOptions` instead of `WriteRequest` in the Request struct as
`WriteRequest` is only for *Raft* writes.
* Set `QueryOptions.AllowStale = true` in the *Server* RPC forwarder to avoid
an infinite loop between leaders and followers when a Client RPC is
forwarded through a follower. See
https://github.com/hashicorp/nomad/issues/16517

## Docs

* [ ] Changelog
Expand Down
8 changes: 8 additions & 0 deletions nomad/client_meta_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func newNodeMetaEndpoint(srv *Server) *NodeMeta {
func (n *NodeMeta) Apply(args *structs.NodeMetaApplyRequest, reply *structs.NodeMetaResponse) error {
const method = "NodeMeta.Apply"

// Prevent infinite loop between leader and
// follower-with-the-target-node-connection.
args.QueryOptions.AllowStale = true

authErr := n.srv.Authenticate(nil, args)
if done, err := n.srv.forward(method, args, args, reply); done {
return err
Expand All @@ -48,6 +52,10 @@ func (n *NodeMeta) Apply(args *structs.NodeMetaApplyRequest, reply *structs.Node
func (n *NodeMeta) Read(args *structs.NodeSpecificRequest, reply *structs.NodeMetaResponse) error {
const method = "NodeMeta.Read"

// Prevent infinite loop between leader and
// follower-with-the-target-node-connection.
args.QueryOptions.AllowStale = true

authErr := n.srv.Authenticate(nil, args)
if done, err := n.srv.forward(method, args, args, reply); done {
return err
Expand Down
2 changes: 1 addition & 1 deletion nomad/structs/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (di *DriverInfo) HealthCheckEquals(other *DriverInfo) bool {

// NodeMetaApplyRequest is used to update Node metadata on Client agents.
type NodeMetaApplyRequest struct {
WriteRequest
QueryOptions // Client RPCs must use QueryOptions to set AllowStale=true

// NodeID is the node being targeted by this request (or the node
// receiving this request if NodeID is empty).
Expand Down

0 comments on commit 9897e86

Please sign in to comment.