Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(CSI-270): filesystem-backed volumes cannot be deleted due to stale NFS permissions #344

Merged
merged 2 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/NFS.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ This is done by setting the `pluginConfig.mountProtocol.allowNfsFailback` parame
The parameter `pluginConfig.mountProtocol.useNfs` enforces the use of NFS transport even if the Weka client is installed on the node,
and it is recommended to set it to `true` ONLY for testing.

Follow the [Helm installation instructions](./charts/csi-wekafsplugin/README.md) to install the Weka CSI Plugin.
Follow the [Helm installation instructions](../charts/csi-wekafsplugin/README.md) to install the Weka CSI Plugin.
Most of the installation steps are the same as for the native WekaFS driver; however, additional parameters should be set in the `values.yaml` file
or passed as command-line arguments to the `helm install` command.

Expand Down
2 changes: 1 addition & 1 deletion docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ pvc-ee54de25-14f3-4024-98d0-12225e4b8215 4Gi RWX Delete
<REPLACE spec.resources.requests.storage value with 4Gi>
```

1. Check that configuration was applied
2. Check that configuration was applied
```shell script
$ kubectl get pv
NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM STORAGECLASS REASON AGE
Expand Down
23 changes: 23 additions & 0 deletions pkg/wekafs/apiclient/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,28 @@ func (a *ApiClient) DeleteFileSystem(ctx context.Context, r *FileSystemDeleteReq
return nil
}

// EnsureNoNfsPermissionsForFilesystem deletes every NFS permission that
// FindNfsPermissionsByFilesystem reports for the filesystem named fsName.
//
// A failure to list permissions is logged but not returned, and the function
// then proceeds with whatever was collected (possibly nothing).
// NOTE(review): this looks like a deliberate best-effort so that filesystem
// deletion can still be attempted; confirm before turning it into a hard error.
//
// The first permission that cannot be deleted aborts the loop and its error is
// returned. When at least one permission was found, the function waits 5 seconds
// for the NFS reconfiguration before returning; the wait ends early with
// ctx.Err() if ctx is cancelled or its deadline passes.
func (a *ApiClient) EnsureNoNfsPermissionsForFilesystem(ctx context.Context, fsName string) error {
	logger := log.Ctx(ctx)
	logger.Trace().Str("filesystem", fsName).Msg("Ensuring no NFS permissions for filesystem")

	permissions := &[]NfsPermission{}
	if err := a.FindNfsPermissionsByFilesystem(ctx, fsName, permissions); err != nil {
		logger.Error().Err(err).Str("filesystem", fsName).Msg("Failed to list NFS permissions")
	}

	for _, p := range *permissions {
		if err := a.DeleteNfsPermission(ctx, &NfsPermissionDeleteRequest{Uid: p.Uid}); err != nil {
			logger.Error().Err(err).Str("permission", p.Uid.String()).Str("filesystem", p.Filesystem).Str("client_group", p.Group).Msg("Failed to delete NFS permission")
			return err
		}
	}

	if len(*permissions) == 0 {
		return nil
	}

	// Wait for NFS permissions reconfiguration, but do not hold a cancelled
	// request hostage for the full delay.
	timer := time.NewTimer(5 * time.Second)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
	}
	logger.Trace().Str("filesystem", fsName).Msg("Deleted NFS permissions")
	return nil
}

func (a *ApiClient) GetFileSystemMountToken(ctx context.Context, r *FileSystemMountTokenRequest, token *FileSystemMountToken) error {
op := "GetFileSystemMountToken"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
Expand Down Expand Up @@ -260,6 +282,7 @@ func (fs *FileSystem) GetType() string {
return "filesystem"
}

// GetBasePath returns the constant string "fileSystems". The ApiClient
// argument is not used.
//
//goland:noinspection GoUnusedParameter
func (fs *FileSystem) GetBasePath(a *ApiClient) string {
return "fileSystems"
}
Expand Down
1 change: 1 addition & 0 deletions pkg/wekafs/apiclient/interfacegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (i *InterfaceGroup) GetType() string {
return "interfaceGroup"
}

// GetBasePath returns the constant string "interfaceGroups". The ApiClient
// argument is not used.
//
//goland:noinspection GoUnusedParameter
func (i *InterfaceGroup) GetBasePath(client *ApiClient) string {
return "interfaceGroups"
}
Expand Down
95 changes: 85 additions & 10 deletions pkg/wekafs/apiclient/nfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"k8s.io/helm/pkg/urlutil"
"strconv"
"strings"
"time"
)

