Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSI: node unmount from the client before unpublish RPC #11892

Merged
merged 2 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 130 additions & 30 deletions client/allocrunner/csi_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package allocrunner
import (
"context"
"fmt"
"sync"
"time"

hclog "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
Expand All @@ -24,7 +26,9 @@ type csiHook struct {
updater hookResourceSetter
nodeSecret string

volumeRequests map[string]*volumeAndRequest
volumeRequests map[string]*volumeAndRequest
maxBackoffInterval time.Duration
maxBackoffDuration time.Duration
}

// implemented by allocrunner
Expand All @@ -42,6 +46,8 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M
updater: updater,
nodeSecret: nodeSecret,
volumeRequests: map[string]*volumeAndRequest{},
maxBackoffInterval: time.Minute,
maxBackoffDuration: time.Hour * 24,
}
}

Expand Down Expand Up @@ -103,41 +109,43 @@ func (c *csiHook) Postrun() error {
return nil
}

var mErr *multierror.Error
var wg sync.WaitGroup
errs := make(chan error, len(c.volumeRequests))

for _, pair := range c.volumeRequests {
wg.Add(1)

// CSI RPCs can potentially fail for a very long time if a
// node plugin has failed. split the work into goroutines so
// that operators could potentially reuse one of a set of
// volumes even if this hook is stuck waiting on the others
go func(pair *volumeAndRequest) {
defer wg.Done()

// we can recover an unmount failure if the operator
// brings the plugin back up, so retry every few minutes
// but eventually give up
err := c.unmountWithRetry(pair)
if err != nil {
errs <- err
return
}

mode := structs.CSIVolumeClaimRead
if !pair.request.ReadOnly {
mode = structs.CSIVolumeClaimWrite
}
// we can't recover from this RPC error client-side; the
// volume claim GC job will have to clean up for us once
// the allocation is marked terminal
errs <- c.unpublish(pair)
}(pair)
}

source := pair.request.Source
if pair.request.PerAlloc {
// NOTE: PerAlloc can't be set if we have canaries
source = source + structs.AllocSuffix(c.alloc.Name)
}
wg.Wait()
close(errs) // so we don't block waiting if there were no errors

req := &structs.CSIVolumeUnpublishRequest{
VolumeID: source,
Claim: &structs.CSIVolumeClaim{
AllocationID: c.alloc.ID,
NodeID: c.alloc.NodeID,
Mode: mode,
State: structs.CSIVolumeClaimStateUnpublishing,
},
WriteRequest: structs.WriteRequest{
Region: c.alloc.Job.Region,
Namespace: c.alloc.Job.Namespace,
AuthToken: c.nodeSecret,
},
}
err := c.rpcClient.RPC("CSIVolume.Unpublish",
req, &structs.CSIVolumeUnpublishResponse{})
if err != nil {
mErr = multierror.Append(mErr, err)
}
var mErr *multierror.Error
for err := range errs {
mErr = multierror.Append(mErr, err)
}

return mErr.ErrorOrNil()
}

Expand Down Expand Up @@ -231,3 +239,95 @@ func (c *csiHook) shouldRun() bool {

return false
}

func (c *csiHook) unpublish(pair *volumeAndRequest) error {

mode := structs.CSIVolumeClaimRead
if !pair.request.ReadOnly {
mode = structs.CSIVolumeClaimWrite
}

source := pair.request.Source
if pair.request.PerAlloc {
// NOTE: PerAlloc can't be set if we have canaries
source = source + structs.AllocSuffix(c.alloc.Name)
}

req := &structs.CSIVolumeUnpublishRequest{
VolumeID: source,
Claim: &structs.CSIVolumeClaim{
AllocationID: c.alloc.ID,
NodeID: c.alloc.NodeID,
Mode: mode,
State: structs.CSIVolumeClaimStateUnpublishing,
},
WriteRequest: structs.WriteRequest{
Region: c.alloc.Job.Region,
Namespace: c.alloc.Job.Namespace,
AuthToken: c.nodeSecret,
},
}

return c.rpcClient.RPC("CSIVolume.Unpublish",
req, &structs.CSIVolumeUnpublishResponse{})

}

// unmountWithRetry tries to unmount/unstage the volume, retrying with
// exponential backoff capped to a maximum interval
func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {

// note: allocrunner hooks don't have access to the client's
// shutdown context, just the allocrunner's shutdown; if we make
// it available in the future we should thread it through here so
// that retry can exit gracefully instead of dropping the
// in-flight goroutine
ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration)
defer cancel()
var err error
backoff := time.Second
ticker := time.NewTicker(backoff)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return err
case <-ticker.C:
}

err = c.unmountImpl(pair)
if err == nil {
break
}

if backoff < c.maxBackoffInterval {
backoff = backoff * 2
if backoff > c.maxBackoffInterval {
backoff = c.maxBackoffInterval
}
}
ticker.Reset(backoff)
}
return nil
}

// unmountImpl implements the call to the CSI plugin manager to
// unmount the volume. Each retry will write an "Unmount volume"
// NodeEvent
func (c *csiHook) unmountImpl(pair *volumeAndRequest) error {

mounter, err := c.csimanager.MounterForPlugin(context.TODO(), pair.volume.PluginID)
if err != nil {
return err
}

usageOpts := &csimanager.UsageOptions{
ReadOnly: pair.request.ReadOnly,
AttachmentMode: pair.request.AttachmentMode,
AccessMode: pair.request.AccessMode,
MountOptions: pair.request.MountOptions,
}

return mounter.UnmountVolume(context.TODO(),
pair.volume.ID, pair.volume.RemoteID(), c.alloc.ID, usageOpts)
}
10 changes: 7 additions & 3 deletions client/allocrunner/csi_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -59,7 +60,7 @@ func TestCSIHook(t *testing.T) {
"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
},
expectedMountCalls: 1,
expectedUnmountCalls: 0, // not until this is done client-side
expectedUnmountCalls: 1,
expectedClaimCalls: 1,
expectedUnpublishCalls: 1,
},
Expand All @@ -83,7 +84,7 @@ func TestCSIHook(t *testing.T) {
"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
},
expectedMountCalls: 1,
expectedUnmountCalls: 0, // not until this is done client-side
expectedUnmountCalls: 1,
expectedClaimCalls: 1,
expectedUnpublishCalls: 1,
},
Expand Down Expand Up @@ -122,7 +123,7 @@ func TestCSIHook(t *testing.T) {
// "test-alloc-dir/%s/testvolume0/ro-file-system-multi-node-reader-only", alloc.ID)},
// },
// expectedMountCalls: 1,
// expectedUnmountCalls: 0, // not until this is done client-side
// expectedUnmountCalls: 1,
// expectedClaimCalls: 1,
// expectedUnpublishCalls: 1,
// },
Expand All @@ -144,6 +145,9 @@ func TestCSIHook(t *testing.T) {
},
}
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret")
hook.maxBackoffInterval = 100 * time.Millisecond
hook.maxBackoffDuration = 2 * time.Second

require.NotNil(t, hook)

require.NoError(t, hook.Prerun())
Expand Down
7 changes: 6 additions & 1 deletion client/pluginmanager/csimanager/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,11 +353,16 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo
}
}

if errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
logger.Trace("unmounting volume failed with ignorable error", "error", err)
err = nil
}

event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemStorage).
SetMessage("Unmount volume").
AddDetail("volume_id", volID)
if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
if err == nil {
event.AddDetail("success", "true")
} else {
event.AddDetail("success", "false")
Expand Down