[wip] CSI: unique volume per allocation
tgross committed Mar 8, 2021
1 parent f270a44 commit dd628be
Showing 14 changed files with 70 additions and 10 deletions.
1 change: 1 addition & 0 deletions api/tasks.go
@@ -382,6 +382,7 @@ type VolumeRequest struct {
 	Source       string           `hcl:"source,optional"`
 	ReadOnly     bool             `hcl:"read_only,optional"`
 	MountOptions *CSIMountOptions `hcl:"mount_options,block"`
+	PerAlloc     bool             `hcl:"per_alloc,optional"`
 	ExtraKeysHCL []string         `hcl1:",unusedKeys,optional" json:"-"`
 }

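For orientation, a minimal sketch of what the new field looks like from the Go API client. The volume name and surrounding job wiring here are illustrative, not part of this commit:

	// Request a CSI volume whose source becomes unique per allocation.
	// With PerAlloc set, an allocation named "example.cache[2]" would
	// claim the volume ID "shared-data[2]" rather than "shared-data".
	vol := &api.VolumeRequest{
		Type:     "csi",
		Source:   "shared-data",
		PerAlloc: true,
	}
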
1 change: 1 addition & 0 deletions command/agent/job_endpoint.go
@@ -944,6 +944,7 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta
 			Type:     v.Type,
 			ReadOnly: v.ReadOnly,
 			Source:   v.Source,
+			PerAlloc: v.PerAlloc,
 		}

 		if v.MountOptions != nil {

7 changes: 7 additions & 0 deletions nomad/structs/diff_test.go
@@ -3639,6 +3639,7 @@ func TestTaskGroupDiff(t *testing.T) {
 					Type:     "host",
 					Source:   "foo-src",
 					ReadOnly: true,
+					PerAlloc: true,
 				},
 			},
 		},
@@ -3656,6 +3657,12 @@ func TestTaskGroupDiff(t *testing.T) {
 							Old:  "",
 							New:  "foo",
 						},
+						{
+							Type: DiffTypeAdded,
+							Name: "PerAlloc",
+							Old:  "",
+							New:  "true",
+						},
 						{
 							Type: DiffTypeAdded,
 							Name: "ReadOnly",

11 changes: 11 additions & 0 deletions nomad/structs/funcs.go
@@ -329,6 +329,17 @@ func AllocName(job, group string, idx uint) string {
 	return fmt.Sprintf("%s.%s[%d]", job, group, idx)
 }

+// AllocSuffix returns the alloc index suffix that was added by the AllocName
+// function above.
+func AllocSuffix(name string) string {
+	idx := strings.LastIndex(name, "[")
+	if idx == -1 {
+		return ""
+	}
+	suffix := name[idx:]
+	return suffix
+}
+
 // ACLPolicyListHash returns a consistent hash for a set of policies.
 func ACLPolicyListHash(policies []*ACLPolicy) string {
 	cacheKeyHash, err := blake2b.New256(nil)

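A quick sketch of how AllocSuffix composes with AllocName (values illustrative):

	name := structs.AllocName("example", "cache", 3) // "example.cache[3]"
	suffix := structs.AllocSuffix(name)              // "[3]"
	// Names without an index suffix yield the empty string:
	none := structs.AllocSuffix("example.cache")     // ""
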
7 changes: 6 additions & 1 deletion nomad/structs/structs.go
@@ -6097,14 +6097,19 @@ func (tg *TaskGroup) Validate(j *Job) error {
 		mErr.Errors = append(mErr.Errors, fmt.Errorf("Only one task may be marked as leader"))
 	}

-	// Validate the Host Volumes
+	// Validate the volume requests
 	for name, decl := range tg.Volumes {
 		if !(decl.Type == VolumeTypeHost ||
 			decl.Type == VolumeTypeCSI) {
 			mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has unrecognised type %s", name, decl.Type))
 			continue
 		}

+		if decl.PerAlloc && tg.Update != nil && tg.Update.Canary > 0 {
+			mErr.Errors = append(mErr.Errors,
+				fmt.Errorf("Volume %s cannot be per_alloc when canaries are in use", name))
+		}
+
 		if decl.Source == "" {
 			mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has an empty source", name))
 		}

22 changes: 22 additions & 0 deletions nomad/structs/structs_test.go
@@ -1106,6 +1106,28 @@ func TestTaskGroup_Validate(t *testing.T) {
 	err = tg.Validate(&Job{})
 	require.Contains(t, err.Error(), `Volume foo has an empty source`)

+	tg = &TaskGroup{
+		Name: "group-a",
+		Update: &UpdateStrategy{
+			Canary: 1,
+		},
+		Volumes: map[string]*VolumeRequest{
+			"foo": {
+				Type:     "csi",
+				PerAlloc: true,
+			},
+		},
+		Tasks: []*Task{
+			{
+				Name:      "task-a",
+				Resources: &Resources{},
+			},
+		},
+	}
+	err = tg.Validate(&Job{})
+	require.Contains(t, err.Error(), `Volume foo has an empty source`)
+	require.Contains(t, err.Error(), `Volume foo cannot be per_alloc when canaries are in use`)
+
 	tg = &TaskGroup{
 		Volumes: map[string]*VolumeRequest{
 			"foo": {

1 change: 1 addition & 0 deletions nomad/structs/volumes.go
@@ -91,6 +91,7 @@ type VolumeRequest struct {
 	Source       string
 	ReadOnly     bool
 	MountOptions *CSIMountOptions
+	PerAlloc     bool
 }

 func (v *VolumeRequest) Copy() *VolumeRequest {

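The body of Copy is collapsed in this view. Assuming it begins with the usual shallow struct copy, the new bool field needs no explicit handling; a rough sketch of that pattern (the MountOptions handling is an assumption):

	func (v *VolumeRequest) Copy() *VolumeRequest {
		if v == nil {
			return nil
		}
		nv := *v // a shallow copy carries PerAlloc along
		if v.MountOptions != nil {
			nv.MountOptions = v.MountOptions.Copy()
		}
		return &nv
	}
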
14 changes: 11 additions & 3 deletions scheduler/feasible.go
@@ -227,15 +227,23 @@ func (c *CSIVolumeChecker) SetNamespace(namespace string) {
 	c.namespace = namespace
 }

-func (c *CSIVolumeChecker) SetVolumes(volumes map[string]*structs.VolumeRequest) {
+func (c *CSIVolumeChecker) SetVolumes(allocName string, volumes map[string]*structs.VolumeRequest) {
 	xs := make(map[string]*structs.VolumeRequest)

 	// Filter to only CSI Volumes
 	for alias, req := range volumes {
 		if req.Type != structs.VolumeTypeCSI {
 			continue
 		}
-		xs[alias] = req
+
+		if req.PerAlloc {
+			// provide a unique volume source per allocation
+			copied := req.Copy()
+			copied.Source = copied.Source + structs.AllocSuffix(allocName)
+			xs[alias] = copied
+		} else {
+			xs[alias] = req
+		}
 	}
 	c.volumes = xs
 }

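A minimal sketch of the new behavior (names illustrative): a per-alloc request is copied and its Source gains the allocation's index suffix, leaving the caller's request untouched:

	req := &structs.VolumeRequest{
		Type:     structs.VolumeTypeCSI,
		Source:   "shared-data",
		PerAlloc: true,
	}
	checker.SetVolumes("example.cache[3]", map[string]*structs.VolumeRequest{"data": req})
	// The checker now evaluates feasibility against volume ID "shared-data[3]";
	// req.Source itself is still "shared-data".
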
2 changes: 1 addition & 1 deletion scheduler/feasible_test.go
@@ -390,7 +390,7 @@ func TestCSIVolumeChecker(t *testing.T) {
 	}

 	for i, c := range cases {
-		checker.SetVolumes(c.RequestedVolumes)
+		checker.SetVolumes("group.task[0]", c.RequestedVolumes)
 		if act := checker.Feasible(c.Node); act != c.Result {
 			t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
 		}

1 change: 1 addition & 0 deletions scheduler/generic_sched.go
@@ -547,6 +547,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul

 		// Compute penalty nodes for rescheduled allocs
 		selectOptions := getSelectOptions(prevAllocation, preferredNode)
+		selectOptions.AllocName = missing.Name()
 		option := s.selectNextOption(tg, selectOptions)

 		// Store the available nodes by datacenter

5 changes: 3 additions & 2 deletions scheduler/stack.go
@@ -35,6 +35,7 @@ type SelectOptions struct {
 	PenaltyNodeIDs map[string]struct{}
 	PreferredNodes []*structs.Node
 	Preempt        bool
+	AllocName      string
 }

 // GenericStack is the Stack used for the Generic scheduler. It is
@@ -143,7 +144,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra
 	s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
 	s.taskGroupDevices.SetTaskGroup(tg)
 	s.taskGroupHostVolumes.SetVolumes(tg.Volumes)
-	s.taskGroupCSIVolumes.SetVolumes(tg.Volumes)
+	s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
 	if len(tg.Networks) > 0 {
 		s.taskGroupNetwork.SetNetwork(tg.Networks[0])
 	}
@@ -297,7 +298,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran
 	s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
 	s.taskGroupDevices.SetTaskGroup(tg)
 	s.taskGroupHostVolumes.SetVolumes(tg.Volumes)
-	s.taskGroupCSIVolumes.SetVolumes(tg.Volumes)
+	s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
 	if len(tg.Networks) > 0 {
 		s.taskGroupNetwork.SetNetwork(tg.Networks[0])
 	}

2 changes: 1 addition & 1 deletion scheduler/system_sched.go
@@ -284,7 +284,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 		s.stack.SetNodes(nodes)

 		// Attempt to match the task group
-		option := s.stack.Select(missing.TaskGroup, nil)
+		option := s.stack.Select(missing.TaskGroup, &SelectOptions{AllocName: missing.Name})

 		if option == nil {
 			// If the task can't be placed on this node, update reporting data

5 changes: 3 additions & 2 deletions scheduler/util.go
@@ -695,7 +695,8 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
 		ctx.Plan().AppendStoppedAlloc(update.Alloc, allocInPlace, "", "")

 		// Attempt to match the task group
-		option := stack.Select(update.TaskGroup, nil) // This select only looks at one node so we don't pass selectOptions
+		option := stack.Select(update.TaskGroup,
+			&SelectOptions{AllocName: update.Alloc.Name})

 		// Pop the allocation
 		ctx.Plan().PopUpdate(update.Alloc)
@@ -977,7 +978,7 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy
 		ctx.Plan().AppendStoppedAlloc(existing, allocInPlace, "", "")

 		// Attempt to match the task group
-		option := stack.Select(newTG, nil) // This select only looks at one node so we don't pass selectOptions
+		option := stack.Select(newTG, &SelectOptions{AllocName: existing.Name})

 		// Pop the allocation
 		ctx.Plan().PopUpdate(existing)

1 change: 1 addition & 0 deletions vendor/github.com/hashicorp/nomad/api/tasks.go

Some generated files are not rendered by default.
