diff --git a/api/deployments.go b/api/deployments.go index 8a241243c36..4a4844246fa 100644 --- a/api/deployments.go +++ b/api/deployments.go @@ -107,6 +107,19 @@ func (d *Deployments) PromoteGroups(deploymentID string, groups []string, q *Wri return &resp, wm, nil } +// Unblock is used to unblock the given deployment. +func (d *Deployments) Unblock(deploymentID string, q *WriteOptions) (*DeploymentUpdateResponse, *WriteMeta, error) { + var resp DeploymentUpdateResponse + req := &DeploymentUnblockRequest{ + DeploymentID: deploymentID, + } + wm, err := d.client.write("/v1/deployment/unblock/"+deploymentID, req, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, wm, nil +} + // SetAllocHealth is used to set allocation health for allocs that are part of // the given deployment func (d *Deployments) SetAllocHealth(deploymentID string, healthy, unhealthy []string, q *WriteOptions) (*DeploymentUpdateResponse, *WriteMeta, error) { @@ -150,6 +163,9 @@ type Deployment struct { // present the correct list of deployments for the job and not old ones. JobCreateIndex uint64 + // IsMultiregion specifies if this deployment is part of a multi-region deployment + IsMultiregion bool + // TaskGroups is the set of task groups effected by the deployment and their // current deployment status. TaskGroups map[string]*DeploymentState @@ -257,6 +273,12 @@ type DeploymentFailRequest struct { WriteRequest } +// DeploymentUnblockRequest is used to unblock a particular deployment +type DeploymentUnblockRequest struct { + DeploymentID string + WriteRequest +} + // SingleDeploymentResponse is used to respond with a single deployment type SingleDeploymentResponse struct { Deployment *Deployment diff --git a/api/jobs.go b/api/jobs.go index b8f56acbeff..7c1d66714f8 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -622,6 +622,78 @@ func (u *UpdateStrategy) Empty() bool { return true } +type Multiregion struct { + Strategy *MultiregionStrategy + Regions []*MultiregionRegion +} + +func (m *Multiregion) Canonicalize() { + if m.Strategy == nil { + m.Strategy = &MultiregionStrategy{ + MaxParallel: intToPtr(0), + OnFailure: stringToPtr(""), + } + } else { + if m.Strategy.MaxParallel == nil { + m.Strategy.MaxParallel = intToPtr(0) + } + if m.Strategy.OnFailure == nil { + m.Strategy.OnFailure = stringToPtr("") + } + } + if m.Regions == nil { + m.Regions = []*MultiregionRegion{} + } + for _, region := range m.Regions { + if region.Count == nil { + region.Count = intToPtr(1) + } + if region.Datacenters == nil { + region.Datacenters = []string{} + } + if region.Meta == nil { + region.Meta = map[string]string{} + } + } +} + +func (m *Multiregion) Copy() *Multiregion { + if m == nil { + return nil + } + copy := new(Multiregion) + if m.Strategy != nil { + copy.Strategy = new(MultiregionStrategy) + copy.Strategy.MaxParallel = intToPtr(*m.Strategy.MaxParallel) + copy.Strategy.OnFailure = stringToPtr(*m.Strategy.OnFailure) + } + for _, region := range m.Regions { + copyRegion := new(MultiregionRegion) + copyRegion.Name = region.Name + copyRegion.Count = intToPtr(*region.Count) + for _, dc := range region.Datacenters { + copyRegion.Datacenters = append(copyRegion.Datacenters, dc) + } + for k, v := range region.Meta { + copyRegion.Meta[k] = v + } + copy.Regions = append(copy.Regions, copyRegion) + } + return copy +} + +type MultiregionStrategy struct { + MaxParallel *int `mapstructure:"max_parallel"` + OnFailure *string `mapstructure:"on_failure"` +} + +type MultiregionRegion struct { + Name string + Count *int + Datacenters []string + Meta map[string]string +} + // PeriodicConfig is for serializing periodic config for a job. type PeriodicConfig struct { Enabled *bool @@ -711,6 +783,7 @@ type Job struct { Affinities []*Affinity TaskGroups []*TaskGroup Update *UpdateStrategy + Multiregion *Multiregion Spreads []*Spread Periodic *PeriodicConfig ParameterizedJob *ParameterizedJobConfig @@ -741,6 +814,11 @@ func (j *Job) IsParameterized() bool { return j.ParameterizedJob != nil && !j.Dispatched } +// IsMultiregion returns whether a job is a multiregion job +func (j *Job) IsMultiregion() bool { + return j.Multiregion != nil && j.Multiregion.Regions != nil && len(j.Multiregion.Regions) > 0 +} + func (j *Job) Canonicalize() { if j.ID == nil { j.ID = stringToPtr("") @@ -807,6 +885,9 @@ func (j *Job) Canonicalize() { } else if *j.Type == JobTypeService { j.Update = DefaultUpdateStrategy() } + if j.Multiregion != nil { + j.Multiregion.Canonicalize() + } for _, tg := range j.TaskGroups { tg.Canonicalize(j) diff --git a/api/jobs_test.go b/api/jobs_test.go index 34ebb91d747..31ceebfcd22 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -1096,6 +1096,68 @@ func TestJobs_Canonicalize(t *testing.T) { }, }, }, + + { + name: "multiregion", + input: &Job{ + Name: stringToPtr("foo"), + ID: stringToPtr("bar"), + ParentID: stringToPtr("lol"), + Multiregion: &Multiregion{ + Regions: []*MultiregionRegion{ + { + Name: "west", + Count: intToPtr(1), + }, + }, + }, + }, + expected: &Job{ + Multiregion: &Multiregion{ + Strategy: &MultiregionStrategy{ + MaxParallel: intToPtr(0), + OnFailure: stringToPtr(""), + }, + Regions: []*MultiregionRegion{ + { + Name: "west", + Count: intToPtr(1), + Datacenters: []string{}, + Meta: map[string]string{}, + }, + }, + }, + Namespace: stringToPtr(DefaultNamespace), + ID: stringToPtr("bar"), + Name: stringToPtr("foo"), + Region: stringToPtr("global"), + Type: stringToPtr("service"), + ParentID: stringToPtr("lol"), + Priority: intToPtr(50), + AllAtOnce: boolToPtr(false), + ConsulToken: stringToPtr(""), + VaultToken: stringToPtr(""), + Stop: boolToPtr(false), + Stable: boolToPtr(false), + Version: uint64ToPtr(0), + Status: stringToPtr(""), + StatusDescription: stringToPtr(""), + CreateIndex: uint64ToPtr(0), + ModifyIndex: uint64ToPtr(0), + JobModifyIndex: uint64ToPtr(0), + Update: &UpdateStrategy{ + Stagger: timeToPtr(30 * time.Second), + MaxParallel: intToPtr(1), + HealthCheck: stringToPtr("checks"), + MinHealthyTime: timeToPtr(10 * time.Second), + HealthyDeadline: timeToPtr(5 * time.Minute), + ProgressDeadline: timeToPtr(10 * time.Minute), + AutoRevert: boolToPtr(false), + Canary: intToPtr(0), + AutoPromote: boolToPtr(false), + }, + }, + }, } for _, tc := range testCases { diff --git a/command/agent/deployment_endpoint.go b/command/agent/deployment_endpoint.go index dd18a6564fd..58829adce36 100644 --- a/command/agent/deployment_endpoint.go +++ b/command/agent/deployment_endpoint.go @@ -8,8 +8,8 @@ import ( ) func (s *HTTPServer) DeploymentsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - if req.Method != "GET" { - return nil, CodedError(405, ErrInvalidMethod) + if req.Method != http.MethodGet { + return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod) } args := structs.DeploymentListRequest{} @@ -47,6 +47,9 @@ func (s *HTTPServer) DeploymentSpecificRequest(resp http.ResponseWriter, req *ht case strings.HasPrefix(path, "allocation-health/"): deploymentID := strings.TrimPrefix(path, "allocation-health/") return s.deploymentSetAllocHealth(resp, req, deploymentID) + case strings.HasPrefix(path, "unblock/"): + deploymentID := strings.TrimPrefix(path, "unblock/") + return s.deploymentUnblock(resp, req, deploymentID) default: return s.deploymentQuery(resp, req, path) } @@ -54,8 +57,8 @@ func (s *HTTPServer) DeploymentSpecificRequest(resp http.ResponseWriter, req *ht // TODO test and api func (s *HTTPServer) deploymentFail(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) { - if req.Method != "PUT" && req.Method != "POST" { - return nil, CodedError(405, ErrInvalidMethod) + if req.Method != http.MethodPut && req.Method != http.MethodPost { + return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod) } args := structs.DeploymentFailRequest{ DeploymentID: deploymentID, @@ -71,19 +74,19 @@ func (s *HTTPServer) deploymentFail(resp http.ResponseWriter, req *http.Request, } func (s *HTTPServer) deploymentPause(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) { - if req.Method != "PUT" && req.Method != "POST" { - return nil, CodedError(405, ErrInvalidMethod) + if req.Method != http.MethodPut && req.Method != http.MethodPost { + return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod) } var pauseRequest structs.DeploymentPauseRequest if err := decodeBody(req, &pauseRequest); err != nil { - return nil, CodedError(400, err.Error()) + return nil, CodedError(http.StatusBadRequest, err.Error()) } if pauseRequest.DeploymentID == "" { - return nil, CodedError(400, "DeploymentID must be specified") + return nil, CodedError(http.StatusBadRequest, "DeploymentID must be specified") } if pauseRequest.DeploymentID != deploymentID { - return nil, CodedError(400, "Deployment ID does not match") + return nil, CodedError(http.StatusBadRequest, "Deployment ID does not match") } s.parseWriteRequest(req, &pauseRequest.WriteRequest) @@ -96,19 +99,19 @@ func (s *HTTPServer) deploymentPause(resp http.ResponseWriter, req *http.Request } func (s *HTTPServer) deploymentPromote(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) { - if req.Method != "PUT" && req.Method != "POST" { - return nil, CodedError(405, ErrInvalidMethod) + if req.Method != http.MethodPut && req.Method != http.MethodPost { + return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod) } var promoteRequest structs.DeploymentPromoteRequest if err := decodeBody(req, &promoteRequest); err != nil { - return nil, CodedError(400, err.Error()) + return nil, CodedError(http.StatusBadRequest, err.Error()) } if promoteRequest.DeploymentID == "" { - return nil, CodedError(400, "DeploymentID must be specified") + return nil, CodedError(http.StatusBadRequest, "DeploymentID must be specified") } if promoteRequest.DeploymentID != deploymentID { - return nil, CodedError(400, "Deployment ID does not match") + return nil, CodedError(http.StatusBadRequest, "Deployment ID does not match") } s.parseWriteRequest(req, &promoteRequest.WriteRequest) @@ -120,20 +123,45 @@ func (s *HTTPServer) deploymentPromote(resp http.ResponseWriter, req *http.Reque return out, nil } +func (s *HTTPServer) deploymentUnblock(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) { + if req.Method != http.MethodPut && req.Method != http.MethodPost { + return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod) + } + + var unblockRequest structs.DeploymentUnblockRequest + if err := decodeBody(req, &unblockRequest); err != nil { + return nil, CodedError(http.StatusBadRequest, err.Error()) + } + if unblockRequest.DeploymentID == "" { + return nil, CodedError(http.StatusBadRequest, "DeploymentID must be specified") + } + if unblockRequest.DeploymentID != deploymentID { + return nil, CodedError(http.StatusBadRequest, "Deployment ID does not match") + } + s.parseWriteRequest(req, &unblockRequest.WriteRequest) + + var out structs.DeploymentUpdateResponse + if err := s.agent.RPC("Deployment.Unblock", &unblockRequest, &out); err != nil { + return nil, err + } + setIndex(resp, out.Index) + return out, nil +} + func (s *HTTPServer) deploymentSetAllocHealth(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) { - if req.Method != "PUT" && req.Method != "POST" { - return nil, CodedError(405, ErrInvalidMethod) + if req.Method != http.MethodPut && req.Method != http.MethodPost { + return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod) } var healthRequest structs.DeploymentAllocHealthRequest if err := decodeBody(req, &healthRequest); err != nil { - return nil, CodedError(400, err.Error()) + return nil, CodedError(http.StatusBadRequest, err.Error()) } if healthRequest.DeploymentID == "" { - return nil, CodedError(400, "DeploymentID must be specified") + return nil, CodedError(http.StatusBadRequest, "DeploymentID must be specified") } if healthRequest.DeploymentID != deploymentID { - return nil, CodedError(400, "Deployment ID does not match") + return nil, CodedError(http.StatusBadRequest, "Deployment ID does not match") } s.parseWriteRequest(req, &healthRequest.WriteRequest) @@ -146,8 +174,8 @@ func (s *HTTPServer) deploymentSetAllocHealth(resp http.ResponseWriter, req *htt } func (s *HTTPServer) deploymentAllocations(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) { - if req.Method != "GET" { - return nil, CodedError(405, ErrInvalidMethod) + if req.Method != http.MethodGet { + return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod) } args := structs.DeploymentSpecificRequest{ @@ -173,8 +201,8 @@ func (s *HTTPServer) deploymentAllocations(resp http.ResponseWriter, req *http.R } func (s *HTTPServer) deploymentQuery(resp http.ResponseWriter, req *http.Request, deploymentID string) (interface{}, error) { - if req.Method != "GET" { - return nil, CodedError(405, ErrInvalidMethod) + if req.Method != http.MethodGet { + return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod) } args := structs.DeploymentSpecificRequest{ @@ -191,7 +219,7 @@ func (s *HTTPServer) deploymentQuery(resp http.ResponseWriter, req *http.Request setMeta(resp, &out.QueryMeta) if out.Deployment == nil { - return nil, CodedError(404, "deployment not found") + return nil, CodedError(http.StatusNotFound, "deployment not found") } return out.Deployment, nil } diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index b7671b4ac60..b2c6db3f9c6 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -146,16 +146,20 @@ func (s *HTTPServer) jobPlan(resp http.ResponseWriter, req *http.Request, return nil, CodedError(400, "Job ID does not match") } + var region *string + // Region in http request query param takes precedence over region in job hcl config if args.WriteRequest.Region != "" { - args.Job.Region = helper.StringToPtr(args.WriteRequest.Region) + region = helper.StringToPtr(args.WriteRequest.Region) } // If 'global' region is specified or if no region is given, // default to region of the node you're submitting to - if args.Job.Region == nil || *args.Job.Region == "" || *args.Job.Region == api.GlobalRegion { - args.Job.Region = &s.agent.config.Region + if region == nil || args.Job.Region == nil || + *args.Job.Region == "" || *args.Job.Region == api.GlobalRegion { + region = &s.agent.config.Region } + args.Job.Region = regionForJob(args.Job, region) sJob := ApiJobToStructJob(args.Job) planReq := structs.JobPlanRequest{ @@ -163,7 +167,7 @@ func (s *HTTPServer) jobPlan(resp http.ResponseWriter, req *http.Request, Diff: args.Diff, PolicyOverride: args.PolicyOverride, WriteRequest: structs.WriteRequest{ - Region: sJob.Region, + Region: *region, }, } // parseWriteRequest overrides Namespace, Region and AuthToken @@ -395,17 +399,27 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request, if jobName != "" && *args.Job.ID != jobName { return nil, CodedError(400, "Job ID does not match name") } + if args.Job.Multiregion != nil && args.Job.Region != nil { + region := *args.Job.Region + if !(region == "global" || region == "") { + return nil, CodedError(400, "Job can't have both multiregion and region blocks") + } + } + + var region *string // Region in http request query param takes precedence over region in job hcl config if args.WriteRequest.Region != "" { - args.Job.Region = helper.StringToPtr(args.WriteRequest.Region) + region = helper.StringToPtr(args.WriteRequest.Region) } // If 'global' region is specified or if no region is given, // default to region of the node you're submitting to - if args.Job.Region == nil || *args.Job.Region == "" || *args.Job.Region == api.GlobalRegion { - args.Job.Region = &s.agent.config.Region + if region == nil || args.Job.Region == nil || + *args.Job.Region == "" || *args.Job.Region == api.GlobalRegion { + region = &s.agent.config.Region } + args.Job.Region = regionForJob(args.Job, region) sJob := ApiJobToStructJob(args.Job) regReq := structs.JobRegisterRequest{ @@ -415,7 +429,7 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request, PolicyOverride: args.PolicyOverride, PreserveCounts: args.PreserveCounts, WriteRequest: structs.WriteRequest{ - Region: sJob.Region, + Region: *region, AuthToken: args.WriteRequest.SecretID, }, } @@ -763,6 +777,23 @@ func ApiJobToStructJob(job *api.Job) *structs.Job { } } + if job.Multiregion != nil { + j.Multiregion = &structs.Multiregion{} + j.Multiregion.Strategy = &structs.MultiregionStrategy{ + MaxParallel: *job.Multiregion.Strategy.MaxParallel, + OnFailure: *job.Multiregion.Strategy.OnFailure, + } + j.Multiregion.Regions = []*structs.MultiregionRegion{} + for _, region := range job.Multiregion.Regions { + r := &structs.MultiregionRegion{} + r.Name = region.Name + r.Count = *region.Count + r.Datacenters = region.Datacenters + r.Meta = region.Meta + j.Multiregion.Regions = append(j.Multiregion.Regions, r) + } + } + if l := len(job.TaskGroups); l != 0 { j.TaskGroups = make([]*structs.TaskGroup, l) for i, taskGroup := range job.TaskGroups { diff --git a/command/agent/job_endpoint_oss.go b/command/agent/job_endpoint_oss.go new file mode 100644 index 00000000000..58d34d5b17d --- /dev/null +++ b/command/agent/job_endpoint_oss.go @@ -0,0 +1,11 @@ +// +build !ent + +package agent + +import ( + "github.com/hashicorp/nomad/api" +) + +func regionForJob(job *api.Job, requestRegion *string) *string { + return requestRegion +} diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index b0d4befa1d6..7d6f5f568be 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -1575,6 +1575,20 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { Meta: map[string]string{ "foo": "bar", }, + Multiregion: &api.Multiregion{ + Strategy: &api.MultiregionStrategy{ + MaxParallel: helper.IntToPtr(2), + OnFailure: helper.StringToPtr("fail_all"), + }, + Regions: []*api.MultiregionRegion{ + { + Name: "west", + Count: helper.IntToPtr(1), + Datacenters: []string{"dc1", "dc2"}, + Meta: map[string]string{"region_code": "W"}, + }, + }, + }, TaskGroups: []*api.TaskGroup{ { Name: helper.StringToPtr("group1"), @@ -1928,6 +1942,20 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { Meta: map[string]string{ "foo": "bar", }, + Multiregion: &structs.Multiregion{ + Strategy: &structs.MultiregionStrategy{ + MaxParallel: 2, + OnFailure: "fail_all", + }, + Regions: []*structs.MultiregionRegion{ + { + Name: "west", + Count: 1, + Datacenters: []string{"dc1", "dc2"}, + Meta: map[string]string{"region_code": "W"}, + }, + }, + }, TaskGroups: []*structs.TaskGroup{ { Name: "group1", diff --git a/command/commands.go b/command/commands.go index 7aabefe7fea..6d9d60ea113 100644 --- a/command/commands.go +++ b/command/commands.go @@ -236,6 +236,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "deployment unblock": func() (cli.Command, error) { + return &DeploymentUnblockCommand{ + Meta: meta, + }, nil + }, "eval": func() (cli.Command, error) { return &EvalCommand{ Meta: meta, diff --git a/command/deployment_status.go b/command/deployment_status.go index 61b0b5130c7..d1a4adfb1d1 100644 --- a/command/deployment_status.go +++ b/command/deployment_status.go @@ -1,6 +1,7 @@ package command import ( + "errors" "fmt" "sort" "strings" @@ -140,7 +141,7 @@ func (c *DeploymentStatusCommand) Run(args []string) int { return 0 } - c.Ui.Output(c.Colorize().Color(formatDeployment(deploy, length))) + c.Ui.Output(c.Colorize().Color(formatDeployment(client, deploy, length))) return 0 } @@ -182,7 +183,7 @@ func getDeployment(client *api.Deployments, dID string) (match *api.Deployment, } } -func formatDeployment(d *api.Deployment, uuidLength int) string { +func formatDeployment(c *api.Client, d *api.Deployment, uuidLength int) string { if d == nil { return "No deployment found" } @@ -196,6 +197,18 @@ func formatDeployment(d *api.Deployment, uuidLength int) string { } base := formatKV(high) + + // Fetch and Format Multi-region info + if d.IsMultiregion { + regions, err := fetchMultiRegionDeployments(c, d) + if err != nil { + base += "\n\nError fetching Multiregion deployments\n\n" + } else if len(regions) > 0 { + base += "\n\n[bold]Multiregion Deployment[reset]\n" + base += formatMultiregionDeployment(regions, uuidLength) + } + } + if len(d.TaskGroups) == 0 { return base } @@ -204,6 +217,73 @@ func formatDeployment(d *api.Deployment, uuidLength int) string { return base } +type regionResult struct { + region string + d *api.Deployment + err error +} + +func fetchMultiRegionDeployments(c *api.Client, d *api.Deployment) (map[string]*api.Deployment, error) { + results := make(map[string]*api.Deployment) + + job, _, err := c.Jobs().Info(d.JobID, &api.QueryOptions{}) + if err != nil { + return nil, err + } + + requests := make(chan regionResult, len(job.Multiregion.Regions)) + for i := 0; i < cap(requests); i++ { + go func(itr int) { + region := job.Multiregion.Regions[itr] + d, err := fetchRegionDeployment(c, d, region) + requests <- regionResult{d: d, err: err, region: region.Name} + }(i) + } + for i := 0; i < cap(requests); i++ { + res := <-requests + if res.err != nil { + key := fmt.Sprintf("%s (error)", res.region) + results[key] = &api.Deployment{} + continue + } + results[res.region] = res.d + + } + return results, nil +} + +func fetchRegionDeployment(c *api.Client, d *api.Deployment, region *api.MultiregionRegion) (*api.Deployment, error) { + if region == nil { + return nil, errors.New("Region not found") + } + + opts := &api.QueryOptions{Region: region.Name} + deploys, _, err := c.Jobs().Deployments(d.JobID, false, opts) + if err != nil { + return nil, err + } + for _, dep := range deploys { + if dep.JobVersion == d.JobVersion { + return dep, nil + } + } + return nil, fmt.Errorf("Could not find job version %d for region", d.JobVersion) +} + +func formatMultiregionDeployment(regions map[string]*api.Deployment, uuidLength int) string { + rowString := "Region|ID|Status" + rows := make([]string, len(regions)+1) + rows[0] = rowString + i := 1 + for k, v := range regions { + row := fmt.Sprintf("%s|%s|%s", k, limit(v.ID, uuidLength), v.Status) + rows[i] = row + i++ + } + sort.Strings(rows) + return formatList(rows) +} + func formatDeploymentGroups(d *api.Deployment, uuidLength int) string { // Detect if we need to add these columns var canaries, autorevert, progressDeadline bool diff --git a/command/deployment_status_test.go b/command/deployment_status_test.go index 89ad05f4ac8..59ea74c2ca3 100644 --- a/command/deployment_status_test.go +++ b/command/deployment_status_test.go @@ -1,9 +1,13 @@ package command import ( + "fmt" "testing" + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/command/agent" "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/testutil" "github.com/mitchellh/cli" "github.com/posener/complete" "github.com/stretchr/testify/assert" @@ -64,3 +68,103 @@ func TestDeploymentStatusCommand_AutocompleteArgs(t *testing.T) { assert.Equal(1, len(res)) assert.Equal(d.ID, res[0]) } + +func TestDeploymentStatusCommand_Multiregion(t *testing.T) { + t.Parallel() + + cbe := func(config *agent.Config) { + config.Region = "east" + config.Datacenter = "east-1" + } + cbw := func(config *agent.Config) { + config.Region = "west" + config.Datacenter = "west-1" + } + + srv, clientEast, url := testServer(t, true, cbe) + defer srv.Shutdown() + + srv2, clientWest, _ := testServer(t, true, cbw) + defer srv2.Shutdown() + + // Join with srv1 + addr1 := fmt.Sprintf("127.0.0.1:%d", + srv.Agent.Server().GetConfig().SerfConfig.MemberlistConfig.BindPort) + + if _, err := srv2.Agent.Server().Join([]string{addr1}); err != nil { + t.Fatalf("Join err: %v", err) + } + + // wait for client node + testutil.WaitForResult(func() (bool, error) { + nodes, _, err := clientEast.Nodes().List(nil) + if err != nil { + return false, err + } + if len(nodes) == 0 { + return false, fmt.Errorf("missing node") + } + if _, ok := nodes[0].Drivers["mock_driver"]; !ok { + return false, fmt.Errorf("mock_driver not ready") + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) + + ui := new(cli.MockUi) + cmd := &DeploymentStatusCommand{Meta: Meta{Ui: ui, flagAddress: url}} + + // Register multiregion job in east + jobEast := testMultiRegionJob("job1_sfxx", "east", "east-1") + resp, _, err := clientEast.Jobs().Register(jobEast, nil) + require.NoError(t, err) + if code := waitForSuccess(ui, clientEast, fullId, t, resp.EvalID); code != 0 { + t.Fatalf("status code non zero saw %d", code) + } + + // Register multiregion job in west + jobWest := testMultiRegionJob("job1_sfxx", "west", "west-1") + resp2, _, err := clientWest.Jobs().Register(jobWest, &api.WriteOptions{Region: "west"}) + require.NoError(t, err) + if code := waitForSuccess(ui, clientWest, fullId, t, resp2.EvalID); code != 0 { + t.Fatalf("status code non zero saw %d", code) + } + + jobs, _, err := clientEast.Jobs().List(&api.QueryOptions{}) + require.NoError(t, err) + require.Len(t, jobs, 1) + + deploys, _, err := clientEast.Jobs().Deployments(jobs[0].ID, true, &api.QueryOptions{}) + require.NoError(t, err) + require.Len(t, deploys, 1) + + // Grab both deployments to verify output + eastDeploys, _, err := clientEast.Jobs().Deployments(jobs[0].ID, true, &api.QueryOptions{Region: "east"}) + require.NoError(t, err) + require.Len(t, eastDeploys, 1) + + westDeploys, _, err := clientWest.Jobs().Deployments(jobs[0].ID, true, &api.QueryOptions{Region: "west"}) + require.NoError(t, err) + require.Len(t, westDeploys, 1) + + // Run command for specific deploy + if code := cmd.Run([]string{"-region=east", "-address=" + url, deploys[0].ID}); code != 0 { + t.Fatalf("expected exit 0, got: %d", code) + } + + // Verify Multi-region Deployment info populated + out := ui.OutputWriter.String() + require.Contains(t, out, "Multiregion Deployment") + require.Contains(t, out, "Region") + require.Contains(t, out, "ID") + require.Contains(t, out, "Status") + require.Contains(t, out, "east") + require.Contains(t, out, eastDeploys[0].ID[0:7]) + require.Contains(t, out, "west") + require.Contains(t, out, westDeploys[0].ID[0:7]) + require.Contains(t, out, "running") + + require.NotContains(t, out, "") + +} diff --git a/command/deployment_unblock.go b/command/deployment_unblock.go new file mode 100644 index 00000000000..d530fc6c30e --- /dev/null +++ b/command/deployment_unblock.go @@ -0,0 +1,131 @@ +package command + +import ( + "fmt" + "strings" + + "github.com/hashicorp/nomad/api/contexts" + "github.com/posener/complete" +) + +type DeploymentUnblockCommand struct { + Meta +} + +func (c *DeploymentUnblockCommand) Help() string { + helpText := ` +Usage: nomad deployment unblock [options] + + Unblock is used to unblock a multiregion deployment that's waiting for + peer region deployments to complete. + +General Options: + + ` + generalOptionsUsage() + ` + +Unblock Options: + + -detach + Return immediately instead of entering monitor mode. After deployment + unblock, the evaluation ID will be printed to the screen, which can be used + to examine the evaluation using the eval-status command. + + -verbose + Display full information. +` + return strings.TrimSpace(helpText) +} + +func (c *DeploymentUnblockCommand) Synopsis() string { + return "Unblock a blocked deployment" +} + +func (c *DeploymentUnblockCommand) AutocompleteFlags() complete.Flags { + return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), + complete.Flags{ + "-detach": complete.PredictNothing, + "-verbose": complete.PredictNothing, + }) +} + +func (c *DeploymentUnblockCommand) AutocompleteArgs() complete.Predictor { + return complete.PredictFunc(func(a complete.Args) []string { + client, err := c.Meta.Client() + if err != nil { + return nil + } + + resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Deployments, nil) + if err != nil { + return []string{} + } + return resp.Matches[contexts.Deployments] + }) +} + +func (c *DeploymentUnblockCommand) Name() string { return "deployment unblock" } +func (c *DeploymentUnblockCommand) Run(args []string) int { + var detach, verbose bool + + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.BoolVar(&detach, "detach", false, "") + flags.BoolVar(&verbose, "verbose", false, "") + + if err := flags.Parse(args); err != nil { + return 1 + } + + // Check that we got exactly one argument + args = flags.Args() + if l := len(args); l != 1 { + c.Ui.Error("This command takes one argument: ") + c.Ui.Error(commandErrorText(c)) + return 1 + } + + dID := args[0] + + // Truncate the id unless full length is requested + length := shortId + if verbose { + length = fullId + } + + // Get the HTTP client + client, err := c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + // Do a prefix lookup + deploy, possible, err := getDeployment(client.Deployments(), dID) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error retrieving deployment: %s", err)) + return 1 + } + + if len(possible) != 0 { + c.Ui.Error(fmt.Sprintf("Prefix matched multiple deployments\n\n%s", formatDeployments(possible, length))) + return 1 + } + + u, _, err := client.Deployments().Unblock(deploy.ID, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error unblocking deployment: %s", err)) + return 1 + } + + c.Ui.Output(fmt.Sprintf("Deployment %q unblocked", deploy.ID)) + evalCreated := u.EvalID != "" + + // Nothing to do + if detach || !evalCreated { + return 0 + } + + c.Ui.Output("") + mon := newMonitor(c.Ui, client, length) + return mon.monitor(u.EvalID, false) +} diff --git a/command/deployment_unblock_test.go b/command/deployment_unblock_test.go new file mode 100644 index 00000000000..72adc8be926 --- /dev/null +++ b/command/deployment_unblock_test.go @@ -0,0 +1,63 @@ +package command + +import ( + "strings" + "testing" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/mitchellh/cli" + "github.com/posener/complete" + "github.com/stretchr/testify/assert" +) + +func TestDeploymentUnblockCommand_Implements(t *testing.T) { + t.Parallel() + var _ cli.Command = &DeploymentUnblockCommand{} +} + +func TestDeploymentUnblockCommand_Fails(t *testing.T) { + t.Parallel() + ui := new(cli.MockUi) + cmd := &DeploymentUnblockCommand{Meta: Meta{Ui: ui}} + + // Unblocks on misuse + if code := cmd.Run([]string{"some", "bad", "args"}); code != 1 { + t.Fatalf("expected exit code 1, got: %d", code) + } + if out := ui.ErrorWriter.String(); !strings.Contains(out, commandErrorText(cmd)) { + t.Fatalf("expected help output, got: %s", out) + } + ui.ErrorWriter.Reset() + + if code := cmd.Run([]string{"-address=nope", "12"}); code != 1 { + t.Fatalf("expected exit code 1, got: %d", code) + } + if out := ui.ErrorWriter.String(); !strings.Contains(out, "Error retrieving deployment") { + t.Fatalf("expected unblocked query error, got: %s", out) + } + ui.ErrorWriter.Reset() +} + +func TestDeploymentUnblockCommand_AutocompleteArgs(t *testing.T) { + assert := assert.New(t) + t.Parallel() + + srv, _, url := testServer(t, true, nil) + defer srv.Shutdown() + + ui := new(cli.MockUi) + cmd := &DeploymentUnblockCommand{Meta: Meta{Ui: ui, flagAddress: url}} + + // Create a fake deployment + state := srv.Agent.Server().State() + d := mock.Deployment() + assert.Nil(state.UpsertDeployment(1000, d)) + + prefix := d.ID[:5] + args := complete.Args{Last: prefix} + predictor := cmd.AutocompleteArgs() + + res := predictor.Predict(args) + assert.Equal(1, len(res)) + assert.Equal(d.ID, res[0]) +} diff --git a/command/job_deployments.go b/command/job_deployments.go index 66ca76ca802..39de2f94ddc 100644 --- a/command/job_deployments.go +++ b/command/job_deployments.go @@ -148,7 +148,7 @@ func (c *JobDeploymentsCommand) Run(args []string) int { return 0 } - c.Ui.Output(c.Colorize().Color(formatDeployment(deploy, length))) + c.Ui.Output(c.Colorize().Color(formatDeployment(client, deploy, length))) return 0 } diff --git a/command/job_run.go b/command/job_run.go index b52ae2b0ac9..3af389b2183 100644 --- a/command/job_run.go +++ b/command/job_run.go @@ -191,6 +191,7 @@ func (c *JobRunCommand) Run(args []string) int { // Check if the job is periodic or is a parameterized job periodic := job.IsPeriodic() paramjob := job.IsParameterized() + multiregion := job.IsMultiregion() // Parse the Consul token if consulToken == "" { @@ -271,7 +272,7 @@ func (c *JobRunCommand) Run(args []string) int { evalID := resp.EvalID // Check if we should enter monitor mode - if detach || periodic || paramjob { + if detach || periodic || paramjob || multiregion { c.Ui.Output("Job registration successful") if periodic && !paramjob { loc, err := job.Periodic.GetLocation() diff --git a/command/job_status.go b/command/job_status.go index 7448cd0bfe0..3dd50583099 100644 --- a/command/job_status.go +++ b/command/job_status.go @@ -381,7 +381,7 @@ func (c *JobStatusCommand) outputJobInfo(client *api.Client, job *api.Job) error if latestDeployment != nil { c.Ui.Output(c.Colorize().Color("\n[bold]Latest Deployment[reset]")) - c.Ui.Output(c.Colorize().Color(c.formatDeployment(latestDeployment))) + c.Ui.Output(c.Colorize().Color(c.formatDeployment(client, latestDeployment))) } // Format the allocs @@ -390,7 +390,7 @@ func (c *JobStatusCommand) outputJobInfo(client *api.Client, job *api.Job) error return nil } -func (c *JobStatusCommand) formatDeployment(d *api.Deployment) string { +func (c *JobStatusCommand) formatDeployment(client *api.Client, d *api.Deployment) string { // Format the high-level elements high := []string{ fmt.Sprintf("ID|%s", limit(d.ID, c.length)), @@ -399,6 +399,17 @@ func (c *JobStatusCommand) formatDeployment(d *api.Deployment) string { } base := formatKV(high) + + if d.IsMultiregion { + regions, err := fetchMultiRegionDeployments(client, d) + if err != nil { + base += "\n\nError fetching Multiregion deployments\n\n" + } else if len(regions) > 0 { + base += "\n\n[bold]Multiregion Deployment[reset]\n" + base += formatMultiregionDeployment(regions, 8) + } + } + if len(d.TaskGroups) == 0 { return base } diff --git a/command/job_status_test.go b/command/job_status_test.go index 745af018e4c..116ffe624ac 100644 --- a/command/job_status_test.go +++ b/command/job_status_test.go @@ -385,6 +385,107 @@ func TestJobStatusCommand_RescheduleEvals(t *testing.T) { require.Contains(out, e.ID[:8]) } +// TestJobStatusCommand_Multiregion tests multiregion deployment output +func TestJobStatusCommand_Multiregion(t *testing.T) { + t.Parallel() + + cbe := func(config *agent.Config) { + config.Region = "east" + config.Datacenter = "east-1" + } + cbw := func(config *agent.Config) { + config.Region = "west" + config.Datacenter = "west-1" + } + + srv, clientEast, url := testServer(t, true, cbe) + defer srv.Shutdown() + + srv2, clientWest, _ := testServer(t, true, cbw) + defer srv2.Shutdown() + + // Join with srv1 + addr := fmt.Sprintf("127.0.0.1:%d", + srv.Agent.Server().GetConfig().SerfConfig.MemberlistConfig.BindPort) + + if _, err := srv2.Agent.Server().Join([]string{addr}); err != nil { + t.Fatalf("Join err: %v", err) + } + + // wait for client node + testutil.WaitForResult(func() (bool, error) { + nodes, _, err := clientEast.Nodes().List(nil) + if err != nil { + return false, err + } + if len(nodes) == 0 { + return false, fmt.Errorf("missing node") + } + if _, ok := nodes[0].Drivers["mock_driver"]; !ok { + return false, fmt.Errorf("mock_driver not ready") + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) + + ui := new(cli.MockUi) + cmd := &JobStatusCommand{Meta: Meta{Ui: ui, flagAddress: url}} + + // Register multiregion job + // Register multiregion job in east + jobEast := testMultiRegionJob("job1_sfxx", "east", "east-1") + resp, _, err := clientEast.Jobs().Register(jobEast, nil) + require.NoError(t, err) + if code := waitForSuccess(ui, clientEast, fullId, t, resp.EvalID); code != 0 { + t.Fatalf("status code non zero saw %d", code) + } + + // Register multiregion job in west + jobWest := testMultiRegionJob("job1_sfxx", "west", "west-1") + resp2, _, err := clientWest.Jobs().Register(jobWest, &api.WriteOptions{Region: "west"}) + require.NoError(t, err) + if code := waitForSuccess(ui, clientWest, fullId, t, resp2.EvalID); code != 0 { + t.Fatalf("status code non zero saw %d", code) + } + + jobs, _, err := clientEast.Jobs().List(&api.QueryOptions{}) + require.NoError(t, err) + require.Len(t, jobs, 1) + + deploys, _, err := clientEast.Jobs().Deployments(jobs[0].ID, true, &api.QueryOptions{}) + require.NoError(t, err) + require.Len(t, deploys, 1) + + // Grab both deployments to verify output + eastDeploys, _, err := clientEast.Jobs().Deployments(jobs[0].ID, true, &api.QueryOptions{Region: "east"}) + require.NoError(t, err) + require.Len(t, eastDeploys, 1) + + westDeploys, _, err := clientWest.Jobs().Deployments(jobs[0].ID, true, &api.QueryOptions{Region: "west"}) + require.NoError(t, err) + // require.Len(t, westDeploys, 1) + + // Run command for specific deploy + if code := cmd.Run([]string{"-address=" + url, jobs[0].ID}); code != 0 { + t.Fatalf("expected exit 0, got: %d", code) + } + + // Verify Multi-region Deployment info populated + out := ui.OutputWriter.String() + require.Contains(t, out, "Multiregion Deployment") + require.Contains(t, out, "Region") + require.Contains(t, out, "ID") + require.Contains(t, out, "Status") + require.Contains(t, out, "east") + require.Contains(t, out, eastDeploys[0].ID[0:7]) + require.Contains(t, out, "west") + require.Contains(t, out, westDeploys[0].ID[0:7]) + require.Contains(t, out, "running") + + require.NotContains(t, out, "") + +} func waitForSuccess(ui cli.Ui, client *api.Client, length int, t *testing.T, evalId string) int { mon := newMonitor(ui, client, length) monErr := mon.monitor(evalId, false) diff --git a/command/job_stop.go b/command/job_stop.go index 9f05e0d9055..c6d088592a1 100644 --- a/command/job_stop.go +++ b/command/job_stop.go @@ -171,7 +171,7 @@ func (c *JobStopCommand) Run(args []string) int { return 0 } - if detach { + if detach || job.IsMultiregion() { c.Ui.Output(evalID) return 0 } diff --git a/command/util_test.go b/command/util_test.go index a989e532eea..8abd3786041 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -48,3 +48,41 @@ func testJob(jobID string) *api.Job { return job } + +func testMultiRegionJob(jobID, region, datacenter string) *api.Job { + task := api.NewTask("task1", "mock_driver"). + SetConfig("kill_after", "10s"). + SetConfig("run_for", "15s"). + SetConfig("exit_code", 0). + Require(&api.Resources{ + MemoryMB: helper.IntToPtr(256), + CPU: helper.IntToPtr(100), + }). + SetLogConfig(&api.LogConfig{ + MaxFiles: helper.IntToPtr(1), + MaxFileSizeMB: helper.IntToPtr(2), + }) + + group := api.NewTaskGroup("group1", 1). + AddTask(task). + RequireDisk(&api.EphemeralDisk{ + SizeMB: helper.IntToPtr(20), + }) + + job := api.NewServiceJob(jobID, jobID, region, 1).AddDatacenter(datacenter).AddTaskGroup(group) + job.Region = nil + job.Multiregion = &api.Multiregion{ + Regions: []*api.MultiregionRegion{ + { + Name: "east", + Datacenters: []string{"east-1"}, + }, + { + Name: "west", + Datacenters: []string{"west-1"}, + }, + }, + } + + return job +} diff --git a/jobspec/parse_job.go b/jobspec/parse_job.go index 3a683005995..e9143b03631 100644 --- a/jobspec/parse_job.go +++ b/jobspec/parse_job.go @@ -38,6 +38,7 @@ func parseJob(result *api.Job, list *ast.ObjectList) error { delete(m, "update") delete(m, "vault") delete(m, "spread") + delete(m, "multiregion") // Set the ID and name to the object key result.ID = helper.StringToPtr(obj.Keys[0].Token.Value().(string)) @@ -80,6 +81,7 @@ func parseJob(result *api.Job, list *ast.ObjectList) error { "vault", "vault_token", "consul_token", + "multiregion", } if err := helper.CheckHCLKeys(listVal, valid); err != nil { return multierror.Prefix(err, "job:") @@ -141,6 +143,15 @@ func parseJob(result *api.Job, list *ast.ObjectList) error { } } + // If we have a multiregion block, then parse that + if o := listVal.Filter("multiregion"); len(o.Items) > 0 { + var mr api.Multiregion + if err := parseMultiregion(&mr, o); err != nil { + return multierror.Prefix(err, "multiregion ->") + } + result.Multiregion = &mr + } + // Parse out meta fields. These are in HCL as a list so we need // to iterate over them and merge them. if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 { diff --git a/jobspec/parse_multiregion.go b/jobspec/parse_multiregion.go new file mode 100644 index 00000000000..c0e0ed06882 --- /dev/null +++ b/jobspec/parse_multiregion.go @@ -0,0 +1,163 @@ +package jobspec + +import ( + "fmt" + + multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/hcl" + "github.com/hashicorp/hcl/hcl/ast" + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/helper" + "github.com/mitchellh/mapstructure" +) + +func parseMultiregion(result *api.Multiregion, list *ast.ObjectList) error { + + list = list.Elem() + if len(list.Items) > 1 { + return fmt.Errorf("only one 'multiregion' block allowed") + } + if len(list.Items) == 0 { + return nil + } + + // Get our multiregion object and decode it + obj := list.Items[0] + var m map[string]interface{} + if err := hcl.DecodeObject(&m, obj.Val); err != nil { + return err + } + + // Value should be an object + var listVal *ast.ObjectList + if ot, ok := obj.Val.(*ast.ObjectType); ok { + listVal = ot.List + } else { + return fmt.Errorf("multiregion should be an object") + } + + // Check for invalid keys + valid := []string{ + "strategy", + "region", + } + if err := helper.CheckHCLKeys(obj.Val, valid); err != nil { + return err + } + + // If we have a strategy, then parse that + if o := listVal.Filter("strategy"); len(o.Items) > 0 { + if err := parseMultiregionStrategy(&result.Strategy, o); err != nil { + return multierror.Prefix(err, "strategy ->") + } + } + // If we have regions, then parse those + if o := listVal.Filter("region"); len(o.Items) > 0 { + if err := parseMultiregionRegions(result, o); err != nil { + return multierror.Prefix(err, "regions ->") + } + } else { + return fmt.Errorf("'multiregion' requires one or more 'region' blocks") + } + return nil +} + +func parseMultiregionStrategy(final **api.MultiregionStrategy, list *ast.ObjectList) error { + list = list.Elem() + if len(list.Items) > 1 { + return fmt.Errorf("only one 'strategy' block allowed") + } + + // Get our job object + obj := list.Items[0] + + // Check for invalid keys + valid := []string{ + "max_parallel", + "on_failure", + } + if err := helper.CheckHCLKeys(obj.Val, valid); err != nil { + return err + } + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, obj.Val); err != nil { + return err + } + + var result api.MultiregionStrategy + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + WeaklyTypedInput: true, + Result: &result, + }) + + if err != nil { + return err + } + if err := dec.Decode(m); err != nil { + return err + } + *final = &result + return nil +} + +func parseMultiregionRegions(result *api.Multiregion, list *ast.ObjectList) error { + list = list.Children() + if len(list.Items) == 0 { + return nil + } + + // Go through each object and turn it into an actual result. + collection := make([]*api.MultiregionRegion, 0, len(list.Items)) + seen := make(map[string]struct{}) + for _, item := range list.Items { + n := item.Keys[0].Token.Value().(string) + + // Make sure we haven't already found this + if _, ok := seen[n]; ok { + return fmt.Errorf("region '%s' defined more than once", n) + } + seen[n] = struct{}{} + + // We need this later + var listVal *ast.ObjectList + if ot, ok := item.Val.(*ast.ObjectType); ok { + listVal = ot.List + } else { + return fmt.Errorf("region '%s': should be an object", n) + } + + // Check for invalid keys + valid := []string{ + "count", + "datacenters", + "meta", + } + if err := helper.CheckHCLKeys(listVal, valid); err != nil { + return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n)) + } + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, item.Val); err != nil { + return err + } + + // Build the region with the basic decode + var r api.MultiregionRegion + r.Name = n + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + WeaklyTypedInput: true, + Result: &r, + }) + if err != nil { + return err + } + if err := dec.Decode(m); err != nil { + return err + } + collection = append(collection, &r) + } + + result.Regions = append(result.Regions, collection...) + return nil +} diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index a676d8f5b19..426ccbd3456 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -1337,6 +1337,35 @@ func TestParse(t *testing.T) { nil, true, }, + + { + "multiregion.hcl", + &api.Job{ + ID: helper.StringToPtr("multiregion_job"), + Name: helper.StringToPtr("multiregion_job"), + Multiregion: &api.Multiregion{ + Strategy: &api.MultiregionStrategy{ + MaxParallel: helper.IntToPtr(1), + OnFailure: helper.StringToPtr("fail_all"), + }, + Regions: []*api.MultiregionRegion{ + { + Name: "west", + Count: helper.IntToPtr(2), + Datacenters: []string{"west-1"}, + Meta: map[string]string{"region_code": "W"}, + }, + { + Name: "east", + Count: helper.IntToPtr(1), + Datacenters: []string{"east-1", "east-2"}, + Meta: map[string]string{"region_code": "E"}, + }, + }, + }, + }, + false, + }, } for _, tc := range cases { diff --git a/jobspec/test-fixtures/multiregion.hcl b/jobspec/test-fixtures/multiregion.hcl new file mode 100644 index 00000000000..8b808dec040 --- /dev/null +++ b/jobspec/test-fixtures/multiregion.hcl @@ -0,0 +1,26 @@ +job "multiregion_job" { + + multiregion { + + strategy { + max_parallel = 1 + on_failure = "fail_all" + } + + region "west" { + count = 2 + datacenters = ["west-1"] + meta { + region_code = "W" + } + } + + region "east" { + count = 1 + datacenters = ["east-1", "east-2"] + meta { + region_code = "E" + } + } + } +} diff --git a/nomad/deployment_endpoint.go b/nomad/deployment_endpoint.go index 105290e88d3..1a9d812d567 100644 --- a/nomad/deployment_endpoint.go +++ b/nomad/deployment_endpoint.go @@ -112,7 +112,7 @@ func (d *Deployment) Fail(args *structs.DeploymentFailRequest, reply *structs.De } if !deploy.Active() { - return fmt.Errorf("can't fail terminal deployment") + return structs.ErrDeploymentTerminalNoFail } // Call into the deployment watcher @@ -155,10 +155,10 @@ func (d *Deployment) Pause(args *structs.DeploymentPauseRequest, reply *structs. if !deploy.Active() { if args.Pause { - return fmt.Errorf("can't pause terminal deployment") + return structs.ErrDeploymentTerminalNoPause } - return fmt.Errorf("can't resume terminal deployment") + return structs.ErrDeploymentTerminalNoResume } // Call into the deployment watcher @@ -200,13 +200,139 @@ func (d *Deployment) Promote(args *structs.DeploymentPromoteRequest, reply *stru } if !deploy.Active() { - return fmt.Errorf("can't promote terminal deployment") + return structs.ErrDeploymentTerminalNoPromote } // Call into the deployment watcher return d.srv.deploymentWatcher.PromoteDeployment(args, reply) } +// Run is used to start a pending deployment +func (d *Deployment) Run(args *structs.DeploymentRunRequest, reply *structs.DeploymentUpdateResponse) error { + if done, err := d.srv.forward("Deployment.Run", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "deployment", "run"}, time.Now()) + + // Validate the arguments + if args.DeploymentID == "" { + return fmt.Errorf("missing deployment ID") + } + + // Lookup the deployment + snap, err := d.srv.fsm.State().Snapshot() + if err != nil { + return err + } + + ws := memdb.NewWatchSet() + deploy, err := snap.DeploymentByID(ws, args.DeploymentID) + if err != nil { + return err + } + if deploy == nil { + return fmt.Errorf("deployment not found") + } + + // Check namespace submit-job permissions + if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowNsOp(deploy.Namespace, acl.NamespaceCapabilitySubmitJob) { + return structs.ErrPermissionDenied + } + + if !deploy.Active() { + return structs.ErrDeploymentTerminalNoRun + } + + // Call into the deployment watcher + return d.srv.deploymentWatcher.RunDeployment(args, reply) +} + +// Unblock is used to unblock a deployment +func (d *Deployment) Unblock(args *structs.DeploymentUnblockRequest, reply *structs.DeploymentUpdateResponse) error { + if done, err := d.srv.forward("Deployment.Unblock", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "deployment", "unblock"}, time.Now()) + + // Validate the arguments + if args.DeploymentID == "" { + return fmt.Errorf("missing deployment ID") + } + + // Lookup the deployment + snap, err := d.srv.fsm.State().Snapshot() + if err != nil { + return err + } + + ws := memdb.NewWatchSet() + deploy, err := snap.DeploymentByID(ws, args.DeploymentID) + if err != nil { + return err + } + if deploy == nil { + return fmt.Errorf("deployment not found") + } + + // Check namespace submit-job permissions + if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowNsOp(deploy.Namespace, acl.NamespaceCapabilitySubmitJob) { + return structs.ErrPermissionDenied + } + + if !deploy.Active() { + return structs.ErrDeploymentTerminalNoUnblock + } + + // Call into the deployment watcher + return d.srv.deploymentWatcher.UnblockDeployment(args, reply) +} + +// Cancel is used to cancel a deployment +func (d *Deployment) Cancel(args *structs.DeploymentCancelRequest, reply *structs.DeploymentUpdateResponse) error { + if done, err := d.srv.forward("Deployment.Cancel", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "deployment", "cancel"}, time.Now()) + + // Validate the arguments + if args.DeploymentID == "" { + return fmt.Errorf("missing deployment ID") + } + + // Lookup the deployment + snap, err := d.srv.fsm.State().Snapshot() + if err != nil { + return err + } + + ws := memdb.NewWatchSet() + deploy, err := snap.DeploymentByID(ws, args.DeploymentID) + if err != nil { + return err + } + if deploy == nil { + return fmt.Errorf("deployment not found") + } + + // Check namespace submit-job permissions + if aclObj, err := d.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowNsOp(deploy.Namespace, acl.NamespaceCapabilitySubmitJob) { + return structs.ErrPermissionDenied + } + + if !deploy.Active() { + return structs.ErrDeploymentTerminalNoCancel + } + + // Call into the deployment watcher + return d.srv.deploymentWatcher.CancelDeployment(args, reply) +} + // SetAllocHealth is used to set the health of allocations that are part of the // deployment. func (d *Deployment) SetAllocHealth(args *structs.DeploymentAllocHealthRequest, reply *structs.DeploymentUpdateResponse) error { @@ -247,7 +373,7 @@ func (d *Deployment) SetAllocHealth(args *structs.DeploymentAllocHealthRequest, } if !deploy.Active() { - return fmt.Errorf("can't set health of allocations for a terminal deployment") + return structs.ErrDeploymentTerminalNoSetHealth } // Call into the deployment watcher diff --git a/nomad/deploymentwatcher/deployment_watcher.go b/nomad/deploymentwatcher/deployment_watcher.go index 4c18757cb4d..5c10343b1e5 100644 --- a/nomad/deploymentwatcher/deployment_watcher.go +++ b/nomad/deploymentwatcher/deployment_watcher.go @@ -62,6 +62,14 @@ type deploymentWatcher struct { // deployment deploymentTriggers + // DeploymentRPC holds methods for interacting with peer regions + // in enterprise edition + DeploymentRPC + + // JobRPC holds methods for interacting with peer regions + // in enterprise edition + JobRPC + // state is the state that is watched for state changes. state *state.StateStore @@ -100,7 +108,8 @@ type deploymentWatcher struct { // deployments and trigger the scheduler as needed. func newDeploymentWatcher(parent context.Context, queryLimiter *rate.Limiter, logger log.Logger, state *state.StateStore, d *structs.Deployment, - j *structs.Job, triggers deploymentTriggers) *deploymentWatcher { + j *structs.Job, triggers deploymentTriggers, + deploymentRPC DeploymentRPC, jobRPC JobRPC) *deploymentWatcher { ctx, exitFn := context.WithCancel(parent) w := &deploymentWatcher{ @@ -111,6 +120,8 @@ func newDeploymentWatcher(parent context.Context, queryLimiter *rate.Limiter, j: j, state: state, deploymentTriggers: triggers, + DeploymentRPC: deploymentRPC, + JobRPC: jobRPC, logger: logger.With("deployment_id", d.ID, "job", j.NamespacedID()), ctx: ctx, exitFn: exitFn, @@ -432,6 +443,10 @@ FAIL: w.logger.Debug("deadline hit", "rollback", rback) rollback = rback + err = w.nextRegion(structs.DeploymentStatusFailed) + if err != nil { + w.logger.Error("multiregion deployment error", "error", err) + } break FAIL case <-w.deploymentUpdateCh: // Get the updated deployment and check if we should change the @@ -463,6 +478,11 @@ FAIL: } } + err := w.nextRegion(w.getStatus()) + if err != nil { + break FAIL + } + case updates = <-w.getAllocsCh(allocIndex): if err := updates.err; err != nil { if err == context.Canceled || w.ctx.Err() == context.Canceled { @@ -490,6 +510,10 @@ FAIL: // handle the failure if res.failDeployment { rollback = res.rollback + err := w.nextRegion(structs.DeploymentStatusFailed) + if err != nil { + w.logger.Error("multiregion deployment error", "error", err) + } break FAIL } @@ -815,6 +839,13 @@ func (w *deploymentWatcher) getDeploymentStatusUpdate(status, desc string) *stru } } +// getStatus returns the current status of the deployment +func (w *deploymentWatcher) getStatus() string { + w.l.RLock() + defer w.l.RUnlock() + return w.d.Status +} + type allocUpdates struct { allocs []*structs.AllocListStub index uint64 diff --git a/nomad/deploymentwatcher/deployments_watcher.go b/nomad/deploymentwatcher/deployments_watcher.go index 7cfbb94abaf..d42128321f0 100644 --- a/nomad/deploymentwatcher/deployments_watcher.go +++ b/nomad/deploymentwatcher/deployments_watcher.go @@ -75,6 +75,12 @@ type Watcher struct { // state is the state that is watched for state changes. state *state.StateStore + // server interface for Deployment RPCs + deploymentRPC DeploymentRPC + + // server interface for Job RPCs + jobRPC JobRPC + // watchers is the set of active watchers, one per deployment watchers map[string]*deploymentWatcher @@ -92,11 +98,16 @@ type Watcher struct { // NewDeploymentsWatcher returns a deployments watcher that is used to watch // deployments and trigger the scheduler as needed. func NewDeploymentsWatcher(logger log.Logger, - raft DeploymentRaftEndpoints, stateQueriesPerSecond float64, - updateBatchDuration time.Duration) *Watcher { + raft DeploymentRaftEndpoints, + deploymentRPC DeploymentRPC, jobRPC JobRPC, + stateQueriesPerSecond float64, + updateBatchDuration time.Duration, +) *Watcher { return &Watcher{ raft: raft, + deploymentRPC: deploymentRPC, + jobRPC: jobRPC, queryLimiter: rate.NewLimiter(rate.Limit(stateQueriesPerSecond), 100), updateBatchDuration: updateBatchDuration, logger: logger.Named("deployments_watcher"), @@ -257,7 +268,8 @@ func (w *Watcher) addLocked(d *structs.Deployment) (*deploymentWatcher, error) { return nil, fmt.Errorf("deployment %q references unknown job %q", d.ID, d.JobID) } - watcher := newDeploymentWatcher(w.ctx, w.queryLimiter, w.logger, w.state, d, job, w) + watcher := newDeploymentWatcher(w.ctx, w.queryLimiter, w.logger, w.state, d, job, + w, w.deploymentRPC, w.jobRPC) w.watchers[d.ID] = watcher return watcher, nil } @@ -363,6 +375,40 @@ func (w *Watcher) FailDeployment(req *structs.DeploymentFailRequest, resp *struc return watcher.FailDeployment(req, resp) } +// RunDeployment is used to run a pending multiregion deployment. In +// single-region deployments, the pending state is unused. +func (w *Watcher) RunDeployment(req *structs.DeploymentRunRequest, resp *structs.DeploymentUpdateResponse) error { + watcher, err := w.getOrCreateWatcher(req.DeploymentID) + if err != nil { + return err + } + + return watcher.RunDeployment(req, resp) +} + +// UnblockDeployment is used to unblock a multiregion deployment. In +// single-region deployments, the blocked state is unused. +func (w *Watcher) UnblockDeployment(req *structs.DeploymentUnblockRequest, resp *structs.DeploymentUpdateResponse) error { + watcher, err := w.getOrCreateWatcher(req.DeploymentID) + if err != nil { + return err + } + + return watcher.UnblockDeployment(req, resp) +} + +// CancelDeployment is used to cancel a multiregion deployment. In +// single-region deployments, the deploymentwatcher has sole responsibility to +// cancel deployments so this RPC is never used. +func (w *Watcher) CancelDeployment(req *structs.DeploymentCancelRequest, resp *structs.DeploymentUpdateResponse) error { + watcher, err := w.getOrCreateWatcher(req.DeploymentID) + if err != nil { + return err + } + + return watcher.CancelDeployment(req, resp) +} + // createUpdate commits the given allocation desired transition and evaluation // to Raft but batches the commit with other calls. func (w *Watcher) createUpdate(allocs map[string]*structs.DesiredTransition, eval *structs.Evaluation) (uint64, error) { diff --git a/nomad/deploymentwatcher/deployments_watcher_test.go b/nomad/deploymentwatcher/deployments_watcher_test.go index 78c7aeb0aab..8373a900697 100644 --- a/nomad/deploymentwatcher/deployments_watcher_test.go +++ b/nomad/deploymentwatcher/deployments_watcher_test.go @@ -19,7 +19,7 @@ import ( func testDeploymentWatcher(t *testing.T, qps float64, batchDur time.Duration) (*Watcher, *mockBackend) { m := newMockBackend(t) - w := NewDeploymentsWatcher(testlog.HCLogger(t), m, qps, batchDur) + w := NewDeploymentsWatcher(testlog.HCLogger(t), m, nil, nil, qps, batchDur) return w, m } diff --git a/nomad/deploymentwatcher/multiregion_oss.go b/nomad/deploymentwatcher/multiregion_oss.go new file mode 100644 index 00000000000..427fa904f8c --- /dev/null +++ b/nomad/deploymentwatcher/multiregion_oss.go @@ -0,0 +1,33 @@ +// +build !ent + +package deploymentwatcher + +import "github.com/hashicorp/nomad/nomad/structs" + +// DeploymentRPC and JobRPC hold methods for interacting with peer regions +// in enterprise edition. +type DeploymentRPC interface{} +type JobRPC interface{} + +func (w *deploymentWatcher) nextRegion(status string) error { + return nil +} + +// RunDeployment is used to run a pending multiregion deployment. In +// single-region deployments, the pending state is unused. +func (w *deploymentWatcher) RunDeployment(req *structs.DeploymentRunRequest, resp *structs.DeploymentUpdateResponse) error { + return nil +} + +// UnblockDeployment is used to unblock a multiregion deployment. In +// single-region deployments, the blocked state is unused. +func (w *deploymentWatcher) UnblockDeployment(req *structs.DeploymentUnblockRequest, resp *structs.DeploymentUpdateResponse) error { + return nil +} + +// CancelDeployment is used to cancel a multiregion deployment. In +// single-region deployments, the deploymentwatcher has sole responsibility to +// cancel deployments so this RPC is never used. +func (w *deploymentWatcher) CancelDeployment(req *structs.DeploymentCancelRequest, resp *structs.DeploymentUpdateResponse) error { + return nil +} diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 1d68217aa0c..52b3ff790b8 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -291,6 +291,13 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis } } + // Submit a multiregion job to other regions (enterprise only). + // The job will have its region interpolated. + err = j.multiregionRegister(args, reply) + if err != nil { + return err + } + // Check if the job has changed at all if existingJob == nil || existingJob.SpecChanged(args.Job) { // Set the submit time diff --git a/nomad/job_endpoint_oss.go b/nomad/job_endpoint_oss.go index fd4aaa8e75e..214b934122a 100644 --- a/nomad/job_endpoint_oss.go +++ b/nomad/job_endpoint_oss.go @@ -8,3 +8,8 @@ import "github.com/hashicorp/nomad/nomad/structs" func (j *Job) enforceSubmitJob(override bool, job *structs.Job) (error, error) { return nil, nil } + +// multiregionRegister is used to send a job across multiple regions +func (j *Job) multiregionRegister(args *structs.JobRegisterRequest, reply *structs.JobRegisterResponse) error { + return nil +} diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 23738503cfd..c63530a49c9 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -1287,6 +1287,31 @@ func JobWithScalingPolicy() (*structs.Job, *structs.ScalingPolicy) { return job, policy } +func MultiregionJob() *structs.Job { + job := Job() + job.Multiregion = &structs.Multiregion{ + Strategy: &structs.MultiregionStrategy{ + MaxParallel: 1, + OnFailure: "fail_all", + }, + Regions: []*structs.MultiregionRegion{ + { + Name: "west", + Count: 2, + Datacenters: []string{"west-1", "west-2"}, + Meta: map[string]string{"region_code": "W"}, + }, + { + Name: "east", + Count: 1, + Datacenters: []string{"east-1"}, + Meta: map[string]string{"region_code": "E"}, + }, + }, + } + return job +} + func CSIPlugin() *structs.CSIPlugin { return &structs.CSIPlugin{ ID: uuid.Generate(), diff --git a/nomad/server.go b/nomad/server.go index 5bb25e6b33e..36d5ee72be5 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -1005,9 +1005,13 @@ func (s *Server) setupDeploymentWatcher() error { // Create the deployment watcher s.deploymentWatcher = deploymentwatcher.NewDeploymentsWatcher( - s.logger, raftShim, + s.logger, + raftShim, + s.staticEndpoints.Deployment, + s.staticEndpoints.Job, deploymentwatcher.LimitStateQueriesPerSecond, - deploymentwatcher.CrossDeploymentUpdateBatchDuration) + deploymentwatcher.CrossDeploymentUpdateBatchDuration, + ) return nil } diff --git a/nomad/structs/diff.go b/nomad/structs/diff.go index ebd927397ac..2ed549a94ae 100644 --- a/nomad/structs/diff.go +++ b/nomad/structs/diff.go @@ -132,6 +132,11 @@ func (j *Job) Diff(other *Job, contextual bool) (*JobDiff, error) { diff.Objects = append(diff.Objects, cDiff) } + // Multiregion diff + if mrDiff := multiregionDiff(j.Multiregion, other.Multiregion, contextual); mrDiff != nil { + diff.Objects = append(diff.Objects, mrDiff) + } + // Check to see if there is a diff. We don't use reflect because we are // filtering quite a few fields that will change on each diff. if diff.Type == DiffTypeNone { @@ -1009,6 +1014,124 @@ func parameterizedJobDiff(old, new *ParameterizedJobConfig, contextual bool) *Ob return diff } +func multiregionDiff(old, new *Multiregion, contextual bool) *ObjectDiff { + + diff := &ObjectDiff{Type: DiffTypeNone, Name: "Multiregion"} + + if reflect.DeepEqual(old, new) { + return nil + } else if old == nil { + old = &Multiregion{} + old.Canonicalize() + diff.Type = DiffTypeAdded + } else if new == nil { + new = &Multiregion{} + diff.Type = DiffTypeDeleted + } else { + diff.Type = DiffTypeEdited + } + + // strategy diff + stratDiff := primitiveObjectDiff( + old.Strategy, + new.Strategy, + []string{}, + "Strategy", + contextual) + if stratDiff != nil { + diff.Objects = append(diff.Objects, stratDiff) + } + + oldMap := make(map[string]*MultiregionRegion, len(old.Regions)) + newMap := make(map[string]*MultiregionRegion, len(new.Regions)) + for _, o := range old.Regions { + oldMap[o.Name] = o + } + for _, n := range new.Regions { + newMap[n.Name] = n + } + + for name, oldRegion := range oldMap { + // Diff the same, deleted and edited + newRegion := newMap[name] + rdiff := multiregionRegionDiff(newRegion, oldRegion, contextual) + if rdiff != nil { + diff.Objects = append(diff.Objects, rdiff) + } + } + + for name, newRegion := range newMap { + // Diff the added + if oldRegion, ok := oldMap[name]; !ok { + rdiff := multiregionRegionDiff(oldRegion, newRegion, contextual) + if rdiff != nil { + diff.Objects = append(diff.Objects, rdiff) + } + } + } + sort.Sort(FieldDiffs(diff.Fields)) + sort.Sort(ObjectDiffs(diff.Objects)) + return diff +} + +func multiregionRegionDiff(r, other *MultiregionRegion, contextual bool) *ObjectDiff { + diff := &ObjectDiff{Type: DiffTypeNone, Name: "Region"} + var oldPrimitiveFlat, newPrimitiveFlat map[string]string + + if reflect.DeepEqual(r, other) { + return nil + } else if r == nil { + r = &MultiregionRegion{} + diff.Type = DiffTypeAdded + newPrimitiveFlat = flatmap.Flatten(other, nil, true) + } else if other == nil { + other = &MultiregionRegion{} + diff.Type = DiffTypeDeleted + oldPrimitiveFlat = flatmap.Flatten(r, nil, true) + } else { + diff.Type = DiffTypeEdited + oldPrimitiveFlat = flatmap.Flatten(r, nil, true) + newPrimitiveFlat = flatmap.Flatten(other, nil, true) + } + + // Diff the primitive fields. + diff.Fields = fieldDiffs(oldPrimitiveFlat, newPrimitiveFlat, contextual) + + // Datacenters diff + setDiff := stringSetDiff(r.Datacenters, other.Datacenters, "Datacenters", contextual) + if setDiff != nil && setDiff.Type != DiffTypeNone { + diff.Objects = append(diff.Objects, setDiff) + } + + sort.Sort(ObjectDiffs(diff.Objects)) + sort.Sort(FieldDiffs(diff.Fields)) + + var added, deleted, edited bool + for _, f := range diff.Fields { + switch f.Type { + case DiffTypeEdited: + edited = true + break + case DiffTypeDeleted: + deleted = true + case DiffTypeAdded: + added = true + } + } + + if edited || added && deleted { + diff.Type = DiffTypeEdited + } else if added { + diff.Type = DiffTypeAdded + } else if deleted { + diff.Type = DiffTypeDeleted + } else { + return nil + } + + return diff +} + // Diff returns a diff of two resource objects. If contextual diff is enabled, // non-changed fields will still be returned. func (r *Resources) Diff(other *Resources, contextual bool) *ObjectDiff { diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index ed6ea07970c..510cc92afef 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -1181,6 +1181,118 @@ func TestJobDiff(t *testing.T) { }, }, }, + + { + // Multiregion: region added + Old: &Job{ + Multiregion: &Multiregion{ + Strategy: &MultiregionStrategy{ + MaxParallel: 1, + OnFailure: "fail_all", + }, + Regions: []*MultiregionRegion{ + { + Name: "west", + Count: 1, + Datacenters: []string{"west-1"}, + Meta: map[string]string{"region_code": "W"}, + }, + }, + }, + }, + + New: &Job{ + Multiregion: &Multiregion{ + Strategy: &MultiregionStrategy{ + MaxParallel: 2, + OnFailure: "fail_all", + }, + Regions: []*MultiregionRegion{ + { + Name: "west", + Count: 1, + Datacenters: []string{"west-1"}, + Meta: map[string]string{"region_code": "W"}, + }, + { + Name: "east", + Count: 2, + Datacenters: []string{"east-1", "east-2"}, + Meta: map[string]string{"region_code": "E"}, + }, + }, + }, + }, + + Expected: &JobDiff{ + Type: DiffTypeEdited, + Objects: []*ObjectDiff{ + { + Type: DiffTypeEdited, + Name: "Multiregion", + Objects: []*ObjectDiff{ + { + Type: DiffTypeAdded, + Name: "Region", + Fields: []*FieldDiff{ + { + Type: DiffTypeAdded, + Name: "Count", + Old: "", + New: "2", + }, + { + Type: DiffTypeAdded, + Name: "Meta[region_code]", + Old: "", + New: "E", + }, + { + Type: DiffTypeAdded, + Name: "Name", + Old: "", + New: "east", + }, + }, + + Objects: []*ObjectDiff{ + { + Type: DiffTypeAdded, + Name: "Datacenters", + Fields: []*FieldDiff{ + { + Type: DiffTypeAdded, + Name: "Datacenters", + Old: "", + New: "east-1", + }, + { + Type: DiffTypeAdded, + Name: "Datacenters", + Old: "", + New: "east-2", + }, + }, + }, + }, + }, + { + Type: DiffTypeEdited, + Name: "Strategy", + Fields: []*FieldDiff{ + { + Type: DiffTypeEdited, + Name: "MaxParallel", + Old: "1", + New: "2", + }, + }, + }, + }, + }, + }, + }, + }, } for i, c := range cases { diff --git a/nomad/structs/errors.go b/nomad/structs/errors.go index 2e3e1edd256..7dbfbad8dbe 100644 --- a/nomad/structs/errors.go +++ b/nomad/structs/errors.go @@ -28,6 +28,16 @@ const ( ErrUnknownDeploymentPrefix = "Unknown deployment" errRPCCodedErrorPrefix = "RPC Error:: " + + errDeploymentTerminalNoCancel = "can't cancel terminal deployment" + errDeploymentTerminalNoFail = "can't fail terminal deployment" + errDeploymentTerminalNoPause = "can't pause terminal deployment" + errDeploymentTerminalNoPromote = "can't promote terminal deployment" + errDeploymentTerminalNoResume = "can't resume terminal deployment" + errDeploymentTerminalNoUnblock = "can't unblock terminal deployment" + errDeploymentTerminalNoRun = "can't run terminal deployment" + errDeploymentTerminalNoSetHealth = "can't set health of allocations for a terminal deployment" + errDeploymentRunningNoUnblock = "can't unblock running deployment" ) var ( @@ -41,6 +51,16 @@ var ( ErrUnknownNomadVersion = errors.New(errUnknownNomadVersion) ErrNodeLacksRpc = errors.New(errNodeLacksRpc) ErrMissingAllocID = errors.New(errMissingAllocID) + + ErrDeploymentTerminalNoCancel = errors.New(errDeploymentTerminalNoCancel) + ErrDeploymentTerminalNoFail = errors.New(errDeploymentTerminalNoFail) + ErrDeploymentTerminalNoPause = errors.New(errDeploymentTerminalNoPause) + ErrDeploymentTerminalNoPromote = errors.New(errDeploymentTerminalNoPromote) + ErrDeploymentTerminalNoResume = errors.New(errDeploymentTerminalNoResume) + ErrDeploymentTerminalNoUnblock = errors.New(errDeploymentTerminalNoUnblock) + ErrDeploymentTerminalNoRun = errors.New(errDeploymentTerminalNoRun) + ErrDeploymentTerminalNoSetHealth = errors.New(errDeploymentTerminalNoSetHealth) + ErrDeploymentRunningNoUnblock = errors.New(errDeploymentRunningNoUnblock) ) // IsErrNoLeader returns whether the error is due to there being no leader. diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 4949f0fb69e..875ebb82654 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1064,6 +1064,30 @@ type DeploymentPauseRequest struct { WriteRequest } +// DeploymentRunRequest is used to remotely start a pending deployment. +// Used only for multiregion deployments. +type DeploymentRunRequest struct { + DeploymentID string + + WriteRequest +} + +// DeploymentUnblockRequest is used to remotely unblock a deployment. +// Used only for multiregion deployments. +type DeploymentUnblockRequest struct { + DeploymentID string + + WriteRequest +} + +// DeploymentCancelRequest is used to remotely cancel a deployment. +// Used only for multiregion deployments. +type DeploymentCancelRequest struct { + DeploymentID string + + WriteRequest +} + // DeploymentSpecificRequest is used to make a request specific to a particular // deployment type DeploymentSpecificRequest struct { @@ -3595,6 +3619,8 @@ type Job struct { // Update provides defaults for the TaskGroup Update stanzas Update UpdateStrategy + Multiregion *Multiregion + // Periodic is used to define the interval the job is run at. Periodic *PeriodicConfig @@ -3688,6 +3714,10 @@ func (j *Job) Canonicalize() (warnings error) { j.ParameterizedJob.Canonicalize() } + if j.Multiregion != nil { + j.Multiregion.Canonicalize() + } + if j.Periodic != nil { j.Periodic.Canonicalize() } @@ -3706,6 +3736,7 @@ func (j *Job) Copy() *Job { nj.Datacenters = helper.CopySliceString(nj.Datacenters) nj.Constraints = CopySliceConstraints(nj.Constraints) nj.Affinities = CopySliceAffinities(nj.Affinities) + nj.Multiregion = nj.Multiregion.Copy() if j.TaskGroups != nil { tgs := make([]*TaskGroup, len(nj.TaskGroups)) @@ -3725,7 +3756,7 @@ func (j *Job) Copy() *Job { func (j *Job) Validate() error { var mErr multierror.Error - if j.Region == "" { + if j.Region == "" && j.Multiregion == nil { mErr.Errors = append(mErr.Errors, errors.New("Missing job region")) } if j.ID == "" { @@ -3749,7 +3780,7 @@ func (j *Job) Validate() error { if j.Priority < JobMinPriority || j.Priority > JobMaxPriority { mErr.Errors = append(mErr.Errors, fmt.Errorf("Job priority must be between [%d, %d]", JobMinPriority, JobMaxPriority)) } - if len(j.Datacenters) == 0 { + if len(j.Datacenters) == 0 && !j.IsMultiregion() { mErr.Errors = append(mErr.Errors, errors.New("Missing job datacenters")) } else { for _, v := range j.Datacenters { @@ -3850,6 +3881,12 @@ func (j *Job) Validate() error { } } + if j.IsMultiregion() { + if err := j.Multiregion.Validate(j.Type); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } + return mErr.ErrorOrNil() } @@ -3949,6 +3986,7 @@ func (j *Job) Stub(summary *JobSummary) *JobListStub { ParentID: j.ParentID, Name: j.Name, Datacenters: j.Datacenters, + Multiregion: j.Multiregion, Type: j.Type, Priority: j.Priority, Periodic: j.IsPeriodic(), @@ -3980,6 +4018,11 @@ func (j *Job) IsParameterized() bool { return j.ParameterizedJob != nil && !j.Dispatched } +// IsMultiregion returns whether a job is multiregion +func (j *Job) IsMultiregion() bool { + return j.Multiregion != nil && j.Multiregion.Regions != nil && len(j.Multiregion.Regions) > 0 +} + // VaultPolicies returns the set of Vault policies per task group, per task func (j *Job) VaultPolicies() map[string]map[string]*Vault { policies := make(map[string]map[string]*Vault, len(j.TaskGroups)) @@ -4116,6 +4159,7 @@ type JobListStub struct { Name string Namespace string `json:",omitempty"` Datacenters []string + Multiregion *Multiregion Type string Priority int Periodic bool @@ -4332,6 +4376,101 @@ func (u *UpdateStrategy) Rolling() bool { return u.Stagger > 0 && u.MaxParallel > 0 } +type Multiregion struct { + Strategy *MultiregionStrategy + Regions []*MultiregionRegion +} + +func (m *Multiregion) Canonicalize() { + if m.Strategy == nil { + m.Strategy = &MultiregionStrategy{} + } + if m.Regions == nil { + m.Regions = []*MultiregionRegion{} + } +} + +// Diff indicates whether the multiregion config has changed +func (m *Multiregion) Diff(m2 *Multiregion) bool { + return !reflect.DeepEqual(m, m2) +} + +func (m *Multiregion) Copy() *Multiregion { + if m == nil { + return nil + } + copy := new(Multiregion) + if m.Strategy != nil { + copy.Strategy = &MultiregionStrategy{ + MaxParallel: m.Strategy.MaxParallel, + OnFailure: m.Strategy.OnFailure, + } + } + for _, region := range m.Regions { + copyRegion := &MultiregionRegion{ + Name: region.Name, + Count: region.Count, + Datacenters: []string{}, + Meta: map[string]string{}, + } + for _, dc := range region.Datacenters { + copyRegion.Datacenters = append(copyRegion.Datacenters, dc) + } + for k, v := range region.Meta { + copyRegion.Meta[k] = v + } + copy.Regions = append(copy.Regions, copyRegion) + } + return copy +} + +func (m *Multiregion) Validate(jobType string) error { + var mErr multierror.Error + seen := map[string]struct{}{} + for _, region := range m.Regions { + if _, ok := seen[region.Name]; ok { + mErr.Errors = append(mErr.Errors, + fmt.Errorf("Multiregion region %q can't be listed twice", + region.Name)) + } + seen[region.Name] = struct{}{} + if len(region.Datacenters) == 0 { + mErr.Errors = append(mErr.Errors, + fmt.Errorf("Multiregion region %q must have at least 1 datacenter", + region.Name), + ) + } + } + if m.Strategy != nil { + switch jobType { + case JobTypeBatch: + if m.Strategy.OnFailure != "" || m.Strategy.MaxParallel != 0 { + mErr.Errors = append(mErr.Errors, + errors.New("Multiregion batch jobs can't have an update strategy")) + } + case JobTypeSystem: + if m.Strategy.OnFailure != "" { + mErr.Errors = append(mErr.Errors, + errors.New("Multiregion system jobs can't have an on_failure setting")) + } + default: // service + } + } + return mErr.ErrorOrNil() +} + +type MultiregionStrategy struct { + MaxParallel int + OnFailure string +} + +type MultiregionRegion struct { + Name string + Count int + Datacenters []string + Meta map[string]string +} + const ( // PeriodicSpecCron is used for a cron spec. PeriodicSpecCron = "cron" @@ -7751,6 +7890,9 @@ const ( DeploymentStatusFailed = "failed" DeploymentStatusSuccessful = "successful" DeploymentStatusCancelled = "cancelled" + DeploymentStatusPending = "pending" + DeploymentStatusBlocked = "blocked" + DeploymentStatusUnblocking = "unblocking" // TODO Statuses and Descriptions do not match 1:1 and we sometimes use the Description as a status flag @@ -7766,6 +7908,12 @@ const ( DeploymentStatusDescriptionFailedAllocations = "Failed due to unhealthy allocations" DeploymentStatusDescriptionProgressDeadline = "Failed due to progress deadline" DeploymentStatusDescriptionFailedByUser = "Deployment marked as failed" + + // used only in multiregion deployments + DeploymentStatusDescriptionFailedByPeer = "Failed because of an error in peer region" + DeploymentStatusDescriptionBlocked = "Deployment is complete but waiting for peer region" + DeploymentStatusDescriptionUnblocking = "Deployment is unblocking remaining regions" + DeploymentStatusDescriptionPendingForPeer = "Deployment is pending, waiting for peer region" ) // DeploymentStatusDescriptionRollback is used to get the status description of @@ -7814,6 +7962,9 @@ type Deployment struct { // present the correct list of deployments for the job and not old ones. JobCreateIndex uint64 + // Multiregion specifies if deployment is part of multiregion deployment + IsMultiregion bool + // TaskGroups is the set of task groups effected by the deployment and their // current deployment status. TaskGroups map[string]*DeploymentState @@ -7839,6 +7990,7 @@ func NewDeployment(job *Job) *Deployment { JobModifyIndex: job.ModifyIndex, JobSpecModifyIndex: job.JobModifyIndex, JobCreateIndex: job.CreateIndex, + IsMultiregion: job.IsMultiregion(), Status: DeploymentStatusRunning, StatusDescription: DeploymentStatusDescriptionRunning, TaskGroups: make(map[string]*DeploymentState, len(job.TaskGroups)), @@ -7867,7 +8019,7 @@ func (d *Deployment) Copy() *Deployment { // Active returns whether the deployment is active or terminal. func (d *Deployment) Active() bool { switch d.Status { - case DeploymentStatusRunning, DeploymentStatusPaused: + case DeploymentStatusRunning, DeploymentStatusPaused, DeploymentStatusBlocked, DeploymentStatusUnblocking, DeploymentStatusPending: return true default: return false diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 98cb49c1a4d..6a8664d7512 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -5239,3 +5239,128 @@ func TestNodeReservedNetworkResources_ParseReserved(t *testing.T) { require.Equal(out, tc.Parsed) } } + +func TestMultiregion_CopyCanonicalize(t *testing.T) { + require := require.New(t) + + emptyOld := &Multiregion{} + expected := &Multiregion{ + Strategy: &MultiregionStrategy{}, + Regions: []*MultiregionRegion{}, + } + + old := emptyOld.Copy() + old.Canonicalize() + require.Equal(old, expected) + require.False(old.Diff(expected)) + + nonEmptyOld := &Multiregion{ + Strategy: &MultiregionStrategy{ + MaxParallel: 2, + OnFailure: "fail_all", + }, + Regions: []*MultiregionRegion{ + { + Name: "west", + Count: 2, + Datacenters: []string{"west-1", "west-2"}, + Meta: map[string]string{}, + }, + { + Name: "east", + Count: 1, + Datacenters: []string{"east-1"}, + Meta: map[string]string{}, + }, + }, + } + + old = nonEmptyOld.Copy() + old.Canonicalize() + require.Equal(old, nonEmptyOld) + require.False(old.Diff(nonEmptyOld)) +} + +func TestMultiregion_Validate(t *testing.T) { + require := require.New(t) + cases := []struct { + Name string + JobType string + Case *Multiregion + Errors []string + }{ + { + Name: "empty valid multiregion spec", + JobType: JobTypeService, + Case: &Multiregion{}, + Errors: []string{}, + }, + + { + Name: "non-empty valid multiregion spec", + JobType: JobTypeService, + Case: &Multiregion{ + Strategy: &MultiregionStrategy{ + MaxParallel: 2, + OnFailure: "fail_all", + }, + Regions: []*MultiregionRegion{ + { + + Count: 2, + Datacenters: []string{"west-1", "west-2"}, + Meta: map[string]string{}, + }, + { + Name: "east", + Count: 1, + Datacenters: []string{"east-1"}, + Meta: map[string]string{}, + }, + }, + }, + Errors: []string{}, + }, + + { + Name: "repeated region, wrong strategy, missing DCs", + JobType: JobTypeBatch, + Case: &Multiregion{ + Strategy: &MultiregionStrategy{ + MaxParallel: 2, + }, + Regions: []*MultiregionRegion{ + { + Name: "west", + Datacenters: []string{"west-1", "west-2"}, + }, + + { + Name: "west", + }, + }, + }, + Errors: []string{ + "Multiregion region \"west\" can't be listed twice", + "Multiregion region \"west\" must have at least 1 datacenter", + "Multiregion batch jobs can't have an update strategy", + }, + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + err := tc.Case.Validate(tc.JobType) + if len(tc.Errors) == 0 { + require.NoError(err) + } else { + mErr := err.(*multierror.Error) + for i, expectedErr := range tc.Errors { + if !strings.Contains(mErr.Errors[i].Error(), expectedErr) { + t.Fatalf("err: %s, expected: %s", err, expectedErr) + } + } + } + }) + } +} diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index de40b164731..dd5dc085027 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -198,8 +198,17 @@ func (a *allocReconciler) Compute() *reconcileResults { // Detect if the deployment is paused if a.deployment != nil { a.deploymentPaused = a.deployment.Status == structs.DeploymentStatusPaused + //|| a.deployment.Status == structs.DeploymentStatusPending a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed } + if a.deployment == nil { + // When we create the deployment later, it will be in a paused + // state. But we also need to tell Compute we're paused, otherwise we + // make placements on the paused deployment. + if a.job.IsMultiregion() && a.job.Region != a.job.Multiregion.Regions[0].Name { + a.deploymentPaused = true + } + } // Reconcile each group complete := true @@ -210,10 +219,22 @@ func (a *allocReconciler) Compute() *reconcileResults { // Mark the deployment as complete if possible if a.deployment != nil && complete { + + var status string + var desc string + + if a.job.IsMultiregion() { + status = structs.DeploymentStatusBlocked + desc = structs.DeploymentStatusDescriptionBlocked + } else { + status = structs.DeploymentStatusSuccessful + desc = structs.DeploymentStatusDescriptionSuccessful + } + a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{ DeploymentID: a.deployment.ID, - Status: structs.DeploymentStatusSuccessful, - StatusDescription: structs.DeploymentStatusDescriptionSuccessful, + Status: status, + StatusDescription: desc, }) } @@ -534,6 +555,12 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { // A previous group may have made the deployment already if a.deployment == nil { a.deployment = structs.NewDeployment(a.job) + // only the first region of a multiregion job starts in the + // running state + if a.job.IsMultiregion() && a.job.Region != a.job.Multiregion.Regions[0].Name { + a.deployment.Status = structs.DeploymentStatusPending + a.deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer + } a.result.deployment = a.deployment } diff --git a/vendor/github.com/hashicorp/nomad/api/deployments.go b/vendor/github.com/hashicorp/nomad/api/deployments.go index 8a241243c36..4a4844246fa 100644 --- a/vendor/github.com/hashicorp/nomad/api/deployments.go +++ b/vendor/github.com/hashicorp/nomad/api/deployments.go @@ -107,6 +107,19 @@ func (d *Deployments) PromoteGroups(deploymentID string, groups []string, q *Wri return &resp, wm, nil } +// Unblock is used to unblock the given deployment. +func (d *Deployments) Unblock(deploymentID string, q *WriteOptions) (*DeploymentUpdateResponse, *WriteMeta, error) { + var resp DeploymentUpdateResponse + req := &DeploymentUnblockRequest{ + DeploymentID: deploymentID, + } + wm, err := d.client.write("/v1/deployment/unblock/"+deploymentID, req, &resp, q) + if err != nil { + return nil, nil, err + } + return &resp, wm, nil +} + // SetAllocHealth is used to set allocation health for allocs that are part of // the given deployment func (d *Deployments) SetAllocHealth(deploymentID string, healthy, unhealthy []string, q *WriteOptions) (*DeploymentUpdateResponse, *WriteMeta, error) { @@ -150,6 +163,9 @@ type Deployment struct { // present the correct list of deployments for the job and not old ones. JobCreateIndex uint64 + // IsMultiregion specifies if this deployment is part of a multi-region deployment + IsMultiregion bool + // TaskGroups is the set of task groups effected by the deployment and their // current deployment status. TaskGroups map[string]*DeploymentState @@ -257,6 +273,12 @@ type DeploymentFailRequest struct { WriteRequest } +// DeploymentUnblockRequest is used to unblock a particular deployment +type DeploymentUnblockRequest struct { + DeploymentID string + WriteRequest +} + // SingleDeploymentResponse is used to respond with a single deployment type SingleDeploymentResponse struct { Deployment *Deployment diff --git a/vendor/github.com/hashicorp/nomad/api/jobs.go b/vendor/github.com/hashicorp/nomad/api/jobs.go index b8f56acbeff..7c1d66714f8 100644 --- a/vendor/github.com/hashicorp/nomad/api/jobs.go +++ b/vendor/github.com/hashicorp/nomad/api/jobs.go @@ -622,6 +622,78 @@ func (u *UpdateStrategy) Empty() bool { return true } +type Multiregion struct { + Strategy *MultiregionStrategy + Regions []*MultiregionRegion +} + +func (m *Multiregion) Canonicalize() { + if m.Strategy == nil { + m.Strategy = &MultiregionStrategy{ + MaxParallel: intToPtr(0), + OnFailure: stringToPtr(""), + } + } else { + if m.Strategy.MaxParallel == nil { + m.Strategy.MaxParallel = intToPtr(0) + } + if m.Strategy.OnFailure == nil { + m.Strategy.OnFailure = stringToPtr("") + } + } + if m.Regions == nil { + m.Regions = []*MultiregionRegion{} + } + for _, region := range m.Regions { + if region.Count == nil { + region.Count = intToPtr(1) + } + if region.Datacenters == nil { + region.Datacenters = []string{} + } + if region.Meta == nil { + region.Meta = map[string]string{} + } + } +} + +func (m *Multiregion) Copy() *Multiregion { + if m == nil { + return nil + } + copy := new(Multiregion) + if m.Strategy != nil { + copy.Strategy = new(MultiregionStrategy) + copy.Strategy.MaxParallel = intToPtr(*m.Strategy.MaxParallel) + copy.Strategy.OnFailure = stringToPtr(*m.Strategy.OnFailure) + } + for _, region := range m.Regions { + copyRegion := new(MultiregionRegion) + copyRegion.Name = region.Name + copyRegion.Count = intToPtr(*region.Count) + for _, dc := range region.Datacenters { + copyRegion.Datacenters = append(copyRegion.Datacenters, dc) + } + for k, v := range region.Meta { + copyRegion.Meta[k] = v + } + copy.Regions = append(copy.Regions, copyRegion) + } + return copy +} + +type MultiregionStrategy struct { + MaxParallel *int `mapstructure:"max_parallel"` + OnFailure *string `mapstructure:"on_failure"` +} + +type MultiregionRegion struct { + Name string + Count *int + Datacenters []string + Meta map[string]string +} + // PeriodicConfig is for serializing periodic config for a job. type PeriodicConfig struct { Enabled *bool @@ -711,6 +783,7 @@ type Job struct { Affinities []*Affinity TaskGroups []*TaskGroup Update *UpdateStrategy + Multiregion *Multiregion Spreads []*Spread Periodic *PeriodicConfig ParameterizedJob *ParameterizedJobConfig @@ -741,6 +814,11 @@ func (j *Job) IsParameterized() bool { return j.ParameterizedJob != nil && !j.Dispatched } +// IsMultiregion returns whether a job is a multiregion job +func (j *Job) IsMultiregion() bool { + return j.Multiregion != nil && j.Multiregion.Regions != nil && len(j.Multiregion.Regions) > 0 +} + func (j *Job) Canonicalize() { if j.ID == nil { j.ID = stringToPtr("") @@ -807,6 +885,9 @@ func (j *Job) Canonicalize() { } else if *j.Type == JobTypeService { j.Update = DefaultUpdateStrategy() } + if j.Multiregion != nil { + j.Multiregion.Canonicalize() + } for _, tg := range j.TaskGroups { tg.Canonicalize(j)