limit the number of files being garbage collected per run
Signed-off-by: Sanskar Jaiswal <[email protected]>
aryan9600 committed Apr 1, 2022
1 parent 754e302 commit 68ef63d
Showing 3 changed files with 55 additions and 17 deletions.
2 changes: 1 addition & 1 deletion controllers/gitrepository_controller.go
@@ -709,7 +709,7 @@ func (r *GitRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourc
return nil
}
if obj.GetArtifact() != nil {
deleted, err := r.Storage.RemoveGarbageFiles(*obj.GetArtifact(), r.artifactRetentionRecords, r.artifactRetentionTTL)
deleted, err := r.Storage.GarbageCollect(*obj.GetArtifact(), r.artifactRetentionRecords, r.artifactRetentionTTL)
if err != nil {
return &serror.Event{
Err: fmt.Errorf("garbage collection of old artifacts failed: %w", err),
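For context, a minimal sketch of the call pattern this hunk switches to: the reconciler passes the current artifact together with the retention settings (r.artifactRetentionRecords, r.artifactRetentionTTL) to the storage layer and gets back the list of deleted paths. The artifact and collector types below are hypothetical stand-ins rather than source-controller types, and what the real reconciler does with the returned paths is not shown in this hunk; only the GarbageCollect(artifact, maxItemsToBeRetained, ttl) shape mirrors the changed line above.

package main

import (
    "fmt"
    "time"
)

// artifact is a hypothetical stand-in for the stored artifact metadata;
// only the path is relevant for this sketch.
type artifact struct {
    Path string
}

// collector is a hypothetical stand-in for the Storage method surface used above:
// GarbageCollect returns the paths it deleted for the given artifact's directory.
type collector interface {
    GarbageCollect(a artifact, maxItemsToBeRetained int, ttl time.Duration) ([]string, error)
}

// noopCollector deletes nothing; it only demonstrates how a caller consumes the result.
type noopCollector struct{}

func (noopCollector) GarbageCollect(a artifact, maxItemsToBeRetained int, ttl time.Duration) ([]string, error) {
    return nil, nil
}

func main() {
    var s collector = noopCollector{}
    current := artifact{Path: "gitrepository/default/repo/artifact.tar.gz"}

    // Retention settings analogous to artifactRetentionRecords and artifactRetentionTTL.
    deleted, err := s.GarbageCollect(current, 2, time.Hour)
    if err != nil {
        fmt.Println("garbage collection of old artifacts failed:", err)
        return
    }
    fmt.Printf("garbage collected %d old artifacts\n", len(deleted))
}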
44 changes: 29 additions & 15 deletions controllers/storage.go
@@ -36,12 +36,16 @@ import (

"github.com/fluxcd/pkg/lockedfile"

"io/fs"

"github.com/fluxcd/pkg/untar"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/internal/fs"
sourcefs "github.com/fluxcd/source-controller/internal/fs"
"github.com/fluxcd/source-controller/pkg/sourceignore"
)

const GarbageCountLimit = 1000

// Storage manages artifacts
type Storage struct {
// BasePath is the local directory path where the source artifacts are stored.
@@ -151,7 +155,7 @@ func (s *Storage) RemoveAllButCurrent(artifact sourcev1.Artifact) ([]string, err
// 1. collect all files with an expired ttl
// 2. if we satisfy maxItemsToBeRetained, then return
// 3. else, remove files until only the latest n files remain, where n=maxItemsToBeRetained
func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, maxItemsToBeRetained int, ttl time.Duration) ([]string, error) {
func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, totalCountLimit, maxItemsToBeRetained int, ttl time.Duration) ([]string, error) {
localPath := s.LocalPath(artifact)
dir := filepath.Dir(localPath)
garbageFiles := []string{}
@@ -161,11 +165,23 @@ func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, maxItemsToBeRetain
now := time.Now().UTC()
totalFiles := 0
var errors []string
_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
creationTimestamps := []time.Time{}
_ = filepath.WalkDir(dir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
errors = append(errors, err.Error())
return nil
}
info, err := d.Info()
if err != nil {
errors = append(errors, err.Error())
return nil
}
// Exit if we have already walked through `totalCountLimit` files, to avoid
// process time starvation. The remaining potential garbage files will be cleaned up
// in the next reconciler run.
if totalFiles >= totalCountLimit {
return nil
}
createdAt := info.ModTime().UTC()
diff := now.Sub(createdAt)
// compare the time difference between now and the time at which the file was created
@@ -176,9 +192,11 @@ func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, maxItemsToBeRetain
garbageFiles = append(garbageFiles, path)
}
totalFiles += 1
filesWithCreatedTs[info.ModTime().UTC()] = path
filesWithCreatedTs[createdAt] = path
creationTimestamps = append(creationTimestamps, createdAt)
}
return nil

})
if len(errors) > 0 {
return nil, fmt.Errorf("can't walk over file: %s", strings.Join(errors, ","))
@@ -190,10 +208,6 @@ func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, maxItemsToBeRetain
return garbageFiles, nil
}

creationTimestamps := []time.Time{}
for ts := range filesWithCreatedTs {
creationTimestamps = append(creationTimestamps, ts)
}
// sort all timestamps in an ascending order.
sort.Slice(creationTimestamps, func(i, j int) bool { return creationTimestamps[i].Before(creationTimestamps[j]) })
for _, ts := range creationTimestamps {
@@ -229,10 +243,10 @@ func (s *Storage) getGarbageFiles(artifact sourcev1.Artifact, maxItemsToBeRetain
return garbageFiles, nil
}

// RemoveGarbageFiles removes all garbage files in the artifact dir according to the provided
// GarbageCollect removes all garbage files in the artifact dir according to the provided
// retention options.
func (s *Storage) RemoveGarbageFiles(artifact sourcev1.Artifact, maxItemsToBeRetained int, ttl time.Duration) ([]string, error) {
garbageFiles, err := s.getGarbageFiles(artifact, maxItemsToBeRetained, ttl)
func (s *Storage) GarbageCollect(artifact sourcev1.Artifact, maxItemsToBeRetained int, ttl time.Duration) ([]string, error) {
garbageFiles, err := s.getGarbageFiles(artifact, GarbageCountLimit, maxItemsToBeRetained, ttl)
if err != nil {
return nil, err
}
@@ -399,7 +413,7 @@ func (s *Storage) Archive(artifact *sourcev1.Artifact, dir string, filter Archiv
return err
}

if err := fs.RenameWithFallback(tmpName, localPath); err != nil {
if err := sourcefs.RenameWithFallback(tmpName, localPath); err != nil {
return err
}

@@ -441,7 +455,7 @@ func (s *Storage) AtomicWriteFile(artifact *sourcev1.Artifact, reader io.Reader,
return err
}

if err := fs.RenameWithFallback(tfName, localPath); err != nil {
if err := sourcefs.RenameWithFallback(tfName, localPath); err != nil {
return err
}

@@ -479,7 +493,7 @@ func (s *Storage) Copy(artifact *sourcev1.Artifact, reader io.Reader) (err error
return err
}

if err := fs.RenameWithFallback(tfName, localPath); err != nil {
if err := sourcefs.RenameWithFallback(tfName, localPath); err != nil {
return err
}

@@ -539,7 +553,7 @@ func (s *Storage) CopyToPath(artifact *sourcev1.Artifact, subPath, toPath string
if err != nil {
return err
}
if err := fs.RenameWithFallback(fromPath, toPath); err != nil {
if err := sourcefs.RenameWithFallback(fromPath, toPath); err != nil {
return err
}
return nil
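To make the new retention behaviour concrete, here is a self-contained, simplified sketch of the selection step that getGarbageFiles performs after this change, assuming the directory walk visits files oldest-first. The file type, selectGarbage function, and the sample timestamps are illustrative stand-ins, not the source-controller implementation: at most totalCountLimit files are considered per run, every considered file past its TTL is marked, and if more than maxItemsToBeRetained considered files would survive, the oldest survivors are marked as well.

package main

import (
    "fmt"
    "sort"
    "time"
)

// file is a hypothetical stand-in for the (path, mod time) pairs collected
// while walking the artifact directory.
type file struct {
    path    string
    modTime time.Time
}

// selectGarbage sketches the selection logic:
//  1. consider at most totalCountLimit files in a single run,
//  2. mark every considered file whose TTL has expired,
//  3. if more than maxItemsToBeRetained considered files would remain,
//     also mark the oldest remaining ones until only that many are left.
func selectGarbage(files []file, totalCountLimit, maxItemsToBeRetained int, ttl time.Duration, now time.Time) []string {
    if len(files) > totalCountLimit {
        // Cap the work done per run; the rest is handled on the next reconciliation.
        files = files[:totalCountLimit]
    }
    garbage := map[string]bool{}
    for _, f := range files {
        if now.Sub(f.modTime) > ttl {
            garbage[f.path] = true
        }
    }
    // Oldest first, so the newest files are the ones retained.
    sort.Slice(files, func(i, j int) bool { return files[i].modTime.Before(files[j].modTime) })
    for _, f := range files {
        if len(files)-len(garbage) <= maxItemsToBeRetained {
            break
        }
        garbage[f.path] = true
    }
    var deleted []string
    for _, f := range files {
        if garbage[f.path] {
            deleted = append(deleted, f.path)
        }
    }
    return deleted
}

func main() {
    now := time.Now().UTC()
    var files []file
    // Six artifacts, each 500ms apart, mirroring the new test case below.
    for i := 1; i <= 6; i++ {
        files = append(files, file{
            path:    fmt.Sprintf("artifact%d.tar.gz", i),
            modTime: now.Add(-time.Duration(7-i) * 500 * time.Millisecond),
        })
    }
    // totalCountLimit=3, maxItemsToBeRetained=2, ttl=500ms:
    // only the first three files are considered, and all of them have expired.
    fmt.Println(selectGarbage(files, 3, 2, 500*time.Millisecond, now))
}

With the sample values above (mirroring the new totalCountLimit test case: six artifacts, a cap of 3, 2 items retained, a 500ms TTL), the sketch prints only artifact1 through artifact3. Capping the walk at GarbageCountLimit bounds the time a single reconciliation spends on garbage collection; anything skipped is picked up on a later run, as the comment in the hunk above notes.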
26 changes: 25 additions & 1 deletion controllers/storage_test.go
@@ -496,6 +496,7 @@ func TestStorage_getGarbageFiles(t *testing.T) {
createPause time.Duration
ttl time.Duration
maxItemsToBeRetained int
totalCountLimit int
wantDeleted []string
}{
{
@@ -509,6 +510,7 @@ func TestStorage_getGarbageFiles(t *testing.T) {
},
createPause: time.Millisecond * 10,
ttl: time.Minute * 2,
totalCountLimit: 10,
maxItemsToBeRetained: 2,
wantDeleted: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
@@ -527,6 +529,7 @@ func TestStorage_getGarbageFiles(t *testing.T) {
},
createPause: time.Second * 1,
ttl: time.Second*3 + time.Millisecond*500,
totalCountLimit: 10,
maxItemsToBeRetained: 4,
wantDeleted: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
@@ -545,12 +548,33 @@ func TestStorage_getGarbageFiles(t *testing.T) {
},
createPause: time.Second * 1,
ttl: time.Second*5 + time.Millisecond*500,
totalCountLimit: 10,
maxItemsToBeRetained: 4,
wantDeleted: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
},
},
{
name: "delete files based on ttl and maxItemsToBeRetained and totalCountLimit",
artifactPaths: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
path.Join(artifactFolder, "artifact3.tar.gz"),
path.Join(artifactFolder, "artifact4.tar.gz"),
path.Join(artifactFolder, "artifact5.tar.gz"),
path.Join(artifactFolder, "artifact6.tar.gz"),
},
createPause: time.Millisecond * 500,
ttl: time.Millisecond * 500,
totalCountLimit: 3,
maxItemsToBeRetained: 2,
wantDeleted: []string{
path.Join(artifactFolder, "artifact1.tar.gz"),
path.Join(artifactFolder, "artifact2.tar.gz"),
path.Join(artifactFolder, "artifact3.tar.gz"),
},
},
}

for _, tt := range tests {
Expand All @@ -574,7 +598,7 @@ func TestStorage_getGarbageFiles(t *testing.T) {
time.Sleep(tt.createPause)
}

deletedPaths, err := s.getGarbageFiles(artifact, tt.maxItemsToBeRetained, tt.ttl)
deletedPaths, err := s.getGarbageFiles(artifact, tt.totalCountLimit, tt.maxItemsToBeRetained, tt.ttl)
g.Expect(err).ToNot(HaveOccurred(), "failed to collect garbage files")
g.Expect(len(tt.wantDeleted)).To(Equal(len(deletedPaths)))
for _, wantDeletedPath := range tt.wantDeleted {
