diff --git a/scheduler/feasible.go b/scheduler/feasible.go
index 69ab03de7c3..fa1800b2ae0 100644
--- a/scheduler/feasible.go
+++ b/scheduler/feasible.go
@@ -192,6 +192,11 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
 		return true
 	}
 
+	proposed, err := h.ctx.ProposedAllocs(n.ID)
+	if err != nil {
+		return false // only hit this on state store invariant failure
+	}
+
 	for _, req := range h.volumeReqs {
 		volCfg, ok := n.HostVolumes[req.Source]
 		if !ok {
@@ -207,18 +212,12 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
 			// raft entry completes
 			return false
 		}
-		if vol.State != structs.HostVolumeStateReady {
-			return false
-		}
-		var capOk bool
-		for _, cap := range vol.RequestedCapabilities {
-			if req.AccessMode == structs.CSIVolumeAccessMode(cap.AccessMode) &&
-				req.AttachmentMode == structs.CSIVolumeAttachmentMode(cap.AttachmentMode) {
-				capOk = true
-				break
-			}
-		}
-		if !capOk {
+		if !h.hostVolumeIsAvailable(vol,
+			structs.HostVolumeAccessMode(req.AccessMode),
+			structs.HostVolumeAttachmentMode(req.AttachmentMode),
+			req.ReadOnly,
+			proposed,
+		) {
 			return false
 		}
 
@@ -242,6 +241,86 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
 	return true
 }
 
+// hostVolumeIsAvailable determines if a dynamic host volume is available for a request
+func (h *HostVolumeChecker) hostVolumeIsAvailable(
+	vol *structs.HostVolume,
+	reqAccess structs.HostVolumeAccessMode,
+	reqAttach structs.HostVolumeAttachmentMode,
+	readOnly bool,
+	proposed []*structs.Allocation) bool {
+
+	if vol.State != structs.HostVolumeStateReady {
+		return false
+	}
+
+	// pick a default capability based on the read-only flag. this happens here
+	// in the scheduler rather than job submit because we don't know whether a
+	// host volume is dynamic or not until we try to schedule it (ex. the same
+	// name could be static on one node and dynamic on another)
+	if reqAccess == structs.HostVolumeAccessModeUnknown {
+		if readOnly {
+			reqAccess = structs.HostVolumeAccessModeSingleNodeReader
+		} else {
+			reqAccess = structs.HostVolumeAccessModeSingleNodeWriter
+		}
+	}
+	if reqAttach == structs.HostVolumeAttachmentModeUnknown {
+		reqAttach = structs.HostVolumeAttachmentModeFilesystem
+	}
+
+	// check that the volume has the requested capability at all
+	var capOk bool
+	for _, cap := range vol.RequestedCapabilities {
+		if reqAccess == cap.AccessMode &&
+			reqAttach == cap.AttachmentMode {
+			capOk = true
+			break
+		}
+	}
+	if !capOk {
+		return false
+	}
+
+	switch reqAccess {
+	case structs.HostVolumeAccessModeSingleNodeReader:
+		return readOnly
+	case structs.HostVolumeAccessModeSingleNodeWriter:
+		return !readOnly
+	case structs.HostVolumeAccessModeSingleNodeSingleWriter:
+		// examine all proposed allocs on the node, including those that might
+		// not yet have been persisted. they have nil pointers to their Job, so
+		// we have to go back to the state store to get them
+		seen := map[string]struct{}{}
+		for _, alloc := range proposed {
+			uniqueGroup := alloc.JobNamespacedID().String() + alloc.TaskGroup
+			if _, ok := seen[uniqueGroup]; ok {
+				// all allocs for the same group will have the same read-only
+				// flag and capabilities, so we only need to check a given group
+				// once
+				continue
+			}
+			seen[uniqueGroup] = struct{}{}
+			job, err := h.ctx.State().JobByID(nil, alloc.Namespace, alloc.JobID)
+			if err != nil {
+				return false
+			}
+			tg := job.LookupTaskGroup(alloc.TaskGroup)
+			for _, req := range tg.Volumes {
+				if req.Type == structs.VolumeTypeHost && req.Source == vol.Name {
+					if !req.ReadOnly {
+						return false
+					}
+				}
+			}
+		}
+
+	case structs.HostVolumeAccessModeSingleNodeMultiWriter:
+		// no constraint
+	}
+
+	return true
+}
+
 type CSIVolumeChecker struct {
 	ctx       Context
 	namespace string
diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go
index 3351210c2ee..18a8153e83c 100644
--- a/scheduler/feasible_test.go
+++ b/scheduler/feasible_test.go
@@ -91,7 +91,7 @@ func TestRandomIterator(t *testing.T) {
 	}
 }
 
-func TestHostVolumeChecker(t *testing.T) {
+func TestHostVolumeChecker_Static(t *testing.T) {
 	ci.Parallel(t)
 
 	_, ctx := testContext(t)
@@ -184,7 +184,7 @@ func TestHostVolumeChecker(t *testing.T) {
 	}
 }
 
-func TestHostVolumeChecker_ReadOnly(t *testing.T) {
+func TestHostVolumeChecker_Dynamic(t *testing.T) {
 	ci.Parallel(t)
 
 	store, ctx := testContext(t)
@@ -284,6 +284,7 @@ func TestHostVolumeChecker_ReadOnly(t *testing.T) {
 		"foo": {
 			Type:           "host",
 			Source:         "foo",
+			ReadOnly:       true,
 			AccessMode:     structs.CSIVolumeAccessModeSingleNodeReader,
 			AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
 		},
@@ -475,6 +476,152 @@ func TestHostVolumeChecker_Sticky(t *testing.T) {
 	}
 }
 
+// TestDynamicHostVolumeIsAvailable provides fine-grained coverage of the
+// hostVolumeIsAvailable method
+func TestDynamicHostVolumeIsAvailable(t *testing.T) {
+
+	store, ctx := testContext(t)
+
+	allCaps := []*structs.HostVolumeCapability{}
+
+	for _, accessMode := range []structs.HostVolumeAccessMode{
+		structs.HostVolumeAccessModeSingleNodeReader,
+		structs.HostVolumeAccessModeSingleNodeWriter,
+		structs.HostVolumeAccessModeSingleNodeSingleWriter,
+		structs.HostVolumeAccessModeSingleNodeMultiWriter,
+	} {
+		for _, attachMode := range []structs.HostVolumeAttachmentMode{
+			structs.HostVolumeAttachmentModeFilesystem,
+			structs.HostVolumeAttachmentModeBlockDevice,
+		} {
+			allCaps = append(allCaps, &structs.HostVolumeCapability{
+				AttachmentMode: attachMode,
+				AccessMode:     accessMode,
+			})
+		}
+	}
+
+	jobReader, jobWriter := mock.Job(), mock.Job()
+	jobReader.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
+		"example": {
+			Type:     structs.VolumeTypeHost,
+			Source:   "example",
+			ReadOnly: true,
+		},
+	}
+	jobWriter.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
+		"example": {
+			Type:   structs.VolumeTypeHost,
+			Source: "example",
+		},
+	}
+	index, _ := store.LatestIndex()
+	index++
+	must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, jobReader))
+	index++
+	must.NoError(t, store.UpsertJob(structs.MsgTypeTestSetup, index, nil, jobWriter))
+
+	allocReader0, allocReader1 := mock.Alloc(), mock.Alloc()
+	allocReader0.JobID = jobReader.ID
+	allocReader1.JobID = jobReader.ID
+
+	allocWriter0, allocWriter1 := mock.Alloc(), mock.Alloc()
+	allocWriter0.JobID = jobWriter.ID
+	allocWriter1.JobID = jobWriter.ID
+
+	index++
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index,
+		[]*structs.Allocation{allocReader0, allocReader1, allocWriter0, allocWriter1}))
+
+	testCases := []struct {
+		name        string
+		hasProposed []*structs.Allocation
+		hasCaps     []*structs.HostVolumeCapability
+		wantAccess  structs.HostVolumeAccessMode
+		wantAttach  structs.HostVolumeAttachmentMode
+		readOnly    bool
+		expect      bool
+	}{
+		{
+			name: "enforce attachment mode",
+			hasCaps: []*structs.HostVolumeCapability{{
+				AttachmentMode: structs.HostVolumeAttachmentModeBlockDevice,
+				AccessMode:     structs.HostVolumeAccessModeSingleNodeSingleWriter,
+			}},
+			wantAttach: structs.HostVolumeAttachmentModeFilesystem,
+			wantAccess: structs.HostVolumeAccessModeSingleNodeSingleWriter,
+			expect:     false,
+		},
+		{
+			name:        "enforce read only",
+			hasProposed: []*structs.Allocation{allocReader0, allocReader1},
+			wantAttach:  structs.HostVolumeAttachmentModeFilesystem,
+			wantAccess:  structs.HostVolumeAccessModeSingleNodeReader,
+			expect:      false,
+		},
+		{
+			name:        "enforce read only ok",
+			hasProposed: []*structs.Allocation{allocReader0, allocReader1},
+			wantAttach:  structs.HostVolumeAttachmentModeFilesystem,
+			wantAccess:  structs.HostVolumeAccessModeSingleNodeReader,
+			readOnly:    true,
+			expect:      true,
+		},
+		{
+			name:        "enforce single writer",
+			hasProposed: []*structs.Allocation{allocReader0, allocReader1, allocWriter0},
+			wantAttach:  structs.HostVolumeAttachmentModeFilesystem,
+			wantAccess:  structs.HostVolumeAccessModeSingleNodeSingleWriter,
+			expect:      false,
+		},
+		{
+			name:        "enforce single writer ok",
+			hasProposed: []*structs.Allocation{allocReader0, allocReader1},
+			wantAttach:  structs.HostVolumeAttachmentModeFilesystem,
+			wantAccess:  structs.HostVolumeAccessModeSingleNodeSingleWriter,
+			expect:      true,
+		},
+		{
+			name:        "multi writer is always ok",
+			hasProposed: []*structs.Allocation{allocReader0, allocWriter0, allocWriter1},
+			wantAttach:  structs.HostVolumeAttachmentModeFilesystem,
+			wantAccess:  structs.HostVolumeAccessModeSingleNodeMultiWriter,
+			expect:      true,
+		},
+		{
+			name:   "default capabilities ok",
+			expect: true,
+		},
+		{
+			name:     "default capabilities fail",
+			readOnly: true,
+			hasCaps: []*structs.HostVolumeCapability{{
+				AttachmentMode: structs.HostVolumeAttachmentModeBlockDevice,
+				AccessMode:     structs.HostVolumeAccessModeSingleNodeSingleWriter,
+			}},
+			expect: false,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			vol := &structs.HostVolume{
+				Name:  "example",
+				State: structs.HostVolumeStateReady,
+			}
+			if len(tc.hasCaps) > 0 {
+				vol.RequestedCapabilities = tc.hasCaps
+			} else {
+				vol.RequestedCapabilities = allCaps
+			}
+			checker := NewHostVolumeChecker(ctx)
+			must.Eq(t, tc.expect, checker.hostVolumeIsAvailable(
+				vol, tc.wantAccess, tc.wantAttach, tc.readOnly, tc.hasProposed))
+		})
+	}
+
+}
+
 func TestCSIVolumeChecker(t *testing.T) {
 	ci.Parallel(t)
 	state, ctx := testContext(t)
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index 5d471423136..3d236b5d289 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -218,7 +218,7 @@ func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) {
 	}
 }
 
-func TestServiceSched_JobRegister_StickyVolumes(t *testing.T) {
+func TestServiceSched_JobRegister_StickyHostVolumes(t *testing.T) {
 	ci.Parallel(t)
 
 	h := NewHarness(t)
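Outside the patch itself, the following is a minimal, illustrative sketch of the capability-defaulting rule that the new hostVolumeIsAvailable method applies when a volume request names no access or attachment mode. The defaultHostVolumeCapability helper and the main function are hypothetical and exist only for illustration; the HostVolumeCapability struct and the mode constants are the ones from the nomad/structs package used throughout the diff.

// Illustrative sketch only -- not part of the patch. It mirrors the fallback
// added in hostVolumeIsAvailable: a request with unknown modes is treated as
// a filesystem attachment whose access mode is derived from its read-only flag.
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

// defaultHostVolumeCapability is a hypothetical helper showing the default
// capability the scheduler assumes before matching against the capabilities
// the dynamic host volume was created with.
func defaultHostVolumeCapability(readOnly bool) structs.HostVolumeCapability {
	access := structs.HostVolumeAccessModeSingleNodeWriter
	if readOnly {
		access = structs.HostVolumeAccessModeSingleNodeReader
	}
	return structs.HostVolumeCapability{
		AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
		AccessMode:     access,
	}
}

func main() {
	fmt.Printf("read-only request  -> %+v\n", defaultHostVolumeCapability(true))
	fmt.Printf("read-write request -> %+v\n", defaultHostVolumeCapability(false))
}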