dynamic host volumes: account for other claims in capability check #24684

Merged · 2 commits · Dec 19, 2024
103 changes: 91 additions & 12 deletions scheduler/feasible.go
@@ -192,6 +192,11 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
return true
}

proposed, err := h.ctx.ProposedAllocs(n.ID)
if err != nil {
return false // only hit this on state store invariant failure
}

for _, req := range h.volumeReqs {
volCfg, ok := n.HostVolumes[req.Source]
if !ok {
@@ -207,18 +212,12 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
// raft entry completes
return false
}
if vol.State != structs.HostVolumeStateReady {
return false
}
var capOk bool
for _, cap := range vol.RequestedCapabilities {
if req.AccessMode == structs.CSIVolumeAccessMode(cap.AccessMode) &&
req.AttachmentMode == structs.CSIVolumeAttachmentMode(cap.AttachmentMode) {
capOk = true
break
}
}
if !capOk {
if !h.hostVolumeIsAvailable(vol,
structs.HostVolumeAccessMode(req.AccessMode),
structs.HostVolumeAttachmentMode(req.AttachmentMode),
req.ReadOnly,
proposed,
) {
return false
}

@@ -242,6 +241,86 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
return true
}

// hostVolumeIsAvailable determines whether a dynamic host volume is available for a request
func (h *HostVolumeChecker) hostVolumeIsAvailable(
vol *structs.HostVolume,
reqAccess structs.HostVolumeAccessMode,
reqAttach structs.HostVolumeAttachmentMode,
readOnly bool,
proposed []*structs.Allocation) bool {

if vol.State != structs.HostVolumeStateReady {
return false
}

// pick a default capability based on the read-only flag. this happens here
// in the scheduler rather than at job submit because we don't know whether a
// host volume is dynamic or not until we try to schedule it (e.g. the same
// name could be static on one node and dynamic on another)
if reqAccess == structs.HostVolumeAccessModeUnknown {
if readOnly {
reqAccess = structs.HostVolumeAccessModeSingleNodeReader
} else {
reqAccess = structs.HostVolumeAccessModeSingleNodeWriter
}
}
if reqAttach == structs.HostVolumeAttachmentModeUnknown {
reqAttach = structs.HostVolumeAttachmentModeFilesystem
}

// check that the volume has the requested capability at all
var capOk bool
for _, cap := range vol.RequestedCapabilities {
if reqAccess == cap.AccessMode &&
reqAttach == cap.AttachmentMode {
capOk = true
break
}
}
if !capOk {
return false
}

switch reqAccess {
case structs.HostVolumeAccessModeSingleNodeReader:
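// requesting reader access requires a read-only request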
return readOnly
case structs.HostVolumeAccessModeSingleNodeWriter:
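// requesting writer access requires a writable (non-read-only) request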
return !readOnly
case structs.HostVolumeAccessModeSingleNodeSingleWriter:
// examine all proposed allocs on the node, including those that might
// not yet have been persisted. they have nil pointers to their Job, so
// we have to go back to the state store to get them
seen := map[string]struct{}{}
for _, alloc := range proposed {
uniqueGroup := alloc.JobNamespacedID().String() + alloc.TaskGroup
if _, ok := seen[uniqueGroup]; ok {
// all allocs for the same group will have the same read-only
// flag and capabilities, so we only need to check a given group
// once
continue
}
seen[uniqueGroup] = struct{}{}
job, err := h.ctx.State().JobByID(nil, alloc.Namespace, alloc.JobID)
if err != nil {
return false
}
tg := job.LookupTaskGroup(alloc.TaskGroup)
for _, req := range tg.Volumes {
if req.Type == structs.VolumeTypeHost && req.Source == vol.Name {
if !req.ReadOnly {
return false
}
}
}
}

case structs.HostVolumeAccessModeSingleNodeMultiWriter:
// no constraint
}

return true
}

type CSIVolumeChecker struct {
ctx Context
namespace string
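Aside: the defaulting step in hostVolumeIsAvailable above can be read in isolation. Below is a minimal, self-contained sketch of that resolution, with hypothetical local types and constant values standing in for the real nomad/structs definitions (the names and strings here are illustrative, not Nomad's):

```go
package main

import "fmt"

// hypothetical stand-ins for structs.HostVolumeAccessMode and
// structs.HostVolumeAttachmentMode
type accessMode string
type attachMode string

const (
	accessUnknown          accessMode = ""
	accessSingleNodeReader accessMode = "single-node-reader-only"
	accessSingleNodeWriter accessMode = "single-node-writer"
	attachUnknown          attachMode = ""
	attachFilesystem       attachMode = "file-system"
)

// resolveDefaults mirrors the first half of hostVolumeIsAvailable: an
// unset access mode falls back on the request's read-only flag, and an
// unset attachment mode defaults to filesystem.
func resolveDefaults(access accessMode, attach attachMode, readOnly bool) (accessMode, attachMode) {
	if access == accessUnknown {
		if readOnly {
			access = accessSingleNodeReader
		} else {
			access = accessSingleNodeWriter
		}
	}
	if attach == attachUnknown {
		attach = attachFilesystem
	}
	return access, attach
}

func main() {
	// a read-only request with no explicit modes resolves to
	// single-node-reader-only / file-system
	fmt.Println(resolveDefaults(accessUnknown, attachUnknown, true))
}
```

As the comment in the diff notes, this defaulting has to happen at scheduling time rather than at job submit, because the same volume name can be static on one node and dynamic on another.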
151 changes: 149 additions & 2 deletions scheduler/feasible_test.go
@@ -91,7 +91,7 @@ func TestRandomIterator(t *testing.T) {
}
}

func TestHostVolumeChecker(t *testing.T) {
func TestHostVolumeChecker_Static(t *testing.T) {
ci.Parallel(t)

_, ctx := testContext(t)
@@ -184,7 +184,7 @@ func TestHostVolumeChecker(t *testing.T) {
}
}

func TestHostVolumeChecker_ReadOnly(t *testing.T) {
func TestHostVolumeChecker_Dynamic(t *testing.T) {
ci.Parallel(t)

store, ctx := testContext(t)
@@ -284,6 +284,7 @@ func TestHostVolumeChecker_ReadOnly(t *testing.T) {
"foo": {
Type: "host",
Source: "foo",
ReadOnly: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
},
@@ -475,6 +476,152 @@ func TestHostVolumeChecker_Sticky(t *testing.T) {
}
}

// TestDynamicHostVolumeIsAvailable provides fine-grained coverage of the
// hostVolumeIsAvailable method
func TestDynamicHostVolumeIsAvailable(t *testing.T) {

store, ctx := testContext(t)

allCaps := []*structs.HostVolumeCapability{}

for _, accessMode := range []structs.HostVolumeAccessMode{
structs.HostVolumeAccessModeSingleNodeReader,
structs.HostVolumeAccessModeSingleNodeWriter,
structs.HostVolumeAccessModeSingleNodeSingleWriter,
structs.HostVolumeAccessModeSingleNodeMultiWriter,
} {
for _, attachMode := range []structs.HostVolumeAttachmentMode{
structs.HostVolumeAttachmentModeFilesystem,
structs.HostVolumeAttachmentModeBlockDevice,
} {
allCaps = append(allCaps, &structs.HostVolumeCapability{
AttachmentMode: attachMode,
AccessMode: accessMode,
})
}
}

jobReader, jobWriter := mock.Job(), mock.Job()
jobReader.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
"example": {
Type: structs.VolumeTypeHost,
Source: "example",
ReadOnly: true,
},
}
jobWriter.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
"example": {
Type: structs.VolumeTypeHost,
Source: "example",
},
}
index, _ := store.LatestIndex()
index++
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, jobReader))
index++
must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, jobWriter))

allocReader0, allocReader1 := mock.Alloc(), mock.Alloc()
allocReader0.JobID = jobReader.ID
allocReader1.JobID = jobReader.ID

allocWriter0, allocWriter1 := mock.Alloc(), mock.Alloc()
allocWriter0.JobID = jobWriter.ID
allocWriter1.JobID = jobWriter.ID

index++
must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index,
[]*structs.Allocation{allocReader0, allocReader1, allocWriter0, allocWriter1}))

testCases := []struct {
name string
hasProposed []*structs.Allocation
hasCaps []*structs.HostVolumeCapability
wantAccess structs.HostVolumeAccessMode
wantAttach structs.HostVolumeAttachmentMode
readOnly bool
expect bool
}{
{
name: "enforce attachment mode",
hasCaps: []*structs.HostVolumeCapability{{
AttachmentMode: structs.HostVolumeAttachmentModeBlockDevice,
AccessMode: structs.HostVolumeAccessModeSingleNodeSingleWriter,
}},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter,
expect: false,
},
{
name: "enforce read only",
hasProposed: []*structs.Allocation{allocReader0, allocReader1},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeReader,
expect: false,
},
{
name: "enforce read only ok",
hasProposed: []*structs.Allocation{allocReader0, allocReader1},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeReader,
readOnly: true,
expect: true,
},
{
name: "enforce single writer",
hasProposed: []*structs.Allocation{allocReader0, allocReader1, allocWriter0},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter,
expect: false,
},
{
name: "enforce single writer ok",
hasProposed: []*structs.Allocation{allocReader0, allocReader1},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter,
expect: true,
},
{
name: "multi writer is always ok",
hasProposed: []*structs.Allocation{allocReader0, allocWriter0, allocWriter1},
wantAttach: structs.HostVolumeAttachmentModeFilesystem,
wantAccess: structs.HostVolumeAccessModeSingleNodeMultiWriter,
expect: true,
},
{
name: "default capabilities ok",
expect: true,
},
{
name: "default capabilities fail",
readOnly: true,
hasCaps: []*structs.HostVolumeCapability{{
AttachmentMode: structs.HostVolumeAttachmentModeBlockDevice,
AccessMode: structs.HostVolumeAccessModeSingleNodeSingleWriter,
}},
expect: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
vol := &structs.HostVolume{
Name: "example",
State: structs.HostVolumeStateReady,
}
if len(tc.hasCaps) > 0 {
vol.RequestedCapabilities = tc.hasCaps
} else {
vol.RequestedCapabilities = allCaps
}
checker := NewHostVolumeChecker(ctx)
must.Eq(t, tc.expect, checker.hostVolumeIsAvailable(
vol, tc.wantAccess, tc.wantAttach, tc.readOnly, tc.hasProposed))
})
}

}

func TestCSIVolumeChecker(t *testing.T) {
ci.Parallel(t)
state, ctx := testContext(t)
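Aside: the single-writer branch of hostVolumeIsAvailable (in feasible.go above) effectively counts writer claims per task group among the proposed allocations, which is what the "enforce single writer" test cases exercise. A simplified sketch of that check, assuming a hypothetical flattened claim type in place of the real Job → TaskGroup → Volumes walk through the state store:

```go
package main

import "fmt"

// claim is a hypothetical flattened view of one task group's request
// against the volume; the real code derives this by fetching the Job
// from the state store and walking its task group's Volumes.
type claim struct {
	group    string // namespace/jobID plus task group name
	readOnly bool
}

// singleWriterOK reports whether a single-node-single-writer volume can
// accept a new writer, given the claims already proposed on the node.
func singleWriterOK(proposed []claim) bool {
	seen := map[string]struct{}{}
	for _, c := range proposed {
		if _, ok := seen[c.group]; ok {
			continue // allocs of one group share a read-only flag; check once
		}
		seen[c.group] = struct{}{}
		if !c.readOnly {
			return false // another group already holds the write claim
		}
	}
	return true
}

func main() {
	fmt.Println(singleWriterOK([]claim{{"default/web.g", true}}))  // true
	fmt.Println(singleWriterOK([]claim{{"default/db.g", false}})) // false
}
```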
2 changes: 1 addition & 1 deletion scheduler/generic_sched_test.go
@@ -218,7 +218,7 @@ func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) {
}
}

func TestServiceSched_JobRegister_StickyVolumes(t *testing.T) {
func TestServiceSched_JobRegister_StickyHostVolumes(t *testing.T) {
ci.Parallel(t)

h := NewHarness(t)