Skip to content

Commit

Permalink
Merge pull request #3826 from butonic/decomposedfs-by-group-index
Browse files Browse the repository at this point in the history
add by group index to decomposedfs
  • Loading branch information
aduffeck authored Apr 27, 2023
2 parents 60610c7 + 12f0d93 commit 0d031f3
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 23 deletions.
3 changes: 3 additions & 0 deletions changelog/unreleased/decomposedfs-by-group-index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Bugfix: Add by group index to decomposedfs

https://github.com/cs3org/reva/pull/3826
3 changes: 3 additions & 0 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/templates"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/jellydator/ttlcache/v2"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -95,6 +96,8 @@ type Decomposedfs struct {
chunkHandler *chunking.ChunkHandler
stream events.Stream
cache cache.StatCache

UserCache *ttlcache.Cache
}

// NewDefault returns an instance with default components
Expand Down
32 changes: 23 additions & 9 deletions pkg/storage/utils/decomposedfs/grants.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,6 @@ func (fs *Decomposedfs) RemoveGrant(ctx context.Context, ref *provider.Reference
return err
}

spaceGrant := ctx.Value(utils.SpaceGrant)

var attr string
if g.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_GROUP {
attr = prefixes.GrantGroupAcePrefix + g.Grantee.GetGroupId().OpaqueId
Expand All @@ -214,18 +212,34 @@ func (fs *Decomposedfs) RemoveGrant(ctx context.Context, ref *provider.Reference
return err
}

// TODO we need an index for groups
if spaceGrant != nil && g.Grantee.Type != provider.GranteeType_GRANTEE_TYPE_GROUP {
// remove from user index
userIDPath := filepath.Join(fs.o.Root, "indexes", "by-user-id", g.Grantee.GetUserId().OpaqueId, grantNode.SpaceID)
if err := os.Remove(userIDPath); err != nil {
return err
if isShareGrant(ctx) {
// do not invalidate by user or group indexes
// FIXME we should invalidate the by-type index, but that requires reference counting
} else {
// invalidate space grant
switch {
case g.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_USER:
// remove from user index
userIDPath := filepath.Join(fs.o.Root, "indexes", "by-user-id", g.Grantee.GetUserId().GetOpaqueId(), grantNode.SpaceID)
if err := os.Remove(userIDPath); err != nil {
return err
}
case g.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_GROUP:
// remove from group index
userIDPath := filepath.Join(fs.o.Root, "indexes", "by-group-id", g.Grantee.GetGroupId().GetOpaqueId(), grantNode.SpaceID)
if err := os.Remove(userIDPath); err != nil {
return err
}
}
}

return fs.tp.Propagate(ctx, grantNode, 0)
}

func isShareGrant(ctx context.Context) bool {
return ctx.Value(utils.SpaceGrant) == nil
}

// UpdateGrant updates a grant on a resource
// TODO remove AddGrant or UpdateGrant grant from CS3 api, redundant? tracked in https://github.com/cs3org/cs3apis/issues/92
func (fs *Decomposedfs) UpdateGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) error {
Expand Down Expand Up @@ -319,7 +333,7 @@ func (fs *Decomposedfs) storeGrant(ctx context.Context, n *node.Node, g *provide
}

// update the indexes only after successfully setting the grant
err := fs.updateIndexes(ctx, g.GetGrantee().GetUserId().GetOpaqueId(), spaceType, n.ID)
err := fs.updateIndexes(ctx, g.GetGrantee(), spaceType, n.ID)
if err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/utils/decomposedfs/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type Option func(o *Options)
// Options defines the available options for this package.
type Options struct {

// the gateway address
GatewayAddr string `mapstructure:"gateway_addr"`

// the metadata backend to use, currently supports `xattr` or `ini`
MetadataBackend string `mapstructure:"metadata_backend"`

Expand Down Expand Up @@ -99,6 +102,8 @@ func New(m map[string]interface{}) (*Options, error) {
return nil, err
}

o.GatewayAddr = sharedconf.GetGatewaySVC(o.GatewayAddr)

if o.MetadataBackend == "" {
o.MetadataBackend = "xattrs"
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/storage/utils/decomposedfs/spacepermissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package decomposedfs
import (
"context"

userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
cs3permissions "github.com/cs3org/go-cs3apis/cs3/permissions/v1beta1"
v1beta11 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/utils"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -70,12 +72,12 @@ func (p Permissions) ListAllSpaces(ctx context.Context) bool {
}

// ListSpacesOfUser returns true when the user is allowed to list the spaces of the given user
func (p Permissions) ListSpacesOfUser(ctx context.Context, userid string) bool {
switch userid {
case userIDAny:
func (p Permissions) ListSpacesOfUser(ctx context.Context, userid *userv1beta1.UserId) bool {
switch {
case userid == nil:
// there is no filter
return true // TODO: is `true` actually correct here? Shouldn't we check for ListAllSpaces too?
case ctxpkg.ContextMustGetUser(ctx).GetId().GetOpaqueId():
case utils.UserIDEqual(ctxpkg.ContextMustGetUser(ctx).GetId(), userid):
return true
default:
return p.ListAllSpaces(ctx)
Expand Down
113 changes: 103 additions & 10 deletions pkg/storage/utils/decomposedfs/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/rgrpc/status"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
Expand All @@ -54,13 +55,13 @@ const (
spaceTypeShare = "share"
spaceTypeAny = "*"
spaceIDAny = "*"
userIDAny = "*"

quotaUnrestricted = 0
)

// CreateStorageSpace creates a storage space
func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.CreateStorageSpaceRequest) (*provider.CreateStorageSpaceResponse, error) {
ctx = context.WithValue(ctx, utils.SpaceGrant, struct{}{})

// "everything is a resource" this is the unique ID for the Space resource.
spaceID := uuid.New().String()
Expand Down Expand Up @@ -147,7 +148,10 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
}

// Write index
err = fs.updateIndexes(ctx, req.GetOwner().GetId().GetOpaqueId(), req.Type, root.ID)
err = fs.updateIndexes(ctx, &provider.Grantee{
Type: provider.GranteeType_GRANTEE_TYPE_USER,
Id: &provider.Grantee_UserId{UserId: req.GetOwner().GetId()},
}, req.Type, root.ID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -218,7 +222,7 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
var (
spaceID = spaceIDAny
nodeID = spaceIDAny
requestedUserID = userIDAny
requestedUserID *userv1beta1.UserId
)

spaceTypes := map[string]struct{}{}
Expand All @@ -241,10 +245,10 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
}
case provider.ListStorageSpacesRequest_Filter_TYPE_USER:
// TODO: refactor this to GetUserId() in cs3
requestedUserID = filter[i].GetUser().GetOpaqueId()
requestedUserID = filter[i].GetUser()
case provider.ListStorageSpacesRequest_Filter_TYPE_OWNER:
// TODO: improve further by not evaluating shares
requestedUserID = filter[i].GetOwner().GetOpaqueId()
requestedUserID = filter[i].GetOwner()
}
}
if len(spaceTypes) == 0 {
Expand Down Expand Up @@ -292,8 +296,8 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide

matches := map[string]struct{}{}

if requestedUserID != userIDAny {
path := filepath.Join(fs.o.Root, "indexes", "by-user-id", requestedUserID, nodeID)
if requestedUserID != nil {
path := filepath.Join(fs.o.Root, "indexes", "by-user-id", requestedUserID.GetOpaqueId(), nodeID)
m, err := filepath.Glob(path)
if err != nil {
return nil, err
Expand All @@ -305,9 +309,35 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
}
matches[link] = struct{}{}
}

// get Groups for userid
user := ctxpkg.ContextMustGetUser(ctx)
// TODO the user from context may not have groups populated
if !utils.UserIDEqual(user.GetId(), requestedUserID) {
user, err = fs.UserIDToUserAndGroups(ctx, requestedUserID)
if err != nil {
return nil, err // TODO log and continue?
}
}

for _, group := range user.Groups {
path := filepath.Join(fs.o.Root, "indexes", "by-group-id", group, nodeID)
m, err := filepath.Glob(path)
if err != nil {
return nil, err
}
for _, match := range m {
link, err := os.Readlink(match)
if err != nil {
continue
}
matches[link] = struct{}{}
}
}

}

if requestedUserID == userIDAny {
if requestedUserID == nil {
for spaceType := range spaceTypes {
path := filepath.Join(fs.o.Root, "indexes", "by-type", spaceType, nodeID)
m, err := filepath.Glob(path)
Expand Down Expand Up @@ -412,6 +442,31 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide

}

// UserIDToUserAndGroups converts a user ID to a user with groups
func (fs *Decomposedfs) UserIDToUserAndGroups(ctx context.Context, userid *userv1beta1.UserId) (*userv1beta1.User, error) {
user, err := fs.UserCache.Get(userid.GetOpaqueId())
if err == nil {
return user.(*userv1beta1.User), nil
}

gwConn, err := pool.GetGatewayServiceClient(fs.o.GatewayAddr)
if err != nil {
return nil, err
}
getUserResponse, err := gwConn.GetUser(ctx, &userv1beta1.GetUserRequest{
UserId: userid,
SkipFetchingUserGroups: false,
})
if err != nil {
return nil, err
}
if getUserResponse.Status.Code != v1beta11.Code_CODE_OK {
return nil, status.NewErrorFromCode(getUserResponse.Status.Code, "gateway")
}
_ = fs.UserCache.Set(userid.GetOpaqueId(), getUserResponse.GetUser())
return getUserResponse.GetUser(), nil
}

// MustCheckNodePermissions checks if permission checks are needed to be performed when user requests spaces
func (fs *Decomposedfs) MustCheckNodePermissions(ctx context.Context, unrestricted bool) bool {
// canListAllSpaces indicates if the user has the permission from the global user role
Expand Down Expand Up @@ -649,12 +704,26 @@ func (fs *Decomposedfs) DeleteStorageSpace(ctx context.Context, req *provider.De
return n.SetDTime(&dtime)
}

func (fs *Decomposedfs) updateIndexes(ctx context.Context, userID, spaceType, spaceID string) error {
func (fs *Decomposedfs) updateIndexes(ctx context.Context, grantee *provider.Grantee, spaceType, spaceID string) error {
err := fs.linkStorageSpaceType(ctx, spaceType, spaceID)
if err != nil {
return err
}
return fs.linkSpaceByUser(ctx, userID, spaceID)
if isShareGrant(ctx) {
// FIXME we should count the references for the by-type index currently removing the second share from the same
// space cannot determine if the by-type should be deletet, which is why we never delete them ...
return nil
}

// create space grant index
switch {
case grantee.Type == provider.GranteeType_GRANTEE_TYPE_USER:
return fs.linkSpaceByUser(ctx, grantee.GetUserId().GetOpaqueId(), spaceID)
case grantee.Type == provider.GranteeType_GRANTEE_TYPE_GROUP:
return fs.linkSpaceByGroup(ctx, grantee.GetGroupId().GetOpaqueId(), spaceID)
default:
return errtypes.BadRequest("invalid grantee type: " + grantee.GetType().String())
}
}

func (fs *Decomposedfs) linkSpaceByUser(ctx context.Context, userID, spaceID string) error {
Expand All @@ -681,6 +750,30 @@ func (fs *Decomposedfs) linkSpaceByUser(ctx context.Context, userID, spaceID str
return nil
}

func (fs *Decomposedfs) linkSpaceByGroup(ctx context.Context, groupID, spaceID string) error {
if groupID == "" {
return nil
}
// create group index dir
// TODO: pathify groupid
if err := os.MkdirAll(filepath.Join(fs.o.Root, "indexes", "by-group-id", groupID), 0700); err != nil {
return err
}

err := os.Symlink("../../../spaces/"+lookup.Pathify(spaceID, 1, 2)+"/nodes/"+lookup.Pathify(spaceID, 4, 2), filepath.Join(fs.o.Root, "indexes/by-group-id", groupID, spaceID))
if err != nil {
if isAlreadyExists(err) {
appctx.GetLogger(ctx).Debug().Err(err).Str("space", spaceID).Str("group-id", groupID).Msg("symlink already exists")
// FIXME: is it ok to wipe this err if the symlink already exists?
err = nil //nolint
} else {
// TODO how should we handle error cases here?
appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("group-id", groupID).Msg("could not create symlink")
}
}
return nil
}

// TODO: implement linkSpaceByGroup

func (fs *Decomposedfs) linkStorageSpaceType(ctx context.Context, spaceType string, spaceID string) error {
Expand Down

0 comments on commit 0d031f3

Please sign in to comment.