From 9eb4975d52bab4d41b6e7777a050c0cc58a77180 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Thu, 30 Apr 2020 17:00:16 -0400
Subject: [PATCH 1/8] csi: failing unit test that exercises plugin cleanup

The existing test didn't go back to the state store, which is normally
fine but is complicated in this case by denormalization, which changes
the behavior. This commit makes the test more comprehensive.
---
 nomad/state/state_store_test.go | 143 +++++++++++++++++++++++---------
 1 file changed, 102 insertions(+), 41 deletions(-)

diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go
index ad12ea68061..aa5374ce811 100644
--- a/nomad/state/state_store_test.go
+++ b/nomad/state/state_store_test.go
@@ -3027,10 +3027,11 @@ func TestStateStore_CSIVolume(t *testing.T) {
 func TestStateStore_CSIPluginNodes(t *testing.T) {
 	index := uint64(999)
 	state := testStateStore(t)
+	ws := memdb.NewWatchSet()
+	plugID := "foo"
 
-	// Create Nodes fingerprinting the plugins
+	// Create Nomad client Nodes
 	ns := []*structs.Node{mock.Node(), mock.Node()}
-
 	for _, n := range ns {
 		index++
 		err := state.UpsertNode(index, n)
@@ -3038,10 +3039,10 @@ func TestStateStore_CSIPluginNodes(t *testing.T) {
 	}
 
 	// Fingerprint a running controller plugin
-	n0 := ns[0].Copy()
+	n0, _ := state.NodeByID(ws, ns[0].ID)
 	n0.CSIControllerPlugins = map[string]*structs.CSIInfo{
-		"foo": {
-			PluginID:                 "foo",
+		plugID: {
+			PluginID:                 plugID,
 			Healthy:                  true,
 			UpdateTime:               time.Now(),
 			RequiresControllerPlugin: true,
@@ -3052,17 +3053,16 @@ func TestStateStore_CSIPluginNodes(t *testing.T) {
 		},
 	}
-
 	index++
 	err := state.UpsertNode(index, n0)
 	require.NoError(t, err)
 
 	// Fingerprint two running node plugins
 	for _, n := range ns[:] {
-		n = n.Copy()
+		n, _ := state.NodeByID(ws, n.ID)
 		n.CSINodePlugins = map[string]*structs.CSIInfo{
-			"foo": {
-				PluginID:                 "foo",
+			plugID: {
+				PluginID:                 plugID,
 				Healthy:                  true,
 				UpdateTime:               time.Now(),
 				RequiresControllerPlugin: true,
@@ -3070,24 +3070,38 @@ func TestStateStore_CSIPluginNodes(t *testing.T) {
 				NodeInfo:                 &structs.CSINodeInfo{},
 			},
 		}
-
 		index++
 		err = state.UpsertNode(index, n)
 		require.NoError(t, err)
 	}
 
-	ws := memdb.NewWatchSet()
-	plug, err := state.CSIPluginByID(ws, "foo")
+	plug, err := state.CSIPluginByID(ws, plugID)
 	require.NoError(t, err)
+	require.True(t, plug.ControllerRequired)
+	require.Equal(t, 1, plug.ControllersHealthy, "controllers healthy")
+	require.Equal(t, 2, plug.NodesHealthy, "nodes healthy")
+	require.Equal(t, 1, len(plug.Controllers), "controllers expected")
+	require.Equal(t, 2, len(plug.Nodes), "nodes expected")
 
-	require.Equal(t, "foo", plug.ID)
-	require.Equal(t, 1, plug.ControllersHealthy)
-	require.Equal(t, 2, plug.NodesHealthy)
+	// Volume using the plugin
+	index++
+	vol := &structs.CSIVolume{
+		ID:        uuid.Generate(),
+		Namespace: structs.DefaultNamespace,
+		PluginID:  plugID,
+	}
+	err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
+	require.NoError(t, err)
+
+	vol, err = state.CSIVolumeByID(ws, structs.DefaultNamespace, vol.ID)
+	require.NoError(t, err)
+	require.True(t, vol.Schedulable, "volume should be schedulable")
 
 	// Controller is unhealthy
+	n0, _ = state.NodeByID(ws, ns[0].ID)
 	n0.CSIControllerPlugins = map[string]*structs.CSIInfo{
-		"foo": {
-			PluginID:                 "foo",
+		plugID: {
+			PluginID:                 plugID,
 			Healthy:                  false,
 			UpdateTime:               time.Now(),
 			RequiresControllerPlugin: true,
@@ -3103,51 +3117,98 @@ func TestStateStore_CSIPluginNodes(t *testing.T) {
 	err = state.UpsertNode(index, n0)
 	require.NoError(t, err)
 
-	plug, err = state.CSIPluginByID(ws, "foo")
-	require.NoError(t, err)
-	require.Equal(t, "foo", plug.ID)
-	require.Equal(t, 0, plug.ControllersHealthy)
-	require.Equal(t, 2, plug.NodesHealthy)
-
-	// Volume using the plugin
-	index++
-	vol := &structs.CSIVolume{
-		ID:        uuid.Generate(),
-		Namespace: structs.DefaultNamespace,
-		PluginID:  "foo",
-	}
-	err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
+	plug, err = state.CSIPluginByID(ws, plugID)
 	require.NoError(t, err)
+	require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy")
+	require.Equal(t, 2, plug.NodesHealthy, "nodes healthy")
+	require.Equal(t, 1, len(plug.Controllers), "controllers expected")
+	require.Equal(t, 2, len(plug.Nodes), "nodes expected")
 
 	vol, err = state.CSIVolumeByID(ws, structs.DefaultNamespace, vol.ID)
 	require.NoError(t, err)
-	require.True(t, vol.Schedulable)
+	require.False(t, vol.Schedulable, "volume should not be schedulable")
 
 	// Node plugin is removed
-	n1 := ns[1].Copy()
+	n1, _ := state.NodeByID(ws, ns[1].ID)
 	n1.CSINodePlugins = map[string]*structs.CSIInfo{}
 	index++
 	err = state.UpsertNode(index, n1)
 	require.NoError(t, err)
 
-	plug, err = state.CSIPluginByID(ws, "foo")
+	plug, err = state.CSIPluginByID(ws, plugID)
 	require.NoError(t, err)
-	require.Equal(t, "foo", plug.ID)
-	require.Equal(t, 0, plug.ControllersHealthy)
-	require.Equal(t, 1, plug.NodesHealthy)
+	require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy")
+	require.Equal(t, 1, plug.NodesHealthy, "nodes healthy")
+	require.Equal(t, 1, len(plug.Controllers), "controllers expected")
+	require.Equal(t, 1, len(plug.Nodes), "nodes expected")
 
-	// Last plugin is removed
-	n0 = ns[0].Copy()
+	// Last node plugin is removed
+	n0, _ = state.NodeByID(ws, ns[0].ID)
 	n0.CSINodePlugins = map[string]*structs.CSIInfo{}
 	index++
 	err = state.UpsertNode(index, n0)
 	require.NoError(t, err)
 
-	plug, err = state.CSIPluginByID(ws, "foo")
+	// Node plugins should be gone but the controller remains
+	plug, err = state.CSIPluginByID(ws, plugID)
+	require.NoError(t, err)
+	require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy")
+	require.Equal(t, 0, plug.NodesHealthy, "nodes healthy")
+	require.Equal(t, 1, len(plug.Controllers), "controllers expected")
+	require.Equal(t, 0, len(plug.Nodes), "nodes expected")
+
+	// A node plugin is restored
+	n0, _ = state.NodeByID(ws, n0.ID)
+	n0.CSINodePlugins = map[string]*structs.CSIInfo{
+		plugID: {
+			PluginID:                 plugID,
+			Healthy:                  true,
+			UpdateTime:               time.Now(),
+			RequiresControllerPlugin: true,
+			RequiresTopologies:       false,
+			NodeInfo:                 &structs.CSINodeInfo{},
+		},
+	}
+	index++
+	err = state.UpsertNode(index, n0)
+	require.NoError(t, err)
+
+	// Node plugin should be replaced and healthy
+	plug, err = state.CSIPluginByID(ws, plugID)
+	require.NoError(t, err)
+	require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy")
+	require.Equal(t, 1, plug.NodesHealthy, "nodes healthy")
+	require.Equal(t, 1, len(plug.Controllers), "controllers expected")
+	require.Equal(t, 1, len(plug.Nodes), "nodes expected")
+
+	// Remove node again
+	n0, _ = state.NodeByID(ws, ns[0].ID)
+	n0.CSINodePlugins = map[string]*structs.CSIInfo{}
+	index++
+	err = state.UpsertNode(index, n0)
+	require.NoError(t, err)
+
+	// Node plugins should be gone but the controller remains
+	plug, err = state.CSIPluginByID(ws, plugID)
+	require.NoError(t, err)
+	require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy")
+	require.Equal(t, 0, plug.NodesHealthy, "nodes healthy")
+	require.Equal(t, 1, len(plug.Controllers), "controllers expected")
+	require.Equal(t, 0, len(plug.Nodes), "nodes expected")
+
+	// Controller plugin is removed
+	n0, _ = state.NodeByID(ws, ns[0].ID)
+	n0.CSIControllerPlugins = map[string]*structs.CSIInfo{}
+	index++
+	err = state.UpsertNode(index, n0)
+	require.NoError(t, err)
+
+	// Plugin has been removed entirely
+	plug, err = state.CSIPluginByID(ws, plugID)
 	require.NoError(t, err)
 	require.Nil(t, plug)
 
-	// Volume exists and is safe to query, but unschedulable
+	// Volume still exists and is safe to query, but unschedulable
 	vol, err = state.CSIVolumeByID(ws, structs.DefaultNamespace, vol.ID)
 	require.NoError(t, err)
 	require.False(t, vol.Schedulable)

From c44c4fc0d245e29139c4b0ab53bf4a6e2c8e565d Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Thu, 30 Apr 2020 17:01:48 -0400
Subject: [PATCH 2/8] csi: plugins with PUBLISH_READONLY need controllers too

All known controllers that support `PUBLISH_READONLY` also support
`PUBLISH_UNPUBLISH_VOLUME`, but we shouldn't assume this.
---
 nomad/structs/csi.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go
index c69fada688f..08ef9797113 100644
--- a/nomad/structs/csi.go
+++ b/nomad/structs/csi.go
@@ -701,7 +701,8 @@ func (p *CSIPlugin) Copy() *CSIPlugin {
 func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) error {
 	if info.ControllerInfo != nil {
 		p.ControllerRequired = info.RequiresControllerPlugin &&
-			info.ControllerInfo.SupportsAttachDetach
+			(info.ControllerInfo.SupportsAttachDetach ||
+				info.ControllerInfo.SupportsReadOnlyAttach)
 
 		prev, ok := p.Controllers[nodeID]
 		if ok {

From e716e8bdf4430f8f65e460b9b2d7e0e114e0409d Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Thu, 30 Apr 2020 17:04:12 -0400
Subject: [PATCH 3/8] csi: delete node and controller plugins separately

If a Nomad client node is running both a controller and a node plugin
(which is a common case), then removing only the controller or only
the node left the plugin with incorrect counts.
---
 nomad/csi_endpoint_test.go |  4 +++-
 nomad/state/state_store.go | 39 ++++++++++++++++++++++++----------
 2 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go
index e474f42a411..1878d109b9d 100644
--- a/nomad/csi_endpoint_test.go
+++ b/nomad/csi_endpoint_test.go
@@ -560,10 +560,12 @@ func TestCSI_RPCVolumeAndPluginLookup(t *testing.T) {
 
 	// Create a client node with a plugin
 	node := mock.Node()
-	node.CSINodePlugins = map[string]*structs.CSIInfo{
+	node.CSIControllerPlugins = map[string]*structs.CSIInfo{
 		"minnie": {PluginID: "minnie", Healthy: true, RequiresControllerPlugin: true,
 			ControllerInfo: &structs.CSIControllerInfo{SupportsAttachDetach: true},
 		},
+	}
+	node.CSINodePlugins = map[string]*structs.CSIInfo{
 		"adam": {PluginID: "adam", Healthy: true},
 	}
 	err := state.UpsertNode(3, node)
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 623ea55c795..7bd13e2da72 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -1057,13 +1057,15 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
 		return nil
 	}
 
-	inUse := map[string]struct{}{}
+	inUseController := map[string]struct{}{}
+	inUseNode := map[string]struct{}{}
+
 	for _, info := range node.CSIControllerPlugins {
 		err := loop(info)
 		if err != nil {
 			return err
 		}
-		inUse[info.PluginID] = struct{}{}
+		inUseController[info.PluginID] = struct{}{}
 	}
 
 	for _, info := range node.CSINodePlugins {
@@ -1071,7 +1073,7 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
 		if err != nil {
 			return err
 		}
-		inUse[info.PluginID] = struct{}{}
+		inUseNode[info.PluginID] = struct{}{}
 	}
 
 	// remove the client node from any plugin that's not
@@ -1086,15 +1088,33 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
 			break
 		}
 		plug := raw.(*structs.CSIPlugin)
-		_, ok := inUse[plug.ID]
-		if !ok {
-			_, asController := plug.Controllers[node.ID]
-			_, asNode := plug.Nodes[node.ID]
-			if asController || asNode {
-				err = deleteNodeFromPlugin(txn, plug.Copy(), node, index)
-				if err != nil {
-					return err
-				}
+
+		var hadDelete bool
+		if _, ok := inUseController[plug.ID]; !ok {
+			if _, asController := plug.Controllers[node.ID]; asController {
+				err := plug.DeleteNodeForType(node.ID, structs.CSIPluginTypeController)
+				if err != nil {
+					return err
+				}
+				hadDelete = true
+			}
+		}
+		if _, ok := inUseNode[plug.ID]; !ok {
+			if _, asNode := plug.Nodes[node.ID]; asNode {
+				err := plug.DeleteNodeForType(node.ID, structs.CSIPluginTypeNode)
+				if err != nil {
+					return err
+				}
+				hadDelete = true
+			}
+		}
+		// we check this flag both for performance and to make sure we
+		// don't delete a plugin when registering a node plugin but
+		// not a controller
+		if hadDelete {
+			err = updateOrGCPlugin(index, txn, plug)
+			if err != nil {
+				return err
 			}
 		}
 	}
@@ -2137,7 +2157,6 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs
 	if vol == nil {
 		return nil, nil
 	}
-
 	// Lookup CSIPlugin, the health records, and calculate volume health
 	txn := s.db.Txn(false)
 	defer txn.Abort()

From 70d3e4b44c7433ec1f605525c47f68228ef81afb Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Mon, 4 May 2020 10:34:02 -0400
Subject: [PATCH 4/8] csi: create plugins only for healthy allocs

Plugins are first created when a Nomad client sends a node update RPC
that includes allocs with plugins. We use the same mechanism to update
plugin health.

But if we allow a plugin to be created for the first time when the
alloc is not healthy, then we'll recreate deleted plugins when the
job's allocs all get marked terminal. This changeset fixes that by
only creating a plugin when the alloc is healthy.
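Reviewer's note: a minimal standalone sketch of the ordering bug the
guard below prevents. Everything here is simplified for illustration;
the real logic lives in upsertNodeCSIPlugins in the hunk that follows.

    package main

    import "fmt"

    // upsertPlugin models the create-on-fingerprint path: without the
    // healthy check, a late unhealthy fingerprint for an already-GC'd
    // plugin would recreate it.
    func upsertPlugin(plugins map[string]struct{}, pluginID string, healthy bool) {
    	if _, ok := plugins[pluginID]; !ok {
    		if !healthy {
    			// don't create a plugin for an unhealthy fingerprint;
    			// it may be the tail end of a stopped alloc
    			return
    		}
    		plugins[pluginID] = struct{}{}
    	}
    	// existing plugins are updated as before
    }

    func main() {
    	plugins := map[string]struct{}{} // plugin was already GC'd
    	upsertPlugin(plugins, "foo", false)
    	fmt.Println(len(plugins)) // 0: the deleted plugin stays deleted
    }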
---
 nomad/state/state_store.go | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 7bd13e2da72..0b6b3de80f6 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -1037,6 +1037,12 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
 		if raw != nil {
 			plug = raw.(*structs.CSIPlugin).Copy()
 		} else {
+			if !info.Healthy {
+				// we don't want to create new plugins for unhealthy
+				// allocs, otherwise we'd recreate the plugin when we
+				// get the update for the alloc becoming terminal
+				return nil
+			}
 			plug = structs.NewCSIPlugin(info.PluginID, index)
 			plug.Provider = info.Provider
 			plug.Version = info.ProviderVersion

From 8e0728e3326529bb7982942fcf958769344088e3 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Mon, 4 May 2020 10:44:33 -0400
Subject: [PATCH 5/8] csi: inline the deleteNodeFromPlugin helper
---
 nomad/state/state_store.go | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)

diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 0b6b3de80f6..1e498d1078f 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -1156,7 +1156,11 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
 		}
 
 		plug := raw.(*structs.CSIPlugin).Copy()
-		err = deleteNodeFromPlugin(txn, plug, node, index)
+		err = plug.DeleteNode(node.ID)
+		if err != nil {
+			return err
+		}
+		err = updateOrGCPlugin(index, txn, plug)
 		if err != nil {
 			return err
 		}
@@ -1169,14 +1173,6 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
 	return nil
 }
 
-func deleteNodeFromPlugin(txn *memdb.Txn, plug *structs.CSIPlugin, node *structs.Node, index uint64) error {
-	err := plug.DeleteNode(node.ID)
-	if err != nil {
-		return err
-	}
-	return updateOrGCPlugin(index, txn, plug)
-}
-
 // updateOrGCPlugin updates a plugin but will delete it if the plugin is empty
 func updateOrGCPlugin(index uint64, txn *memdb.Txn, plug *structs.CSIPlugin) error {
 	plug.ModifyIndex = index
@@ -1213,7 +1209,6 @@ func (s *StateStore) deleteJobFromPlugin(index uint64, txn *memdb.Txn, job *stru
 	plugins := map[string]*structs.CSIPlugin{}
 
 	for _, a := range allocs {
-		// if its nil, we can just panic
 		tg := a.Job.LookupTaskGroup(a.TaskGroup)
 		for _, t := range tg.Tasks {
 			if t.CSIPluginConfig != nil {

From f3f1d6efbb7b50924cdf4f55bb18c25b39447293 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Mon, 4 May 2020 10:49:34 -0400
Subject: [PATCH 6/8] csi: terminal plugin allocs should clean themselves up

When an allocation that implements a CSI plugin becomes terminal, the
client fingerprint can't tell whether the plugin is unhealthy
intentionally (as during an update or a job stop). Allocations that
are server-terminal should delete themselves from the plugin and
trigger a plugin self-GC, the same as an unused node.
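Reviewer's note: a sketch of the flow this patch adds, with simplified
types; serverTerminal here stands in for Allocation.ServerTerminalStatus,
i.e. the server's *desired* status is terminal regardless of what the
client last reported. The real hook is updatePluginWithAlloc in the
diff below.

    package main

    import "fmt"

    type alloc struct {
    	id, desiredStatus string
    }

    // serverTerminal stands in for Allocation.ServerTerminalStatus.
    func (a *alloc) serverTerminal() bool {
    	return a.desiredStatus == "stop" || a.desiredStatus == "evict"
    }

    type plugin struct {
    	allocs map[string]struct{} // alloc IDs backing the plugin
    }

    // onAllocUpsert models the new hook: a server-terminal alloc deletes
    // itself from the plugin, which can then self-GC once it's empty.
    func onAllocUpsert(p *plugin, a *alloc) (gcPlugin bool) {
    	if !a.serverTerminal() {
    		return false
    	}
    	delete(p.allocs, a.id)
    	return len(p.allocs) == 0
    }

    func main() {
    	p := &plugin{allocs: map[string]struct{}{"a1": {}}}
    	gc := onAllocUpsert(p, &alloc{id: "a1", desiredStatus: "stop"})
    	fmt.Println(gc) // true: the now-empty plugin gets GC'd
    }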
---
 nomad/state/state_store.go      |  44 +++++
 nomad/state/state_store_test.go | 301 ++++++++++++++++++++++++++++++++
 nomad/structs/csi.go            |   8 +-
 3 files changed, 352 insertions(+), 1 deletion(-)

diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 1e498d1078f..88d9b1bb838 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -2816,6 +2816,10 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a
 		return err
 	}
 
+	if err := s.updatePluginWithAlloc(index, copyAlloc, txn); err != nil {
+		return err
+	}
+
 	// Update the allocation
 	if err := txn.Insert("allocs", copyAlloc); err != nil {
 		return fmt.Errorf("alloc insert failed: %v", err)
@@ -2922,6 +2926,10 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
 			return err
 		}
 
+		if err := s.updatePluginWithAlloc(index, alloc, txn); err != nil {
+			return err
+		}
+
 		if err := txn.Insert("allocs", alloc); err != nil {
 			return fmt.Errorf("alloc insert failed: %v", err)
 		}
@@ -4579,6 +4587,42 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
 	return nil
 }
 
+// updatePluginWithAlloc updates the CSI plugins for an alloc when the
+// allocation is updated or inserted with a terminal server status.
+func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocation,
+	txn *memdb.Txn) error {
+	if !alloc.ServerTerminalStatus() {
+		return nil
+	}
+
+	ws := memdb.NewWatchSet()
+	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
+	for _, t := range tg.Tasks {
+		if t.CSIPluginConfig != nil {
+			pluginID := t.CSIPluginConfig.ID
+			plug, err := s.CSIPluginByID(ws, pluginID)
+			if err != nil {
+				return err
+			}
+			if plug == nil {
+				// plugin may not have been created because it never
+				// became healthy, just move on
+				return nil
+			}
+			err = plug.DeleteAlloc(alloc.ID, alloc.NodeID)
+			if err != nil {
+				return err
+			}
+			err = updateOrGCPlugin(index, txn, plug)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
 // UpsertACLPolicies is used to create or update a set of ACL policies
 func (s *StateStore) UpsertACLPolicies(index uint64, policies []*structs.ACLPolicy) error {
 	txn := s.db.Txn(true)
diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go
index aa5374ce811..ef1216d1abb 100644
--- a/nomad/state/state_store_test.go
+++ b/nomad/state/state_store_test.go
@@ -3214,6 +3214,307 @@ func TestStateStore_CSIPluginNodes(t *testing.T) {
 	require.False(t, vol.Schedulable)
 }
 
+// TestStateStore_CSIPluginAllocUpdates tests the ordering
+// interactions for CSI plugins between Nomad client node updates and
+// allocation updates.
+func TestStateStore_CSIPluginAllocUpdates(t *testing.T) {
+	t.Parallel()
+	index := uint64(999)
+	state := testStateStore(t)
+	ws := memdb.NewWatchSet()
+
+	n := mock.Node()
+	index++
+	err := state.UpsertNode(index, n)
+	require.NoError(t, err)
+
+	// (1) unhealthy fingerprint, then terminal alloc, then healthy node update
+	plugID0 := "foo0"
+
+	alloc0 := mock.Alloc()
+	alloc0.NodeID = n.ID
+	alloc0.DesiredStatus = "run"
+	alloc0.ClientStatus = "running"
+	alloc0.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ID: plugID0}
+	index++
+	err = state.UpsertAllocs(index, []*structs.Allocation{alloc0})
+	require.NoError(t, err)
+
+	n, _ = state.NodeByID(ws, n.ID)
+	n.CSINodePlugins = map[string]*structs.CSIInfo{
+		plugID0: {
+			PluginID:                 plugID0,
+			AllocID:                  alloc0.ID,
+			Healthy:                  false,
+			UpdateTime:               time.Now(),
+			RequiresControllerPlugin: true,
+			NodeInfo:                 &structs.CSINodeInfo{},
+		},
+	}
+	index++
+	err = state.UpsertNode(index, n)
+	require.NoError(t, err)
+
+	plug, err := state.CSIPluginByID(ws, plugID0)
+	require.NoError(t, err)
+	require.Nil(t, plug, "no plugin should exist: not yet healthy")
+
+	alloc0.DesiredStatus = "stopped"
+	alloc0.ClientStatus = "complete"
+	index++
+	err = state.UpsertAllocs(index, []*structs.Allocation{alloc0})
+	require.NoError(t, err)
+
+	plug, err = state.CSIPluginByID(ws, plugID0)
+	require.NoError(t, err)
+	require.Nil(t, plug, "no plugin should exist: allocs never healthy")
+
+	n, _ = state.NodeByID(ws, n.ID)
+	n.CSINodePlugins[plugID0].Healthy = true
+	n.CSINodePlugins[plugID0].UpdateTime = time.Now()
+	index++
+	err = state.UpsertNode(index, n)
+	require.NoError(t, err)
+
+	plug, err = state.CSIPluginByID(ws, plugID0)
+	require.NoError(t, err)
+	require.NotNil(t, plug, "plugin should exist")
+
+	// (2) healthy fingerprint, then terminal alloc update
+	plugID1 := "foo1"
+
+	alloc1 := mock.Alloc()
+	n, _ = state.NodeByID(ws, n.ID)
+	n.CSINodePlugins = map[string]*structs.CSIInfo{
+		plugID1: {
+			PluginID:                 plugID1,
+			AllocID:                  alloc1.ID,
+			Healthy:                  true,
+			UpdateTime:               time.Now(),
+			RequiresControllerPlugin: true,
+			NodeInfo:                 &structs.CSINodeInfo{},
+		},
+	}
+	index++
+	err = state.UpsertNode(index, n)
+	require.NoError(t, err)
+
+	plug, err = state.CSIPluginByID(ws, plugID1)
+	require.NoError(t, err)
+	require.NotNil(t, plug, "plugin should exist")
+
+	alloc1.NodeID = n.ID
+	alloc1.DesiredStatus = "stop"
+	alloc1.ClientStatus = "complete"
+	alloc1.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ID: plugID1}
+	index++
+	err = state.UpsertAllocs(index, []*structs.Allocation{alloc1})
+	require.NoError(t, err)
+
+	plug, err = state.CSIPluginByID(ws, plugID1)
+	require.NoError(t, err)
+	require.Nil(t, plug, "no plugin should exist: alloc became terminal")
+
+	// (3) terminal alloc update, then unhealthy fingerprint
+	plugID2 := "foo2"
+
+	alloc2 := mock.Alloc()
+	alloc2.NodeID = n.ID
+	alloc2.DesiredStatus = "stop"
+	alloc2.ClientStatus = "complete"
+	alloc2.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ID: plugID2}
+	index++
+	err = state.UpsertAllocs(index, []*structs.Allocation{alloc2})
+	require.NoError(t, err)
+
+	plug, err = state.CSIPluginByID(ws, plugID2)
+	require.NoError(t, err)
+	require.Nil(t, plug, "no plugin should exist: alloc became terminal")
+
+	n, _ = state.NodeByID(ws, n.ID)
+	n.CSINodePlugins = map[string]*structs.CSIInfo{
+		plugID2: {
+			PluginID:                 plugID2,
+			AllocID:                  alloc2.ID,
+			Healthy:                  false,
+			UpdateTime:               time.Now(),
+			RequiresControllerPlugin: true,
+			NodeInfo:                 &structs.CSINodeInfo{},
+		},
+	}
+	index++
+	err = state.UpsertNode(index, n)
+	require.NoError(t, err)
+
+	plug, err = state.CSIPluginByID(ws, plugID2)
+	require.NoError(t, err)
+	require.Nil(t, plug, "plugin should not exist: never became healthy")
+
+}
+
+// TestStateStore_CSIPluginMultiNodeUpdates tests the ordering
+// interactions for CSI plugins between Nomad client node updates and
+// allocation updates when multiple nodes are involved
+func TestStateStore_CSIPluginMultiNodeUpdates(t *testing.T) {
+	t.Parallel()
+	index := uint64(999)
+	state := testStateStore(t)
+	ws := memdb.NewWatchSet()
+
+	var err error
+
+	// Create Nomad client Nodes
+	ns := []*structs.Node{mock.Node(), mock.Node()}
+	for _, n := range ns {
+		index++
+		err = state.UpsertNode(index, n)
+		require.NoError(t, err)
+	}
+
+	plugID := "foo"
+	plugCfg := &structs.TaskCSIPluginConfig{ID: plugID}
+
+	// Fingerprint two running node plugins and their allocs; we'll
+	// leave these in place for the test to ensure we don't GC the
+	// plugin
+	for _, n := range ns[:] {
+		nAlloc := mock.Alloc()
+		n, _ := state.NodeByID(ws, n.ID)
+		n.CSINodePlugins = map[string]*structs.CSIInfo{
+			plugID: {
+				PluginID:                 plugID,
+				AllocID:                  nAlloc.ID,
+				Healthy:                  true,
+				UpdateTime:               time.Now(),
+				RequiresControllerPlugin: true,
+				RequiresTopologies:       false,
+				NodeInfo:                 &structs.CSINodeInfo{},
+			},
+		}
+		index++
+		err = state.UpsertNode(index, n)
+		require.NoError(t, err)
+
+		nAlloc.NodeID = n.ID
+		nAlloc.DesiredStatus = "run"
+		nAlloc.ClientStatus = "running"
+		nAlloc.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = plugCfg
+
+		index++
+		err = state.UpsertAllocs(index, []*structs.Allocation{nAlloc})
+		require.NoError(t, err)
+	}
+
+	// Fingerprint a running controller plugin
+	alloc0 := mock.Alloc()
+	n0, _ := state.NodeByID(ws, ns[0].ID)
+	n0.CSIControllerPlugins = map[string]*structs.CSIInfo{
+		plugID: {
+			PluginID:                 plugID,
+			AllocID:                  alloc0.ID,
+			Healthy:                  true,
+			UpdateTime:               time.Now(),
+			RequiresControllerPlugin: true,
+			RequiresTopologies:       false,
+			ControllerInfo: &structs.CSIControllerInfo{
+				SupportsReadOnlyAttach: true,
+				SupportsListVolumes:    true,
+			},
+		},
+	}
+	index++
+	err = state.UpsertNode(index, n0)
+	require.NoError(t, err)
+
+	plug, err := state.CSIPluginByID(ws, plugID)
+	require.NoError(t, err)
+	require.Equal(t, 1, plug.ControllersHealthy, "controllers healthy")
+	require.Equal(t, 1, len(plug.Controllers), "controllers expected")
+	require.Equal(t, 2, plug.NodesHealthy, "nodes healthy")
+	require.Equal(t, 2, len(plug.Nodes), "nodes expected")
+
+	n1, _ := state.NodeByID(ws, ns[1].ID)
+
+	alloc0.NodeID = n0.ID
+	alloc0.DesiredStatus = "stop"
+	alloc0.ClientStatus = "complete"
+	alloc0.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = plugCfg
+
+	index++
+	err = state.UpsertAllocs(index, []*structs.Allocation{alloc0})
+	require.NoError(t, err)
+
+	plug, err = state.CSIPluginByID(ws, plugID)
+	require.NoError(t, err)
+	require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy")
+	require.Equal(t, 0, len(plug.Controllers), "controllers expected")
+	require.Equal(t, 2, plug.NodesHealthy, "nodes healthy")
+	require.Equal(t, 2, len(plug.Nodes), "nodes expected")
+
+	alloc1 := mock.Alloc()
+	alloc1.NodeID = n1.ID
+	alloc1.DesiredStatus = "run"
+	alloc1.ClientStatus = "running"
+	alloc1.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = plugCfg
+
+	index++
+	err = state.UpsertAllocs(index, []*structs.Allocation{alloc1})
+	require.NoError(t, err)
+
+	plug, err = state.CSIPluginByID(ws, plugID)
+	require.NoError(t, err)
+	require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy")
+	require.Equal(t, 0, len(plug.Controllers), "controllers expected")
+	require.Equal(t, 2, plug.NodesHealthy, "nodes healthy")
+	require.Equal(t, 2, len(plug.Nodes), "nodes expected")
+
+	n0, _ = state.NodeByID(ws, ns[0].ID)
+	n0.CSIControllerPlugins = map[string]*structs.CSIInfo{
+		plugID: {
+			PluginID:                 plugID,
+			AllocID:                  alloc0.ID,
+			Healthy:                  false,
+			UpdateTime:               time.Now(),
+			RequiresControllerPlugin: true,
+			RequiresTopologies:       false,
+			ControllerInfo: &structs.CSIControllerInfo{
+				SupportsReadOnlyAttach: true,
+				SupportsListVolumes:    true,
+			},
+		},
+	}
+	index++
+	err = state.UpsertNode(index, n0)
+	require.NoError(t, err)
+
+	n1.CSIControllerPlugins = map[string]*structs.CSIInfo{
+		plugID: {
+			PluginID:                 plugID,
+			AllocID:                  alloc1.ID,
+			Healthy:                  true,
+			UpdateTime:               time.Now(),
+			RequiresControllerPlugin: true,
+			RequiresTopologies:       false,
+			ControllerInfo: &structs.CSIControllerInfo{
+				SupportsReadOnlyAttach: true,
+				SupportsListVolumes:    true,
+			},
+		},
+	}
+	index++
+	err = state.UpsertNode(index, n1)
+	require.NoError(t, err)
+
+	plug, err = state.CSIPluginByID(ws, plugID)
+	require.NoError(t, err)
+	require.True(t, plug.ControllerRequired)
+	require.Equal(t, 1, plug.ControllersHealthy, "controllers healthy")
+	require.Equal(t, 1, len(plug.Controllers), "controllers expected")
+	require.Equal(t, 2, plug.NodesHealthy, "nodes healthy")
+	require.Equal(t, 2, len(plug.Nodes), "nodes expected")
+
+}
+
 func TestStateStore_CSIPluginJobs(t *testing.T) {
 	s := testStateStore(t)
 	deleteNodes := CreateTestCSIPlugin(s, "foo")
diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go
index 08ef9797113..73412055849 100644
--- a/nomad/structs/csi.go
+++ b/nomad/structs/csi.go
@@ -713,11 +713,15 @@ func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) error {
 			p.ControllersHealthy -= 1
 		}
 	}
+
 	// note: for this to work as expected, only a single
 	// controller for a given plugin can be on a given Nomad
 	// client, they also conflict on the client so this should be
 	// ok
 	p.Controllers[nodeID] = info
+	if prev != nil || prev == nil && info.Healthy {
+		p.Controllers[nodeID] = info
+	}
 	if info.Healthy {
 		p.ControllersHealthy += 1
 	}
@@ -733,7 +737,9 @@ func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) error {
 			p.NodesHealthy -= 1
 		}
 	}
-	p.Nodes[nodeID] = info
+	if prev != nil || prev == nil && info.Healthy {
+		p.Nodes[nodeID] = info
+	}
 	if info.Healthy {
 		p.NodesHealthy += 1
 	}

From e339037a8feb7baa1cb550461b09a43b0277d726 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Tue, 5 May 2020 13:52:35 -0400
Subject: [PATCH 7/8] address review comments
---
 nomad/structs/csi.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go
index 73412055849..e37d0961e1f 100644
--- a/nomad/structs/csi.go
+++ b/nomad/structs/csi.go
@@ -719,7 +719,7 @@ func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) error {
 	// client, they also conflict on the client so this should be
 	// ok
 	p.Controllers[nodeID] = info
-	if prev != nil || prev == nil && info.Healthy {
+	if prev != nil || info.Healthy {
 		p.Controllers[nodeID] = info
 	}
 	if info.Healthy {
@@ -737,7 +737,7 @@ func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) error {
 			p.NodesHealthy -= 1
 		}
 	}
-	if prev != nil || prev == nil && info.Healthy {
+	if prev != nil || info.Healthy {
 		p.Nodes[nodeID] = info
 	}
 	if info.Healthy {
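Reviewer's note: the simplification in the patch above is
behavior-preserving, since && binds tighter than || in Go:
`prev != nil || (prev == nil && info.Healthy)` is true exactly when
`prev != nil || info.Healthy` is. A quick exhaustive check (standalone
and illustrative only, not part of the series):

    package main

    import "fmt"

    func main() {
    	for _, prevExists := range []bool{false, true} {
    		for _, healthy := range []bool{false, true} {
    			before := prevExists || (!prevExists && healthy)
    			after := prevExists || healthy
    			fmt.Println(prevExists, healthy, before == after) // always true
    		}
    	}
    }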
From 9765f29b301e991afd54c793c41fd5e98796bd85 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Tue, 5 May 2020 14:34:51 -0400
Subject: [PATCH 8/8] fix bad rebase
---
 nomad/structs/csi.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go
index e37d0961e1f..a227e93c18e 100644
--- a/nomad/structs/csi.go
+++ b/nomad/structs/csi.go
@@ -718,7 +718,6 @@ func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) error {
 	// controller for a given plugin can be on a given Nomad
 	// client, they also conflict on the client so this should be
 	// ok
-	p.Controllers[nodeID] = info
 	if prev != nil || info.Healthy {
 		p.Controllers[nodeID] = info
 	}
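Reviewer's note: after patches 6-8, AddPlugin converges on a single
upsert rule for both controller and node entries: write the entry when
it already exists (prev != nil) or when the incoming fingerprint is
healthy. A never-healthy fingerprint therefore can't create a fresh
entry, which is what keeps GC'd plugins from being resurrected. A
standalone sketch of just that rule:

    package main

    import "fmt"

    // shouldStore captures the final condition: prev != nil || info.Healthy
    func shouldStore(prevExists, incomingHealthy bool) bool {
    	return prevExists || incomingHealthy
    }

    func main() {
    	fmt.Println(shouldStore(false, false)) // false: don't resurrect a GC'd plugin
    	fmt.Println(shouldStore(false, true))  // true: first healthy fingerprint creates the entry
    	fmt.Println(shouldStore(true, false))  // true: keep tracking an entry going unhealthy
    	fmt.Println(shouldStore(true, true))   // true: normal healthy update
    }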