Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(CSI-269): nfsmount mountPoint may be incorrect in certain cases #345

Merged
merged 3 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/wekafs/apiclient/nfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,11 @@ func (a *ApiClient) EnsureNfsPermissions(ctx context.Context, ip string, fsName
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx)
logger.Debug().Str("ip", ip).Str("filesystem", fsName).Str("client_group_name", clientGroupName).Msg("Ensuring NFS permissions")
clientGroupCaption := clientGroupName
if clientGroupCaption == "" {
clientGroupCaption = NfsClientGroupName
}
logger.Debug().Str("ip", ip).Str("filesystem", fsName).Str("client_group_name", clientGroupCaption).Msg("Ensuring NFS permissions")
// Ensure client group
logger.Trace().Msg("Ensuring CSI Plugin NFS Client Group")
cg, err := a.EnsureCsiPluginNfsClientGroup(ctx, clientGroupName)
Expand Down
2 changes: 1 addition & 1 deletion pkg/wekafs/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi

ok, err := volume.Exists(ctx)
if err != nil {
return ExpandVolumeError(ctx, codes.Internal, err.Error())
return ExpandVolumeError(ctx, codes.NotFound, err.Error())
}
if !ok {
return ExpandVolumeError(ctx, codes.Internal, "Volume does not exist")
Expand Down
1 change: 0 additions & 1 deletion pkg/wekafs/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,4 @@ type AnyMount interface {
getMountPoint() string
getMountOptions() MountOptions
getLastUsed() time.Time
locateMountIP() error // used only for NFS
}
93 changes: 55 additions & 38 deletions pkg/wekafs/nfsmount.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ func (m *nfsMount) isInDevMode() bool {
}

func (m *nfsMount) isMounted() bool {
return PathExists(m.getMountPoint()) && PathIsWekaMount(context.Background(), m.mountPoint)
return PathExists(m.getMountPoint()) && PathIsWekaMount(context.Background(), m.getMountPoint())
}

// getRefcountIdx builds the key under which this mount is tracked in the
// mounter's refcount map: the mount point and the NFS-rendered mount options,
// joined by a "^" separator.
func (m *nfsMount) getRefcountIdx() string {
	mountPoint := m.getMountPoint()
	nfsOptions := m.getMountOptions().AsNfs().String()
	return mountPoint + "^" + nfsOptions
}

func (m *nfsMount) incRef(ctx context.Context, apiClient *apiclient.ApiClient) error {
Expand All @@ -57,27 +61,34 @@ func (m *nfsMount) incRef(ctx context.Context, apiClient *apiclient.ApiClient) e
logger.Error().Msg("Mounter is nil")
return errors.New("mounter is nil")
}

m.mounter.lock.Lock()
defer m.mounter.lock.Unlock()
refCount, ok := m.mounter.mountMap[m.getMountPoint()]
refCount, ok := m.mounter.mountMap[m.getRefcountIdx()]
if !ok {
refCount = 0
}
if refCount == 0 {
if err := m.doMount(ctx, apiClient, m.getMountOptions()); err != nil {
return err
}
} else if !m.isMounted() {
}
if refCount > 0 && !m.isMounted() {
logger.Warn().Str("mount_point", m.getMountPoint()).Int("refcount", refCount).Msg("Mount not exists although should!")
if err := m.doMount(ctx, apiClient, m.getMountOptions()); err != nil {
return err
}

}
refCount++
m.mounter.mountMap[m.getMountPoint()] = refCount

logger.Trace().Int("refcount", refCount).Strs("mount_options", m.getMountOptions().Strings()).Str("filesystem_name", m.fsName).Msg("RefCount increased")
m.mounter.mountMap[m.getRefcountIdx()] = refCount
m.mounter.mountMap[m.getRefcountIdx()] = refCount

logger.Trace().
Int("refcount", refCount).
Strs("mount_options", m.getMountOptions().Strings()).
Str("filesystem_name", m.fsName).
Str("mount_point", m.getMountPoint()).
Msg("RefCount increased")
return nil
}

