diff --git a/pkg/wekafs/apiclient/nfs.go b/pkg/wekafs/apiclient/nfs.go index 838955a9f..a0fb1b7ba 100644 --- a/pkg/wekafs/apiclient/nfs.go +++ b/pkg/wekafs/apiclient/nfs.go @@ -800,7 +800,11 @@ func (a *ApiClient) EnsureNfsPermissions(ctx context.Context, ip string, fsName defer span.End() ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) logger := log.Ctx(ctx) - logger.Debug().Str("ip", ip).Str("filesystem", fsName).Str("client_group_name", clientGroupName).Msg("Ensuring NFS permissions") + clientGroupCaption := clientGroupName + if clientGroupCaption == "" { + clientGroupCaption = NfsClientGroupName + } + logger.Debug().Str("ip", ip).Str("filesystem", fsName).Str("client_group_name", clientGroupCaption).Msg("Ensuring NFS permissions") // Ensure client group logger.Trace().Msg("Ensuring CSI Plugin NFS Client Group") cg, err := a.EnsureCsiPluginNfsClientGroup(ctx, clientGroupName) diff --git a/pkg/wekafs/controllerserver.go b/pkg/wekafs/controllerserver.go index dd14a9094..1bd2c34ef 100644 --- a/pkg/wekafs/controllerserver.go +++ b/pkg/wekafs/controllerserver.go @@ -459,7 +459,7 @@ func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi ok, err := volume.Exists(ctx) if err != nil { - return ExpandVolumeError(ctx, codes.Internal, err.Error()) + return ExpandVolumeError(ctx, codes.NotFound, err.Error()) } if !ok { return ExpandVolumeError(ctx, codes.Internal, "Volume does not exist") diff --git a/pkg/wekafs/interfaces.go b/pkg/wekafs/interfaces.go index a3b80b3dc..d2c6a06d5 100644 --- a/pkg/wekafs/interfaces.go +++ b/pkg/wekafs/interfaces.go @@ -49,5 +49,4 @@ type AnyMount interface { getMountPoint() string getMountOptions() MountOptions getLastUsed() time.Time - locateMountIP() error // used only for NFS } diff --git a/pkg/wekafs/nfsmount.go b/pkg/wekafs/nfsmount.go index b2fb8cff8..8894d14b8 100644 --- a/pkg/wekafs/nfsmount.go +++ b/pkg/wekafs/nfsmount.go @@ -48,7 +48,11 @@ func (m *nfsMount) isInDevMode() bool { } func (m *nfsMount) isMounted() bool { - return PathExists(m.getMountPoint()) && PathIsWekaMount(context.Background(), m.mountPoint) + return PathExists(m.getMountPoint()) && PathIsWekaMount(context.Background(), m.getMountPoint()) +} + +func (m *nfsMount) getRefcountIdx() string { + return m.getMountPoint() + "^" + m.getMountOptions().AsNfs().String() } func (m *nfsMount) incRef(ctx context.Context, apiClient *apiclient.ApiClient) error { @@ -57,9 +61,10 @@ func (m *nfsMount) incRef(ctx context.Context, apiClient *apiclient.ApiClient) e logger.Error().Msg("Mounter is nil") return errors.New("mounter is nil") } + m.mounter.lock.Lock() defer m.mounter.lock.Unlock() - refCount, ok := m.mounter.mountMap[m.getMountPoint()] + refCount, ok := m.mounter.mountMap[m.getRefcountIdx()] if !ok { refCount = 0 } @@ -67,17 +72,23 @@ func (m *nfsMount) incRef(ctx context.Context, apiClient *apiclient.ApiClient) e if err := m.doMount(ctx, apiClient, m.getMountOptions()); err != nil { return err } - } else if !m.isMounted() { + } + if refCount > 0 && !m.isMounted() { logger.Warn().Str("mount_point", m.getMountPoint()).Int("refcount", refCount).Msg("Mount not exists although should!") if err := m.doMount(ctx, apiClient, m.getMountOptions()); err != nil { return err } - } refCount++ - m.mounter.mountMap[m.getMountPoint()] = refCount - - logger.Trace().Int("refcount", refCount).Strs("mount_options", m.getMountOptions().Strings()).Str("filesystem_name", m.fsName).Msg("RefCount increased") + m.mounter.mountMap[m.getRefcountIdx()] = refCount + m.mounter.mountMap[m.getRefcountIdx()] = refCount + + logger.Trace(). + Int("refcount", refCount). + Strs("mount_options", m.getMountOptions().Strings()). + Str("filesystem_name", m.fsName). + Str("mount_point", m.getMountPoint()). + Msg("RefCount increased") return nil } @@ -89,26 +100,25 @@ func (m *nfsMount) decRef(ctx context.Context) error { } m.mounter.lock.Lock() defer m.mounter.lock.Unlock() - refCount, ok := m.mounter.mountMap[m.getMountPoint()] - defer func() { - if refCount == 0 { - delete(m.mounter.mountMap, m.getMountPoint()) - } else { - m.mounter.mountMap[m.getMountPoint()] = refCount - } - }() + refCount, ok := m.mounter.mountMap[m.getRefcountIdx()] if !ok { + logger.Error().Int("refcount", refCount).Str("mount_options", m.getMountOptions().String()).Str("mount_point", m.getMountPoint()).Msg("During decRef refcount not found") refCount = 0 } if refCount < 0 { - logger.Error().Int("refcount", refCount).Msg("During decRef negative refcount encountered") - refCount = 0 // to make sure that we don't have negative refcount later + logger.Error().Int("refcount", refCount).Msg("During decRef negative refcount encountered, probably due to failed unmount") } - if refCount == 1 { - if err := m.doUnmount(ctx); err != nil { - return err - } + if refCount > 0 { + logger.Trace().Int("refcount", refCount).Strs("mount_options", m.getMountOptions().Strings()).Str("filesystem_name", m.fsName).Msg("RefCount decreased") refCount-- + m.mounter.mountMap[m.getRefcountIdx()] = refCount + } + if refCount == 0 { + if m.isMounted() { + if err := m.doUnmount(ctx); err != nil { + return err + } + } } return nil } @@ -132,6 +142,12 @@ func (m *nfsMount) doUnmount(ctx context.Context) error { logger.Error().Err(err).Msg("Failed to unmount") } else { logger.Trace().Msg("Unmounted successfully") + if err := os.Remove(m.getMountPoint()); err != nil { + logger.Error().Err(err).Msg("Failed to remove mount point") + return err + } else { + logger.Trace().Msg("Removed mount point successfully") + } } return err } @@ -156,14 +172,6 @@ func (m *nfsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient, return errors.New("no API client for mount, cannot do NFS mount") } - if err := m.ensureMountIpAddress(ctx, apiClient); err != nil { - logger.Error().Err(err).Msg("Failed to get mount IP address") - return err - } - - if err := os.MkdirAll(m.getMountPoint(), DefaultVolumePermissions); err != nil { - return err - } if !m.isInDevMode() { nodeIP, err := apiclient.GetNodeIpAddressByRouting(m.mountIpAddress) @@ -181,26 +189,35 @@ func (m *nfsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient, logger.Trace(). Strs("mount_options", m.getMountOptions().Strings()). Str("mount_target", mountTarget). + Str("mount_point", m.getMountPoint()). Str("mount_ip_address", m.mountIpAddress). Msg("Performing mount") - err = m.kMounter.MountSensitive(mountTarget, m.getMountPoint(), "nfs", mountOptions.Strings(), mountOptionsSensitive) - if err != nil { - if os.IsNotExist(err) { - logger.Error().Err(err).Msg("Mount target not found") + logger.Trace().Msg("Ensuring mount point exists") + if err := os.MkdirAll(m.getMountPoint(), DefaultVolumePermissions); err != nil { + return err + } + maxRetries := 3 + for i := 0; i < maxRetries; i++ { + err = m.kMounter.MountSensitive(mountTarget, m.getMountPoint(), "nfs", mountOptions.Strings(), mountOptionsSensitive) + if err == nil { + logger.Trace().Msg("Mounted successfully") + return nil + } + if os.IsNotExist(err) || strings.Contains(strings.ToLower(err.Error()), "no such file or directory") { + logger.Error().Err(err).Msg("Mount point not found") } else if os.IsPermission(err) { logger.Error().Err(err).Msg("Mount failed due to permissions issue") - return err } else if strings.Contains(err.Error(), "invalid argument") { logger.Error().Err(err).Msg("Mount failed due to invalid argument") - return err } else { logger.Error().Err(err).Msg("Mount failed due to unknown issue") } - return err + logger.Warn().Int("attempt", i+1).Msg("Retrying mount") + time.Sleep(2 * time.Second) // Optional: Add a delay between retries } - logger.Trace().Msg("Mounted successfully") - return nil + logger.Error().Err(err).Int("retry_count", maxRetries).Msg("Failed to mount after retries") + return err } else { fakePath := filepath.Join(m.debugPath, m.fsName) if err := os.MkdirAll(fakePath, DefaultVolumePermissions); err != nil { diff --git a/pkg/wekafs/nfsmounter.go b/pkg/wekafs/nfsmounter.go index 2aae28e85..ffec42cd2 100644 --- a/pkg/wekafs/nfsmounter.go +++ b/pkg/wekafs/nfsmounter.go @@ -6,6 +6,7 @@ import ( "github.com/rs/zerolog/log" "github.com/wekafs/csi-wekafs/pkg/wekafs/apiclient" "k8s.io/mount-utils" + "strings" "sync" "time" ) @@ -75,7 +76,12 @@ func (m *nfsMounter) mountWithOptions(ctx context.Context, fsName string, mountO mountOptions.setSelinux(m.getSelinuxStatus(ctx), MountProtocolNfs) mountOptions = mountOptions.AsNfs() mountOptions.Merge(mountOptions, m.exclusiveMountOptions) - mountObj := m.NewMount(fsName, mountOptions) + mountObj := m.NewMount(fsName, mountOptions).(*nfsMount) + + if err := mountObj.ensureMountIpAddress(ctx, apiClient); err != nil { + return "", err, func() {} + } + mountErr := mountObj.incRef(ctx, apiClient) if mountErr != nil { @@ -98,7 +104,7 @@ func (m *nfsMounter) unmountWithOptions(ctx context.Context, fsName string, opti options.setSelinux(m.getSelinuxStatus(ctx), MountProtocolNfs) options = options.AsNfs() options.Merge(options, m.exclusiveMountOptions) - mnt := m.NewMount(fsName, options) + mnt := m.NewMount(fsName, options).(*nfsMount) // since we are not aware of the IP address of the mount, we need to find the mount point by listing the mounts err := mnt.locateMountIP() if err != nil { @@ -111,45 +117,43 @@ func (m *nfsMounter) unmountWithOptions(ctx context.Context, fsName string, opti } func (m *nfsMounter) LogActiveMounts() { - //if len(m.mountMap) > 0 { - // count := 0 - // for fsName := range m.mountMap { - // for mnt := range m.mountMap[fsName] { - // mapEntry := m.mountMap[fsName][mnt] - // if mapEntry.getRefCount() > 0 { - // log.Trace().Str("filesystem", fsName).Int("refcount", mapEntry.getRefCount()).Strs("mount_options", mapEntry.getMountOptions().Strings()).Msg("Mount is active") - // count++ - // } else { - // log.Trace().Str("filesystem", fsName).Int("refcount", mapEntry.getRefCount()).Strs("mount_options", mapEntry.getMountOptions().Strings()).Msg("Mount is not active") - // } - // - // } - // } - // log.Debug().Int("total", len(m.mountMap)).Int("active", count).Msg("Periodic checkup on mount map") - //} + m.lock.Lock() + defer m.lock.Unlock() + if len(m.mountMap) > 0 { + count := 0 + for refIndex := range m.mountMap { + if mapEntry, ok := m.mountMap[refIndex]; ok { + parts := strings.Split(refIndex, "^") + logger := log.With().Str("mount_point", parts[0]).Str("mount_options", parts[1]).Str("ref_index", refIndex).Int("refcount", mapEntry).Logger() + + if mapEntry > 0 { + logger.Trace().Msg("Mount is active") + count++ + } else { + logger.Trace().Msg("Mount is not active") + } + + } + } + log.Debug().Int("total", len(m.mountMap)).Int("active", count).Msg("Periodic checkup on mount map") + } } func (m *nfsMounter) gcInactiveMounts() { - //if len(m.mountMap) > 0 { - // for fsName := range m.mountMap { - // for uniqueId, wekaMount := range m.mountMap[fsName] { - // if wekaMount.getRefCount() == 0 { - // if wekaMount.getLastUsed().Before(time.Now().Add(-inactiveMountGcPeriod)) { - // m.lock.Lock() - // if wekaMount.getRefCount() == 0 { - // log.Trace().Str("filesystem", fsName).Strs("mount_options", wekaMount.getMountOptions().Strings()). - // Time("last_used", wekaMount.getLastUsed()).Msg("Removing stale mount from map") - // delete(m.mountMap[fsName], uniqueId) - // } - // m.lock.Unlock() - // } - // } - // } - // if len(m.mountMap[fsName]) == 0 { - // delete(m.mountMap, fsName) - // } - // } - //} + m.lock.Lock() + defer m.lock.Unlock() + if len(m.mountMap) > 0 { + for refIndex := range m.mountMap { + if mapEntry, ok := m.mountMap[refIndex]; ok { + if mapEntry == 0 { + parts := strings.Split(refIndex, "^") + logger := log.With().Str("mount_point", parts[0]).Str("mount_options", parts[1]).Str("ref_index", refIndex).Logger() + logger.Trace().Msg("Removing inactive mount from map") + delete(m.mountMap, refIndex) + } + } + } + } } func (m *nfsMounter) schedulePeriodicMountGc() { diff --git a/pkg/wekafs/wekafsmount.go b/pkg/wekafs/wekafsmount.go index 9e178c87c..28280a911 100644 --- a/pkg/wekafs/wekafsmount.go +++ b/pkg/wekafs/wekafsmount.go @@ -100,6 +100,12 @@ func (m *wekafsMount) doUnmount(ctx context.Context) error { logger.Error().Err(err).Msg("Failed to unmount") } else { logger.Trace().Msg("Unmounted successfully") + if err := os.Remove(m.getMountPoint()); err != nil { + logger.Error().Err(err).Msg("Failed to remove mount point") + return err + } else { + logger.Trace().Msg("Removed mount point successfully") + } } return err } @@ -180,7 +186,3 @@ func (m *wekafsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClien return m.kMounter.Mount(fakePath, m.getMountPoint(), "", []string{"bind"}) } } - -func (m *wekafsMount) locateMountIP() error { - return nil -}