Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remount old mount point when csi plugin unexpect exit #282

Merged
merged 10 commits into from
Apr 2, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/cephfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
nodeID = flag.String("nodeid", "", "node id")
volumeMounter = flag.String("volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')")
metadataStorage = flag.String("metadatastorage", "", "metadata persistence method [node|k8s_configmap]")
mountCacheDir = flag.String("mountcachedir", "", "mount info cache save dir")
huaizong marked this conversation as resolved.
Show resolved Hide resolved
)

func init() {
Expand All @@ -56,7 +57,7 @@ func main() {
}

driver := cephfs.NewDriver()
driver.Run(*driverName, *nodeID, *endpoint, *volumeMounter, cp)
driver.Run(*driverName, *nodeID, *endpoint, *volumeMounter, *mountCacheDir, cp)

os.Exit(0)
}
9 changes: 8 additions & 1 deletion pkg/cephfs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func NewNodeServer(d *csicommon.CSIDriver) *NodeServer {

// Run start a non-blocking grpc controller,node and identityserver for
// ceph CSI driver which can serve multiple parallel requests
func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cachePersister util.CachePersister) {
func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir string, cachePersister util.CachePersister) {
klog.Infof("Driver: %v version: %v", driverName, version)

// Configuration
Expand Down Expand Up @@ -105,6 +105,13 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter string, cacheP
klog.Fatalf("failed to write ceph configuration file: %v", err)
}

initVolumeMountCache(driverName, mountCacheDir, cachePersister)
if mountCacheDir != "" {
if err := remountCachedVolumes(); err != nil {
klog.Warningf("failed to remount cached volumes: %v", err)
//ignore remount fail
}
}
// Initialize default library driver

fs.cd = csicommon.NewCSIDriver(driverName, version, nodeID)
Expand Down
316 changes: 316 additions & 0 deletions pkg/cephfs/mountcache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,316 @@
package cephfs

import (
"encoding/base64"
"os"
"sync"
"syscall"
"time"

"github.com/ceph/ceph-csi/pkg/util"
"github.com/pkg/errors"
"k8s.io/klog"
)

type volumeMountCacheEntry struct {
DriverName string `json:"driverName"`
DriverVersion string `json:"driverVersion"`

VolumeID string `json:"volumeID"`
Secrets map[string]string `json:"secrets"`
StagingPath string `json:"stagingPath"`
TargetPaths map[string]bool `json:"targetPaths"`
CreateTime time.Time `json:"createTime"`
}

type volumeMountCacheMap struct {
driverName string
huaizong marked this conversation as resolved.
Show resolved Hide resolved
volumes map[string]volumeMountCacheEntry
nodeCacheStore util.NodeCache
metadataStore util.CachePersister
}

var (
volumeMountCachePrefix = "cephfs-mount-cache-"
volumeMountCache volumeMountCacheMap
volumeMountCacheMtx sync.Mutex
)

func initVolumeMountCache(driverName string, mountCacheDir string, cachePersister util.CachePersister) {
volumeMountCache.volumes = make(map[string]volumeMountCacheEntry)

volumeMountCache.driverName = driverName
volumeMountCache.metadataStore = cachePersister
volumeMountCache.nodeCacheStore.BasePath = mountCacheDir
volumeMountCache.nodeCacheStore.CacheDir = ""
klog.Infof("mount-cache: name: %s, version: %s, mountCacheDir: %s", driverName, version, mountCacheDir)
}

