diff --git a/api/tasks.go b/api/tasks.go index ed268423b99..9c43df47625 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -382,6 +382,7 @@ type VolumeRequest struct { Source string `hcl:"source,optional"` ReadOnly bool `hcl:"read_only,optional"` MountOptions *CSIMountOptions `hcl:"mount_options,block"` + PerAlloc bool `hcl:"per_alloc,optional"` ExtraKeysHCL []string `hcl1:",unusedKeys,optional" json:"-"` } diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index ec21072f91c..8e28be0fd07 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -944,6 +944,7 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta Type: v.Type, ReadOnly: v.ReadOnly, Source: v.Source, + PerAlloc: v.PerAlloc, } if v.MountOptions != nil { diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index 2d1637c19c1..dfb2d44e34d 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -3639,6 +3639,7 @@ func TestTaskGroupDiff(t *testing.T) { Type: "host", Source: "foo-src", ReadOnly: true, + PerAlloc: true, }, }, }, @@ -3656,6 +3657,12 @@ func TestTaskGroupDiff(t *testing.T) { Old: "", New: "foo", }, + { + Type: DiffTypeAdded, + Name: "PerAlloc", + Old: "", + New: "true", + }, { Type: DiffTypeAdded, Name: "ReadOnly", diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 0f6e5e70a79..67f771321ee 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -329,6 +329,17 @@ func AllocName(job, group string, idx uint) string { return fmt.Sprintf("%s.%s[%d]", job, group, idx) } +// AllocSuffix returns the alloc index suffix that was added by the AllocName +// function above. +func AllocSuffix(name string) string { + idx := strings.LastIndex(name, "[") + if idx == -1 { + return "" + } + suffix := name[idx:] + return suffix +} + // ACLPolicyListHash returns a consistent hash for a set of policies. func ACLPolicyListHash(policies []*ACLPolicy) string { cacheKeyHash, err := blake2b.New256(nil) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 8e574cee8a8..88dc6a81555 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -6097,7 +6097,7 @@ func (tg *TaskGroup) Validate(j *Job) error { mErr.Errors = append(mErr.Errors, fmt.Errorf("Only one task may be marked as leader")) } - // Validate the Host Volumes + // Validate the volume requests for name, decl := range tg.Volumes { if !(decl.Type == VolumeTypeHost || decl.Type == VolumeTypeCSI) { @@ -6105,6 +6105,11 @@ func (tg *TaskGroup) Validate(j *Job) error { continue } + if decl.PerAlloc && tg.Update != nil && tg.Update.Canary > 0 { + mErr.Errors = append(mErr.Errors, + fmt.Errorf("Volume %s cannot be per_alloc when canaries are in use", name)) + } + if decl.Source == "" { mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has an empty source", name)) } diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 4787c5832d4..09a411f2461 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -1106,6 +1106,28 @@ func TestTaskGroup_Validate(t *testing.T) { err = tg.Validate(&Job{}) require.Contains(t, err.Error(), `Volume foo has an empty source`) + tg = &TaskGroup{ + Name: "group-a", + Update: &UpdateStrategy{ + Canary: 1, + }, + Volumes: map[string]*VolumeRequest{ + "foo": { + Type: "csi", + PerAlloc: true, + }, + }, + Tasks: []*Task{ + { + Name: "task-a", + Resources: &Resources{}, + }, + }, + } + err = tg.Validate(&Job{}) + require.Contains(t, err.Error(), `Volume foo has an empty source`) + require.Contains(t, err.Error(), `Volume foo cannot be per_alloc when canaries are in use`) + tg = &TaskGroup{ Volumes: map[string]*VolumeRequest{ "foo": { diff --git a/nomad/structs/volumes.go b/nomad/structs/volumes.go index 832c7af6653..ed6da134206 100644 --- a/nomad/structs/volumes.go +++ b/nomad/structs/volumes.go @@ -91,6 +91,7 @@ type VolumeRequest struct { Source string ReadOnly bool MountOptions *CSIMountOptions + PerAlloc bool } func (v *VolumeRequest) Copy() *VolumeRequest { diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 3ab25292db7..b3fb2c8d11e 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -227,15 +227,23 @@ func (c *CSIVolumeChecker) SetNamespace(namespace string) { c.namespace = namespace } -func (c *CSIVolumeChecker) SetVolumes(volumes map[string]*structs.VolumeRequest) { +func (c *CSIVolumeChecker) SetVolumes(allocName string, volumes map[string]*structs.VolumeRequest) { + xs := make(map[string]*structs.VolumeRequest) + // Filter to only CSI Volumes for alias, req := range volumes { if req.Type != structs.VolumeTypeCSI { continue } - - xs[alias] = req + if req.PerAlloc { + // provide a unique volume source per allocation + copied := req.Copy() + copied.Source = copied.Source + structs.AllocSuffix(allocName) + xs[alias] = copied + } else { + xs[alias] = req + } } c.volumes = xs } diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index 9caaa72cf81..44c99469b66 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -390,7 +390,7 @@ func TestCSIVolumeChecker(t *testing.T) { } for i, c := range cases { - checker.SetVolumes(c.RequestedVolumes) + checker.SetVolumes("group.task[0]", c.RequestedVolumes) if act := checker.Feasible(c.Node); act != c.Result { t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result) } diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 516fbf49c1b..852528874eb 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -547,6 +547,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul // Compute penalty nodes for rescheduled allocs selectOptions := getSelectOptions(prevAllocation, preferredNode) + selectOptions.AllocName = missing.Name() option := s.selectNextOption(tg, selectOptions) // Store the available nodes by datacenter diff --git a/scheduler/stack.go b/scheduler/stack.go index bccabc7899a..f362df9a9c0 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -35,6 +35,7 @@ type SelectOptions struct { PenaltyNodeIDs map[string]struct{} PreferredNodes []*structs.Node Preempt bool + AllocName string } // GenericStack is the Stack used for the Generic scheduler. It is @@ -143,7 +144,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.taskGroupDevices.SetTaskGroup(tg) s.taskGroupHostVolumes.SetVolumes(tg.Volumes) - s.taskGroupCSIVolumes.SetVolumes(tg.Volumes) + s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes) if len(tg.Networks) > 0 { s.taskGroupNetwork.SetNetwork(tg.Networks[0]) } @@ -297,7 +298,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.taskGroupDevices.SetTaskGroup(tg) s.taskGroupHostVolumes.SetVolumes(tg.Volumes) - s.taskGroupCSIVolumes.SetVolumes(tg.Volumes) + s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes) if len(tg.Networks) > 0 { s.taskGroupNetwork.SetNetwork(tg.Networks[0]) } diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 4b1e5c8cbfa..22684beb8af 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -284,7 +284,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { s.stack.SetNodes(nodes) // Attempt to match the task group - option := s.stack.Select(missing.TaskGroup, nil) + option := s.stack.Select(missing.TaskGroup, &SelectOptions{AllocName: missing.Name}) if option == nil { // If the task can't be placed on this node, update reporting data diff --git a/scheduler/util.go b/scheduler/util.go index 777c0e00f2b..86461a8f655 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -695,7 +695,8 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, ctx.Plan().AppendStoppedAlloc(update.Alloc, allocInPlace, "", "") // Attempt to match the task group - option := stack.Select(update.TaskGroup, nil) // This select only looks at one node so we don't pass selectOptions + option := stack.Select(update.TaskGroup, + &SelectOptions{AllocName: update.Alloc.Name}) // Pop the allocation ctx.Plan().PopUpdate(update.Alloc) @@ -977,7 +978,7 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy ctx.Plan().AppendStoppedAlloc(existing, allocInPlace, "", "") // Attempt to match the task group - option := stack.Select(newTG, nil) // This select only looks at one node so we don't pass selectOptions + option := stack.Select(newTG, &SelectOptions{AllocName: existing.Name}) // Pop the allocation ctx.Plan().PopUpdate(existing) diff --git a/vendor/github.com/hashicorp/nomad/api/tasks.go b/vendor/github.com/hashicorp/nomad/api/tasks.go index ed268423b99..9c43df47625 100644 --- a/vendor/github.com/hashicorp/nomad/api/tasks.go +++ b/vendor/github.com/hashicorp/nomad/api/tasks.go @@ -382,6 +382,7 @@ type VolumeRequest struct { Source string `hcl:"source,optional"` ReadOnly bool `hcl:"read_only,optional"` MountOptions *CSIMountOptions `hcl:"mount_options,block"` + PerAlloc bool `hcl:"per_alloc,optional"` ExtraKeysHCL []string `hcl1:",unusedKeys,optional" json:"-"` }