Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allocs: Add nomad alloc signal command #5515

Merged
merged 2 commits into from
Apr 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"sort"
"time"

"github.com/hashicorp/nomad/nomad/structs"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The API package shouldn't reference the internal nomad/structs package - doing so leaks many internal packages and transitive dependencies to api clients and make dependency management more complicated, specially for go mod users.

Can you copy the relevant structs here?

)

var (
Expand Down Expand Up @@ -103,6 +105,23 @@ type AllocStopResponse struct {
WriteMeta
}

func (a *Allocations) Signal(alloc *Allocation, q *QueryOptions, task, signal string) error {
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
if err != nil {
return err
}

req := structs.AllocSignalRequest{
AllocID: alloc.ID,
Signal: signal,
Task: task,
}

var resp structs.GenericResponse
_, err = nodeClient.putQuery("/v1/client/allocation/"+alloc.ID+"/signal", &req, &resp, q)
return err
}

// Allocation is used for serialization of allocations.
type Allocation struct {
ID string
Expand Down
14 changes: 14 additions & 0 deletions client/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,20 @@ func (a *Allocations) GarbageCollect(args *nstructs.AllocSpecificRequest, reply
return nil
}

// Signal is used to send a signal to an allocation's tasks on a client.
func (a *Allocations) Signal(args *nstructs.AllocSignalRequest, reply *nstructs.GenericResponse) error {
defer metrics.MeasureSince([]string{"client", "allocations", "signal"}, time.Now())

// Check alloc-lifecycle permissions
if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityAllocLifecycle) {
return nstructs.ErrPermissionDenied
}

return a.c.SignalAllocation(args.AllocID, args.Task, args.Signal)
}

// Restart is used to trigger a restart of an allocation or a subtask on a client.
func (a *Allocations) Restart(args *nstructs.AllocRestartRequest, reply *nstructs.GenericResponse) error {
defer metrics.MeasureSince([]string{"client", "allocations", "restart"}, time.Now())
Expand Down
84 changes: 84 additions & 0 deletions client/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,90 @@ func TestAllocations_GarbageCollect_ACL(t *testing.T) {
}
}

func TestAllocations_Signal(t *testing.T) {
t.Parallel()

client, cleanup := TestClient(t, nil)
defer cleanup()

a := mock.Alloc()
require.Nil(t, client.addAlloc(a, ""))

// Try with bad alloc
req := &nstructs.AllocSignalRequest{}
var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.Signal", &req, &resp)
require.NotNil(t, err)
require.True(t, nstructs.IsErrUnknownAllocation(err))

// Try with good alloc
req.AllocID = a.ID

var resp2 nstructs.GenericResponse
err = client.ClientRPC("Allocations.Signal", &req, &resp2)

require.Error(t, err, "Expected error, got: %s, resp: %#+v", err, resp2)
require.Equal(t, "1 error(s) occurred:\n\n* Failed to signal task: web, err: Task not running", err.Error())
}

func TestAllocations_Signal_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)
server, addr, root := testACLServer(t, nil)
defer server.Shutdown()

client, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
c.ACLEnabled = true
})
defer cleanup()

// Try request without a token and expect failure
{
req := &nstructs.AllocSignalRequest{}
var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.Signal", &req, &resp)
require.NotNil(err)
require.EqualError(err, nstructs.ErrPermissionDenied.Error())
}

// Try request with an invalid token and expect failure
{
token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny))
req := &nstructs.AllocSignalRequest{}
req.AuthToken = token.SecretID

var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.Signal", &req, &resp)

require.NotNil(err)
require.EqualError(err, nstructs.ErrPermissionDenied.Error())
}

// Try request with a valid token
{
token := mock.CreatePolicyAndToken(t, server.State(), 1005, "test-valid",
mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{acl.NamespaceCapabilityAllocLifecycle}))
req := &nstructs.AllocSignalRequest{}
req.AuthToken = token.SecretID
req.Namespace = nstructs.DefaultNamespace

var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.Signal", &req, &resp)
require.True(nstructs.IsErrUnknownAllocation(err))
}

// Try request with a management token
{
req := &nstructs.AllocSignalRequest{}
req.AuthToken = root.SecretID

var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.Signal", &req, &resp)
require.True(nstructs.IsErrUnknownAllocation(err))
}
}