func remountCachedVolumes() error {
if err := os.MkdirAll(volumeMountCache.nodeCacheStore.BasePath, 0755); err != nil {
klog.Errorf("mount-cache: failed to create %s: %v", volumeMountCache.nodeCacheStore.BasePath, err)
return err
}
var remountFailCount, remountSuccCount int64
me := &volumeMountCacheEntry{}
ce := &controllerCacheEntry{}
err := volumeMountCache.nodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error {
volID := me.VolumeID
if err := volumeMountCache.metadataStore.Get(volID, ce); err != nil {
if err, ok := err.(*util.CacheEntryNotFound); ok {
klog.Infof("mount-cache: metadata for volume %s not found, assuming the volume to be already deleted (%v)", volID, err)
if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
klog.Infof("mount-cache: metadata nofound, delete volume cache entry for volume %s", volID)
}
}
} else {
if err := mountOneCacheEntry(ce, me); err == nil {
remountSuccCount++
volumeMountCache.volumes[me.VolumeID] = *me
klog.Infof("mount-cache: remount volume %s succ", volID)
huaizong marked this conversation as resolved.
Show resolved Hide resolved
} else {
remountFailCount++
klog.Infof("mount-cache: remount volume cache %s fail", volID)
huaizong marked this conversation as resolved.
Show resolved Hide resolved
}
}
return nil
})
if err != nil {
klog.Infof("mount-cache: metastore list cache fail %v", err)
return err
}
if remountFailCount > 0 {
klog.Infof("mount-cache: succ remount %d volumes, fail remount %d volumes", remountSuccCount, remountFailCount)
} else {
klog.Infof("mount-cache: volume cache num %d, all succ remount", remountSuccCount)
huaizong marked this conversation as resolved.
Show resolved Hide resolved
huaizong marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountCacheEntry) error {
volumeMountCacheMtx.Lock()
defer volumeMountCacheMtx.Unlock()

var (
err error
cr *credentials
)
volID := ce.VolumeID
volOptions := ce.VolOptions

if volOptions.ProvisionVolume {
volOptions.RootPath = getVolumeRootPathCeph(volID)
cr, err = getAdminCredentials(decodeCredentials(me.Secrets))
if err != nil {
return err
}
var entity *cephEntity
entity, err = getCephUser(&volOptions, cr, volID)
if err != nil {
return err
}
cr = entity.toCredentials()
} else {
cr, err = getUserCredentials(decodeCredentials(me.Secrets))
if err != nil {
return err
}
}

err = cleanupMountPoint(me.StagingPath)
if err != nil {
klog.Infof("mount-cache: failed to cleanup volume mount point %s, remove it: %s %v", volID, me.StagingPath, err)
return err
}

isMnt, err := isMountPoint(me.StagingPath)
if err != nil {
isMnt = false
klog.Infof("mount-cache: failed to check volume mounted %s: %s %v", volID, me.StagingPath, err)
}

if !isMnt {
m, err := newMounter(&volOptions)
if err != nil {
klog.Errorf("mount-cache: failed to create mounter for volume %s: %v", volID, err)
return err
}
if err := m.mount(me.StagingPath, cr, &volOptions); err != nil {
klog.Errorf("mount-cache: failed to mount volume %s: %v", volID, err)
return err
}
}
for targetPath, readOnly := range me.TargetPaths {
if err := cleanupMountPoint(targetPath); err == nil {
if err := bindMount(me.StagingPath, targetPath, readOnly); err != nil {
klog.Errorf("mount-cache: failed to bind-mount volume %s: %s %s %v %v",
volID, me.StagingPath, targetPath, readOnly, err)
} else {
klog.Infof("mount-cache: succ bind-mount volume %s: %s %s %v",
huaizong marked this conversation as resolved.
Show resolved Hide resolved
volID, me.StagingPath, targetPath, readOnly)
}
}
}
return nil
}

func cleanupMountPoint(mountPoint string) error {
if _, err := os.Stat(mountPoint); err != nil {
if isCorruptedMnt(err) {
klog.Infof("mount-cache: corrupted mount point %s, need unmount", mountPoint)
err := execCommandErr("umount", mountPoint)
if err != nil {
klog.Infof("mount-cache: unmount %s fail %v", mountPoint, err)
//ignore error return err
}
}
}
if _, err := os.Stat(mountPoint); err != nil {
klog.Errorf("mount-cache: mount point %s stat fail %v", mountPoint, err)
return err
}
return nil
}

func isCorruptedMnt(err error) bool {
if err == nil {
huaizong marked this conversation as resolved.
Show resolved Hide resolved
return false
}
var underlyingError error
switch pe := err.(type) {
case nil:
huaizong marked this conversation as resolved.
Show resolved Hide resolved
return false
case *os.PathError:
underlyingError = pe.Err
case *os.LinkError:
underlyingError = pe.Err
case *os.SyscallError:
huaizong marked this conversation as resolved.
Show resolved Hide resolved
underlyingError = pe.Err
}

return underlyingError == syscall.ENOTCONN || underlyingError == syscall.ESTALE || underlyingError == syscall.EIO || underlyingError == syscall.EACCES
huaizong marked this conversation as resolved.
Show resolved Hide resolved
}

func genVolumeMountCacheFileName(volID string) string {
cachePath := volumeMountCachePrefix + volID
return cachePath
}
func (mc *volumeMountCacheMap) isEnable() bool {
//if mount cache dir unset, disable state
return mc.nodeCacheStore.BasePath != ""
}

