Skip to content

Commit

Permalink
S3 DeleteAllVersions: use pagination
Browse files Browse the repository at this point in the history
This way we're not limited to one page of versions.  This is likely a
purely theoretical concern, at least as we're using it today.
  • Loading branch information
justinsb committed May 31, 2020
1 parent f943710 commit 2f2b032
Showing 1 changed file with 23 additions and 18 deletions.
41 changes: 23 additions & 18 deletions util/pkg/vfs/s3fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,31 +119,29 @@ func (p *S3Path) RemoveAllVersions() error {
Prefix: aws.String(p.key),
}

response, err := client.ListObjectVersions(request)
if err != nil {
var versions []*s3.ObjectVersion
var deleteMarkers []*s3.DeleteMarkerEntry
if err := client.ListObjectVersionsPages(request, func(page *s3.ListObjectVersionsOutput, lastPage bool) error {
versions = append(versions, page.Versions...)
deleteMarkers = append(deleteMarkers, page.DeleteMarkers)
}); err != nil {
return fmt.Errorf("error listing all versions of file %s: %v", p, err)
}

if len(response.Versions) == 0 && len(response.DeleteMarkers) == 0 {
if len(versions) == 0 && len(deleteMarkers) == 0 {
return os.ErrNotExist
}

// Sometimes S3 will return paginated results if there are too many versions and markers for an object.
// This happens at about entries 1000, so it is unlikely with current use cases.
if aws.BoolValue(response.IsTruncated) {
klog.Warningf("too many versions for %s", p)
}

objects := []*s3.ObjectIdentifier{}
for _, version := range response.Versions {
var objects []s3.ObjectIdentifier
for _, version := range versions {
klog.V(8).Infof("removing file %s version %q", p, aws.StringValue(version.VersionId))
file := s3.ObjectIdentifier{
Key: version.Key,
VersionId: version.VersionId,
}
objects = append(objects, &file)
}
for _, version := range response.DeleteMarkers {
for _, version := range deleteMarkers {
klog.V(8).Infof("removing marker %s version %q", p, aws.StringValue(version.VersionId))
marker := s3.ObjectIdentifier{
Key: version.Key,
Expand All @@ -152,19 +150,26 @@ func (p *S3Path) RemoveAllVersions() error {
objects = append(objects, &marker)
}

if len(objects) > 0 {
klog.V(8).Infof("removing %d file/marker versions\n", len(objects))
for len(objects) > 0 {
klog.V(8).Infof("removing %d file/marker versions\n", len(page))

request := &s3.DeleteObjectsInput{
Bucket: aws.String(p.bucket),
Delete: &s3.Delete{
Objects: objects,
},
Delete: &s3.Delete{},
}

// DeleteObjects can only process 1000 objects per call
if len(objects) > 1000 {
request.Delete.Objects = objects[:1000]
objects = objects[1000:]
} else {
request.Delete.Objects = objects
objects = nil
}

_, err = client.DeleteObjects(request)
if err != nil {
return fmt.Errorf("error removing %d file/marker versions: %v", len(objects), err)
return fmt.Errorf("error removing %d file/marker versions: %v", len(page), err)
}
}

Expand Down

0 comments on commit 2f2b032

Please sign in to comment.