Skip to content

Commit

Permalink
MRD: all regions should start pending (#8433)
Browse files Browse the repository at this point in the history
Deployments should wait until kicked off by `Job.Register` so that we can
assert that all regions have a scheduled deployment before starting any
region. This changeset includes the OSS fixes to support the ENT work.

`IsMultiregionStarter` has no more callers in OSS, so remove it here.
  • Loading branch information
tgross authored Jul 14, 2020
1 parent 5cfd314 commit 3703489
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 92 deletions.
5 changes: 4 additions & 1 deletion command/deployment_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ func TestDeploymentStatusCommand_Multiregion(t *testing.T) {
require.Contains(t, out, eastDeploys[0].ID[0:7])
require.Contains(t, out, "west")
require.Contains(t, out, westDeploys[0].ID[0:7])
require.Contains(t, out, "running")

// this will always be pending because we're not really doing a multiregion
// register here in OSS
require.Contains(t, out, "pending")

require.NotContains(t, out, "<none>")

Expand Down
5 changes: 4 additions & 1 deletion command/job_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,10 @@ func TestJobStatusCommand_Multiregion(t *testing.T) {
require.Contains(t, out, eastDeploys[0].ID[0:7])
require.Contains(t, out, "west")
require.Contains(t, out, westDeploys[0].ID[0:7])
require.Contains(t, out, "running")

// this will always be pending because we're not really doing a multiregion
// register here in OSS
require.Contains(t, out, "pending")

require.NotContains(t, out, "<none>")

Expand Down
14 changes: 13 additions & 1 deletion nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
if existingJob != nil {
existingVersion = existingJob.Version
}
err = j.multiregionRegister(args, reply, existingVersion)
isRunner, err := j.multiregionRegister(args, reply, existingVersion)
if err != nil {
return err
}
Expand All @@ -334,6 +334,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
reply.JobModifyIndex = existingJob.JobModifyIndex
}

// used for multiregion start
args.Job.JobModifyIndex = reply.JobModifyIndex

// If the job is periodic or parameterized, we don't create an eval.
if args.Job.IsPeriodic() || args.Job.IsParameterized() {
return nil
Expand Down Expand Up @@ -371,6 +374,15 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex

// Kick off a multiregion deployment (enterprise only).
if isRunner {
err = j.multiregionStart(args, reply)
if err != nil {
return err
}
}

return nil
}

Expand Down
7 changes: 5 additions & 2 deletions nomad/job_endpoint_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ func (j *Job) enforceSubmitJob(override bool, job *structs.Job) (error, error) {
}

// multiregionRegister is used to send a job across multiple regions.
//
// This is the OSS stub: it performs no work and returns (false, nil).
// Job.Register stores the boolean result as isRunner and only calls
// multiregionStart when it is true, so in OSS the multiregion start is
// never kicked off from here.
func (j *Job) multiregionRegister(args *structs.JobRegisterRequest, reply *structs.JobRegisterResponse,
existingVersion uint64) error {
func (j *Job) multiregionRegister(args *structs.JobRegisterRequest, reply *structs.JobRegisterResponse, existingVersion uint64) (bool, error) {
return false, nil
}

// multiregionStart is used to kick-off a deployment across multiple regions.
//
// This is the OSS stub: args and reply are ignored and the result is always
// nil. Job.Register invokes it after the evaluation has been created, and
// only when multiregionRegister reported the caller as the runner
// (enterprise only).
func (j *Job) multiregionStart(args *structs.JobRegisterRequest, reply *structs.JobRegisterResponse) error {
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions nomad/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -1297,6 +1297,9 @@ func JobWithScalingPolicy() (*structs.Job, *structs.ScalingPolicy) {

func MultiregionJob() *structs.Job {
job := Job()
update := *structs.DefaultUpdateStrategy
job.Update = update
job.TaskGroups[0].Update = &update
job.Multiregion = &structs.Multiregion{
Strategy: &structs.MultiregionStrategy{
MaxParallel: 1,
Expand Down
24 changes: 0 additions & 24 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4175,30 +4175,6 @@ func (j *Job) IsMultiregion() bool {
return j.Multiregion != nil && j.Multiregion.Regions != nil && len(j.Multiregion.Regions) > 0
}

// IsMultiregionStarter reports whether a regional job should begin in the
// running state.
//
// It returns true when the job is not multiregion, when its type is "system"
// or "batch", or when the multiregion strategy is nil or has MaxParallel set
// to zero. Otherwise the first entry of Multiregion.Regions whose Name equals
// j.Region decides the result: the job is a starter only if that entry's
// index is below MaxParallel. A region that is not listed is never a starter.
func (j *Job) IsMultiregionStarter() bool {
	switch {
	case !j.IsMultiregion():
		return true
	case j.Type == "system", j.Type == "batch":
		return true
	}

	strategy := j.Multiregion.Strategy
	if strategy == nil || strategy.MaxParallel == 0 {
		return true
	}

	// Only the first matching region matters, so answer as soon as it is found.
	for idx, r := range j.Multiregion.Regions {
		if r.Name == j.Region {
			return idx < strategy.MaxParallel
		}
	}
	return false
}

// VaultPolicies returns the set of Vault policies per task group, per task
func (j *Job) VaultPolicies() map[string]map[string]*Vault {
policies := make(map[string]map[string]*Vault, len(j.TaskGroups))
Expand Down
58 changes: 0 additions & 58 deletions nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5438,64 +5438,6 @@ func TestMultiregion_CopyCanonicalize(t *testing.T) {
require.False(old.Diff(nonEmptyOld))
}

// TestMultiregion_Starter checks which regional jobs IsMultiregionStarter
// reports as starting in the running state, across job types and
// MaxParallel settings.
func TestMultiregion_Starter(t *testing.T) {
	require := require.New(t)

	// A job with no multiregion block is always a starter.
	j := &Job{}
	j.Type = "service"
	j.Region = "north"
	require.True(j.IsMultiregionStarter())

	tc := &Multiregion{
		Strategy: &MultiregionStrategy{},
		Regions: []*MultiregionRegion{
			{Name: "north"},
			{Name: "south"},
			{Name: "east"},
			{Name: "west"},
		},
	}

	// Batch jobs are always starters. Assert on b itself: the original
	// asserted on j here, which left the batch case untested.
	b := &Job{}
	b.Type = "batch"
	b.Multiregion = tc
	b.Region = "west"
	require.True(b.IsMultiregionStarter())

	// With MaxParallel unset (zero), every region is a starter.
	j.Multiregion = tc
	j.Region = "north"
	require.True(j.IsMultiregionStarter())
	j.Region = "south"
	require.True(j.IsMultiregionStarter())
	j.Region = "east"
	require.True(j.IsMultiregionStarter())
	j.Region = "west"
	require.True(j.IsMultiregionStarter())

	// MaxParallel 1: only the first listed region is a starter.
	tc.Strategy = &MultiregionStrategy{MaxParallel: 1}
	j.Multiregion = tc
	j.Region = "north"
	require.True(j.IsMultiregionStarter())
	j.Region = "south"
	require.False(j.IsMultiregionStarter())
	j.Region = "east"
	require.False(j.IsMultiregionStarter())
	j.Region = "west"
	require.False(j.IsMultiregionStarter())

	// MaxParallel 2: the first two listed regions are starters.
	tc.Strategy = &MultiregionStrategy{MaxParallel: 2}
	j.Multiregion = tc
	j.Region = "north"
	require.True(j.IsMultiregionStarter())
	j.Region = "south"
	require.True(j.IsMultiregionStarter())
	j.Region = "east"
	require.False(j.IsMultiregionStarter())
	j.Region = "west"
	require.False(j.IsMultiregionStarter())
}

func TestNodeResources_Merge(t *testing.T) {
res := &NodeResources{
Cpu: NodeCpuResources{
Expand Down
9 changes: 4 additions & 5 deletions scheduler/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,10 @@ func (a *allocReconciler) Compute() *reconcileResults {
a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed
}
if a.deployment == nil {
// When we create the deployment later, it will be in a paused
// When we create the deployment later, it will be in a pending
// state. But we also need to tell Compute we're paused, otherwise we
// make placements on the pending deployment.
if !a.job.IsMultiregionStarter() {
if a.job.IsMultiregion() && !(a.job.IsPeriodic() || a.job.IsParameterized()) {
a.deploymentPaused = true
}
}
Expand Down Expand Up @@ -555,9 +555,8 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// A previous group may have made the deployment already
if a.deployment == nil {
a.deployment = structs.NewDeployment(a.job)
// in a multiregion job, if max_parallel is set, only the first
// region starts in the running state
if !a.job.IsMultiregionStarter() {
// In multiregion jobs, most deployments start in a pending state. Periodic
// and parameterized jobs are excluded; the exclusion uses || so that it
// agrees with the check Compute makes before setting deploymentPaused.
// With && the exclusion only applied to jobs that were both periodic and
// parameterized.
if a.job.IsMultiregion() && !(a.job.IsPeriodic() || a.job.IsParameterized()) {
	a.deployment.Status = structs.DeploymentStatusPending
	a.deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer
}
Expand Down

0 comments on commit 3703489

Please sign in to comment.