Skip to content

Commit

Permalink
Platform agnostic device removal
Browse files Browse the repository at this point in the history
Signed-off-by: Eddie Torres <[email protected]>
  • Loading branch information
torredil committed Mar 26, 2022
1 parent 99859b5 commit 019dd49
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 22 deletions.
28 changes: 28 additions & 0 deletions pkg/driver/mock_mount.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/driver/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Mounter interface {
MakeDir(path string) error
PathExists(path string) (bool, error)
NeedResize(devicePath string, deviceMountPath string) (bool, error)
Unpublish(path string) error
Unstage(path string) error
}

// NodeMounter implements Mounter.
Expand Down
11 changes: 10 additions & 1 deletion pkg/driver/mount_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ package driver

import (
"fmt"
"k8s.io/klog"
"os"
"strconv"
"strings"

"k8s.io/klog"

mountutils "k8s.io/mount-utils"
)

Expand Down Expand Up @@ -188,3 +189,11 @@ func (m *NodeMounter) parseFsInfoOutput(cmdOutput string, spliter string, blockS
}
return blockSize, blockCount, err
}

func (m *NodeMounter) Unpublish(path string) error {
return m.Unmount(path)
}

func (m *NodeMounter) Unstage(path string) error {
return m.Unmount(path)
}
36 changes: 36 additions & 0 deletions pkg/driver/mount_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,39 @@ func (m *NodeMounter) NeedResize(devicePath string, deviceMountPath string) (boo
// Implement it to respect spec v1.4.0 https://github.com/container-storage-interface/spec/pull/452
return false, nil
}

// Unmount volume from target path
func (m *NodeMounter) Unpublish(target string) error {
proxyMounter, ok := m.SafeFormatAndMount.Interface.(*mounter.CSIProxyMounter)
if !ok {
return fmt.Errorf("failed to cast mounter to csi proxy mounter")
}
// WriteVolumeCache before unmount
proxyMounter.WriteVolumeCache(target)
// Remove symlink
err := proxyMounter.Rmdir(target)
if err != nil {
return err
}
return nil
}

// Unmount volume from staging path
// usually this staging path is a global directory on the node
func (m *NodeMounter) Unstage(target string) error {
proxyMounter, ok := m.SafeFormatAndMount.Interface.(*mounter.CSIProxyMounter)
if !ok {
return fmt.Errorf("failed to cast mounter to csi proxy mounter")
}
// Unmounts and offlines the disk via the CSI Proxy API
err := proxyMounter.Unmount(target)
if err != nil {
return err
}
// Cleanup stage path
err = proxyMounter.Rmdir(target)
if err != nil {
return err
}
return nil
}
6 changes: 3 additions & 3 deletions pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (d *nodeService) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
}

klog.V(4).Infof("NodeUnstageVolume: unmounting %s", target)
err = d.mounter.Unmount(target)
err = d.mounter.Unstage(target)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not unmount target %q: %v", target, err)
}
Expand Down Expand Up @@ -416,7 +416,7 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
}()

