From 0d8952a30add6124c25f697ba1bf95201d06ddb4 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Mon, 24 Jan 2022 11:49:50 -0500 Subject: [PATCH] csi: update leader's ACL in volumewatcher (#11891) The volumewatcher that runs on the leader needs to make RPC calls rather than writing to raft (as we do in the deploymentwatcher) because the unpublish workflow needs to make RPC calls to the clients. This requires that the volumewatcher has access to the leader's ACL token. But when leadership transitions, the new leader creates a new leader ACL token. This ACL token needs to be passed into the volumewatcher when we enable it, otherwise the volumewatcher can find itself with a stale token. --- .changelog/11891.txt | 3 +++ nomad/leader.go | 4 ++-- nomad/volumewatcher/volumes_watcher.go | 7 ++++--- nomad/volumewatcher/volumes_watcher_test.go | 14 +++++++------- 4 files changed, 16 insertions(+), 12 deletions(-) create mode 100644 .changelog/11891.txt diff --git a/.changelog/11891.txt b/.changelog/11891.txt new file mode 100644 index 00000000000..f09bf8e1329 --- /dev/null +++ b/.changelog/11891.txt @@ -0,0 +1,3 @@ +```release-note:bug +csi: Fixed a bug where releasing volume claims would fail with ACL errors after leadership transitions. +``` diff --git a/nomad/leader.go b/nomad/leader.go index 1ec7bb5e388..93d6ac7d781 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -263,7 +263,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { s.nodeDrainer.SetEnabled(true, s.State()) // Enable the volume watcher, since we are now the leader - s.volumeWatcher.SetEnabled(true, s.State()) + s.volumeWatcher.SetEnabled(true, s.State(), s.getLeaderAcl()) // Restore the eval broker state if err := s.restoreEvals(); err != nil { @@ -1053,7 +1053,7 @@ func (s *Server) revokeLeadership() error { s.nodeDrainer.SetEnabled(false, nil) // Disable the volume watcher - s.volumeWatcher.SetEnabled(false, nil) + s.volumeWatcher.SetEnabled(false, nil, "") // Disable any enterprise systems required. if err := s.revokeEnterpriseLeadership(); err != nil { diff --git a/nomad/volumewatcher/volumes_watcher.go b/nomad/volumewatcher/volumes_watcher.go index 061df061398..c8c687addea 100644 --- a/nomad/volumewatcher/volumes_watcher.go +++ b/nomad/volumewatcher/volumes_watcher.go @@ -57,14 +57,15 @@ func NewVolumesWatcher(logger log.Logger, rpc CSIVolumeRPC, leaderAcl string) *W // SetEnabled is used to control if the watcher is enabled. The // watcher should only be enabled on the active leader. When being -// enabled the state is passed in as it is no longer valid once a -// leader election has taken place. -func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore) { +// enabled the state and leader's ACL is passed in as it is no longer +// valid once a leader election has taken place. +func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore, leaderAcl string) { w.wlock.Lock() defer w.wlock.Unlock() wasEnabled := w.enabled w.enabled = enabled + w.leaderAcl = leaderAcl if state != nil { w.state = state diff --git a/nomad/volumewatcher/volumes_watcher_test.go b/nomad/volumewatcher/volumes_watcher_test.go index 185a7225e68..7f0365be351 100644 --- a/nomad/volumewatcher/volumes_watcher_test.go +++ b/nomad/volumewatcher/volumes_watcher_test.go @@ -23,7 +23,7 @@ func TestVolumeWatch_EnableDisable(t *testing.T) { index := uint64(100) watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") - watcher.SetEnabled(true, srv.State()) + watcher.SetEnabled(true, srv.State(), "") plugin := mock.CSIPlugin() node := testNode(plugin, srv.State()) @@ -48,7 +48,7 @@ func TestVolumeWatch_EnableDisable(t *testing.T) { return 1 == len(watcher.watchers) }, time.Second, 10*time.Millisecond) - watcher.SetEnabled(false, nil) + watcher.SetEnabled(false, nil, "") require.Equal(0, len(watcher.watchers)) } @@ -70,7 +70,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { alloc.ClientStatus = structs.AllocClientStatusComplete vol := testVolume(plugin, alloc, node.ID) - watcher.SetEnabled(true, srv.State()) + watcher.SetEnabled(true, srv.State(), "") index++ err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol}) @@ -94,7 +94,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { // watches for that change will fire on the new watcher // step-down (this is sync) - watcher.SetEnabled(false, nil) + watcher.SetEnabled(false, nil, "") require.Equal(0, len(watcher.watchers)) // allocation is now invalid @@ -116,7 +116,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { // create a new watcher and enable it to simulate the leadership // transition watcher = NewVolumesWatcher(testlog.HCLogger(t), srv, "") - watcher.SetEnabled(true, srv.State()) + watcher.SetEnabled(true, srv.State(), "") require.Eventually(func() bool { watcher.wlock.RLock() @@ -142,7 +142,7 @@ func TestVolumeWatch_StartStop(t *testing.T) { index := uint64(100) watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") - watcher.SetEnabled(true, srv.State()) + watcher.SetEnabled(true, srv.State(), "") require.Equal(0, len(watcher.watchers)) plugin := mock.CSIPlugin() @@ -237,7 +237,7 @@ func TestVolumeWatch_RegisterDeregister(t *testing.T) { watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") - watcher.SetEnabled(true, srv.State()) + watcher.SetEnabled(true, srv.State(), "") require.Equal(0, len(watcher.watchers)) plugin := mock.CSIPlugin()