From 2a995844340ceb9a31f04dce3e15e99380732beb Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 14 Jan 2022 16:38:38 -0500 Subject: [PATCH 1/2] csi: node unmount from the client before unpublish RPC When an allocation stops, the `csi_hook` makes an unpublish RPC to the servers to unpublish via the CSI RPCs: first to the node plugins and then the controller plugins. The controller RPCs must happen after the node RPCs so that the node has had a chance to unmount the volume before the controller tries to detach the associated device. But the client has local access to the node plugins and can independently determine if it's safe to send unpublish RPC to those plugins. This will allow the server to treat the node plugin as abandoned if a client is disconnected and `stop_on_client_disconnect` is set. This will let the server try to send unpublish RPCs to the controller plugins, under the assumption that the client will be trying to unmount the volume on its end first. --- client/allocrunner/csi_hook.go | 29 ++++++++++++++++++++++++++++- client/allocrunner/csi_hook_test.go | 6 +++--- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go index e7e7385a3e8..89cb9bb4ded 100644 --- a/client/allocrunner/csi_hook.go +++ b/client/allocrunner/csi_hook.go @@ -107,6 +107,11 @@ func (c *csiHook) Postrun() error { for _, pair := range c.volumeRequests { + err := c.unmount(pair) + if err != nil { + mErr = multierror.Append(mErr, err) + } + mode := structs.CSIVolumeClaimRead if !pair.request.ReadOnly { mode = structs.CSIVolumeClaimWrite @@ -132,7 +137,7 @@ func (c *csiHook) Postrun() error { AuthToken: c.nodeSecret, }, } - err := c.rpcClient.RPC("CSIVolume.Unpublish", + err = c.rpcClient.RPC("CSIVolume.Unpublish", req, &structs.CSIVolumeUnpublishResponse{}) if err != nil { mErr = multierror.Append(mErr, err) @@ -231,3 +236,25 @@ func (c *csiHook) shouldRun() bool { return false } + +func (c *csiHook) unmount(pair *volumeAndRequest) error { + + mounter, err := c.csimanager.MounterForPlugin(context.TODO(), pair.volume.PluginID) + if err != nil { + return err + } + + usageOpts := &csimanager.UsageOptions{ + ReadOnly: pair.request.ReadOnly, + AttachmentMode: pair.request.AttachmentMode, + AccessMode: pair.request.AccessMode, + MountOptions: pair.request.MountOptions, + } + + err = mounter.UnmountVolume(context.TODO(), + pair.volume.ID, pair.volume.RemoteID(), c.alloc.ID, usageOpts) + if err != nil { + return err + } + return nil +} diff --git a/client/allocrunner/csi_hook_test.go b/client/allocrunner/csi_hook_test.go index 045ef3e0afc..89b7f3f0a3f 100644 --- a/client/allocrunner/csi_hook_test.go +++ b/client/allocrunner/csi_hook_test.go @@ -59,7 +59,7 @@ func TestCSIHook(t *testing.T) { "test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)}, }, expectedMountCalls: 1, - expectedUnmountCalls: 0, // not until this is done client-side + expectedUnmountCalls: 1, expectedClaimCalls: 1, expectedUnpublishCalls: 1, }, @@ -83,7 +83,7 @@ func TestCSIHook(t *testing.T) { "test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)}, }, expectedMountCalls: 1, - expectedUnmountCalls: 0, // not until this is done client-side + expectedUnmountCalls: 1, expectedClaimCalls: 1, expectedUnpublishCalls: 1, }, @@ -122,7 +122,7 @@ func TestCSIHook(t *testing.T) { // "test-alloc-dir/%s/testvolume0/ro-file-system-multi-node-reader-only", alloc.ID)}, // }, // expectedMountCalls: 1, - // expectedUnmountCalls: 0, // not until this is done client-side + // expectedUnmountCalls: 1, // expectedClaimCalls: 1, // expectedUnpublishCalls: 1, // }, From 3b7dedbb14f48833f0a3748819589b5b59722e6b Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 27 Jan 2022 13:54:54 -0500 Subject: [PATCH 2/2] [squash me] address comments from code review The CSI `NodeUnpublishVolume`/`NodeUnstageVolume` RPCs can return ignorable errors in the case where the volume has already been unmounted from the node. Handle all other errors by retrying until we get success so as to give operators the opportunity to reschedule a failed node plugin (ex. in the case where they accidentally drained a node without `-ignore-system`). Fan-out the work for each volume into its own goroutine so that we can release a subset of volumes if only one is stuck. --- client/allocrunner/csi_hook.go | 155 ++++++++++++++++------ client/allocrunner/csi_hook_test.go | 4 + client/pluginmanager/csimanager/volume.go | 7 +- 3 files changed, 124 insertions(+), 42 deletions(-) diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go index 89cb9bb4ded..6fb1b2866ab 100644 --- a/client/allocrunner/csi_hook.go +++ b/client/allocrunner/csi_hook.go @@ -3,6 +3,8 @@ package allocrunner import ( "context" "fmt" + "sync" + "time" hclog "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" @@ -24,7 +26,9 @@ type csiHook struct { updater hookResourceSetter nodeSecret string - volumeRequests map[string]*volumeAndRequest + volumeRequests map[string]*volumeAndRequest + maxBackoffInterval time.Duration + maxBackoffDuration time.Duration } // implemented by allocrunner @@ -42,6 +46,8 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M updater: updater, nodeSecret: nodeSecret, volumeRequests: map[string]*volumeAndRequest{}, + maxBackoffInterval: time.Minute, + maxBackoffDuration: time.Hour * 24, } } @@ -103,46 +109,43 @@ func (c *csiHook) Postrun() error { return nil } - var mErr *multierror.Error + var wg sync.WaitGroup + errs := make(chan error, len(c.volumeRequests)) for _, pair := range c.volumeRequests { + wg.Add(1) + + // CSI RPCs can potentially fail for a very long time if a + // node plugin has failed. split the work into goroutines so + // that operators could potentially reuse one of a set of + // volumes even if this hook is stuck waiting on the others + go func(pair *volumeAndRequest) { + defer wg.Done() + + // we can recover an unmount failure if the operator + // brings the plugin back up, so retry every few minutes + // but eventually give up + err := c.unmountWithRetry(pair) + if err != nil { + errs <- err + return + } - err := c.unmount(pair) - if err != nil { - mErr = multierror.Append(mErr, err) - } - - mode := structs.CSIVolumeClaimRead - if !pair.request.ReadOnly { - mode = structs.CSIVolumeClaimWrite - } + // we can't recover from this RPC error client-side; the + // volume claim GC job will have to clean up for us once + // the allocation is marked terminal + errs <- c.unpublish(pair) + }(pair) + } - source := pair.request.Source - if pair.request.PerAlloc { - // NOTE: PerAlloc can't be set if we have canaries - source = source + structs.AllocSuffix(c.alloc.Name) - } + wg.Wait() + close(errs) // so we don't block waiting if there were no errors - req := &structs.CSIVolumeUnpublishRequest{ - VolumeID: source, - Claim: &structs.CSIVolumeClaim{ - AllocationID: c.alloc.ID, - NodeID: c.alloc.NodeID, - Mode: mode, - State: structs.CSIVolumeClaimStateUnpublishing, - }, - WriteRequest: structs.WriteRequest{ - Region: c.alloc.Job.Region, - Namespace: c.alloc.Job.Namespace, - AuthToken: c.nodeSecret, - }, - } - err = c.rpcClient.RPC("CSIVolume.Unpublish", - req, &structs.CSIVolumeUnpublishResponse{}) - if err != nil { - mErr = multierror.Append(mErr, err) - } + var mErr *multierror.Error + for err := range errs { + mErr = multierror.Append(mErr, err) } + return mErr.ErrorOrNil() } @@ -237,7 +240,81 @@ func (c *csiHook) shouldRun() bool { return false } -func (c *csiHook) unmount(pair *volumeAndRequest) error { +func (c *csiHook) unpublish(pair *volumeAndRequest) error { + + mode := structs.CSIVolumeClaimRead + if !pair.request.ReadOnly { + mode = structs.CSIVolumeClaimWrite + } + + source := pair.request.Source + if pair.request.PerAlloc { + // NOTE: PerAlloc can't be set if we have canaries + source = source + structs.AllocSuffix(c.alloc.Name) + } + + req := &structs.CSIVolumeUnpublishRequest{ + VolumeID: source, + Claim: &structs.CSIVolumeClaim{ + AllocationID: c.alloc.ID, + NodeID: c.alloc.NodeID, + Mode: mode, + State: structs.CSIVolumeClaimStateUnpublishing, + }, + WriteRequest: structs.WriteRequest{ + Region: c.alloc.Job.Region, + Namespace: c.alloc.Job.Namespace, + AuthToken: c.nodeSecret, + }, + } + + return c.rpcClient.RPC("CSIVolume.Unpublish", + req, &structs.CSIVolumeUnpublishResponse{}) + +} + +// unmountWithRetry tries to unmount/unstage the volume, retrying with +// exponential backoff capped to a maximum interval +func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error { + + // note: allocrunner hooks don't have access to the client's + // shutdown context, just the allocrunner's shutdown; if we make + // it available in the future we should thread it through here so + // that retry can exit gracefully instead of dropping the + // in-flight goroutine + ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration) + defer cancel() + var err error + backoff := time.Second + ticker := time.NewTicker(backoff) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return err + case <-ticker.C: + } + + err = c.unmountImpl(pair) + if err == nil { + break + } + + if backoff < c.maxBackoffInterval { + backoff = backoff * 2 + if backoff > c.maxBackoffInterval { + backoff = c.maxBackoffInterval + } + } + ticker.Reset(backoff) + } + return nil +} + +// unmountImpl implements the call to the CSI plugin manager to +// unmount the volume. Each retry will write an "Unmount volume" +// NodeEvent +func (c *csiHook) unmountImpl(pair *volumeAndRequest) error { mounter, err := c.csimanager.MounterForPlugin(context.TODO(), pair.volume.PluginID) if err != nil { @@ -251,10 +328,6 @@ func (c *csiHook) unmount(pair *volumeAndRequest) error { MountOptions: pair.request.MountOptions, } - err = mounter.UnmountVolume(context.TODO(), + return mounter.UnmountVolume(context.TODO(), pair.volume.ID, pair.volume.RemoteID(), c.alloc.ID, usageOpts) - if err != nil { - return err - } - return nil } diff --git a/client/allocrunner/csi_hook_test.go b/client/allocrunner/csi_hook_test.go index 89b7f3f0a3f..d05d07385c3 100644 --- a/client/allocrunner/csi_hook_test.go +++ b/client/allocrunner/csi_hook_test.go @@ -5,6 +5,7 @@ import ( "fmt" "path/filepath" "testing" + "time" "github.com/stretchr/testify/require" @@ -144,6 +145,9 @@ func TestCSIHook(t *testing.T) { }, } hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret") + hook.maxBackoffInterval = 100 * time.Millisecond + hook.maxBackoffDuration = 2 * time.Second + require.NotNil(t, hook) require.NoError(t, hook.Prerun()) diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go index 9bca471cf08..4c6bf1d144e 100644 --- a/client/pluginmanager/csimanager/volume.go +++ b/client/pluginmanager/csimanager/volume.go @@ -353,11 +353,16 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo } } + if errors.Is(err, structs.ErrCSIClientRPCIgnorable) { + logger.Trace("unmounting volume failed with ignorable error", "error", err) + err = nil + } + event := structs.NewNodeEvent(). SetSubsystem(structs.NodeEventSubsystemStorage). SetMessage("Unmount volume"). AddDetail("volume_id", volID) - if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) { + if err == nil { event.AddDetail("success", "true") } else { event.AddDetail("success", "false")