Skip to content

Commit

Permalink
CSI: don't block client shutdown for node unmount (#12457)
Browse files Browse the repository at this point in the history
When we unmount a volume we need to be able to recover from cases
where the plugin has been shutdown before the allocation that needs
it, so in #11892 we blocked shutting down the alloc runner hook. But
this blocks client shutdown if we're in the middle of unmounting. The
client won't be able to communicate with the plugin or send the
unpublish RPC anyways, so we should cancel the context and assume that
we'll resume the unmounting process when the client restarts.

For `-dev` mode we don't send the graceful `Shutdown()` method and
instead destroy all the allocations. In this case, we'll never be able
to communicate with the plugin but also never close the context we
need to prevent the hook from blocking. To fix this, move the retries
into their own goroutine that doesn't block the main `Postrun`.
  • Loading branch information
tgross authored Apr 5, 2022
1 parent b7d19a6 commit a8d5e5e
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 41 deletions.
98 changes: 61 additions & 37 deletions client/allocrunner/csi_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,23 @@ import (
//
// It is a noop for allocs that do not depend on CSI Volumes.
type csiHook struct {
alloc *structs.Allocation
logger hclog.Logger
csimanager csimanager.Manager
alloc *structs.Allocation
logger hclog.Logger
csimanager csimanager.Manager

// interfaces implemented by the allocRunner
rpcClient RPCer
taskCapabilityGetter taskCapabilityGetter
updater hookResourceSetter
nodeSecret string

nodeSecret string
volumeRequests map[string]*volumeAndRequest
minBackoffInterval time.Duration
maxBackoffInterval time.Duration
maxBackoffDuration time.Duration

shutdownCtx context.Context
shutdownCancelFn context.CancelFunc
}

// implemented by allocrunner
Expand All @@ -40,6 +45,9 @@ type taskCapabilityGetter interface {
}

func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.Manager, rpcClient RPCer, taskCapabilityGetter taskCapabilityGetter, updater hookResourceSetter, nodeSecret string) *csiHook {

shutdownCtx, shutdownCancelFn := context.WithCancel(context.Background())

return &csiHook{
alloc: alloc,
logger: logger.Named("csi_hook"),
Expand All @@ -52,6 +60,8 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M
minBackoffInterval: time.Second,
maxBackoffInterval: time.Minute,
maxBackoffDuration: time.Hour * 24,
shutdownCtx: shutdownCtx,
shutdownCancelFn: shutdownCancelFn,
}
}

Expand All @@ -64,11 +74,6 @@ func (c *csiHook) Prerun() error {
return nil
}

// We use this context only to attach hclog to the gRPC context. The
// lifetime is the lifetime of the gRPC stream, not specific RPC timeouts,
// but we manage the stream lifetime via Close in the pluginmanager.
ctx := context.Background()

