diff --git a/nomad/state/state_store_host_volumes.go b/nomad/state/state_store_host_volumes.go index bd01129f314..7e55e6ced43 100644 --- a/nomad/state/state_store_host_volumes.go +++ b/nomad/state/state_store_host_volumes.go @@ -84,9 +84,17 @@ func (s *StateStore) UpsertHostVolume(index uint64, vol *structs.HostVolume) err if node == nil { return fmt.Errorf("host volume %s has nonexistent node ID %s", vol.ID, vol.NodeID) } - if _, ok := node.HostVolumes[vol.Name]; ok { - vol.State = structs.HostVolumeStateReady + switch vol.State { + case structs.HostVolumeStateDeleted: + // no-op: don't allow soft-deletes to resurrect a previously fingerprinted volume + default: + // prevent a race between node fingerprint and create RPC that could + // switch a ready volume back to pending + if _, ok := node.HostVolumes[vol.Name]; ok { + vol.State = structs.HostVolumeStateReady + } } + // Register RPCs for new volumes may not have the node pool set vol.NodePool = node.NodePool diff --git a/scheduler/feasible.go b/scheduler/feasible.go index e6e7c81d4a3..60442f92e7f 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -137,40 +137,38 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator { // HostVolumeChecker is a FeasibilityChecker which returns whether a node has // the host volumes necessary to schedule a task group. type HostVolumeChecker struct { - ctx Context - - // volumes is a map[HostVolumeName][]RequestedVolume. The requested volumes are - // a slice because a single task group may request the same volume multiple times. - volumes map[string][]*structs.VolumeRequest + ctx Context + volumeReqs []*structs.VolumeRequest + namespace string } // NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes func NewHostVolumeChecker(ctx Context) *HostVolumeChecker { return &HostVolumeChecker{ - ctx: ctx, + ctx: ctx, + volumeReqs: []*structs.VolumeRequest{}, } } // SetVolumes takes the volumes required by a task group and updates the checker. -func (h *HostVolumeChecker) SetVolumes(allocName string, volumes map[string]*structs.VolumeRequest) { - lookupMap := make(map[string][]*structs.VolumeRequest) - // Convert the map from map[DesiredName]Request to map[Source][]Request to improve - // lookup performance. Also filter non-host volumes. +func (h *HostVolumeChecker) SetVolumes(allocName string, ns string, volumes map[string]*structs.VolumeRequest) { + h.namespace = ns + h.volumeReqs = []*structs.VolumeRequest{} for _, req := range volumes { if req.Type != structs.VolumeTypeHost { - continue + continue // filter CSI volumes } if req.PerAlloc { // provide a unique volume source per allocation copied := req.Copy() copied.Source = copied.Source + structs.AllocSuffix(allocName) - lookupMap[copied.Source] = append(lookupMap[copied.Source], copied) + h.volumeReqs = append(h.volumeReqs, copied) + } else { - lookupMap[req.Source] = append(lookupMap[req.Source], req) + h.volumeReqs = append(h.volumeReqs, req) } } - h.volumes = lookupMap } func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool { @@ -183,35 +181,45 @@ func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool { } func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool { - rLen := len(h.volumes) - hLen := len(n.HostVolumes) // Fast path: Requested no volumes. No need to check further. - if rLen == 0 { + if len(h.volumeReqs) == 0 { return true } - // Fast path: Requesting more volumes than the node has, can't meet the criteria. - if rLen > hLen { - return false - } - - for source, requests := range h.volumes { - nodeVolume, ok := n.HostVolumes[source] + for _, req := range h.volumeReqs { + volCfg, ok := n.HostVolumes[req.Source] if !ok { return false } - // If the volume supports being mounted as ReadWrite, we do not need to - // do further validation for readonly placement. - if !nodeVolume.ReadOnly { - continue - } - - // The Volume can only be mounted ReadOnly, validate that no requests for - // it are ReadWrite. - for _, req := range requests { - if !req.ReadOnly { + if volCfg.ID != "" { // dynamic host volume + vol, err := h.ctx.State().HostVolumeByID(nil, h.namespace, volCfg.ID, false) + if err != nil || vol == nil { + // node fingerprint has a dynamic volume that's no longer in the + // state store; this is only possible if the batched fingerprint + // update from a delete RPC is written before the delete RPC's + // raft entry completes + return false + } + if vol.State != structs.HostVolumeStateReady { + return false + } + var capOk bool + for _, cap := range vol.RequestedCapabilities { + if req.AccessMode == structs.CSIVolumeAccessMode(cap.AccessMode) && + req.AttachmentMode == structs.CSIVolumeAttachmentMode(cap.AttachmentMode) { + capOk = true + break + } + } + if !capOk { + return false + } + } else if !req.ReadOnly { + // this is a static host volume and can only be mounted ReadOnly, + // validate that no requests for it are ReadWrite. + if volCfg.ReadOnly { return false } } diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index 4e887752989..9c5a9aaf1a7 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -177,7 +177,7 @@ func TestHostVolumeChecker(t *testing.T) { alloc.NodeID = nodes[2].ID for i, c := range cases { - checker.SetVolumes(alloc.Name, c.RequestedVolumes) + checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes) if act := checker.Feasible(c.Node); act != c.Result { t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result) } @@ -187,10 +187,54 @@ func TestHostVolumeChecker(t *testing.T) { func TestHostVolumeChecker_ReadOnly(t *testing.T) { ci.Parallel(t) - _, ctx := testContext(t) + store, ctx := testContext(t) + nodes := []*structs.Node{ mock.Node(), mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + } + + hostVolCapsReadWrite := []*structs.HostVolumeCapability{ + { + AttachmentMode: structs.HostVolumeAttachmentModeFilesystem, + AccessMode: structs.HostVolumeAccessModeSingleNodeReader, + }, + { + AttachmentMode: structs.HostVolumeAttachmentModeFilesystem, + AccessMode: structs.HostVolumeAccessModeSingleNodeWriter, + }, + } + hostVolCapsReadOnly := []*structs.HostVolumeCapability{{ + AttachmentMode: structs.HostVolumeAttachmentModeFilesystem, + AccessMode: structs.HostVolumeAccessModeSingleNodeReader, + }} + + dhvNotReady := &structs.HostVolume{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Name: "foo", + NodeID: nodes[2].ID, + RequestedCapabilities: hostVolCapsReadOnly, + State: structs.HostVolumeStateDeleted, + } + dhvReadOnly := &structs.HostVolume{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Name: "foo", + NodeID: nodes[3].ID, + RequestedCapabilities: hostVolCapsReadOnly, + State: structs.HostVolumeStateReady, + } + dhvReadWrite := &structs.HostVolume{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Name: "foo", + NodeID: nodes[4].ID, + RequestedCapabilities: hostVolCapsReadWrite, + State: structs.HostVolumeStateReady, } nodes[0].HostVolumes = map[string]*structs.ClientHostVolumeConfig{ @@ -203,6 +247,23 @@ func TestHostVolumeChecker_ReadOnly(t *testing.T) { ReadOnly: false, }, } + nodes[2].HostVolumes = map[string]*structs.ClientHostVolumeConfig{ + "foo": {ID: dhvNotReady.ID}, + } + nodes[3].HostVolumes = map[string]*structs.ClientHostVolumeConfig{ + "foo": {ID: dhvReadOnly.ID}, + } + nodes[4].HostVolumes = map[string]*structs.ClientHostVolumeConfig{ + "foo": {ID: dhvReadWrite.ID}, + } + + for _, node := range nodes { + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node)) + } + + must.NoError(t, store.UpsertHostVolume(1000, dhvNotReady)) + must.NoError(t, store.UpsertHostVolume(1000, dhvReadOnly)) + must.NoError(t, store.UpsertHostVolume(1000, dhvReadWrite)) readwriteRequest := map[string]*structs.VolumeRequest{ "foo": { @@ -219,42 +280,89 @@ func TestHostVolumeChecker_ReadOnly(t *testing.T) { }, } + dhvReadOnlyRequest := map[string]*structs.VolumeRequest{ + "foo": { + Type: "host", + Source: "foo", + AccessMode: structs.CSIVolumeAccessModeSingleNodeReader, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + }, + } + dhvReadWriteRequest := map[string]*structs.VolumeRequest{ + "foo": { + Type: "host", + Source: "foo", + AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + }, + } + checker := NewHostVolumeChecker(ctx) cases := []struct { - Node *structs.Node - RequestedVolumes map[string]*structs.VolumeRequest - Result bool + name string + node *structs.Node + requestedVolumes map[string]*structs.VolumeRequest + expect bool }{ - { // ReadWrite Request, ReadOnly Host - Node: nodes[0], - RequestedVolumes: readwriteRequest, - Result: false, + { + name: "read-write request / read-only host", + node: nodes[0], + requestedVolumes: readwriteRequest, + expect: false, }, - { // ReadOnly Request, ReadOnly Host - Node: nodes[0], - RequestedVolumes: readonlyRequest, - Result: true, + { + name: "read-only request / read-only host", + node: nodes[0], + requestedVolumes: readonlyRequest, + expect: true, }, - { // ReadOnly Request, ReadWrite Host - Node: nodes[1], - RequestedVolumes: readonlyRequest, - Result: true, + { + name: "read-only request / read-write host", + node: nodes[1], + requestedVolumes: readonlyRequest, + expect: true, }, - { // ReadWrite Request, ReadWrite Host - Node: nodes[1], - RequestedVolumes: readwriteRequest, - Result: true, + { + name: "read-write request / read-write host", + node: nodes[1], + requestedVolumes: readwriteRequest, + expect: true, + }, + { + name: "dynamic single-reader request / host not ready", + node: nodes[2], + requestedVolumes: dhvReadOnlyRequest, + expect: false, + }, + { + name: "dynamic single-reader request / caps match", + node: nodes[3], + requestedVolumes: dhvReadOnlyRequest, + expect: true, + }, + { + name: "dynamic single-reader request / no matching cap", + node: nodes[4], + requestedVolumes: dhvReadOnlyRequest, + expect: true, + }, + { + name: "dynamic single-writer request / caps match", + node: nodes[4], + requestedVolumes: dhvReadWriteRequest, + expect: true, }, } alloc := mock.Alloc() alloc.NodeID = nodes[1].ID - for i, c := range cases { - checker.SetVolumes(alloc.Name, c.RequestedVolumes) - if act := checker.Feasible(c.Node); act != c.Result { - t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result) - } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes) + actual := checker.Feasible(tc.node) + must.Eq(t, tc.expect, actual) + }) } } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 7e22070966f..9d46edf8801 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -118,6 +118,10 @@ type State interface { // CSIVolumeByID fetch CSI volumes, containing controller jobs CSIVolumesByNodeID(memdb.WatchSet, string, string) (memdb.ResultIterator, error) + HostVolumeByID(memdb.WatchSet, string, string, bool) (*structs.HostVolume, error) + + HostVolumesByNodeID(memdb.WatchSet, string, state.SortOption) (memdb.ResultIterator, error) + // LatestIndex returns the greatest index value for all indexes. LatestIndex() (uint64, error) } diff --git a/scheduler/stack.go b/scheduler/stack.go index 5c897ddf2de..1f2b6586886 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -51,6 +51,7 @@ type GenericStack struct { wrappedChecks *FeasibilityWrapper quota FeasibleIterator jobVersion *uint64 + jobNamespace string jobConstraint *ConstraintChecker taskGroupDrivers *DriverChecker taskGroupConstraint *ConstraintChecker @@ -101,6 +102,7 @@ func (s *GenericStack) SetJob(job *structs.Job) { jobVer := job.Version s.jobVersion = &jobVer + s.jobNamespace = job.Namespace s.jobConstraint.SetConstraints(job.Constraints) s.distinctHostsConstraint.SetJob(job) @@ -154,7 +156,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.taskGroupDevices.SetTaskGroup(tg) - s.taskGroupHostVolumes.SetVolumes(options.AllocName, tg.Volumes) + s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes) s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes) if len(tg.Networks) > 0 { s.taskGroupNetwork.SetNetwork(tg.Networks[0]) @@ -202,6 +204,7 @@ type SystemStack struct { ctx Context source *StaticIterator + jobNamespace string wrappedChecks *FeasibilityWrapper quota FeasibleIterator jobConstraint *ConstraintChecker @@ -313,6 +316,7 @@ func (s *SystemStack) SetNodes(baseNodes []*structs.Node) { } func (s *SystemStack) SetJob(job *structs.Job) { + s.jobNamespace = job.Namespace s.jobConstraint.SetConstraints(job.Constraints) s.distinctPropertyConstraint.SetJob(job) s.binPack.SetJob(job) @@ -345,7 +349,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) s.taskGroupDevices.SetTaskGroup(tg) - s.taskGroupHostVolumes.SetVolumes(options.AllocName, tg.Volumes) + s.taskGroupHostVolumes.SetVolumes(options.AllocName, s.jobNamespace, tg.Volumes) s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes) if len(tg.Networks) > 0 { s.taskGroupNetwork.SetNetwork(tg.Networks[0])