Skip to content

Commit

Permalink
CSI: node unmount from the client before unpublish RPC (#11892)
Browse files Browse the repository at this point in the history
When an allocation stops, the `csi_hook` makes an unpublish RPC to the
servers to unpublish via the CSI RPCs: first to the node plugins and
then the controller plugins. The controller RPCs must happen after the
node RPCs so that the node has had a chance to unmount the volume
before the controller tries to detach the associated device.

But the client has local access to the node plugins and can
independently determine if it's safe to send unpublish RPC to those
plugins. This will allow the server to treat the node plugin as
abandoned if a client is disconnected and `stop_on_client_disconnect`
is set. This will let the server try to send unpublish RPCs to the
controller plugins, under the assumption that the client will be
trying to unmount the volume on its end first.

Note that the CSI `NodeUnpublishVolume`/`NodeUnstageVolume` RPCs can 
return ignorable errors in the case where the volume has already been
unmounted from the node. Handle all other errors by retrying until we
get success so as to give operators the opportunity to reschedule a
failed node plugin (ex. in the case where they accidentally drained a
node without `-ignore-system`). Fan-out the work for each volume into
its own goroutine so that we can release a subset of volumes if only
one is stuck.
  • Loading branch information
tgross committed Jan 28, 2022
1 parent 593b8db commit 707b4b3
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 34 deletions.
160 changes: 130 additions & 30 deletions client/allocrunner/csi_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package allocrunner
import (
"context"
"fmt"
"sync"
"time"

hclog "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
Expand All @@ -24,7 +26,9 @@ type csiHook struct {
updater hookResourceSetter
nodeSecret string

volumeRequests map[string]*volumeAndRequest
volumeRequests map[string]*volumeAndRequest
maxBackoffInterval time.Duration
maxBackoffDuration time.Duration
}

// implemented by allocrunner
Expand All @@ -42,6 +46,8 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M
updater: updater,
nodeSecret: nodeSecret,
volumeRequests: map[string]*volumeAndRequest{},
maxBackoffInterval: time.Minute,
maxBackoffDuration: time.Hour * 24,
}
}

Expand Down Expand Up @@ -103,41 +109,43 @@ func (c *csiHook) Postrun() error {
return nil
}

var mErr *multierror.Error
var wg sync.WaitGroup
errs := make(chan error, len(c.volumeRequests))

for _, pair := range c.volumeRequests {
wg.Add(1)

// CSI RPCs can potentially fail for a very long time if a
// node plugin has failed. split the work into goroutines so
// that operators could potentially reuse one of a set of
// volumes even if this hook is stuck waiting on the others
go func(pair *volumeAndRequest) {
defer wg.Done()

// we can recover an unmount failure if the operator
// brings the plugin back up, so retry every few minutes
// but eventually give up
err := c.unmountWithRetry(pair)
if err != nil {
errs <- err
return
}

mode := structs.CSIVolumeClaimRead
if !pair.request.ReadOnly {
mode = structs.CSIVolumeClaimWrite
}
// we can't recover from this RPC error client-side; the
// volume claim GC job will have to clean up for us once
// the allocation is marked terminal
errs <- c.unpublish(pair)
}(pair)
}

source := pair.request.Source
if pair.request.PerAlloc {
// NOTE: PerAlloc can't be set if we have canaries
source = source + structs.AllocSuffix(c.alloc.Name)
}
wg.Wait()
close(errs) // so we don't block waiting if there were no errors

req := &structs.CSIVolumeUnpublishRequest{
VolumeID: source,
Claim: &structs.CSIVolumeClaim{
AllocationID: c.alloc.ID,
NodeID: c.alloc.NodeID,
Mode: mode,
State: structs.CSIVolumeClaimStateUnpublishing,
},
WriteRequest: structs.WriteRequest{
Region: c.alloc.Job.Region,
Namespace: c.alloc.Job.Namespace,
AuthToken: c.nodeSecret,
},
}
err := c.rpcClient.RPC("CSIVolume.Unpublish",
req, &structs.CSIVolumeUnpublishResponse{})
if err != nil {
mErr = multierror.Append(mErr, err)
}
var mErr *multierror.Error
for err := range errs {
mErr = multierror.Append(mErr, err)
}

return mErr.ErrorOrNil()
}

Expand Down Expand Up @@ -231,3 +239,95 @@ func (c *csiHook) shouldRun() bool {

return false
}

