Skip to content

Commit

Permalink
fix(CSI-234): locar instead of go native rmdirtree
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeyberezansky committed Jul 30, 2024
1 parent b0b2e20 commit b93a9d9
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 127 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ RUN apk add util-linux libselinux libselinux-utils util-linux pciutils usbutils
# Update CA certificates
RUN apk add ca-certificates
RUN update-ca-certificates
ADD https://github.com/tigrawap/locar/releases/download/0.4.0/locar_linux_amd64 /locar
RUN chmod +x /locar
COPY --from=go-builder /bin/wekafsplugin /wekafsplugin
ARG binary=/bin/wekafsplugin
ENTRYPOINT ["/wekafsplugin"]
139 changes: 12 additions & 127 deletions pkg/wekafs/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"go.opentelemetry.io/otel"
"io"
"os"
"os/exec"
"path/filepath"
"sync"
"time"
)

const garbagePath = ".__internal__wekafs-async-delete"
Expand Down Expand Up @@ -38,9 +38,7 @@ func (gc *innerPathVolGc) triggerGcVolume(ctx context.Context, volume *Volume) {
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx).With().Str("volume_id", volume.GetId()).Logger()
logger.Info().Msg("Triggering garbage collection of volume")
fsName := volume.FilesystemName
gc.moveVolumeToTrash(ctx, volume) // always do it synchronously
go gc.initiateGarbageCollection(ctx, fsName, volume.apiClient)
}

func (gc *innerPathVolGc) moveVolumeToTrash(ctx context.Context, volume *Volume) {
Expand Down Expand Up @@ -81,106 +79,6 @@ func (gc *innerPathVolGc) moveVolumeToTrash(ctx context.Context, volume *Volume)
logger.Debug().Str("full_path", fullPath).Str("volume_trash_location", volumeTrashLoc).Msg("Volume contents moved to trash")
}

func deleteDirectoryWorker(paths <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for path := range paths {
err := os.RemoveAll(path)
if err != nil {
fmt.Printf("Failed to remove %s: %s\n", path, err)
}
}
}

//func deleteDirectoryTree(ctx context.Context, path string) error {
// op := "deleteDirectoryTree"
// ctx, span := otel.Tracer(TracerName).Start(ctx, op)
// defer span.End()
// ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
// logger := log.Ctx(ctx).With().Str("path", path).Logger()
// paths := make(chan string, garbageCollectionMaxThreads)
// var wg sync.WaitGroup
//
// // Start deleteDirectoryWorker goroutines
// for i := 0; i < garbageCollectionMaxThreads; i++ {
// wg.Add(1)
// go deleteDirectoryWorker(paths, &wg)
// }
//
// // Walk the directory tree and send paths to the workers
// err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
// if err != nil {
// return err
// }
// paths <- path
// return nil
// })
// if err != nil {
// close(paths)
// logger.Trace().Msg("Waiting for deletion workers to finish")
// wg.Wait()
// return err
// }
//
// // Close the paths channel and wait for all workers to finish
// close(paths)
// wg.Wait()
//
// return nil
//}

func deleteDirectoryRecursively(ctx context.Context, path string, wg *sync.WaitGroup, errChan chan<- error, sem chan struct{}) {
op := "deleteDirectoryRecursively"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx).With().Str("path", path).Logger()

dir, err := os.Open(path)
if err != nil {
errChan <- fmt.Errorf("failed to open directory %s: %v", path, err)
return
}
defer dir.Close()

for {
names, err := dir.Readdirnames(100) // Read directory entries in chunks
if err != nil {
if err != io.EOF {
logger.Error().Err(err).Msg("Failed to read directory entries")
errChan <- fmt.Errorf("failed to read directory entries: %v", err)
}
break
}
if len(names) == 0 {
break
}
logger.Trace().Int("num_entries", len(names)).Msg("Processing directory entries")
for _, name := range names {
subPath := filepath.Join(path, name)
go func(p string) {
logger.Trace().Str("sub_path", p).Msg("Processing subpath, acquiring semaphore")
sem <- struct{}{} // Acquire semaphore
logger.Trace().Str("sub_path", p).Msg("Processing subpath, acquired semaphore")
wg.Add(1)
defer wg.Done()
defer func() { <-sem }() // Release semaphore
fi, err := os.Lstat(p)
if err != nil {
logger.Error().Err(err).Str("path", p).Msg("Failed to stat path")
errChan <- fmt.Errorf("failed to stat %s: %v", p, err)
return
}
if fi.IsDir() {
deleteDirectoryRecursively(ctx, p, wg, errChan, sem) // Recurse into subdirectory
if err := os.Remove(p); err != nil {
errChan <- fmt.Errorf("failed to remove entry %s: %v", p, err)
}
}
}(subPath)
}
}
}

func (gc *innerPathVolGc) purgeLeftovers(ctx context.Context, fs string, apiClient *apiclient.ApiClient) {
op := "purgeLeftovers"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
Expand All @@ -198,33 +96,20 @@ func (gc *innerPathVolGc) purgeLeftovers(ctx context.Context, fs string, apiClie
}
volumeTrashLoc := filepath.Join(path, garbagePath)

var wg sync.WaitGroup
errChan := make(chan error, 10000)
sem := make(chan struct{}, 1000)
go deleteDirectoryRecursively(ctx, volumeTrashLoc, &wg, errChan, sem)

time.Sleep(3 * time.Second) // Wait for some time to allow the workers to start
wg.Wait()

if err := os.Remove(path); err != nil {
errChan <- fmt.Errorf("failed to remove file %s: %v", path, err)
}

close(errChan)
for err := range errChan {
if fileExists("/locar") {
logger.Debug().Msg("Using locar for fast deletion")
deleteCmd := exec.Command("bash", "-c", fmt.Sprintf("/locar --type dir %s | /usr/bin/xargs -P32 -0 -n16 rm -rf", volumeTrashLoc))
output, err := deleteCmd.CombinedOutput()
if err != nil {
logger.Warn().Err(err).Msg("Error occured during deletion")
logger.Error().Err(err).Msg("Error running locar")
}
logger.Trace().Str("output", string(output)).Msg("Output of locar")
} else {
logger.Debug().Msg("Using default deletion method")
if err := os.RemoveAll(volumeTrashLoc); err != nil {
logger.Error().Err(err).Str("path", volumeTrashLoc).Msg("Failed to perform garbage collection")
}
}

//
//err = deleteDirectoryTree(ctx, volumeTrashLoc)
//if err != nil {
// logger.Error().Err(err).Msg("Failed to remove directory tree")
//} else {
// logger.Trace().Msg("Directory tree deleted successfully")
//}

logger.Debug().Msg("Garbage collection completed")
gc.Lock()
defer gc.Unlock()
Expand Down
11 changes: 11 additions & 0 deletions pkg/wekafs/utilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,17 @@ func PathExists(p string) bool {
return true
}

func fileExists(filename string) bool {
_, err := os.Stat(filename)
if err == nil {
return true
}
if os.IsNotExist(err) {
return false
}
return false
}

func PathIsWekaMount(ctx context.Context, path string) bool {
file, err := os.Open("/proc/mounts")
if err != nil {
Expand Down

0 comments on commit b93a9d9

Please sign in to comment.