Skip to content

Commit

Permalink
Merge pull request #5502 from hashicorp/dani/f-alloc-restart
Browse files Browse the repository at this point in the history
alloc-lifecycle: nomad alloc restart
  • Loading branch information
endocrimes authored Apr 11, 2019
2 parents 7764778 + 419d70c commit 6b60068
Show file tree
Hide file tree
Showing 13 changed files with 1,020 additions and 4 deletions.
14 changes: 14 additions & 0 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error {
return err
}

func (a *Allocations) Restart(alloc *Allocation, taskName string, q *QueryOptions) error {
req := AllocationRestartRequest{
TaskName: taskName,
}

var resp struct{}
_, err := a.client.putQuery("/v1/client/allocation/"+alloc.ID+"/restart", &req, &resp, q)
return err
}

// Allocation is used for serialization of allocations.
type Allocation struct {
ID string
Expand Down Expand Up @@ -246,6 +256,10 @@ func (a Allocation) RescheduleInfo(t time.Time) (int, int) {
return attempted, availableAttempts
}

type AllocationRestartRequest struct {
TaskName string
}

// RescheduleTracker encapsulates previous reschedule events
type RescheduleTracker struct {
Events []*RescheduleEvent
Expand Down
13 changes: 13 additions & 0 deletions client/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ func (a *Allocations) GarbageCollect(args *nstructs.AllocSpecificRequest, reply
return nil
}

// Restart is used to trigger a restart of an allocation or a subtask on a client.
func (a *Allocations) Restart(args *nstructs.AllocRestartRequest, reply *nstructs.GenericResponse) error {
defer metrics.MeasureSince([]string{"client", "allocations", "restart"}, time.Now())

if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityAllocLifecycle) {
return nstructs.ErrPermissionDenied
}

return a.c.RestartAllocation(args.AllocID, args.TaskName)
}

// Stats is used to collect allocation statistics
func (a *Allocations) Stats(args *cstructs.AllocStatsRequest, reply *cstructs.AllocStatsResponse) error {
defer metrics.MeasureSince([]string{"client", "allocations", "stats"}, time.Now())
Expand Down
97 changes: 97 additions & 0 deletions client/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"fmt"
"strings"
"testing"

"github.com/hashicorp/nomad/acl"
Expand All @@ -13,6 +14,102 @@ import (
"github.com/stretchr/testify/require"
)

func TestAllocations_Restart(t *testing.T) {
t.Parallel()
require := require.New(t)
client, cleanup := TestClient(t, nil)
defer cleanup()

a := mock.Alloc()
a.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
a.Job.TaskGroups[0].RestartPolicy = &nstructs.RestartPolicy{
Attempts: 0,
Mode: nstructs.RestartPolicyModeFail,
}
a.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10ms",
}
require.Nil(client.addAlloc(a, ""))

// Try with bad alloc
req := &nstructs.AllocRestartRequest{}
var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.Restart", &req, &resp)
require.Error(err)

// Try with good alloc
req.AllocID = a.ID

testutil.WaitForResult(func() (bool, error) {
var resp2 nstructs.GenericResponse
err := client.ClientRPC("Allocations.Restart", &req, &resp2)
if err != nil && strings.Contains(err.Error(), "not running") {
return false, err
}

return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}

func TestAllocations_Restart_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)
server, addr, root := testACLServer(t, nil)
defer server.Shutdown()

client, cleanup := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
c.ACLEnabled = true
})
defer cleanup()

// Try request without a token and expect failure
{
req := &nstructs.AllocRestartRequest{}
var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.Restart", &req, &resp)
require.NotNil(err)
require.EqualError(err, nstructs.ErrPermissionDenied.Error())
}

// Try request with an invalid token and expect failure
{
token := mock.CreatePolicyAndToken(t, server.State(), 1005, "invalid", mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{}))
req := &nstructs.AllocRestartRequest{}
req.AuthToken = token.SecretID

var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.Restart", &req, &resp)

require.NotNil(err)
require.EqualError(err, nstructs.ErrPermissionDenied.Error())
}

// Try request with a valid token
{
policyHCL := mock.NamespacePolicy(nstructs.DefaultNamespace, "", []string{acl.NamespaceCapabilityAllocLifecycle})
token := mock.CreatePolicyAndToken(t, server.State(), 1007, "valid", policyHCL)
require.NotNil(token)
req := &nstructs.AllocRestartRequest{}
req.AuthToken = token.SecretID
req.Namespace = nstructs.DefaultNamespace
var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.Restart", &req, &resp)
require.True(nstructs.IsErrUnknownAllocation(err), "Expected unknown alloc, found: %v", err)
}

