Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

csi: fix plugin counts on node update #7844

Merged
merged 8 commits into from
May 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion nomad/csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,10 +560,12 @@ func TestCSI_RPCVolumeAndPluginLookup(t *testing.T) {

// Create a client node with a plugin
node := mock.Node()
node.CSINodePlugins = map[string]*structs.CSIInfo{
node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {PluginID: "minnie", Healthy: true, RequiresControllerPlugin: true,
ControllerInfo: &structs.CSIControllerInfo{SupportsAttachDetach: true},
},
}
node.CSINodePlugins = map[string]*structs.CSIInfo{
"adam": {PluginID: "adam", Healthy: true},
}
err := state.UpsertNode(3, node)
Expand Down
104 changes: 84 additions & 20 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,12 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
if raw != nil {
plug = raw.(*structs.CSIPlugin).Copy()
} else {
if !info.Healthy {
// we don't want to create new plugins for unhealthy
// allocs, otherwise we'd recreate the plugin when we
// get the update for the alloc becoming terminal
return nil
}
plug = structs.NewCSIPlugin(info.PluginID, index)
plug.Provider = info.Provider
plug.Version = info.ProviderVersion
Expand All @@ -1057,21 +1063,23 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
return nil
}

inUse := map[string]struct{}{}
inUseController := map[string]struct{}{}
inUseNode := map[string]struct{}{}

for _, info := range node.CSIControllerPlugins {
err := loop(info)
if err != nil {
return err
}
inUse[info.PluginID] = struct{}{}
inUseController[info.PluginID] = struct{}{}
}

for _, info := range node.CSINodePlugins {
err := loop(info)
if err != nil {
return err
}
inUse[info.PluginID] = struct{}{}
inUseNode[info.PluginID] = struct{}{}
}

// remove the client node from any plugin that's not
Expand All @@ -1086,15 +1094,33 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
break
}
plug := raw.(*structs.CSIPlugin)
_, ok := inUse[plug.ID]
if !ok {
_, asController := plug.Controllers[node.ID]
_, asNode := plug.Nodes[node.ID]
if asController || asNode {
err = deleteNodeFromPlugin(txn, plug.Copy(), node, index)

var hadDelete bool
if _, ok := inUseController[plug.ID]; !ok {
if _, asController := plug.Controllers[node.ID]; asController {
err := plug.DeleteNodeForType(node.ID, structs.CSIPluginTypeController)
if err != nil {
return err
}
hadDelete = true
}
}
if _, ok := inUseNode[plug.ID]; !ok {
if _, asNode := plug.Nodes[node.ID]; asNode {
err := plug.DeleteNodeForType(node.ID, structs.CSIPluginTypeNode)
if err != nil {
return err
}
hadDelete = true
}
}
// we check this flag both for performance and to make sure we
// don't delete a plugin when registering a node plugin but
// no controller
if hadDelete {
err = updateOrGCPlugin(index, txn, plug)
if err != nil {
return err
}
}
}
Expand Down Expand Up @@ -1130,7 +1156,11 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
}

plug := raw.(*structs.CSIPlugin).Copy()
err = deleteNodeFromPlugin(txn, plug, node, index)
err = plug.DeleteNode(node.ID)
if err != nil {
return err
}
err = updateOrGCPlugin(index, txn, plug)
if err != nil {
return err
}
Expand All @@ -1143,14 +1173,6 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
return nil
}

// deleteNodeFromPlugin removes the given node from plug and then hands the
// plugin to updateOrGCPlugin so the change is persisted in txn at index.
// The first error encountered is returned unchanged.
func deleteNodeFromPlugin(txn *memdb.Txn, plug *structs.CSIPlugin, node *structs.Node, index uint64) error {
	if delErr := plug.DeleteNode(node.ID); delErr != nil {
		return delErr
	}

	// the plugin has been modified; write it back (or let the helper GC it)
	return updateOrGCPlugin(index, txn, plug)
}

// updateOrGCPlugin updates a plugin but will delete it if the plugin is empty
func updateOrGCPlugin(index uint64, txn *memdb.Txn, plug *structs.CSIPlugin) error {
plug.ModifyIndex = index
Expand Down Expand Up @@ -1187,7 +1209,6 @@ func (s *StateStore) deleteJobFromPlugin(index uint64, txn *memdb.Txn, job *stru
plugins := map[string]*structs.CSIPlugin{}

for _, a := range allocs {
// if its nil, we can just panic
tg := a.Job.LookupTaskGroup(a.TaskGroup)
for _, t := range tg.Tasks {
if t.CSIPluginConfig != nil {
Expand Down Expand Up @@ -2137,7 +2158,6 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs
if vol == nil {
return nil, nil
}

// Lookup CSIPlugin, the health records, and calculate volume health
txn := s.db.Txn(false)
defer txn.Abort()
Expand Down Expand Up @@ -2796,6 +2816,10 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a
return err
}

if err := s.updatePluginWithAlloc(index, copyAlloc, txn); err != nil {
return err
}

// Update the allocation
if err := txn.Insert("allocs", copyAlloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
Expand Down Expand Up @@ -2902,6 +2926,10 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
return err
}

if err := s.updatePluginWithAlloc(index, alloc, txn); err != nil {
return err
}

if err := txn.Insert("allocs", alloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
}
Expand Down Expand Up @@ -4559,6 +4587,42 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
return nil
}

// updatePluginWithAlloc updates the CSI plugins for an alloc when the
// allocation is updated or inserted with a terminal server status.
//
// For every task in the alloc's task group that carries a CSIPluginConfig,
// the alloc is removed from the referenced plugin and the plugin is handed
// to updateOrGCPlugin to be written back in txn at index. Allocs that are
// not server-terminal are left untouched. Any error from the plugin lookup,
// DeleteAlloc, or updateOrGCPlugin is returned unchanged.
func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocation,
	txn *memdb.Txn) error {
	if !alloc.ServerTerminalStatus() {
		return nil
	}

	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
	if tg == nil {
		// the task group could not be resolved, so there are no plugin
		// tasks to clean up; avoid dereferencing a nil group below
		return nil
	}

	ws := memdb.NewWatchSet()
	for _, t := range tg.Tasks {
		if t.CSIPluginConfig == nil {
			continue
		}

		plug, err := s.CSIPluginByID(ws, t.CSIPluginConfig.ID)
		if err != nil {
			return err
		}
		if plug == nil {
			// plugin may not have been created because it never
			// became healthy, just move on to the next task so that
			// other plugins in the same group are still updated
			continue
		}

		// mutate a copy rather than the object returned by the lookup,
		// so a plugin that is still held by the state store is never
		// modified in place
		plug = plug.Copy()
		if err := plug.DeleteAlloc(alloc.ID, alloc.NodeID); err != nil {
			return err
		}
		if err := updateOrGCPlugin(index, txn, plug); err != nil {
			return err
		}
	}

	return nil
}

// UpsertACLPolicies is used to create or update a set of ACL policies
func (s *StateStore) UpsertACLPolicies(index uint64, policies []*structs.ACLPolicy) error {
txn := s.db.Txn(true)
Expand Down
Loading