csi: fix plugin counts on node update (#7844)
In this changeset:

* If a Nomad client node is running both a controller and a node
  plugin (which is a common case) and only one of the two is removed,
  the plugin was not being updated with the correct counts.
* The existing test for plugin cleanup didn't go back to the state
  store, which is normally fine but is complicated in this case by
  denormalization, which changes the behavior. This commit makes the
  test more comprehensive (a sketch of such a check follows the test
  diff below).
* Set "controller required" when plugin has `PUBLISH_READONLY`. All
  known controllers that support `PUBLISH_READONLY` also support
  `PUBLISH_UNPUBLISH_VOLUME` but we shouldn't assume this.
* Only create plugins when the allocs for those plugins are
  healthy. If we allow a plugin to be created for the first time when
  the alloc is not healthy, then we'll recreate deleted plugins when
  the job's allocs all get marked terminal.
* Terminal plugin alloc updates should clean up the plugin. The client
  fingerprint can't tell whether the plugin is unhealthy intentionally
  (as in the case of updates or job stop). Allocations that are
  server-terminal should delete themselves from the plugin and trigger
  a plugin self-GC, the same as an unused node.
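
The `PUBLISH_READONLY` change itself lands in one of the two changed
files this page did not render. As a hedged illustration only — the
package, function name, and shape below are assumptions, not Nomad's
code — the rule amounts to treating any publish-related capability from
the CSI spec's Go bindings as requiring a controller:

package fingerprint

import (
	csipb "github.com/container-storage-interface/spec/lib/go/csi"
)

// controllerRequired sketches the rule from the bullet above: any
// publish-related controller capability means a controller plugin is
// required, including PUBLISH_READONLY on its own.
func controllerRequired(caps []csipb.ControllerServiceCapability_RPC_Type) bool {
	for _, c := range caps {
		switch c {
		case csipb.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
			// All known controllers that report PUBLISH_READONLY also
			// report PUBLISH_UNPUBLISH_VOLUME, but we don't assume it.
			csipb.ControllerServiceCapability_RPC_PUBLISH_READONLY:
			return true
		}
	}
	return false
}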
tgross authored May 5, 2020
1 parent 1585d34 commit 1531db8
Showing 4 changed files with 499 additions and 65 deletions.
nomad/csi_endpoint_test.go (4 changes: 3 additions & 1 deletion)
@@ -560,10 +560,12 @@ func TestCSI_RPCVolumeAndPluginLookup(t *testing.T) {

// Create a client node with a plugin
node := mock.Node()
node.CSINodePlugins = map[string]*structs.CSIInfo{
node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {PluginID: "minnie", Healthy: true, RequiresControllerPlugin: true,
ControllerInfo: &structs.CSIControllerInfo{SupportsAttachDetach: true},
},
}
node.CSINodePlugins = map[string]*structs.CSIInfo{
"adam": {PluginID: "adam", Healthy: true},
}
err := state.UpsertNode(3, node)
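The more comprehensive check described in the commit message's second
bullet — reading state back from the store instead of asserting on the
in-memory fingerprint structs — might continue the test above roughly as
follows. This is a hedged sketch, not part of the diff: `CSIPluginByID`
and the `Controllers` map are used later in this commit, while the
specific assertions are illustrative.

// Re-read the plugin from the state store so the assertion covers the
// denormalized counts, not just the CSIInfo structs set on the node.
ws := memdb.NewWatchSet()
plug, err := state.CSIPluginByID(ws, "minnie")
require.NoError(t, err)
require.NotNil(t, plug)
require.True(t, plug.ControllerRequired)
require.Len(t, plug.Controllers, 1)
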
nomad/state/state_store.go (104 changes: 84 additions & 20 deletions)
@@ -1037,6 +1037,12 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) error {
if raw != nil {
plug = raw.(*structs.CSIPlugin).Copy()
} else {
if !info.Healthy {
// we don't want to create new plugins for unhealthy
// allocs, otherwise we'd recreate the plugin when we
// get the update for the alloc becoming terminal
return nil
}
plug = structs.NewCSIPlugin(info.PluginID, index)
plug.Provider = info.Provider
plug.Version = info.ProviderVersion
@@ -1057,21 +1063,23 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) error {
return nil
}

inUse := map[string]struct{}{}
inUseController := map[string]struct{}{}
inUseNode := map[string]struct{}{}

for _, info := range node.CSIControllerPlugins {
err := loop(info)
if err != nil {
return err
}
inUse[info.PluginID] = struct{}{}
inUseController[info.PluginID] = struct{}{}
}

for _, info := range node.CSINodePlugins {
err := loop(info)
if err != nil {
return err
}
inUse[info.PluginID] = struct{}{}
inUseNode[info.PluginID] = struct{}{}
}

// remove the client node from any plugin that's not
@@ -1086,15 +1094,33 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) error {
break
}
plug := raw.(*structs.CSIPlugin)
_, ok := inUse[plug.ID]
if !ok {
_, asController := plug.Controllers[node.ID]
_, asNode := plug.Nodes[node.ID]
if asController || asNode {
err = deleteNodeFromPlugin(txn, plug.Copy(), node, index)

var hadDelete bool
if _, ok := inUseController[plug.ID]; !ok {
if _, asController := plug.Controllers[node.ID]; asController {
err := plug.DeleteNodeForType(node.ID, structs.CSIPluginTypeController)
if err != nil {
return err
}
hadDelete = true
}
}
if _, ok := inUseNode[plug.ID]; !ok {
if _, asNode := plug.Nodes[node.ID]; asNode {
err := plug.DeleteNodeForType(node.ID, structs.CSIPluginTypeNode)
if err != nil {
return err
}
hadDelete = true
}
}
// we check this flag both for performance and to make sure we
// don't delete a plugin when registering a node plugin but
// no controller
if hadDelete {
err = updateOrGCPlugin(index, txn, plug)
if err != nil {
return err
}
}
}
@@ -1130,7 +1156,11 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) error {
}

plug := raw.(*structs.CSIPlugin).Copy()
err = deleteNodeFromPlugin(txn, plug, node, index)
err = plug.DeleteNode(node.ID)
if err != nil {
return err
}
err = updateOrGCPlugin(index, txn, plug)
if err != nil {
return err
}
@@ -1143,14 +1173,6 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) error {
return nil
}

func deleteNodeFromPlugin(txn *memdb.Txn, plug *structs.CSIPlugin, node *structs.Node, index uint64) error {
err := plug.DeleteNode(node.ID)
if err != nil {
return err
}
return updateOrGCPlugin(index, txn, plug)
}

// updateOrGCPlugin updates a plugin but will delete it if the plugin is empty
func updateOrGCPlugin(index uint64, txn *memdb.Txn, plug *structs.CSIPlugin) error {
plug.ModifyIndex = index
@@ -1187,7 +1209,6 @@ func (s *StateStore) deleteJobFromPlugin(index uint64, txn *memdb.Txn, job *stru
plugins := map[string]*structs.CSIPlugin{}

for _, a := range allocs {
// if its nil, we can just panic
tg := a.Job.LookupTaskGroup(a.TaskGroup)
for _, t := range tg.Tasks {
if t.CSIPluginConfig != nil {
@@ -2137,7 +2158,6 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs
if vol == nil {
return nil, nil
}

// Lookup CSIPlugin, the health records, and calculate volume health
txn := s.db.Txn(false)
defer txn.Abort()
@@ -2796,6 +2816,10 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a
return err
}

if err := s.updatePluginWithAlloc(index, copyAlloc, txn); err != nil {
return err
}

// Update the allocation
if err := txn.Insert("allocs", copyAlloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
@@ -2902,6 +2926,10 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
return err
}

if err := s.updatePluginWithAlloc(index, alloc, txn); err != nil {
return err
}

if err := txn.Insert("allocs", alloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
}
@@ -4559,6 +4587,42 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
return nil
}

// updatePluginWithAlloc updates the CSI plugins for an alloc when the
// allocation is updated or inserted with a terminal server status.
func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocation,
txn *memdb.Txn) error {
if !alloc.ServerTerminalStatus() {
return nil
}

ws := memdb.NewWatchSet()
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
for _, t := range tg.Tasks {
if t.CSIPluginConfig != nil {
pluginID := t.CSIPluginConfig.ID
plug, err := s.CSIPluginByID(ws, pluginID)
if err != nil {
return err
}
if plug == nil {
// plugin may not have been created because it never
// became healthy, just move on
return nil
}
err = plug.DeleteAlloc(alloc.ID, alloc.NodeID)
if err != nil {
return err
}
err = updateOrGCPlugin(index, txn, plug)
if err != nil {
return err
}
}
}

return nil
}

// UpsertACLPolicies is used to create or update a set of ACL policies
func (s *StateStore) UpsertACLPolicies(index uint64, policies []*structs.ACLPolicy) error {
txn := s.db.Txn(true)
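The `plug.DeleteNodeForType` calls added above are defined in
`nomad/structs`, one of the two changed files not rendered on this
page. A plausible sketch of that per-type bookkeeping, assuming the
`Controllers`/`Nodes` maps visible above plus per-type healthy counters
(`ControllersHealthy`, `NodesHealthy`); this is an assumption, not the
verbatim change:

// DeleteNodeForType removes one client node's entry from a single side
// of the plugin (controller or node) and adjusts the healthy count, so
// removing only the controller no longer disturbs the node counts.
func (p *CSIPlugin) DeleteNodeForType(nodeID string, pluginType CSIPluginType) error {
	switch pluginType {
	case CSIPluginTypeController:
		if prev, ok := p.Controllers[nodeID]; ok {
			if prev.Healthy {
				p.ControllersHealthy--
			}
			delete(p.Controllers, nodeID)
		}
	case CSIPluginTypeNode:
		if prev, ok := p.Nodes[nodeID]; ok {
			if prev.Healthy {
				p.NodesHealthy--
			}
			delete(p.Nodes, nodeID)
		}
	}
	return nil
}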