Skip to content

Commit

Permalink
csi: ensure that PastClaims are populated with correct mode (#11932)
Browse files Browse the repository at this point in the history
In the client's `(*csiHook) Postrun()` method, we make an unpublish
RPC that includes a claim in the `CSIVolumeClaimStateUnpublishing`
state, using the mode supplied by the client. But then in the
`(*CSIVolume) Unpublish` RPC handler, we query the volume from the
state store (because we only get an ID from the client). And when we
make the client RPC for the node unpublish step, we use the _current
volume's_ view of the mode. If the volume's mode has been changed
before the old allocations can have their claims released, then we end
up making a CSI RPC that will never succeed.

Why does this code path get the mode from the volume and not the
claim? Because the claim written by the GC job in `(*CoreScheduler)
csiVolumeClaimGC` doesn't have a mode. Instead it just writes a claim
in the unpublishing state to ensure the volumewatcher detects a "past
claim" change and reaps all the claims on the volumes.

Fix this by ensuring that `CSIVolumeDenormalize` creates past
claims for all nil allocations with a correct access mode set.
  • Loading branch information
tgross committed Jan 28, 2022
1 parent cdbb2bc commit 7181e96
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 70 deletions.
136 changes: 68 additions & 68 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2194,7 +2194,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st

// we return the volume with the plugins denormalized by default,
// because the scheduler needs them for feasibility checking
return s.CSIVolumeDenormalizePluginsTxn(txn, vol.Copy())
return s.csiVolumeDenormalizePluginsTxn(txn, vol.Copy())
}

// CSIVolumesByPluginID looks up csi_volumes by pluginID. Caller should
Expand Down Expand Up @@ -2326,11 +2326,11 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *s
}
}

volume, err := s.CSIVolumeDenormalizePluginsTxn(txn, orig.Copy())
volume, err := s.csiVolumeDenormalizePluginsTxn(txn, orig.Copy())
if err != nil {
return err
}
volume, err = s.CSIVolumeDenormalizeTxn(txn, nil, volume)
volume, err = s.csiVolumeDenormalizeTxn(txn, nil, volume)
if err != nil {
return err
}
Expand Down Expand Up @@ -2414,7 +2414,7 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []s
// volSafeToForce checks if any of the remaining allocations
// are in a non-terminal state.
func (s *StateStore) volSafeToForce(txn Txn, v *structs.CSIVolume) bool {
vol, err := s.CSIVolumeDenormalizeTxn(txn, nil, v)
vol, err := s.csiVolumeDenormalizeTxn(txn, nil, v)
if err != nil {
return false
}
Expand Down Expand Up @@ -2443,15 +2443,12 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs
}
txn := s.db.ReadTxn()
defer txn.Abort()
return s.CSIVolumeDenormalizePluginsTxn(txn, vol)
return s.csiVolumeDenormalizePluginsTxn(txn, vol)
}

// CSIVolumeDenormalizePluginsTxn returns a CSIVolume with current health and
// plugins, but without allocations.
// Use this for current volume metadata, handling lists of volumes.
// Use CSIVolumeDenormalize for volumes containing both health and current
// allocations.
func (s *StateStore) CSIVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
// csiVolumeDenormalizePluginsTxn implements
// CSIVolumeDenormalizePlugins, inside a transaction.
func (s *StateStore) csiVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
if vol == nil {
return nil, nil
}
Expand Down Expand Up @@ -2484,80 +2481,83 @@ func (s *StateStore) CSIVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVol
return vol, nil
}

// CSIVolumeDenormalize returns a CSIVolume with allocations
// CSIVolumeDenormalize returns a CSIVolume with its current
// Allocations and Claims, including creating new PastClaims for
// terminal or garbage collected allocations. This ensures we have a
// consistent state. Note that it mutates the original volume and so
// should always be called on a Copy after reading from the state
// store.
func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
txn := s.db.ReadTxn()
return s.CSIVolumeDenormalizeTxn(txn, ws, vol)
return s.csiVolumeDenormalizeTxn(txn, ws, vol)
}

