CSI: Scheduler knows about CSI constraints and availability (#6995)
* structs: piggyback csi volumes on host volumes for job specs

* state_store: CSIVolumeByID always includes plugins, matches use case

* scheduler/feasible: csi volume checker

* scheduler/stack: add csi volumes

* contributing: update rpc checklist

* scheduler: add volumes to State interface

* scheduler/feasible: introduce new checker collection tgAvailable

* scheduler/stack: taskGroupCSIVolumes checker is transient

* state_store CSIVolumeDenormalizePlugins comment clarity

* structs: remove TODO comment in TaskGroup Validate

* scheduler/feasible: CSIVolumeChecker hasPlugins improve comment

* scheduler/feasible_test: set t.Parallel

* Update nomad/state/state_store.go

Co-Authored-By: Danielle <[email protected]>

* Update scheduler/feasible.go

Co-Authored-By: Danielle <[email protected]>

* structs: lift ControllerRequired to each volume

* state_store: store plug.ControllerRequired, use it for volume health

* feasible: csi match fast path remove stale host volume copied logic

* scheduler/feasible: improve comments

Co-authored-by: Danielle <[email protected]>
2 people authored and tgross committed Mar 9, 2020
1 parent 2bf383e commit 029c5b5
Showing 10 changed files with 344 additions and 40 deletions.
7 changes: 7 additions & 0 deletions contributing/checklist-rpc-endpoint.md
@@ -7,20 +7,27 @@ Prefer adding a new message to changing any existing RPC messages.
* [ ] `Request` struct and `*RequestType` constant in
`nomad/structs/structs.go`. Append the constant, old constant
values must remain unchanged

* [ ] In `nomad/fsm.go`, add a dispatch case to the switch statement in `(n *nomadFSM) Apply`
* `*nomadFSM` method to decode the request and call the state method

* [ ] State method for modifying objects in a `Txn` in `nomad/state/state_store.go`
* `nomad/state/state_store_test.go`

* [ ] Handler for the request in `nomad/foo_endpoint.go`
* RPCs are resolved by matching the method name for bound structs
[net/rpc](https://golang.org/pkg/net/rpc/)
* Check ACLs for security, list endpoints filter by ACL
* Register new RPC struct in `nomad/server.go`
* Check ACLs to enforce security

* Wrapper for the HTTP request in `command/agent/foo_endpoint.go`
* Backwards compatibility requires a new endpoint; an upgraded
client or server may be forwarding this request to an old server
without support for the new RPC
* RPCs triggered by an internal process may not need support
* Check ACLs as an optimization

* [ ] `nomad/core_sched.go` sends many RPCs
* `ServersMeetMinimumVersion` asserts that the server cluster is
upgraded, so use this to guard sending the new RPC, else send the old RPC
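The last checklist item describes guarding a new RPC behind `ServersMeetMinimumVersion`. A minimal sketch of that guard pattern, built on `hashicorp/go-version` and using a simplified stand-in helper rather than Nomad's real one (its signature may differ):

```go
package main

import (
	"fmt"

	version "github.com/hashicorp/go-version"
)

// minVersionCSI is a hypothetical minimum server version at which the new RPC
// is understood cluster-wide.
var minVersionCSI = version.Must(version.NewVersion("0.11.0"))

// serversMeetMinimumVersion is a stand-in for Nomad's real helper; it returns
// true only when every server is at or above the minimum version.
func serversMeetMinimumVersion(servers []*version.Version, min *version.Version) bool {
	for _, v := range servers {
		if v.LessThan(min) {
			return false
		}
	}
	return true
}

func main() {
	servers := []*version.Version{
		version.Must(version.NewVersion("0.11.1")),
		version.Must(version.NewVersion("0.10.4")), // one old server blocks the new RPC
	}

	if serversMeetMinimumVersion(servers, minVersionCSI) {
		fmt.Println("send the new RPC")
	} else {
		fmt.Println("fall back to the old RPC until the cluster is upgraded")
	}
}
```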
35 changes: 13 additions & 22 deletions nomad/state/state_store.go
@@ -957,6 +957,7 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
plug = raw.(*structs.CSIPlugin).Copy(index)
} else {
plug = structs.NewCSIPlugin(info.PluginID, index)
plug.ControllerRequired = info.RequiresControllerPlugin
}

plug.AddPlugin(node.ID, info, index)
@@ -1647,7 +1648,7 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum
return nil
}

// CSIVolumeByID is used to lookup a single volume
// CSIVolumeByID is used to look up a single volume. Its plugins are denormalized to provide accurate Health
func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, id string) (*structs.CSIVolume, error) {
txn := s.db.Txn(false)

@@ -1662,10 +1663,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, id string) (*structs.CSIVo
}

vol := obj.(*structs.CSIVolume)
// Health data is stale, so set this volume unhealthy until it's denormalized
vol.Healthy = false

return vol, nil
return s.CSIVolumeDenormalizePlugins(ws, vol)
}

// CSIVolumes looks up csi_volumes by pluginID
@@ -1839,21 +1837,10 @@ func (s *StateStore) deleteJobCSIPlugins(index uint64, job *structs.Job, txn *me
return nil
}

// CSIVolumeDenormalize takes a CSIVolume and denormalizes for the API
func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
if vol == nil {
return nil, nil
}

vol, err := s.CSIVolumeDenormalizePlugins(ws, vol)
if err != nil {
return nil, err
}

return s.csiVolumeDenormalizeAllocs(ws, vol)
}

// CSIVolumeDenormalize returns a CSIVolume with current health and plugins
// CSIVolumeDenormalizePlugins returns a CSIVolume with current health and plugins, but
// without allocations
// Use this for current volume metadata, handling lists of volumes
// Use CSIVolumeDenormalize for volumes containing both health and current allocations
func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
if vol == nil {
return nil, nil
@@ -1874,20 +1861,24 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs
return vol, nil
}

vol.ControllerRequired = plug.ControllerRequired
vol.ControllersHealthy = plug.ControllersHealthy
vol.NodesHealthy = plug.NodesHealthy
// This number is incorrect! The expected number of node plugins is actually this +
// the number of blocked evaluations for the jobs controlling these plugins
vol.ControllersExpected = len(plug.Controllers)
vol.NodesExpected = len(plug.Nodes)

vol.Healthy = vol.ControllersHealthy > 0 && vol.NodesHealthy > 0
vol.Healthy = vol.NodesHealthy > 0
if vol.ControllerRequired {
vol.Healthy = vol.ControllersHealthy > 0 && vol.Healthy
}

return vol, nil
}

// csiVolumeDenormalizeAllocs returns a CSIVolume with allocations
func (s *StateStore) csiVolumeDenormalizeAllocs(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
for id := range vol.ReadAllocs {
a, err := s.AllocByID(ws, id)
if err != nil {
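The health logic in this file now requires healthy controllers only when the plugin reports `ControllerRequired`. A minimal sketch of that rule in isolation, using simplified stand-ins for `structs.CSIPlugin` and `structs.CSIVolume` (only the fields that appear in the diff are modeled; everything else is illustrative):

```go
package main

import "fmt"

// plugin and volume are simplified stand-ins for structs.CSIPlugin and
// structs.CSIVolume; only the fields the health rule needs are included.
type plugin struct {
	ControllerRequired bool
	ControllersHealthy int
	NodesHealthy       int
}

type volume struct {
	ControllerRequired bool
	ControllersHealthy int
	NodesHealthy       int
	Healthy            bool
}

// denormalizeHealth mirrors the rule in CSIVolumeDenormalizePlugins: a volume
// is healthy when at least one node plugin is healthy, and, only if a
// controller is required, at least one controller plugin is healthy too.
func denormalizeHealth(vol *volume, plug *plugin) {
	vol.ControllerRequired = plug.ControllerRequired
	vol.ControllersHealthy = plug.ControllersHealthy
	vol.NodesHealthy = plug.NodesHealthy

	vol.Healthy = vol.NodesHealthy > 0
	if vol.ControllerRequired {
		vol.Healthy = vol.ControllersHealthy > 0 && vol.Healthy
	}
}

func main() {
	// A node-only plugin (no controller required) is healthy with zero controllers.
	v := &volume{}
	denormalizeHealth(v, &plugin{ControllerRequired: false, NodesHealthy: 2})
	fmt.Println(v.Healthy) // true

	// The same counts with a required controller are unhealthy.
	v = &volume{}
	denormalizeHealth(v, &plugin{ControllerRequired: true, NodesHealthy: 2})
	fmt.Println(v.Healthy) // false
}
```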
2 changes: 2 additions & 0 deletions nomad/structs/csi.go
@@ -153,6 +153,7 @@ type CSIVolume struct {
Healthy bool
VolumeGC time.Time
PluginID string
ControllerRequired bool
ControllersHealthy int
ControllersExpected int
NodesHealthy int
@@ -457,6 +458,7 @@ type CSIPlugin struct {
// controller tasks for this plugin. It is addressed by [job.Namespace][job.ID]
Jobs map[string]map[string]*Job

ControllerRequired bool
ControllersHealthy int
Controllers map[string]*CSIInfo
NodesHealthy int
4 changes: 2 additions & 2 deletions nomad/structs/structs.go
@@ -5068,8 +5068,8 @@ func (tg *TaskGroup) Validate(j *Job) error {

// Validate the Host Volumes
for name, decl := range tg.Volumes {
if decl.Type != VolumeTypeHost {
// TODO: Remove this error when adding new volume types
if !(decl.Type == VolumeTypeHost ||
decl.Type == VolumeTypeCSI) {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Volume %s has unrecognised type %s", name, decl.Type))
continue
}
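The `TaskGroup.Validate` change above accepts both host and CSI volume types. A minimal sketch of that check, with local stand-ins for `structs.VolumeRequest` and the type constants (the values "host" and "csi" are assumptions, not taken from this diff):

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for structs.VolumeTypeHost / structs.VolumeTypeCSI; the string
// values are assumed, not copied from the diff.
const (
	VolumeTypeHost = "host"
	VolumeTypeCSI  = "csi"
)

// VolumeRequest is a simplified stand-in for structs.VolumeRequest.
type VolumeRequest struct {
	Type     string
	Source   string
	ReadOnly bool
}

// validateVolumes mirrors the loop in TaskGroup.Validate: any type other than
// host or csi is rejected.
func validateVolumes(volumes map[string]*VolumeRequest) error {
	var errs []error
	for name, decl := range volumes {
		if !(decl.Type == VolumeTypeHost || decl.Type == VolumeTypeCSI) {
			errs = append(errs, fmt.Errorf("Volume %s has unrecognised type %s", name, decl.Type))
		}
	}
	return errors.Join(errs...)
}

func main() {
	vols := map[string]*VolumeRequest{
		"data":    {Type: VolumeTypeCSI, Source: "ebs-vol-1"},
		"scratch": {Type: "nfs", Source: "server:/export"},
	}
	fmt.Println(validateVolumes(vols)) // only "scratch" is rejected
}
```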
112 changes: 105 additions & 7 deletions scheduler/feasible.go
@@ -7,12 +7,20 @@ import (
"strconv"
"strings"

memdb "github.com/hashicorp/go-memdb"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/constraints/semver"
"github.com/hashicorp/nomad/nomad/structs"
psstructs "github.com/hashicorp/nomad/plugins/shared/structs"
)

const (
FilterConstraintHostVolumes = "missing compatible host volumes"
FilterConstraintCSIVolumes = "missing CSI plugins"
FilterConstraintDrivers = "missing drivers"
FilterConstraintDevices = "missing devices"
)

// FeasibleIterator is used to iteratively yield nodes that
// match feasibility constraints. The iterators may manage
// some state for performance optimizations.
Expand Down Expand Up @@ -61,14 +69,14 @@ func (iter *StaticIterator) Next() *structs.Node {
// Check if exhausted
n := len(iter.nodes)
if iter.offset == n || iter.seen == n {
if iter.seen != n {
if iter.seen != n { // seen has been Reset() to 0
iter.offset = 0
} else {
return nil
}
}

// Return the next offset
// Return the next offset, use this one
offset := iter.offset
iter.offset += 1
iter.seen += 1
@@ -135,7 +143,7 @@ func (h *HostVolumeChecker) Feasible(candidate *structs.Node) bool {
return true
}

h.ctx.Metrics().FilterNode(candidate, "missing compatible host volumes")
h.ctx.Metrics().FilterNode(candidate, FilterConstraintHostVolumes)
return false
}

@@ -177,6 +185,67 @@ func (h *HostVolumeChecker) hasVolumes(n *structs.Node) bool {
return true
}

type CSIVolumeChecker struct {
ctx Context
volumes map[string]*structs.VolumeRequest
}

func NewCSIVolumeChecker(ctx Context) *CSIVolumeChecker {
return &CSIVolumeChecker{
ctx: ctx,
}
}

func (c *CSIVolumeChecker) SetVolumes(volumes map[string]*structs.VolumeRequest) {
c.volumes = volumes
}

func (c *CSIVolumeChecker) Feasible(n *structs.Node) bool {
if c.hasPlugins(n) {
return true
}

c.ctx.Metrics().FilterNode(n, FilterConstraintCSIVolumes)
return false
}

func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) bool {
// We can mount the volume if
// - if required, a healthy controller plugin is running the driver
// - the volume has free claims
// - this node is running the node plugin, implies matching topology

// Fast path: Requested no volumes. No need to check further.
if len(c.volumes) == 0 {
return true
}

ws := memdb.NewWatchSet()
for _, req := range c.volumes {
// Check that this node has a healthy running plugin with the right PluginID
plugin, ok := n.CSINodePlugins[req.Name]
if !(ok && plugin.Healthy) {
return false
}

// Get the volume to check that it's healthy (there's a healthy controller
// and the volume hasn't encountered an error or been marked for GC)
vol, err := c.ctx.State().CSIVolumeByID(ws, req.Source)

if err != nil || vol == nil {
return false
}

if (req.ReadOnly && !vol.CanReadOnly()) ||
!vol.CanWrite() {
return false
}

}

return true
}

// DriverChecker is a FeasibilityChecker which returns whether a node has the
// drivers necessary to schedule a task group.
type DriverChecker struct {
@@ -201,7 +270,7 @@ func (c *DriverChecker) Feasible(option *structs.Node) bool {
if c.hasDrivers(option) {
return true
}
c.ctx.Metrics().FilterNode(option, "missing drivers")
c.ctx.Metrics().FilterNode(option, FilterConstraintDrivers)
return false
}

@@ -780,18 +849,20 @@ type FeasibilityWrapper struct {
source FeasibleIterator
jobCheckers []FeasibilityChecker
tgCheckers []FeasibilityChecker
tgAvailable []FeasibilityChecker
tg string
}

// NewFeasibilityWrapper returns a FeasibleIterator based on the passed source
// and FeasibilityCheckers.
func NewFeasibilityWrapper(ctx Context, source FeasibleIterator,
jobCheckers, tgCheckers []FeasibilityChecker) *FeasibilityWrapper {
jobCheckers, tgCheckers, tgAvailable []FeasibilityChecker) *FeasibilityWrapper {
return &FeasibilityWrapper{
ctx: ctx,
source: source,
jobCheckers: jobCheckers,
tgCheckers: tgCheckers,
tgAvailable: tgAvailable,
}
}

@@ -858,7 +929,12 @@
continue
case EvalComputedClassEligible:
// Fast path the eligible case
return option
if w.available(option) {
return option
}
// We match the class but are temporarily unavailable, the eval
// should be blocked
return nil
case EvalComputedClassEscaped:
tgEscaped = true
case EvalComputedClassUnknown:
@@ -884,10 +960,32 @@
evalElig.SetTaskGroupEligibility(true, w.tg, option.ComputedClass)
}

// tgAvailable handlers are available transiently, so we test them without
// affecting the computed class
if !w.available(option) {
continue OUTER
}

return option
}
}

// available checks transient feasibility checkers which depend on changing conditions,
// e.g. the health status of a plugin or driver
func (w *FeasibilityWrapper) available(option *structs.Node) bool {
// If we don't have any availability checks, we're available
if len(w.tgAvailable) == 0 {
return true
}

for _, check := range w.tgAvailable {
if !check.Feasible(option) {
return false
}
}
return true
}

// DeviceChecker is a FeasibilityChecker which returns whether a node has the
// devices necessary to schedule a task group.
type DeviceChecker struct {
@@ -920,7 +1018,7 @@ func (c *DeviceChecker) Feasible(option *structs.Node) bool {
return true
}

c.ctx.Metrics().FilterNode(option, "missing devices")
c.ctx.Metrics().FilterNode(option, FilterConstraintDevices)
return false
}

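Putting the `feasible.go` changes together: `CSIVolumeChecker` is configured per task group with `SetVolumes` and then consulted per candidate node with `Feasible`. A simplified sketch of that flow with local stand-in types; the real checker additionally looks the volume up in state to confirm controller health and free claims:

```go
package main

import "fmt"

// node is a simplified stand-in for *structs.Node.
type node struct {
	Name              string
	HealthyCSIPlugins map[string]bool // plugin key -> node plugin healthy
}

// volumeRequest is a simplified stand-in for *structs.VolumeRequest.
type volumeRequest struct {
	Name   string // also used as the plugin lookup key, mirroring the diff
	Source string
}

// csiVolumeChecker mirrors the shape of scheduler.CSIVolumeChecker:
// SetVolumes per task group, then Feasible per candidate node.
type csiVolumeChecker struct {
	volumes map[string]*volumeRequest
}

func (c *csiVolumeChecker) SetVolumes(volumes map[string]*volumeRequest) {
	c.volumes = volumes
}

func (c *csiVolumeChecker) Feasible(n *node) bool {
	// Fast path: no CSI volumes requested.
	if len(c.volumes) == 0 {
		return true
	}
	for _, req := range c.volumes {
		if !n.HealthyCSIPlugins[req.Name] {
			return false // node plugin missing or unhealthy
		}
		// The real checker also checks volume state (controller health,
		// free read/write claims); omitted in this sketch.
	}
	return true
}

func main() {
	checker := &csiVolumeChecker{}
	checker.SetVolumes(map[string]*volumeRequest{
		"ebs": {Name: "ebs", Source: "vol-123"},
	})

	nodes := []*node{
		{Name: "node-a", HealthyCSIPlugins: map[string]bool{"ebs": true}},
		{Name: "node-b", HealthyCSIPlugins: map[string]bool{}},
	}
	for _, n := range nodes {
		fmt.Printf("%s feasible: %v\n", n.Name, checker.Feasible(n))
	}
}
```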