diff --git a/pkg/wekafs/controllerserver.go b/pkg/wekafs/controllerserver.go index 3b71ef0a4..bfe9f9c37 100644 --- a/pkg/wekafs/controllerserver.go +++ b/pkg/wekafs/controllerserver.go @@ -18,6 +18,7 @@ package wekafs import ( "context" + "errors" "fmt" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/rs/zerolog" @@ -365,6 +366,7 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol volume, err := NewVolumeFromId(ctx, volumeID, client, cs) if err != nil { // Should return ok on incorrect ID (by CSI spec) + logger.Error().Err(err).Str("volume_id", volumeID).Msg("Failed to create volume object from ID") result = "SUCCESS" return &csi.DeleteVolumeResponse{}, nil } @@ -382,7 +384,8 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } // cleanup if err != nil { - if err == ErrFilesystemHasUnderlyingSnapshots { + logger.Error().Err(err).Msg("Failed to delete volume") + if errors.Is(err, ErrFilesystemHasUnderlyingSnapshots) { return &csi.DeleteVolumeResponse{}, err } return DeleteVolumeError(ctx, codes.Internal, err.Error()) diff --git a/pkg/wekafs/gc.go b/pkg/wekafs/gc.go index beb102dcf..3bc75fe7e 100644 --- a/pkg/wekafs/gc.go +++ b/pkg/wekafs/gc.go @@ -38,16 +38,18 @@ func (gc *innerPathVolGc) triggerGc(ctx context.Context, fs string, apiClient *a } func (gc *innerPathVolGc) triggerGcVolume(ctx context.Context, volume *Volume) { + logger := log.Ctx(ctx).With().Str("volume_id", volume.GetId()).Logger() + logger.Info().Msg("Triggering garbage collection of volume") fsName := volume.FilesystemName gc.Lock() defer gc.Unlock() + go gc.purgeVolume(ctx, volume) if gc.isRunning[fsName] { gc.isDeferred[fsName] = true return } gc.isRunning[fsName] = true gc.isDeferred[fsName] = true - go gc.purgeVolume(ctx, volume) } func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) { @@ -58,6 +60,7 @@ func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) { path, err, unmount := gc.mounter.Mount(ctx, fsName, volume.apiClient) defer unmount() volumeTrashLoc := filepath.Join(path, garbagePath) + gc.Lock() if err := os.MkdirAll(volumeTrashLoc, DefaultVolumePermissions); err != nil { logger.Error().Err(err).Msg("Failed to create garbage collector directory") } else { @@ -70,6 +73,7 @@ func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) { logger.Error().Err(err).Str("full_path", fullPath). Str("volume_trash_location", volumeTrashLoc).Msg("Failed to move volume contents to volumeTrashLoc") } + gc.Unlock() // NOTE: there is a problem of directory leaks here. If the volume innerPath is deeper than /csi-volumes/vol-name, // e.g. if using statically provisioned volume, we move only the deepest directory // so if the volume is dir/v1//this/is/a/path/to/volume, we might move only the `volume` @@ -81,7 +85,7 @@ func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) { logger.Error().Err(err).Msg("Failed to mount filesystem for GC processing") return } - if err := purgeDirectory(ctx, volumeTrashLoc); err != nil { + if err := purgeDirectory(ctx, volumeTrashLoc, volumeTrashLoc); err != nil { logger.Error().Err(err).Str("purge_path", volumeTrashLoc).Msg("Failed to remove directory") return } @@ -89,7 +93,7 @@ func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) { logger.Debug().Msg("Volume purged") } -func purgeDirectory(ctx context.Context, path string) error { +func purgeDirectory(ctx context.Context, path string, rootPath string) error { logger := log.Ctx(ctx).With().Str("path", path).Logger() if !PathExists(path) { logger.Error().Str("path", path).Msg("Failed to remove existing directory") @@ -104,7 +108,7 @@ func purgeDirectory(ctx context.Context, path string) error { for _, f := range files { fp := filepath.Join(path, f.Name()) if f.IsDir() { - if err := purgeDirectory(ctx, fp); err != nil { + if err := purgeDirectory(ctx, fp, rootPath); err != nil { logger.Error().Err(err).Msg("") return err } @@ -113,7 +117,11 @@ func purgeDirectory(ctx context.Context, path string) error { } } } - return os.Remove(path) + if path != rootPath { + // we only want to remove the directory if it's not the root directory (always retain the .__weka_async_delete) + return os.Remove(path) + } + return nil } func (gc *innerPathVolGc) purgeLeftovers(ctx context.Context, fs string, apiClient *apiclient.ApiClient) { diff --git a/pkg/wekafs/mount.go b/pkg/wekafs/mount.go index cf02f2d55..b15931a09 100644 --- a/pkg/wekafs/mount.go +++ b/pkg/wekafs/mount.go @@ -46,7 +46,8 @@ func (m *wekaMount) incRef(ctx context.Context, apiClient *apiclient.ApiClient) return err } } else if !m.isMounted() { - logger.Warn().Str("mount_point", m.mountPoint).Int("refcount", m.refCount).Msg("Mount not exists although should!") + t := PathIsWekaMount(ctx, m.mountPoint) + logger.Warn().Str("mount_point", m.mountPoint).Int("refcount", m.refCount).Bool("is_mounted", t).Msg("Mount not exists although should!") if err := m.doMount(ctx, apiClient, m.mountOptions); err != nil { return err } diff --git a/pkg/wekafs/utilities.go b/pkg/wekafs/utilities.go index 0a33de5e2..6bbcf732f 100644 --- a/pkg/wekafs/utilities.go +++ b/pkg/wekafs/utilities.go @@ -17,7 +17,6 @@ import ( "google.golang.org/grpc/status" timestamp "google.golang.org/protobuf/types/known/timestamppb" "os" - "os/exec" "path/filepath" "regexp" "strings" @@ -272,10 +271,21 @@ func PathExists(p string) bool { } func PathIsWekaMount(ctx context.Context, path string) bool { - log.Ctx(ctx).Trace().Str("full_path", path).Msg("Checking if path is wekafs mount") - mountcmd := "mount -t wekafs | grep " + path - res, _ := exec.Command("sh", "-c", mountcmd).Output() - return strings.Contains(string(res), path) + file, err := os.Open("/proc/mounts") + if err != nil { + return false + } + defer func() { _ = file.Close() }() + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + fields := strings.Fields(scanner.Text()) + if len(fields) >= 3 && fields[2] == "wekafs" && fields[1] == path { + return true + } + } + + return false } func validateVolumeId(volumeId string) error {