Skip to content

Commit

Permalink
fix(CSI-224): race condition on multiple volume deletion in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeyberezansky committed Jul 28, 2024
1 parent a54f898 commit 382d98f
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 12 deletions.
5 changes: 4 additions & 1 deletion pkg/wekafs/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package wekafs

import (
"context"
"errors"
"fmt"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -365,6 +366,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
volume, err := NewVolumeFromId(ctx, volumeID, client, cs)
if err != nil {
// Should return ok on incorrect ID (by CSI spec)
logger.Error().Err(err).Str("volume_id", volumeID).Msg("Failed to create volume object from ID")
result = "SUCCESS"
return &csi.DeleteVolumeResponse{}, nil
}
Expand All @@ -382,7 +384,8 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
}
// cleanup
if err != nil {
if err == ErrFilesystemHasUnderlyingSnapshots {
logger.Error().Err(err).Msg("Failed to delete volume")
if errors.Is(err, ErrFilesystemHasUnderlyingSnapshots) {
return &csi.DeleteVolumeResponse{}, err
}
return DeleteVolumeError(ctx, codes.Internal, err.Error())
Expand Down
18 changes: 13 additions & 5 deletions pkg/wekafs/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,18 @@ func (gc *innerPathVolGc) triggerGc(ctx context.Context, fs string, apiClient *a
}

func (gc *innerPathVolGc) triggerGcVolume(ctx context.Context, volume *Volume) {
logger := log.Ctx(ctx).With().Str("volume_id", volume.GetId()).Logger()
logger.Info().Msg("Triggering garbage collection of volume")
fsName := volume.FilesystemName
gc.Lock()
defer gc.Unlock()
go gc.purgeVolume(ctx, volume)
if gc.isRunning[fsName] {
gc.isDeferred[fsName] = true
return
}
gc.isRunning[fsName] = true
gc.isDeferred[fsName] = true
go gc.purgeVolume(ctx, volume)
}

func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) {
Expand All @@ -58,6 +60,7 @@ func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) {
path, err, unmount := gc.mounter.Mount(ctx, fsName, volume.apiClient)
defer unmount()
volumeTrashLoc := filepath.Join(path, garbagePath)
gc.Lock()
if err := os.MkdirAll(volumeTrashLoc, DefaultVolumePermissions); err != nil {
logger.Error().Err(err).Msg("Failed to create garbage collector directory")
} else {
Expand All @@ -70,6 +73,7 @@ func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) {
logger.Error().Err(err).Str("full_path", fullPath).
Str("volume_trash_location", volumeTrashLoc).Msg("Failed to move volume contents to volumeTrashLoc")
}
gc.Unlock()
// NOTE: there is a problem of directory leaks here. If the volume innerPath is deeper than /csi-volumes/vol-name,
// e.g. if using statically provisioned volume, we move only the deepest directory
// so if the volume is dir/v1/<filesystem>/this/is/a/path/to/volume, we might move only the `volume`
Expand All @@ -81,15 +85,15 @@ func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) {
logger.Error().Err(err).Msg("Failed to mount filesystem for GC processing")
return
}
if err := purgeDirectory(ctx, volumeTrashLoc); err != nil {
if err := purgeDirectory(ctx, volumeTrashLoc, volumeTrashLoc); err != nil {
logger.Error().Err(err).Str("purge_path", volumeTrashLoc).Msg("Failed to remove directory")
return
}

logger.Debug().Msg("Volume purged")
}

func purgeDirectory(ctx context.Context, path string) error {
func purgeDirectory(ctx context.Context, path string, rootPath string) error {
logger := log.Ctx(ctx).With().Str("path", path).Logger()
if !PathExists(path) {
logger.Error().Str("path", path).Msg("Failed to remove existing directory")
Expand All @@ -104,7 +108,7 @@ func purgeDirectory(ctx context.Context, path string) error {
for _, f := range files {
fp := filepath.Join(path, f.Name())
if f.IsDir() {
if err := purgeDirectory(ctx, fp); err != nil {
if err := purgeDirectory(ctx, fp, rootPath); err != nil {
logger.Error().Err(err).Msg("")
return err
}
Expand All @@ -113,7 +117,11 @@ func purgeDirectory(ctx context.Context, path string) error {
}
}
}
return os.Remove(path)
if path != rootPath {
// we only want to remove the directory if it's not the root directory (always retain the .__weka_async_delete)
return os.Remove(path)
}
return nil
}

func (gc *innerPathVolGc) purgeLeftovers(ctx context.Context, fs string, apiClient *apiclient.ApiClient) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/wekafs/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func (m *wekaMount) incRef(ctx context.Context, apiClient *apiclient.ApiClient)
return err
}
} else if !m.isMounted() {
logger.Warn().Str("mount_point", m.mountPoint).Int("refcount", m.refCount).Msg("Mount not exists although should!")
t := PathIsWekaMount(ctx, m.mountPoint)
logger.Warn().Str("mount_point", m.mountPoint).Int("refcount", m.refCount).Bool("is_mounted", t).Msg("Mount not exists although should!")
if err := m.doMount(ctx, apiClient, m.mountOptions); err != nil {
return err
}
Expand Down
20 changes: 15 additions & 5 deletions pkg/wekafs/utilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"google.golang.org/grpc/status"
timestamp "google.golang.org/protobuf/types/known/timestamppb"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
Expand Down Expand Up @@ -272,10 +271,21 @@ func PathExists(p string) bool {
}

func PathIsWekaMount(ctx context.Context, path string) bool {
log.Ctx(ctx).Trace().Str("full_path", path).Msg("Checking if path is wekafs mount")
mountcmd := "mount -t wekafs | grep " + path
res, _ := exec.Command("sh", "-c", mountcmd).Output()
return strings.Contains(string(res), path)
file, err := os.Open("/proc/mounts")
if err != nil {
return false
}
defer func() { _ = file.Close() }()

scanner := bufio.NewScanner(file)
for scanner.Scan() {
fields := strings.Fields(scanner.Text())
if len(fields) >= 3 && fields[2] == "wekafs" && fields[1] == path {
return true
}
}

return false
}

func validateVolumeId(volumeId string) error {
Expand Down

0 comments on commit 382d98f

Please sign in to comment.