CSI: unique volume per allocation #10136

Merged · 6 commits · Mar 18, 2021

Changes from all commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@ BUG FIXES:
IMPROVEMENTS:
* cli: Update defaults for `nomad operator debug` flags `-interval` and `-server-id` to match common usage. [[GH-10121](https://github.com/hashicorp/nomad/issues/10121)]
* consul/connect: Enable setting `local_bind_address` field on connect upstreams [[GH-6248](https://github.com/hashicorp/nomad/issues/6248)]
* csi: Added support for jobs to request a unique volume ID per allocation. [[GH-10136](https://github.com/hashicorp/nomad/issues/10136)]
* driver/docker: Added support for optional extra container labels. [[GH-9885](https://github.com/hashicorp/nomad/issues/9885)]
* driver/docker: Added support for configuring default logger behavior in the client configuration. [[GH-10156](https://github.com/hashicorp/nomad/issues/10156)]

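For context, the new `per_alloc` option is set in a group's volume block. A minimal sketch, assuming a hypothetical job and volume name (only `per_alloc` itself comes from this PR):

job "example" {
  group "cache" {
    count = 2

    volume "data" {
      type      = "csi"
      source    = "data-vol"
      per_alloc = true
      # With per_alloc, allocation example.cache[0] claims volume "data-vol[0]",
      # example.cache[1] claims "data-vol[1]", and so on.
    }

    task "app" {
      driver = "docker"

      config {
        image = "redis:6"
      }

      volume_mount {
        volume      = "data"
        destination = "/data"
      }
    }
  }
}

The e2e jobspec change to e2e/csi/input/use-ebs-volume.nomad further down exercises this same pattern.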
1 change: 1 addition & 0 deletions api/tasks.go
@@ -382,6 +382,7 @@ type VolumeRequest struct {
Source string `hcl:"source,optional"`
ReadOnly bool `hcl:"read_only,optional"`
MountOptions *CSIMountOptions `hcl:"mount_options,block"`
PerAlloc bool `hcl:"per_alloc,optional"`
ExtraKeysHCL []string `hcl1:",unusedKeys,optional" json:"-"`
}

15 changes: 13 additions & 2 deletions client/allocrunner/csi_hook.go
@@ -92,8 +92,14 @@ func (c *csiHook) Postrun() error {
mode = structs.CSIVolumeClaimWrite
}

source := pair.request.Source
if pair.request.PerAlloc {
// NOTE: PerAlloc can't be set if we have canaries
source = source + structs.AllocSuffix(c.alloc.Name)
Review thread on this line:

Contributor: I recall an issue where alloc indexes aren't unique, and we may run into two allocations sharing the same ID in cases of canaries where multiple deployment versions are running. Would that cause an issue here?

Member (author): It would! But it's one of our invariants that if you're running a job with PerAlloc you can't also use canaries. (It doesn't make sense as a concept with volumes.)

Contributor: Let's make a note of that and/or add a test for that. I fear one day we relax or change the requirement a bit and miss this assumption in the code.

Member (author): I'll definitely add some more commentary to make that clear, though.

Member (author): Just realized I totally missed the docs too!

Member (author): Done in 3b1d19c

}

req := &structs.CSIVolumeUnpublishRequest{
VolumeID: pair.request.Source,
VolumeID: source,
Claim: &structs.CSIVolumeClaim{
AllocationID: c.alloc.ID,
NodeID: c.alloc.NodeID,
@@ -159,8 +165,13 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
claimType = structs.CSIVolumeClaimRead
}

source := pair.request.Source
if pair.request.PerAlloc {
source = source + structs.AllocSuffix(c.alloc.Name)
}

req := &structs.CSIVolumeClaimRequest{
VolumeID: pair.request.Source,
VolumeID: source,
AllocationID: c.alloc.ID,
NodeID: c.alloc.NodeID,
Claim: claimType,
1 change: 1 addition & 0 deletions command/agent/job_endpoint.go
@@ -944,6 +944,7 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta
Type: v.Type,
ReadOnly: v.ReadOnly,
Source: v.Source,
PerAlloc: v.PerAlloc,
}

if v.MountOptions != nil {
5 changes: 3 additions & 2 deletions e2e/csi/input/use-ebs-volume.nomad
@@ -9,8 +9,9 @@ job "use-ebs-volume" {

group "group" {
volume "test" {
type = "csi"
source = "ebs-vol0"
type = "csi"
source = "ebs-vol"
per_alloc = true
}

task "task" {
2 changes: 1 addition & 1 deletion e2e/terraform/compute.tf
@@ -63,7 +63,7 @@ data "external" "packer_sha" {
sha=$(git log -n 1 --pretty=format:%H packer)
echo "{\"sha\":\"$${sha}\"}"
EOT
]
]

}

2 changes: 1 addition & 1 deletion e2e/terraform/terraform.tfvars
@@ -15,5 +15,5 @@ nomad_local_binary = "" # overrides nomad_sha and nomad_version if set

# Example overrides:
# nomad_sha = "38e23b62a7700c96f4898be777543869499fea0a"
# nomad_local_binary = "../../pkg/linux_amd/nomad"
# nomad_local_binary = "../../pkg/linux_amd64/nomad"
# nomad_local_binary_client_windows_2016_amd64 = ["../../pkg/windows_amd64/nomad.exe"]
4 changes: 2 additions & 2 deletions e2e/terraform/volumes.tf
@@ -30,8 +30,8 @@ data "template_file" "ebs_volume_hcl" {
count = var.volumes ? 1 : 0
template = <<EOT
type = "csi"
id = "ebs-vol0"
name = "ebs-vol0"
id = "ebs-vol[0]"
name = "ebs-vol"
external_id = "${aws_ebs_volume.csi[0].id}"
access_mode = "single-node-writer"
attachment_mode = "file-system"
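The registration template above only creates "ebs-vol[0]". If the e2e job were scaled past a single allocation, each extra allocation would need its own registered volume carrying the matching index suffix. A hedged sketch of what a second registration might look like (not part of this PR; it assumes a second aws_ebs_volume.csi resource exists):

type            = "csi"
id              = "ebs-vol[1]"
name            = "ebs-vol"
external_id     = "${aws_ebs_volume.csi[1].id}"
access_mode     = "single-node-writer"
attachment_mode = "file-system"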
5 changes: 1 addition & 4 deletions nomad/state/state_store.go
@@ -2135,10 +2135,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st
if obj == nil {
return nil, nil
}
vol, ok := obj.(*structs.CSIVolume)
if !ok {
return nil, fmt.Errorf("volume row conversion error")
}
vol := obj.(*structs.CSIVolume)

// we return the volume with the plugins denormalized by default,
// because the scheduler needs them for feasibility checking
7 changes: 7 additions & 0 deletions nomad/structs/diff_test.go
@@ -3639,6 +3639,7 @@ func TestTaskGroupDiff(t *testing.T) {
Type: "host",
Source: "foo-src",
ReadOnly: true,
PerAlloc: true,
},
},
},
@@ -3656,6 +3657,12 @@
Old: "",
New: "foo",
},
{
Type: DiffTypeAdded,
Name: "PerAlloc",
Old: "",
New: "true",
},
{
Type: DiffTypeAdded,
Name: "ReadOnly",
11 changes: 11 additions & 0 deletions nomad/structs/funcs.go
@@ -329,6 +329,17 @@ func AllocName(job, group string, idx uint) string {
return fmt.Sprintf("%s.%s[%d]", job, group, idx)
}

// AllocSuffix returns the alloc index suffix that was added by the AllocName
// function above.
func AllocSuffix(name string) string {
idx := strings.LastIndex(name, "[")
if idx == -1 {
return ""
}
suffix := name[idx:]
return suffix
}

// ACLPolicyListHash returns a consistent hash for a set of policies.
func ACLPolicyListHash(policies []*ACLPolicy) string {
cacheKeyHash, err := blake2b.New256(nil)
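As a quick illustration of how AllocName and AllocSuffix round-trip, here is a standalone sketch; the helpers are reproduced locally so the snippet runs on its own, while the real implementations live in nomad/structs/funcs.go:

package main

import (
	"fmt"
	"strings"
)

// allocName mirrors structs.AllocName: "<job>.<group>[<index>]".
func allocName(job, group string, idx uint) string {
	return fmt.Sprintf("%s.%s[%d]", job, group, idx)
}

// allocSuffix mirrors structs.AllocSuffix: everything from the last "[" onward.
func allocSuffix(name string) string {
	i := strings.LastIndex(name, "[")
	if i == -1 {
		return ""
	}
	return name[i:]
}

func main() {
	name := allocName("use-ebs-volume", "group", 1) // "use-ebs-volume.group[1]"
	suffix := allocSuffix(name)                     // "[1]"

	// A per_alloc request with source "ebs-vol" therefore resolves to "ebs-vol[1]".
	fmt.Println("ebs-vol" + suffix)
}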
7 changes: 6 additions & 1 deletion nomad/structs/structs.go
@@ -6100,14 +6100,19 @@ func (tg *TaskGroup) Validate(j *Job) error {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Only one task may be marked as leader"))
}

// Validate the Host Volumes
// Validate the volume requests
for name, decl := range tg.Volumes {
if !(decl.Type == VolumeTypeHost ||
decl.Type == VolumeTypeCSI) {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has unrecognised type %s", name, decl.Type))
continue
}

if decl.PerAlloc && tg.Update != nil && tg.Update.Canary > 0 {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("Volume %s cannot be per_alloc when canaries are in use", name))
}

if decl.Source == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has an empty source", name))
}
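To make the new validation concrete, a group like the following sketch (job and volume names assumed) would now be rejected with "Volume data cannot be per_alloc when canaries are in use":

group "cache" {
  update {
    canary = 1
  }

  volume "data" {
    type      = "csi"
    source    = "data-vol"
    per_alloc = true   # rejected when canary > 0
  }

  task "app" {
    driver = "docker"

    config {
      image = "redis:6"
    }
  }
}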
22 changes: 22 additions & 0 deletions nomad/structs/structs_test.go
@@ -1106,6 +1106,28 @@ func TestTaskGroup_Validate(t *testing.T) {
err = tg.Validate(&Job{})
require.Contains(t, err.Error(), `Volume foo has an empty source`)

tg = &TaskGroup{
Name: "group-a",
Update: &UpdateStrategy{
Canary: 1,
},
Volumes: map[string]*VolumeRequest{
"foo": {
Type: "csi",
PerAlloc: true,
},
},
Tasks: []*Task{
{
Name: "task-a",
Resources: &Resources{},
},
},
}
err = tg.Validate(&Job{})
require.Contains(t, err.Error(), `Volume foo has an empty source`)
require.Contains(t, err.Error(), `Volume foo cannot be per_alloc when canaries are in use`)

tg = &TaskGroup{
Volumes: map[string]*VolumeRequest{
"foo": {
1 change: 1 addition & 0 deletions nomad/structs/volumes.go
@@ -91,6 +91,7 @@ type VolumeRequest struct {
Source string
ReadOnly bool
MountOptions *CSIMountOptions
PerAlloc bool
}

func (v *VolumeRequest) Copy() *VolumeRequest {
21 changes: 14 additions & 7 deletions scheduler/feasible.go
@@ -227,31 +227,38 @@ func (c *CSIVolumeChecker) SetNamespace(namespace string) {
c.namespace = namespace
}

func (c *CSIVolumeChecker) SetVolumes(volumes map[string]*structs.VolumeRequest) {
func (c *CSIVolumeChecker) SetVolumes(allocName string, volumes map[string]*structs.VolumeRequest) {

xs := make(map[string]*structs.VolumeRequest)

// Filter to only CSI Volumes
for alias, req := range volumes {
if req.Type != structs.VolumeTypeCSI {
continue
}

xs[alias] = req
if req.PerAlloc {
// provide a unique volume source per allocation
copied := req.Copy()
copied.Source = copied.Source + structs.AllocSuffix(allocName)
xs[alias] = copied
} else {
xs[alias] = req
}
}
c.volumes = xs
}

func (c *CSIVolumeChecker) Feasible(n *structs.Node) bool {
hasPlugins, failReason := c.hasPlugins(n)

if hasPlugins {
ok, failReason := c.isFeasible(n)
if ok {
return true
}

c.ctx.Metrics().FilterNode(n, failReason)
return false
}

func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) {
func (c *CSIVolumeChecker) isFeasible(n *structs.Node) (bool, string) {
// We can mount the volume if
// - if required, a healthy controller plugin is running the driver
// - the volume has free claims, or this job owns the claims
19 changes: 16 additions & 3 deletions scheduler/feasible_test.go
@@ -309,6 +309,13 @@ func TestCSIVolumeChecker(t *testing.T) {
require.NoError(t, err)
index++

vid3 := "volume-id[0]"
vol3 := vol.Copy()
vol3.ID = vid3
err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol3})
require.NoError(t, err)
index++

alloc := mock.Alloc()
alloc.NodeID = nodes[4].ID
alloc.Job.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
@@ -332,11 +339,17 @@
noVolumes := map[string]*structs.VolumeRequest{}

volumes := map[string]*structs.VolumeRequest{
"baz": {
"shared": {
Type: "csi",
Name: "baz",
Source: "volume-id",
},
"unique": {
Type: "csi",
Name: "baz",
Source: "volume-id[0]",
PerAlloc: true,
},
"nonsense": {
Type: "host",
Name: "nonsense",
@@ -390,7 +403,7 @@ func TestCSIVolumeChecker(t *testing.T) {
}

for i, c := range cases {
checker.SetVolumes(c.RequestedVolumes)
checker.SetVolumes(alloc.Name, c.RequestedVolumes)
if act := checker.Feasible(c.Node); act != c.Result {
t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
}
@@ -407,7 +420,7 @@
checker.SetNamespace(structs.DefaultNamespace)

for _, node := range nodes {
checker.SetVolumes(volumes)
checker.SetVolumes(alloc.Name, volumes)
act := checker.Feasible(node)
require.False(t, act, "request with missing volume should never be feasible")
}
1 change: 1 addition & 0 deletions scheduler/generic_sched.go
@@ -547,6 +547,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul

// Compute penalty nodes for rescheduled allocs
selectOptions := getSelectOptions(prevAllocation, preferredNode)
selectOptions.AllocName = missing.Name()
option := s.selectNextOption(tg, selectOptions)

// Store the available nodes by datacenter