From 8b88e5987680cc4b8be9f335b0ee3a1cba5c7007 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 24 Nov 2020 14:52:53 -0500 Subject: [PATCH] csi: remove nested read txn from volume write txns When making updates to CSI volumes, the state store methods that have open write transactions were querying the state store using the same methods used by the CSI RPC endpoint, but these method creates their own top-level read transactions. These have yet not been implicated in any bugs. Refactor the CSIVolume query methods to have an implementation method that accepts a transaction, which can be called with either a read txn or a write txn. --- nomad/state/state_store.go | 48 +++++++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 9f579f81be8..c17724c634e 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2133,7 +2133,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st // we return the volume with the plugins denormalized by default, // because the scheduler needs them for feasibility checking vol := obj.(*structs.CSIVolume) - return s.CSIVolumeDenormalizePlugins(ws, vol.Copy()) + return s.CSIVolumeDenormalizePluginsTxn(txn, vol.Copy()) } // CSIVolumes looks up csi_volumes by pluginID. Caller should snapshot if it @@ -2261,12 +2261,11 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *s } } - volume, err := s.CSIVolumeDenormalizePlugins(ws, orig.Copy()) + volume, err := s.CSIVolumeDenormalizePluginsTxn(txn, orig.Copy()) if err != nil { return err } - - volume, err = s.CSIVolumeDenormalize(ws, volume) + volume, err = s.CSIVolumeDenormalizeTxn(txn, nil, volume) if err != nil { return err } @@ -2330,7 +2329,7 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []s // allocations have been stopped but claims can't be freed because // ex. the plugins have all been removed. if vol.InUse() { - if !force || !s.volSafeToForce(vol) { + if !force || !s.volSafeToForce(txn, vol) { return fmt.Errorf("volume in use: %s", id) } } @@ -2349,9 +2348,8 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []s // volSafeToForce checks if the any of the remaining allocations // are in a non-terminal state. -func (s *StateStore) volSafeToForce(v *structs.CSIVolume) bool { - ws := memdb.NewWatchSet() - vol, err := s.CSIVolumeDenormalize(ws, v) +func (s *StateStore) volSafeToForce(txn Txn, v *structs.CSIVolume) bool { + vol, err := s.CSIVolumeDenormalizeTxn(txn, nil, v) if err != nil { return false } @@ -2369,19 +2367,30 @@ func (s *StateStore) volSafeToForce(v *structs.CSIVolume) bool { return true } -// CSIVolumeDenormalizePlugins returns a CSIVolume with current health and plugins, but -// without allocations -// Use this for current volume metadata, handling lists of volumes -// Use CSIVolumeDenormalize for volumes containing both health and current allocations +// CSIVolumeDenormalizePlugins returns a CSIVolume with current health and +// plugins, but without allocations. +// Use this for current volume metadata, handling lists of volumes. +// Use CSIVolumeDenormalize for volumes containing both health and current +// allocations. func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) { if vol == nil { return nil, nil } - // Lookup CSIPlugin, the health records, and calculate volume health txn := s.db.ReadTxn() defer txn.Abort() + return s.CSIVolumeDenormalizePluginsTxn(txn, vol) +} - plug, err := s.CSIPluginByID(ws, vol.PluginID) +// CSIVolumeDenormalizePluginsTxn returns a CSIVolume with current health and +// plugins, but without allocations. +// Use this for current volume metadata, handling lists of volumes. +// Use CSIVolumeDenormalize for volumes containing both health and current +// allocations. +func (s *StateStore) CSIVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVolume) (*structs.CSIVolume, error) { + if vol == nil { + return nil, nil + } + plug, err := s.CSIPluginByIDTxn(txn, nil, vol.PluginID) if err != nil { return nil, fmt.Errorf("plugin lookup error: %s %v", vol.PluginID, err) } @@ -2412,8 +2421,15 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs // CSIVolumeDenormalize returns a CSIVolume with allocations func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) { + txn := s.db.ReadTxn() + return s.CSIVolumeDenormalizeTxn(txn, ws, vol) +} + +// CSIVolumeDenormalizeTxn populates a CSIVolume with allocations +func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) { + for id := range vol.ReadAllocs { - a, err := s.AllocByID(ws, id) + a, err := s.allocByIDImpl(txn, ws, id) if err != nil { return nil, err } @@ -2434,7 +2450,7 @@ func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVol } for id := range vol.WriteAllocs { - a, err := s.AllocByID(ws, id) + a, err := s.allocByIDImpl(txn, ws, id) if err != nil { return nil, err }