Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(CSI-224,WEKAPP-417375): race condition on multiple volume deletion in parallel #286

Merged
merged 4 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ RUN apk add util-linux libselinux libselinux-utils util-linux pciutils usbutils
# Update CA certificates
RUN apk add ca-certificates
RUN update-ca-certificates
ADD https://github.com/tigrawap/locar/releases/download/0.4.0/locar_linux_amd64 /locar
RUN chmod +x /locar
COPY --from=go-builder /bin/wekafsplugin /wekafsplugin
ARG binary=/bin/wekafsplugin
ENTRYPOINT ["/wekafsplugin"]
142 changes: 73 additions & 69 deletions pkg/wekafs/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@ package wekafs

import (
"context"
"fmt"
"github.com/rs/zerolog/log"
"github.com/wekafs/csi-wekafs/pkg/wekafs/apiclient"
"go.opentelemetry.io/otel"
"io"
"os"
"os/exec"
"path/filepath"
"sync"
)

const garbagePath = ".__internal__wekafs-async-delete"

//const garbageCollectionMaxThreads = 32

type innerPathVolGc struct {
isRunning map[string]bool
isDeferred map[string]bool
Expand All @@ -26,42 +31,36 @@ func initInnerPathVolumeGc(mounter *wekaMounter) *innerPathVolGc {
return &gc
}

func (gc *innerPathVolGc) triggerGc(ctx context.Context, fs string, apiClient *apiclient.ApiClient) {
gc.Lock()
defer gc.Unlock()
if gc.isRunning[fs] {
gc.isDeferred[fs] = true
return
}
gc.isRunning[fs] = true
go gc.purgeLeftovers(ctx, fs, apiClient)
}

func (gc *innerPathVolGc) triggerGcVolume(ctx context.Context, volume *Volume) {
fsName := volume.FilesystemName
gc.Lock()
defer gc.Unlock()
if gc.isRunning[fsName] {
gc.isDeferred[fsName] = true
return
}
gc.isRunning[fsName] = true
gc.isDeferred[fsName] = true
go gc.purgeVolume(ctx, volume)
op := "triggerGcVolume"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx).With().Str("volume_id", volume.GetId()).Logger()
logger.Info().Msg("Triggering garbage collection of volume")
gc.moveVolumeToTrash(ctx, volume) // always do it synchronously
}

func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) {
func (gc *innerPathVolGc) moveVolumeToTrash(ctx context.Context, volume *Volume) {
op := "moveVolumeToTrash"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx).With().Str("volume_id", volume.GetId()).Logger()
logger.Debug().Msg("Starting garbage collection of volume")
fsName := volume.FilesystemName
defer gc.finishGcCycle(ctx, fsName, volume.apiClient)
defer gc.initiateGarbageCollection(ctx, fsName, volume.apiClient)
path, err, unmount := gc.mounter.Mount(ctx, fsName, volume.apiClient)
defer unmount()
if err != nil {
logger.Error().Err(err).Msg("Failed to mount filesystem for GC processing")
return
}
volumeTrashLoc := filepath.Join(path, garbagePath)
if err := os.MkdirAll(volumeTrashLoc, DefaultVolumePermissions); err != nil {
logger.Error().Err(err).Msg("Failed to create garbage collector directory")
logger.Error().Str("garbage_collection_path", volumeTrashLoc).Err(err).Msg("Failed to create garbage collector directory")
} else {
logger.Debug().Str("garbage_collection_path", volumeTrashLoc).Msg("Successfuly created garbage collection directory")
logger.Debug().Str("garbage_collection_path", volumeTrashLoc).Msg("Successfully created garbage collection directory")
}
fullPath := filepath.Join(path, volume.GetFullPath(ctx))
logger.Debug().Str("full_path", fullPath).Str("volume_trash_location", volumeTrashLoc).Msg("Moving volume contents to trash")
Expand All @@ -75,65 +74,70 @@ func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) {
// so if the volume is dir/v1/<filesystem>/this/is/a/path/to/volume, we might move only the `volume`
// but otherwise it could be risky as if we have multiple volumes we might remove other data too, e.g.
// vol1: dir/v1/<filesystem>/this/is/a/path/to/volume, vol2: dir/v1/<filesystem>/this/is/a/path/to/another_volume

logger.Trace().Str("purge_path", volumeTrashLoc).Msg("Purging deleted volume data")
if err != nil {
logger.Error().Err(err).Msg("Failed to mount filesystem for GC processing")
return
}
if err := purgeDirectory(ctx, volumeTrashLoc); err != nil {
logger.Error().Err(err).Str("purge_path", volumeTrashLoc).Msg("Failed to remove directory")
return
}

logger.Debug().Msg("Volume purged")
}