Expand All @@ -89,26 +100,25 @@ func (m *nfsMount) decRef(ctx context.Context) error {
}
m.mounter.lock.Lock()
defer m.mounter.lock.Unlock()
refCount, ok := m.mounter.mountMap[m.getMountPoint()]
defer func() {
if refCount == 0 {
delete(m.mounter.mountMap, m.getMountPoint())
} else {
m.mounter.mountMap[m.getMountPoint()] = refCount
}
}()
refCount, ok := m.mounter.mountMap[m.getRefcountIdx()]
if !ok {
logger.Error().Int("refcount", refCount).Str("mount_options", m.getMountOptions().String()).Str("mount_point", m.getMountPoint()).Msg("During decRef refcount not found")
refCount = 0
}
if refCount < 0 {
logger.Error().Int("refcount", refCount).Msg("During decRef negative refcount encountered")
refCount = 0 // to make sure that we don't have negative refcount later
logger.Error().Int("refcount", refCount).Msg("During decRef negative refcount encountered, probably due to failed unmount")
}
if refCount == 1 {
if err := m.doUnmount(ctx); err != nil {
return err
}
if refCount > 0 {
logger.Trace().Int("refcount", refCount).Strs("mount_options", m.getMountOptions().Strings()).Str("filesystem_name", m.fsName).Msg("RefCount decreased")
refCount--
m.mounter.mountMap[m.getRefcountIdx()] = refCount
}
if refCount == 0 {
if m.isMounted() {
if err := m.doUnmount(ctx); err != nil {
return err
}
}
}
return nil
}
Expand All @@ -132,6 +142,12 @@ func (m *nfsMount) doUnmount(ctx context.Context) error {
logger.Error().Err(err).Msg("Failed to unmount")
} else {
logger.Trace().Msg("Unmounted successfully")
if err := os.Remove(m.getMountPoint()); err != nil {
logger.Error().Err(err).Msg("Failed to remove mount point")
return err
} else {
logger.Trace().Msg("Removed mount point successfully")
}
}
return err
}
Expand All @@ -156,14 +172,6 @@ func (m *nfsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient,
return errors.New("no API client for mount, cannot do NFS mount")
}

if err := m.ensureMountIpAddress(ctx, apiClient); err != nil {
logger.Error().Err(err).Msg("Failed to get mount IP address")
return err
}