klog.V(4).Infof("NodeUnpublishVolume: unmounting %s", target)
err := d.mounter.Unmount(target)
err := d.mounter.Unpublish(target)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not unmount %q: %v", target, err)
}
Expand Down Expand Up @@ -620,7 +620,7 @@ func (d *nodeService) isMounted(source string, target string) (bool, error) {
_, pathErr := d.mounter.PathExists(target)
if pathErr != nil && d.mounter.IsCorruptedMnt(pathErr) {
klog.V(4).Infof("NodePublishVolume: Target path %q is a corrupted mount. Trying to unmount.", target)
if mntErr := d.mounter.Unmount(target); mntErr != nil {
if mntErr := d.mounter.Unpublish(target); mntErr != nil {
return false, status.Errorf(codes.Internal, "Unable to unmount the target %q : %v", target, mntErr)
}
//After successful unmount, the device is ready to be mounted.
Expand Down
14 changes: 7 additions & 7 deletions pkg/driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func TestNodeUnstageVolume(t *testing.T) {
}

mockMounter.EXPECT().GetDeviceNameFromMount(gomock.Eq(targetPath)).Return(devicePath, 1, nil)
mockMounter.EXPECT().Unmount(gomock.Eq(targetPath)).Return(nil)
mockMounter.EXPECT().Unstage(gomock.Eq(targetPath)).Return(nil)

req := &csi.NodeUnstageVolumeRequest{
StagingTargetPath: targetPath,
Expand Down Expand Up @@ -468,7 +468,7 @@ func TestNodeUnstageVolume(t *testing.T) {
}

mockMounter.EXPECT().GetDeviceNameFromMount(gomock.Eq(targetPath)).Return(devicePath, 2, nil)
mockMounter.EXPECT().Unmount(gomock.Eq(targetPath)).Return(nil)
mockMounter.EXPECT().Unstage(gomock.Eq(targetPath)).Return(nil)

req := &csi.NodeUnstageVolumeRequest{
StagingTargetPath: targetPath,
Expand Down Expand Up @@ -737,7 +737,7 @@ func TestNodePublishVolume(t *testing.T) {

mockMounter.EXPECT().MakeDir(gomock.Eq(targetPath)).Return(nil)
mockMounter.EXPECT().IsLikelyNotMountPoint(gomock.Eq(targetPath)).Return(true, errors.New("internal system error"))
mockMounter.EXPECT().Unmount(gomock.Eq(targetPath)).Return(nil)
mockMounter.EXPECT().Unpublish(gomock.Eq(targetPath)).Return(nil)
mockMounter.EXPECT().Mount(gomock.Eq(stagingTargetPath), gomock.Eq(targetPath), gomock.Eq(defaultFsType), gomock.Eq([]string{"bind"})).Return(nil)

req := &csi.NodePublishVolumeRequest{
Expand Down Expand Up @@ -1052,7 +1052,7 @@ func TestNodePublishVolume(t *testing.T) {

mockMounter.EXPECT().MakeDir(gomock.Eq("/test")).Return(nil)
mockMounter.EXPECT().MakeFile(targetPath).Return(nil)
mockMounter.EXPECT().Unmount(gomock.Eq(targetPath)).Return(nil)
mockMounter.EXPECT().Unpublish(gomock.Eq(targetPath)).Return(nil)
mockMounter.EXPECT().IsLikelyNotMountPoint(gomock.Eq(targetPath)).Return(true, errors.New("Internal System Error"))
mockMounter.EXPECT().Mount(gomock.Eq(devicePath), gomock.Eq(targetPath), gomock.Any(), gomock.Any()).Return(nil)

Expand Down Expand Up @@ -1598,7 +1598,7 @@ func TestNodeUnpublishVolume(t *testing.T) {
VolumeId: volumeID,
}

mockMounter.EXPECT().Unmount(gomock.Eq(targetPath)).Return(nil)
mockMounter.EXPECT().Unpublish(gomock.Eq(targetPath)).Return(nil)
_, err := awsDriver.NodeUnpublishVolume(context.TODO(), req)
if err != nil {
t.Fatalf("Expect no error but got: %v", err)
Expand Down Expand Up @@ -1656,7 +1656,7 @@ func TestNodeUnpublishVolume(t *testing.T) {
},
},
{
name: "fail error on unmount",
name: "fail error on unpublish",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
defer mockCtl.Finish()
Expand All @@ -1677,7 +1677,7 @@ func TestNodeUnpublishVolume(t *testing.T) {
VolumeId: volumeID,
}

mockMounter.EXPECT().Unmount(gomock.Eq(targetPath)).Return(errors.New("test Unmount error"))
mockMounter.EXPECT().Unpublish(gomock.Eq(targetPath)).Return(errors.New("test Unpublish error"))
_, err := awsDriver.NodeUnpublishVolume(context.TODO(), req)
expectErr(t, err, codes.Internal)
},
Expand Down
8 changes: 8 additions & 0 deletions pkg/driver/sanity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,14 @@ func (f *fakeMounter) Unmount(target string) error {
return nil
}

func (f *fakeMounter) Unstage(target string) error {
return nil
}

func (f *fakeMounter) Unpublish(target string) error {
return nil
}

func (f *fakeMounter) List() ([]mount.MountPoint, error) {
return []mount.MountPoint{}, nil
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/mounter/README

This file was deleted.

51 changes: 43 additions & 8 deletions pkg/mounter/safe_mounter_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,47 @@ func (mounter *CSIProxyMounter) Mount(source string, target string, fstype strin
return nil
}

func (mounter *CSIProxyMounter) Unmount(target string) error {
// Find the volume id
getVolumeIdRequest := &volume.GetVolumeIDFromTargetPathRequest{
TargetPath: normalizeWindowsPath(target),
}
volumeIdResponse, err := mounter.VolumeClient.GetVolumeIDFromTargetPath(context.Background(), getVolumeIdRequest)
volumeId := volumeIdResponse.GetVolumeId()

// Call UnmountVolume CSI proxy function which flushes data cache to disk and removes the global staging path
unmountVolumeRequest := &volume.UnmountVolumeRequest{
VolumeId: volumeId,
TargetPath: normalizeWindowsPath(target),
}
_, err = mounter.VolumeClient.UnmountVolume(context.Background(), unmountVolumeRequest)
if err != nil {
return err
}

// Get disk number
getDiskNumberRequest := &volume.GetDiskNumberFromVolumeIDRequest{
VolumeId: volumeId,
}
getDiskNumberResponse, err := mounter.VolumeClient.GetDiskNumberFromVolumeID(context.Background(), getDiskNumberRequest)
diskNumber := getDiskNumberResponse.GetDiskNumber()
if err != nil {
return err
}

// Offline the disk
setDiskStateRequest := &disk.SetDiskStateRequest{
DiskNumber: diskNumber,
IsOnline: false,
}
_, err = mounter.DiskClient.SetDiskState(context.Background(), setDiskStateRequest)
if err != nil {
return err
}
return nil
}

// Rmdir - delete the given directory
// TODO: Call separate rmdir for pod context and plugin context. v1alpha1 for CSI
// proxy does a relaxed check for prefix as c:\var\lib\kubelet, so we can do
// rmdir with either pod or plugin context.
func (mounter *CSIProxyMounter) Rmdir(path string) error {
rmdirRequest := &fs.RmdirRequest{
Path: normalizeWindowsPath(path),
Expand All @@ -87,10 +124,9 @@ func (mounter *CSIProxyMounter) Rmdir(path string) error {
return nil
}

// Unmount - Removes the directory - equivalent to unmount on Linux.
func (mounter *CSIProxyMounter) Unmount(target string) error {
// WriteVolumeCache before unmount
response, err := mounter.VolumeClient.GetVolumeIDFromTargetPath(context.Background(), &volume.GetVolumeIDFromTargetPathRequest{TargetPath: target})
func (mounter *CSIProxyMounter) WriteVolumeCache(target string) {
request := &volume.GetVolumeIDFromTargetPathRequest{TargetPath: normalizeWindowsPath(target)}
response, err := mounter.VolumeClient.GetVolumeIDFromTargetPath(context.Background(), request)
if err != nil || response == nil {
klog.Warningf("GetVolumeIDFromTargetPath(%s) failed with error: %v, response: %v", target, err, response)
} else {
Expand All @@ -101,7 +137,6 @@ func (mounter *CSIProxyMounter) Unmount(target string) error {
klog.Warningf("WriteVolumeCache(%s) failed with error: %v, response: %v", response.VolumeId, err, res)
}
}
return mounter.Rmdir(target)
}

func (mounter *CSIProxyMounter) List() ([]mount.MountPoint, error) {
Expand Down

0 comments on commit 019dd49

Please sign in to comment.