// CSIVolumeDenormalizeTxn populates a CSIVolume with allocations
func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
// csiVolumeDenormalizeTxn implements CSIVolumeDenormalize inside a transaction
func (s *StateStore) csiVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
if vol == nil {
return nil, nil
}
for id := range vol.ReadAllocs {
a, err := s.allocByIDImpl(txn, ws, id)
if err != nil {
return nil, err
}
if a != nil {
vol.ReadAllocs[id] = a
// COMPAT(1.0): the CSIVolumeClaim fields were added
// after 0.11.1, so claims made before that may be
// missing this value. (same for WriteAlloc below)
if _, ok := vol.ReadClaims[id]; !ok {
vol.ReadClaims[id] = &structs.CSIVolumeClaim{

// note: denormalize mutates the maps we pass in!
denormalize := func(
currentAllocs map[string]*structs.Allocation,
currentClaims, pastClaims map[string]*structs.CSIVolumeClaim,
fallbackMode structs.CSIVolumeClaimMode) error {

for id := range currentAllocs {
a, err := s.allocByIDImpl(txn, ws, id)
if err != nil {
return err
}
pastClaim := pastClaims[id]
currentClaim := currentClaims[id]
if currentClaim == nil {
// COMPAT(1.4.0): the CSIVolumeClaim fields were added
// after 0.11.1, so claims made before that may be
// missing this value. No clusters should see this
// anymore, so warn noisily in the logs so that
// operators ask us about it. Remove this block and
// the now-unused fallbackMode parameter, and return
// an error if currentClaim is nil in 1.4.0
s.logger.Warn("volume was missing claim for allocation",
"volume_id", vol.ID, "alloc", id)
currentClaim = &structs.CSIVolumeClaim{
AllocationID: a.ID,
NodeID: a.NodeID,
Mode: structs.CSIVolumeClaimRead,
Mode: fallbackMode,
State: structs.CSIVolumeClaimStateTaken,
}
currentClaims[id] = currentClaim
}
} else if _, ok := vol.PastClaims[id]; !ok {
// ensure that any allocs that have been GC'd since
// our last read are marked as past claims
vol.PastClaims[id] = &structs.CSIVolumeClaim{
AllocationID: id,
Mode: structs.CSIVolumeClaimRead,
State: structs.CSIVolumeClaimStateUnpublishing,
}
readClaim := vol.ReadClaims[id]
if readClaim != nil {
vol.PastClaims[id].NodeID = readClaim.NodeID
}
}
}

for id := range vol.WriteAllocs {
a, err := s.allocByIDImpl(txn, ws, id)
if err != nil {
return nil, err
}
if a != nil {
vol.WriteAllocs[id] = a
if _, ok := vol.WriteClaims[id]; !ok {
vol.WriteClaims[id] = &structs.CSIVolumeClaim{
AllocationID: a.ID,
NodeID: a.NodeID,
Mode: structs.CSIVolumeClaimWrite,
State: structs.CSIVolumeClaimStateTaken,
currentAllocs[id] = a
if a == nil && pastClaim == nil {
// the alloc is garbage collected but nothing has written a PastClaim,
// so create one now
pastClaim = &structs.CSIVolumeClaim{
AllocationID: id,
NodeID: currentClaim.NodeID,
Mode: currentClaim.Mode,
State: structs.CSIVolumeClaimStateUnpublishing,
AccessMode: currentClaim.AccessMode,
AttachmentMode: currentClaim.AttachmentMode,
}
}
} else if _, ok := vol.PastClaims[id]; !ok {
// ensure that any allocs that have been GC'd since
// our last read are marked as past claims

vol.PastClaims[id] = &structs.CSIVolumeClaim{
AllocationID: id,
Mode: structs.CSIVolumeClaimWrite,
State: structs.CSIVolumeClaimStateUnpublishing,
}
writeClaim := vol.WriteClaims[id]
if writeClaim != nil {
vol.PastClaims[id].NodeID = writeClaim.NodeID
pastClaims[id] = pastClaim
}

}
return nil
}

err := denormalize(vol.ReadAllocs, vol.ReadClaims, vol.PastClaims,
structs.CSIVolumeClaimRead)
if err != nil {
return nil, err
}
err = denormalize(vol.WriteAllocs, vol.WriteClaims, vol.PastClaims,
structs.CSIVolumeClaimWrite)
if err != nil {
return nil, err
}

// COMPAT: the AccessMode and AttachmentMode fields were added to claims
Expand Down
5 changes: 3 additions & 2 deletions nomad/state/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ func TestBadCSIState(t testing.TB, store *StateStore) error {
allocID2 := uuid.Generate() // nil alloc

alloc1 := mock.Alloc()
alloc1.ClientStatus = "complete"
alloc1.DesiredStatus = "stop"
alloc1.ClientStatus = structs.AllocClientStatusRunning
alloc1.DesiredStatus = structs.AllocDesiredStatusRun

// Insert allocs into the state store
err := store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1})
Expand Down Expand Up @@ -303,6 +303,7 @@ func TestBadCSIState(t testing.TB, store *StateStore) error {
NodesHealthy: 2,
NodesExpected: 0,
}
vol = vol.Copy() // canonicalize

err = store.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
if err != nil {
Expand Down

0 comments on commit 7181e96

Please sign in to comment.