Skip to content

Commit

Permalink
Purge spaces (cs3org#2431)
Browse files Browse the repository at this point in the history
* purge shares when purging storage spaces

When purging a space we also want to delete all shares in that space.
This is a first naive implementation for that but ideally we want to
solve the with an event system eventually to decouple the services.

* purge spaces in the storage driver

Spaces can now be purged in a two step process.
The code currently doesn't purge the blobs though.

* implement review remarks

* prevent normal users from listing deleted spaces

* refactor share storage id filter

* implement review remarks

* list correct number of trashed spaces

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>

Co-authored-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
David Christofas and butonic committed Feb 14, 2022
1 parent b061960 commit bb960af
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 13 deletions.
8 changes: 8 additions & 0 deletions changelog/unreleased/purge-spaces.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Enhancement: Delete shares when purging spaces

Implemented the second step of the two step spaces delete process.
The first step is marking the space as deleted, the second step is actually purging the space.
During the second step all shares, including public shares, in the space will be deleted.
When deleting a space the blobs are currently not yet deleted since the decomposedfs will receive some changes soon.

https://github.com/cs3org/reva/pull/2431
78 changes: 74 additions & 4 deletions internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/BurntSushi/toml"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
collaborationv1beta1 "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1"
linkv1beta1 "github.com/cs3org/go-cs3apis/cs3/sharing/link/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1"
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
Expand All @@ -40,14 +42,15 @@ import (
"github.com/cs3org/reva/pkg/appctx"
ctxpkg "github.com/cs3org/reva/pkg/ctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/publicshare"
"github.com/cs3org/reva/pkg/rgrpc/status"
"github.com/cs3org/reva/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/pkg/rhttp/router"
sdk "github.com/cs3org/reva/pkg/sdk/common"
"github.com/cs3org/reva/pkg/share"
"github.com/cs3org/reva/pkg/utils"
"github.com/golang-jwt/jwt"
"github.com/pkg/errors"

gstatus "google.golang.org/grpc/status"
)

Expand Down Expand Up @@ -290,7 +293,13 @@ func (s *svc) UpdateStorageSpace(ctx context.Context, req *provider.UpdateStorag
}

func (s *svc) DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorageSpaceRequest) (*provider.DeleteStorageSpaceResponse, error) {
// TODO: needs to be fixed
opaque := req.Opaque
var purge bool
// This is just a temporary hack until the CS3 API get's updated to have a dedicated purge parameter or a dedicated PurgeStorageSpace method.
if opaque != nil {
_, purge = opaque.Map["purge"]
}

storageid, opaqeid, err := utils.SplitStorageSpaceID(req.Id.OpaqueId)
if err != nil {
return &provider.DeleteStorageSpaceResponse{
Expand All @@ -309,15 +318,76 @@ func (s *svc) DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorag
}, nil
}

res, err := c.DeleteStorageSpace(ctx, req)
dsRes, err := c.DeleteStorageSpace(ctx, req)
if err != nil {
return &provider.DeleteStorageSpaceResponse{
Status: status.NewStatusFromErrType(ctx, "gateway could not call DeleteStorageSpace", err),
}, nil
}

s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), &provider.ResourceId{OpaqueId: req.Id.OpaqueId})
return res, nil

if dsRes.Status.Code != rpc.Code_CODE_OK {
return dsRes, nil
}

if !purge {
return dsRes, nil
}

log := appctx.GetLogger(ctx)
log.Debug().Msg("purging storage space")
// List all shares in this storage space
lsRes, err := s.ListShares(ctx, &collaborationv1beta1.ListSharesRequest{
Filters: []*collaborationv1beta1.Filter{share.StorageIDFilter(storageid)},
})
switch {
case err != nil:
return &provider.DeleteStorageSpaceResponse{
Status: status.NewStatusFromErrType(ctx, "gateway could not delete shares of StorageSpace", err),
}, nil
case lsRes.Status.Code != rpc.Code_CODE_OK:
return &provider.DeleteStorageSpaceResponse{
Status: status.NewInternal(ctx, "gateway could not delete shares of StorageSpace"),
}, nil
}
for _, share := range lsRes.Shares {
rsRes, err := s.RemoveShare(ctx, &collaborationv1beta1.RemoveShareRequest{
Ref: &collaborationv1beta1.ShareReference{
Spec: &collaborationv1beta1.ShareReference_Id{Id: share.Id},
},
})
if err != nil || rsRes.Status.Code != rpc.Code_CODE_OK {
log.Error().Err(err).Interface("status", rsRes.Status).Str("share_id", share.Id.OpaqueId).Msg("failed to delete share")
}
}