// Try request with a management token
{
req := &nstructs.AllocRestartRequest{}
req.AuthToken = root.SecretID
var resp nstructs.GenericResponse
err := client.ClientRPC("Allocations.Restart", &req, &resp)
require.True(nstructs.IsErrUnknownAllocation(err), "Expected unknown alloc, found: %v", err)
}
}

func TestAllocations_GarbageCollectAll(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down
27 changes: 27 additions & 0 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/allocrunner/state"
Expand Down Expand Up @@ -936,3 +937,29 @@ func (ar *allocRunner) GetTaskEventHandler(taskName string) drivermanager.EventH
}
return nil
}

// RestartTask signalls the task runner for the provided task to restart.
func (ar *allocRunner) RestartTask(taskName string, taskEvent *structs.TaskEvent) error {
tr, ok := ar.tasks[taskName]
if !ok {
return fmt.Errorf("Could not find task runner for task: %s", taskName)
}

return tr.Restart(context.TODO(), taskEvent, false)
}

// RestartAll signalls all task runners in the allocation to restart and passes
// a copy of the task event to each restart event.
// Returns any errors in a concatenated form.
func (ar *allocRunner) RestartAll(taskEvent *structs.TaskEvent) error {
var err *multierror.Error

for tn := range ar.tasks {
rerr := ar.RestartTask(tn, taskEvent.Copy())
if rerr != nil {
err = multierror.Append(err, rerr)
}
}

return err.ErrorOrNil()
}
19 changes: 19 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ type AllocRunner interface {
DestroyCh() <-chan struct{}
ShutdownCh() <-chan struct{}
GetTaskEventHandler(taskName string) drivermanager.EventHandler

RestartTask(taskName string, taskEvent *structs.TaskEvent) error
RestartAll(taskEvent *structs.TaskEvent) error
}

// Client is used to implement the client interaction with Nomad. Clients
Expand Down Expand Up @@ -703,6 +706,22 @@ func (c *Client) CollectAllAllocs() {
c.garbageCollector.CollectAll()
}

func (c *Client) RestartAllocation(allocID, taskName string) error {
ar, err := c.getAllocRunner(allocID)
if err != nil {
return err
}

event := structs.NewTaskEvent(structs.TaskRestartSignal).
SetRestartReason("User requested restart")

if taskName != "" {
return ar.RestartTask(taskName, event)
}

return ar.RestartAll(event)
}

// Node returns the locally registered node
func (c *Client) Node() *structs.Node {
c.configLock.RLock()
Expand Down
49 changes: 48 additions & 1 deletion command/agent/alloc_endpoint.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agent

import (
"encoding/json"
"fmt"
"net/http"
"strings"
Expand Down Expand Up @@ -96,8 +97,9 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ
if s.agent.client == nil {
return nil, clientNotRunning
}

return s.allocSnapshot(allocID, resp, req)
case "restart":
return s.allocRestart(allocID, resp, req)
case "gc":
return s.allocGC(allocID, resp, req)
}
Expand Down Expand Up @@ -140,6 +142,51 @@ func (s *HTTPServer) ClientGCRequest(resp http.ResponseWriter, req *http.Request
return nil, rpcErr
}

func (s *HTTPServer) allocRestart(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Build the request and parse the ACL token
args := structs.AllocRestartRequest{
AllocID: allocID,
TaskName: "",
}
s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)

// Explicitly parse the body separately to disallow overriding AllocID in req Body.
var reqBody struct {
TaskName string
}
err := json.NewDecoder(req.Body).Decode(&reqBody)
if err != nil {
return nil, err
}
if reqBody.TaskName != "" {
args.TaskName = reqBody.TaskName
}

// Determine the handler to use
useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForAlloc(allocID)

// Make the RPC
var reply structs.GenericResponse
var rpcErr error
if useLocalClient {
rpcErr = s.agent.Client().ClientRPC("Allocations.Restart", &args, &reply)
} else if useClientRPC {
rpcErr = s.agent.Client().RPC("ClientAllocations.Restart", &args, &reply)
} else if useServerRPC {
rpcErr = s.agent.Server().RPC("ClientAllocations.Restart", &args, &reply)
} else {
rpcErr = CodedError(400, "No local Node and node_id not provided")
}

if rpcErr != nil {
if structs.IsErrNoNodeConn(rpcErr) || structs.IsErrUnknownAllocation(rpcErr) {
rpcErr = CodedError(404, rpcErr.Error())
}
}

return reply, rpcErr
}

func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Build the request and parse the ACL token
args := structs.AllocSpecificRequest{
Expand Down
Loading

0 comments on commit 6b60068

Please sign in to comment.