From 1531db809fdf482bdf709c11c451b2f40afef962 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 5 May 2020 15:39:57 -0400 Subject: [PATCH] csi: fix plugin counts on node update (#7844) In this changeset: * If a Nomad client node was running both a controller and a node plugin (which is a common case) and only the controller or the node plugin was removed, the plugin was not being updated with the correct counts. * The existing test for plugin cleanup didn't go back to the state store, which is normally OK but is complicated in this case by denormalization, which changes the behavior. This commit makes the test more comprehensive. * Set "controller required" when plugin has `PUBLISH_READONLY`. All known controllers that support `PUBLISH_READONLY` also support `PUBLISH_UNPUBLISH_VOLUME` but we shouldn't assume this. * Only create plugins when the allocs for those plugins are healthy. If we allow a plugin to be created for the first time when the alloc is not healthy, then we'll recreate deleted plugins when the job's allocs all get marked terminal. * Terminal plugin alloc updates should clean up the plugin. The client fingerprint can't tell if the plugin is unhealthy intentionally (for the case of updates or job stop). Allocations that are server-terminal should delete themselves from the plugin and trigger a plugin self-GC, the same as an unused node. 
--- nomad/csi_endpoint_test.go | 4 +- nomad/state/state_store.go | 104 ++++++-- nomad/state/state_store_test.go | 444 +++++++++++++++++++++++++++++--- nomad/structs/csi.go | 12 +- 4 files changed, 499 insertions(+), 65 deletions(-) diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index e474f42a411..1878d109b9d 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -560,10 +560,12 @@ func TestCSI_RPCVolumeAndPluginLookup(t *testing.T) { // Create a client node with a plugin node := mock.Node() - node.CSINodePlugins = map[string]*structs.CSIInfo{ + node.CSIControllerPlugins = map[string]*structs.CSIInfo{ "minnie": {PluginID: "minnie", Healthy: true, RequiresControllerPlugin: true, ControllerInfo: &structs.CSIControllerInfo{SupportsAttachDetach: true}, }, + } + node.CSINodePlugins = map[string]*structs.CSIInfo{ "adam": {PluginID: "adam", Healthy: true}, } err := state.UpsertNode(3, node) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 623ea55c795..88d9b1bb838 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1037,6 +1037,12 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro if raw != nil { plug = raw.(*structs.CSIPlugin).Copy() } else { + if !info.Healthy { + // we don't want to create new plugins for unhealthy + // allocs, otherwise we'd recreate the plugin when we + // get the update for the alloc becoming terminal + return nil + } plug = structs.NewCSIPlugin(info.PluginID, index) plug.Provider = info.Provider plug.Version = info.ProviderVersion @@ -1057,13 +1063,15 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro return nil } - inUse := map[string]struct{}{} + inUseController := map[string]struct{}{} + inUseNode := map[string]struct{}{} + for _, info := range node.CSIControllerPlugins { err := loop(info) if err != nil { return err } - inUse[info.PluginID] = struct{}{} + inUseController[info.PluginID] = 
struct{}{} } for _, info := range node.CSINodePlugins { @@ -1071,7 +1079,7 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro if err != nil { return err } - inUse[info.PluginID] = struct{}{} + inUseNode[info.PluginID] = struct{}{} } // remove the client node from any plugin that's not @@ -1086,15 +1094,33 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro break } plug := raw.(*structs.CSIPlugin) - _, ok := inUse[plug.ID] - if !ok { - _, asController := plug.Controllers[node.ID] - _, asNode := plug.Nodes[node.ID] - if asController || asNode { - err = deleteNodeFromPlugin(txn, plug.Copy(), node, index) + + var hadDelete bool + if _, ok := inUseController[plug.ID]; !ok { + if _, asController := plug.Controllers[node.ID]; asController { + err := plug.DeleteNodeForType(node.ID, structs.CSIPluginTypeController) if err != nil { return err } + hadDelete = true + } + } + if _, ok := inUseNode[plug.ID]; !ok { + if _, asNode := plug.Nodes[node.ID]; asNode { + err := plug.DeleteNodeForType(node.ID, structs.CSIPluginTypeNode) + if err != nil { + return err + } + hadDelete = true + } + } + // we check this flag both for performance and to make sure we + // don't delete a plugin when registering a node plugin but + // no controller + if hadDelete { + err = updateOrGCPlugin(index, txn, plug) + if err != nil { + return err } } } @@ -1130,7 +1156,11 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro } plug := raw.(*structs.CSIPlugin).Copy() - err = deleteNodeFromPlugin(txn, plug, node, index) + err = plug.DeleteNode(node.ID) + if err != nil { + return err + } + err = updateOrGCPlugin(index, txn, plug) if err != nil { return err } @@ -1143,14 +1173,6 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro return nil } -func deleteNodeFromPlugin(txn *memdb.Txn, plug *structs.CSIPlugin, node *structs.Node, index uint64) error { - err := plug.DeleteNode(node.ID) 
- if err != nil { - return err - } - return updateOrGCPlugin(index, txn, plug) -} - // updateOrGCPlugin updates a plugin but will delete it if the plugin is empty func updateOrGCPlugin(index uint64, txn *memdb.Txn, plug *structs.CSIPlugin) error { plug.ModifyIndex = index @@ -1187,7 +1209,6 @@ func (s *StateStore) deleteJobFromPlugin(index uint64, txn *memdb.Txn, job *stru plugins := map[string]*structs.CSIPlugin{} for _, a := range allocs { - // if its nil, we can just panic tg := a.Job.LookupTaskGroup(a.TaskGroup) for _, t := range tg.Tasks { if t.CSIPluginConfig != nil { @@ -2137,7 +2158,6 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs if vol == nil { return nil, nil } - // Lookup CSIPlugin, the health records, and calculate volume health txn := s.db.Txn(false) defer txn.Abort() @@ -2796,6 +2816,10 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a return err } + if err := s.updatePluginWithAlloc(index, copyAlloc, txn); err != nil { + return err + } + // Update the allocation if err := txn.Insert("allocs", copyAlloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) @@ -2902,6 +2926,10 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation return err } + if err := s.updatePluginWithAlloc(index, alloc, txn); err != nil { + return err + } + if err := txn.Insert("allocs", alloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) } @@ -4559,6 +4587,42 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat return nil } +// updatePluginWithAlloc updates the CSI plugins for an alloc when the +// allocation is updated or inserted with a terminal server status. 
+func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocation, + txn *memdb.Txn) error { + if !alloc.ServerTerminalStatus() { + return nil + } + + ws := memdb.NewWatchSet() + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + for _, t := range tg.Tasks { + if t.CSIPluginConfig != nil { + pluginID := t.CSIPluginConfig.ID + plug, err := s.CSIPluginByID(ws, pluginID) + if err != nil { + return err + } + if plug == nil { + // plugin may not have been created because it never + // became healthy, just move on + return nil + } + err = plug.DeleteAlloc(alloc.ID, alloc.NodeID) + if err != nil { + return err + } + err = updateOrGCPlugin(index, txn, plug) + if err != nil { + return err + } + } + } + + return nil +} + // UpsertACLPolicies is used to create or update a set of ACL policies func (s *StateStore) UpsertACLPolicies(index uint64, policies []*structs.ACLPolicy) error { txn := s.db.Txn(true) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index ad12ea68061..ef1216d1abb 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -3027,10 +3027,11 @@ func TestStateStore_CSIVolume(t *testing.T) { func TestStateStore_CSIPluginNodes(t *testing.T) { index := uint64(999) state := testStateStore(t) + ws := memdb.NewWatchSet() + plugID := "foo" - // Create Nodes fingerprinting the plugins + // Create Nomad client Nodes ns := []*structs.Node{mock.Node(), mock.Node()} - for _, n := range ns { index++ err := state.UpsertNode(index, n) @@ -3038,10 +3039,10 @@ func TestStateStore_CSIPluginNodes(t *testing.T) { } // Fingerprint a running controller plugin - n0 := ns[0].Copy() + n0, _ := state.NodeByID(ws, ns[0].ID) n0.CSIControllerPlugins = map[string]*structs.CSIInfo{ - "foo": { - PluginID: "foo", + plugID: { + PluginID: plugID, Healthy: true, UpdateTime: time.Now(), RequiresControllerPlugin: true, @@ -3052,17 +3053,16 @@ func TestStateStore_CSIPluginNodes(t *testing.T) { }, }, } - index++ err := 
state.UpsertNode(index, n0) require.NoError(t, err) // Fingerprint two running node plugins for _, n := range ns[:] { - n = n.Copy() + n, _ := state.NodeByID(ws, n.ID) n.CSINodePlugins = map[string]*structs.CSIInfo{ - "foo": { - PluginID: "foo", + plugID: { + PluginID: plugID, Healthy: true, UpdateTime: time.Now(), RequiresControllerPlugin: true, @@ -3070,24 +3070,38 @@ func TestStateStore_CSIPluginNodes(t *testing.T) { NodeInfo: &structs.CSINodeInfo{}, }, } - index++ err = state.UpsertNode(index, n) require.NoError(t, err) } - ws := memdb.NewWatchSet() - plug, err := state.CSIPluginByID(ws, "foo") + plug, err := state.CSIPluginByID(ws, plugID) require.NoError(t, err) + require.True(t, plug.ControllerRequired) + require.Equal(t, 1, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 2, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 1, len(plug.Controllers), "controllers expected") + require.Equal(t, 2, len(plug.Nodes), "nodes expected") - require.Equal(t, "foo", plug.ID) - require.Equal(t, 1, plug.ControllersHealthy) - require.Equal(t, 2, plug.NodesHealthy) + // Volume using the plugin + index++ + vol := &structs.CSIVolume{ + ID: uuid.Generate(), + Namespace: structs.DefaultNamespace, + PluginID: plugID, + } + err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol}) + require.NoError(t, err) + + vol, err = state.CSIVolumeByID(ws, structs.DefaultNamespace, vol.ID) + require.NoError(t, err) + require.True(t, vol.Schedulable, "volume should be schedulable") // Controller is unhealthy + n0, _ = state.NodeByID(ws, ns[0].ID) n0.CSIControllerPlugins = map[string]*structs.CSIInfo{ - "foo": { - PluginID: "foo", + plugID: { + PluginID: plugID, Healthy: false, UpdateTime: time.Now(), RequiresControllerPlugin: true, @@ -3103,56 +3117,404 @@ func TestStateStore_CSIPluginNodes(t *testing.T) { err = state.UpsertNode(index, n0) require.NoError(t, err) - plug, err = state.CSIPluginByID(ws, "foo") - require.NoError(t, err) - require.Equal(t, "foo", 
plug.ID) - require.Equal(t, 0, plug.ControllersHealthy) - require.Equal(t, 2, plug.NodesHealthy) - - // Volume using the plugin - index++ - vol := &structs.CSIVolume{ - ID: uuid.Generate(), - Namespace: structs.DefaultNamespace, - PluginID: "foo", - } - err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol}) + plug, err = state.CSIPluginByID(ws, plugID) require.NoError(t, err) + require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 2, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 1, len(plug.Controllers), "controllers expected") + require.Equal(t, 2, len(plug.Nodes), "nodes expected") vol, err = state.CSIVolumeByID(ws, structs.DefaultNamespace, vol.ID) require.NoError(t, err) - require.True(t, vol.Schedulable) + require.False(t, vol.Schedulable, "volume should not be schedulable") // Node plugin is removed - n1 := ns[1].Copy() + n1, _ := state.NodeByID(ws, ns[1].ID) n1.CSINodePlugins = map[string]*structs.CSIInfo{} index++ err = state.UpsertNode(index, n1) require.NoError(t, err) - plug, err = state.CSIPluginByID(ws, "foo") + plug, err = state.CSIPluginByID(ws, plugID) require.NoError(t, err) - require.Equal(t, "foo", plug.ID) - require.Equal(t, 0, plug.ControllersHealthy) - require.Equal(t, 1, plug.NodesHealthy) + require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 1, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 1, len(plug.Controllers), "controllers expected") + require.Equal(t, 1, len(plug.Nodes), "nodes expected") - // Last plugin is removed - n0 = ns[0].Copy() + // Last node plugin is removed + n0, _ = state.NodeByID(ws, ns[0].ID) n0.CSINodePlugins = map[string]*structs.CSIInfo{} index++ err = state.UpsertNode(index, n0) require.NoError(t, err) - plug, err = state.CSIPluginByID(ws, "foo") + // Nodes plugins should be gone but controllers left + plug, err = state.CSIPluginByID(ws, plugID) + require.NoError(t, err) + require.Equal(t, 0, plug.ControllersHealthy, 
"controllers healthy") + require.Equal(t, 0, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 1, len(plug.Controllers), "controllers expected") + require.Equal(t, 0, len(plug.Nodes), "nodes expected") + + // A node plugin is restored + n0, _ = state.NodeByID(ws, n0.ID) + n0.CSINodePlugins = map[string]*structs.CSIInfo{ + plugID: { + PluginID: plugID, + Healthy: true, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + RequiresTopologies: false, + NodeInfo: &structs.CSINodeInfo{}, + }, + } + index++ + err = state.UpsertNode(index, n0) + require.NoError(t, err) + + // Nodes plugin should be replaced and healthy + plug, err = state.CSIPluginByID(ws, plugID) + require.NoError(t, err) + require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 1, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 1, len(plug.Controllers), "controllers expected") + require.Equal(t, 1, len(plug.Nodes), "nodes expected") + + // Remove node again + n0, _ = state.NodeByID(ws, ns[0].ID) + n0.CSINodePlugins = map[string]*structs.CSIInfo{} + index++ + err = state.UpsertNode(index, n0) + require.NoError(t, err) + + // Nodes plugins should be gone but controllers left + plug, err = state.CSIPluginByID(ws, plugID) + require.NoError(t, err) + require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 0, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 1, len(plug.Controllers), "controllers expected") + require.Equal(t, 0, len(plug.Nodes), "nodes expected") + + // controller is removed + n0, _ = state.NodeByID(ws, ns[0].ID) + n0.CSIControllerPlugins = map[string]*structs.CSIInfo{} + index++ + err = state.UpsertNode(index, n0) + require.NoError(t, err) + + // Plugin has been removed entirely + plug, err = state.CSIPluginByID(ws, plugID) require.NoError(t, err) require.Nil(t, plug) - // Volume exists and is safe to query, but unschedulable + // Volume still exists and is safe to query, but unschedulable vol, err = 
state.CSIVolumeByID(ws, structs.DefaultNamespace, vol.ID) require.NoError(t, err) require.False(t, vol.Schedulable) } +// TestStateStore_CSIPluginAllocUpdates tests the ordering +// interactions for CSI plugins between Nomad client node updates and +// allocation updates. +func TestStateStore_CSIPluginAllocUpdates(t *testing.T) { + t.Parallel() + index := uint64(999) + state := testStateStore(t) + ws := memdb.NewWatchSet() + + n := mock.Node() + index++ + err := state.UpsertNode(index, n) + require.NoError(t, err) + + // (1) unhealthy fingerprint, then terminal alloc, then healthy node update + plugID0 := "foo0" + + alloc0 := mock.Alloc() + alloc0.NodeID = n.ID + alloc0.DesiredStatus = "run" + alloc0.ClientStatus = "running" + alloc0.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ID: plugID0} + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{alloc0}) + require.NoError(t, err) + + n, _ = state.NodeByID(ws, n.ID) + n.CSINodePlugins = map[string]*structs.CSIInfo{ + plugID0: { + PluginID: plugID0, + AllocID: alloc0.ID, + Healthy: false, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + NodeInfo: &structs.CSINodeInfo{}, + }, + } + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + + plug, err := state.CSIPluginByID(ws, plugID0) + require.NoError(t, err) + require.Nil(t, plug, "no plugin should exist: not yet healthy") + + alloc0.DesiredStatus = "stopped" + alloc0.ClientStatus = "complete" + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{alloc0}) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID0) + require.NoError(t, err) + require.Nil(t, plug, "no plugin should exist: allocs never healthy") + + n, _ = state.NodeByID(ws, n.ID) + n.CSINodePlugins[plugID0].Healthy = true + n.CSINodePlugins[plugID0].UpdateTime = time.Now() + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID0) + 
require.NoError(t, err) + require.NotNil(t, plug, "plugin should exist") + + // (2) healthy fingerprint, then terminal alloc update + plugID1 := "foo1" + + alloc1 := mock.Alloc() + n, _ = state.NodeByID(ws, n.ID) + n.CSINodePlugins = map[string]*structs.CSIInfo{ + plugID1: { + PluginID: plugID1, + AllocID: alloc1.ID, + Healthy: true, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + NodeInfo: &structs.CSINodeInfo{}, + }, + } + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID1) + require.NoError(t, err) + require.NotNil(t, plug, "plugin should exist") + + alloc1.NodeID = n.ID + alloc1.DesiredStatus = "stop" + alloc1.ClientStatus = "complete" + alloc1.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ID: plugID1} + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{alloc1}) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID1) + require.NoError(t, err) + require.Nil(t, plug, "no plugin should exist: alloc became terminal") + + // (3) terminal alloc update, then unhealthy fingerprint + plugID2 := "foo2" + + alloc2 := mock.Alloc() + alloc2.NodeID = n.ID + alloc2.DesiredStatus = "stop" + alloc2.ClientStatus = "complete" + alloc2.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ID: plugID2} + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{alloc2}) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID2) + require.NoError(t, err) + require.Nil(t, plug, "no plugin should exist: alloc became terminal") + + n, _ = state.NodeByID(ws, n.ID) + n.CSINodePlugins = map[string]*structs.CSIInfo{ + plugID2: { + PluginID: plugID2, + AllocID: alloc2.ID, + Healthy: false, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + NodeInfo: &structs.CSINodeInfo{}, + }, + } + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID2) 
+ require.NoError(t, err) + require.Nil(t, plug, "plugin should not exist: never became healthy") + +} + +// TestStateStore_CSIPluginMultiNodeUpdates tests the ordering +// interactions for CSI plugins between Nomad client node updates and +// allocation updates when multiple nodes are involved +func TestStateStore_CSIPluginMultiNodeUpdates(t *testing.T) { + t.Parallel() + index := uint64(999) + state := testStateStore(t) + ws := memdb.NewWatchSet() + + var err error + + // Create Nomad client Nodes + ns := []*structs.Node{mock.Node(), mock.Node()} + for _, n := range ns { + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + } + + plugID := "foo" + plugCfg := &structs.TaskCSIPluginConfig{ID: plugID} + + // Fingerprint two running node plugins and their allocs; we'll + // leave these in place for the test to ensure we don't GC the + // plugin + for _, n := range ns[:] { + nAlloc := mock.Alloc() + n, _ := state.NodeByID(ws, n.ID) + n.CSINodePlugins = map[string]*structs.CSIInfo{ + plugID: { + PluginID: plugID, + AllocID: nAlloc.ID, + Healthy: true, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + RequiresTopologies: false, + NodeInfo: &structs.CSINodeInfo{}, + }, + } + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + + nAlloc.NodeID = n.ID + nAlloc.DesiredStatus = "run" + nAlloc.ClientStatus = "running" + nAlloc.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = plugCfg + + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{nAlloc}) + require.NoError(t, err) + } + + // Fingerprint a running controller plugin + alloc0 := mock.Alloc() + n0, _ := state.NodeByID(ws, ns[0].ID) + n0.CSIControllerPlugins = map[string]*structs.CSIInfo{ + plugID: { + PluginID: plugID, + AllocID: alloc0.ID, + Healthy: true, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + RequiresTopologies: false, + ControllerInfo: &structs.CSIControllerInfo{ + SupportsReadOnlyAttach: true, + SupportsListVolumes: true, + }, + }, + } + 
index++ + err = state.UpsertNode(index, n0) + require.NoError(t, err) + + plug, err := state.CSIPluginByID(ws, plugID) + require.NoError(t, err) + require.Equal(t, 1, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 1, len(plug.Controllers), "controllers expected") + require.Equal(t, 2, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 2, len(plug.Nodes), "nodes expected") + + n1, _ := state.NodeByID(ws, ns[1].ID) + + alloc0.NodeID = n0.ID + alloc0.DesiredStatus = "stop" + alloc0.ClientStatus = "complete" + alloc0.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = plugCfg + + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{alloc0}) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID) + require.NoError(t, err) + require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 0, len(plug.Controllers), "controllers expected") + require.Equal(t, 2, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 2, len(plug.Nodes), "nodes expected") + + alloc1 := mock.Alloc() + alloc1.NodeID = n1.ID + alloc1.DesiredStatus = "run" + alloc1.ClientStatus = "running" + alloc1.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = plugCfg + + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{alloc1}) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID) + require.NoError(t, err) + require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 0, len(plug.Controllers), "controllers expected") + require.Equal(t, 2, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 2, len(plug.Nodes), "nodes expected") + + n0, _ = state.NodeByID(ws, ns[0].ID) + n0.CSIControllerPlugins = map[string]*structs.CSIInfo{ + plugID: { + PluginID: plugID, + AllocID: alloc0.ID, + Healthy: false, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + RequiresTopologies: false, + ControllerInfo: &structs.CSIControllerInfo{ + SupportsReadOnlyAttach: true, + SupportsListVolumes: 
true, + }, + }, + } + index++ + err = state.UpsertNode(index, n0) + require.NoError(t, err) + + n1.CSIControllerPlugins = map[string]*structs.CSIInfo{ + plugID: { + PluginID: plugID, + AllocID: alloc1.ID, + Healthy: true, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + RequiresTopologies: false, + ControllerInfo: &structs.CSIControllerInfo{ + SupportsReadOnlyAttach: true, + SupportsListVolumes: true, + }, + }, + } + index++ + err = state.UpsertNode(index, n1) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID) + require.NoError(t, err) + require.True(t, plug.ControllerRequired) + require.Equal(t, 1, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 1, len(plug.Controllers), "controllers expected") + require.Equal(t, 2, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 2, len(plug.Nodes), "nodes expected") + +} + func TestStateStore_CSIPluginJobs(t *testing.T) { s := testStateStore(t) deleteNodes := CreateTestCSIPlugin(s, "foo") diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go index c69fada688f..a227e93c18e 100644 --- a/nomad/structs/csi.go +++ b/nomad/structs/csi.go @@ -701,7 +701,8 @@ func (p *CSIPlugin) Copy() *CSIPlugin { func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) error { if info.ControllerInfo != nil { p.ControllerRequired = info.RequiresControllerPlugin && - info.ControllerInfo.SupportsAttachDetach + (info.ControllerInfo.SupportsAttachDetach || + info.ControllerInfo.SupportsReadOnlyAttach) prev, ok := p.Controllers[nodeID] if ok { @@ -712,11 +713,14 @@ func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) error { p.ControllersHealthy -= 1 } } + // note: for this to work as expected, only a single // controller for a given plugin can be on a given Nomad // client, they also conflict on the client so this should be // ok - p.Controllers[nodeID] = info + if prev != nil || info.Healthy { + p.Controllers[nodeID] = info + } if info.Healthy { p.ControllersHealthy += 1 } @@ -732,7 
+736,9 @@ func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) error { p.NodesHealthy -= 1 } } - p.Nodes[nodeID] = info + if prev != nil || info.Healthy { + p.Nodes[nodeID] = info + } if info.Healthy { p.NodesHealthy += 1 }