// List all public shares in this storage space
lpsRes, err := s.ListPublicShares(ctx, &linkv1beta1.ListPublicSharesRequest{
Filters: []*linkv1beta1.ListPublicSharesRequest_Filter{publicshare.StorageIDFilter(storageid)},
})
switch {
case err != nil:
return &provider.DeleteStorageSpaceResponse{
Status: status.NewStatusFromErrType(ctx, "gateway could not delete shares of StorageSpace", err),
}, nil
case lpsRes.Status.Code != rpc.Code_CODE_OK:
return &provider.DeleteStorageSpaceResponse{
Status: status.NewInternal(ctx, "gateway could not delete shares of StorageSpace"),
}, nil
}
for _, share := range lpsRes.Share {
rsRes, err := s.RemovePublicShare(ctx, &linkv1beta1.RemovePublicShareRequest{
Ref: &linkv1beta1.PublicShareReference{
Spec: &linkv1beta1.PublicShareReference_Id{Id: share.Id},
},
})
if err != nil || rsRes.Status.Code != rpc.Code_CODE_OK {
log.Error().Err(err).Interface("status", rsRes.Status).Str("share_id", share.Id.OpaqueId).Msg("failed to delete share")
}
}

return dsRes, nil
}

func (s *svc) GetHome(ctx context.Context, _ *provider.GetHomeRequest) (*provider.GetHomeResponse, error) {
Expand Down
20 changes: 20 additions & 0 deletions pkg/publicshare/publicshare.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ import (
"github.com/cs3org/reva/pkg/utils"
)

const (
// StorageIDFilterType defines a new filter type for storage id.
// TODO: Remove this once the filter type is in the CS3 API.
StorageIDFilterType link.ListPublicSharesRequest_Filter_Type = 4
)

// Manager manipulates public shares.
type Manager interface {
CreatePublicShare(ctx context.Context, u *user.User, md *provider.ResourceInfo, g *link.Grant) (*link.PublicShare, error)
Expand Down Expand Up @@ -95,11 +101,25 @@ func ResourceIDFilter(id *provider.ResourceId) *link.ListPublicSharesRequest_Fil
}
}

// StorageIDFilter is an abstraction for creating filter by storage id.
func StorageIDFilter(id string) *link.ListPublicSharesRequest_Filter {
return &link.ListPublicSharesRequest_Filter{
Type: StorageIDFilterType,
Term: &link.ListPublicSharesRequest_Filter_ResourceId{
ResourceId: &provider.ResourceId{
StorageId: id,
},
},
}
}

// MatchesFilter tests if the share passes the filter.
func MatchesFilter(share *link.PublicShare, filter *link.ListPublicSharesRequest_Filter) bool {
switch filter.Type {
case link.ListPublicSharesRequest_Filter_TYPE_RESOURCE_ID:
return utils.ResourceIDEqual(share.ResourceId, filter.GetResourceId())
case StorageIDFilterType:
return share.ResourceId.StorageId == filter.GetResourceId().GetStorageId()
default:
return false
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/rgrpc/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ func NewStatusFromErrType(ctx context.Context, msg string, err error) *rpc.Statu
return NewUnauthenticated(ctx, err, msg+": "+err.Error())
case codes.PermissionDenied:
return NewPermissionDenied(ctx, err, msg+": "+err.Error())
case codes.Unimplemented:
return NewUnimplemented(ctx, err, msg+": "+err.Error())
}
}
// the actual error can be wrapped multiple times
Expand Down
20 changes: 20 additions & 0 deletions pkg/share/share.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ import (
"google.golang.org/genproto/protobuf/field_mask"
)

