Skip to content

Commit

Permalink
fix(CSI-234): better deletion for multi-entry directories
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeyberezansky committed Jul 30, 2024
1 parent a711153 commit 93fda96
Showing 1 changed file with 109 additions and 32 deletions.
141 changes: 109 additions & 32 deletions pkg/wekafs/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
)

const garbagePath = ".__internal__wekafs-async-delete"
const garbageCollectionMaxThreads = 32

//const garbageCollectionMaxThreads = 32

type innerPathVolGc struct {
isRunning map[string]bool
Expand Down Expand Up @@ -89,41 +90,99 @@ func deleteDirectoryWorker(paths <-chan string, wg *sync.WaitGroup) {
}
}

func deleteDirectoryTree(ctx context.Context, path string) error {
op := "purgeDirectory"
//func deleteDirectoryTree(ctx context.Context, path string) error {
// op := "deleteDirectoryTree"
// ctx, span := otel.Tracer(TracerName).Start(ctx, op)
// defer span.End()
// ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
// logger := log.Ctx(ctx).With().Str("path", path).Logger()
// paths := make(chan string, garbageCollectionMaxThreads)
// var wg sync.WaitGroup
//
// // Start deleteDirectoryWorker goroutines
// for i := 0; i < garbageCollectionMaxThreads; i++ {
// wg.Add(1)
// go deleteDirectoryWorker(paths, &wg)
// }
//
// // Walk the directory tree and send paths to the workers
// err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
// if err != nil {
// return err
// }
// paths <- path
// return nil
// })
// if err != nil {
// close(paths)
// logger.Trace().Msg("Waiting for deletion workers to finish")
// wg.Wait()
// return err
// }
//
// // Close the paths channel and wait for all workers to finish
// close(paths)
// wg.Wait()
//
// return nil
//}

func deleteDirectoryRecursively(ctx context.Context, path string, wg *sync.WaitGroup, errChan chan<- error) {
op := "deleteDirectoryRecursively"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx).With().Str("path", path).Logger()
paths := make(chan string, garbageCollectionMaxThreads)
var wg sync.WaitGroup

// Start deleteDirectoryWorker goroutines
for i := 0; i < garbageCollectionMaxThreads; i++ {
wg.Add(1)
go deleteDirectoryWorker(paths, &wg)
dir, err := os.Open(path)
if err != nil {
errChan <- fmt.Errorf("failed to open directory %s: %v", path, err)
return
}

// Walk the directory tree and send paths to the workers
err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
for {
names, err := dir.Readdirnames(1000) // Read directory entries in chunks
if err != nil {
return err
if err != io.EOF {
logger.Error().Err(err).Msg("Failed to read directory entries")
errChan <- fmt.Errorf("failed to read directory entries: %v", err)
}
break
}
paths <- path
return nil
})
if err != nil {
close(paths)
logger.Trace().Msg("Waiting for deletion workers to finish")
wg.Wait()
return err
}

// Close the paths channel and wait for all workers to finish
close(paths)
wg.Wait()
if len(names) == 0 {
break
}
logger.Trace().Int("num_entries", len(names)).Msg("Processing directory entries")
for _, name := range names {
subPath := filepath.Join(path, name)
wg.Add(1)
go func(p string) {
defer wg.Done()
fi, err := os.Lstat(p)
if err != nil {
logger.Error().Err(err).Str("path", p).Msg("Failed to stat path")
errChan <- fmt.Errorf("failed to stat %s: %v", p, err)
return
}
if fi.IsDir() {
deleteDirectoryRecursively(ctx, p, wg, errChan) // Recurse into subdirectory
} else {
if err := os.Remove(p); err != nil {
errChan <- fmt.Errorf("failed to remove file %s: %v", p, err)
} else {
logger.Trace().Str("path", p).Msg("File removed")
}

return nil
}
}(subPath)
}
}
if err := dir.Close(); err != nil {
errChan <- fmt.Errorf("failed to close directory %s: %v", path, err)
}
// Remove the directory itself
if err := os.Remove(path); err != nil {
errChan <- fmt.Errorf("failed to remove directory %s: %v", path, err)
}
}

func (gc *innerPathVolGc) purgeLeftovers(ctx context.Context, fs string, apiClient *apiclient.ApiClient) {
Expand All @@ -143,13 +202,31 @@ func (gc *innerPathVolGc) purgeLeftovers(ctx context.Context, fs string, apiClie
}
volumeTrashLoc := filepath.Join(path, garbagePath)

err = deleteDirectoryTree(ctx, volumeTrashLoc)
if err != nil {
fmt.Printf("Error: %s\n", err)
} else {
fmt.Println("Directory tree deleted successfully")
var wg sync.WaitGroup
errChan := make(chan error, 10000)

wg.Add(1)
go deleteDirectoryRecursively(ctx, volumeTrashLoc, &wg, errChan)

go func() {
wg.Wait()
close(errChan)
}()

for err := range errChan {
if err != nil {
logger.Warn().Err(err).Msg("Error occured during deletion")
}
}

//
//err = deleteDirectoryTree(ctx, volumeTrashLoc)
//if err != nil {
// logger.Error().Err(err).Msg("Failed to remove directory tree")
//} else {
// logger.Trace().Msg("Directory tree deleted successfully")
//}

logger.Debug().Msg("Garbage collection completed")
gc.Lock()
defer gc.Unlock()
Expand Down

0 comments on commit 93fda96

Please sign in to comment.