Skip to content

Commit

Permalink
allocs: Add nomad alloc signal command
Browse files Browse the repository at this point in the history
This command will be used to send a signal to either a single task within an
allocation, or all of the tasks if <task-name> is omitted. If the sent signal
terminates the allocation, it will be treated as if the allocation has crashed,
rather than as if it was operator-terminated.

Signal validation is currently handled by the driver itself and nomad
does not attempt to restrict or validate them.
  • Loading branch information
endocrimes committed Apr 25, 2019
1 parent a4ed109 commit 023d0df
Show file tree
Hide file tree
Showing 12 changed files with 452 additions and 0 deletions.
19 changes: 19 additions & 0 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"sort"
"time"

"github.com/hashicorp/nomad/nomad/structs"
)

var (
Expand Down Expand Up @@ -103,6 +105,23 @@ type AllocStopResponse struct {
WriteMeta
}

func (a *Allocations) Signal(alloc *Allocation, q *QueryOptions, task, signal string) error {
nodeClient, err := a.client.GetNodeClient(alloc.NodeID, q)
if err != nil {
return err
}

req := structs.AllocSignalRequest{
AllocID: alloc.ID,
Signal: signal,
Task: task,
}

var resp structs.GenericResponse
_, err = nodeClient.putQuery("/v1/client/allocation/"+alloc.ID+"/signal", &req, &resp, q)
return err
}

// Allocation is used for serialization of allocations.
type Allocation struct {
ID string
Expand Down
13 changes: 13 additions & 0 deletions client/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ func (a *Allocations) GarbageCollect(args *nstructs.AllocSpecificRequest, reply
return nil
}

func (a *Allocations) Signal(args *nstructs.AllocSignalRequest, reply *nstructs.GenericResponse) error {
defer metrics.MeasureSince([]string{"client", "allocations", "signal"}, time.Now())

// Check submit job permissions
if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityAllocLifecycle) {
return nstructs.ErrPermissionDenied
}

return a.c.SignalAllocation(args.AllocID, args.Task, args.Signal)
}

// Restart is used to trigger a restart of an allocation or a subtask on a client.
func (a *Allocations) Restart(args *nstructs.AllocRestartRequest, reply *nstructs.GenericResponse) error {
defer metrics.MeasureSince([]string{"client", "allocations", "restart"}, time.Now())
Expand Down
92 changes: 92 additions & 0 deletions client/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,98 @@ func TestAllocations_GarbageCollect_ACL(t *testing.T) {
}
}

func TestAllocations_Signal(t *testing.T) {
t.Parallel()

client, cleanup := TestClient(t, nil)
defer cleanup()

a := mock.Alloc()
require.Nil(t, client.addAlloc(a, ""))

// Try with bad alloc
req := &nstructs.AllocSignalRequest{}
var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.Signal", &req, &resp)
require.NotNil(t, err)
require.True(t, nstructs.IsErrUnknownAllocation(err))

// Try with good alloc
req.AllocID = a.ID

var resp2 nstructs.GenericResponse
err = client.ClientRPC("Allocations.Signal", &req, &resp2)

require.Error(t, err, "Expected error, got: %s, resp: %#+v", err, resp2)
require.Equal(t, "1 error(s) occurred:\n\n* Failed to signal task: web, err: Task not running", err.Error())
}

func TestAllocations_Signal_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)
server, addr, root := testACLServer(t, nil)
defer server.Shutdown()

client, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
c.ACLEnabled = true
})
defer cleanup()

// Try request without a token and expect failure
{
req := &nstructs.AllocSignalRequest{}
var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.Signal", &req, &resp)
require.NotNil(err)
require.EqualError(err, nstructs.ErrPermissionDenied.Error())
}

// Try request with an invalid token and expect failure
{
token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NodePolicy(acl.PolicyDeny))
req := &nstructs.AllocSignalRequest{}
req.AuthToken = token.SecretID

var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.Signal", &req, &resp)

require.NotNil(err)
require.EqualError(err, nstructs.ErrPermissionDenied.Error())
}

// Try request with a valid token
{
token := mock.CreatePolicyAndToken(t, server.State(), 1005, "test-valid",
mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{acl.NamespaceCapabilityAllocLifecycle}))
req := &nstructs.AllocSignalRequest{}
req.AuthToken = token.SecretID
req.Namespace = nstructs.DefaultNamespace

var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.Signal", &req, &resp)
require.True(nstructs.IsErrUnknownAllocation(err))
}

// Try request with a management token
{
req := &nstructs.AllocSignalRequest{}
req.AuthToken = root.SecretID

var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.Signal", &req, &resp)
require.True(nstructs.IsErrUnknownAllocation(err))
}
}

