Skip to content

Commit

Permalink
Add DeleteWithHost support for storage driver;
Browse files Browse the repository at this point in the history
Add Vacuum coding version;

Signed-off-by: Juan Chan <[email protected]>
  • Loading branch information
reedchan7 committed Jun 21, 2021
1 parent 9f86ecf commit 8c08c95
Show file tree
Hide file tree
Showing 13 changed files with 332 additions and 4 deletions.
5 changes: 5 additions & 0 deletions registry/storage/driver/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,11 @@ func (d *driver) Delete(ctx context.Context, path string) error {
return nil
}

// DeleteWithHost recursively deletes all objects stored at "path" and its subPaths with coding host.
func (d *driver) DeleteWithHost(ctx context.Context, host, path string) error {
return d.Delete(ctx, path)
}

// URLFor returns a publicly accessible URL for the blob stored at given path
// for specified duration by making use of Azure Storage Shared Access Signatures (SAS).
// See https://msdn.microsoft.com/en-us/library/azure/ee395415.aspx for more info.
Expand Down
7 changes: 6 additions & 1 deletion registry/storage/driver/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ import (
"io"
"time"

"github.com/docker/go-metrics"
dcontext "github.com/juan-chan/distribution/context"
prometheus "github.com/juan-chan/distribution/metrics"
storagedriver "github.com/juan-chan/distribution/registry/storage/driver"
"github.com/docker/go-metrics"
)

