diff --git a/docs/NFS.md b/docs/NFS.md index 3edbf33dd..b880a8129 100644 --- a/docs/NFS.md +++ b/docs/NFS.md @@ -164,7 +164,7 @@ This is done by setting the `pluginConfig.mountProtocol.allowNfsFailback` parame The parameter `pluginConfig.mountProtocol.useNfs` enforces the use of NFS transport even if Weka client is installed on the node, and recommended to be set to `true` ONLY for testing. -Follow the [Helm installation instructions](./charts/csi-wekafsplugin/README.md) to install the Weka CSI Plugin. +Follow the [Helm installation instructions](../charts/csi-wekafsplugin/README.md) to install the Weka CSI Plugin. Most of the installation steps are the same as for the native WekaFS driver, however, additional parameters should be set in the `values.yaml` file, or passed as command line arguments to the `helm install` command. diff --git a/docs/usage.md b/docs/usage.md index 3de0d75c7..dd331e010 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -352,7 +352,7 @@ pvc-ee54de25-14f3-4024-98d0-12225e4b8215 4Gi RWX Delete ``` -1. Check that configuration was applied +2. Check that configuration was applied ```shell script $ kubectl get pv NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM STORAGECLASS REASON AGE diff --git a/pkg/wekafs/apiclient/filesystem.go b/pkg/wekafs/apiclient/filesystem.go index 2968e77b4..0ce0ee009 100644 --- a/pkg/wekafs/apiclient/filesystem.go +++ b/pkg/wekafs/apiclient/filesystem.go @@ -214,6 +214,28 @@ func (a *ApiClient) DeleteFileSystem(ctx context.Context, r *FileSystemDeleteReq return nil } +func (a *ApiClient) EnsureNoNfsPermissionsForFilesystem(ctx context.Context, fsName string) error { + logger := log.Ctx(ctx) + logger.Trace().Str("filesystem", fsName).Msg("Ensuring no NFS permissions for filesystem") + permissions := &[]NfsPermission{} + err := a.FindNfsPermissionsByFilesystem(ctx, fsName, permissions) + if err != nil { + logger.Error().Err(err).Str("filesystem", fsName).Msg("Failed to list NFS permissions") + } + for _, p := range *permissions { + err = a.DeleteNfsPermission(ctx, &NfsPermissionDeleteRequest{Uid: p.Uid}) + if err != nil { + logger.Error().Err(err).Str("permission", p.Uid.String()).Str("filesystem", p.Filesystem).Str("client_group", p.Group).Msg("Failed to delete NFS permission") + return err + } + } + if len(*permissions) > 0 { + time.Sleep(time.Second * 5) // wait for NFS permissions reconfiguration + logger.Trace().Str("filesystem", fsName).Msg("Deleted NFS permissions") + } + return nil +} + func (a *ApiClient) GetFileSystemMountToken(ctx context.Context, r *FileSystemMountTokenRequest, token *FileSystemMountToken) error { op := "GetFileSystemMountToken" ctx, span := otel.Tracer(TracerName).Start(ctx, op) @@ -260,6 +282,7 @@ func (fs *FileSystem) GetType() string { return "filesystem" } +//goland:noinspection GoUnusedParameter func (fs *FileSystem) GetBasePath(a *ApiClient) string { return "fileSystems" } diff --git a/pkg/wekafs/apiclient/interfacegroup.go b/pkg/wekafs/apiclient/interfacegroup.go index 5d3a1d0bf..82ce4f604 100644 --- a/pkg/wekafs/apiclient/interfacegroup.go +++ b/pkg/wekafs/apiclient/interfacegroup.go @@ -41,6 +41,7 @@ func (i *InterfaceGroup) GetType() string { return "interfaceGroup" } +//goland:noinspection GoUnusedParameter func (i *InterfaceGroup) GetBasePath(client *ApiClient) string { return "interfaceGroups" } diff --git a/pkg/wekafs/apiclient/nfs.go b/pkg/wekafs/apiclient/nfs.go index 80102ed87..838955a9f 100644 --- a/pkg/wekafs/apiclient/nfs.go +++ b/pkg/wekafs/apiclient/nfs.go @@ -13,6 +13,7 @@ import ( "k8s.io/helm/pkg/urlutil" "strconv" "strings" + "time" ) type NfsPermissionType string @@ -33,6 +34,7 @@ func (n NfsVersionString) AsWeka() NfsVersionString { type NfsAuthType string +//goland:noinspection GoUnusedConst const ( NfsPermissionTypeReadWrite NfsPermissionType = "RW" NfsPermissionTypeReadOnly NfsPermissionType = "RO" @@ -75,13 +77,14 @@ func (n *NfsPermission) GetType() string { return "nfsPermission" } +//goland:noinspection GoUnusedParameter func (n *NfsPermission) GetBasePath(a *ApiClient) string { return "nfs/permissions" } func (n *NfsPermission) GetApiUrl(a *ApiClient) string { url, err := urlutil.URLJoin(n.GetBasePath(a), n.Uid.String()) - if err != nil { + if err == nil { return url } return "" @@ -105,29 +108,39 @@ func (n *NfsPermission) IsEligibleForCsi() bool { n.SquashMode == NfsPermissionSquashModeNone } -func (a *ApiClient) GetNfsPermissions(ctx context.Context, fsUid uuid.UUID, permissions *[]NfsPermission) error { - n := &NfsPermission{} - - err := a.Get(ctx, n.GetBasePath(a), nil, permissions) +func (a *ApiClient) FindNfsPermissionsByFilter(ctx context.Context, query *NfsPermission, resultSet *[]NfsPermission) error { + op := "FindNfsPermissionsByFilter" + ctx, span := otel.Tracer(TracerName).Start(ctx, op) + defer span.End() + ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) + ret := &[]NfsPermission{} + q, _ := qs.Values(query) + err := a.Get(ctx, query.GetBasePath(a), q, ret) if err != nil { return err } + for _, r := range *ret { + if r.EQ(query) { + *resultSet = append(*resultSet, r) + } + } return nil } -func (a *ApiClient) FindNfsPermissionsByFilter(ctx context.Context, query *NfsPermission, resultSet *[]NfsPermission) error { +func (a *ApiClient) FindNfsPermissionsByFilesystem(ctx context.Context, fsName string, resultSet *[]NfsPermission) error { op := "FindNfsPermissionsByFilter" ctx, span := otel.Tracer(TracerName).Start(ctx, op) defer span.End() ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) ret := &[]NfsPermission{} + query := &NfsPermission{Filesystem: fsName} q, _ := qs.Values(query) err := a.Get(ctx, query.GetBasePath(a), q, ret) if err != nil { return err } for _, r := range *ret { - if r.EQ(query) { + if r.Filesystem == query.Filesystem { *resultSet = append(*resultSet, r) } } @@ -199,16 +212,22 @@ func (a *ApiClient) CreateNfsPermission(ctx context.Context, r *NfsPermissionCre ctx, span := otel.Tracer(TracerName).Start(ctx, op) defer span.End() ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) + logger := log.Ctx(ctx).With().Str("nfs_permission", r.String()).Logger() if !r.hasRequiredFields() { return RequestMissingParams } payload, err := json.Marshal(r) if err != nil { + logger.Error().Err(err).Msg("Failed to marshal request") return err } - + logger.Trace().Msg("Creating permission") err = a.Post(ctx, r.getRelatedObject().GetBasePath(a), &payload, nil, p) - return err + if err != nil { + logger.Error().Err(err).Msg("Failed to create NFS permission") + return err + } + return nil } func EnsureNfsPermission(ctx context.Context, fsName string, group string, version NfsVersionString, apiClient *ApiClient) error { @@ -235,12 +254,66 @@ func EnsureNfsPermission(ctx context.Context, fsName string, group string, versi AnonUid: 65534, SupportedVersions: &[]string{string(NfsVersionV4)}, } - return apiClient.CreateNfsPermission(ctx, req, perm) + if err := apiClient.CreateNfsPermission(ctx, req, perm); err != nil { + return err + } + time.Sleep(5 * time.Second) // wait for the permission to be applied + return nil } } return err } +type NfsPermissionDeleteRequest struct { + Uid uuid.UUID `json:"-"` +} + +func (pd *NfsPermissionDeleteRequest) getApiUrl(a *ApiClient) string { + return pd.getRelatedObject().GetApiUrl(a) +} + +func (pd *NfsPermissionDeleteRequest) getRelatedObject() ApiObject { + return &NfsPermission{Uid: pd.Uid} +} + +func (pd *NfsPermissionDeleteRequest) getRequiredFields() []string { + return []string{"Uid"} +} + +func (pd *NfsPermissionDeleteRequest) hasRequiredFields() bool { + return ObjectRequestHasRequiredFields(pd) +} + +func (pd *NfsPermissionDeleteRequest) String() string { + return fmt.Sprintln("NfsPermissionDeleteRequest(uid:", pd.Uid) +} + +func (a *ApiClient) DeleteNfsPermission(ctx context.Context, r *NfsPermissionDeleteRequest) error { + op := "DeleteNfsPermission" + ctx, span := otel.Tracer(TracerName).Start(ctx, op) + defer span.End() + ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) + if !r.hasRequiredFields() { + return RequestMissingParams + } + apiResponse := &ApiResponse{} + err := a.Delete(ctx, r.getApiUrl(a), nil, nil, apiResponse) + if err != nil { + switch t := err.(type) { + case *ApiNotFoundError: + return ObjectNotFoundError + case *ApiBadRequestError: + for _, c := range t.ApiResponse.ErrorCodes { + if c == "PermissionDoesNotExistException" { + return ObjectNotFoundError + } + } + } + } + time.Sleep(5 * time.Second) // wait for the permission to be removed + return nil +} + type NfsClientGroup struct { Uid uuid.UUID `json:"uid,omitempty" url:"-"` Rules []NfsClientGroupRule `json:"rules,omitempty" url:"-"` @@ -252,6 +325,7 @@ func (g *NfsClientGroup) GetType() string { return "clientGroup" } +//goland:noinspection GoUnusedParameter func (g *NfsClientGroup) GetBasePath(a *ApiClient) string { return "nfs/clientGroups" } @@ -739,6 +813,7 @@ func (a *ApiClient) EnsureNfsPermissions(ctx context.Context, ip string, fsName logger.Trace().Str("ip_address", ip).Msg("Ensuring NFS Client Group Rule for IP") err = a.EnsureNfsClientGroupRuleForIp(ctx, cg, ip) if err != nil { + logger.Error().Err(err).Str("ip_address", ip).Msg("Failed to ensure NFS client group rule for IP") return err } // Ensure NFS permission diff --git a/pkg/wekafs/apiclient/nfs_test.go b/pkg/wekafs/apiclient/nfs_test.go index 046447afb..c5606629f 100644 --- a/pkg/wekafs/apiclient/nfs_test.go +++ b/pkg/wekafs/apiclient/nfs_test.go @@ -16,9 +16,9 @@ var fsName string var client *ApiClient func TestMain(m *testing.M) { - flag.StringVar(&endpoint, "api-endpoint", "vm125-1726039130891528-0.lan:14000", "API endpoint for tests") + flag.StringVar(&endpoint, "api-endpoint", "localhost:14000", "API endpoint for tests") flag.StringVar(&creds.Username, "api-username", "admin", "API username for tests") - flag.StringVar(&creds.Password, "api-password", "AAbb1234", "API password for tests") + flag.StringVar(&creds.Password, "api-password", "Qwerty1@", "API password for tests") flag.StringVar(&creds.Organization, "api-org", "Root", "API org for tests") flag.StringVar(&creds.HttpScheme, "api-scheme", "https", "API scheme for tests") flag.StringVar(&fsName, "fs-name", "default", "Filesystem name for tests") @@ -81,16 +81,27 @@ func GetApiClientForTest(t *testing.T) *ApiClient { // assert.NoError(t, err) // assert.NotNil(t, result) //} -// -//func TestGetNfsPermissionsByFilesystemName(t *testing.T) { -// apiClient := GetApiClientForTest(t) -// -// -// var permissions []NfsPermission -// err := apiClient.GetNfsPermissionsByFilesystemName(context.Background(), "fs1", &permissions) -// assert.NoError(t, err) -// assert.NotEmpty(t, permissions) -//} + +func TestFindNfsPermissionsByFilesystemName(t *testing.T) { + apiClient := GetApiClientForTest(t) + + var permissions []NfsPermission + err := apiClient.FindNfsPermissionsByFilesystem(context.Background(), "snapvolFilesystem", &permissions) + assert.NoError(t, err) + assert.NotEmpty(t, permissions) + if len(permissions) > 0 { + for _, p := range permissions { + r := &NfsPermissionDeleteRequest{Uid: p.Uid} + err := apiClient.DeleteNfsPermission(context.Background(), r) + assert.NoError(t, err) + } + } + err = apiClient.FindNfsPermissionsByFilesystem(context.Background(), "snapvolFilesystem", &permissions) + assert.NoError(t, err) + assert.Empty(t, permissions) + +} + // //func TestGetNfsPermissionByUid(t *testing.T) { // apiClient := GetApiClientForTest(t) diff --git a/pkg/wekafs/apiclient/quota.go b/pkg/wekafs/apiclient/quota.go index 943d0aa1e..27288d70f 100644 --- a/pkg/wekafs/apiclient/quota.go +++ b/pkg/wekafs/apiclient/quota.go @@ -17,14 +17,17 @@ import ( type QuotaType string type QuotaStatus string -const QuotaTypeHard QuotaType = "HARD" -const QuotaTypeSoft QuotaType = "SOFT" -const QuotaTypeDefault = QuotaTypeHard -const QuotaStatusActive = "ACTIVE" -const QuotaStatusPending = "ADDING" -const QuotaStatusError = "ERROR" -const QuotaStatusDeleting = "DELETING" -const MaxQuotaSize uint64 = 9223372036854775807 +//goland:noinspection GoUnusedConst +const ( + QuotaTypeHard QuotaType = "HARD" + QuotaTypeSoft QuotaType = "SOFT" + QuotaTypeDefault = QuotaTypeHard + QuotaStatusActive = "ACTIVE" + QuotaStatusPending = "ADDING" + QuotaStatusError = "ERROR" + QuotaStatusDeleting = "DELETING" + MaxQuotaSize uint64 = 9223372036854775807 +) type Quota struct { FilesystemUid uuid.UUID `json:"-"` diff --git a/pkg/wekafs/apiclient/snapshot.go b/pkg/wekafs/apiclient/snapshot.go index 4bc415c5b..9da559570 100644 --- a/pkg/wekafs/apiclient/snapshot.go +++ b/pkg/wekafs/apiclient/snapshot.go @@ -168,6 +168,7 @@ func (snap *Snapshot) GetType() string { return "snapshot" } +//goland:noinspection GoUnusedParameter func (snap *Snapshot) GetBasePath(a *ApiClient) string { return "snapshots" } diff --git a/pkg/wekafs/apiclient/utils.go b/pkg/wekafs/apiclient/utils.go index 9eede7da8..0275d8058 100644 --- a/pkg/wekafs/apiclient/utils.go +++ b/pkg/wekafs/apiclient/utils.go @@ -156,7 +156,7 @@ func GetNodeIpAddressByRouting(targetHost string) (string, error) { if err != nil { return "", err } - defer conn.Close() + defer func() { _ = conn.Close() }() // Set a deadline for the connection err = conn.SetDeadline(time.Now().Add(1 * time.Second)) diff --git a/pkg/wekafs/identityserver.go b/pkg/wekafs/identityserver.go index 132cf746d..191d0ff25 100644 --- a/pkg/wekafs/identityserver.go +++ b/pkg/wekafs/identityserver.go @@ -36,6 +36,7 @@ type identityServer struct { config *DriverConfig } +//goland:noinspection GoExportedFuncWithUnexportedType func NewIdentityServer(name, version string, config *DriverConfig) *identityServer { return &identityServer{ name: name, @@ -44,6 +45,7 @@ func NewIdentityServer(name, version string, config *DriverConfig) *identityServ } } +//goland:noinspection GoUnusedParameter func (ids *identityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) { op := "GetPluginInfo" result := "SUCCESS" @@ -78,6 +80,7 @@ func (ids *identityServer) getConfig() *DriverConfig { return ids.config } +//goland:noinspection GoUnusedParameter func (ids *identityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) { logger := log.Ctx(ctx) isReady := ids.getConfig().isInDevMode() || isWekaInstalled() @@ -96,6 +99,7 @@ func (ids *identityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*c }, nil } +//goland:noinspection GoUnusedParameter func (ids *identityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) { op := "GetPluginCapabilities" result := "SUCCESS" diff --git a/pkg/wekafs/nfsmount.go b/pkg/wekafs/nfsmount.go index 9abf05286..b2fb8cff8 100644 --- a/pkg/wekafs/nfsmount.go +++ b/pkg/wekafs/nfsmount.go @@ -173,7 +173,7 @@ func (m *nfsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient, } if apiClient.EnsureNfsPermissions(ctx, nodeIP, m.fsName, apiclient.NfsVersionV4, m.clientGroupName) != nil { - logger.Error().Msg("Failed to ensure NFS permissions") + logger.Error().Err(err).Msg("Failed to ensure NFS permissions") return errors.New("failed to ensure NFS permissions") } diff --git a/pkg/wekafs/nodeserver.go b/pkg/wekafs/nodeserver.go index 0d66991a4..321961ff6 100644 --- a/pkg/wekafs/nodeserver.go +++ b/pkg/wekafs/nodeserver.go @@ -92,8 +92,16 @@ func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVo volumePath := req.GetVolumePath() // Validate request fields - if volumeID == "" || volumePath == "" { - return nil, status.Error(codes.InvalidArgument, "Volume ID and path must be provided") + if volumeID == "" { + return nil, status.Error(codes.InvalidArgument, "Volume ID must be provided") + } + if volumePath == "" { + return nil, status.Error(codes.InvalidArgument, "Volume path must be provided") + } + if req.GetStagingTargetPath() != "" { + if !PathExists(req.GetStagingTargetPath()) { + return nil, status.Error(codes.NotFound, "Staging area path not found") + } } // Check if the volume path exists @@ -185,7 +193,6 @@ func NewNodeServer(nodeId string, maxVolumesPerNode int64, api *ApiStore, mounte csi.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER, csi.NodeServiceCapability_RPC_GET_VOLUME_STATS, csi.NodeServiceCapability_RPC_VOLUME_CONDITION, - //csi.NodeServiceCapability_RPC_EXPAND_VOLUME, }, ), nodeID: nodeId, @@ -454,6 +461,10 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu } else { msg := fmt.Sprintf("Directory %s exists, but not a weka mount, assuming already unpublished", targetPath) logger.Warn().Msg(msg) + if err := os.Remove(targetPath); err != nil { + result = "FAILURE" + return NodeUnpublishVolumeError(ctx, codes.Internal, err.Error()) + } result = "SUCCESS_WITH_WARNING" return &csi.NodeUnpublishVolumeResponse{}, nil } @@ -475,75 +486,14 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu return &csi.NodeUnpublishVolumeResponse{}, nil } -func NodeStageVolumeError(ctx context.Context, errorCode codes.Code, errorMessage string) (*csi.NodeStageVolumeResponse, error) { - err := status.Error(errorCode, strings.ToLower(errorMessage)) - log.Ctx(ctx).Err(err).CallerSkipFrame(1).Msg("Error staging volume") - return &csi.NodeStageVolumeResponse{}, err -} - +//goland:noinspection GoUnusedParameter func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { - op := "NodeStageVolume" - ctx, span := otel.Tracer(TracerName).Start(ctx, op, trace.WithNewRoot()) - defer span.End() - ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) - - volumeId := req.GetVolumeId() - logger := log.Ctx(ctx) - result := "FAILURE" - logger.Info().Str("volume_id", volumeId).Msg(">>>> Received request") - defer func() { - level := zerolog.InfoLevel - if result != "SUCCESS" { - level = zerolog.ErrorLevel - } - logger.WithLevel(level).Str("result", result).Msg("<<<< Completed processing request") - }() - - // Check arguments - if len(req.GetStagingTargetPath()) == 0 { - return NodeStageVolumeError(ctx, codes.InvalidArgument, "Target path missing in request") - } - - if req.GetVolumeCapability() == nil { - return NodeStageVolumeError(ctx, codes.InvalidArgument, "Error occured, volume Capability missing in request") - } - - if req.GetVolumeCapability().GetBlock() != nil { - return NodeStageVolumeError(ctx, codes.InvalidArgument, "Block accessType is unsupported") - } - result = "SUCCESS" - return &csi.NodeStageVolumeResponse{}, nil -} - -func NodeUnstageVolumeError(ctx context.Context, errorCode codes.Code, errorMessage string) (*csi.NodeUnstageVolumeResponse, error) { - err := status.Error(errorCode, strings.ToLower(errorMessage)) - log.Ctx(ctx).Err(err).CallerSkipFrame(1).Msg("Error unstaging volume") - return &csi.NodeUnstageVolumeResponse{}, err + return nil, status.Error(codes.Unimplemented, "NodeStageVolume is not supported") } +//goland:noinspection GoUnusedParameter func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { - op := "NodeUnstageVolume" - result := "FAILURE" - volumeId := req.GetVolumeId() - ctx, span := otel.Tracer(TracerName).Start(ctx, op, trace.WithNewRoot()) - defer span.End() - ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) - - logger := log.Ctx(ctx) - logger.Info().Str("volume_id", volumeId).Msg(">>>> Received request") - defer func() { - level := zerolog.InfoLevel - if result != "SUCCESS" { - level = zerolog.ErrorLevel - } - logger.WithLevel(level).Str("result", result).Msg("<<<< Completed processing request") - }() - - if len(req.GetStagingTargetPath()) == 0 { - return NodeUnstageVolumeError(ctx, codes.InvalidArgument, "Target path missing in request") - } - result = "SUCCESS" - return &csi.NodeUnstageVolumeResponse{}, nil + return nil, status.Error(codes.Unimplemented, "NodeUnstageVolume is not supported") } //goland:noinspection GoUnusedParameter diff --git a/pkg/wekafs/server.go b/pkg/wekafs/server.go index 440c107dc..9877505bb 100644 --- a/pkg/wekafs/server.go +++ b/pkg/wekafs/server.go @@ -35,6 +35,7 @@ const ( xattrVolumeName = "user.weka_k8s_volname" ) +//goland:noinspection GoExportedFuncWithUnexportedType func NewNonBlockingGRPCServer(mode CsiPluginMode) *nonBlockingGRPCServer { return &nonBlockingGRPCServer{ csiMmode: mode, diff --git a/pkg/wekafs/utilities.go b/pkg/wekafs/utilities.go index 07b77d315..cdb3a7eb1 100644 --- a/pkg/wekafs/utilities.go +++ b/pkg/wekafs/utilities.go @@ -281,6 +281,7 @@ func fileExists(filename string) bool { return false } +//goland:noinspection GoUnusedParameter func PathIsWekaMount(ctx context.Context, path string) bool { file, err := os.Open("/proc/mounts") if err != nil { diff --git a/pkg/wekafs/volume.go b/pkg/wekafs/volume.go index d807d52ab..9a3305fb7 100644 --- a/pkg/wekafs/volume.go +++ b/pkg/wekafs/volume.go @@ -59,6 +59,7 @@ type Volume struct { server AnyServer } +//goland:noinspection GoUnusedParameter func (v *Volume) getCsiContentSource(ctx context.Context) *csi.VolumeContentSource { if v.srcVolume != nil { return &csi.VolumeContentSource{ @@ -103,10 +104,12 @@ func (v *Volume) pruneUnsupportedMountOptions(ctx context.Context) { } } +//goland:noinspection GoUnusedParameter func (v *Volume) setMountOptions(ctx context.Context, mountOptions MountOptions) { v.mountOptions.Merge(mountOptions, v.server.getConfig().mutuallyExclusiveOptions) } +//goland:noinspection GoUnusedParameter func (v *Volume) getMountOptions(ctx context.Context) MountOptions { return v.mountOptions } @@ -643,6 +646,8 @@ func (v *Volume) getInnerPath() string { } // GetFullPath returns a full path on which volume is accessible including snapshot subdir and inner path +// +//goland:noinspection GoUnusedParameter func (v *Volume) GetFullPath(ctx context.Context) string { mountParts := []string{v.mountPath} if v.isOnSnapshot() { @@ -1364,7 +1369,8 @@ func (v *Volume) deleteFilesystem(ctx context.Context) error { logger.Debug().Str("filesystem", v.FilesystemName).Msg("Deleting filesystem") fsObj, err := v.getFilesystemObj(ctx) if err != nil { - return status.Errorf(codes.Internal, "Failed to delete filesystem %s", v.FilesystemName) + logger.Error().Err(err).Str("filesystem", v.FilesystemName).Msg("Failed to fetch filesystem for deletion") + return status.Errorf(codes.Internal, "Failed to fetch filesystem for deletion %s", v.FilesystemName) } if fsObj == nil || fsObj.Uid == uuid.Nil { logger.Warn().Str("filesystem", v.FilesystemName).Msg("Apparently filesystem not exists, returning OK") @@ -1372,6 +1378,14 @@ func (v *Volume) deleteFilesystem(ctx context.Context) error { return nil } if !fsObj.IsRemoving { // if filesystem is already removing, just wait + if v.server.getMounter().getTransport() == dataTransportNfs { + logger.Trace().Str("filesystem", v.FilesystemName).Msg("Ensuring no NFS permissions exist that could block filesystem deletion") + err := v.apiClient.EnsureNoNfsPermissionsForFilesystem(ctx, fsObj.Name) + if err != nil { + logger.Error().Str("filesystem", v.FilesystemName).Err(err).Msg("Failed to remove NFS permissions, cannot delete filesystem") + return err + } + } logger.Trace().Str("filesystem", v.FilesystemName).Msg("Attempting deletion of filesystem") fsd := &apiclient.FileSystemDeleteRequest{Uid: fsObj.Uid} err = v.apiClient.DeleteFileSystem(ctx, fsd) @@ -1504,6 +1518,8 @@ func (v *Volume) waitForSnapshotDeletion(ctx context.Context, logger zerolog.Log // ObtainRequestParams takes additional optional params from storage class params and applies them to Volume object // those params then need to be set during actual volume creation via UpdateParams function +// +//goland:noinspection GoUnusedParameter func (v *Volume) ObtainRequestParams(ctx context.Context, params map[string]string) error { // set explicit mount options if were passed in storageclass if val, ok := params["mountOptions"]; ok { @@ -1569,10 +1585,10 @@ func (v *Volume) CreateSnapshot(ctx context.Context, name string) (*Snapshot, er ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) s, err := NewSnapshotFromVolumeCreate(ctx, name, v, v.apiClient, v.server) - logger := log.Ctx(ctx).With().Str("volume_id", v.GetId()).Str("snapshot_id", s.GetId()).Logger() if err != nil { return &Snapshot{}, err } + logger := log.Ctx(ctx).With().Str("volume_id", v.GetId()).Str("snapshot_id", s.GetId()).Logger() // check if snapshot with this name already exists exists, err := s.Exists(ctx) if err != nil { diff --git a/pkg/wekafs/volumeconstructors.go b/pkg/wekafs/volumeconstructors.go index b74eeb62c..dfc463285 100644 --- a/pkg/wekafs/volumeconstructors.go +++ b/pkg/wekafs/volumeconstructors.go @@ -139,7 +139,7 @@ func NewVolumeForBlankVolumeRequest(ctx context.Context, req *csi.CreateVolumeRe if client == nil && !cs.config.alwaysAllowSnapshotVolumes { return nil, status.Error(codes.FailedPrecondition, "Quota enforcement is supported only with API-bound volumes") } - if !client.SupportsQuotaOnSnapshots() && !cs.config.alwaysAllowSnapshotVolumes { + if client != nil && !client.SupportsQuotaOnSnapshots() && !cs.config.alwaysAllowSnapshotVolumes { return nil, status.Error(codes.FailedPrecondition, "Quota enforcement is not supported for snapshot-backed volumes by current Weka software version, please upgrade Weka cluster") } snapName = generateWekaSnapNameForSnapBasedVol(cs.getConfig().VolumePrefix, requestedVolumeName)