func TestAllocations_Stats(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down
26 changes: 26 additions & 0 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,3 +963,29 @@ func (ar *allocRunner) RestartAll(taskEvent *structs.TaskEvent) error {

return err.ErrorOrNil()
}

// Signal sends a signal request to task runners inside an allocation. If the
// taskName is empty, then it is sent to all tasks.
func (ar *allocRunner) Signal(taskName, signal string) error {
event := structs.NewTaskEvent(structs.TaskSignaling).SetSignalText(signal)

if taskName != "" {
tr, ok := ar.tasks[taskName]
if !ok {
return fmt.Errorf("Task not found")
}

return tr.Signal(event, signal)
}

var err *multierror.Error

for tn, tr := range ar.tasks {
rerr := tr.Signal(event.Copy(), signal)
if rerr != nil {
err = multierror.Append(err, fmt.Errorf("Failed to signal task: %s, err: %v", tn, rerr))
}
}

return err.ErrorOrNil()
}
13 changes: 13 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ type AllocRunner interface {
WaitCh() <-chan struct{}
DestroyCh() <-chan struct{}
ShutdownCh() <-chan struct{}
Signal(taskName, signal string) error
GetTaskEventHandler(taskName string) drivermanager.EventHandler

RestartTask(taskName string, taskEvent *structs.TaskEvent) error
Expand Down Expand Up @@ -706,6 +707,18 @@ func (c *Client) Stats() map[string]map[string]string {
return stats
}

// SignalAllocation sends a signal to the tasks within an allocation.
// If the provided task is empty, then every allocation will be signalled.
// If a task is provided, then only an exactly matching task will be signalled.
func (c *Client) SignalAllocation(allocID, task, signal string) error {
ar, err := c.getAllocRunner(allocID)
if err != nil {
return err
}

return ar.Signal(task, signal)
}

// CollectAllocation garbage collects a single allocation on a node. Returns
// true if alloc was found and garbage collected; otherwise false.
func (c *Client) CollectAllocation(allocID string) bool {
Expand Down
41 changes: 41 additions & 0 deletions command/agent/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ
return s.allocRestart(allocID, resp, req)
case "gc":
return s.allocGC(allocID, resp, req)
case "signal":
return s.allocSignal(allocID, resp, req)
}

return nil, CodedError(404, resourceNotFoundErr)
Expand Down Expand Up @@ -255,6 +257,45 @@ func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http
return nil, rpcErr
}

func (s *HTTPServer) allocSignal(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if !(req.Method == "POST" || req.Method == "PUT") {
return nil, CodedError(405, ErrInvalidMethod)
}

// Build the request and parse the ACL token
args := structs.AllocSignalRequest{}
err := decodeBody(req, &args)
if err != nil {
return nil, CodedError(400, fmt.Sprintf("Failed to decode body: %v", err))
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
args.AllocID = allocID

// Determine the handler to use
useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForAlloc(allocID)

// Make the RPC
var reply structs.GenericResponse
var rpcErr error
if useLocalClient {
rpcErr = s.agent.Client().ClientRPC("Allocations.Signal", &args, &reply)
} else if useClientRPC {
rpcErr = s.agent.Client().RPC("ClientAllocations.Signal", &args, &reply)
} else if useServerRPC {
rpcErr = s.agent.Server().RPC("ClientAllocations.Signal", &args, &reply)
} else {
rpcErr = CodedError(400, "No local Node and node_id not provided")
}

if rpcErr != nil {
if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) {
rpcErr = CodedError(404, rpcErr.Error())
}
}

return reply, rpcErr
}

func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var secret string
s.parseToken(req, &secret)
Expand Down
2 changes: 2 additions & 0 deletions command/agent/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ func TestHTTP_AllocStop(t *testing.T) {

require.NoError(state.UpsertAllocs(1000, []*structs.Allocation{alloc}))

// Test that the happy path works
{
// Make the HTTP request
req, err := http.NewRequest("POST", "/v1/allocation/"+alloc.ID+"/stop", nil)
Expand All @@ -420,6 +421,7 @@ func TestHTTP_AllocStop(t *testing.T) {
require.NotEmpty(a.Index, "missing index")
}

// Test that we 404 when the allocid is invalid
{
// Make the HTTP request
req, err := http.NewRequest("POST", "/v1/allocation/"+alloc.ID+"/stop", nil)
Expand Down
Loading