From 707b4b3e0e5b3b8fb33b07561d1d3312ae5a055d Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 28 Jan 2022 08:30:31 -0500 Subject: [PATCH] CSI: node unmount from the client before unpublish RPC (#11892) When an allocation stops, the `csi_hook` makes an unpublish RPC to the servers to unpublish via the CSI RPCs: first to the node plugins and then the controller plugins. The controller RPCs must happen after the node RPCs so that the node has had a chance to unmount the volume before the controller tries to detach the associated device. But the client has local access to the node plugins and can independently determine if it's safe to send unpublish RPC to those plugins. This will allow the server to treat the node plugin as abandoned if a client is disconnected and `stop_on_client_disconnect` is set. This will let the server try to send unpublish RPCs to the controller plugins, under the assumption that the client will be trying to unmount the volume on its end first. Note that the CSI `NodeUnpublishVolume`/`NodeUnstageVolume` RPCs can return ignorable errors in the case where the volume has already been unmounted from the node. Handle all other errors by retrying until we get success so as to give operators the opportunity to reschedule a failed node plugin (ex. in the case where they accidentally drained a node without `-ignore-system`). Fan-out the work for each volume into its own goroutine so that we can release a subset of volumes if only one is stuck. --- client/allocrunner/csi_hook.go | 160 ++++++++++++++++++---- client/allocrunner/csi_hook_test.go | 10 +- client/pluginmanager/csimanager/volume.go | 7 +- 3 files changed, 143 insertions(+), 34 deletions(-) diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go index e7e7385a3e8..6fb1b2866ab 100644 --- a/client/allocrunner/csi_hook.go +++ b/client/allocrunner/csi_hook.go @@ -3,6 +3,8 @@ package allocrunner import ( "context" "fmt" + "sync" + "time" hclog "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" @@ -24,7 +26,9 @@ type csiHook struct { updater hookResourceSetter nodeSecret string - volumeRequests map[string]*volumeAndRequest + volumeRequests map[string]*volumeAndRequest + maxBackoffInterval time.Duration + maxBackoffDuration time.Duration } // implemented by allocrunner @@ -42,6 +46,8 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M updater: updater, nodeSecret: nodeSecret, volumeRequests: map[string]*volumeAndRequest{}, + maxBackoffInterval: time.Minute, + maxBackoffDuration: time.Hour * 24, } } @@ -103,41 +109,43 @@ func (c *csiHook) Postrun() error { return nil } - var mErr *multierror.Error + var wg sync.WaitGroup + errs := make(chan error, len(c.volumeRequests)) for _, pair := range c.volumeRequests { + wg.Add(1) + + // CSI RPCs can potentially fail for a very long time if a + // node plugin has failed. split the work into goroutines so + // that operators could potentially reuse one of a set of + // volumes even if this hook is stuck waiting on the others + go func(pair *volumeAndRequest) { + defer wg.Done() + + // we can recover an unmount failure if the operator + // brings the plugin back up, so retry every few minutes + // but eventually give up + err := c.unmountWithRetry(pair) + if err != nil { + errs <- err + return + } - mode := structs.CSIVolumeClaimRead - if !pair.request.ReadOnly { - mode = structs.CSIVolumeClaimWrite - } + // we can't recover from this RPC error client-side; the + // volume claim GC job will have to clean up for us once + // the allocation is marked terminal + errs <- c.unpublish(pair) + }(pair) + } - source := pair.request.Source - if pair.request.PerAlloc { - // NOTE: PerAlloc can't be set if we have canaries - source = source + structs.AllocSuffix(c.alloc.Name) - } + wg.Wait() + close(errs) // so we don't block waiting if there were no errors - req := &structs.CSIVolumeUnpublishRequest{ - VolumeID: source, - Claim: &structs.CSIVolumeClaim{ - AllocationID: c.alloc.ID, - NodeID: c.alloc.NodeID, - Mode: mode, - State: structs.CSIVolumeClaimStateUnpublishing, - }, - WriteRequest: structs.WriteRequest{ - Region: c.alloc.Job.Region, - Namespace: c.alloc.Job.Namespace, - AuthToken: c.nodeSecret, - }, - } - err := c.rpcClient.RPC("CSIVolume.Unpublish", - req, &structs.CSIVolumeUnpublishResponse{}) - if err != nil { - mErr = multierror.Append(mErr, err) - } + var mErr *multierror.Error + for err := range errs { + mErr = multierror.Append(mErr, err) } + return mErr.ErrorOrNil() } @@ -231,3 +239,95 @@ func (c *csiHook) shouldRun() bool { return false } + +func (c *csiHook) unpublish(pair *volumeAndRequest) error { + + mode := structs.CSIVolumeClaimRead + if !pair.request.ReadOnly { + mode = structs.CSIVolumeClaimWrite + } + + source := pair.request.Source + if pair.request.PerAlloc { + // NOTE: PerAlloc can't be set if we have canaries + source = source + structs.AllocSuffix(c.alloc.Name) + } + + req := &structs.CSIVolumeUnpublishRequest{ + VolumeID: source, + Claim: &structs.CSIVolumeClaim{ + AllocationID: c.alloc.ID, + NodeID: c.alloc.NodeID, + Mode: mode, + State: structs.CSIVolumeClaimStateUnpublishing, + }, + WriteRequest: structs.WriteRequest{ + Region: c.alloc.Job.Region, + Namespace: c.alloc.Job.Namespace, + AuthToken: c.nodeSecret, + }, + } + + return c.rpcClient.RPC("CSIVolume.Unpublish", + req, &structs.CSIVolumeUnpublishResponse{}) + +} + +// unmountWithRetry tries to unmount/unstage the volume, retrying with +// exponential backoff capped to a maximum interval +func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error { + + // note: allocrunner hooks don't have access to the client's + // shutdown context, just the allocrunner's shutdown; if we make + // it available in the future we should thread it through here so + // that retry can exit gracefully instead of dropping the + // in-flight goroutine + ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration) + defer cancel() + var err error + backoff := time.Second + ticker := time.NewTicker(backoff) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return err + case <-ticker.C: + } + + err = c.unmountImpl(pair) + if err == nil { + break + } + + if backoff < c.maxBackoffInterval { + backoff = backoff * 2 + if backoff > c.maxBackoffInterval { + backoff = c.maxBackoffInterval + } + } + ticker.Reset(backoff) + } + return nil +} + +// unmountImpl implements the call to the CSI plugin manager to +// unmount the volume. Each retry will write an "Unmount volume" +// NodeEvent +func (c *csiHook) unmountImpl(pair *volumeAndRequest) error { + + mounter, err := c.csimanager.MounterForPlugin(context.TODO(), pair.volume.PluginID) + if err != nil { + return err + } + + usageOpts := &csimanager.UsageOptions{ + ReadOnly: pair.request.ReadOnly, + AttachmentMode: pair.request.AttachmentMode, + AccessMode: pair.request.AccessMode, + MountOptions: pair.request.MountOptions, + } + + return mounter.UnmountVolume(context.TODO(), + pair.volume.ID, pair.volume.RemoteID(), c.alloc.ID, usageOpts) +} diff --git a/client/allocrunner/csi_hook_test.go b/client/allocrunner/csi_hook_test.go index 045ef3e0afc..d05d07385c3 100644 --- a/client/allocrunner/csi_hook_test.go +++ b/client/allocrunner/csi_hook_test.go @@ -5,6 +5,7 @@ import ( "fmt" "path/filepath" "testing" + "time" "github.com/stretchr/testify/require" @@ -59,7 +60,7 @@ func TestCSIHook(t *testing.T) { "test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)}, }, expectedMountCalls: 1, - expectedUnmountCalls: 0, // not until this is done client-side + expectedUnmountCalls: 1, expectedClaimCalls: 1, expectedUnpublishCalls: 1, }, @@ -83,7 +84,7 @@ func TestCSIHook(t *testing.T) { "test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)}, }, expectedMountCalls: 1, - expectedUnmountCalls: 0, // not until this is done client-side + expectedUnmountCalls: 1, expectedClaimCalls: 1, expectedUnpublishCalls: 1, }, @@ -122,7 +123,7 @@ func TestCSIHook(t *testing.T) { // "test-alloc-dir/%s/testvolume0/ro-file-system-multi-node-reader-only", alloc.ID)}, // }, // expectedMountCalls: 1, - // expectedUnmountCalls: 0, // not until this is done client-side + // expectedUnmountCalls: 1, // expectedClaimCalls: 1, // expectedUnpublishCalls: 1, // }, @@ -144,6 +145,9 @@ func TestCSIHook(t *testing.T) { }, } hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret") + hook.maxBackoffInterval = 100 * time.Millisecond + hook.maxBackoffDuration = 2 * time.Second + require.NotNil(t, hook) require.NoError(t, hook.Prerun()) diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go index c359f32e33a..599c3fe06db 100644 --- a/client/pluginmanager/csimanager/volume.go +++ b/client/pluginmanager/csimanager/volume.go @@ -348,11 +348,16 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo } } + if errors.Is(err, structs.ErrCSIClientRPCIgnorable) { + logger.Trace("unmounting volume failed with ignorable error", "error", err) + err = nil + } + event := structs.NewNodeEvent(). SetSubsystem(structs.NodeEventSubsystemStorage). SetMessage("Unmount volume"). AddDetail("volume_id", volID) - if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) { + if err == nil { event.AddDetail("success", "true") } else { event.AddDetail("success", "false")