From 4b3f06c0ef9ed3c272c8a4435d57278bc34b58af Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Thu, 12 Dec 2024 18:06:21 +0100 Subject: [PATCH] make alloc ID available to the host volume checker --- scheduler/feasible.go | 79 ++++++++++++++++++++----------------- scheduler/feasible_test.go | 41 ++++++++++++++++--- scheduler/reconcile.go | 2 + scheduler/reconcile_util.go | 2 + scheduler/stack.go | 5 ++- 5 files changed, 84 insertions(+), 45 deletions(-) diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 852e806695c..99d235d9fbd 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -21,6 +21,7 @@ import ( const ( FilterConstraintHostVolumes = "missing compatible host volumes" + FilterConstraintHostVolumesAllocLookupFailed = "sticky host volume allocation lookup failed" FilterConstraintCSIPluginTemplate = "CSI plugin %s is missing from client %s" FilterConstraintCSIPluginUnhealthyTemplate = "CSI plugin %s is unhealthy on client %s" FilterConstraintCSIPluginMaxVolumesTemplate = "CSI plugin %s has the maximum number of volumes on client %s" @@ -139,22 +140,28 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator { // the host volumes necessary to schedule a task group. type HostVolumeChecker struct { ctx Context - volumeReqs []*structs.VolumeRequest + volumeReqs []*allocVolumeRequest namespace string } +// allocVolumeRequest associates allocation ID with the volume request +type allocVolumeRequest struct { + allocID string + volumeReq *structs.VolumeRequest +} + // NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes func NewHostVolumeChecker(ctx Context) *HostVolumeChecker { return &HostVolumeChecker{ ctx: ctx, - volumeReqs: []*structs.VolumeRequest{}, + volumeReqs: []*allocVolumeRequest{}, } } // SetVolumes takes the volumes required by a task group and updates the checker. -func (h *HostVolumeChecker) SetVolumes(allocName string, ns string, volumes map[string]*structs.VolumeRequest) { +func (h *HostVolumeChecker) SetVolumes(allocName, allocID string, ns string, volumes map[string]*structs.VolumeRequest) { h.namespace = ns - h.volumeReqs = []*structs.VolumeRequest{} + h.volumeReqs = []*allocVolumeRequest{} for _, req := range volumes { if req.Type != structs.VolumeTypeHost { continue // filter CSI volumes @@ -164,33 +171,35 @@ func (h *HostVolumeChecker) SetVolumes(allocName string, ns string, volumes map[ // provide a unique volume source per allocation copied := req.Copy() copied.Source = copied.Source + structs.AllocSuffix(allocName) - h.volumeReqs = append(h.volumeReqs, copied) + h.volumeReqs = append(h.volumeReqs, &allocVolumeRequest{allocID, copied}) } else { - h.volumeReqs = append(h.volumeReqs, req) + h.volumeReqs = append(h.volumeReqs, &allocVolumeRequest{allocID, req}) } } } func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool { - if h.hasVolumes(candidate) { + feasible, failure := h.hasVolumes(candidate) + if feasible { return true } - h.ctx.Metrics().FilterNode(candidate, FilterConstraintHostVolumes) + h.ctx.Metrics().FilterNode(candidate, failure) return false } -func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool { +func (h *HostVolumeChecker) hasVolumes(n *structs.Node) (bool, string) { // Fast path: Requested no volumes. No need to check further. 
if len(h.volumeReqs) == 0 { - return true + return true, "" } + ws := memdb.NewWatchSet() for _, req := range h.volumeReqs { - volCfg, ok := n.HostVolumes[req.Source] + volCfg, ok := n.HostVolumes[req.volumeReq.Source] if !ok { - return false + return false, FilterConstraintHostVolumes } if volCfg.ID != "" { // dynamic host volume @@ -200,55 +209,51 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool { // state store; this is only possible if the batched fingerprint // update from a delete RPC is written before the delete RPC's // raft entry completes - return false + return false, FilterConstraintHostVolumes } if vol.State != structs.HostVolumeStateReady { - return false + return false, FilterConstraintHostVolumes } var capOk bool for _, cap := range vol.RequestedCapabilities { - if req.AccessMode == structs.CSIVolumeAccessMode(cap.AccessMode) && - req.AttachmentMode == structs.CSIVolumeAttachmentMode(cap.AttachmentMode) { + if req.volumeReq.AccessMode == structs.CSIVolumeAccessMode(cap.AccessMode) && + req.volumeReq.AttachmentMode == structs.CSIVolumeAttachmentMode(cap.AttachmentMode) { capOk = true break } } if !capOk { - return false + return false, FilterConstraintHostVolumes } - if req.Sticky { - // NOTE: surely there is a better way to find the right alloc? - // Should we perhaps search for allocs by job? Could there be a - // situation in which there are non-terminal allocations - // belonging to the job in question that are sticky, have the - // volume IDs that match what the node offers and should not - // end up in this check? - allocs, err := h.ctx.ProposedAllocs(n.ID) + + if req.volumeReq.Sticky { + allocation, err := h.ctx.State().AllocByID(ws, req.allocID) if err != nil { - continue + return false, FilterConstraintHostVolumesAllocLookupFailed + } + if slices.Contains(allocation.HostVolumeIDs, vol.ID) { + return true, "" } - for _, a := range allocs { - if a.TerminalStatus() || a.NodeID != n.ID { - continue - } - - if !slices.Contains(a.HostVolumeIDs, volCfg.ID) { - return false - } + // if an allocation doesn't have a volume ID associated with + // it, update it + if len(allocation.HostVolumeIDs) == 0 { + allocation.HostVolumeIDs = []string{vol.ID} + // TODO: figure out how to update allocation. Should we + // have a new RPC endpoint for this? } } - } else if !req.ReadOnly { + } else if !req.volumeReq.ReadOnly { // this is a static host volume and can only be mounted ReadOnly, // validate that no requests for it are ReadWrite. 
if volCfg.ReadOnly { - return false + return false, FilterConstraintHostVolumes } } } - return true + return true, "" } type CSIVolumeChecker struct { diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index 23311ec620f..1528baa4712 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -177,7 +177,7 @@ func TestHostVolumeChecker(t *testing.T) { alloc.NodeID = nodes[2].ID for i, c := range cases { - checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes) + checker.SetVolumes(alloc.Name, alloc.ID, structs.DefaultNamespace, c.RequestedVolumes) if act := checker.Feasible(c.Node); act != c.Result { t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result) } @@ -359,7 +359,7 @@ func TestHostVolumeChecker_ReadOnly(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes) + checker.SetVolumes(alloc.Name, alloc.ID, structs.DefaultNamespace, tc.requestedVolumes) actual := checker.Feasible(tc.node) must.Eq(t, tc.expect, actual) }) @@ -416,30 +416,59 @@ func TestHostVolumeChecker_Sticky(t *testing.T) { checker := NewHostVolumeChecker(ctx) - alloc := mock.Alloc() - alloc.NodeID = nodes[1].ID - alloc.HostVolumeIDs = []string{dhv.ID} + // alloc0 wants a previously registered volume ID that's available on node1 + alloc0 := mock.Alloc() + alloc0.NodeID = nodes[1].ID + alloc0.HostVolumeIDs = []string{dhv.ID} + + // alloc1 wants a volume ID that's available on node1 but hasn't used it + // before + alloc1 := mock.Alloc() + alloc1.NodeID = nodes[1].ID + + // alloc2 wants a volume ID that's unrelated + alloc2 := mock.Alloc() + alloc2.NodeID = nodes[1].ID + alloc2.HostVolumeIDs = []string{uuid.Generate()} + + // insert all the allocs into the state + must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc0, alloc1, alloc2})) cases := []struct { name string node *structs.Node + alloc *structs.Allocation expect bool }{ { "alloc asking for a sticky volume on an infeasible node", nodes[0], + alloc0, false, }, { "alloc asking for a sticky volume on a feasible node", nodes[1], + alloc0, true, }, + { + "alloc asking for a sticky volume on a feasible node for the first time", + nodes[1], + alloc1, + true, + }, + { + "alloc asking for an unrelated volume", + nodes[1], + alloc2, + false, + }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - checker.SetVolumes(alloc.Name, structs.DefaultNamespace, stickyRequest) + checker.SetVolumes(tc.alloc.Name, tc.alloc.ID, structs.DefaultNamespace, stickyRequest) actual := checker.Feasible(tc.node) must.Eq(t, tc.expect, actual) }) diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index bf9241797c2..25dff2cfd65 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -805,6 +805,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup, for _, alloc := range reschedule { place = append(place, allocPlaceResult{ name: alloc.Name, + id: alloc.ID, taskGroup: group, previousAlloc: alloc, reschedule: true, @@ -830,6 +831,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup, existing++ place = append(place, allocPlaceResult{ name: alloc.Name, + id: alloc.ID, taskGroup: group, previousAlloc: alloc, reschedule: false, diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index 41a56503c7e..0ca74f75f97 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -66,6 +66,7 @@ type 
allocStopResult struct { // allocation type allocPlaceResult struct { name string + id string canary bool taskGroup *structs.TaskGroup previousAlloc *structs.Allocation @@ -78,6 +79,7 @@ type allocPlaceResult struct { func (a allocPlaceResult) TaskGroup() *structs.TaskGroup { return a.taskGroup } func (a allocPlaceResult) Name() string { return a.name } +func (a allocPlaceResult) ID() string { return a.id } func (a allocPlaceResult) Canary() bool { return a.canary } func (a allocPlaceResult) PreviousAllocation() *structs.Allocation { return a.previousAlloc } func (a allocPlaceResult) IsRescheduling() bool { return a.reschedule } diff --git a/scheduler/stack.go b/scheduler/stack.go index 1f2b6586886..39c8a8d0ef4 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -39,6 +39,7 @@ type SelectOptions struct { PreferredNodes []*structs.Node Preempt bool AllocName string + AllocID string } // GenericStack is the Stack used for the Generic scheduler. It is @@ -156,7 +157,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.taskGroupDevices.SetTaskGroup(tg) - s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes) + s.taskGroupHostVolumes.SetVolumes(options.AllocName, options.AllocID, s.jobNamespace, tg.Volumes) s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes) if len(tg.Networks) > 0 { s.taskGroupNetwork.SetNetwork(tg.Networks[0]) @@ -349,7 +350,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.taskGroupDevices.SetTaskGroup(tg) - s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes) + s.taskGroupHostVolumes.SetVolumes(options.AllocName, options.AllocID, s.jobNamespace, tg.Volumes) s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes) if len(tg.Networks) > 0 { s.taskGroupNetwork.SetNetwork(tg.Networks[0])
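Usage note (supplementary, not part of the patch): the new SelectOptions.AllocID field only helps if callers populate it from the placement result, which is what the allocPlaceResult.ID() accessor above enables. A minimal caller sketch, assuming a scheduler loop over allocPlaceResult values; the surrounding variable names (stack, place) are illustrative and not taken from this patch:

    // Sketch: thread each placement's allocation ID into SelectOptions so the
    // HostVolumeChecker can look the allocation up in the state store.
    for _, missing := range place {
        selectOptions := &SelectOptions{
            AllocName: missing.Name(),
            AllocID:   missing.ID(), // exposed by the new allocPlaceResult.ID()
        }
        option := stack.Select(missing.TaskGroup(), selectOptions)
        if option == nil {
            // no feasible node; the filter reason (including the new
            // FilterConstraintHostVolumesAllocLookupFailed constant) is
            // recorded via the context's metrics
            continue
        }
    }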
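On the sticky branch itself: as written in hasVolumes, an allocation that is already pinned to a different volume ID falls through and the node is still reported feasible, while the new "alloc asking for an unrelated volume" test case expects it to be filtered; a nil result from AllocByID (allocation not found) would also be dereferenced. A tightened sketch of just that branch, reusing the types and constants from this patch and assuming the scheduler package's existing imports (slices, structs); this is an illustration, not the patch's implementation:

    // stickyVolumeFeasible reports whether a node-local volume is usable for a
    // sticky request by the given allocation, and the filter reason if not.
    func stickyVolumeFeasible(alloc *structs.Allocation, volID string) (bool, string) {
        if alloc == nil {
            // allocation missing from the state store: treat the lookup as failed
            return false, FilterConstraintHostVolumesAllocLookupFailed
        }
        if len(alloc.HostVolumeIDs) == 0 {
            // first placement: nothing claimed yet, so this volume is usable
            return true, ""
        }
        if slices.Contains(alloc.HostVolumeIDs, volID) {
            // the allocation is already pinned to this volume
            return true, ""
        }
        // pinned to a different volume: filter this node
        return false, FilterConstraintHostVolumes
    }

Persisting vol.ID back onto the allocation (the open TODO in the patch) arguably belongs outside this read-only feasibility path, for example when the plan is applied, rather than inside the checker.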