diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 9f579f81be8..c17724c634e 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2133,7 +2133,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st // we return the volume with the plugins denormalized by default, // because the scheduler needs them for feasibility checking vol := obj.(*structs.CSIVolume) - return s.CSIVolumeDenormalizePlugins(ws, vol.Copy()) + return s.CSIVolumeDenormalizePluginsTxn(txn, vol.Copy()) } // CSIVolumes looks up csi_volumes by pluginID. Caller should snapshot if it @@ -2261,12 +2261,11 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *s } } - volume, err := s.CSIVolumeDenormalizePlugins(ws, orig.Copy()) + volume, err := s.CSIVolumeDenormalizePluginsTxn(txn, orig.Copy()) if err != nil { return err } - - volume, err = s.CSIVolumeDenormalize(ws, volume) + volume, err = s.CSIVolumeDenormalizeTxn(txn, nil, volume) if err != nil { return err } @@ -2330,7 +2329,7 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []s // allocations have been stopped but claims can't be freed because // ex. the plugins have all been removed. if vol.InUse() { - if !force || !s.volSafeToForce(vol) { + if !force || !s.volSafeToForce(txn, vol) { return fmt.Errorf("volume in use: %s", id) } } @@ -2349,9 +2348,8 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []s // volSafeToForce checks if the any of the remaining allocations // are in a non-terminal state. -func (s *StateStore) volSafeToForce(v *structs.CSIVolume) bool { - ws := memdb.NewWatchSet() - vol, err := s.CSIVolumeDenormalize(ws, v) +func (s *StateStore) volSafeToForce(txn Txn, v *structs.CSIVolume) bool { + vol, err := s.CSIVolumeDenormalizeTxn(txn, nil, v) if err != nil { return false } @@ -2369,19 +2367,30 @@ func (s *StateStore) volSafeToForce(v *structs.CSIVolume) bool { return true } -// CSIVolumeDenormalizePlugins returns a CSIVolume with current health and plugins, but -// without allocations -// Use this for current volume metadata, handling lists of volumes -// Use CSIVolumeDenormalize for volumes containing both health and current allocations +// CSIVolumeDenormalizePlugins returns a CSIVolume with current health and +// plugins, but without allocations. +// Use this for current volume metadata, handling lists of volumes. +// Use CSIVolumeDenormalize for volumes containing both health and current +// allocations. func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) { if vol == nil { return nil, nil } - // Lookup CSIPlugin, the health records, and calculate volume health txn := s.db.ReadTxn() defer txn.Abort() + return s.CSIVolumeDenormalizePluginsTxn(txn, vol) +} - plug, err := s.CSIPluginByID(ws, vol.PluginID) +// CSIVolumeDenormalizePluginsTxn returns a CSIVolume with current health and +// plugins, but without allocations. +// Use this for current volume metadata, handling lists of volumes. +// Use CSIVolumeDenormalize for volumes containing both health and current +// allocations. +func (s *StateStore) CSIVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVolume) (*structs.CSIVolume, error) { + if vol == nil { + return nil, nil + } + plug, err := s.CSIPluginByIDTxn(txn, nil, vol.PluginID) if err != nil { return nil, fmt.Errorf("plugin lookup error: %s %v", vol.PluginID, err) } @@ -2412,8 +2421,15 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs // CSIVolumeDenormalize returns a CSIVolume with allocations func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) { + txn := s.db.ReadTxn() + return s.CSIVolumeDenormalizeTxn(txn, ws, vol) +} + +// CSIVolumeDenormalizeTxn populates a CSIVolume with allocations +func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) { + for id := range vol.ReadAllocs { - a, err := s.AllocByID(ws, id) + a, err := s.allocByIDImpl(txn, ws, id) if err != nil { return nil, err } @@ -2434,7 +2450,7 @@ func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVol } for id := range vol.WriteAllocs { - a, err := s.AllocByID(ws, id) + a, err := s.allocByIDImpl(txn, ws, id) if err != nil { return nil, err }