add by group index to decomposedfs #3826

Merged 2 commits on Apr 27, 2023
3 changes: 3 additions & 0 deletions changelog/unreleased/decomposedfs-by-group-index.md
@@ -0,0 +1,3 @@
Bugfix: Add by group index to decomposedfs

https://github.com/cs3org/reva/pull/3826
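
Note: this change adds a by-group-id directory alongside the existing by-user-id and by-type space indexes in decomposedfs. The sketch below only illustrates the symlink such an index entry resolves to; the storage root, group, and space ID are made-up placeholders, and `pathify` merely approximates reva's `lookup.Pathify` helper that the real code (further down in spaces.go) uses.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// pathify approximates lookup.Pathify from decomposedfs: it slices the leading
// characters of an ID into depth segments of width characters each and appends
// the remainder. Treat the exact semantics as an assumption for illustration.
func pathify(id string, depth, width int) string {
	parts := []string{}
	for i := 0; i < depth && len(id) > width; i++ {
		parts = append(parts, id[:width])
		id = id[width:]
	}
	parts = append(parts, id)
	return filepath.Join(parts...)
}

func main() {
	root := "/var/lib/revad"                          // hypothetical storage root
	groupID := "physics-lovers"                       // hypothetical group ID
	spaceID := "4c510ada-c86b-4815-8820-42cdf82c3d51" // hypothetical space ID

	// index entry created by linkSpaceByGroup and consumed by ListStorageSpaces
	link := filepath.Join(root, "indexes", "by-group-id", groupID, spaceID)
	target := "../../../spaces/" + pathify(spaceID, 1, 2) + "/nodes/" + pathify(spaceID, 4, 2)
	fmt.Println(link, "->", target)
}
```
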
3 changes: 3 additions & 0 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
@@ -57,6 +57,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/templates"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/jellydator/ttlcache/v2"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
@@ -95,6 +96,8 @@ type Decomposedfs struct {
chunkHandler *chunking.ChunkHandler
stream events.Stream
cache cache.StatCache

UserCache *ttlcache.Cache
}

// NewDefault returns an instance with default components
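
The new UserCache field uses jellydator's ttlcache v2. The hunk does not show where the cache is created, so the following is only a plausible initialization sketch; the TTL value and the key/value used are assumptions, not taken from the PR.

```go
package main

import (
	"time"

	"github.com/jellydator/ttlcache/v2"
)

func main() {
	// assumed setup; the PR only adds the field itself
	userCache := ttlcache.NewCache()
	_ = userCache.SetTTL(30 * time.Second) // hypothetical TTL

	// entries are keyed by the requested user's opaque ID; in the PR the value
	// is the CS3 user object returned by the gateway (see spaces.go below)
	_ = userCache.Set("opaque-user-id", "cached user object")
	if v, err := userCache.Get("opaque-user-id"); err == nil {
		_ = v // cache hit
	}
}
```
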
32 changes: 23 additions & 9 deletions pkg/storage/utils/decomposedfs/grants.go
@@ -201,8 +201,6 @@ func (fs *Decomposedfs) RemoveGrant(ctx context.Context, ref *provider.Reference
return err
}

spaceGrant := ctx.Value(utils.SpaceGrant)

var attr string
if g.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_GROUP {
attr = prefixes.GrantGroupAcePrefix + g.Grantee.GetGroupId().OpaqueId
@@ -214,18 +212,34 @@ return err
return err
}

// TODO we need an index for groups
if spaceGrant != nil && g.Grantee.Type != provider.GranteeType_GRANTEE_TYPE_GROUP {
// remove from user index
userIDPath := filepath.Join(fs.o.Root, "indexes", "by-user-id", g.Grantee.GetUserId().OpaqueId, grantNode.SpaceID)
if err := os.Remove(userIDPath); err != nil {
return err
if isShareGrant(ctx) {
// do not invalidate by user or group indexes
// FIXME we should invalidate the by-type index, but that requires reference counting
} else {
// invalidate space grant
switch {
case g.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_USER:
// remove from user index
userIDPath := filepath.Join(fs.o.Root, "indexes", "by-user-id", g.Grantee.GetUserId().GetOpaqueId(), grantNode.SpaceID)
if err := os.Remove(userIDPath); err != nil {
return err
}
case g.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_GROUP:
// remove from group index
groupIDPath := filepath.Join(fs.o.Root, "indexes", "by-group-id", g.Grantee.GetGroupId().GetOpaqueId(), grantNode.SpaceID)
if err := os.Remove(groupIDPath); err != nil {
return err
}
}
}

return fs.tp.Propagate(ctx, grantNode, 0)
}

func isShareGrant(ctx context.Context) bool {
return ctx.Value(utils.SpaceGrant) == nil
}

// UpdateGrant updates a grant on a resource
// TODO remove AddGrant or UpdateGrant grant from CS3 api, redundant? tracked in https://github.com/cs3org/cs3apis/issues/92
func (fs *Decomposedfs) UpdateGrant(ctx context.Context, ref *provider.Reference, g *provider.Grant) error {
@@ -319,7 +333,7 @@ func (fs *Decomposedfs) storeGrant(ctx context.Context, n *node.Node, g *provide
}

// update the indexes only after successfully setting the grant
err := fs.updateIndexes(ctx, g.GetGrantee().GetUserId().GetOpaqueId(), spaceType, n.ID)
err := fs.updateIndexes(ctx, g.GetGrantee(), spaceType, n.ID)
if err != nil {
return err
}
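
For context on isShareGrant above: space-management paths such as CreateStorageSpace (see spaces.go below) put a marker under utils.SpaceGrant into the context, while plain share grants do not, so only space grants touch the by-user-id/by-group-id indexes. A self-contained sketch of that mechanism, with a local stand-in for the real context key:

```go
package main

import (
	"context"
	"fmt"
)

// spaceGrantKey stands in for utils.SpaceGrant from the reva codebase; the real
// key lives in github.com/cs3org/reva/v2/pkg/utils.
type spaceGrantKey struct{}

// isShareGrant mirrors the helper added in grants.go: a grant is a plain share
// grant when the space-grant marker is absent from the context.
func isShareGrant(ctx context.Context) bool {
	return ctx.Value(spaceGrantKey{}) == nil
}

func main() {
	ctx := context.Background()
	fmt.Println(isShareGrant(ctx)) // true: no marker, indexes stay untouched

	// CreateStorageSpace-style callers mark the context before adding grants,
	// so the by-user-id / by-group-id indexes get updated.
	ctx = context.WithValue(ctx, spaceGrantKey{}, struct{}{})
	fmt.Println(isShareGrant(ctx)) // false: treated as a space grant
}
```
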
5 changes: 5 additions & 0 deletions pkg/storage/utils/decomposedfs/options/options.go
@@ -35,6 +35,9 @@ type Option func(o *Options)
// Options defines the available options for this package.
type Options struct {

// the gateway address
GatewayAddr string `mapstructure:"gateway_addr"`

// the metadata backend to use, currently supports `xattr` or `ini`
MetadataBackend string `mapstructure:"metadata_backend"`

@@ -99,6 +102,8 @@ func New(m map[string]interface{}) (*Options, error) {
return nil, err
}

o.GatewayAddr = sharedconf.GetGatewaySVC(o.GatewayAddr)

if o.MetadataBackend == "" {
o.MetadataBackend = "xattrs"
}
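
A hedged example of passing the new gateway_addr option through options.New. The map keys follow the mapstructure tags above; the values are placeholders, and sharedconf.GetGatewaySVC may substitute a globally configured gateway address when the field is left empty.

```go
package main

import (
	"fmt"

	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
)

func main() {
	o, err := options.New(map[string]interface{}{
		"root":         "/var/lib/revad",  // hypothetical storage root
		"gateway_addr": "localhost:19000", // hypothetical gateway endpoint
	})
	if err != nil {
		panic(err)
	}
	// MetadataBackend falls back to "xattrs" when not configured (see the hunk above)
	fmt.Println(o.GatewayAddr, o.MetadataBackend)
}
```
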
10 changes: 6 additions & 4 deletions pkg/storage/utils/decomposedfs/spacepermissions.go
@@ -3,11 +3,13 @@ package decomposedfs
import (
"context"

userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
cs3permissions "github.com/cs3org/go-cs3apis/cs3/permissions/v1beta1"
v1beta11 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/utils"
"google.golang.org/grpc"
)

@@ -70,12 +72,12 @@ func (p Permissions) ListAllSpaces(ctx context.Context) bool {
}

// ListSpacesOfUser returns true when the user is allowed to list the spaces of the given user
func (p Permissions) ListSpacesOfUser(ctx context.Context, userid string) bool {
switch userid {
case userIDAny:
func (p Permissions) ListSpacesOfUser(ctx context.Context, userid *userv1beta1.UserId) bool {
switch {
case userid == nil:
// there is no filter
return true // TODO: is `true` actually correct here? Shouldn't we check for ListAllSpaces too?
case ctxpkg.ContextMustGetUser(ctx).GetId().GetOpaqueId():
case utils.UserIDEqual(ctxpkg.ContextMustGetUser(ctx).GetId(), userid):
return true
default:
return p.ListAllSpaces(ctx)
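
ListSpacesOfUser now compares full user IDs via utils.UserIDEqual instead of bare opaque-ID strings. The helper below is only a stand-in that assumes the comparison matches on IdP plus opaque ID; the exact fields the real utils.UserIDEqual checks are not visible in this diff.

```go
package main

import (
	"fmt"

	userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
)

// userIDEqual approximates utils.UserIDEqual: two IDs refer to the same user
// when IdP and opaque ID match (assumed field set, for illustration only).
func userIDEqual(a, b *userv1beta1.UserId) bool {
	return a.GetIdp() == b.GetIdp() && a.GetOpaqueId() == b.GetOpaqueId()
}

func main() {
	requested := &userv1beta1.UserId{Idp: "https://idp.example.org", OpaqueId: "einstein"}
	current := &userv1beta1.UserId{Idp: "https://idp.example.org", OpaqueId: "einstein"}
	// nil filter → allowed; same user → allowed; otherwise fall back to ListAllSpaces
	fmt.Println(userIDEqual(requested, current)) // true
}
```
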
113 changes: 103 additions & 10 deletions pkg/storage/utils/decomposedfs/spaces.go
@@ -37,6 +37,7 @@ import (
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/rgrpc/status"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
@@ -54,13 +55,13 @@ const (
spaceTypeShare = "share"
spaceTypeAny = "*"
spaceIDAny = "*"
userIDAny = "*"

quotaUnrestricted = 0
)

// CreateStorageSpace creates a storage space
func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.CreateStorageSpaceRequest) (*provider.CreateStorageSpaceResponse, error) {
ctx = context.WithValue(ctx, utils.SpaceGrant, struct{}{})

// "everything is a resource" this is the unique ID for the Space resource.
spaceID := uuid.New().String()
@@ -147,7 +148,10 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
}

// Write index
err = fs.updateIndexes(ctx, req.GetOwner().GetId().GetOpaqueId(), req.Type, root.ID)
err = fs.updateIndexes(ctx, &provider.Grantee{
Type: provider.GranteeType_GRANTEE_TYPE_USER,
Id: &provider.Grantee_UserId{UserId: req.GetOwner().GetId()},
}, req.Type, root.ID)
if err != nil {
return nil, err
}
@@ -218,7 +222,7 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
var (
spaceID = spaceIDAny
nodeID = spaceIDAny
requestedUserID = userIDAny
requestedUserID *userv1beta1.UserId
)

spaceTypes := map[string]struct{}{}
@@ -241,10 +245,10 @@ }
}
case provider.ListStorageSpacesRequest_Filter_TYPE_USER:
// TODO: refactor this to GetUserId() in cs3
requestedUserID = filter[i].GetUser().GetOpaqueId()
requestedUserID = filter[i].GetUser()
case provider.ListStorageSpacesRequest_Filter_TYPE_OWNER:
// TODO: improve further by not evaluating shares
requestedUserID = filter[i].GetOwner().GetOpaqueId()
requestedUserID = filter[i].GetOwner()
}
}
if len(spaceTypes) == 0 {
@@ -292,8 +296,8 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide

matches := map[string]struct{}{}

if requestedUserID != userIDAny {
path := filepath.Join(fs.o.Root, "indexes", "by-user-id", requestedUserID, nodeID)
if requestedUserID != nil {
path := filepath.Join(fs.o.Root, "indexes", "by-user-id", requestedUserID.GetOpaqueId(), nodeID)
m, err := filepath.Glob(path)
if err != nil {
return nil, err
@@ -305,9 +309,35 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
}
matches[link] = struct{}{}
}

// get Groups for userid
user := ctxpkg.ContextMustGetUser(ctx)
// TODO the user from context may not have groups populated
if !utils.UserIDEqual(user.GetId(), requestedUserID) {
user, err = fs.UserIDToUserAndGroups(ctx, requestedUserID)
if err != nil {
return nil, err // TODO log and continue?
}
}

for _, group := range user.Groups {
path := filepath.Join(fs.o.Root, "indexes", "by-group-id", group, nodeID)
m, err := filepath.Glob(path)
if err != nil {
return nil, err
}
for _, match := range m {
link, err := os.Readlink(match)
if err != nil {
continue
}
matches[link] = struct{}{}
}
}

}

if requestedUserID == userIDAny {
if requestedUserID == nil {
for spaceType := range spaceTypes {
path := filepath.Join(fs.o.Root, "indexes", "by-type", spaceType, nodeID)
m, err := filepath.Glob(path)
@@ -412,6 +442,31 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide

}

// UserIDToUserAndGroups converts a user ID to a user with groups
func (fs *Decomposedfs) UserIDToUserAndGroups(ctx context.Context, userid *userv1beta1.UserId) (*userv1beta1.User, error) {
user, err := fs.UserCache.Get(userid.GetOpaqueId())
if err == nil {
return user.(*userv1beta1.User), nil
}

gwConn, err := pool.GetGatewayServiceClient(fs.o.GatewayAddr)
if err != nil {
return nil, err
}
getUserResponse, err := gwConn.GetUser(ctx, &userv1beta1.GetUserRequest{
UserId: userid,
SkipFetchingUserGroups: false,
})
if err != nil {
return nil, err
}
if getUserResponse.Status.Code != v1beta11.Code_CODE_OK {
return nil, status.NewErrorFromCode(getUserResponse.Status.Code, "gateway")
}
_ = fs.UserCache.Set(userid.GetOpaqueId(), getUserResponse.GetUser())
return getUserResponse.GetUser(), nil
}
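
In ttlcache v2 a cache miss is reported as ttlcache.ErrNotFound, so the err == nil branch in UserIDToUserAndGroups above is the cache-hit path. A compact, self-contained sketch of the same read-through pattern with a stubbed gateway lookup (names and data are hypothetical):

```go
package main

import (
	"fmt"

	"github.com/jellydator/ttlcache/v2"
)

// fetchUser stands in for the gateway GetUser call made by UserIDToUserAndGroups.
func fetchUser(opaqueID string) (string, error) {
	return "user " + opaqueID + " with groups [physics-lovers]", nil // hypothetical result
}

// lookupUser mirrors the read-through pattern: serve from the cache on a hit,
// otherwise fetch, store, and return the fresh value.
func lookupUser(cache *ttlcache.Cache, opaqueID string) (string, error) {
	if v, err := cache.Get(opaqueID); err == nil {
		return v.(string), nil // cache hit
	}
	u, err := fetchUser(opaqueID)
	if err != nil {
		return "", err
	}
	_ = cache.Set(opaqueID, u)
	return u, nil
}

func main() {
	cache := ttlcache.NewCache()
	u, _ := lookupUser(cache, "some-opaque-user-id")
	fmt.Println(u)
}
```
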

// MustCheckNodePermissions checks if permission checks are needed to be performed when user requests spaces
func (fs *Decomposedfs) MustCheckNodePermissions(ctx context.Context, unrestricted bool) bool {
// canListAllSpaces indicates if the user has the permission from the global user role
@@ -649,12 +704,26 @@ func (fs *Decomposedfs) DeleteStorageSpace(ctx context.Context, req *provider.De
return n.SetDTime(&dtime)
}

func (fs *Decomposedfs) updateIndexes(ctx context.Context, userID, spaceType, spaceID string) error {
func (fs *Decomposedfs) updateIndexes(ctx context.Context, grantee *provider.Grantee, spaceType, spaceID string) error {
err := fs.linkStorageSpaceType(ctx, spaceType, spaceID)
if err != nil {
return err
}
return fs.linkSpaceByUser(ctx, userID, spaceID)
if isShareGrant(ctx) {
// FIXME we should count the references for the by-type index. Currently, removing the second share from the same
// space cannot determine whether the by-type entry should be deleted, which is why we never delete them ...
return nil
}

// create space grant index
switch {
case grantee.Type == provider.GranteeType_GRANTEE_TYPE_USER:
return fs.linkSpaceByUser(ctx, grantee.GetUserId().GetOpaqueId(), spaceID)
case grantee.Type == provider.GranteeType_GRANTEE_TYPE_GROUP:
return fs.linkSpaceByGroup(ctx, grantee.GetGroupId().GetOpaqueId(), spaceID)
default:
return errtypes.BadRequest("invalid grantee type: " + grantee.GetType().String())
}
}

func (fs *Decomposedfs) linkSpaceByUser(ctx context.Context, userID, spaceID string) error {
@@ -681,6 +750,30 @@ func (fs *Decomposedfs) linkSpaceByUser(ctx context.Context, userID, spaceID str
return nil
}

func (fs *Decomposedfs) linkSpaceByGroup(ctx context.Context, groupID, spaceID string) error {
if groupID == "" {
return nil
}
// create group index dir
// TODO: pathify groupid
if err := os.MkdirAll(filepath.Join(fs.o.Root, "indexes", "by-group-id", groupID), 0700); err != nil {
return err
}

err := os.Symlink("../../../spaces/"+lookup.Pathify(spaceID, 1, 2)+"/nodes/"+lookup.Pathify(spaceID, 4, 2), filepath.Join(fs.o.Root, "indexes/by-group-id", groupID, spaceID))
if err != nil {
if isAlreadyExists(err) {
appctx.GetLogger(ctx).Debug().Err(err).Str("space", spaceID).Str("group-id", groupID).Msg("symlink already exists")
// FIXME: is it ok to wipe this err if the symlink already exists?
err = nil //nolint
} else {
// TODO how should we handle error cases here?
appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("group-id", groupID).Msg("could not create symlink")
}
}
return nil
}

// TODO: implement linkSpaceByGroup

func (fs *Decomposedfs) linkStorageSpaceType(ctx context.Context, spaceType string, spaceID string) error {
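
To close the loop, here is a hedged sketch of the read side that ListStorageSpaces gains from the new index: for every group of the requested user it globs the by-group-id directory and dereferences the symlinks, deduplicating the results. The storage root and group name are placeholders.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	root := "/var/lib/revad"              // hypothetical storage root
	groups := []string{"physics-lovers"}  // groups of the requested user

	matches := map[string]struct{}{}
	for _, group := range groups {
		pattern := filepath.Join(root, "indexes", "by-group-id", group, "*")
		m, err := filepath.Glob(pattern)
		if err != nil {
			panic(err)
		}
		for _, match := range m {
			link, err := os.Readlink(match)
			if err != nil {
				continue // skip broken or non-symlink entries
			}
			matches[link] = struct{}{} // deduplicate spaces reachable via several indexes
		}
	}
	fmt.Println(matches)
}
```
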