diff --git a/api/allocations.go b/api/allocations.go index d622f86b0a5..e0790cb8dce 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -79,6 +79,20 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error { return err } +func (a *Allocations) Stop(alloc *Allocation, q *QueryOptions) (*AllocStopResponse, error) { + var resp AllocStopResponse + _, err := a.client.putQuery("/v1/allocation/"+alloc.ID+"/stop", nil, &resp, q) + return &resp, err +} + +// AllocStopResponse is the response to an `AllocStopRequest` +type AllocStopResponse struct { + // EvalID is the id of the follow up evalution for the rescheduled alloc. + EvalID string + + WriteMeta +} + // Allocation is used for serialization of allocations. type Allocation struct { ID string diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index 422f8906cb0..e8d2d3da6a6 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -41,7 +41,29 @@ func (s *HTTPServer) AllocsRequest(resp http.ResponseWriter, req *http.Request) } func (s *HTTPServer) AllocSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - allocID := strings.TrimPrefix(req.URL.Path, "/v1/allocation/") + reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/allocation/") + + // tokenize the suffix of the path to get the alloc id and find the action + // invoked on the alloc id + tokens := strings.Split(reqSuffix, "/") + if len(tokens) > 2 || len(tokens) < 1 { + return nil, CodedError(404, resourceNotFoundErr) + } + allocID := tokens[0] + + if len(tokens) == 1 { + return s.allocGet(allocID, resp, req) + } + + switch tokens[1] { + case "stop": + return s.allocStop(allocID, resp, req) + } + + return nil, CodedError(404, resourceNotFoundErr) +} + +func (s *HTTPServer) allocGet(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { if req.Method != "GET" { return nil, CodedError(405, ErrInvalidMethod) } @@ -78,8 +100,22 @@ func (s *HTTPServer) AllocSpecificRequest(resp http.ResponseWriter, req *http.Re return alloc, nil } -func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +func (s *HTTPServer) allocStop(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if !(req.Method == "POST" || req.Method == "PUT") { + return nil, CodedError(405, ErrInvalidMethod) + } + transReq := &structs.AllocStopRequest{ + AllocID: allocID, + } + s.parseWriteRequest(req, &transReq.WriteRequest) + + var out structs.AllocStopResponse + err := s.agent.RPC("Alloc.Stop", &transReq, &out) + return &out, err +} + +func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/client/allocation/") // tokenize the suffix of the path to get the alloc id and find the action diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index 1b6008cecd2..011cdd78f99 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -261,6 +261,32 @@ func TestHTTP_AllocQuery_Payload(t *testing.T) { }) } +func TestHTTP_AllocStop(t *testing.T) { + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + // Directly manipulate the state + state := s.Agent.server.State() + alloc := mock.Alloc() + require := require.New(t) + require.NoError(state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID))) + + require.NoError(state.UpsertAllocs(1000, []*structs.Allocation{alloc})) + + // Make the HTTP request + req, err := http.NewRequest("POST", "/v1/allocation/"+alloc.ID+"/stop", nil) + require.NoError(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.AllocSpecificRequest(respW, req) + require.NoError(err) + + a := obj.(*structs.AllocStopResponse) + require.NotEmpty(a.EvalID, "missing eval") + require.NotEmpty(a.Index, "missing index") + }) +} + func TestHTTP_AllocStats(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/command/alloc_stop.go b/command/alloc_stop.go new file mode 100644 index 00000000000..a44842f69df --- /dev/null +++ b/command/alloc_stop.go @@ -0,0 +1,128 @@ +package command + +import ( + "fmt" + "strings" +) + +type AllocStopCommand struct { + Meta +} + +func (a *AllocStopCommand) Help() string { + helpText := ` +Usage: nomad alloc stop [options] +Alias: nomad stop + + stop an existing allocation. This command is used to signal a specific alloc + to shut down. When the allocation has been shut down, it will then be + rescheduled. An interactive monitoring session will display log lines as the + allocation completes shutting down. It is safe to exit the monitor early with + ctrl-c. + +General Options: + + ` + generalOptionsUsage() + ` + +Stop Specific Options: + + -detach + Return immediately instead of entering monitor mode. After the + stop command is submitted, a new evaluation ID is printed to the + screen, which can be used to examine the rescheduling evaluation using the + eval-status command. + + -verbose + Show full information. +` + return strings.TrimSpace(helpText) +} + +func (c *AllocStopCommand) Name() string { return "alloc stop" } + +func (c *AllocStopCommand) Run(args []string) int { + var detach, verbose bool + + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.BoolVar(&detach, "detach", false, "") + flags.BoolVar(&verbose, "verbose", false, "") + + if err := flags.Parse(args); err != nil { + return 1 + } + + // Check that we got exactly one alloc + args = flags.Args() + if len(args) != 1 { + c.Ui.Error("This command takes one argument: ") + c.Ui.Error(commandErrorText(c)) + return 1 + } + + allocID := args[0] + + // Truncate the id unless full length is requested + length := shortId + if verbose { + length = fullId + } + + // Query the allocation info + if len(allocID) == 1 { + c.Ui.Error(fmt.Sprintf("Alloc ID must contain at least two characters.")) + return 1 + } + + allocID = sanitizeUUIDPrefix(allocID) + + // Get the HTTP client + client, err := c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + allocs, _, err := client.Allocations().PrefixList(allocID) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err)) + return 1 + } + + if len(allocs) == 0 { + c.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocID)) + return 1 + } + + if len(allocs) > 1 { + // Format the allocs + out := formatAllocListStubs(allocs, verbose, length) + c.Ui.Error(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", out)) + return 1 + } + + // Prefix lookup matched a single allocation + alloc, _, err := client.Allocations().Info(allocs[0].ID, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err)) + return 1 + } + + resp, err := client.Allocations().Stop(alloc, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error stopping allocation: %s", err)) + return 1 + } + + if detach { + c.Ui.Output(resp.EvalID) + return 0 + } + + mon := newMonitor(c.Ui, client, length) + return mon.monitor(resp.EvalID, false) +} + +func (a *AllocStopCommand) Synopsis() string { + return "Stop and reschedule a running allocation" +} diff --git a/command/alloc_stop_test.go b/command/alloc_stop_test.go new file mode 100644 index 00000000000..f46532ef86f --- /dev/null +++ b/command/alloc_stop_test.go @@ -0,0 +1,112 @@ +package command + +import ( + "fmt" + "testing" + + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/mitchellh/cli" + "github.com/stretchr/testify/require" +) + +func TestAllocStopCommand_Implements(t *testing.T) { + t.Parallel() + var _ cli.Command = &AllocStopCommand{} +} + +func TestAllocStop_Fails(t *testing.T) { + srv, _, url := testServer(t, false, nil) + defer srv.Shutdown() + + require := require.New(t) + ui := new(cli.MockUi) + cmd := &AllocStopCommand{Meta: Meta{Ui: ui}} + + // Fails on misuse + require.Equal(cmd.Run([]string{"some", "garbage", "args"}), 1, "Expected failure") + require.Contains(ui.ErrorWriter.String(), commandErrorText(cmd), "Expected help output") + ui.ErrorWriter.Reset() + + // Fails on connection failure + require.Equal(cmd.Run([]string{"-address=nope", "foobar"}), 1, "expected failure") + require.Contains(ui.ErrorWriter.String(), "Error querying allocation") + ui.ErrorWriter.Reset() + + // Fails on missing alloc + require.Equal(cmd.Run([]string{"-address=" + url, "26470238-5CF2-438F-8772-DC67CFB0705C"}), 1) + require.Contains(ui.ErrorWriter.String(), "No allocation(s) with prefix or id") + ui.ErrorWriter.Reset() + + // Fail on identifier with too few characters + require.Equal(cmd.Run([]string{"-address=" + url, "2"}), 1) + require.Contains(ui.ErrorWriter.String(), "must contain at least two characters") + ui.ErrorWriter.Reset() + + // Identifiers with uneven length should produce a query result + require.Equal(cmd.Run([]string{"-address=" + url, "123"}), 1) + require.Contains(ui.ErrorWriter.String(), "No allocation(s) with prefix or id") + ui.ErrorWriter.Reset() +} + +func TestAllocStop_Run(t *testing.T) { + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + require := require.New(t) + + // Wait for a node to be ready + testutil.WaitForResult(func() (bool, error) { + nodes, _, err := client.Nodes().List(nil) + if err != nil { + return false, err + } + for _, node := range nodes { + if _, ok := node.Drivers["mock_driver"]; ok && + node.Status == structs.NodeStatusReady { + return true, nil + } + } + return false, fmt.Errorf("no ready nodes") + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + ui := new(cli.MockUi) + cmd := &AllocStopCommand{Meta: Meta{Ui: ui}} + + jobID := "job1_sfx" + job1 := testJob(jobID) + resp, _, err := client.Jobs().Register(job1, nil) + require.NoError(err) + if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 { + t.Fatalf("status code non zero saw %d", code) + } + // get an alloc id + allocId1 := "" + if allocs, _, err := client.Jobs().Allocations(jobID, false, nil); err == nil { + if len(allocs) > 0 { + allocId1 = allocs[0].ID + } + } + require.NotEmpty(allocId1, "unable to find allocation") + + // Wait for alloc to be running + testutil.WaitForResult(func() (bool, error) { + alloc, _, err := client.Allocations().Info(allocId1, nil) + if err != nil { + return false, err + } + if alloc.ClientStatus == api.AllocClientStatusRunning { + return true, nil + } + return false, fmt.Errorf("alloc is not running, is: %s", alloc.ClientStatus) + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + require.Equal(cmd.Run([]string{"-address=" + url, allocId1}), 0, "expected successful exit code") + + ui.OutputWriter.Reset() +} diff --git a/command/commands.go b/command/commands.go index ab2173904e6..362ffe95c36 100644 --- a/command/commands.go +++ b/command/commands.go @@ -140,6 +140,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "alloc stop": func() (cli.Command, error) { + return &AllocStopCommand{ + Meta: meta, + }, nil + }, "alloc fs": func() (cli.Command, error) { return &AllocFSCommand{ Meta: meta, diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index 643fa2bf878..8e67730db05 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -10,6 +10,8 @@ import ( multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" ) @@ -205,6 +207,64 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest, return a.srv.blockingRPC(&opts) } +// Stop is used to stop an allocation and migrate it to another node. +func (a *Alloc) Stop(args *structs.AllocStopRequest, reply *structs.AllocStopResponse) error { + if done, err := a.srv.forward("Alloc.Stop", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "alloc", "stop"}, time.Now()) + + // Check that it is a management token. + if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityAllocLifecycle) { + return structs.ErrPermissionDenied + } + + if args.AllocID == "" { + return fmt.Errorf("must provide an alloc id") + } + + // TODO: Maybe use blocking query + ws := memdb.NewWatchSet() + alloc, err := a.srv.State().AllocByID(ws, args.AllocID) + if err != nil { + return err + } + + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: alloc.Namespace, + Priority: alloc.Job.Priority, + Type: alloc.Job.Type, + TriggeredBy: structs.EvalTriggerAllocStop, + JobID: alloc.Job.ID, + JobModifyIndex: alloc.Job.ModifyIndex, + Status: structs.EvalStatusPending, + } + + transitionReq := &structs.AllocUpdateDesiredTransitionRequest{ + Evals: []*structs.Evaluation{eval}, + Allocs: map[string]*structs.DesiredTransition{ + args.AllocID: { + Migrate: helper.BoolToPtr(true), + }, + }, + } + + // Commit this update via Raft + _, index, err := a.srv.raftApply(structs.AllocUpdateDesiredTransitionRequestType, transitionReq) + if err != nil { + a.logger.Error("AllocUpdateDesiredTransitionRequest failed", "error", err) + return err + } + + // Setup the response + reply.Index = index + reply.EvalID = eval.ID + return nil +} + // UpdateDesiredTransition is used to update the desired transitions of an // allocation. func (a *Alloc) UpdateDesiredTransition(args *structs.AllocUpdateDesiredTransitionRequest, reply *structs.GenericResponse) error { diff --git a/nomad/alloc_endpoint_test.go b/nomad/alloc_endpoint_test.go index b34dc7345a7..f26c6a20383 100644 --- a/nomad/alloc_endpoint_test.go +++ b/nomad/alloc_endpoint_test.go @@ -567,3 +567,65 @@ func TestAllocEndpoint_UpdateDesiredTransition(t *testing.T) { require.True(*out1.DesiredTransition.Migrate) require.True(*out2.DesiredTransition.Migrate) } + +func TestAllocEndpoint_Stop_ACL(t *testing.T) { + t.Parallel() + require := require.New(t) + + s1, _ := TestACLServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + alloc := mock.Alloc() + alloc2 := mock.Alloc() + state := s1.fsm.State() + require.Nil(state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID))) + require.Nil(state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))) + require.Nil(state.UpsertAllocs(1000, []*structs.Allocation{alloc, alloc2})) + + req := &structs.AllocStopRequest{ + AllocID: alloc.ID, + } + req.Namespace = structs.DefaultNamespace + req.Region = alloc.Job.Region + + // Try without permissions + var resp structs.AllocStopResponse + err := msgpackrpc.CallWithCodec(codec, "Alloc.Stop", req, &resp) + require.True(structs.IsErrPermissionDenied(err), "expected permissions error, got: %v", err) + + // Try with management permissions + req.WriteRequest.AuthToken = s1.getLeaderAcl() + var resp2 structs.AllocStopResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.Stop", req, &resp2)) + require.NotZero(resp2.Index) + + // Try with alloc-lifecycle permissions + validToken := mock.CreatePolicyAndToken(t, state, 1002, "valid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityAllocLifecycle})) + req.WriteRequest.AuthToken = validToken.SecretID + req.AllocID = alloc2.ID + + var resp3 structs.AllocStopResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.Stop", req, &resp3)) + require.NotZero(resp3.Index) + + // Look up the allocations + out1, err := state.AllocByID(nil, alloc.ID) + require.Nil(err) + out2, err := state.AllocByID(nil, alloc2.ID) + require.Nil(err) + e1, err := state.EvalByID(nil, resp2.EvalID) + require.Nil(err) + e2, err := state.EvalByID(nil, resp3.EvalID) + require.Nil(err) + + require.NotNil(out1.DesiredTransition.Migrate) + require.NotNil(out2.DesiredTransition.Migrate) + require.NotNil(e1) + require.NotNil(e2) + require.True(*out1.DesiredTransition.Migrate) + require.True(*out2.DesiredTransition.Migrate) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 88b2db3727c..89320fd1685 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -694,6 +694,21 @@ type AllocUpdateDesiredTransitionRequest struct { WriteRequest } +// AllocStopRequest is used to stop and reschedule a running Allocation. +type AllocStopRequest struct { + AllocID string + + WriteRequest +} + +// AllocStopResponse is the response to an `AllocStopRequest` +type AllocStopResponse struct { + // EvalID is the id of the follow up evalution for the rescheduled alloc. + EvalID string + + WriteMeta +} + // AllocListRequest is used to request a list of allocations type AllocListRequest struct { QueryOptions @@ -7955,6 +7970,7 @@ const ( EvalTriggerPeriodicJob = "periodic-job" EvalTriggerNodeDrain = "node-drain" EvalTriggerNodeUpdate = "node-update" + EvalTriggerAllocStop = "alloc-stop" EvalTriggerScheduled = "scheduled" EvalTriggerRollingUpdate = "rolling-update" EvalTriggerDeploymentWatcher = "deployment-watcher" diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 03b35b943ae..3f87921ac96 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -127,6 +127,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { switch eval.TriggeredBy { case structs.EvalTriggerJobRegister, structs.EvalTriggerJobDeregister, structs.EvalTriggerNodeDrain, structs.EvalTriggerNodeUpdate, + structs.EvalTriggerAllocStop, structs.EvalTriggerRollingUpdate, structs.EvalTriggerQueuedAllocs, structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans, structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc, diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index e3f4015fbd1..04f1daad21a 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -61,7 +61,7 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error { switch eval.TriggeredBy { case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate, structs.EvalTriggerFailedFollowUp, structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate, structs.EvalTriggerPreemption, - structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain: + structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain, structs.EvalTriggerAllocStop: default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy)