Skip to content

Commit

Permalink
refactor(CSI-269): change NFS mount refcount logic and add mount retries
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeyberezansky committed Sep 26, 2024
1 parent 31f83aa commit fcc9275
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 45 deletions.
6 changes: 5 additions & 1 deletion pkg/wekafs/apiclient/nfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,11 @@ func (a *ApiClient) EnsureNfsPermissions(ctx context.Context, ip string, fsName
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx)
logger.Debug().Str("ip", ip).Str("filesystem", fsName).Str("client_group_name", clientGroupName).Msg("Ensuring NFS permissions")
clientGroupCaption := clientGroupName
if clientGroupCaption == "" {
clientGroupCaption = NfsClientGroupName
}
logger.Debug().Str("ip", ip).Str("filesystem", fsName).Str("client_group_name", clientGroupCaption).Msg("Ensuring NFS permissions")
// Ensure client group
logger.Trace().Msg("Ensuring CSI Plugin NFS Client Group")
cg, err := a.EnsureCsiPluginNfsClientGroup(ctx, clientGroupName)
Expand Down
52 changes: 30 additions & 22 deletions pkg/wekafs/nfsmount.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (m *nfsMount) isMounted() bool {
}

func (m *nfsMount) getRefcountIdx() string {
return m.getMountPoint() + "^" + m.mountOptions.AsNfs().String()
return m.getMountPoint() + "^" + m.getMountOptions().AsNfs().String()
}

func (m *nfsMount) incRef(ctx context.Context, apiClient *apiclient.ApiClient) error {
Expand Down Expand Up @@ -81,6 +81,7 @@ func (m *nfsMount) incRef(ctx context.Context, apiClient *apiclient.ApiClient) e
}
refCount++
m.mounter.mountMap[m.getRefcountIdx()] = refCount
m.mounter.mountMap[m.getRefcountIdx()] = refCount

logger.Trace().
Int("refcount", refCount).
Expand All @@ -101,21 +102,23 @@ func (m *nfsMount) decRef(ctx context.Context) error {
defer m.mounter.lock.Unlock()
refCount, ok := m.mounter.mountMap[m.getRefcountIdx()]
if !ok {
logger.Error().Int("refcount", refCount).Str("mount_options", m.getMountOptions().String()).Str("mount_point", m.getMountPoint()).Msg("During decRef refcount not found")
refCount = 0
}
if refCount <= 0 {
logger.Error().Int("refcount", refCount).Msg("During decRef negative refcount encountered")
refCount = 0 // to make sure that we don't have negative refcount later
if refCount < 0 {
logger.Error().Int("refcount", refCount).Msg("During decRef negative refcount encountered, probably due to failed unmount")
}
if refCount == 1 {
if err := m.doUnmount(ctx); err != nil {
return err
}
if refCount > 0 {
logger.Trace().Int("refcount", refCount).Strs("mount_options", m.getMountOptions().Strings()).Str("filesystem_name", m.fsName).Msg("RefCount decreased")
refCount--
m.mounter.mountMap[m.getRefcountIdx()] = refCount
}
m.mounter.mountMap[m.getRefcountIdx()] = refCount
if refCount == 0 {
delete(m.mounter.mountMap, m.getMountPoint())
if m.isMounted() {
if err := m.doUnmount(ctx); err != nil {
return err
}
}
}
return nil
}
Expand Down Expand Up @@ -169,9 +172,6 @@ func (m *nfsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient,
return errors.New("no API client for mount, cannot do NFS mount")
}

if err := os.MkdirAll(m.getMountPoint(), DefaultVolumePermissions); err != nil {
return err
}
if !m.isInDevMode() {

nodeIP, err := apiclient.GetNodeIpAddressByRouting(m.mountIpAddress)
Expand All @@ -193,23 +193,31 @@ func (m *nfsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient,
Str("mount_ip_address", m.mountIpAddress).
Msg("Performing mount")

err = m.kMounter.MountSensitive(mountTarget, m.getMountPoint(), "nfs", mountOptions.Strings(), mountOptionsSensitive)
if err != nil {
if os.IsNotExist(err) {
logger.Error().Err(err).Msg("Mount target not found")
logger.Trace().Msg("Ensuring mount point exists")
if err := os.MkdirAll(m.getMountPoint(), DefaultVolumePermissions); err != nil {
return err
}
maxRetries := 3
for i := 0; i < maxRetries; i++ {
err = m.kMounter.MountSensitive(mountTarget, m.getMountPoint(), "nfs", mountOptions.Strings(), mountOptionsSensitive)
if err == nil {
logger.Trace().Msg("Mounted successfully")
return nil
}
if os.IsNotExist(err) || strings.Contains(strings.ToLower(err.Error()), "no such file or directory") {
logger.Error().Err(err).Msg("Mount point not found")
} else if os.IsPermission(err) {
logger.Error().Err(err).Msg("Mount failed due to permissions issue")
return err
} else if strings.Contains(err.Error(), "invalid argument") {
logger.Error().Err(err).Msg("Mount failed due to invalid argument")
return err
} else {
logger.Error().Err(err).Msg("Mount failed due to unknown issue")
}
return err
logger.Warn().Int("attempt", i+1).Msg("Retrying mount")
time.Sleep(2 * time.Second) // Optional: Add a delay between retries
}
logger.Trace().Msg("Mounted successfully")
return nil
logger.Error().Err(err).Int("retry_count", maxRetries).Msg("Failed to mount after retries")
return err
} else {
fakePath := filepath.Join(m.debugPath, m.fsName)
if err := os.MkdirAll(fakePath, DefaultVolumePermissions); err != nil {
Expand Down
42 changes: 20 additions & 22 deletions pkg/wekafs/nfsmounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,20 @@ func (m *nfsMounter) unmountWithOptions(ctx context.Context, fsName string, opti
}

func (m *nfsMounter) LogActiveMounts() {
m.lock.Lock()
defer m.lock.Unlock()
if len(m.mountMap) > 0 {
count := 0
for refIndex := range m.mountMap {
if mapEntry, ok := m.mountMap[refIndex]; ok {
parts := strings.Split(refIndex, "^")
logger := log.With().Str("mount_point", parts[0]).Str("mount_options", parts[1]).Str("ref_index", refIndex).Int("refcount", mapEntry).Logger()

if mapEntry > 0 {
log.Trace().Str("mount_point", parts[0]).Str("mount_options", parts[1]).Int("refcount", mapEntry).Msg("Mount is active")
logger.Trace().Msg("Mount is active")
count++
} else {
log.Trace().Str("mount_point", parts[0]).Str("mount_options", parts[1]).Int("refcount", mapEntry).Msg("Mount is not active")
logger.Trace().Msg("Mount is not active")
}

}
Expand All @@ -136,26 +140,20 @@ func (m *nfsMounter) LogActiveMounts() {
}

func (m *nfsMounter) gcInactiveMounts() {
//if len(m.mountMap) > 0 {
// for fsName := range m.mountMap {
// for uniqueId, wekaMount := range m.mountMap[fsName] {
// if wekaMount.getRefCount() == 0 {
// if wekaMount.getLastUsed().Before(time.Now().Add(-inactiveMountGcPeriod)) {
// m.lock.Lock()
// if wekaMount.getRefCount() == 0 {
// log.Trace().Str("filesystem", fsName).Strs("mount_options", wekaMount.getMountOptions().Strings()).
// Time("last_used", wekaMount.getLastUsed()).Msg("Removing stale mount from map")
// delete(m.mountMap[fsName], uniqueId)
// }
// m.lock.Unlock()
// }
// }
// }
// if len(m.mountMap[fsName]) == 0 {
// delete(m.mountMap, fsName)
// }
// }
//}
m.lock.Lock()
defer m.lock.Unlock()
if len(m.mountMap) > 0 {
for refIndex := range m.mountMap {
if mapEntry, ok := m.mountMap[refIndex]; ok {
if mapEntry == 0 {
parts := strings.Split(refIndex, "^")
logger := log.With().Str("mount_point", parts[0]).Str("mount_options", parts[1]).Str("ref_index", refIndex).Logger()
logger.Trace().Msg("Removing inactive mount from map")
delete(m.mountMap, refIndex)
}
}
}
}
}

func (m *nfsMounter) schedulePeriodicMountGc() {
Expand Down

0 comments on commit fcc9275

Please sign in to comment.