dynamic host volumes: account for other claims in capability check #24684

Merged 2 commits on Dec 19, 2024. Changes shown below are from 1 commit.
101 changes: 89 additions & 12 deletions scheduler/feasible.go
@@ -192,6 +192,11 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
return true
}

proposed, err := h.ctx.ProposedAllocs(n.ID)
if err != nil {
return false // only hit this on state store invariant failure
}

for _, req := range h.volumeReqs {
volCfg, ok := n.HostVolumes[req.Source]
if !ok {
@@ -207,18 +212,12 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
// raft entry completes
return false
}
if vol.State != structs.HostVolumeStateReady {
return false
}
var capOk bool
for _, cap := range vol.RequestedCapabilities {
if req.AccessMode == structs.CSIVolumeAccessMode(cap.AccessMode) &&
req.AttachmentMode == structs.CSIVolumeAttachmentMode(cap.AttachmentMode) {
capOk = true
break
}
}
if !capOk {
if !h.hostVolumeIsAvailable(vol,
structs.HostVolumeAccessMode(req.AccessMode),
structs.HostVolumeAttachmentMode(req.AttachmentMode),
req.ReadOnly,
proposed,
) {
return false
}

@@ -242,6 +241,84 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
return true
}

// hostVolumeIsAvailable determines if a dynamic host volume is available for a request
func (h *HostVolumeChecker) hostVolumeIsAvailable(
vol *structs.HostVolume,
reqAccess structs.HostVolumeAccessMode,
reqAttach structs.HostVolumeAttachmentMode,
readOnly bool,
proposed []*structs.Allocation) bool {

if vol.State != structs.HostVolumeStateReady {
return false
}

// pick a default capability based on the read-only flag. this happens here
// in the scheduler rather than job submit because we don't know whether a
// host volume is dynamic or not until we try to schedule it (ex. the same
// name could be static on one node and dynamic on another)
if reqAccess == structs.HostVolumeAccessModeUnknown {
if readOnly {
reqAccess = structs.HostVolumeAccessModeSingleNodeReader
} else {
reqAccess = structs.HostVolumeAccessModeSingleNodeWriter
}
}
if reqAttach == structs.HostVolumeAttachmentModeUnknown {
reqAttach = structs.HostVolumeAttachmentModeFilesystem
}

// check that the volume has the requested capability at all
var capOk bool
for _, cap := range vol.RequestedCapabilities {
if reqAccess == cap.AccessMode &&
reqAttach == cap.AttachmentMode {
capOk = true
break
}
}
if !capOk {
return false
}

switch reqAccess {
case structs.HostVolumeAccessModeSingleNodeReader:
return readOnly
case structs.HostVolumeAccessModeSingleNodeWriter:
return !readOnly
case structs.HostVolumeAccessModeSingleNodeSingleWriter:
// examine all proposed allocs on the node, including those that might
// not have yet been persisted. they have nil pointers to their Job, so
// we have to go back to the state store to get them
seen := map[structs.NamespacedID]struct{}{}
for _, alloc := range proposed {
if _, ok := seen[alloc.JobNamespacedID()]; ok {
// all allocs for the same job will have the same read-only flag
// and capabilities, so we only need to check a given job once
continue

Review comment (Member):

I see what this is doing, but I don't follow the reasoning in this comment. I read it as meaning that different groups in the job will all have the same volume params, but this is more about allocs per group, specifically count, right?

Suggested change:
- // all allocs for the same job will have the same read-only flag
- // and capabilities, so we only need to check a given job once
+ // all allocs for the same task group will have the same read-only
+ // flag and capabilities, so we only need to check a given job once
  continue

using the job's ns+id as the seen key, without any reference to group, threw me a bit, I think. not that you should change that, just that the comment is pretty important to clarify this.


Author reply (Member):

Yeah, very good point. So that's actually a bug here because if a job has multiple allocs for different task groups on the same node, then we'd potentially miss volume requests to check. Ex. a job has alloc A for group A and alloc B for group B on the same node, and only group B has a volume request. If we happen to check alloc A first we'd miss that.

Will fix.
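
A minimal sketch of the fix being discussed, assuming the seen-set is keyed by job plus task group so that allocs from different groups of the same job are each checked (the allocGroupKey type is hypothetical, not part of this PR):

// sketch only: key by job AND task group; the merged fix may differ
type allocGroupKey struct {
    job       structs.NamespacedID
    taskGroup string
}

seen := map[allocGroupKey]struct{}{}
for _, alloc := range proposed {
    key := allocGroupKey{job: alloc.JobNamespacedID(), taskGroup: alloc.TaskGroup}
    if _, ok := seen[key]; ok {
        // same job and same task group: identical volume requests,
        // so this alloc has already been checked
        continue
    }
    seen[key] = struct{}{}
    // look up the job and inspect tg.Volumes exactly as in the loop above
}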

}
seen[alloc.JobNamespacedID()] = struct{}{}
job, err := h.ctx.State().JobByID(nil, alloc.Namespace, alloc.JobID)
if err != nil {
return false
}
tg := job.LookupTaskGroup(alloc.TaskGroup)
for _, req := range tg.Volumes {
if req.Type == structs.VolumeTypeHost && req.Source == vol.Name {
if !req.ReadOnly {
return false
}
}
}
}

case structs.HostVolumeAccessModeSingleNodeMultiWriter:
// no constraint
}

return true
}

type CSIVolumeChecker struct {
ctx Context
namespace string
151 changes: 149 additions & 2 deletions scheduler/feasible_test.go
@@ -91,7 +91,7 @@ func TestRandomIterator(t *testing.T) {
}
}

func TestHostVolumeChecker(t *testing.T) {
func TestHostVolumeChecker_Static(t *testing.T) {
ci.Parallel(t)

_, ctx := testContext(t)
@@ -184,7 +184,7 @@ func TestHostVolumeChecker(t *testing.T) {
}
}

func TestHostVolumeChecker_ReadOnly(t *testing.T) {
func TestHostVolumeChecker_Dynamic(t *testing.T) {
ci.Parallel(t)

store, ctx := testContext(t)
@@ -284,6 +284,7 @@ func TestHostVolumeChecker_ReadOnly(t *testing.T) {
"foo": {
Type: "host",
Source: "foo",
ReadOnly: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
},
@@ -475,6 +476,152 @@ func TestHostVolumeChecker_Sticky(t *testing.T) {
}
}

// TestDynamicHostVolumeIsAvailable provides fine-grained coverage of the
// hostVolumeIsAvailable function
func TestDynamicHostVolumeIsAvailable(t *testing.T) {

store, ctx := testContext(t)

allCaps := []*structs.HostVolumeCapability{}

for _, accessMode := range []structs.HostVolumeAccessMode{
structs.HostVolumeAccessModeSingleNodeReader,
structs.HostVolumeAccessModeSingleNodeWriter,
structs.HostVolumeAccessModeSingleNodeSingleWriter,
structs.HostVolumeAccessModeSingleNodeMultiWriter,
} {
for _, attachMode := range []structs.HostVolumeAttachmentMode{
structs.HostVolumeAttachmentModeFilesystem,
structs.HostVolumeAttachmentModeBlockDevice,
} {
allCaps = append(allCaps, &structs.HostVolumeCapability{
AttachmentMode: attachMode,
AccessMode: accessMode,
})
}
}

jobReader, jobWriter := mock.Job(), mock.Job()
jobReader.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
"example": {
Type: structs.VolumeTypeHost,
Source: "example",
ReadOnly: true,
},
}
jobWriter.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
"example": {
Type: structs.VolumeTypeHost,
Source: "example",
},
}
index, _ := store.LatestIndex()
index++
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, jobReader))
index++
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, jobWriter))

allocReader0, allocReader1 := mock.Alloc(), mock.Alloc()
allocReader0.JobID = jobReader.ID
allocReader1.JobID = jobReader.ID

allocWriter0, allocWriter1 := mock.Alloc(), mock.Alloc()
allocWriter0.JobID = jobWriter.ID
allocWriter1.JobID = jobWriter.ID

index++
must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index,
[]*structs.Allocation{allocReader0, allocReader1, allocWriter0, allocWriter1}))

testCases := []struct {
name string
hasProposed []*structs.Allocation
hasCaps []*structs.HostVolumeCapability
wantAccess structs.HostVolumeAccessMode
wantAttach structs.HostVolumeAttachmentMode
readOnly bool
expect bool
}{
{
name: "enforce attachment mode",
hasCaps: []*structs.HostVolumeCapability{{
AttachmentMode: structs.HostVolumeAttachmentModeBlockDevice,
AccessMode: structs.HostVolumeAccessModeSingleNodeSingleWriter,
}},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter,
expect: false,
},
{
name: "enforce read only",
hasProposed: []*structs.Allocation{allocReader0, allocReader1},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeReader,
expect: false,
},
{
name: "enforce read only ok",
hasProposed: []*structs.Allocation{allocReader0, allocReader1},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeReader,
readOnly: true,
expect: true,
},
{
name: "enforce single writer",
hasProposed: []*structs.Allocation{allocReader0, allocReader1, allocWriter0},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter,
expect: false,
},
{
name: "enforce single writer ok",
hasProposed: []*structs.Allocation{allocReader0, allocReader1},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter,
expect: true,
},
{
name: "multi writer is always ok",
hasProposed: []*structs.Allocation{allocReader0, allocWriter0, allocWriter1},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeMultiWriter,
expect: true,
},
{
name: "default capabilities ok",
expect: true,
},
{
name: "default capabilities fail",
readOnly: true,
hasCaps: []*structs.HostVolumeCapability{{
AttachmentMode: structs.HostVolumeAttachmentModeBlockDevice,
AccessMode: structs.HostVolumeAccessModeSingleNodeSingleWriter,
}},
expect: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
vol := &structs.HostVolume{
Name: "example",
State: structs.HostVolumeStateReady,
}
if len(tc.hasCaps) > 0 {
vol.RequestedCapabilities = tc.hasCaps
} else {
vol.RequestedCapabilities = allCaps
}
checker := NewHostVolumeChecker(ctx)
must.Eq(t, tc.expect, checker.hostVolumeIsAvailable(
vol, tc.wantAccess, tc.wantAttach, tc.readOnly, tc.hasProposed))
})
}

}

func TestCSIVolumeChecker(t *testing.T) {
ci.Parallel(t)
state, ctx := testContext(t)
2 changes: 1 addition & 1 deletion scheduler/generic_sched_test.go
@@ -218,7 +218,7 @@ func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) {
}
}

func TestServiceSched_JobRegister_StickyVolumes(t *testing.T) {
func TestServiceSched_JobRegister_StickyHostVolumes(t *testing.T) {
ci.Parallel(t)

h := NewHarness(t)