From 7fb5b426481f8290ed66ab6452555d69870ec622 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 28 Aug 2020 13:58:16 -0400 Subject: [PATCH] MRD: move 'job stop -global' handling into RPC The initial implementation of global job stop for MRD looped over all the regions in the CLI for expedience. This changeset includes the OSS parts of moving this into the RPC layer so that API consumers don't have to implement this logic themselves. --- api/jobs.go | 24 +++++++++++++++++++ command/agent/job_endpoint.go | 15 ++++++++++-- command/job_stop.go | 20 ++-------------- nomad/job_endpoint.go | 5 ++++ nomad/job_endpoint_oss.go | 6 +++++ nomad/structs/structs.go | 4 ++++ vendor/github.com/hashicorp/nomad/api/jobs.go | 24 +++++++++++++++++++ 7 files changed, 78 insertions(+), 20 deletions(-) diff --git a/api/jobs.go b/api/jobs.go index bd79537252f..fc195238e36 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -277,6 +277,30 @@ func (j *Jobs) Deregister(jobID string, purge bool, q *WriteOptions) (string, *W return resp.EvalID, wm, nil } +// DeregisterOptions is used to pass through job deregistration parameters +type DeregisterOptions struct { + // If Purge is set to true, the job is deregistered and purged from the + // system versus still being queryable and eventually GC'ed from the + // system. Most callers should not specify purge. + Purge bool + + // If Global is set to true, all regions of a multiregion job will be + // stopped. + Global bool +} + +// DeregisterOpts is used to remove an existing job. See DeregisterOptions +// for parameters. +func (j *Jobs) DeregisterOpts(jobID string, opts *DeregisterOptions, q *WriteOptions) (string, *WriteMeta, error) { + var resp JobDeregisterResponse + wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t&global=%t", + url.PathEscape(jobID), opts.Purge, opts.Global), &resp, q) + if err != nil { + return "", nil, err + } + return resp.EvalID, wm, nil +} + // ForceEvaluate is used to force-evaluate an existing job. func (j *Jobs) ForceEvaluate(jobID string, q *WriteOptions) (string, *WriteMeta, error) { var resp JobRegisterResponse diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 7931180cfe8..98a7a9bb941 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -421,9 +421,20 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request, } } + globalStr := req.URL.Query().Get("global") + var globalBool bool + if globalStr != "" { + var err error + globalBool, err = strconv.ParseBool(globalStr) + if err != nil { + return nil, fmt.Errorf("Failed to parse value of %q (%v) as a bool: %v", "global", globalStr, err) + } + } + args := structs.JobDeregisterRequest{ - JobID: jobName, - Purge: purgeBool, + JobID: jobName, + Purge: purgeBool, + Global: globalBool, } s.parseWriteRequest(req, &args.WriteRequest) diff --git a/command/job_stop.go b/command/job_stop.go index be2f88efa09..b4ffd75ff11 100644 --- a/command/job_stop.go +++ b/command/job_stop.go @@ -185,26 +185,10 @@ func (c *JobStopCommand) Run(args []string) int { } } - // Scatter-gather job stop for multi-region jobs - if global && job.IsMultiregion() { - for _, region := range job.Multiregion.Regions { - // Invoke the stop - wq := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace, Region: region.Name} - evalID, _, err := client.Jobs().Deregister(*job.ID, purge, wq) - if err != nil { - c.Ui.Error(fmt.Sprintf("Error deregistering job in %q: %s", region.Name, err)) - return 1 - } - if evalID != "" { - c.Ui.Output(evalID) - } - } - return 0 - } - // Invoke the stop + opts := &api.DeregisterOptions{Purge: purge, Global: global} wq := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace} - evalID, _, err := client.Jobs().Deregister(*job.ID, purge, wq) + evalID, _, err := client.Jobs().DeregisterOpts(*job.ID, opts, wq) if err != nil { c.Ui.Error(fmt.Sprintf("Error deregistering job: %s", err)) return 1 diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index fabaccb73c8..b0ed35553e7 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -869,6 +869,11 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD reply.Index = evalIndex } + err = j.multiregionStop(job, args, reply) + if err != nil { + return err + } + return nil } diff --git a/nomad/job_endpoint_oss.go b/nomad/job_endpoint_oss.go index 9d36255c844..e234844293a 100644 --- a/nomad/job_endpoint_oss.go +++ b/nomad/job_endpoint_oss.go @@ -31,6 +31,12 @@ func (j *Job) multiregionDrop(args *structs.JobRegisterRequest, reply *structs.J return nil } +// multiregionStop is used to fan-out Job.Deregister RPCs to all regions if +// the global flag is passed to Job.Deregister +func (j *Job) multiregionStop(job *structs.Job, args *structs.JobDeregisterRequest, reply *structs.JobDeregisterResponse) error { + return nil +} + // interpolateMultiregionFields interpolates a job for a specific region func (j *Job) interpolateMultiregionFields(args *structs.JobPlanRequest) error { return nil diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 493aba30cd3..33ebdf9c4d3 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -579,6 +579,10 @@ type JobDeregisterRequest struct { // garbage collector Purge bool + // Global controls whether all regions of a multi-region job are + // deregistered. It is ignored for single-region jobs. + Global bool + // Eval is the evaluation to create that's associated with job deregister Eval *Evaluation diff --git a/vendor/github.com/hashicorp/nomad/api/jobs.go b/vendor/github.com/hashicorp/nomad/api/jobs.go index bd79537252f..fc195238e36 100644 --- a/vendor/github.com/hashicorp/nomad/api/jobs.go +++ b/vendor/github.com/hashicorp/nomad/api/jobs.go @@ -277,6 +277,30 @@ func (j *Jobs) Deregister(jobID string, purge bool, q *WriteOptions) (string, *W return resp.EvalID, wm, nil } +// DeregisterOptions is used to pass through job deregistration parameters +type DeregisterOptions struct { + // If Purge is set to true, the job is deregistered and purged from the + // system versus still being queryable and eventually GC'ed from the + // system. Most callers should not specify purge. + Purge bool + + // If Global is set to true, all regions of a multiregion job will be + // stopped. + Global bool +} + +// DeregisterOpts is used to remove an existing job. See DeregisterOptions +// for parameters. +func (j *Jobs) DeregisterOpts(jobID string, opts *DeregisterOptions, q *WriteOptions) (string, *WriteMeta, error) { + var resp JobDeregisterResponse + wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t&global=%t", + url.PathEscape(jobID), opts.Purge, opts.Global), &resp, q) + if err != nil { + return "", nil, err + } + return resp.EvalID, wm, nil +} + // ForceEvaluate is used to force-evaluate an existing job. func (j *Jobs) ForceEvaluate(jobID string, q *WriteOptions) (string, *WriteMeta, error) { var resp JobRegisterResponse