volumes, err := c.claimVolumesFromAlloc()
if err != nil {
return fmt.Errorf("claim volumes: %v", err)
Expand All @@ -77,7 +82,12 @@ func (c *csiHook) Prerun() error {

mounts := make(map[string]*csimanager.MountInfo, len(volumes))
for alias, pair := range volumes {
mounter, err := c.csimanager.MounterForPlugin(ctx, pair.volume.PluginID)

// We use this context only to attach hclog to the gRPC
// context. The lifetime is the lifetime of the gRPC stream,
// not specific RPC timeouts, but we manage the stream
// lifetime via Close in the pluginmanager.
mounter, err := c.csimanager.MounterForPlugin(c.shutdownCtx, pair.volume.PluginID)
if err != nil {
return err
}
Expand All @@ -89,7 +99,8 @@ func (c *csiHook) Prerun() error {
MountOptions: pair.request.MountOptions,
}

mountInfo, err := mounter.MountVolume(ctx, pair.volume, c.alloc, usageOpts, pair.publishContext)
mountInfo, err := mounter.MountVolume(
c.shutdownCtx, pair.volume, c.alloc, usageOpts, pair.publishContext)
if err != nil {
return err
}
Expand Down Expand Up @@ -118,21 +129,27 @@ func (c *csiHook) Postrun() error {

for _, pair := range c.volumeRequests {
wg.Add(1)

// CSI RPCs can potentially fail for a very long time if a
// node plugin has failed. split the work into goroutines so
// that operators could potentially reuse one of a set of
// volumes even if this hook is stuck waiting on the others
// CSI RPCs can potentially take a long time. Split the work
// into goroutines so that operators could potentially reuse
// one of a set of volumes
go func(pair *volumeAndRequest) {
defer wg.Done()

// we can recover an unmount failure if the operator
// brings the plugin back up, so retry every few minutes
// but eventually give up
err := c.unmountWithRetry(pair)
err := c.unmountImpl(pair)
if err != nil {
errs <- err
return
// we can recover an unmount failure if the operator
// brings the plugin back up, so retry every few minutes
// but eventually give up. Don't block shutdown so that
// we don't block shutting down the client in -dev mode
go func(pair *volumeAndRequest) {
err := c.unmountWithRetry(pair)
if err != nil {
c.logger.Error("volume could not be unmounted")
}
err = c.unpublish(pair)
if err != nil {
c.logger.Error("volume could not be unpublished")
}
}(pair)
}

// we can't recover from this RPC error client-side; the
Expand Down Expand Up @@ -236,12 +253,7 @@ func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error)
// with exponential backoff capped to a maximum interval
func (c *csiHook) claimWithRetry(req *structs.CSIVolumeClaimRequest) (*structs.CSIVolumeClaimResponse, error) {

// note: allocrunner hooks don't have access to the client's
// shutdown context, just the allocrunner's shutdown; if we make
// it available in the future we should thread it through here so
// that retry can exit gracefully instead of dropping the
// in-flight goroutine
ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration)
ctx, cancel := context.WithTimeout(c.shutdownCtx, c.maxBackoffDuration)
defer cancel()

var resp structs.CSIVolumeClaimResponse
Expand Down Expand Up @@ -348,12 +360,7 @@ func (c *csiHook) unpublish(pair *volumeAndRequest) error {
// exponential backoff capped to a maximum interval
func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {

// note: allocrunner hooks don't have access to the client's
// shutdown context, just the allocrunner's shutdown; if we make
// it available in the future we should thread it through here so
// that retry can exit gracefully instead of dropping the
// in-flight goroutine
ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration)
ctx, cancel := context.WithTimeout(c.shutdownCtx, c.maxBackoffDuration)
defer cancel()
var err error
backoff := c.minBackoffInterval
Expand Down Expand Up @@ -388,7 +395,7 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
// NodeEvent
func (c *csiHook) unmountImpl(pair *volumeAndRequest) error {

mounter, err := c.csimanager.MounterForPlugin(context.TODO(), pair.volume.PluginID)
mounter, err := c.csimanager.MounterForPlugin(c.shutdownCtx, pair.volume.PluginID)
if err != nil {
return err
}
Expand All @@ -400,6 +407,23 @@ func (c *csiHook) unmountImpl(pair *volumeAndRequest) error {
MountOptions: pair.request.MountOptions,
}

return mounter.UnmountVolume(context.TODO(),
return mounter.UnmountVolume(c.shutdownCtx,
pair.volume.ID, pair.volume.RemoteID(), c.alloc.ID, usageOpts)
}

// Shutdown will get called when the client is gracefully
// stopping. Cancel our shutdown context so that we don't block client
// shutdown while in the CSI RPC retry loop.
func (c *csiHook) Shutdown() {
c.logger.Trace("shutting down hook")
c.shutdownCancelFn()
}

// Destroy will get called when an allocation gets GC'd on the client
// or when a -dev mode client is stopped. Cancel our shutdown context
// so that we don't block client shutdown while in the CSI RPC retry
// loop.
func (c *csiHook) Destroy() {
c.logger.Trace("destroying hook")
c.shutdownCancelFn()
}
4 changes: 0 additions & 4 deletions client/allocrunner/csi_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ import (
var _ interfaces.RunnerPrerunHook = (*csiHook)(nil)
var _ interfaces.RunnerPostrunHook = (*csiHook)(nil)

// TODO https://github.com/hashicorp/nomad/issues/11786
// we should implement Update as well
// var _ interfaces.RunnerUpdateHook = (*csiHook)(nil)

func TestCSIHook(t *testing.T) {
ci.Parallel(t)

Expand Down

0 comments on commit a8d5e5e

Please sign in to comment.