From c3977fe1921ad6355e5a9d166941006a940e8f4d Mon Sep 17 00:00:00 2001 From: James Rasell Date: Tue, 6 Oct 2020 09:58:46 +0200 Subject: [PATCH] cli: add scale and scaling-events commands to job cmd. This adds the ability to scale Nomad jobs and view scaling events via the CLI. --- command/commands.go | 10 ++ command/job_scale.go | 185 ++++++++++++++++++++++++++ command/job_scale_test.go | 123 +++++++++++++++++ command/job_scaling_events.go | 206 +++++++++++++++++++++++++++++ command/job_scaling_events_test.go | 90 +++++++++++++ 5 files changed, 614 insertions(+) create mode 100644 command/job_scale.go create mode 100644 command/job_scale_test.go create mode 100644 command/job_scaling_events.go create mode 100644 command/job_scaling_events_test.go diff --git a/command/commands.go b/command/commands.go index 3bb5831bff1..604ed8b7156 100644 --- a/command/commands.go +++ b/command/commands.go @@ -357,6 +357,16 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "job scale": func() (cli.Command, error) { + return &JobScaleCommand{ + Meta: meta, + }, nil + }, + "job scaling-events": func() (cli.Command, error) { + return &JobScalingEventsCommand{ + Meta: meta, + }, nil + }, "job status": func() (cli.Command, error) { return &JobStatusCommand{ Meta: meta, diff --git a/command/job_scale.go b/command/job_scale.go new file mode 100644 index 00000000000..7f717b3ae3d --- /dev/null +++ b/command/job_scale.go @@ -0,0 +1,185 @@ +package command + +import ( + "errors" + "fmt" + "strconv" + "strings" + + "github.com/hashicorp/nomad/api" + "github.com/mitchellh/cli" + "github.com/posener/complete" +) + +// Ensure JobScaleCommand satisfies the cli.Command interface. +var _ cli.Command = &JobScaleCommand{} + +// JobScaleCommand implements cli.Command. +type JobScaleCommand struct { + Meta +} + +// Help satisfies the cli.Command Help function. 
+func (j *JobScaleCommand) Help() string { + helpText := ` +Usage: nomad job scale [options] <job> [<group>] <count> + + Perform a scaling action by altering the count within a job group. + + Upon successful job submission, this command will immediately + enter an interactive monitor. This is useful to watch Nomad's + internals make scheduling decisions and place the submitted work + onto nodes. The monitor will end once job placement is done. It + is safe to exit the monitor early using ctrl+c. + +General Options: + + ` + generalOptionsUsage() + ` + +Scale Options: + + -detach + Return immediately instead of entering monitor mode. After job scaling, + the evaluation ID will be printed to the screen, which can be used to + examine the evaluation using the eval-status command. + + -verbose + Display full information. +` + return strings.TrimSpace(helpText) +} + +// Synopsis satisfies the cli.Command Synopsis function. +func (j *JobScaleCommand) Synopsis() string { + return "Change the count of a Nomad job group" +} + +func (j *JobScaleCommand) AutocompleteFlags() complete.Flags { + return mergeAutocompleteFlags(j.Meta.AutocompleteFlags(FlagSetClient), + complete.Flags{ + "-detach": complete.PredictNothing, + "-verbose": complete.PredictNothing, + }) +} + +// Name returns the name of this command. +func (j *JobScaleCommand) Name() string { return "job scale" } + +// Run satisfies the cli.Command Run function. +func (j *JobScaleCommand) Run(args []string) int { + var detach, verbose bool + + flags := j.Meta.FlagSet(j.Name(), FlagSetClient) + flags.Usage = func() { j.Ui.Output(j.Help()) } + flags.BoolVar(&detach, "detach", false, "") + flags.BoolVar(&verbose, "verbose", false, "") + if err := flags.Parse(args); err != nil { + return 1 + } + + var jobString, countString, groupString string + args = flags.Args() + + // It is possible to specify either 2 or 3 arguments. Check and assign the + // args so they can be validated later on. 
+ if numArgs := len(args); numArgs < 2 || numArgs > 3 { + j.Ui.Error("Command requires at least two arguments and no more than three") + return 1 + } else if numArgs == 3 { + groupString = args[1] + countString = args[2] + } else { + countString = args[1] + } + jobString = args[0] + + // Convert the count string arg to an int as required by the API. + count, err := strconv.Atoi(countString) + if err != nil { + j.Ui.Error(fmt.Sprintf("Failed to convert count string to int: %s", err)) + return 1 + } + + // Get the HTTP client. + client, err := j.Meta.Client() + if err != nil { + j.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + // Detail the job so we can perform additional checks before submitting the + // scaling request. + job, _, err := client.Jobs().ScaleStatus(jobString, nil) + if err != nil { + j.Ui.Error(fmt.Sprintf("Error querying job: %v", err)) + return 1 + } + + if err := j.performGroupCheck(job.TaskGroups, &groupString); err != nil { + j.Ui.Error(err.Error()) + return 1 + } + + // This is our default message added to scaling submissions. + msg := "submitted using the Nomad CLI" + + // Perform the scaling action. + resp, _, err := client.Jobs().Scale(jobString, groupString, &count, msg, false, nil, nil) + if err != nil { + j.Ui.Error(fmt.Sprintf("Error submitting scaling request: %s", err)) + return 1 + } + + // Print any warnings if we have some. + if resp.Warnings != "" { + j.Ui.Output( + j.Colorize().Color(fmt.Sprintf("[bold][yellow]Job Warnings:\n%s[reset]\n", resp.Warnings))) + } + + // If we are to detach, log the evaluation ID and exit. + if detach { + j.Ui.Output("Evaluation ID: " + resp.EvalID) + return 0 + } + + // Truncate the ID unless full length is requested. + length := shortId + if verbose { + length = fullId + } + + // Create and monitor the evaluation. 
+ mon := newMonitor(j.Ui, client, length) + return mon.monitor(resp.EvalID, false) +} + +// performGroupCheck performs logic to ensure the user specified the correct +// group argument. +func (j *JobScaleCommand) performGroupCheck(groups map[string]api.TaskGroupScaleStatus, group *string) error { + + // If the job contains multiple groups and the user did not supply a task + // group, return an error. + if len(groups) > 1 && *group == "" { + return errors.New("Group name required") + } + + // We have to iterate the map to have any idea what task groups we are + // dealing with. + for groupName := range groups { + + // If the job has a single task group, and the user did not supply a + // task group, it is assumed we scale the only group in the job. + if len(groups) == 1 && *group == "" { + *group = groupName + return nil + } + + // If we found a match, return. + if groupName == *group { + return nil + } + } + + // If we got here, we didn't find a match and therefore return an error. + return fmt.Errorf("Group %v not found within job", *group) +} diff --git a/command/job_scale_test.go b/command/job_scale_test.go new file mode 100644 index 00000000000..c6154659486 --- /dev/null +++ b/command/job_scale_test.go @@ -0,0 +1,123 @@ +package command + +import ( + "fmt" + "strings" + "testing" + + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/testutil" + "github.com/mitchellh/cli" +) + +func TestJobScaleCommand_SingleGroup(t *testing.T) { + t.Parallel() + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + testutil.WaitForResult(func() (bool, error) { + nodes, _, err := client.Nodes().List(nil) + if err != nil { + return false, err + } + if len(nodes) == 0 { + return false, fmt.Errorf("missing node") + } + if _, ok := nodes[0].Drivers["mock_driver"]; !ok { + return false, fmt.Errorf("mock_driver not ready") + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) + + ui := 
cli.NewMockUi() + cmd := &JobScaleCommand{Meta: Meta{Ui: ui}} + + // Register a test job and ensure it is running before moving on. + resp, _, err := client.Jobs().Register(testJob("scale_cmd_single_group"), nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 { + t.Fatalf("expected waitForSuccess exit code 0, got: %d", code) + } + + // Perform the scaling action. + if code := cmd.Run([]string{"-address=" + url, "-detach", "scale_cmd_single_group", "2"}); code != 0 { + t.Fatalf("expected cmd run exit code 0, got: %d", code) + } + if out := ui.OutputWriter.String(); !strings.Contains(out, "Evaluation ID:") { + t.Fatalf("Expected Evaluation ID within output: %v", out) + } +} + +func TestJobScaleCommand_MultiGroup(t *testing.T) { + t.Parallel() + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + testutil.WaitForResult(func() (bool, error) { + nodes, _, err := client.Nodes().List(nil) + if err != nil { + return false, err + } + if len(nodes) == 0 { + return false, fmt.Errorf("missing node") + } + if _, ok := nodes[0].Drivers["mock_driver"]; !ok { + return false, fmt.Errorf("mock_driver not ready") + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) + + ui := cli.NewMockUi() + cmd := &JobScaleCommand{Meta: Meta{Ui: ui}} + + // Create a job with two task groups. + job := testJob("scale_cmd_multi_group") + task := api.NewTask("task2", "mock_driver"). + SetConfig("kill_after", "1s"). + SetConfig("run_for", "5s"). + SetConfig("exit_code", 0). + Require(&api.Resources{ + MemoryMB: helper.IntToPtr(256), + CPU: helper.IntToPtr(100), + }). + SetLogConfig(&api.LogConfig{ + MaxFiles: helper.IntToPtr(1), + MaxFileSizeMB: helper.IntToPtr(2), + }) + group2 := api.NewTaskGroup("group2", 1). + AddTask(task). 
+ RequireDisk(&api.EphemeralDisk{ + SizeMB: helper.IntToPtr(20), + }) + job.AddTaskGroup(group2) + + // Register a test job and ensure it is running before moving on. + resp, _, err := client.Jobs().Register(job, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 { + t.Fatalf("expected waitForSuccess exit code 0, got: %d", code) + } + + // Attempt to scale without specifying the task group which should fail. + if code := cmd.Run([]string{"-address=" + url, "-detach", "scale_cmd_multi_group", "2"}); code != 1 { + t.Fatalf("expected cmd run exit code 1, got: %d", code) + } + if out := ui.ErrorWriter.String(); !strings.Contains(out, "Group name required") { + t.Fatalf("unexpected error message: %v", out) + } + + // Specify the target group which should be successful. + if code := cmd.Run([]string{"-address=" + url, "-detach", "scale_cmd_multi_group", "group1", "2"}); code != 0 { + t.Fatalf("expected cmd run exit code 0, got: %d", code) + } + if out := ui.OutputWriter.String(); !strings.Contains(out, "Evaluation ID:") { + t.Fatalf("Expected Evaluation ID within output: %v", out) + } +} diff --git a/command/job_scaling_events.go b/command/job_scaling_events.go new file mode 100644 index 00000000000..3630730fdf1 --- /dev/null +++ b/command/job_scaling_events.go @@ -0,0 +1,206 @@ +package command + +import ( + "fmt" + "sort" + "strconv" + "strings" + "time" + + "github.com/hashicorp/nomad/api" + "github.com/mitchellh/cli" + "github.com/posener/complete" +) + +// Ensure JobScalingEventsCommand satisfies the cli.Command interface. +var _ cli.Command = &JobScalingEventsCommand{} + +// JobScalingEventsCommand implements cli.Command. +type JobScalingEventsCommand struct { + Meta +} + +// Help satisfies the cli.Command Help function. +func (j *JobScalingEventsCommand) Help() string { + helpText := ` +Usage: nomad job scaling-events [options] <job_id> + + List the scaling events for the specified job. 
+ +General Options: + + ` + generalOptionsUsage() + ` + +Scaling-Events Options: + + -verbose + Display full information. +` + return strings.TrimSpace(helpText) +} + +// Synopsis satisfies the cli.Command Synopsis function. +func (j *JobScalingEventsCommand) Synopsis() string { + return "Display the most recent scaling events for a job" +} + +func (j *JobScalingEventsCommand) AutocompleteFlags() complete.Flags { + return mergeAutocompleteFlags(j.Meta.AutocompleteFlags(FlagSetClient), + complete.Flags{ + "-verbose": complete.PredictNothing, + }) +} + +// Name returns the name of this command. +func (j *JobScalingEventsCommand) Name() string { return "job scaling-events" } + +// Run satisfies the cli.Command Run function. +func (j *JobScalingEventsCommand) Run(args []string) int { + + var verbose bool + + flags := j.Meta.FlagSet(j.Name(), FlagSetClient) + flags.Usage = func() { j.Ui.Output(j.Help()) } + flags.BoolVar(&verbose, "verbose", false, "") + if err := flags.Parse(args); err != nil { + return 1 + } + + args = flags.Args() + if len(args) != 1 { + j.Ui.Error("This command takes one argument: <job_id>") + j.Ui.Error(commandErrorText(j)) + return 1 + } + + // Get the job ID. + jobID := args[0] + + // Get the HTTP client. + client, err := j.Meta.Client() + if err != nil { + j.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + events, _, err := client.Jobs().ScaleStatus(jobID, nil) + if err != nil { + j.Ui.Error(fmt.Sprintf("Error listing scaling events: %s", err)) + return 1 + } + + // Check if any of the task groups have scaling events, otherwise exit + // indicating there are not any. + var haveEvents bool + for _, tg := range events.TaskGroups { + if tg.Events != nil { + haveEvents = true + break + } + } + + if !haveEvents { + j.Ui.Output("No events found") + return 0 + } + + // Create our sorted list of events and output. 
+ sortedList := sortedScalingEventList(events) + j.Ui.Output(formatList(formatScalingEventListOutput(sortedList, verbose, 0))) + return 0 +} + +func formatScalingEventListOutput(e scalingEventList, verbose bool, limit int) []string { + + // If the limit is zero, aka no limit or the limit is greater than the + // number of events we have then set it to the length of the event + // list. + if limit == 0 || limit > len(e) { + limit = len(e) + } + + // Create the initial output heading. + output := make([]string, limit+1) + output[0] = "Task Group|Count|PrevCount" + + // If we are outputting verbose information, add these fields to the header + // and then add our end date field. + if verbose { + output[0] += "|Error|Message|Eval ID" + } + output[0] += "|Date" + + var i int + + for i < limit { + output[i+1] = fmt.Sprintf("%s|%s|%v", e[i].name, valueOrNil(e[i].event.Count), e[i].event.PreviousCount) + if verbose { + output[i+1] += fmt.Sprintf("|%v|%s|%s", + e[i].event.Error, e[i].event.Message, valueOrNil(e[i].event.EvalID)) + } + output[i+1] += fmt.Sprintf("|%v", formatTime(time.Unix(0, int64(e[i].event.Time)))) + i++ + } + return output +} + +// sortedScalingEventList generates a time sorted list of scaling events as +// provided by the api.JobScaleStatusResponse. +func sortedScalingEventList(e *api.JobScaleStatusResponse) []groupEvent { + + // sortedList is our output list. + var sortedList scalingEventList + + // Iterate over the response object to create a sorted list. + for group, status := range e.TaskGroups { + for _, event := range status.Events { + sortedList = append(sortedList, groupEvent{name: group, event: event}) + } + } + sort.Sort(sortedList) + + return sortedList +} + +// valueOrNil helps format the event output in cases where the object has a + // potential to be nil. 
+func valueOrNil(i interface{}) string { + switch t := i.(type) { + case *int64: + if t != nil { + return strconv.FormatInt(*t, 10) + } + case *string: + if t != nil { + return *t + } + } + return "" +} + +// scalingEventList is a helper list of all events for the job which allows us +// to sort based on time. +type scalingEventList []groupEvent + +// groupEvent contains all the required information of an individual group +// scaling event. +type groupEvent struct { + name string + event api.ScalingEvent +} + +// Len satisfies the Len function on the sort.Interface. +func (s scalingEventList) Len() int { + return len(s) +} + +// Less satisfies the Less function on the sort.Interface and sorts by the +// event time. +func (s scalingEventList) Less(i, j int) bool { + return s[i].event.Time > s[j].event.Time +} + +// Swap satisfies the Swap function on the sort.Interface. +func (s scalingEventList) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} diff --git a/command/job_scaling_events_test.go b/command/job_scaling_events_test.go new file mode 100644 index 00000000000..e9954baab40 --- /dev/null +++ b/command/job_scaling_events_test.go @@ -0,0 +1,90 @@ +package command + +import ( + "fmt" + "strings" + "testing" + + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/testutil" + "github.com/mitchellh/cli" +) + +func TestJobScalingEventsCommand_Run(t *testing.T) { + t.Parallel() + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + testutil.WaitForResult(func() (bool, error) { + nodes, _, err := client.Nodes().List(nil) + if err != nil { + return false, err + } + if len(nodes) == 0 { + return false, fmt.Errorf("missing node") + } + if _, ok := nodes[0].Drivers["mock_driver"]; !ok { + return false, fmt.Errorf("mock_driver not ready") + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) + + ui := cli.NewMockUi() + cmd := &JobScalingEventsCommand{Meta: Meta{Ui: ui}} + + // Register a test job and ensure it is running 
before moving on. + resp, _, err := client.Jobs().Register(testJob("scale_events_test_job"), nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 { + t.Fatalf("expected waitForSuccess exit code 0, got: %d", code) + } + + // List events without passing the jobID which should result in an error. + if code := cmd.Run([]string{"-address=" + url}); code != 1 { + t.Fatalf("expected cmd run exit code 1, got: %d", code) + } + if out := ui.ErrorWriter.String(); !strings.Contains(out, "This command takes one argument: ") { + t.Fatalf("Expected argument error: %v", out) + } + + // List events for the job, which should present zero. + if code := cmd.Run([]string{"-address=" + url, "scale_events_test_job"}); code != 0 { + t.Fatalf("expected cmd run exit code 0, got: %d", code) + } + if out := ui.OutputWriter.String(); !strings.Contains(out, "No events found") { + t.Fatalf("Expected no events output but got: %v", out) + } + + // Perform a scaling action to generate an event. + _, _, err = client.Jobs().Scale( + "scale_events_test_job", + "group1", helper.IntToPtr(2), + "searchable custom test message", false, nil, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + + // List the scaling events which should include an entry. + if code := cmd.Run([]string{"-address=" + url, "scale_events_test_job"}); code != 0 { + t.Fatalf("expected cmd run exit code 0, got: %d", code) + } + if out := ui.OutputWriter.String(); !strings.Contains(out, "Task Group Count PrevCount Date") { + t.Fatalf("Expected table headers but got: %v", out) + } + + // List the scaling events with verbose flag to search for our message as + // well as the verbose table headers. 
+ if code := cmd.Run([]string{"-address=" + url, "-verbose", "scale_events_test_job"}); code != 0 { + t.Fatalf("expected cmd run exit code 0, got: %d", code) + } + out := ui.OutputWriter.String() + if !strings.Contains(out, "searchable custom test message") { + t.Fatalf("Expected to find scaling message but got: %v", out) + } + if !strings.Contains(out, "Eval ID") { + t.Fatalf("Expected to verbose table headers: %v", out) + } +}