From 3b7dedbb14f48833f0a3748819589b5b59722e6b Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 27 Jan 2022 13:54:54 -0500 Subject: [PATCH] [squash me] address comments from code review The CSI `NodeUnpublishVolume`/`NodeUnstageVolume` RPCs can return ignorable errors in the case where the volume has already been unmounted from the node. Handle all other errors by retrying until we get success so as to give operators the opportunity to reschedule a failed node plugin (ex. in the case where they accidentally drained a node without `-ignore-system`). Fan-out the work for each volume into its own goroutine so that we can release a subset of volumes if only one is stuck. --- client/allocrunner/csi_hook.go | 155 ++++++++++++++++------ client/allocrunner/csi_hook_test.go | 4 + client/pluginmanager/csimanager/volume.go | 7 +- 3 files changed, 124 insertions(+), 42 deletions(-) diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go index 89cb9bb4ded..6fb1b2866ab 100644 --- a/client/allocrunner/csi_hook.go +++ b/client/allocrunner/csi_hook.go @@ -3,6 +3,8 @@ package allocrunner import ( "context" "fmt" + "sync" + "time" hclog "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" @@ -24,7 +26,9 @@ type csiHook struct { updater hookResourceSetter nodeSecret string - volumeRequests map[string]*volumeAndRequest + volumeRequests map[string]*volumeAndRequest + maxBackoffInterval time.Duration + maxBackoffDuration time.Duration } // implemented by allocrunner @@ -42,6 +46,8 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M updater: updater, nodeSecret: nodeSecret, volumeRequests: map[string]*volumeAndRequest{}, + maxBackoffInterval: time.Minute, + maxBackoffDuration: time.Hour * 24, } } @@ -103,46 +109,43 @@ func (c *csiHook) Postrun() error { return nil } - var mErr *multierror.Error + var wg sync.WaitGroup + errs := make(chan error, len(c.volumeRequests)) for _, pair := range c.volumeRequests { + wg.Add(1) + + // CSI RPCs can potentially fail for a very long time if a + // node plugin has failed. split the work into goroutines so + // that operators could potentially reuse one of a set of + // volumes even if this hook is stuck waiting on the others + go func(pair *volumeAndRequest) { + defer wg.Done() + + // we can recover an unmount failure if the operator + // brings the plugin back up, so retry every few minutes + // but eventually give up + err := c.unmountWithRetry(pair) + if err != nil { + errs <- err + return + } - err := c.unmount(pair) - if err != nil { - mErr = multierror.Append(mErr, err) - } - - mode := structs.CSIVolumeClaimRead - if !pair.request.ReadOnly { - mode = structs.CSIVolumeClaimWrite - } + // we can't recover from this RPC error client-side; the + // volume claim GC job will have to clean up for us once + // the allocation is marked terminal + errs <- c.unpublish(pair) + }(pair) + } - source := pair.request.Source - if pair.request.PerAlloc { - // NOTE: PerAlloc can't be set if we have canaries - source = source + structs.AllocSuffix(c.alloc.Name) - } + wg.Wait() + close(errs) // so we don't block waiting if there were no errors - req := &structs.CSIVolumeUnpublishRequest{ - VolumeID: source, - Claim: &structs.CSIVolumeClaim{ - AllocationID: c.alloc.ID, - NodeID: c.alloc.NodeID, - Mode: mode, - State: structs.CSIVolumeClaimStateUnpublishing, - }, - WriteRequest: structs.WriteRequest{ - Region: c.alloc.Job.Region, - Namespace: c.alloc.Job.Namespace, - AuthToken: c.nodeSecret, - }, - } - err = c.rpcClient.RPC("CSIVolume.Unpublish", - req, &structs.CSIVolumeUnpublishResponse{}) - if err != nil { - mErr = multierror.Append(mErr, err) - } + var mErr *multierror.Error + for err := range errs { + mErr = multierror.Append(mErr, err) } + return mErr.ErrorOrNil() } @@ -237,7 +240,81 @@ func (c *csiHook) shouldRun() bool { return false } -func (c *csiHook) unmount(pair *volumeAndRequest) error { +func (c *csiHook) unpublish(pair *volumeAndRequest) error { + + mode := structs.CSIVolumeClaimRead + if !pair.request.ReadOnly { + mode = structs.CSIVolumeClaimWrite + } + + source := pair.request.Source + if pair.request.PerAlloc { + // NOTE: PerAlloc can't be set if we have canaries + source = source + structs.AllocSuffix(c.alloc.Name) + } + + req := &structs.CSIVolumeUnpublishRequest{ + VolumeID: source, + Claim: &structs.CSIVolumeClaim{ + AllocationID: c.alloc.ID, + NodeID: c.alloc.NodeID, + Mode: mode, + State: structs.CSIVolumeClaimStateUnpublishing, + }, + WriteRequest: structs.WriteRequest{ + Region: c.alloc.Job.Region, + Namespace: c.alloc.Job.Namespace, + AuthToken: c.nodeSecret, + }, + } + + return c.rpcClient.RPC("CSIVolume.Unpublish", + req, &structs.CSIVolumeUnpublishResponse{}) + +} + +// unmountWithRetry tries to unmount/unstage the volume, retrying with +// exponential backoff capped to a maximum interval +func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error { + + // note: allocrunner hooks don't have access to the client's + // shutdown context, just the allocrunner's shutdown; if we make + // it available in the future we should thread it through here so + // that retry can exit gracefully instead of dropping the + // in-flight goroutine + ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration) + defer cancel() + var err error + backoff := time.Second + ticker := time.NewTicker(backoff) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return err + case <-ticker.C: + } + + err = c.unmountImpl(pair) + if err == nil { + break + } + + if backoff < c.maxBackoffInterval { + backoff = backoff * 2 + if backoff > c.maxBackoffInterval { + backoff = c.maxBackoffInterval + } + } + ticker.Reset(backoff) + } + return nil +} + +// unmountImpl implements the call to the CSI plugin manager to +// unmount the volume. Each retry will write an "Unmount volume" +// NodeEvent +func (c *csiHook) unmountImpl(pair *volumeAndRequest) error { mounter, err := c.csimanager.MounterForPlugin(context.TODO(), pair.volume.PluginID) if err != nil { @@ -251,10 +328,6 @@ func (c *csiHook) unmount(pair *volumeAndRequest) error { MountOptions: pair.request.MountOptions, } - err = mounter.UnmountVolume(context.TODO(), + return mounter.UnmountVolume(context.TODO(), pair.volume.ID, pair.volume.RemoteID(), c.alloc.ID, usageOpts) - if err != nil { - return err - } - return nil } diff --git a/client/allocrunner/csi_hook_test.go b/client/allocrunner/csi_hook_test.go index 89b7f3f0a3f..d05d07385c3 100644 --- a/client/allocrunner/csi_hook_test.go +++ b/client/allocrunner/csi_hook_test.go @@ -5,6 +5,7 @@ import ( "fmt" "path/filepath" "testing" + "time" "github.com/stretchr/testify/require" @@ -144,6 +145,9 @@ func TestCSIHook(t *testing.T) { }, } hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret") + hook.maxBackoffInterval = 100 * time.Millisecond + hook.maxBackoffDuration = 2 * time.Second + require.NotNil(t, hook) require.NoError(t, hook.Prerun()) diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go index 9bca471cf08..4c6bf1d144e 100644 --- a/client/pluginmanager/csimanager/volume.go +++ b/client/pluginmanager/csimanager/volume.go @@ -353,11 +353,16 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo } } + if errors.Is(err, structs.ErrCSIClientRPCIgnorable) { + logger.Trace("unmounting volume failed with ignorable error", "error", err) + err = nil + } + event := structs.NewNodeEvent(). SetSubsystem(structs.NodeEventSubsystemStorage). SetMessage("Unmount volume"). AddDetail("volume_id", volID) - if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) { + if err == nil { event.AddDetail("success", "true") } else { event.AddDetail("success", "false")