Skip to content

Commit

Permalink
dynamic host volumes: capabilities check during scheduling (#24617)
Browse files Browse the repository at this point in the history
Static host volumes have a simple readonly toggle, but dynamic host volumes have
a more complex set of capabilities similar to CSI volumes. Update the
feasibility checker to account for these capabilities and volume readiness.

Also fixes a minor bug in the state store where a soft-delete (not yet
implemented) could cause a volume to be marked ready again. This is needed to
support testing the readiness checking in the scheduler.

Ref: #24479
  • Loading branch information
tgross committed Dec 13, 2024
1 parent d579dae commit d336e8a
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 64 deletions.
12 changes: 10 additions & 2 deletions nomad/state/state_store_host_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,17 @@ func (s *StateStore) UpsertHostVolume(index uint64, vol *structs.HostVolume) err
if node == nil {
return fmt.Errorf("host volume %s has nonexistent node ID %s", vol.ID, vol.NodeID)
}
if _, ok := node.HostVolumes[vol.Name]; ok {
vol.State = structs.HostVolumeStateReady
switch vol.State {
case structs.HostVolumeStateDeleted:
// no-op: don't allow soft-deletes to resurrect a previously fingerprinted volume
default:
// prevent a race between node fingerprint and create RPC that could
// switch a ready volume back to pending
if _, ok := node.HostVolumes[vol.Name]; ok {
vol.State = structs.HostVolumeStateReady
}
}

// Register RPCs for new volumes may not have the node pool set
vol.NodePool = node.NodePool

Expand Down
76 changes: 42 additions & 34 deletions scheduler/feasible.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,40 +137,38 @@ func NewRandomIterator(ctx Context, nodes []*structs.Node) *StaticIterator {
// HostVolumeChecker is a FeasibilityChecker which returns whether a node has
// the host volumes necessary to schedule a task group.
type HostVolumeChecker struct {
ctx Context

// volumes is a map[HostVolumeName][]RequestedVolume. The requested volumes are
// a slice because a single task group may request the same volume multiple times.
volumes map[string][]*structs.VolumeRequest
ctx Context
volumeReqs []*structs.VolumeRequest
namespace string
}

// NewHostVolumeChecker creates a HostVolumeChecker from a set of volumes
func NewHostVolumeChecker(ctx Context) *HostVolumeChecker {
return &HostVolumeChecker{
ctx: ctx,
ctx: ctx,
volumeReqs: []*structs.VolumeRequest{},
}
}

// SetVolumes takes the volumes required by a task group and updates the checker.
func (h *HostVolumeChecker) SetVolumes(allocName string, volumes map[string]*structs.VolumeRequest) {
lookupMap := make(map[string][]*structs.VolumeRequest)
// Convert the map from map[DesiredName]Request to map[Source][]Request to improve
// lookup performance. Also filter non-host volumes.
func (h *HostVolumeChecker) SetVolumes(allocName string, ns string, volumes map[string]*structs.VolumeRequest) {
h.namespace = ns
h.volumeReqs = []*structs.VolumeRequest{}
for _, req := range volumes {
if req.Type != structs.VolumeTypeHost {
continue
continue // filter CSI volumes
}

if req.PerAlloc {
// provide a unique volume source per allocation
copied := req.Copy()
copied.Source = copied.Source + structs.AllocSuffix(allocName)
lookupMap[copied.Source] = append(lookupMap[copied.Source], copied)
h.volumeReqs = append(h.volumeReqs, copied)

} else {
lookupMap[req.Source] = append(lookupMap[req.Source], req)
h.volumeReqs = append(h.volumeReqs, req)
}
}
h.volumes = lookupMap
}

func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool {
Expand All @@ -183,35 +181,45 @@ func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool {
}

func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
rLen := len(h.volumes)
hLen := len(n.HostVolumes)

// Fast path: Requested no volumes. No need to check further.
if rLen == 0 {
if len(h.volumeReqs) == 0 {
return true
}

// Fast path: Requesting more volumes than the node has, can't meet the criteria.
if rLen > hLen {
return false
}

for source, requests := range h.volumes {
nodeVolume, ok := n.HostVolumes[source]
for _, req := range h.volumeReqs {
volCfg, ok := n.HostVolumes[req.Source]
if !ok {
return false
}

// If the volume supports being mounted as ReadWrite, we do not need to
// do further validation for readonly placement.
if !nodeVolume.ReadOnly {
continue
}

// The Volume can only be mounted ReadOnly, validate that no requests for
// it are ReadWrite.
for _, req := range requests {
if !req.ReadOnly {
if volCfg.ID != "" { // dynamic host volume
vol, err := h.ctx.State().HostVolumeByID(nil, h.namespace, volCfg.ID, false)
if err != nil || vol == nil {
// node fingerprint has a dynamic volume that's no longer in the
// state store; this is only possible if the batched fingerprint
// update from a delete RPC is written before the delete RPC's
// raft entry completes
return false
}
if vol.State != structs.HostVolumeStateReady {
return false
}
var capOk bool
for _, cap := range vol.RequestedCapabilities {
if req.AccessMode == structs.CSIVolumeAccessMode(cap.AccessMode) &&
req.AttachmentMode == structs.CSIVolumeAttachmentMode(cap.AttachmentMode) {
capOk = true
break
}
}
if !capOk {
return false
}
} else if !req.ReadOnly {
// this is a static host volume and can only be mounted ReadOnly,
// validate that no requests for it are ReadWrite.
if volCfg.ReadOnly {
return false
}
}
Expand Down
160 changes: 134 additions & 26 deletions scheduler/feasible_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func TestHostVolumeChecker(t *testing.T) {
alloc.NodeID = nodes[2].ID

for i, c := range cases {
checker.SetVolumes(alloc.Name, c.RequestedVolumes)
checker.SetVolumes(alloc.Name, structs.DefaultNamespace, c.RequestedVolumes)
if act := checker.Feasible(c.Node); act != c.Result {
t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
}
Expand All @@ -187,10 +187,54 @@ func TestHostVolumeChecker(t *testing.T) {
func TestHostVolumeChecker_ReadOnly(t *testing.T) {
ci.Parallel(t)

_, ctx := testContext(t)
store, ctx := testContext(t)

nodes := []*structs.Node{
mock.Node(),
mock.Node(),
mock.Node(),
mock.Node(),
mock.Node(),
}

hostVolCapsReadWrite := []*structs.HostVolumeCapability{
{
AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
AccessMode: structs.HostVolumeAccessModeSingleNodeReader,
},
{
AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
AccessMode: structs.HostVolumeAccessModeSingleNodeWriter,
},
}
hostVolCapsReadOnly := []*structs.HostVolumeCapability{{
AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
AccessMode: structs.HostVolumeAccessModeSingleNodeReader,
}}

dhvNotReady := &structs.HostVolume{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Name: "foo",
NodeID: nodes[2].ID,
RequestedCapabilities: hostVolCapsReadOnly,
State: structs.HostVolumeStateDeleted,
}
dhvReadOnly := &structs.HostVolume{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Name: "foo",
NodeID: nodes[3].ID,
RequestedCapabilities: hostVolCapsReadOnly,
State: structs.HostVolumeStateReady,
}
dhvReadWrite := &structs.HostVolume{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Name: "foo",
NodeID: nodes[4].ID,
RequestedCapabilities: hostVolCapsReadWrite,
State: structs.HostVolumeStateReady,
}

nodes[0].HostVolumes = map[string]*structs.ClientHostVolumeConfig{
Expand All @@ -203,6 +247,23 @@ func TestHostVolumeChecker_ReadOnly(t *testing.T) {
ReadOnly: false,
},
}
nodes[2].HostVolumes = map[string]*structs.ClientHostVolumeConfig{
"foo": {ID: dhvNotReady.ID},
}
nodes[3].HostVolumes = map[string]*structs.ClientHostVolumeConfig{
"foo": {ID: dhvReadOnly.ID},
}
nodes[4].HostVolumes = map[string]*structs.ClientHostVolumeConfig{
"foo": {ID: dhvReadWrite.ID},
}

for _, node := range nodes {
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node))
}

must.NoError(t, store.UpsertHostVolume(1000, dhvNotReady))
must.NoError(t, store.UpsertHostVolume(1000, dhvReadOnly))
must.NoError(t, store.UpsertHostVolume(1000, dhvReadWrite))

readwriteRequest := map[string]*structs.VolumeRequest{
"foo": {
Expand All @@ -219,42 +280,89 @@ func TestHostVolumeChecker_ReadOnly(t *testing.T) {
},
}

dhvReadOnlyRequest := map[string]*structs.VolumeRequest{
"foo": {
Type: "host",
Source: "foo",
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
},
}
dhvReadWriteRequest := map[string]*structs.VolumeRequest{
"foo": {
Type: "host",
Source: "foo",
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
},
}

checker := NewHostVolumeChecker(ctx)
cases := []struct {
Node *structs.Node
RequestedVolumes map[string]*structs.VolumeRequest
Result bool
name string
node *structs.Node
requestedVolumes map[string]*structs.VolumeRequest
expect bool
}{
{ // ReadWrite Request, ReadOnly Host
Node: nodes[0],
RequestedVolumes: readwriteRequest,
Result: false,
{
name: "read-write request / read-only host",
node: nodes[0],
requestedVolumes: readwriteRequest,
expect: false,
},
{ // ReadOnly Request, ReadOnly Host
Node: nodes[0],
RequestedVolumes: readonlyRequest,
Result: true,
{
name: "read-only request / read-only host",
node: nodes[0],
requestedVolumes: readonlyRequest,
expect: true,
},
{ // ReadOnly Request, ReadWrite Host
Node: nodes[1],
RequestedVolumes: readonlyRequest,
Result: true,
{
name: "read-only request / read-write host",
node: nodes[1],
requestedVolumes: readonlyRequest,
expect: true,
},
{ // ReadWrite Request, ReadWrite Host
Node: nodes[1],
RequestedVolumes: readwriteRequest,
Result: true,
{
name: "read-write request / read-write host",
node: nodes[1],
requestedVolumes: readwriteRequest,
expect: true,
},
{
name: "dynamic single-reader request / host not ready",
node: nodes[2],
requestedVolumes: dhvReadOnlyRequest,
expect: false,
},
{
name: "dynamic single-reader request / caps match",
node: nodes[3],
requestedVolumes: dhvReadOnlyRequest,
expect: true,
},
{
name: "dynamic single-reader request / no matching cap",
node: nodes[4],
requestedVolumes: dhvReadOnlyRequest,
expect: true,
},
{
name: "dynamic single-writer request / caps match",
node: nodes[4],
requestedVolumes: dhvReadWriteRequest,
expect: true,
},
}

alloc := mock.Alloc()
alloc.NodeID = nodes[1].ID

for i, c := range cases {
checker.SetVolumes(alloc.Name, c.RequestedVolumes)
if act := checker.Feasible(c.Node); act != c.Result {
t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
checker.SetVolumes(alloc.Name, structs.DefaultNamespace, tc.requestedVolumes)
actual := checker.Feasible(tc.node)
must.Eq(t, tc.expect, actual)
})
}
}

Expand Down
4 changes: 4 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ type State interface {
// CSIVolumeByID fetch CSI volumes, containing controller jobs
CSIVolumesByNodeID(memdb.WatchSet, string, string) (memdb.ResultIterator, error)

HostVolumeByID(memdb.WatchSet, string, string, bool) (*structs.HostVolume, error)

HostVolumesByNodeID(memdb.WatchSet, string, state.SortOption) (memdb.ResultIterator, error)

// LatestIndex returns the greatest index value for all indexes.
LatestIndex() (uint64, error)
}
Expand Down
Loading

0 comments on commit d336e8a

Please sign in to comment.