func TestAllocations_Signal_Subtask(t *testing.T) {
t.Parallel()
}

func TestAllocations_Signal_EntireAlloc(t *testing.T) {
t.Parallel()
}

func TestAllocations_Stats(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down
26 changes: 26 additions & 0 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,3 +963,29 @@ func (ar *allocRunner) RestartAll(taskEvent *structs.TaskEvent) error {

return err.ErrorOrNil()
}

// Signal sends a signal request to task runners inside an allocation. If the
// taskName is empty, then it is sent to all tasks.
func (ar *allocRunner) Signal(taskName, signal string) error {
event := structs.NewTaskEvent(structs.TaskSignaling).SetSignalText(signal)

if taskName != "" {
tr, ok := ar.tasks[taskName]
if !ok {
return fmt.Errorf("Task not found")
}

return tr.Signal(event, signal)
}

var err *multierror.Error

for tn, tr := range ar.tasks {
rerr := tr.Signal(event.Copy(), signal)
if rerr != nil {
err = multierror.Append(err, fmt.Errorf("Failed to signal task: %s, err: %v", tn, rerr))
}
}

return err.ErrorOrNil()
}
13 changes: 13 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ type AllocRunner interface {
WaitCh() <-chan struct{}
DestroyCh() <-chan struct{}
ShutdownCh() <-chan struct{}
Signal(taskName, signal string) error
GetTaskEventHandler(taskName string) drivermanager.EventHandler

RestartTask(taskName string, taskEvent *structs.TaskEvent) error
Expand Down Expand Up @@ -706,6 +707,18 @@ func (c *Client) Stats() map[string]map[string]string {
return stats
}

// SignalAllocation sends a signal to the tasks within an allocation.
// If the provided task is empty, then every allocation will be signalled.
// If a task is provided, then only an exactly matching task will be signalled.
func (c *Client) SignalAllocation(allocID, task, signal string) error {
ar, err := c.getAllocRunner(allocID)
if err != nil {
return err
}

return ar.Signal(task, signal)
}

// CollectAllocation garbage collects a single allocation on a node. Returns
// true if alloc was found and garbage collected; otherwise false.
func (c *Client) CollectAllocation(allocID string) bool {
Expand Down
41 changes: 41 additions & 0 deletions command/agent/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ
return s.allocRestart(allocID, resp, req)
case "gc":
return s.allocGC(allocID, resp, req)
case "signal":
return s.allocSignal(allocID, resp, req)
}

return nil, CodedError(404, resourceNotFoundErr)
Expand Down Expand Up @@ -255,6 +257,45 @@ func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http
return nil, rpcErr
}

func (s *HTTPServer) allocSignal(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if !(req.Method == "POST" || req.Method == "PUT") {
return nil, CodedError(405, ErrInvalidMethod)
}

// Build the request and parse the ACL token
args := structs.AllocSignalRequest{}
err := decodeBody(req, &args)
if err != nil {
return nil, CodedError(400, fmt.Sprintf("Failed to decode body: %v", err))
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
args.AllocID = allocID

// Determine the handler to use
useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForAlloc(allocID)

// Make the RPC
var reply structs.GenericResponse
var rpcErr error
if useLocalClient {
rpcErr = s.agent.Client().ClientRPC("Allocations.Signal", &args, &reply)
} else if useClientRPC {
rpcErr = s.agent.Client().RPC("ClientAllocations.Signal", &args, &reply)
} else if useServerRPC {
rpcErr = s.agent.Server().RPC("ClientAllocations.Signal", &args, &reply)
} else {
rpcErr = CodedError(400, "No local Node and node_id not provided")
}

if rpcErr != nil {
if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) {
rpcErr = CodedError(404, rpcErr.Error())
}
}

return reply, rpcErr
}

func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var secret string
s.parseToken(req, &secret)
Expand Down
2 changes: 2 additions & 0 deletions command/agent/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ func TestHTTP_AllocStop(t *testing.T) {

require.NoError(state.UpsertAllocs(1000, []*structs.Allocation{alloc}))

// Test that the happy path works
{
// Make the HTTP request
req, err := http.NewRequest("POST", "/v1/allocation/"+alloc.ID+"/stop", nil)
Expand All @@ -420,6 +421,7 @@ func TestHTTP_AllocStop(t *testing.T) {
require.NotEmpty(a.Index, "missing index")
}

// Test that we 404 when the allocid is invalid
{
// Make the HTTP request
req, err := http.NewRequest("POST", "/v1/allocation/"+alloc.ID+"/stop", nil)
Expand Down
Loading

0 comments on commit 023d0df

Please sign in to comment.