const (
// StorageIDFilterType defines a new filter type for storage id.
// TODO: Remove once this filter type is in the CS3 API.
StorageIDFilterType collaboration.Filter_Type = 7
)

//go:generate mockery -name Manager

// Manager is the interface that manipulates shares.
Expand Down Expand Up @@ -89,6 +95,18 @@ func ResourceIDFilter(id *provider.ResourceId) *collaboration.Filter {
}
}

// StorageIDFilter is an abstraction for creating filter by storage id.
func StorageIDFilter(id string) *collaboration.Filter {
return &collaboration.Filter{
Type: StorageIDFilterType,
Term: &collaboration.Filter_ResourceId{
ResourceId: &provider.ResourceId{
StorageId: id,
},
},
}
}

// IsCreatedByUser checks if the user is the owner or creator of the share.
func IsCreatedByUser(share *collaboration.Share, user *userv1beta1.User) bool {
return utils.UserEqual(user.Id, share.Owner) || utils.UserEqual(user.Id, share.Creator)
Expand Down Expand Up @@ -121,6 +139,8 @@ func MatchesFilter(share *collaboration.Share, filter *collaboration.Filter) boo
// This filter type is used to filter out "denial shares". These are currently implemented by having the permission "0".
// I.e. if the permission is 0 we don't want to show it.
return int(conversions.RoleFromResourcePermissions(share.Permissions.Permissions).OCSPermissions()) != 0
case StorageIDFilterType:
return share.ResourceId.StorageId == filter.GetResourceId().GetStorageId()
default:
return false
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/utils/decomposedfs/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ const (
QuotaUnlimited = "-3"

NoSpaceID = ""
// TrashIDDelimiter represents the characters used to separate the nodeid and the deletion time.
TrashIDDelimiter = ".T."
)

// Node represents a node in the tree and provides methods to get a Parent or Child instance
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/utils/decomposedfs/recycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (fs *Decomposedfs) createTrashItem(ctx context.Context, spaceID, parentNode
log.Error().Err(err).Msg("error reading trash link, skipping")
return nil, err
}
parts := strings.SplitN(filepath.Base(parentNode), ".T.", 2)
parts := strings.SplitN(filepath.Base(parentNode), node.TrashIDDelimiter, 2)
if len(parts) != 2 {
log.Error().Str("trashnode", trashnode).Interface("parts", parts).Msg("malformed trash link, skipping")
return nil, errors.New("malformed trash link")
Expand Down Expand Up @@ -191,7 +191,7 @@ func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*p
log.Error().Err(err).Str("trashRoot", trashRoot).Str("name", names[i]).Msg("error reading trash link, skipping")
continue
}
parts := strings.SplitN(filepath.Base(trashnode), ".T.", 2)
parts := strings.SplitN(filepath.Base(trashnode), node.TrashIDDelimiter, 2)
if len(parts) != 2 {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("name", names[i]).Str("trashnode", trashnode).Interface("parts", parts).Msg("malformed trash link, skipping")
continue
Expand Down
49 changes: 45 additions & 4 deletions pkg/storage/utils/decomposedfs/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,35 @@ func (fs *Decomposedfs) UpdateStorageSpace(ctx context.Context, req *provider.Up

// DeleteStorageSpace deletes a storage space
func (fs *Decomposedfs) DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorageSpaceRequest) error {
opaque := req.Opaque
var purge bool
if opaque != nil {
_, purge = opaque.Map["purge"]
}

if purge {
ip := fs.lu.InternalPath(req.Id.OpaqueId)
matches, err := filepath.Glob(ip)
if err != nil {
return err
}

// TODO: remove blobs
if err := os.RemoveAll(matches[0]); err != nil {
return err
}

matches, err = filepath.Glob(filepath.Join(fs.o.Root, "spaces", spaceTypeAny, req.Id.OpaqueId))
if err != nil {
return err
}
if len(matches) != 1 {
return fmt.Errorf("delete space failed: found %d matching spaces", len(matches))
}

return os.RemoveAll(matches[0])
}

spaceID := req.Id.OpaqueId

matches, err := filepath.Glob(filepath.Join(fs.o.Root, "spaces", spaceTypeAny, spaceID))
Expand All @@ -352,20 +381,20 @@ func (fs *Decomposedfs) DeleteStorageSpace(ctx context.Context, req *provider.De
}

if len(matches) != 1 {
return fmt.Errorf("update space failed: found %d matching spaces", len(matches))
return fmt.Errorf("delete space failed: found %d matching spaces", len(matches))
}

target, err := os.Readlink(matches[0])
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("match", matches[0]).Msg("could not read link, skipping")
}

node, err := node.ReadNode(ctx, fs.lu, spaceID, filepath.Base(target))
n, err := node.ReadNode(ctx, fs.lu, spaceID, filepath.Base(target))
if err != nil {
return err
}

err = fs.tp.Delete(ctx, node)
err = fs.tp.Delete(ctx, n)
if err != nil {
return err
}
Expand All @@ -374,7 +403,16 @@ func (fs *Decomposedfs) DeleteStorageSpace(ctx context.Context, req *provider.De
if err != nil {
return err
}
return nil

trashPathMatches, err := filepath.Glob(n.InternalPath() + node.TrashIDDelimiter + "*")
if err != nil {
return err
}
if len(trashPathMatches) != 1 {
return fmt.Errorf("delete space failed: found %d matching trashed spaces", len(trashPathMatches))
}
trashPath := trashPathMatches[0]
return os.Symlink(trashPath, filepath.Join(filepath.Dir(matches[0]), filepath.Base(trashPath)))
}

// createHiddenSpaceFolder bootstraps a storage space root with a hidden ".space" folder used to store space related
Expand Down Expand Up @@ -459,6 +497,9 @@ func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node,
if err != nil || !ok {
return nil, errtypes.PermissionDenied(fmt.Sprintf("user %s is not allowed to Stat the space %+v", user.Username, space))
}
if strings.Contains(space.Root.OpaqueId, node.TrashIDDelimiter) {
return nil, errtypes.PermissionDenied(fmt.Sprintf("user %s is not allowed to list deleted spaces %+v", user.Username, space))
}
}

space.Owner = &userv1beta1.User{ // FIXME only return a UserID, not a full blown user object
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/utils/decomposedfs/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
// first make node appear in the space trash
// parent id and name are stored as extended attributes in the node itself
trashLink := filepath.Join(t.root, "trash", n.SpaceRoot.ID, n.ID)
err = os.Symlink("../../nodes/"+n.SpaceRoot.ID+n.ID+".T."+deletionTime, trashLink)
err = os.Symlink("../../nodes/"+n.SpaceRoot.ID+n.ID+node.TrashIDDelimiter+deletionTime, trashLink)
if err != nil {
// To roll back changes
// TODO unset trashOriginAttr
Expand All @@ -431,7 +431,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
// at this point we have a symlink pointing to a non existing destination, which is fine

// rename the trashed node so it is not picked up when traversing up the tree and matches the symlink
trashPath := nodePath + ".T." + deletionTime
trashPath := nodePath + node.TrashIDDelimiter + deletionTime
err = os.Rename(nodePath, trashPath)
if err != nil {
// To roll back changes
Expand Down Expand Up @@ -841,7 +841,7 @@ func (t *Tree) readRecycleItem(ctx context.Context, spaceID, key, path string) (
err = recycleNode.FindStorageSpaceRoot()

if path == "" || path == "/" {
parts := strings.SplitN(filepath.Base(link), ".T.", 2)
parts := strings.SplitN(filepath.Base(link), node.TrashIDDelimiter, 2)
if len(parts) != 2 {
appctx.GetLogger(ctx).Error().Err(err).Str("trashItem", trashItem).Interface("parts", parts).Msg("malformed trash link")
return
Expand Down

0 comments on commit bb960af

Please sign in to comment.