if err := os.MkdirAll(m.getMountPoint(), DefaultVolumePermissions); err != nil {
return err
}
if !m.isInDevMode() {

nodeIP, err := apiclient.GetNodeIpAddressByRouting(m.mountIpAddress)
Expand All @@ -181,26 +189,35 @@ func (m *nfsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient,
logger.Trace().
Strs("mount_options", m.getMountOptions().Strings()).
Str("mount_target", mountTarget).
Str("mount_point", m.getMountPoint()).
Str("mount_ip_address", m.mountIpAddress).
Msg("Performing mount")

err = m.kMounter.MountSensitive(mountTarget, m.getMountPoint(), "nfs", mountOptions.Strings(), mountOptionsSensitive)
if err != nil {
if os.IsNotExist(err) {
logger.Error().Err(err).Msg("Mount target not found")
logger.Trace().Msg("Ensuring mount point exists")
if err := os.MkdirAll(m.getMountPoint(), DefaultVolumePermissions); err != nil {
return err
}
maxRetries := 3
for i := 0; i < maxRetries; i++ {
err = m.kMounter.MountSensitive(mountTarget, m.getMountPoint(), "nfs", mountOptions.Strings(), mountOptionsSensitive)
if err == nil {
logger.Trace().Msg("Mounted successfully")
return nil
}
if os.IsNotExist(err) || strings.Contains(strings.ToLower(err.Error()), "no such file or directory") {
logger.Error().Err(err).Msg("Mount point not found")
} else if os.IsPermission(err) {
logger.Error().Err(err).Msg("Mount failed due to permissions issue")
return err
} else if strings.Contains(err.Error(), "invalid argument") {
logger.Error().Err(err).Msg("Mount failed due to invalid argument")
return err
} else {
logger.Error().Err(err).Msg("Mount failed due to unknown issue")
}
return err
logger.Warn().Int("attempt", i+1).Msg("Retrying mount")
time.Sleep(2 * time.Second) // Optional: Add a delay between retries
}
logger.Trace().Msg("Mounted successfully")
return nil
logger.Error().Err(err).Int("retry_count", maxRetries).Msg("Failed to mount after retries")
return err
} else {
fakePath := filepath.Join(m.debugPath, m.fsName)
if err := os.MkdirAll(fakePath, DefaultVolumePermissions); err != nil {
Expand Down
80 changes: 42 additions & 38 deletions pkg/wekafs/nfsmounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/rs/zerolog/log"
"github.com/wekafs/csi-wekafs/pkg/wekafs/apiclient"
"k8s.io/mount-utils"
"strings"
"sync"
"time"
)
Expand Down Expand Up @@ -75,7 +76,12 @@ func (m *nfsMounter) mountWithOptions(ctx context.Context, fsName string, mountO
mountOptions.setSelinux(m.getSelinuxStatus(ctx), MountProtocolNfs)
mountOptions = mountOptions.AsNfs()
mountOptions.Merge(mountOptions, m.exclusiveMountOptions)
mountObj := m.NewMount(fsName, mountOptions)
mountObj := m.NewMount(fsName, mountOptions).(*nfsMount)

if err := mountObj.ensureMountIpAddress(ctx, apiClient); err != nil {
return "", err, func() {}
}

mountErr := mountObj.incRef(ctx, apiClient)

if mountErr != nil {
Expand All @@ -98,7 +104,7 @@ func (m *nfsMounter) unmountWithOptions(ctx context.Context, fsName string, opti
options.setSelinux(m.getSelinuxStatus(ctx), MountProtocolNfs)
options = options.AsNfs()
options.Merge(options, m.exclusiveMountOptions)
mnt := m.NewMount(fsName, options)
mnt := m.NewMount(fsName, options).(*nfsMount)
// since we are not aware of the IP address of the mount, we need to find the mount point by listing the mounts
err := mnt.locateMountIP()
if err != nil {
Expand All @@ -111,45 +117,43 @@ func (m *nfsMounter) unmountWithOptions(ctx context.Context, fsName string, opti
}

// LogActiveMounts writes one trace record per entry of the refcount map and a
// debug summary with the total number of entries and how many of them have a
// positive refcount. Nothing is logged when the map is empty.
//
// The map is read under m.lock for the whole duration of the call.
func (m *nfsMounter) LogActiveMounts() {
	m.lock.Lock()
	defer m.lock.Unlock()
	if len(m.mountMap) == 0 {
		return
	}
	active := 0
	for refIndex, refCount := range m.mountMap {
		// Keys have the form "<mount point>^<mount options>". Split on the first
		// separator only, so that options are not truncated if they contain "^",
		// and never index past the end of the slice on a malformed key.
		mountPoint, mountOptions := refIndex, ""
		if parts := strings.SplitN(refIndex, "^", 2); len(parts) == 2 {
			mountPoint, mountOptions = parts[0], parts[1]
		}
		logger := log.With().
			Str("mount_point", mountPoint).
			Str("mount_options", mountOptions).
			Str("ref_index", refIndex).
			Int("refcount", refCount).
			Logger()

		if refCount > 0 {
			logger.Trace().Msg("Mount is active")
			active++
		} else {
			logger.Trace().Msg("Mount is not active")
		}
	}
	log.Debug().Int("total", len(m.mountMap)).Int("active", active).Msg("Periodic checkup on mount map")
}

// gcInactiveMounts removes every entry whose refcount is exactly zero from the
// refcount map, logging a trace record for each removal.
//
// The whole sweep runs under m.lock. Deleting the current key while ranging
// over a map is permitted by the Go specification.
func (m *nfsMounter) gcInactiveMounts() {
	m.lock.Lock()
	defer m.lock.Unlock()
	for refIndex, refCount := range m.mountMap {
		if refCount != 0 {
			continue
		}
		// Keys have the form "<mount point>^<mount options>". Split on the first
		// separator only and tolerate a key without one, so that a malformed
		// entry cannot cause an index-out-of-range panic while the lock is held.
		mountPoint, mountOptions := refIndex, ""
		if parts := strings.SplitN(refIndex, "^", 2); len(parts) == 2 {
			mountPoint, mountOptions = parts[0], parts[1]
		}
		logger := log.With().
			Str("mount_point", mountPoint).
			Str("mount_options", mountOptions).
			Str("ref_index", refIndex).
			Logger()
		logger.Trace().Msg("Removing inactive mount from map")
		delete(m.mountMap, refIndex)
	}
}

func (m *nfsMounter) schedulePeriodicMountGc() {
Expand Down
10 changes: 6 additions & 4 deletions pkg/wekafs/wekafsmount.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ func (m *wekafsMount) doUnmount(ctx context.Context) error {
logger.Error().Err(err).Msg("Failed to unmount")
} else {
logger.Trace().Msg("Unmounted successfully")
if err := os.Remove(m.getMountPoint()); err != nil {
logger.Error().Err(err).Msg("Failed to remove mount point")
return err
} else {
logger.Trace().Msg("Removed mount point successfully")
}
}
return err
}
Expand Down Expand Up @@ -180,7 +186,3 @@ func (m *wekafsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClien
return m.kMounter.Mount(fakePath, m.getMountPoint(), "", []string{"bind"})
}
}

// locateMountIP is a no-op for wekafs mounts and always returns nil; the
// interface declaration notes that this method is used only for NFS.
func (m *wekafsMount) locateMountIP() error {
	return nil
}