var (
Expand Down Expand Up @@ -212,6 +212,11 @@ func (base *Base) Delete(ctx context.Context, path string) error {
return err
}

// DeleteWithHost recursively deletes all objects stored at "path" and its subPaths with coding host.
func (base *Base) DeleteWithHost(ctx context.Context, host, path string) error {
return base.Delete(ctx, path)
}

// URLFor wraps URLFor of underlying storage driver.
func (base *Base) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
ctx, done := dcontext.WithTrace(ctx)
Expand Down
5 changes: 5 additions & 0 deletions registry/storage/driver/base/regulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ func (r *regulator) Delete(ctx context.Context, path string) error {
return r.StorageDriver.Delete(ctx, path)
}

// DeleteWithHost recursively deletes all objects stored at "path" and its subPaths with coding host.
func (r *regulator) DeleteWithHost(ctx context.Context, host, path string) error {
return r.Delete(ctx, path)
}

// URLFor returns a URL which may be used to retrieve the content stored at
// the given path, possibly using the given options.
// May return an ErrUnsupportedMethod in certain StorageDriver
Expand Down
78 changes: 75 additions & 3 deletions registry/storage/driver/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,17 @@ import (
"text/template"
"time"

"github.com/juan-chan/distribution/registry/grpc"
"github.com/juan-chan/distribution/registry/storage/manager"

"github.com/sirupsen/logrus"
"github.com/tencentyun/cos-go-sdk-v5"

dcontext "github.com/juan-chan/distribution/context"
"github.com/juan-chan/distribution/registry/grpc"
storagedriver "github.com/juan-chan/distribution/registry/storage/driver"
"github.com/juan-chan/distribution/registry/storage/driver/base"
"github.com/juan-chan/distribution/registry/storage/driver/cos/cci"
"github.com/juan-chan/distribution/registry/storage/driver/cos/cdn"
"github.com/juan-chan/distribution/registry/storage/driver/factory"
"github.com/juan-chan/distribution/registry/storage/manager"
)

const (
Expand Down Expand Up @@ -718,6 +717,64 @@ func (d *driver) Delete(ctx context.Context, path string) error {
return nil
}

// DeleteWithHost recursively deletes all objects stored at "path" and its subPaths with coding host.
func (d *driver) DeleteWithHost(ctx context.Context, host, path string) error {
cosPath, err := d.cosPathWithHost(ctx, host, path)
if err != nil {
return err
}

opt := &cos.BucketGetOptions{
Prefix: cosPath,
MaxKeys: listMax,
}
// list max objects
listResponse, _, err := d.Client.Bucket.Get(ctx, opt)
if err != nil || len(listResponse.Contents) == 0 {
return storagedriver.PathNotFoundError{Path: cosPath}
}

cosObjects := make([]cos.Object, listMax)

for len(listResponse.Contents) > 0 {
numCosObjects := len(listResponse.Contents)
for index, key := range listResponse.Contents {
// Stop if we encounter a key that is not a subpath (so that deleting "/a" does not delete "/ab").
if len(key.Key) > len(cosPath) && (key.Key)[len(cosPath)] != '/' {
numCosObjects = index
break
}
cosObjects[index].Key = key.Key
}

// delete by keys
opt := &cos.ObjectDeleteMultiOptions{
Objects: cosObjects[0:numCosObjects],
Quiet: false,
}
_, _, err := d.Client.Object.DeleteMulti(ctx, opt)
if err != nil {
// delete fail
return parseError(path, err)
}

// contents contain keys which not in a subpath
if numCosObjects < len(listResponse.Contents) {
return nil
}

// fetch objects again
listResponse, _, err = d.Client.Bucket.Get(ctx, &cos.BucketGetOptions{
Prefix: cosPath,
MaxKeys: listMax,
})
if err != nil {
return err
}
}
return nil
}

func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {

// FIXME: docker-for-mac will replace "%3d" to "=" , "/sign=a%3Db" -> "/sign=a=b" , so sad :!
Expand Down Expand Up @@ -989,6 +1046,21 @@ func (d *driver) cosPath(subPath string, ctx context.Context) (string, error) {

}

func (d *driver) cosPathWithHost(ctx context.Context, host, subPath string) (string, error) {
if d.StorageManagerAddress == "" {
return d.resolvePath(subPath, path.Join(subPath)), nil
}
if host == "" {
return d.cosPath(subPath, ctx)
}

storagePath, err := manager.GetStoragePath(d.grpcConnPool, d.StorageManagerAddress, host, subPath)
if err != nil {
return "", fmt.Errorf("failed to get storage path: %v", err)
}
return d.resolvePath(subPath, storagePath), nil
}

// copy copies an object stored at sourcePath to destPath.
func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) error {
fileInfo, err := d.Stat(ctx, sourcePath)
Expand Down
5 changes: 5 additions & 0 deletions registry/storage/driver/filesystem/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ func (d *driver) Delete(ctx context.Context, subPath string) error {
return err
}

// DeleteWithHost recursively deletes all objects stored at "path" and its subPaths with coding host.
func (d *driver) DeleteWithHost(ctx context.Context, host, path string) error {
return d.Delete(ctx, path)
}

// URLFor returns a URL which may be used to retrieve the content stored at the given path.
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
Expand Down
34 changes: 34 additions & 0 deletions registry/storage/driver/filesystemmc/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,24 @@ func (d *driver) Delete(ctx context.Context, subPath string) error {
return err
}

// DeleteWithHost recursively deletes all objects stored at "path" and its subPaths with coding host.
func (d *driver) DeleteWithHost(ctx context.Context, host, path string) error{
fullPath, err := d.fullPathWithHost(ctx, host, path)
if err != nil {
return err
}

_, err = os.Stat(fullPath)
if err != nil && !os.IsNotExist(err) {
return err
} else if err != nil {
return storagedriver.PathNotFoundError{Path: path}
}

err = os.RemoveAll(fullPath)
return err
}

// URLFor returns a URL which may be used to retrieve the content stored at the given path.
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
Expand Down Expand Up @@ -416,6 +434,22 @@ func (d *driver) fullPath(subPath string, ctx context.Context) (string, error) {
return path.Join(d.rootDirectory, prefix, subPath), nil
}

func (d *driver) fullPathWithHost(ctx context.Context, host, subPath string) (string, error) {
prefix := getPrefix(ctx)
if d.storageManagerAddress == "" {
return path.Join(d.rootDirectory, prefix, subPath), nil
}
if host == "" {
return d.fullPath(subPath, ctx)
}

storagePath, err := manager.GetStoragePath(d.grpcConnPool, d.storageManagerAddress, host, subPath)
if err != nil {
return "", fmt.Errorf("failed to get storage path: %v", err)
}
return path.Join(d.rootDirectory, storagePath), nil
}

type fileInfo struct {
os.FileInfo
path string
Expand Down
5 changes: 5 additions & 0 deletions registry/storage/driver/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,11 @@ func (d *driver) Delete(context context.Context, path string) error {
return err
}

// DeleteWithHost recursively deletes all objects stored at "path" and its subPaths with coding host.
func (d *driver) DeleteWithHost(ctx context.Context, host, path string) error {
return d.Delete(ctx, path)
}

func storageDeleteObject(context context.Context, bucket string, name string) error {
return retry(func() error {
return storage.DeleteObject(context, bucket, name)
Expand Down
5 changes: 5 additions & 0 deletions registry/storage/driver/inmemory/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ func (d *driver) Delete(ctx context.Context, path string) error {
}
}

// DeleteWithHost recursively deletes all objects stored at "path" and its subPaths with coding host.
func (d *driver) DeleteWithHost(ctx context.Context, host, path string) error {
return d.Delete(ctx, path)
}

// URLFor returns a URL which may be used to retrieve the content stored at the given path.
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
Expand Down
5 changes: 5 additions & 0 deletions registry/storage/driver/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,11 @@ func (d *driver) Delete(ctx context.Context, path string) error {
return nil
}

// DeleteWithHost recursively deletes all objects stored at "path" and its subPaths with coding host.
func (d *driver) DeleteWithHost(ctx context.Context, host, path string) error {
return d.Delete(ctx, path)
}

// URLFor returns a URL which may be used to retrieve the content stored at the given path.
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
Expand Down
77 changes: 77 additions & 0 deletions registry/storage/driver/s3-aws/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,68 @@ ListLoop:
return nil
}

// DeleteWithHost recursively deletes all objects stored at "path" and its subPaths with coding host.
func (d *driver) DeleteWithHost(ctx context.Context, host, path string) error {
storagePath, err := d.storagePathWithHost(ctx, host, path)
if err != nil {
return err
}

s3Objects := make([]*s3.ObjectIdentifier, 0, listMax)
s3Path := storagePath
listObjectsInput := &s3.ListObjectsInput{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(s3Path),
}
ListLoop:
for {
// list all the objects
resp, err := d.S3.ListObjects(listObjectsInput)

// resp.Contents can only be empty on the first call
// if there were no more results to return after the first call, resp.IsTruncated would have been false
// and the loop would be exited without recalling ListObjects
if err != nil || len(resp.Contents) == 0 {
return storagedriver.PathNotFoundError{Path: storagePath}
}

for _, key := range resp.Contents {
// Stop if we encounter a key that is not a subpath (so that deleting "/a" does not delete "/ab").
if len(*key.Key) > len(s3Path) && (*key.Key)[len(s3Path)] != '/' {
break ListLoop
}
s3Objects = append(s3Objects, &s3.ObjectIdentifier{
Key: key.Key,
})
}

// resp.Contents must have at least one element or we would have returned not found
listObjectsInput.Marker = resp.Contents[len(resp.Contents)-1].Key

// from the s3 api docs, IsTruncated "specifies whether (true) or not (false) all of the results were returned"
// if everything has been returned, break
if resp.IsTruncated == nil || !*resp.IsTruncated {
break
}
}

// need to chunk objects into groups of 1000 per s3 restrictions
total := len(s3Objects)
for i := 0; i < total; i += 1000 {
_, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{
Bucket: aws.String(d.Bucket),
Delete: &s3.Delete{
Objects: s3Objects[i:min(i+1000, total)],
Quiet: aws.Bool(false),
},
})
if err != nil {
return err
}
}
return nil
}

// URLFor returns a URL which may be used to retrieve the content stored at the given path.
// May return an UnsupportedMethodErr in certain StorageDriver implementations.
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
Expand Down Expand Up @@ -1154,6 +1216,21 @@ func (d *driver) storagePath(subPath string, ctx context.Context) (string, error
return d.resolvePath(subPath, path.Join(subPath)), nil
}

func (d *driver) storagePathWithHost(ctx context.Context, host, subPath string) (string, error) {
if d.StorageManagerAddress == "" {
return d.resolvePath(subPath, path.Join(subPath)), nil
}
if host == "" {
return d.storagePath(subPath, ctx)
}

storagePath, err := manager.GetStoragePath(d.grpcConnPool, d.StorageManagerAddress, host, subPath)
if err != nil {
return "", fmt.Errorf("failed to get storage path: %v", err)
}
return d.resolvePath(subPath, storagePath), nil
}

func findPrefixes(path, storagePath string) (string, string) {
for i, j := len(path)-1, len(storagePath)-1; i >= 0 && j >= 0; i, j = i-1, j-1 {
if path[i] != storagePath[j] {
Expand Down
3 changes: 3 additions & 0 deletions registry/storage/driver/storagedriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type StorageDriver interface {
// Delete recursively deletes all objects stored at "path" and its subpaths.
Delete(ctx context.Context, path string) error

// DeleteWithHost recursively deletes all objects stored at "path" and its subPaths with coding host.
DeleteWithHost(ctx context.Context, host, path string) error

// URLFor returns a URL which may be used to retrieve the content stored at
// the given path, possibly using the given options.
// May return an ErrUnsupportedMethod in certain StorageDriver
Expand Down
5 changes: 5 additions & 0 deletions registry/storage/driver/swift/swift.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,11 @@ func (d *driver) Delete(ctx context.Context, path string) error {
return nil
}

// DeleteWithHost recursively deletes all objects stored at "path" and its subPaths with coding host.
func (d *driver) DeleteWithHost(ctx context.Context, host, path string) error {
return d.Delete(ctx, path)
}

// URLFor returns a URL which may be used to retrieve the content stored at the given path.
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
if d.SecretKey == "" {
Expand Down
Loading

0 comments on commit 8c08c95

Please sign in to comment.