diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index e474f42a411..1878d109b9d 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -560,10 +560,12 @@ func TestCSI_RPCVolumeAndPluginLookup(t *testing.T) { // Create a client node with a plugin node := mock.Node() - node.CSINodePlugins = map[string]*structs.CSIInfo{ + node.CSIControllerPlugins = map[string]*structs.CSIInfo{ "minnie": {PluginID: "minnie", Healthy: true, RequiresControllerPlugin: true, ControllerInfo: &structs.CSIControllerInfo{SupportsAttachDetach: true}, }, + } + node.CSINodePlugins = map[string]*structs.CSIInfo{ "adam": {PluginID: "adam", Healthy: true}, } err := state.UpsertNode(3, node) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 623ea55c795..88d9b1bb838 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1037,6 +1037,12 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro if raw != nil { plug = raw.(*structs.CSIPlugin).Copy() } else { + if !info.Healthy { + // we don't want to create new plugins for unhealthy + // allocs, otherwise we'd recreate the plugin when we + // get the update for the alloc becoming terminal + return nil + } plug = structs.NewCSIPlugin(info.PluginID, index) plug.Provider = info.Provider plug.Version = info.ProviderVersion @@ -1057,13 +1063,15 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro return nil } - inUse := map[string]struct{}{} + inUseController := map[string]struct{}{} + inUseNode := map[string]struct{}{} + for _, info := range node.CSIControllerPlugins { err := loop(info) if err != nil { return err } - inUse[info.PluginID] = struct{}{} + inUseController[info.PluginID] = struct{}{} } for _, info := range node.CSINodePlugins { @@ -1071,7 +1079,7 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro if err != nil { return err } - inUse[info.PluginID] = struct{}{} + inUseNode[info.PluginID] = struct{}{} } // remove the client node from any plugin that's not @@ -1086,15 +1094,33 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro break } plug := raw.(*structs.CSIPlugin) - _, ok := inUse[plug.ID] - if !ok { - _, asController := plug.Controllers[node.ID] - _, asNode := plug.Nodes[node.ID] - if asController || asNode { - err = deleteNodeFromPlugin(txn, plug.Copy(), node, index) + + var hadDelete bool + if _, ok := inUseController[plug.ID]; !ok { + if _, asController := plug.Controllers[node.ID]; asController { + err := plug.DeleteNodeForType(node.ID, structs.CSIPluginTypeController) if err != nil { return err } + hadDelete = true + } + } + if _, ok := inUseNode[plug.ID]; !ok { + if _, asNode := plug.Nodes[node.ID]; asNode { + err := plug.DeleteNodeForType(node.ID, structs.CSIPluginTypeNode) + if err != nil { + return err + } + hadDelete = true + } + } + // we check this flag both for performance and to make sure we + // don't delete a plugin when registering a node plugin but + // no controller + if hadDelete { + err = updateOrGCPlugin(index, txn, plug) + if err != nil { + return err } } } @@ -1130,7 +1156,11 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro } plug := raw.(*structs.CSIPlugin).Copy() - err = deleteNodeFromPlugin(txn, plug, node, index) + err = plug.DeleteNode(node.ID) + if err != nil { + return err + } + err = updateOrGCPlugin(index, txn, plug) if err != nil { return err } @@ -1143,14 +1173,6 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro return nil } -func deleteNodeFromPlugin(txn *memdb.Txn, plug *structs.CSIPlugin, node *structs.Node, index uint64) error { - err := plug.DeleteNode(node.ID) - if err != nil { - return err - } - return updateOrGCPlugin(index, txn, plug) -} - // updateOrGCPlugin updates a plugin but will delete it if the plugin is empty func updateOrGCPlugin(index uint64, txn *memdb.Txn, plug *structs.CSIPlugin) error { plug.ModifyIndex = index @@ -1187,7 +1209,6 @@ func (s *StateStore) deleteJobFromPlugin(index uint64, txn *memdb.Txn, job *stru plugins := map[string]*structs.CSIPlugin{} for _, a := range allocs { - // if its nil, we can just panic tg := a.Job.LookupTaskGroup(a.TaskGroup) for _, t := range tg.Tasks { if t.CSIPluginConfig != nil { @@ -2137,7 +2158,6 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs if vol == nil { return nil, nil } - // Lookup CSIPlugin, the health records, and calculate volume health txn := s.db.Txn(false) defer txn.Abort() @@ -2796,6 +2816,10 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a return err } + if err := s.updatePluginWithAlloc(index, copyAlloc, txn); err != nil { + return err + } + // Update the allocation if err := txn.Insert("allocs", copyAlloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) @@ -2902,6 +2926,10 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation return err } + if err := s.updatePluginWithAlloc(index, alloc, txn); err != nil { + return err + } + if err := txn.Insert("allocs", alloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) } @@ -4559,6 +4587,42 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat return nil } +// updatePluginWithAlloc updates the CSI plugins for an alloc when the +// allocation is updated or inserted with a terminal server status. +func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocation, + txn *memdb.Txn) error { + if !alloc.ServerTerminalStatus() { + return nil + } + + ws := memdb.NewWatchSet() + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + for _, t := range tg.Tasks { + if t.CSIPluginConfig != nil { + pluginID := t.CSIPluginConfig.ID + plug, err := s.CSIPluginByID(ws, pluginID) + if err != nil { + return err + } + if plug == nil { + // plugin may not have been created because it never + // became healthy, just move on + return nil + } + err = plug.DeleteAlloc(alloc.ID, alloc.NodeID) + if err != nil { + return err + } + err = updateOrGCPlugin(index, txn, plug) + if err != nil { + return err + } + } + } + + return nil +} + // UpsertACLPolicies is used to create or update a set of ACL policies func (s *StateStore) UpsertACLPolicies(index uint64, policies []*structs.ACLPolicy) error { txn := s.db.Txn(true) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index ad12ea68061..ef1216d1abb 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -3027,10 +3027,11 @@ func TestStateStore_CSIVolume(t *testing.T) { func TestStateStore_CSIPluginNodes(t *testing.T) { index := uint64(999) state := testStateStore(t) + ws := memdb.NewWatchSet() + plugID := "foo" - // Create Nodes fingerprinting the plugins + // Create Nomad client Nodes ns := []*structs.Node{mock.Node(), mock.Node()} - for _, n := range ns { index++ err := state.UpsertNode(index, n) @@ -3038,10 +3039,10 @@ func TestStateStore_CSIPluginNodes(t *testing.T) { } // Fingerprint a running controller plugin - n0 := ns[0].Copy() + n0, _ := state.NodeByID(ws, ns[0].ID) n0.CSIControllerPlugins = map[string]*structs.CSIInfo{ - "foo": { - PluginID: "foo", + plugID: { + PluginID: plugID, Healthy: true, UpdateTime: time.Now(), RequiresControllerPlugin: true, @@ -3052,17 +3053,16 @@ func TestStateStore_CSIPluginNodes(t *testing.T) { }, }, } - index++ err := state.UpsertNode(index, n0) require.NoError(t, err) // Fingerprint two running node plugins for _, n := range ns[:] { - n = n.Copy() + n, _ := state.NodeByID(ws, n.ID) n.CSINodePlugins = map[string]*structs.CSIInfo{ - "foo": { - PluginID: "foo", + plugID: { + PluginID: plugID, Healthy: true, UpdateTime: time.Now(), RequiresControllerPlugin: true, @@ -3070,24 +3070,38 @@ func TestStateStore_CSIPluginNodes(t *testing.T) { NodeInfo: &structs.CSINodeInfo{}, }, } - index++ err = state.UpsertNode(index, n) require.NoError(t, err) } - ws := memdb.NewWatchSet() - plug, err := state.CSIPluginByID(ws, "foo") + plug, err := state.CSIPluginByID(ws, plugID) require.NoError(t, err) + require.True(t, plug.ControllerRequired) + require.Equal(t, 1, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 2, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 1, len(plug.Controllers), "controllers expected") + require.Equal(t, 2, len(plug.Nodes), "nodes expected") - require.Equal(t, "foo", plug.ID) - require.Equal(t, 1, plug.ControllersHealthy) - require.Equal(t, 2, plug.NodesHealthy) + // Volume using the plugin + index++ + vol := &structs.CSIVolume{ + ID: uuid.Generate(), + Namespace: structs.DefaultNamespace, + PluginID: plugID, + } + err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol}) + require.NoError(t, err) + + vol, err = state.CSIVolumeByID(ws, structs.DefaultNamespace, vol.ID) + require.NoError(t, err) + require.True(t, vol.Schedulable, "volume should be schedulable") // Controller is unhealthy + n0, _ = state.NodeByID(ws, ns[0].ID) n0.CSIControllerPlugins = map[string]*structs.CSIInfo{ - "foo": { - PluginID: "foo", + plugID: { + PluginID: plugID, Healthy: false, UpdateTime: time.Now(), RequiresControllerPlugin: true, @@ -3103,56 +3117,404 @@ func TestStateStore_CSIPluginNodes(t *testing.T) { err = state.UpsertNode(index, n0) require.NoError(t, err) - plug, err = state.CSIPluginByID(ws, "foo") - require.NoError(t, err) - require.Equal(t, "foo", plug.ID) - require.Equal(t, 0, plug.ControllersHealthy) - require.Equal(t, 2, plug.NodesHealthy) - - // Volume using the plugin - index++ - vol := &structs.CSIVolume{ - ID: uuid.Generate(), - Namespace: structs.DefaultNamespace, - PluginID: "foo", - } - err = state.CSIVolumeRegister(index, []*structs.CSIVolume{vol}) + plug, err = state.CSIPluginByID(ws, plugID) require.NoError(t, err) + require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 2, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 1, len(plug.Controllers), "controllers expected") + require.Equal(t, 2, len(plug.Nodes), "nodes expected") vol, err = state.CSIVolumeByID(ws, structs.DefaultNamespace, vol.ID) require.NoError(t, err) - require.True(t, vol.Schedulable) + require.False(t, vol.Schedulable, "volume should not be schedulable") // Node plugin is removed - n1 := ns[1].Copy() + n1, _ := state.NodeByID(ws, ns[1].ID) n1.CSINodePlugins = map[string]*structs.CSIInfo{} index++ err = state.UpsertNode(index, n1) require.NoError(t, err) - plug, err = state.CSIPluginByID(ws, "foo") + plug, err = state.CSIPluginByID(ws, plugID) require.NoError(t, err) - require.Equal(t, "foo", plug.ID) - require.Equal(t, 0, plug.ControllersHealthy) - require.Equal(t, 1, plug.NodesHealthy) + require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 1, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 1, len(plug.Controllers), "controllers expected") + require.Equal(t, 1, len(plug.Nodes), "nodes expected") - // Last plugin is removed - n0 = ns[0].Copy() + // Last node plugin is removed + n0, _ = state.NodeByID(ws, ns[0].ID) n0.CSINodePlugins = map[string]*structs.CSIInfo{} index++ err = state.UpsertNode(index, n0) require.NoError(t, err) - plug, err = state.CSIPluginByID(ws, "foo") + // Nodes plugins should be gone but controllers left + plug, err = state.CSIPluginByID(ws, plugID) + require.NoError(t, err) + require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 0, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 1, len(plug.Controllers), "controllers expected") + require.Equal(t, 0, len(plug.Nodes), "nodes expected") + + // A node plugin is restored + n0, _ = state.NodeByID(ws, n0.ID) + n0.CSINodePlugins = map[string]*structs.CSIInfo{ + plugID: { + PluginID: plugID, + Healthy: true, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + RequiresTopologies: false, + NodeInfo: &structs.CSINodeInfo{}, + }, + } + index++ + err = state.UpsertNode(index, n0) + require.NoError(t, err) + + // Nodes plugin should be replaced and healthy + plug, err = state.CSIPluginByID(ws, plugID) + require.NoError(t, err) + require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 1, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 1, len(plug.Controllers), "controllers expected") + require.Equal(t, 1, len(plug.Nodes), "nodes expected") + + // Remove node again + n0, _ = state.NodeByID(ws, ns[0].ID) + n0.CSINodePlugins = map[string]*structs.CSIInfo{} + index++ + err = state.UpsertNode(index, n0) + require.NoError(t, err) + + // Nodes plugins should be gone but controllers left + plug, err = state.CSIPluginByID(ws, plugID) + require.NoError(t, err) + require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 0, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 1, len(plug.Controllers), "controllers expected") + require.Equal(t, 0, len(plug.Nodes), "nodes expected") + + // controller is removed + n0, _ = state.NodeByID(ws, ns[0].ID) + n0.CSIControllerPlugins = map[string]*structs.CSIInfo{} + index++ + err = state.UpsertNode(index, n0) + require.NoError(t, err) + + // Plugin has been removed entirely + plug, err = state.CSIPluginByID(ws, plugID) require.NoError(t, err) require.Nil(t, plug) - // Volume exists and is safe to query, but unschedulable + // Volume still exists and is safe to query, but unschedulable vol, err = state.CSIVolumeByID(ws, structs.DefaultNamespace, vol.ID) require.NoError(t, err) require.False(t, vol.Schedulable) } +// TestStateStore_CSIPluginAllocUpdates tests the ordering +// interactions for CSI plugins between Nomad client node updates and +// allocation updates. +func TestStateStore_CSIPluginAllocUpdates(t *testing.T) { + t.Parallel() + index := uint64(999) + state := testStateStore(t) + ws := memdb.NewWatchSet() + + n := mock.Node() + index++ + err := state.UpsertNode(index, n) + require.NoError(t, err) + + // (1) unhealthy fingerprint, then terminal alloc, then healthy node update + plugID0 := "foo0" + + alloc0 := mock.Alloc() + alloc0.NodeID = n.ID + alloc0.DesiredStatus = "run" + alloc0.ClientStatus = "running" + alloc0.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ID: plugID0} + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{alloc0}) + require.NoError(t, err) + + n, _ = state.NodeByID(ws, n.ID) + n.CSINodePlugins = map[string]*structs.CSIInfo{ + plugID0: { + PluginID: plugID0, + AllocID: alloc0.ID, + Healthy: false, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + NodeInfo: &structs.CSINodeInfo{}, + }, + } + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + + plug, err := state.CSIPluginByID(ws, plugID0) + require.NoError(t, err) + require.Nil(t, plug, "no plugin should exist: not yet healthy") + + alloc0.DesiredStatus = "stopped" + alloc0.ClientStatus = "complete" + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{alloc0}) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID0) + require.NoError(t, err) + require.Nil(t, plug, "no plugin should exist: allocs never healthy") + + n, _ = state.NodeByID(ws, n.ID) + n.CSINodePlugins[plugID0].Healthy = true + n.CSINodePlugins[plugID0].UpdateTime = time.Now() + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID0) + require.NoError(t, err) + require.NotNil(t, plug, "plugin should exist") + + // (2) healthy fingerprint, then terminal alloc update + plugID1 := "foo1" + + alloc1 := mock.Alloc() + n, _ = state.NodeByID(ws, n.ID) + n.CSINodePlugins = map[string]*structs.CSIInfo{ + plugID1: { + PluginID: plugID1, + AllocID: alloc1.ID, + Healthy: true, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + NodeInfo: &structs.CSINodeInfo{}, + }, + } + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID1) + require.NoError(t, err) + require.NotNil(t, plug, "plugin should exist") + + alloc1.NodeID = n.ID + alloc1.DesiredStatus = "stop" + alloc1.ClientStatus = "complete" + alloc1.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ID: plugID1} + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{alloc1}) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID1) + require.NoError(t, err) + require.Nil(t, plug, "no plugin should exist: alloc became terminal") + + // (3) terminal alloc update, then unhealthy fingerprint + plugID2 := "foo2" + + alloc2 := mock.Alloc() + alloc2.NodeID = n.ID + alloc2.DesiredStatus = "stop" + alloc2.ClientStatus = "complete" + alloc2.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = &structs.TaskCSIPluginConfig{ID: plugID2} + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{alloc2}) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID2) + require.NoError(t, err) + require.Nil(t, plug, "no plugin should exist: alloc became terminal") + + n, _ = state.NodeByID(ws, n.ID) + n.CSINodePlugins = map[string]*structs.CSIInfo{ + plugID2: { + PluginID: plugID2, + AllocID: alloc2.ID, + Healthy: false, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + NodeInfo: &structs.CSINodeInfo{}, + }, + } + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID2) + require.NoError(t, err) + require.Nil(t, plug, "plugin should not exist: never became healthy") + +} + +// TestStateStore_CSIPluginMultiNodeUpdates tests the ordering +// interactions for CSI plugins between Nomad client node updates and +// allocation updates when multiple nodes are involved +func TestStateStore_CSIPluginMultiNodeUpdates(t *testing.T) { + t.Parallel() + index := uint64(999) + state := testStateStore(t) + ws := memdb.NewWatchSet() + + var err error + + // Create Nomad client Nodes + ns := []*structs.Node{mock.Node(), mock.Node()} + for _, n := range ns { + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + } + + plugID := "foo" + plugCfg := &structs.TaskCSIPluginConfig{ID: plugID} + + // Fingerprint two running node plugins and their allocs; we'll + // leave these in place for the test to ensure we don't GC the + // plugin + for _, n := range ns[:] { + nAlloc := mock.Alloc() + n, _ := state.NodeByID(ws, n.ID) + n.CSINodePlugins = map[string]*structs.CSIInfo{ + plugID: { + PluginID: plugID, + AllocID: nAlloc.ID, + Healthy: true, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + RequiresTopologies: false, + NodeInfo: &structs.CSINodeInfo{}, + }, + } + index++ + err = state.UpsertNode(index, n) + require.NoError(t, err) + + nAlloc.NodeID = n.ID + nAlloc.DesiredStatus = "run" + nAlloc.ClientStatus = "running" + nAlloc.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = plugCfg + + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{nAlloc}) + require.NoError(t, err) + } + + // Fingerprint a running controller plugin + alloc0 := mock.Alloc() + n0, _ := state.NodeByID(ws, ns[0].ID) + n0.CSIControllerPlugins = map[string]*structs.CSIInfo{ + plugID: { + PluginID: plugID, + AllocID: alloc0.ID, + Healthy: true, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + RequiresTopologies: false, + ControllerInfo: &structs.CSIControllerInfo{ + SupportsReadOnlyAttach: true, + SupportsListVolumes: true, + }, + }, + } + index++ + err = state.UpsertNode(index, n0) + require.NoError(t, err) + + plug, err := state.CSIPluginByID(ws, plugID) + require.NoError(t, err) + require.Equal(t, 1, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 1, len(plug.Controllers), "controllers expected") + require.Equal(t, 2, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 2, len(plug.Nodes), "nodes expected") + + n1, _ := state.NodeByID(ws, ns[1].ID) + + alloc0.NodeID = n0.ID + alloc0.DesiredStatus = "stop" + alloc0.ClientStatus = "complete" + alloc0.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = plugCfg + + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{alloc0}) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID) + require.NoError(t, err) + require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 0, len(plug.Controllers), "controllers expected") + require.Equal(t, 2, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 2, len(plug.Nodes), "nodes expected") + + alloc1 := mock.Alloc() + alloc1.NodeID = n1.ID + alloc1.DesiredStatus = "run" + alloc1.ClientStatus = "running" + alloc1.Job.TaskGroups[0].Tasks[0].CSIPluginConfig = plugCfg + + index++ + err = state.UpsertAllocs(index, []*structs.Allocation{alloc1}) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID) + require.NoError(t, err) + require.Equal(t, 0, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 0, len(plug.Controllers), "controllers expected") + require.Equal(t, 2, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 2, len(plug.Nodes), "nodes expected") + + n0, _ = state.NodeByID(ws, ns[0].ID) + n0.CSIControllerPlugins = map[string]*structs.CSIInfo{ + plugID: { + PluginID: plugID, + AllocID: alloc0.ID, + Healthy: false, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + RequiresTopologies: false, + ControllerInfo: &structs.CSIControllerInfo{ + SupportsReadOnlyAttach: true, + SupportsListVolumes: true, + }, + }, + } + index++ + err = state.UpsertNode(index, n0) + require.NoError(t, err) + + n1.CSIControllerPlugins = map[string]*structs.CSIInfo{ + plugID: { + PluginID: plugID, + AllocID: alloc1.ID, + Healthy: true, + UpdateTime: time.Now(), + RequiresControllerPlugin: true, + RequiresTopologies: false, + ControllerInfo: &structs.CSIControllerInfo{ + SupportsReadOnlyAttach: true, + SupportsListVolumes: true, + }, + }, + } + index++ + err = state.UpsertNode(index, n1) + require.NoError(t, err) + + plug, err = state.CSIPluginByID(ws, plugID) + require.NoError(t, err) + require.True(t, plug.ControllerRequired) + require.Equal(t, 1, plug.ControllersHealthy, "controllers healthy") + require.Equal(t, 1, len(plug.Controllers), "controllers expected") + require.Equal(t, 2, plug.NodesHealthy, "nodes healthy") + require.Equal(t, 2, len(plug.Nodes), "nodes expected") + +} + func TestStateStore_CSIPluginJobs(t *testing.T) { s := testStateStore(t) deleteNodes := CreateTestCSIPlugin(s, "foo") diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go index c69fada688f..a227e93c18e 100644 --- a/nomad/structs/csi.go +++ b/nomad/structs/csi.go @@ -701,7 +701,8 @@ func (p *CSIPlugin) Copy() *CSIPlugin { func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) error { if info.ControllerInfo != nil { p.ControllerRequired = info.RequiresControllerPlugin && - info.ControllerInfo.SupportsAttachDetach + (info.ControllerInfo.SupportsAttachDetach || + info.ControllerInfo.SupportsReadOnlyAttach) prev, ok := p.Controllers[nodeID] if ok { @@ -712,11 +713,14 @@ func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) error { p.ControllersHealthy -= 1 } } + // note: for this to work as expected, only a single // controller for a given plugin can be on a given Nomad // client, they also conflict on the client so this should be // ok - p.Controllers[nodeID] = info + if prev != nil || info.Healthy { + p.Controllers[nodeID] = info + } if info.Healthy { p.ControllersHealthy += 1 } @@ -732,7 +736,9 @@ func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) error { p.NodesHealthy -= 1 } } - p.Nodes[nodeID] = info + if prev != nil || info.Healthy { + p.Nodes[nodeID] = info + } if info.Healthy { p.NodesHealthy += 1 }