func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath string, secrets map[string]string) error {
if !mc.isEnable() {
return nil
}
volumeMountCacheMtx.Lock()
defer volumeMountCacheMtx.Unlock()

lastTargetPaths := make(map[string]bool)
me, ok := volumeMountCache.volumes[volID]
if ok {
if me.StagingPath == stagingTargetPath {
klog.Warningf("mount-cache: node unexpected restage volume for volume %s", volID)
return nil
}
lastTargetPaths = me.TargetPaths
klog.Warningf("mount-cache: node stage volume ignore last cache entry for volume %s", volID)
}

me = volumeMountCacheEntry{DriverName: mc.driverName, DriverVersion: version}

me.VolumeID = volID
me.Secrets = encodeCredentials(secrets)
me.StagingPath = stagingTargetPath
me.TargetPaths = lastTargetPaths

me.CreateTime = time.Now()
volumeMountCache.volumes[volID] = me
if err := mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil {
return err
}
return nil
huaizong marked this conversation as resolved.
Show resolved Hide resolved
}

func (mc *volumeMountCacheMap) nodeUnStageVolume(volID string) error {
if !mc.isEnable() {
return nil
}
volumeMountCacheMtx.Lock()
defer volumeMountCacheMtx.Unlock()
delete(volumeMountCache.volumes, volID)
if err := mc.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err != nil {
huaizong marked this conversation as resolved.
Show resolved Hide resolved
return err
}
return nil
}

func (mc *volumeMountCacheMap) nodePublishVolume(volID string, targetPath string, readOnly bool) error {
if !mc.isEnable() {
return nil
}
volumeMountCacheMtx.Lock()
defer volumeMountCacheMtx.Unlock()

_, ok := volumeMountCache.volumes[volID]
if !ok {
return errors.New("mount-cache: node publish volume failed to find cache entry for volume")
}
volumeMountCache.volumes[volID].TargetPaths[targetPath] = readOnly
return mc.updateNodeCache(volID)
}

func (mc *volumeMountCacheMap) nodeUnPublishVolume(volID string, targetPath string) error {
if !mc.isEnable() {
return nil
}
volumeMountCacheMtx.Lock()
defer volumeMountCacheMtx.Unlock()

_, ok := volumeMountCache.volumes[volID]
if !ok {
return errors.New("mount-cache: node unpublish volume failed to find cache entry for volume")
}
delete(volumeMountCache.volumes[volID].TargetPaths, targetPath)
return mc.updateNodeCache(volID)
}

func (mc *volumeMountCacheMap) updateNodeCache(volID string) error {
me := volumeMountCache.volumes[volID]
if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
klog.Infof("mount-cache: metadata nofound, delete mount cache failed for volume %s", volID)
huaizong marked this conversation as resolved.
Show resolved Hide resolved
}
if err := mc.nodeCacheStore.Create(genVolumeMountCacheFileName(volID), me); err != nil {
return err
}
return nil
}

func encodeCredentials(input map[string]string) (output map[string]string) {
output = make(map[string]string)
for key, value := range input {
nKey := base64.StdEncoding.EncodeToString([]byte(key))
nValue := base64.StdEncoding.EncodeToString([]byte(value))
output[nKey] = nValue
}
return output
}

func decodeCredentials(input map[string]string) (output map[string]string) {
output = make(map[string]string)
for key, value := range input {
nKey, err := base64.StdEncoding.DecodeString(key)
if err != nil {
klog.Errorf("mount-cache: decode secret fail")
continue
}
nValue, err := base64.StdEncoding.DecodeString(value)
if err != nil {
klog.Errorf("mount-cache: decode secret fail")
continue
}
output[string(nKey)] = string(nValue)
}
return output
}
38 changes: 38 additions & 0 deletions pkg/cephfs/mountcache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package cephfs

import (
"testing"
)

func init() {
}

func TestMountOneCacheEntry(t *testing.T) {
}

func TestRemountHisMountedPath(t *testing.T) {
}

func TestNodeStageVolume(t *testing.T) {
}

func TestNodeUnStageVolume(t *testing.T) {
}

func TestNodePublishVolume(t *testing.T) {
}

func TestNodeUnpublishVolume(t *testing.T) {
}

func TestEncodeDecodeCredentials(t *testing.T) {
secrets := make(map[string]string)
secrets["user_1"] = "value_1"
enSecrets := encodeCredentials(secrets)
deSecrets := decodeCredentials(enSecrets)
for key, value := range secrets {
if deSecrets[key] != value {
t.Errorf("key %s value %s not equal %s after encode decode", key, value, deSecrets[key])
huaizong marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Loading