Skip to content

Commit

Permalink
client: fix RPC forwarding when querying checks for alloc. (#14498)
Browse files Browse the repository at this point in the history
When querying the checks for an allocation, the request must be
forwarded to the agent that is running the allocation. If the
initial request is made to a server agent, the server can forward
it directly to the client agent running the allocation. If the
request is made to a client agent that is not running the alloc, the
request must first be forwarded to a server, which then forwards it
to the correct client.
  • Loading branch information
jrasell authored Sep 8, 2022
1 parent 1d9c996 commit 6a6e4a3
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 2 deletions.
4 changes: 2 additions & 2 deletions command/agent/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,9 @@ func (s *HTTPServer) allocChecks(allocID string, resp http.ResponseWriter, req *
case useLocalClient:
rpcErr = s.agent.Client().ClientRPC("Allocations.Checks", &args, &reply)
case useClientRPC:
rpcErr = s.agent.Client().RPC("Allocations.Checks", &args, &reply)
rpcErr = s.agent.Client().RPC("ClientAllocations.Checks", &args, &reply)
case useServerRPC:
rpcErr = s.agent.Server().RPC("Allocations.Checks", &args, &reply)
rpcErr = s.agent.Server().RPC("ClientAllocations.Checks", &args, &reply)
default:
rpcErr = CodedError(400, "No local Node and node_id not provided")
}
Expand Down
53 changes: 53 additions & 0 deletions nomad/client_alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,59 @@ func (a *ClientAllocations) Stats(args *cstructs.AllocStatsRequest, reply *cstru
return NodeRpc(state.Session, "Allocations.Stats", args, reply)
}

// Checks handles the allocation checks RPC on the server side. The server
// does not build the response itself: it relays the request to the node the
// allocation is placed on and hands back whatever that node returns. This
// covers requests that reach a server agent's API directly, as well as
// requests relayed by a node which is not running the allocation.
func (a *ClientAllocations) Checks(args *cstructs.AllocChecksRequest, reply *cstructs.AllocChecksResponse) error {
	const (
		// serverMethod is the name this handler is reached under; it is used
		// for both region forwarding and server-to-server forwarding.
		serverMethod = "ClientAllocations.Checks"
		// nodeMethod is the RPC invoked on the node that runs the allocation.
		nodeMethod = "Allocations.Checks"
	)

	// Always permit stale reads. Only the Node registration can be stale
	// here, and avoiding staleness would cost another hop in the forwarding
	// chain.
	args.QueryOptions.AllowStale = true

	// The request may be destined for another region; if forwarding took
	// care of it, return that outcome as-is.
	done, fwdErr := a.srv.forward(serverMethod, args, args, reply)
	if done {
		return fwdErr
	}
	defer metrics.MeasureSince([]string{"nomad", "client_allocations", "checks"}, time.Now())

	// All subsequent lookups are served from a single state snapshot.
	snapshot, err := a.srv.State().Snapshot()
	if err != nil {
		return err
	}

	// The full allocation object supplies the namespace (for the ACL check)
	// and the node ID (for routing).
	alloc, err := getAlloc(snapshot, args.AllocID)
	if err != nil {
		return err
	}

	// The caller needs read-job capability on the allocation's namespace. A
	// nil ACL object skips the capability check.
	aclObj, err := a.srv.ResolveToken(args.AuthToken)
	if err != nil {
		return err
	}
	if aclObj != nil && !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityReadJob) {
		return structs.ErrPermissionDenied
	}

	// Verify the node is valid and new enough to support the RPC before
	// attempting to reach it.
	if _, err := getNodeForRpc(snapshot, alloc.NodeID); err != nil {
		return err
	}

	// If this server holds a connection to the node, issue the RPC over it.
	if nodeConn, ok := a.srv.getNodeConn(alloc.NodeID); ok {
		return NodeRpc(nodeConn.Session, nodeMethod, args, reply)
	}

	// Otherwise hand the request to findNodeConnAndForward so it can be
	// routed via a server that does have a connection to the node.
	return findNodeConnAndForward(a.srv, alloc.NodeID, serverMethod, args, reply)
}

// exec is used to execute command in a running task
func (a *ClientAllocations) exec(conn io.ReadWriteCloser) {
defer conn.Close()
Expand Down

0 comments on commit 6a6e4a3

Please sign in to comment.