func (c *csiHook) unpublish(pair *volumeAndRequest) error {

mode := structs.CSIVolumeClaimRead
if !pair.request.ReadOnly {
mode = structs.CSIVolumeClaimWrite
}

source := pair.request.Source
if pair.request.PerAlloc {
// NOTE: PerAlloc can't be set if we have canaries
source = source + structs.AllocSuffix(c.alloc.Name)
}

req := &structs.CSIVolumeUnpublishRequest{
VolumeID: source,
Claim: &structs.CSIVolumeClaim{
AllocationID: c.alloc.ID,
NodeID: c.alloc.NodeID,
Mode: mode,
State: structs.CSIVolumeClaimStateUnpublishing,
},
WriteRequest: structs.WriteRequest{
Region: c.alloc.Job.Region,
Namespace: c.alloc.Job.Namespace,
AuthToken: c.nodeSecret,
},
}

return c.rpcClient.RPC("CSIVolume.Unpublish",
req, &structs.CSIVolumeUnpublishResponse{})

}

// unmountWithRetry tries to unmount/unstage the volume, retrying with
// exponential backoff capped to a maximum interval
func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {

// note: allocrunner hooks don't have access to the client's
// shutdown context, just the allocrunner's shutdown; if we make
// it available in the future we should thread it through here so
// that retry can exit gracefully instead of dropping the
// in-flight goroutine
ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration)
defer cancel()
var err error
backoff := time.Second
ticker := time.NewTicker(backoff)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return err
case <-ticker.C:
}

err = c.unmountImpl(pair)
if err == nil {
break
}

if backoff < c.maxBackoffInterval {
backoff = backoff * 2
if backoff > c.maxBackoffInterval {
backoff = c.maxBackoffInterval
}
}
ticker.Reset(backoff)
}
return nil
}

// unmountImpl implements the call to the CSI plugin manager to
// unmount the volume. Each retry will write an "Unmount volume"
// NodeEvent
func (c *csiHook) unmountImpl(pair *volumeAndRequest) error {

mounter, err := c.csimanager.MounterForPlugin(context.TODO(), pair.volume.PluginID)
if err != nil {
return err
}

usageOpts := &csimanager.UsageOptions{
ReadOnly: pair.request.ReadOnly,
AttachmentMode: pair.request.AttachmentMode,
AccessMode: pair.request.AccessMode,
MountOptions: pair.request.MountOptions,
}

return mounter.UnmountVolume(context.TODO(),
pair.volume.ID, pair.volume.RemoteID(), c.alloc.ID, usageOpts)
}
10 changes: 7 additions & 3 deletions client/allocrunner/csi_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -59,7 +60,7 @@ func TestCSIHook(t *testing.T) {
"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
},
expectedMountCalls: 1,
expectedUnmountCalls: 0, // not until this is done client-side
expectedUnmountCalls: 1,
expectedClaimCalls: 1,
expectedUnpublishCalls: 1,
},
Expand All @@ -83,7 +84,7 @@ func TestCSIHook(t *testing.T) {
"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
},
expectedMountCalls: 1,
expectedUnmountCalls: 0, // not until this is done client-side
expectedUnmountCalls: 1,
expectedClaimCalls: 1,
expectedUnpublishCalls: 1,
},
Expand Down Expand Up @@ -122,7 +123,7 @@ func TestCSIHook(t *testing.T) {
// "test-alloc-dir/%s/testvolume0/ro-file-system-multi-node-reader-only", alloc.ID)},
// },
// expectedMountCalls: 1,
// expectedUnmountCalls: 0, // not until this is done client-side
// expectedUnmountCalls: 1,
// expectedClaimCalls: 1,
// expectedUnpublishCalls: 1,
// },
Expand All @@ -144,6 +145,9 @@ func TestCSIHook(t *testing.T) {
},
}
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret")
hook.maxBackoffInterval = 100 * time.Millisecond
hook.maxBackoffDuration = 2 * time.Second

require.NotNil(t, hook)

require.NoError(t, hook.Prerun())
Expand Down
7 changes: 6 additions & 1 deletion client/pluginmanager/csimanager/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,11 +348,16 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo
}
}

if errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
logger.Trace("unmounting volume failed with ignorable error", "error", err)
err = nil
}

event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemStorage).
SetMessage("Unmount volume").
AddDetail("volume_id", volID)
if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
if err == nil {
event.AddDetail("success", "true")
} else {
event.AddDetail("success", "false")
Expand Down

0 comments on commit 707b4b3

Please sign in to comment.