Skip to content

Commit

Permalink
CSI: unique volume per allocation
Browse files Browse the repository at this point in the history
Add a `PerAlloc` field to volume requests that directs the scheduler to test
feasibility for volumes with a source ID that includes the allocation index
suffix (ex. `[0]`), rather than the exact source ID.

Read the `PerAlloc` field when making the volume claim at the client to
determine if the allocation index suffix (ex. `[0]`) should be added to the
volume source ID.
  • Loading branch information
tgross authored Mar 18, 2021
1 parent a1eaad9 commit 7c75696
Show file tree
Hide file tree
Showing 25 changed files with 329 additions and 71 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ BUG FIXES:
IMPROVEMENTS:
* cli: Update defaults for `nomad operator debug` flags `-interval` and `-server-id` to match common usage. [[GH-10121](https://github.com/hashicorp/nomad/issues/10121)]
* consul/connect: Enable setting `local_bind_address` field on connect upstreams [[GH-6248](https://github.com/hashicorp/nomad/issues/6248)]
* csi: Added support for jobs to request a unique volume ID per allocation. [[GH-10136](https://github.com/hashicorp/nomad/issues/10136)]
* driver/docker: Added support for optional extra container labels. [[GH-9885](https://github.com/hashicorp/nomad/issues/9885)]
* driver/docker: Added support for configuring default logger behavior in the client configuration. [[GH-10156](https://github.com/hashicorp/nomad/issues/10156)]

Expand Down
1 change: 1 addition & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ type VolumeRequest struct {
Source string `hcl:"source,optional"`
ReadOnly bool `hcl:"read_only,optional"`
MountOptions *CSIMountOptions `hcl:"mount_options,block"`
PerAlloc bool `hcl:"per_alloc,optional"`
ExtraKeysHCL []string `hcl1:",unusedKeys,optional" json:"-"`
}

Expand Down
15 changes: 13 additions & 2 deletions client/allocrunner/csi_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,14 @@ func (c *csiHook) Postrun() error {
mode = structs.CSIVolumeClaimWrite
}

source := pair.request.Source
if pair.request.PerAlloc {
// NOTE: PerAlloc can't be set if we have canaries
source = source + structs.AllocSuffix(c.alloc.Name)
}

req := &structs.CSIVolumeUnpublishRequest{
VolumeID: pair.request.Source,
VolumeID: source,
Claim: &structs.CSIVolumeClaim{
AllocationID: c.alloc.ID,
NodeID: c.alloc.NodeID,
Expand Down Expand Up @@ -159,8 +165,13 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
claimType = structs.CSIVolumeClaimRead
}

source := pair.request.Source
if pair.request.PerAlloc {
source = source + structs.AllocSuffix(c.alloc.Name)
}

req := &structs.CSIVolumeClaimRequest{
VolumeID: pair.request.Source,
VolumeID: source,
AllocationID: c.alloc.ID,
NodeID: c.alloc.NodeID,
Claim: claimType,
Expand Down
1 change: 1 addition & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,7 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta
Type: v.Type,
ReadOnly: v.ReadOnly,
Source: v.Source,
PerAlloc: v.PerAlloc,
}

if v.MountOptions != nil {
Expand Down
5 changes: 3 additions & 2 deletions e2e/csi/input/use-ebs-volume.nomad
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ job "use-ebs-volume" {

group "group" {
volume "test" {
type = "csi"
source = "ebs-vol0"
type = "csi"
source = "ebs-vol"
per_alloc = true
}

task "task" {
Expand Down
2 changes: 1 addition & 1 deletion e2e/terraform/compute.tf
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ data "external" "packer_sha" {
sha=$(git log -n 1 --pretty=format:%H packer)
echo "{\"sha\":\"$${sha}\"}"
EOT
]
]

}

Expand Down
2 changes: 1 addition & 1 deletion e2e/terraform/terraform.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ nomad_local_binary = "" # overrides nomad_sha and nomad_version if set

# Example overrides:
# nomad_sha = "38e23b62a7700c96f4898be777543869499fea0a"
# nomad_local_binary = "../../pkg/linux_amd/nomad"
# nomad_local_binary = "../../pkg/linux_amd64/nomad"
# nomad_local_binary_client_windows_2016_amd64 = ["../../pkg/windows_amd64/nomad.exe"]
4 changes: 2 additions & 2 deletions e2e/terraform/volumes.tf
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ data "template_file" "ebs_volume_hcl" {
count = var.volumes ? 1 : 0
template = <<EOT
type = "csi"
id = "ebs-vol0"
name = "ebs-vol0"
id = "ebs-vol[0]"
name = "ebs-vol"
external_id = "${aws_ebs_volume.csi[0].id}"
access_mode = "single-node-writer"
attachment_mode = "file-system"
Expand Down
5 changes: 1 addition & 4 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2135,10 +2135,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st
if obj == nil {
return nil, nil
}
vol, ok := obj.(*structs.CSIVolume)
if !ok {
return nil, fmt.Errorf("volume row conversion error")
}
vol := obj.(*structs.CSIVolume)

// we return the volume with the plugins denormalized by default,
// because the scheduler needs them for feasibility checking
Expand Down
7 changes: 7 additions & 0 deletions nomad/structs/diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3639,6 +3639,7 @@ func TestTaskGroupDiff(t *testing.T) {
Type: "host",
Source: "foo-src",
ReadOnly: true,
PerAlloc: true,
},
},
},
Expand All @@ -3656,6 +3657,12 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "",
New: "foo",
},
{
Type: DiffTypeAdded,
Name: "PerAlloc",
Old: "",
New: "true",
},
{
Type: DiffTypeAdded,
Name: "ReadOnly",
Expand Down
11 changes: 11 additions & 0 deletions nomad/structs/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,17 @@ func AllocName(job, group string, idx uint) string {
return fmt.Sprintf("%s.%s[%d]", job, group, idx)
}

// AllocSuffix returns the alloc index suffix that was added by the AllocName
// function above.
func AllocSuffix(name string) string {
	// AllocName always places the index in a trailing "[N]"; everything from
	// the final '[' onward is the suffix. Names without an index yield "".
	if i := strings.LastIndex(name, "["); i != -1 {
		return name[i:]
	}
	return ""
}

// ACLPolicyListHash returns a consistent hash for a set of policies.
func ACLPolicyListHash(policies []*ACLPolicy) string {
cacheKeyHash, err := blake2b.New256(nil)
Expand Down
7 changes: 6 additions & 1 deletion nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6100,14 +6100,19 @@ func (tg *TaskGroup) Validate(j *Job) error {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Only one task may be marked as leader"))
}

// Validate the Host Volumes
// Validate the volume requests
for name, decl := range tg.Volumes {
if !(decl.Type == VolumeTypeHost ||
decl.Type == VolumeTypeCSI) {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has unrecognised type %s", name, decl.Type))
continue
}

if decl.PerAlloc && tg.Update != nil && tg.Update.Canary > 0 {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("Volume %s cannot be per_alloc when canaries are in use", name))
}

if decl.Source == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has an empty source", name))
}
Expand Down
22 changes: 22 additions & 0 deletions nomad/structs/structs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,28 @@ func TestTaskGroup_Validate(t *testing.T) {
err = tg.Validate(&Job{})
require.Contains(t, err.Error(), `Volume foo has an empty source`)

tg = &TaskGroup{
Name: "group-a",
Update: &UpdateStrategy{
Canary: 1,
},
Volumes: map[string]*VolumeRequest{
"foo": {
Type: "csi",
PerAlloc: true,
},
},
Tasks: []*Task{
{
Name: "task-a",
Resources: &Resources{},
},
},
}
err = tg.Validate(&Job{})
require.Contains(t, err.Error(), `Volume foo has an empty source`)
require.Contains(t, err.Error(), `Volume foo cannot be per_alloc when canaries are in use`)

tg = &TaskGroup{
Volumes: map[string]*VolumeRequest{
"foo": {
Expand Down
1 change: 1 addition & 0 deletions nomad/structs/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type VolumeRequest struct {
Source string
ReadOnly bool
MountOptions *CSIMountOptions
PerAlloc bool
}

func (v *VolumeRequest) Copy() *VolumeRequest {
Expand Down
21 changes: 14 additions & 7 deletions scheduler/feasible.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,31 +227,38 @@ func (c *CSIVolumeChecker) SetNamespace(namespace string) {
c.namespace = namespace
}

func (c *CSIVolumeChecker) SetVolumes(volumes map[string]*structs.VolumeRequest) {
func (c *CSIVolumeChecker) SetVolumes(allocName string, volumes map[string]*structs.VolumeRequest) {

xs := make(map[string]*structs.VolumeRequest)

// Filter to only CSI Volumes
for alias, req := range volumes {
if req.Type != structs.VolumeTypeCSI {
continue
}

xs[alias] = req
if req.PerAlloc {
// provide a unique volume source per allocation
copied := req.Copy()
copied.Source = copied.Source + structs.AllocSuffix(allocName)
xs[alias] = copied
} else {
xs[alias] = req
}
}
c.volumes = xs
}

func (c *CSIVolumeChecker) Feasible(n *structs.Node) bool {
hasPlugins, failReason := c.hasPlugins(n)

if hasPlugins {
ok, failReason := c.isFeasible(n)
if ok {
return true
}

c.ctx.Metrics().FilterNode(n, failReason)
return false
}

func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) {
func (c *CSIVolumeChecker) isFeasible(n *structs.Node) (bool, string) {
// We can mount the volume if
// - if required, a healthy controller plugin is running the driver
// - the volume has free claims, or this job owns the claims
Expand Down
19 changes: 16 additions & 3 deletions scheduler/feasible_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,13 @@ func TestCSIVolumeChecker(t *testing.T) {
require.NoError(t, err)
index++

vid3 := "volume-id[0]"
vol3 := vol.Copy()
vol3.ID = vid3
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol3})
require.NoError(t, err)
index++

alloc := mock.Alloc()
alloc.NodeID = nodes[4].ID
alloc.Job.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
Expand All @@ -332,11 +339,17 @@ func TestCSIVolumeChecker(t *testing.T) {
noVolumes := map[string]*structs.VolumeRequest{}

volumes := map[string]*structs.VolumeRequest{
"baz": {
"shared": {
Type: "csi",
Name: "baz",
Source: "volume-id",
},
"unique": {
Type: "csi",
Name: "baz",
Source: "volume-id[0]",
PerAlloc: true,
},
"nonsense": {
Type: "host",
Name: "nonsense",
Expand Down Expand Up @@ -390,7 +403,7 @@ func TestCSIVolumeChecker(t *testing.T) {
}

for i, c := range cases {
checker.SetVolumes(c.RequestedVolumes)
checker.SetVolumes(alloc.Name, c.RequestedVolumes)
if act := checker.Feasible(c.Node); act != c.Result {
t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
}
Expand All @@ -407,7 +420,7 @@ func TestCSIVolumeChecker(t *testing.T) {
checker.SetNamespace(structs.DefaultNamespace)

for _, node := range nodes {
checker.SetVolumes(volumes)
checker.SetVolumes(alloc.Name, volumes)
act := checker.Feasible(node)
require.False(t, act, "request with missing volume should never be feasible")
}
Expand Down
1 change: 1 addition & 0 deletions scheduler/generic_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul

// Compute penalty nodes for rescheduled allocs
selectOptions := getSelectOptions(prevAllocation, preferredNode)
selectOptions.AllocName = missing.Name()
option := s.selectNextOption(tg, selectOptions)

// Store the available nodes by datacenter
Expand Down
Loading

0 comments on commit 7c75696

Please sign in to comment.