type NfsPermissionType string
Expand All @@ -33,6 +34,7 @@ func (n NfsVersionString) AsWeka() NfsVersionString {

type NfsAuthType string

//goland:noinspection GoUnusedConst
const (
NfsPermissionTypeReadWrite NfsPermissionType = "RW"
NfsPermissionTypeReadOnly NfsPermissionType = "RO"
Expand Down Expand Up @@ -75,13 +77,14 @@ func (n *NfsPermission) GetType() string {
return "nfsPermission"
}

// GetBasePath returns the constant string "nfs/permissions", which GetApiUrl
// joins with the permission Uid to address a single permission. The ApiClient
// argument is not used.
//
//goland:noinspection GoUnusedParameter
func (n *NfsPermission) GetBasePath(a *ApiClient) string {
return "nfs/permissions"
}

func (n *NfsPermission) GetApiUrl(a *ApiClient) string {
url, err := urlutil.URLJoin(n.GetBasePath(a), n.Uid.String())
if err != nil {
if err == nil {
return url
}
return ""
Expand All @@ -105,29 +108,39 @@ func (n *NfsPermission) IsEligibleForCsi() bool {
n.SquashMode == NfsPermissionSquashModeNone
}

func (a *ApiClient) GetNfsPermissions(ctx context.Context, fsUid uuid.UUID, permissions *[]NfsPermission) error {
n := &NfsPermission{}

err := a.Get(ctx, n.GetBasePath(a), nil, permissions)
// FindNfsPermissionsByFilter looks up NFS permissions matching query.
//
// The fields of query are encoded as URL query parameters and sent with a GET
// to the NFS permissions base path. Every returned permission for which
// r.EQ(query) holds is appended to *resultSet; existing elements of *resultSet
// are left in place, so callers that need only fresh results must pass an
// empty slice.
//
// It returns the encoding error if query cannot be converted to URL values,
// or the error reported by the GET request. In both cases *resultSet is left
// untouched.
func (a *ApiClient) FindNfsPermissionsByFilter(ctx context.Context, query *NfsPermission, resultSet *[]NfsPermission) error {
	op := "FindNfsPermissionsByFilter"
	ctx, span := otel.Tracer(TracerName).Start(ctx, op)
	defer span.End()
	ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)

	// Previously the encoding error was discarded, which would have sent an
	// unfiltered request if the query could not be encoded.
	q, err := qs.Values(query)
	if err != nil {
		return err
	}

	ret := &[]NfsPermission{}
	if err := a.Get(ctx, query.GetBasePath(a), q, ret); err != nil {
		return err
	}
	// Re-check each result locally with EQ before handing it to the caller.
	for _, r := range *ret {
		if r.EQ(query) {
			*resultSet = append(*resultSet, r)
		}
	}
	return nil
}

func (a *ApiClient) FindNfsPermissionsByFilter(ctx context.Context, query *NfsPermission, resultSet *[]NfsPermission) error {
func (a *ApiClient) FindNfsPermissionsByFilesystem(ctx context.Context, fsName string, resultSet *[]NfsPermission) error {
op := "FindNfsPermissionsByFilter"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
ret := &[]NfsPermission{}
query := &NfsPermission{Filesystem: fsName}
q, _ := qs.Values(query)
err := a.Get(ctx, query.GetBasePath(a), q, ret)
if err != nil {
return err
}
for _, r := range *ret {
if r.EQ(query) {
if r.Filesystem == query.Filesystem {
*resultSet = append(*resultSet, r)
}
}
Expand Down Expand Up @@ -199,16 +212,22 @@ func (a *ApiClient) CreateNfsPermission(ctx context.Context, r *NfsPermissionCre
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx).With().Str("nfs_permission", r.String()).Logger()
if !r.hasRequiredFields() {
return RequestMissingParams
}
payload, err := json.Marshal(r)
if err != nil {
logger.Error().Err(err).Msg("Failed to marshal request")
return err
}

logger.Trace().Msg("Creating permission")
err = a.Post(ctx, r.getRelatedObject().GetBasePath(a), &payload, nil, p)
return err
if err != nil {
logger.Error().Err(err).Msg("Failed to create NFS permission")
return err
}
return nil
}

func EnsureNfsPermission(ctx context.Context, fsName string, group string, version NfsVersionString, apiClient *ApiClient) error {
Expand All @@ -235,12 +254,66 @@ func EnsureNfsPermission(ctx context.Context, fsName string, group string, versi
AnonUid: 65534,
SupportedVersions: &[]string{string(NfsVersionV4)},
}
return apiClient.CreateNfsPermission(ctx, req, perm)
if err := apiClient.CreateNfsPermission(ctx, req, perm); err != nil {
return err
}
time.Sleep(5 * time.Second) // wait for the permission to be applied
return nil
}
}
return err
}

// NfsPermissionDeleteRequest identifies the NFS permission to delete.
type NfsPermissionDeleteRequest struct {
// Uid is the identifier of the permission; the `json:"-"` tag keeps it out
// of any JSON encoding of the request.
Uid uuid.UUID `json:"-"`
}

// getApiUrl returns the API URL of the permission this request refers to, as
// computed by the related object's GetApiUrl.
func (pd *NfsPermissionDeleteRequest) getApiUrl(a *ApiClient) string {
return pd.getRelatedObject().GetApiUrl(a)
}

// getRelatedObject returns a new NfsPermission carrying only the request's Uid.
func (pd *NfsPermissionDeleteRequest) getRelatedObject() ApiObject {
return &NfsPermission{Uid: pd.Uid}
}

// getRequiredFields lists the names of the fields that must be set on the
// request; only "Uid" is required.
func (pd *NfsPermissionDeleteRequest) getRequiredFields() []string {
return []string{"Uid"}
}

// hasRequiredFields reports the result of ObjectRequestHasRequiredFields for
// this request.
// NOTE(review): presumably that helper checks the fields named by
// getRequiredFields — confirm against its implementation.
func (pd *NfsPermissionDeleteRequest) hasRequiredFields() bool {
return ObjectRequestHasRequiredFields(pd)
}

// String returns a single-line, human-readable description of the request in
// the form "NfsPermissionDeleteRequest(uid: <uid>)".
//
// The previous implementation used fmt.Sprintln, which left the opening
// parenthesis unbalanced and appended a newline to the result.
func (pd *NfsPermissionDeleteRequest) String() string {
	return fmt.Sprintf("NfsPermissionDeleteRequest(uid: %s)", pd.Uid.String())
}

// DeleteNfsPermission deletes the NFS permission identified by r.Uid.
//
// Returns:
//   - RequestMissingParams when r does not carry its required fields;
//   - ObjectNotFoundError when the API answers with an *ApiNotFoundError, or
//     with an *ApiBadRequestError whose error codes include
//     "PermissionDoesNotExistException";
//   - any other error from the DELETE call unchanged;
//   - nil on success, after blocking for 5 seconds to let the removal be applied.
func (a *ApiClient) DeleteNfsPermission(ctx context.Context, r *NfsPermissionDeleteRequest) error {
	op := "DeleteNfsPermission"
	ctx, span := otel.Tracer(TracerName).Start(ctx, op)
	defer span.End()
	ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
	if !r.hasRequiredFields() {
		return RequestMissingParams
	}

	apiResponse := &ApiResponse{}
	if err := a.Delete(ctx, r.getApiUrl(a), nil, nil, apiResponse); err != nil {
		switch t := err.(type) {
		case *ApiNotFoundError:
			return ObjectNotFoundError
		case *ApiBadRequestError:
			for _, c := range t.ApiResponse.ErrorCodes {
				if c == "PermissionDoesNotExistException" {
					return ObjectNotFoundError
				}
			}
		}
		// Any other failure must reach the caller. Previously it fell through
		// to the sleep below and was reported as a successful deletion.
		return err
	}

	time.Sleep(5 * time.Second) // wait for the permission to be removed
	return nil
}

type NfsClientGroup struct {
Uid uuid.UUID `json:"uid,omitempty" url:"-"`
Rules []NfsClientGroupRule `json:"rules,omitempty" url:"-"`
Expand All @@ -252,6 +325,7 @@ func (g *NfsClientGroup) GetType() string {
return "clientGroup"
}

// GetBasePath returns the constant string "nfs/clientGroups". The ApiClient
// argument is not used.
//
//goland:noinspection GoUnusedParameter
func (g *NfsClientGroup) GetBasePath(a *ApiClient) string {
return "nfs/clientGroups"
}
Expand Down Expand Up @@ -739,6 +813,7 @@ func (a *ApiClient) EnsureNfsPermissions(ctx context.Context, ip string, fsName
logger.Trace().Str("ip_address", ip).Msg("Ensuring NFS Client Group Rule for IP")
err = a.EnsureNfsClientGroupRuleForIp(ctx, cg, ip)
if err != nil {
logger.Error().Err(err).Str("ip_address", ip).Msg("Failed to ensure NFS client group rule for IP")
return err
}
// Ensure NFS permission
Expand Down
35 changes: 23 additions & 12 deletions pkg/wekafs/apiclient/nfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ var fsName string
var client *ApiClient

func TestMain(m *testing.M) {
flag.StringVar(&endpoint, "api-endpoint", "vm125-1726039130891528-0.lan:14000", "API endpoint for tests")
flag.StringVar(&endpoint, "api-endpoint", "localhost:14000", "API endpoint for tests")
flag.StringVar(&creds.Username, "api-username", "admin", "API username for tests")
flag.StringVar(&creds.Password, "api-password", "AAbb1234", "API password for tests")
flag.StringVar(&creds.Password, "api-password", "Qwerty1@", "API password for tests")
flag.StringVar(&creds.Organization, "api-org", "Root", "API org for tests")
flag.StringVar(&creds.HttpScheme, "api-scheme", "https", "API scheme for tests")
flag.StringVar(&fsName, "fs-name", "default", "Filesystem name for tests")
Expand Down Expand Up @@ -81,16 +81,27 @@ func GetApiClientForTest(t *testing.T) *ApiClient {
// assert.NoError(t, err)
// assert.NotNil(t, result)
//}
//
//func TestGetNfsPermissionsByFilesystemName(t *testing.T) {
// apiClient := GetApiClientForTest(t)
//
//
// var permissions []NfsPermission
// err := apiClient.GetNfsPermissionsByFilesystemName(context.Background(), "fs1", &permissions)
// assert.NoError(t, err)
// assert.NotEmpty(t, permissions)
//}

// TestFindNfsPermissionsByFilesystemName runs against a live API endpoint.
// It expects the filesystem "snapvolFilesystem" to have at least one NFS
// permission, deletes every permission found, and then verifies that a fresh
// lookup returns none.
func TestFindNfsPermissionsByFilesystemName(t *testing.T) {
	const filesystem = "snapvolFilesystem"
	apiClient := GetApiClientForTest(t)
	ctx := context.Background()

	var permissions []NfsPermission
	err := apiClient.FindNfsPermissionsByFilesystem(ctx, filesystem, &permissions)
	assert.NoError(t, err)
	assert.NotEmpty(t, permissions)

	for _, p := range permissions {
		delErr := apiClient.DeleteNfsPermission(ctx, &NfsPermissionDeleteRequest{Uid: p.Uid})
		assert.NoError(t, delErr)
	}

	// FindNfsPermissionsByFilesystem appends to the slice it is given, so the
	// verification lookup needs its own empty slice; reusing `permissions`
	// would keep the entries found above and make the Empty assertion fail.
	var remaining []NfsPermission
	err = apiClient.FindNfsPermissionsByFilesystem(ctx, filesystem, &remaining)
	assert.NoError(t, err)
	assert.Empty(t, remaining)
}

//
//func TestGetNfsPermissionByUid(t *testing.T) {
// apiClient := GetApiClientForTest(t)
Expand Down
19 changes: 11 additions & 8 deletions pkg/wekafs/apiclient/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@ import (
type QuotaType string
type QuotaStatus string

const QuotaTypeHard QuotaType = "HARD"
const QuotaTypeSoft QuotaType = "SOFT"
const QuotaTypeDefault = QuotaTypeHard
const QuotaStatusActive = "ACTIVE"
const QuotaStatusPending = "ADDING"
const QuotaStatusError = "ERROR"
const QuotaStatusDeleting = "DELETING"
const MaxQuotaSize uint64 = 9223372036854775807
// Quota type and status values, plus the upper bound for a quota size.
//
//goland:noinspection GoUnusedConst
const (
// Quota types; both are of type QuotaType.
QuotaTypeHard QuotaType = "HARD"
QuotaTypeSoft QuotaType = "SOFT"
// QuotaTypeDefault takes its type and value from QuotaTypeHard.
QuotaTypeDefault = QuotaTypeHard
// Quota status values. These are declared without an explicit type, so they
// are untyped string constants rather than QuotaStatus values.
QuotaStatusActive = "ACTIVE"
// Note the name/value difference: "Pending" is spelled "ADDING".
QuotaStatusPending = "ADDING"
QuotaStatusError = "ERROR"
QuotaStatusDeleting = "DELETING"
// MaxQuotaSize is 2^63-1, the largest value of a signed 64-bit integer.
MaxQuotaSize uint64 = 9223372036854775807
)

type Quota struct {
FilesystemUid uuid.UUID `json:"-"`
Expand Down
1 change: 1 addition & 0 deletions pkg/wekafs/apiclient/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (snap *Snapshot) GetType() string {
return "snapshot"
}

// GetBasePath returns the constant string "snapshots". The ApiClient argument
// is not used.
//
//goland:noinspection GoUnusedParameter
func (snap *Snapshot) GetBasePath(a *ApiClient) string {
return "snapshots"
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/wekafs/apiclient/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func GetNodeIpAddressByRouting(targetHost string) (string, error) {
if err != nil {
return "", err
}
defer conn.Close()
defer func() { _ = conn.Close() }()

// Set a deadline for the connection
err = conn.SetDeadline(time.Now().Add(1 * time.Second))
Expand Down
4 changes: 4 additions & 0 deletions pkg/wekafs/identityserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type identityServer struct {
config *DriverConfig
}

//goland:noinspection GoExportedFuncWithUnexportedType
func NewIdentityServer(name, version string, config *DriverConfig) *identityServer {
return &identityServer{
name: name,
Expand All @@ -44,6 +45,7 @@ func NewIdentityServer(name, version string, config *DriverConfig) *identityServ
}
}

//goland:noinspection GoUnusedParameter
func (ids *identityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
op := "GetPluginInfo"
result := "SUCCESS"
Expand Down Expand Up @@ -78,6 +80,7 @@ func (ids *identityServer) getConfig() *DriverConfig {
return ids.config
}

//goland:noinspection GoUnusedParameter
func (ids *identityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
logger := log.Ctx(ctx)
isReady := ids.getConfig().isInDevMode() || isWekaInstalled()
Expand All @@ -96,6 +99,7 @@ func (ids *identityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*c
}, nil
}

//goland:noinspection GoUnusedParameter
func (ids *identityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
op := "GetPluginCapabilities"
result := "SUCCESS"
Expand Down
2 changes: 1 addition & 1 deletion pkg/wekafs/nfsmount.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (m *nfsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient,
}

if apiClient.EnsureNfsPermissions(ctx, nodeIP, m.fsName, apiclient.NfsVersionV4, m.clientGroupName) != nil {
logger.Error().Msg("Failed to ensure NFS permissions")
logger.Error().Err(err).Msg("Failed to ensure NFS permissions")
return errors.New("failed to ensure NFS permissions")
}

Expand Down
Loading