func purgeDirectory(ctx context.Context, path string) error {
logger := log.Ctx(ctx).With().Str("path", path).Logger()
if !PathExists(path) {
logger.Error().Str("path", path).Msg("Failed to remove existing directory")
return nil
}
for !pathIsEmptyDir(path) { // to make sure that if new files still appeared during invocation
files, err := os.ReadDir(path)
if err != nil {
logger.Error().Err(err).Msg("GC failed to read directory contents")
return err
}
for _, f := range files {
fp := filepath.Join(path, f.Name())
if f.IsDir() {
if err := purgeDirectory(ctx, fp); err != nil {
logger.Error().Err(err).Msg("")
return err
}
} else if err := os.Remove(fp); err != nil {
logger.Error().Err(err).Msg("Failed to remove directory that was used mount point")
}
}
}
return os.Remove(path)
// 2024-07-29: this does not appear to be a real problem, since static volumes are not deleted this way
// and dynamic volumes are always created inside /csi-volumes
logger.Debug().Str("full_path", fullPath).Str("volume_trash_location", volumeTrashLoc).Msg("Volume contents moved to trash")
}

func (gc *innerPathVolGc) purgeLeftovers(ctx context.Context, fs string, apiClient *apiclient.ApiClient) {
defer gc.finishGcCycle(ctx, fs, apiClient)
op := "purgeLeftovers"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx)
gc.Lock()
gc.isRunning[fs] = true
gc.Unlock()
path, err, unmount := gc.mounter.Mount(ctx, fs, apiClient)
defer unmount()
if err != nil {
log.Ctx(ctx).Error().Err(err).Str("filesystem", fs).Str("path", path).Msg("Failed mounting FS for garbage collection")
return
}
}
volumeTrashLoc := filepath.Join(path, garbagePath)

func (gc *innerPathVolGc) finishGcCycle(ctx context.Context, fs string, apiClient *apiclient.ApiClient) {
if fileExists("/locar") {
logger.Debug().Msg("Using locar for fast deletion")
deleteCmd := exec.Command("bash", "-c", fmt.Sprintf("/locar --type dir %s | /usr/bin/xargs -P32 -n128 rm -rf", volumeTrashLoc))
output, err := deleteCmd.CombinedOutput()
if err != nil {
logger.Error().Err(err).Msg("Error running locar")
logger.Trace().Str("output", string(output)).Msg("Locar output")
}
} else {
logger.Debug().Msg("Using default deletion method")
if err := os.RemoveAll(volumeTrashLoc); err != nil {
logger.Error().Err(err).Str("path", volumeTrashLoc).Msg("Failed to perform garbage collection")
}
}
logger.Debug().Msg("Garbage collection completed")
gc.Lock()
defer gc.Unlock()
gc.isRunning[fs] = false
if gc.isDeferred[fs] {
gc.isDeferred[fs] = false
go gc.triggerGc(ctx, fs, apiClient)
go gc.purgeLeftovers(ctx, fs, apiClient)
}
}

// initiateGarbageCollection requests a garbage collection run for filesystem fs.
//
// While holding the gc lock it checks the per-filesystem state:
//   - if a run is already marked as running for fs, it sets the deferred flag
//     for fs and returns without starting anything;
//   - otherwise, if no deferred run is pending for fs, it starts purgeLeftovers
//     in a new goroutine.
//
// The lock is released exactly once, by the deferred Unlock. An additional
// explicit Unlock at the end of the function would unlock an already unlocked
// mutex on the fall-through path, which is a fatal runtime error.
func (gc *innerPathVolGc) initiateGarbageCollection(ctx context.Context, fs string, apiClient *apiclient.ApiClient) {
	op := "initiateGarbageCollection"
	ctx, span := otel.Tracer(TracerName).Start(ctx, op)
	defer span.End()
	ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
	logger := log.Ctx(ctx)
	logger.Trace().Msg("Initiating garbage collection")

	gc.Lock()
	defer gc.Unlock() // sole release point; covers both the early return and the fall-through

	if gc.isRunning[fs] {
		logger.Trace().Msg("Garbage collection already running, deferring next run")
		gc.isDeferred[fs] = true
		return
	}
	if !gc.isDeferred[fs] {
		logger.Trace().Msg("Garbage collection not running, starting")
		// NOTE(review): isRunning[fs] is not set here; presumably purgeLeftovers
		// marks it itself once it starts — confirm against that function.
		go gc.purgeLeftovers(ctx, fs, apiClient)
	}
}

// pathIsEmptyDir is a simple check to determine if directory is empty or not.
Expand Down
11 changes: 11 additions & 0 deletions pkg/wekafs/utilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,17 @@ func PathExists(p string) bool {
return true
}

// fileExists reports whether os.Stat succeeds for filename.
// Any Stat failure yields false, whether it is a "does not exist" error or
// some other error.
func fileExists(filename string) bool {
	if _, statErr := os.Stat(filename); statErr != nil {
		// Not-exist and every other Stat error are treated the same way.
		return false
	}
	return true
}

func PathIsWekaMount(ctx context.Context, path string) bool {
file, err := os.Open("/proc/mounts")
if err != nil {
Expand Down
Loading