diff --git a/changelog/unreleased/jsoncs3-share-manager.md b/changelog/unreleased/jsoncs3-share-manager.md new file mode 100644 index 0000000000..b725babbe5 --- /dev/null +++ b/changelog/unreleased/jsoncs3-share-manager.md @@ -0,0 +1,6 @@ +Enhancement: Add new jsoncs3 share manager + +We've added a new jsoncs3 share manager which splits the json file per storage +space and caches data locally. + +https://github.com/cs3org/reva/pull/3148 diff --git a/go.mod b/go.mod index 8d6638dabd..aa3da95c40 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/cheggaaa/pb v1.0.29 github.com/coreos/go-oidc v2.2.1+incompatible github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e - github.com/cs3org/go-cs3apis v0.0.0-20220719130120-361e9f987d64 + github.com/cs3org/go-cs3apis v0.0.0-20220818202316-e92afdddac6d github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 github.com/dgraph-io/ristretto v0.1.0 github.com/emvi/iso-639-1 v1.0.1 diff --git a/go.sum b/go.sum index 4e5a200695..28dc9bdb01 100644 --- a/go.sum +++ b/go.sum @@ -170,8 +170,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e h1:tqSPWQeueWTKnJVMJffz4pz0o1WuQxJ28+5x5JgaHD8= github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e/go.mod h1:XJEZ3/EQuI3BXTp/6DUzFr850vlxq11I6satRtz0YQ4= -github.com/cs3org/go-cs3apis v0.0.0-20220719130120-361e9f987d64 h1:cFnankJOCWndnOns4sKRG7yzH61ammK2Am6rEGWCK40= -github.com/cs3org/go-cs3apis v0.0.0-20220719130120-361e9f987d64/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= +github.com/cs3org/go-cs3apis v0.0.0-20220818202316-e92afdddac6d h1:toyZ7IsXlUdEPZ/IG8fg7hbM8HcLPY0bkX4FKBmgLVI= +github.com/cs3org/go-cs3apis v0.0.0-20220818202316-e92afdddac6d/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/internal/grpc/services/storageprovider/storageprovider.go b/internal/grpc/services/storageprovider/storageprovider.go index db2c091cb3..6f4d1dc7f2 100644 --- a/internal/grpc/services/storageprovider/storageprovider.go +++ b/internal/grpc/services/storageprovider/storageprovider.go @@ -302,26 +302,33 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate if err != nil { return nil, err } + switch sRes.Status.Code { + case rpc.Code_CODE_OK, rpc.Code_CODE_NOT_FOUND: + // Just continue with a normal upload + default: + return &provider.InitiateFileUploadResponse{ + Status: sRes.Status, + }, nil + } metadata := map[string]string{} ifMatch := req.GetIfMatch() if ifMatch != "" { - switch sRes.Status.Code { - case rpc.Code_CODE_OK: - if sRes.Info.Etag != ifMatch { - return &provider.InitiateFileUploadResponse{ - Status: status.NewAborted(ctx, errors.New("etag mismatch"), "etag mismatch"), - }, nil - } - case rpc.Code_CODE_NOT_FOUND: - // Just continue with a normal upload - default: + if sRes.Info.Etag != ifMatch { return &provider.InitiateFileUploadResponse{ - Status: sRes.Status, + Status: status.NewFailedPrecondition(ctx, errors.New("etag mismatch"), "etag mismatch"), }, nil } metadata["if-match"] = ifMatch } + ifUnmodifiedSince := req.GetIfUnmodifiedSince() + if ifUnmodifiedSince != nil { + if utils.LaterTS(sRes.Info.Mtime, ifUnmodifiedSince) == sRes.Info.Mtime { + return &provider.InitiateFileUploadResponse{ + Status: status.NewFailedPrecondition(ctx, errors.New("resource has been modified"), "resource has been modified"), + }, nil + } + } ctx = ctxpkg.ContextSetLockID(ctx, req.LockId) diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go new file mode 100644 index 0000000000..0d93272f3a --- /dev/null +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -0,0 +1,699 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package jsoncs3 + +import ( + "context" + "sync" + + "github.com/google/uuid" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" + "google.golang.org/genproto/protobuf/field_mask" + + userv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" + "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/share" + "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache" + "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/receivedsharecache" + "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/sharecache" + "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/shareid" + "github.com/cs3org/reva/v2/pkg/share/manager/registry" + "github.com/cs3org/reva/v2/pkg/storage/utils/metadata" // nolint:staticcheck // we need the legacy package to convert V1 to V2 messages + "github.com/cs3org/reva/v2/pkg/utils" +) + +/* + The sharded json driver splits the json file per storage space. Similar to fileids shareids are prefixed with the spaceid for easier lookup. + In addition to the space json the share manager keeps lists for users and groups to cache their lists of created and received shares + and to hold the state of received shares. + + FAQ + Q: Why not split shares by user and have a list per user? + A: While shares are created by users, they are persisted as grants on a file. + If we persist shares by their creator/owner they would vanish if a user is deprovisioned: shares + in project spaces could not be managed collaboratively. + By splitting by space, we are in fact not only splitting by user, but more granular, per space. + + + File structure in the jsoncs3 space: + + /storages/{storageid}/{spaceid.json} // contains the share information of all shares in that space + /users/{userid}/created.json // points to the spaces the user created shares in, including the list of shares + /users/{userid}/received.json // holds the accepted/pending state and mount point of received shares for users + /groups/{groupid}/received.json // points to the spaces the group has received shares in including the list of shares + + Example: + ├── groups + │ └── group1 + │ └── received.json + ├── storages + │ └── storageid + │ └── spaceid.json + └── users + ├── admin + │ └── created.json + └── einstein + └── received.json + + Whenever a share is created, the share manager has to + 1. update the /storages/{storageid}/{spaceid}.json file, + 2. create /users/{userid}/created.json if it doesn't exist yet and add the space/share + 3. create /users/{userid}/received.json or /groups/{groupid}/received.json if it doesn exist yet and add the space/share + + When updating shares /storages/{storageid}/{spaceid}.json is updated accordingly. The mtime is used to invalidate in-memory caches: + - TODO the upload is tried with an if-unmodified-since header + - TODO when if fails, the {spaceid}.json file is downloaded, the changes are reapplied and the upload is retried with the new mtime + + When updating received shares the mountpoint and state are updated in /users/{userid}/received.json (for both user and group shares). + + When reading the list of received shares the /users/{userid}/received.json file and the /groups/{groupid}/received.json files are statted. + - if the mtime changed we download the file to update the local cache + + When reading the list of created shares the /users/{userid}/created.json file is statted + - if the mtime changed we download the file to update the local cache +*/ + +func init() { + registry.Register("jsoncs3", NewDefault) +} + +type config struct { + GatewayAddr string `mapstructure:"gateway_addr"` + ProviderAddr string `mapstructure:"provider_addr"` + ServiceUserID string `mapstructure:"service_user_id"` + ServiceUserIdp string `mapstructure:"service_user_idp"` + MachineAuthAPIKey string `mapstructure:"machine_auth_apikey"` +} + +// Manager implements a share manager using a cs3 storage backend with local caching +type Manager struct { + sync.RWMutex + + Cache providercache.Cache // holds all shares, sharded by provider id and space id + CreatedCache sharecache.Cache // holds the list of shares a user has created, sharded by user id + GroupReceivedCache sharecache.Cache // holds the list of shares a group has access to, sharded by group id + UserReceivedStates receivedsharecache.Cache // holds the state of shares a user has received, sharded by user id + + storage metadata.Storage + SpaceRoot *provider.ResourceId + + initialized bool +} + +// NewDefault returns a new manager instance with default dependencies +func NewDefault(m map[string]interface{}) (share.Manager, error) { + c := &config{} + if err := mapstructure.Decode(m, c); err != nil { + err = errors.Wrap(err, "error creating a new manager") + return nil, err + } + + s, err := metadata.NewCS3Storage(c.GatewayAddr, c.ProviderAddr, c.ServiceUserID, c.ServiceUserIdp, c.MachineAuthAPIKey) + if err != nil { + return nil, err + } + + return New(s) +} + +// New returns a new manager instance. +func New(s metadata.Storage) (*Manager, error) { + return &Manager{ + Cache: providercache.New(s), + CreatedCache: sharecache.New(s, "users", "created.json"), + UserReceivedStates: receivedsharecache.New(s), + GroupReceivedCache: sharecache.New(s, "groups", "received.json"), + storage: s, + }, nil +} + +func (m *Manager) initialize() error { + if m.initialized { + return nil + } + + m.Lock() + defer m.Unlock() + + if m.initialized { // check if initialization happened while grabbing the lock + return nil + } + + ctx := context.Background() + err := m.storage.Init(ctx, "jsoncs3-share-manager-metadata") + if err != nil { + return err + } + + err = m.storage.MakeDirIfNotExist(ctx, "storages") + if err != nil { + return err + } + err = m.storage.MakeDirIfNotExist(ctx, "users") + if err != nil { + return err + } + err = m.storage.MakeDirIfNotExist(ctx, "groups") + if err != nil { + return err + } + + m.initialized = true + return nil +} + +// Share creates a new share +func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *collaboration.ShareGrant) (*collaboration.Share, error) { + if err := m.initialize(); err != nil { + return nil, err + } + + user := ctxpkg.ContextMustGetUser(ctx) + ts := utils.TSNow() + + // do not allow share to myself or the owner if share is for a user + // TODO: should this not already be caught at the gw level? + if g.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_USER && + (utils.UserEqual(g.Grantee.GetUserId(), user.Id) || utils.UserEqual(g.Grantee.GetUserId(), md.Owner)) { + return nil, errors.New("json: owner/creator and grantee are the same") + } + + // check if share already exists. + key := &collaboration.ShareKey{ + //Owner: md.Owner, owner not longer matters as it belongs to the space + ResourceId: md.Id, + Grantee: g.Grantee, + } + + m.Lock() + defer m.Unlock() + _, err := m.getByKey(ctx, key) + if err == nil { + // share already exists + return nil, errtypes.AlreadyExists(key.String()) + } + + shareID := shareid.Encode(md.GetId().GetStorageId(), md.GetId().GetSpaceId(), uuid.NewString()) + s := &collaboration.Share{ + Id: &collaboration.ShareId{ + OpaqueId: shareID, + }, + ResourceId: md.Id, + Permissions: g.Permissions, + Grantee: g.Grantee, + Owner: md.Owner, + Creator: user.Id, + Ctime: ts, + Mtime: ts, + } + + err = m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s) + if _, ok := err.(errtypes.IsPreconditionFailed); ok { + if err := m.Cache.Sync(ctx, md.Id.StorageId, md.Id.SpaceId); err != nil { + return nil, err + } + err = m.Cache.Add(ctx, md.Id.StorageId, md.Id.SpaceId, shareID, s) + // TODO try more often? + } + if err != nil { + return nil, err + } + + err = m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID) + if _, ok := err.(errtypes.IsPreconditionFailed); ok { + if err := m.CreatedCache.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil { + return nil, err + } + err = m.CreatedCache.Add(ctx, s.GetCreator().GetOpaqueId(), shareID) + // TODO try more often? + } + if err != nil { + return nil, err + } + + spaceID := md.Id.StorageId + "^" + md.Id.SpaceId + // set flag for grantee to have access to share + switch g.Grantee.Type { + case provider.GranteeType_GRANTEE_TYPE_USER: + userid := g.Grantee.GetUserId().GetOpaqueId() + + rs := &collaboration.ReceivedShare{ + Share: s, + State: collaboration.ShareState_SHARE_STATE_PENDING, + } + err = m.UserReceivedStates.Add(ctx, userid, spaceID, rs) + if _, ok := err.(errtypes.IsPreconditionFailed); ok { + if err := m.UserReceivedStates.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil { + return nil, err + } + err = m.UserReceivedStates.Add(ctx, userid, spaceID, rs) + // TODO try more often? + } + if err != nil { + return nil, err + } + case provider.GranteeType_GRANTEE_TYPE_GROUP: + groupid := g.Grantee.GetGroupId().GetOpaqueId() + err := m.GroupReceivedCache.Add(ctx, groupid, shareID) + if _, ok := err.(errtypes.IsPreconditionFailed); ok { + if err := m.GroupReceivedCache.Sync(ctx, groupid); err != nil { + return nil, err + } + err = m.GroupReceivedCache.Add(ctx, groupid, shareID) + // TODO try more often? + } + if err != nil { + return nil, err + } + } + + return s, nil +} + +// getByID must be called in a lock-controlled block. +func (m *Manager) getByID(id *collaboration.ShareId) (*collaboration.Share, error) { + storageID, spaceID, _ := shareid.Decode(id.OpaqueId) + // sync cache, maybe our data is outdated + err := m.Cache.Sync(context.Background(), storageID, spaceID) + if err != nil { + return nil, err + } + + share := m.Cache.Get(storageID, spaceID, id.OpaqueId) + if share == nil { + return nil, errtypes.NotFound(id.String()) + } + return share, nil +} + +// getByKey must be called in a lock-controlled block. +func (m *Manager) getByKey(ctx context.Context, key *collaboration.ShareKey) (*collaboration.Share, error) { + err := m.Cache.Sync(ctx, key.ResourceId.StorageId, key.ResourceId.SpaceId) + if err != nil { + return nil, err + } + + spaceShares := m.Cache.ListSpace(key.ResourceId.StorageId, key.ResourceId.SpaceId) + for _, share := range spaceShares.Shares { + if utils.GranteeEqual(key.Grantee, share.Grantee) && utils.ResourceIDEqual(share.ResourceId, key.ResourceId) { + return share, nil + } + } + return nil, errtypes.NotFound(key.String()) +} + +// get must be called in a lock-controlled block. +func (m *Manager) get(ctx context.Context, ref *collaboration.ShareReference) (s *collaboration.Share, err error) { + switch { + case ref.GetId() != nil: + s, err = m.getByID(ref.GetId()) + case ref.GetKey() != nil: + s, err = m.getByKey(ctx, ref.GetKey()) + default: + err = errtypes.NotFound(ref.String()) + } + return +} + +// GetShare gets the information for a share by the given ref. +func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.Share, error) { + if err := m.initialize(); err != nil { + return nil, err + } + + m.Lock() + defer m.Unlock() + s, err := m.get(ctx, ref) + if err != nil { + return nil, err + } + // check if we are the creator or the grantee + // TODO allow manager to get shares in a space created by other users + user := ctxpkg.ContextMustGetUser(ctx) + if share.IsCreatedByUser(s, user) || share.IsGrantedToUser(s, user) { + return s, nil + } + // we return not found to not disclose information + return nil, errtypes.NotFound(ref.String()) +} + +// Unshare deletes a share +func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference) error { + if err := m.initialize(); err != nil { + return err + } + + m.Lock() + defer m.Unlock() + user := ctxpkg.ContextMustGetUser(ctx) + + s, err := m.get(ctx, ref) + if err != nil { + return err + } + // TODO allow manager to unshare shares in a space created by other users + if !share.IsCreatedByUser(s, user) { + // TODO why not permission denied? + return errtypes.NotFound(ref.String()) + } + + storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId) + err = m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId) + if _, ok := err.(errtypes.IsPreconditionFailed); ok { + if err := m.Cache.Sync(ctx, storageID, spaceID); err != nil { + return err + } + err = m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId) + // TODO try more often? + } + if err != nil { + return err + } + + // remove from created cache + err = m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId) + if _, ok := err.(errtypes.IsPreconditionFailed); ok { + if err := m.CreatedCache.Sync(ctx, s.GetCreator().GetOpaqueId()); err != nil { + return err + } + err = m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId) + // TODO try more often? + } + if err != nil { + return err + } + + // TODO remove from grantee cache + + return nil +} + +// UpdateShare updates the mode of the given share. +func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareReference, p *collaboration.SharePermissions) (*collaboration.Share, error) { + if err := m.initialize(); err != nil { + return nil, err + } + + m.Lock() + defer m.Unlock() + s, err := m.get(ctx, ref) + if err != nil { + return nil, err + } + + user := ctxpkg.ContextMustGetUser(ctx) + if !share.IsCreatedByUser(s, user) { + return nil, errtypes.NotFound(ref.String()) + } + + s.Permissions = p + s.Mtime = utils.TSNow() + + // Update provider cache + err = m.Cache.Persist(ctx, s.ResourceId.StorageId, s.ResourceId.SpaceId) + // when persisting fails + if _, ok := err.(errtypes.IsPreconditionFailed); ok { + // reupdate + s, err = m.get(ctx, ref) // does an implicit sync + if err != nil { + return nil, err + } + s.Permissions = p + s.Mtime = utils.TSNow() + + // persist again + err = m.Cache.Persist(ctx, s.ResourceId.StorageId, s.ResourceId.SpaceId) + // TODO try more often? + } + if err != nil { + return nil, err + } + + return s, nil +} + +// ListShares returns the shares created by the user +func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filter) ([]*collaboration.Share, error) { + if err := m.initialize(); err != nil { + return nil, err + } + + m.Lock() + defer m.Unlock() + + user := ctxpkg.ContextMustGetUser(ctx) + grouped := share.GroupFiltersByType(filters) + + if len(grouped[collaboration.Filter_TYPE_RESOURCE_ID]) > 0 { + return m.listSharesByIDs(ctx, user, filters) + } + + return m.listCreatedShares(ctx, user, filters) +} + +func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, filters []*collaboration.Filter) ([]*collaboration.Share, error) { + var ss []*collaboration.Share + + grouped := share.GroupFiltersByType(filters) + providerSpaces := map[string]map[string]bool{} + for _, f := range grouped[collaboration.Filter_TYPE_RESOURCE_ID] { + storageID := f.GetResourceId().GetStorageId() + spaceID := f.GetResourceId().GetSpaceId() + if providerSpaces[storageID] == nil { + providerSpaces[storageID] = map[string]bool{} + } + providerSpaces[storageID][spaceID] = true + } + + for providerID, spaces := range providerSpaces { + for spaceID := range spaces { + err := m.Cache.Sync(ctx, providerID, spaceID) + if err != nil { + return nil, err + } + + shares := m.Cache.ListSpace(providerID, spaceID) + for _, s := range shares.Shares { + if share.MatchesFilters(s, filters) { + ss = append(ss, s) + } + } + } + } + return ss, nil +} + +func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User, filters []*collaboration.Filter) ([]*collaboration.Share, error) { + var ss []*collaboration.Share + + if err := m.CreatedCache.Sync(ctx, user.Id.OpaqueId); err != nil { + return ss, err + } + for ssid, spaceShareIDs := range m.CreatedCache.List(user.Id.OpaqueId) { + storageID, spaceID, _ := shareid.Decode(ssid) + err := m.Cache.Sync(ctx, storageID, spaceID) + if err != nil { + continue + } + spaceShares := m.Cache.ListSpace(storageID, spaceID) + for shareid := range spaceShareIDs.IDs { + s := spaceShares.Shares[shareid] + if s == nil { + continue + } + if utils.UserEqual(user.GetId(), s.GetCreator()) { + if share.MatchesFilters(s, filters) { + ss = append(ss, s) + } + } + } + } + + return ss, nil +} + +// ListReceivedShares returns the list of shares the user has access to. +func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaboration.Filter) ([]*collaboration.ReceivedShare, error) { + if err := m.initialize(); err != nil { + return nil, err + } + + m.Lock() + defer m.Unlock() + + var rss []*collaboration.ReceivedShare + user := ctxpkg.ContextMustGetUser(ctx) + + ssids := map[string]*receivedsharecache.Space{} + + // first collect all spaceids the user has access to as a group member + for _, group := range user.Groups { + if err := m.GroupReceivedCache.Sync(ctx, group); err != nil { + continue // ignore error, cache will be updated on next read + } + for ssid, spaceShareIDs := range m.GroupReceivedCache.List(group) { + // add a pending entry, the state will be updated + // when reading the received shares below if they have already been accepted or denied + rs := receivedsharecache.Space{ + Mtime: spaceShareIDs.Mtime, + States: make(map[string]*receivedsharecache.State, len(spaceShareIDs.IDs)), + } + + for shareid := range spaceShareIDs.IDs { + rs.States[shareid] = &receivedsharecache.State{ + State: collaboration.ShareState_SHARE_STATE_PENDING, + } + } + ssids[ssid] = &rs + } + } + + // add all spaces the user has receved shares for, this includes mount points and share state for groups + _ = m.UserReceivedStates.Sync(ctx, user.Id.OpaqueId) // ignore error, cache will be updated on next read + + if m.UserReceivedStates.ReceivedSpaces[user.Id.OpaqueId] != nil { + for ssid, rspace := range m.UserReceivedStates.ReceivedSpaces[user.Id.OpaqueId].Spaces { + if rs, ok := ssids[ssid]; ok { + for shareid, state := range rspace.States { + // overwrite state + rs.States[shareid] = state + } + } else { + ssids[ssid] = rspace + } + } + } + + for ssid, rspace := range ssids { + storageID, spaceID, _ := shareid.Decode(ssid) + err := m.Cache.Sync(ctx, storageID, spaceID) + if err != nil { + continue + } + for shareID, state := range rspace.States { + s := m.Cache.Get(storageID, spaceID, shareID) + if s == nil { + continue + } + + if share.IsGrantedToUser(s, user) { + if share.MatchesFilters(s, filters) { + rs := &collaboration.ReceivedShare{ + Share: s, + State: state.State, + MountPoint: state.MountPoint, + } + rss = append(rss, rs) + } + } + } + } + + return rss, nil +} + +// convert must be called in a lock-controlled block. +func (m *Manager) convert(ctx context.Context, userID string, s *collaboration.Share) *collaboration.ReceivedShare { + rs := &collaboration.ReceivedShare{ + Share: s, + State: collaboration.ShareState_SHARE_STATE_PENDING, + } + + storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId) + + _ = m.UserReceivedStates.Sync(ctx, userID) // ignore error, cache will be updated on next read + state := m.UserReceivedStates.Get(userID, storageID+"^"+spaceID, s.Id.GetOpaqueId()) + if state != nil { + rs.State = state.State + rs.MountPoint = state.MountPoint + } + return rs +} + +// GetReceivedShare returns the information for a received share. +func (m *Manager) GetReceivedShare(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.ReceivedShare, error) { + if err := m.initialize(); err != nil { + return nil, err + } + + return m.getReceived(ctx, ref) +} + +func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.ReceivedShare, error) { + m.Lock() + defer m.Unlock() + s, err := m.get(ctx, ref) + if err != nil { + return nil, err + } + user := ctxpkg.ContextMustGetUser(ctx) + if !share.IsGrantedToUser(s, user) { + return nil, errtypes.NotFound(ref.String()) + } + return m.convert(ctx, user.Id.GetOpaqueId(), s), nil +} + +// UpdateReceivedShare updates the received share with share state. +func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collaboration.ReceivedShare, fieldMask *field_mask.FieldMask) (*collaboration.ReceivedShare, error) { + if err := m.initialize(); err != nil { + return nil, err + } + + rs, err := m.getReceived(ctx, &collaboration.ShareReference{Spec: &collaboration.ShareReference_Id{Id: receivedShare.Share.Id}}) + if err != nil { + return nil, err + } + + m.Lock() + defer m.Unlock() + + for i := range fieldMask.Paths { + switch fieldMask.Paths[i] { + case "state": + rs.State = receivedShare.State + case "mount_point": + rs.MountPoint = receivedShare.MountPoint + default: + return nil, errtypes.NotSupported("updating " + fieldMask.Paths[i] + " is not supported") + } + } + + // write back + + userID := ctxpkg.ContextMustGetUser(ctx) + + err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+"^"+rs.Share.ResourceId.SpaceId, rs) + if _, ok := err.(errtypes.IsPreconditionFailed); ok { + // when persisting fails, download, readd and persist again + if err := m.UserReceivedStates.Sync(ctx, userID.GetId().GetOpaqueId()); err != nil { + return nil, err + } + err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+"^"+rs.Share.ResourceId.SpaceId, rs) + // TODO try more often? + } + if err != nil { + return nil, err + } + + return rs, nil +} diff --git a/pkg/share/manager/jsoncs3/jsoncs3_suite_test.go b/pkg/share/manager/jsoncs3/jsoncs3_suite_test.go new file mode 100644 index 0000000000..d51fd31569 --- /dev/null +++ b/pkg/share/manager/jsoncs3/jsoncs3_suite_test.go @@ -0,0 +1,31 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package jsoncs3_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestJson(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Json Suite") +} diff --git a/pkg/share/manager/jsoncs3/jsoncs3_test.go b/pkg/share/manager/jsoncs3/jsoncs3_test.go new file mode 100644 index 0000000000..ba65b96037 --- /dev/null +++ b/pkg/share/manager/jsoncs3/jsoncs3_test.go @@ -0,0 +1,945 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package jsoncs3_test + +import ( + "context" + "encoding/json" + "io/ioutil" + "os" + "path/filepath" + "time" + + groupv1beta1 "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/internal/http/services/owncloud/ocs/conversions" + ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" + "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3" + "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/sharecache" + "github.com/cs3org/reva/v2/pkg/storage/utils/metadata" + "google.golang.org/protobuf/types/known/fieldmaskpb" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Jsoncs3", func() { + var ( + user1 = &userpb.User{ + Id: &userpb.UserId{ + Idp: "https://localhost:9200", + OpaqueId: "admin", + }, + } + user2 = &userpb.User{ + Id: &userpb.UserId{ + Idp: "https://localhost:9200", + OpaqueId: "einstein", + }, + Groups: []string{"group1"}, + } + + sharedResource = &providerv1beta1.ResourceInfo{ + Id: &providerv1beta1.ResourceId{ + StorageId: "storageid", + SpaceId: "spaceid", + OpaqueId: "opaqueid", + }, + } + + sharedResource2 = &providerv1beta1.ResourceInfo{ + Id: &providerv1beta1.ResourceId{ + StorageId: "storageid2", + SpaceId: "spaceid2", + OpaqueId: "opaqueid2", + }, + } + + grantee = &userpb.User{ + Id: user2.Id, + Groups: []string{"users"}, + } + grant *collaboration.ShareGrant + + groupGrant = &collaboration.ShareGrant{ + Grantee: &provider.Grantee{ + Type: provider.GranteeType_GRANTEE_TYPE_GROUP, + Id: &provider.Grantee_GroupId{GroupId: &groupv1beta1.GroupId{ + OpaqueId: "group1", + }}, + }, + Permissions: &collaboration.SharePermissions{ + Permissions: &providerv1beta1.ResourcePermissions{ + InitiateFileUpload: false, + }, + }, + } + storage metadata.Storage + tmpdir string + m *jsoncs3.Manager + + ctx = ctxpkg.ContextSetUser(context.Background(), user1) + granteeCtx = ctxpkg.ContextSetUser(context.Background(), user2) + otherCtx = ctxpkg.ContextSetUser(context.Background(), &userpb.User{Id: &userpb.UserId{OpaqueId: "otheruser"}}) + + // helper functions + shareBykey = func(key *collaboration.ShareKey) *collaboration.Share { + s, err := m.GetShare(ctx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Key{ + Key: key, + }, + }) + ExpectWithOffset(1, err).ToNot(HaveOccurred()) + ExpectWithOffset(1, s).ToNot(BeNil()) + return s + } + ) + + BeforeEach(func() { + grant = &collaboration.ShareGrant{ + Grantee: &provider.Grantee{ + Type: provider.GranteeType_GRANTEE_TYPE_USER, + Id: &provider.Grantee_UserId{UserId: grantee.GetId()}, + }, + Permissions: &collaboration.SharePermissions{ + Permissions: &providerv1beta1.ResourcePermissions{ + InitiateFileUpload: false, + }, + }, + } + + var err error + tmpdir, err = ioutil.TempDir("", "jsoncs3-test") + Expect(err).ToNot(HaveOccurred()) + + err = os.MkdirAll(tmpdir, 0755) + Expect(err).ToNot(HaveOccurred()) + + storage, err = metadata.NewDiskStorage(tmpdir) + Expect(err).ToNot(HaveOccurred()) + + m, err = jsoncs3.New(storage) + Expect(err).ToNot(HaveOccurred()) + }) + + AfterEach(func() { + if tmpdir != "" { + os.RemoveAll(tmpdir) + } + }) + + Describe("Share", func() { + It("fails if the share already exists", func() { + _, err := m.Share(ctx, sharedResource, grant) + Expect(err).ToNot(HaveOccurred()) + _, err = m.Share(ctx, sharedResource, grant) + Expect(err).To(HaveOccurred()) + }) + + It("creates a user share", func() { + share, err := m.Share(ctx, sharedResource, grant) + Expect(err).ToNot(HaveOccurred()) + + Expect(share).ToNot(BeNil()) + Expect(share.ResourceId).To(Equal(sharedResource.Id)) + }) + + It("creates a group share", func() { + share, err := m.Share(ctx, sharedResource, groupGrant) + Expect(err).ToNot(HaveOccurred()) + + Expect(share).ToNot(BeNil()) + Expect(share.ResourceId).To(Equal(sharedResource.Id)) + }) + + It("persists the share", func() { + _, err := m.Share(ctx, sharedResource, grant) + Expect(err).ToNot(HaveOccurred()) + + s := shareBykey(&collaboration.ShareKey{ + ResourceId: sharedResource.Id, + Grantee: grant.Grantee, + }) + Expect(s).ToNot(BeNil()) + + m, err = jsoncs3.New(storage) // Reset in-memory cache + Expect(err).ToNot(HaveOccurred()) + + s = shareBykey(&collaboration.ShareKey{ + ResourceId: sharedResource.Id, + Grantee: grant.Grantee, + }) + Expect(s).ToNot(BeNil()) + }) + }) + + Context("with a space manager", func() { + var ( + share *collaboration.Share + + manager = &userpb.User{ + Id: &userpb.UserId{ + Idp: "https://localhost:9200", + OpaqueId: "spacemanager", + }, + } + managerCtx context.Context + ) + + BeforeEach(func() { + managerCtx = ctxpkg.ContextSetUser(context.Background(), manager) + + var err error + share, err = m.Share(ctx, sharedResource, grant) + Expect(err).ToNot(HaveOccurred()) + + _, err = m.Share(ctx, &providerv1beta1.ResourceInfo{ + Id: &providerv1beta1.ResourceId{ + StorageId: "storageid", + SpaceId: "spaceid", + OpaqueId: "spaceid", + }, + }, &collaboration.ShareGrant{ + Grantee: &providerv1beta1.Grantee{ + Type: provider.GranteeType_GRANTEE_TYPE_USER, + Id: &providerv1beta1.Grantee_UserId{UserId: manager.Id}, + }, + Permissions: &collaboration.SharePermissions{ + Permissions: conversions.NewManagerRole().CS3ResourcePermissions(), + }, + }) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("ListShares", func() { + It("returns the share requested by id even though it's not owned or created by the manager", func() { + shares, err := m.ListShares(managerCtx, []*collaboration.Filter{ + { + Type: collaboration.Filter_TYPE_RESOURCE_ID, + Term: &collaboration.Filter_ResourceId{ + ResourceId: sharedResource.Id, + }, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(shares).To(HaveLen(1)) + Expect(shares[0].Id).To(Equal(share.Id)) + }) + }) + }) + + Context("with an existing share", func() { + var ( + share *collaboration.Share + shareRef *collaboration.ShareReference + ) + + BeforeEach(func() { + var err error + share, err = m.Share(ctx, sharedResource, grant) + Expect(err).ToNot(HaveOccurred()) + + shareRef = &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: &collaboration.ShareId{ + OpaqueId: share.Id.OpaqueId, + }, + }, + } + }) + + Describe("GetShare", func() { + It("handles unknown ids", func() { + s, err := m.GetShare(ctx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: &collaboration.ShareId{ + OpaqueId: "unknown-id", + }, + }, + }) + Expect(s).To(BeNil()) + Expect(err).To(HaveOccurred()) + }) + + It("handles unknown keys", func() { + s, err := m.GetShare(ctx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Key{ + Key: &collaboration.ShareKey{ + ResourceId: &providerv1beta1.ResourceId{ + OpaqueId: "unknown", + }, + Grantee: grant.Grantee, + }, + }, + }) + Expect(s).To(BeNil()) + Expect(err).To(HaveOccurred()) + }) + + It("considers the resource id part of the key", func() { + s, err := m.GetShare(ctx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Key{ + Key: &collaboration.ShareKey{ + ResourceId: &providerv1beta1.ResourceId{ + StorageId: "storageid", + SpaceId: "spaceid", + OpaqueId: "unknown", + }, + Grantee: grant.Grantee, + }, + }, + }) + Expect(s).To(BeNil()) + Expect(err).To(HaveOccurred()) + }) + + It("retrieves an existing share by id", func() { + s, err := m.GetShare(ctx, shareRef) + Expect(err).ToNot(HaveOccurred()) + Expect(s).ToNot(BeNil()) + Expect(share.ResourceId).To(Equal(sharedResource.Id)) + }) + + It("retrieves an existing share by key", func() { + s := shareBykey(&collaboration.ShareKey{ + ResourceId: sharedResource.Id, + Grantee: grant.Grantee, + }) + Expect(s.ResourceId).To(Equal(sharedResource.Id)) + Expect(s.Id.OpaqueId).To(Equal(share.Id.OpaqueId)) + }) + + It("reloads the provider cache when it is outdated", func() { + s, err := m.GetShare(ctx, shareRef) + Expect(err).ToNot(HaveOccurred()) + Expect(s).ToNot(BeNil()) + Expect(s.Permissions.Permissions.InitiateFileUpload).To(BeFalse()) + + // Change providercache on disk + cache := m.Cache.Providers["storageid"].Spaces["spaceid"] + cache.Shares[share.Id.OpaqueId].Permissions.Permissions.InitiateFileUpload = true + bytes, err := json.Marshal(cache) + Expect(err).ToNot(HaveOccurred()) + Expect(storage.SimpleUpload(context.Background(), "storages/storageid/spaceid.json", bytes)).To(Succeed()) + Expect(err).ToNot(HaveOccurred()) + + // Reset providercache in memory + cache.Shares[share.Id.OpaqueId].Permissions.Permissions.InitiateFileUpload = false + + // Set local cache mtime to something later then on disk + m.Cache.Providers["storageid"].Spaces["spaceid"].Mtime = time.Now().Add(time.Hour) + s, err = m.GetShare(ctx, shareRef) + Expect(err).ToNot(HaveOccurred()) + Expect(s).ToNot(BeNil()) + Expect(s.Permissions.Permissions.InitiateFileUpload).To(BeFalse()) + + // Set local cache mtime to something earlier then on disk + m.Cache.Providers["storageid"].Spaces["spaceid"].Mtime = time.Now().Add(-time.Hour) + s, err = m.GetShare(ctx, shareRef) + Expect(err).ToNot(HaveOccurred()) + Expect(s).ToNot(BeNil()) + Expect(s.Permissions.Permissions.InitiateFileUpload).To(BeTrue()) + }) + + It("loads the cache when it doesn't have an entry", func() { + m, err := jsoncs3.New(storage) // Reset in-memory cache + Expect(err).ToNot(HaveOccurred()) + + s, err := m.GetShare(ctx, shareRef) + Expect(err).ToNot(HaveOccurred()) + Expect(s).ToNot(BeNil()) + }) + + It("does not return other users' shares", func() { + s, err := m.GetShare(otherCtx, shareRef) + Expect(err).To(HaveOccurred()) + Expect(s).To(BeNil()) + }) + }) + + Describe("UnShare", func() { + It("does not remove shares of other users", func() { + err := m.Unshare(otherCtx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: &collaboration.ShareId{ + OpaqueId: share.Id.OpaqueId, + }, + }, + }) + + Expect(err).To(HaveOccurred()) + }) + + It("removes an existing share", func() { + err := m.Unshare(ctx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: &collaboration.ShareId{ + OpaqueId: share.Id.OpaqueId, + }, + }, + }) + Expect(err).ToNot(HaveOccurred()) + + s, err := m.GetShare(ctx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Key{ + Key: &collaboration.ShareKey{ + ResourceId: sharedResource.Id, + Grantee: grant.Grantee, + }, + }, + }) + Expect(err).To(HaveOccurred()) + Expect(s).To(BeNil()) + }) + + It("removes an existing share from the storage", func() { + err := m.Unshare(ctx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: &collaboration.ShareId{ + OpaqueId: share.Id.OpaqueId, + }, + }, + }) + Expect(err).ToNot(HaveOccurred()) + + m, err = jsoncs3.New(storage) // Reset in-memory cache + Expect(err).ToNot(HaveOccurred()) + + s, err := m.GetShare(ctx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Key{ + Key: &collaboration.ShareKey{ + ResourceId: sharedResource.Id, + Grantee: grant.Grantee, + }, + }, + }) + Expect(err).To(HaveOccurred()) + Expect(s).To(BeNil()) + }) + }) + + Describe("UpdateShare", func() { + It("does not update shares of other users", func() { + _, err := m.UpdateShare(otherCtx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: &collaboration.ShareId{ + OpaqueId: share.Id.OpaqueId, + }, + }, + }, &collaboration.SharePermissions{ + Permissions: &providerv1beta1.ResourcePermissions{ + InitiateFileUpload: true, + }, + }) + Expect(err).To(HaveOccurred()) + }) + + It("updates an existing share", func() { + s := shareBykey(&collaboration.ShareKey{ + ResourceId: sharedResource.Id, + Grantee: grant.Grantee, + }) + Expect(s.GetPermissions().GetPermissions().InitiateFileUpload).To(BeFalse()) + + // enhance privileges + us, err := m.UpdateShare(ctx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: &collaboration.ShareId{ + OpaqueId: share.Id.OpaqueId, + }, + }, + }, &collaboration.SharePermissions{ + Permissions: &providerv1beta1.ResourcePermissions{ + InitiateFileUpload: true, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(us).ToNot(BeNil()) + Expect(us.GetPermissions().GetPermissions().InitiateFileUpload).To(BeTrue()) + + s = shareBykey(&collaboration.ShareKey{ + ResourceId: sharedResource.Id, + Grantee: grant.Grantee, + }) + Expect(s.GetPermissions().GetPermissions().InitiateFileUpload).To(BeTrue()) + + // reduce privileges + us, err = m.UpdateShare(ctx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: &collaboration.ShareId{ + OpaqueId: share.Id.OpaqueId, + }, + }, + }, &collaboration.SharePermissions{ + Permissions: &providerv1beta1.ResourcePermissions{ + InitiateFileUpload: false, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(us).ToNot(BeNil()) + Expect(us.GetPermissions().GetPermissions().InitiateFileUpload).To(BeFalse()) + + s = shareBykey(&collaboration.ShareKey{ + ResourceId: sharedResource.Id, + Grantee: grant.Grantee, + }) + Expect(s.GetPermissions().GetPermissions().InitiateFileUpload).To(BeFalse()) + }) + + It("persists the change", func() { + s := shareBykey(&collaboration.ShareKey{ + ResourceId: sharedResource.Id, + Grantee: grant.Grantee, + }) + Expect(s.GetPermissions().GetPermissions().InitiateFileUpload).To(BeFalse()) + + // enhance privileges + us, err := m.UpdateShare(ctx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: &collaboration.ShareId{ + OpaqueId: share.Id.OpaqueId, + }, + }, + }, &collaboration.SharePermissions{ + Permissions: &providerv1beta1.ResourcePermissions{ + InitiateFileUpload: true, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(us).ToNot(BeNil()) + Expect(us.GetPermissions().GetPermissions().InitiateFileUpload).To(BeTrue()) + + m, err = jsoncs3.New(storage) // Reset in-memory cache + Expect(err).ToNot(HaveOccurred()) + + s = shareBykey(&collaboration.ShareKey{ + ResourceId: sharedResource.Id, + Grantee: grant.Grantee, + }) + Expect(s.GetPermissions().GetPermissions().InitiateFileUpload).To(BeTrue()) + }) + }) + + Describe("ListShares", func() { + It("lists an existing share", func() { + shares, err := m.ListShares(ctx, nil) + Expect(err).ToNot(HaveOccurred()) + Expect(shares).To(HaveLen(1)) + + Expect(shares[0].Id).To(Equal(share.Id)) + }) + + It("syncronizes the provider cache before listing", func() { + shares, err := m.ListShares(ctx, nil) + Expect(err).ToNot(HaveOccurred()) + Expect(len(shares)).To(Equal(1)) + Expect(shares[0].Id.OpaqueId).To(Equal(share.Id.OpaqueId)) + Expect(shares[0].Permissions.Permissions.InitiateFileUpload).To(BeFalse()) + + // Change providercache on disk + cache := m.Cache.Providers["storageid"].Spaces["spaceid"] + cache.Shares[share.Id.OpaqueId].Permissions.Permissions.InitiateFileUpload = true + bytes, err := json.Marshal(cache) + Expect(err).ToNot(HaveOccurred()) + Expect(storage.SimpleUpload(context.Background(), "storages/storageid/spaceid.json", bytes)).To(Succeed()) + Expect(err).ToNot(HaveOccurred()) + + // Reset providercache in memory + cache.Shares[share.Id.OpaqueId].Permissions.Permissions.InitiateFileUpload = false + + m.Cache.Providers["storageid"].Spaces["spaceid"].Mtime = time.Time{} // trigger reload + shares, err = m.ListShares(ctx, nil) + Expect(err).ToNot(HaveOccurred()) + Expect(len(shares)).To(Equal(1)) + Expect(shares[0].Id.OpaqueId).To(Equal(share.Id.OpaqueId)) + Expect(shares[0].Permissions.Permissions.InitiateFileUpload).To(BeTrue()) + }) + + It("syncronizes the share cache before listing", func() { + shares, err := m.ListShares(ctx, nil) + Expect(err).ToNot(HaveOccurred()) + Expect(len(shares)).To(Equal(1)) + + // Add a second cache to the provider cache so it can be referenced + Expect(m.Cache.Add(ctx, "storageid", "spaceid", "storageid^spaceid°secondshare", &collaboration.Share{ + Creator: user1.Id, + })).To(Succeed()) + + cache := sharecache.UserShareCache{ + Mtime: time.Now(), + UserShares: map[string]*sharecache.SpaceShareIDs{ + "storageid^spaceid": { + Mtime: time.Now(), + IDs: map[string]struct{}{ + shares[0].Id.OpaqueId: {}, + "storageid^spaceid°secondshare": {}, + }, + }, + }, + } + bytes, err := json.Marshal(cache) + Expect(err).ToNot(HaveOccurred()) + err = os.WriteFile(filepath.Join(tmpdir, "users/admin/created.json"), bytes, 0x755) + Expect(err).ToNot(HaveOccurred()) + + m.CreatedCache.UserShares["admin"].Mtime = time.Time{} // trigger reload + shares, err = m.ListShares(ctx, nil) + Expect(err).ToNot(HaveOccurred()) + Expect(len(shares)).To(Equal(2)) + }) + + It("filters by resource id", func() { + shares, err := m.ListShares(ctx, []*collaboration.Filter{ + { + Type: collaboration.Filter_TYPE_RESOURCE_ID, + Term: &collaboration.Filter_ResourceId{ + ResourceId: &providerv1beta1.ResourceId{ + StorageId: "storageid", + SpaceId: "spaceid", + OpaqueId: "somethingelse", + }, + }, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(shares).To(HaveLen(0)) + }) + }) + + Describe("ListReceivedShares", func() { + It("lists the received shares", func() { + received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{}) + Expect(err).ToNot(HaveOccurred()) + Expect(len(received)).To(Equal(1)) + Expect(received[0].Share.ResourceId).To(Equal(sharedResource.Id)) + Expect(received[0].State).To(Equal(collaboration.ShareState_SHARE_STATE_PENDING)) + }) + + It("syncronizes the provider cache before listing", func() { + received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{}) + Expect(err).ToNot(HaveOccurred()) + Expect(len(received)).To(Equal(1)) + Expect(received[0].Share.Permissions.Permissions.InitiateFileUpload).To(BeFalse()) + + // Change providercache on disk + cache := m.Cache.Providers["storageid"].Spaces["spaceid"] + cache.Shares[share.Id.OpaqueId].Permissions.Permissions.InitiateFileUpload = true + bytes, err := json.Marshal(cache) + Expect(err).ToNot(HaveOccurred()) + Expect(storage.SimpleUpload(context.Background(), "storages/storageid/spaceid.json", bytes)).To(Succeed()) + Expect(err).ToNot(HaveOccurred()) + + // Reset providercache in memory + cache.Shares[share.Id.OpaqueId].Permissions.Permissions.InitiateFileUpload = false + + m.Cache.Providers["storageid"].Spaces["spaceid"].Mtime = time.Time{} // trigger reload + received, err = m.ListReceivedShares(granteeCtx, []*collaboration.Filter{}) + Expect(err).ToNot(HaveOccurred()) + Expect(len(received)).To(Equal(1)) + Expect(received[0].Share.Permissions.Permissions.InitiateFileUpload).To(BeTrue()) + }) + + It("syncronizes the user received cache before listing", func() { + m, err := jsoncs3.New(storage) // Reset in-memory cache + Expect(err).ToNot(HaveOccurred()) + + received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{}) + Expect(err).ToNot(HaveOccurred()) + Expect(len(received)).To(Equal(1)) + }) + + It("filters by resource id", func() { + share2, err := m.Share(ctx, sharedResource2, grant) + Expect(err).ToNot(HaveOccurred()) + + received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{}) + Expect(err).ToNot(HaveOccurred()) + Expect(len(received)).To(Equal(2)) + + received, err = m.ListReceivedShares(granteeCtx, []*collaboration.Filter{ + { + Type: collaboration.Filter_TYPE_RESOURCE_ID, + Term: &collaboration.Filter_ResourceId{ + ResourceId: sharedResource.Id, + }, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(len(received)).To(Equal(1)) + Expect(received[0].Share.ResourceId).To(Equal(sharedResource.Id)) + Expect(received[0].State).To(Equal(collaboration.ShareState_SHARE_STATE_PENDING)) + Expect(received[0].Share.Id).To(Equal(share.Id)) + + received, err = m.ListReceivedShares(granteeCtx, []*collaboration.Filter{ + { + Type: collaboration.Filter_TYPE_RESOURCE_ID, + Term: &collaboration.Filter_ResourceId{ + ResourceId: sharedResource2.Id, + }, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(len(received)).To(Equal(1)) + Expect(received[0].Share.ResourceId).To(Equal(sharedResource2.Id)) + Expect(received[0].State).To(Equal(collaboration.ShareState_SHARE_STATE_PENDING)) + Expect(received[0].Share.Id).To(Equal(share2.Id)) + }) + + Context("with a group share", func() { + var ( + gshare *collaboration.Share + ) + + BeforeEach(func() { + var err error + gshare, err = m.Share(ctx, sharedResource, groupGrant) + Expect(err).ToNot(HaveOccurred()) + }) + + It("lists the group share", func() { + received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{}) + Expect(err).ToNot(HaveOccurred()) + Expect(len(received)).To(Equal(2)) + ids := []string{} + for _, s := range received { + ids = append(ids, s.Share.Id.OpaqueId) + } + Expect(ids).To(ConsistOf(share.Id.OpaqueId, gshare.Id.OpaqueId)) + }) + + It("syncronizes the group received cache before listing", func() { + m, err := jsoncs3.New(storage) // Reset in-memory cache + Expect(err).ToNot(HaveOccurred()) + + received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{}) + Expect(err).ToNot(HaveOccurred()) + Expect(len(received)).To(Equal(2)) + ids := []string{} + for _, s := range received { + ids = append(ids, s.Share.Id.OpaqueId) + } + Expect(ids).To(ConsistOf(share.Id.OpaqueId, gshare.Id.OpaqueId)) + }) + + It("merges the user state with the group share", func() { + rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: gshare.Id, + }, + }) + Expect(err).ToNot(HaveOccurred()) + + rs.State = collaboration.ShareState_SHARE_STATE_ACCEPTED + _, err = m.UpdateReceivedShare(granteeCtx, rs, &fieldmaskpb.FieldMask{Paths: []string{"state"}}) + Expect(err).ToNot(HaveOccurred()) + + received, err := m.ListReceivedShares(granteeCtx, []*collaboration.Filter{}) + Expect(err).ToNot(HaveOccurred()) + Expect(len(received)).To(Equal(2)) + }) + }) + }) + + Describe("GetReceivedShare", func() { + It("gets the state", func() { + rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: share.Id, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_PENDING)) + }) + + It("syncs the cache", func() { + m, err := jsoncs3.New(storage) // Reset in-memory cache + Expect(err).ToNot(HaveOccurred()) + + rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: share.Id, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_PENDING)) + }) + + Context("with a group share", func() { + var ( + gshare *collaboration.Share + ) + + BeforeEach(func() { + var err error + gshare, err = m.Share(ctx, sharedResource, groupGrant) + Expect(err).ToNot(HaveOccurred()) + }) + + It("gets the group share", func() { + rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: gshare.Id, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(rs).ToNot(BeNil()) + }) + + It("syncs the cache", func() { + m, err := jsoncs3.New(storage) // Reset in-memory cache + Expect(err).ToNot(HaveOccurred()) + + rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: gshare.Id, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(rs).ToNot(BeNil()) + }) + }) + }) + + Describe("UpdateReceivedShare", func() { + It("updates the state", func() { + rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: share.Id, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_PENDING)) + + rs.State = collaboration.ShareState_SHARE_STATE_ACCEPTED + rs, err = m.UpdateReceivedShare(granteeCtx, rs, &fieldmaskpb.FieldMask{Paths: []string{"state"}}) + Expect(err).ToNot(HaveOccurred()) + Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_ACCEPTED)) + + rs, err = m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: share.Id, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_ACCEPTED)) + }) + + It("updates the mountpoint", func() { + rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: share.Id, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(rs.MountPoint).To(BeNil()) + + rs.MountPoint = &providerv1beta1.Reference{ + Path: "newMP", + } + rs, err = m.UpdateReceivedShare(granteeCtx, rs, &fieldmaskpb.FieldMask{Paths: []string{"mount_point"}}) + Expect(err).ToNot(HaveOccurred()) + Expect(rs.MountPoint.Path).To(Equal("newMP")) + + rs, err = m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: share.Id, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(rs.MountPoint.Path).To(Equal("newMP")) + }) + + It("handles invalid field masks", func() { + rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: share.Id, + }, + }) + Expect(err).ToNot(HaveOccurred()) + + _, err = m.UpdateReceivedShare(granteeCtx, rs, &fieldmaskpb.FieldMask{Paths: []string{"invalid"}}) + Expect(err).To(HaveOccurred()) + }) + + Context("with a group share", func() { + var ( + gshare *collaboration.Share + ) + + BeforeEach(func() { + var err error + gshare, err = m.Share(ctx, sharedResource, groupGrant) + Expect(err).ToNot(HaveOccurred()) + }) + + It("updates the received group share", func() { + rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: gshare.Id, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_PENDING)) + + rs.State = collaboration.ShareState_SHARE_STATE_ACCEPTED + rs, err = m.UpdateReceivedShare(granteeCtx, rs, &fieldmaskpb.FieldMask{Paths: []string{"state"}}) + Expect(err).ToNot(HaveOccurred()) + Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_ACCEPTED)) + + rs, err = m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: gshare.Id, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_ACCEPTED)) + }) + + It("persists the change", func() { + rs, err := m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: gshare.Id, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_PENDING)) + + rs.State = collaboration.ShareState_SHARE_STATE_ACCEPTED + rs, err = m.UpdateReceivedShare(granteeCtx, rs, &fieldmaskpb.FieldMask{Paths: []string{"state"}}) + Expect(err).ToNot(HaveOccurred()) + Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_ACCEPTED)) + + m, err := jsoncs3.New(storage) // Reset in-memory cache + Expect(err).ToNot(HaveOccurred()) + + rs, err = m.GetReceivedShare(granteeCtx, &collaboration.ShareReference{ + Spec: &collaboration.ShareReference_Id{ + Id: gshare.Id, + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(rs.State).To(Equal(collaboration.ShareState_SHARE_STATE_ACCEPTED)) + }) + }) + }) + }) +}) diff --git a/pkg/share/manager/jsoncs3/providercache/providercache.go b/pkg/share/manager/jsoncs3/providercache/providercache.go new file mode 100644 index 0000000000..83210d270b --- /dev/null +++ b/pkg/share/manager/jsoncs3/providercache/providercache.go @@ -0,0 +1,234 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package providercache + +import ( + "context" + "encoding/json" + "path" + "path/filepath" + "time" + + collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/pkg/appctx" + "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/storage/utils/metadata" + "github.com/cs3org/reva/v2/pkg/utils" +) + +// Cache holds share information structured by provider and space +type Cache struct { + Providers map[string]*Spaces + + storage metadata.Storage +} + +// Spaces holds the share information for provider +type Spaces struct { + Spaces map[string]*Shares +} + +// Shares holds the share information of one space +type Shares struct { + Shares map[string]*collaboration.Share + Mtime time.Time +} + +// UnmarshalJSON overrides the default unmarshaling +// Shares are tricky to unmarshal because they contain an interface (Grantee) which makes the json Unmarshal bail out +// To work around that problem we unmarshal into json.RawMessage in a first step and then try to manually unmarshal +// into the specific types in a second step. +func (s *Shares) UnmarshalJSON(data []byte) error { + tmp := struct { + Shares map[string]json.RawMessage + Mtime time.Time + }{} + + err := json.Unmarshal(data, &tmp) + if err != nil { + return err + } + + s.Mtime = tmp.Mtime + s.Shares = make(map[string]*collaboration.Share, len(tmp.Shares)) + for id, genericShare := range tmp.Shares { + userShare := &collaboration.Share{ + Grantee: &provider.Grantee{Id: &provider.Grantee_UserId{}}, + } + err = json.Unmarshal(genericShare, userShare) // is this a user share? + if err == nil && userShare.Grantee.Type == provider.GranteeType_GRANTEE_TYPE_USER { + s.Shares[id] = userShare + continue + } + + groupShare := &collaboration.Share{ + Grantee: &provider.Grantee{Id: &provider.Grantee_GroupId{}}, + } + err = json.Unmarshal(genericShare, groupShare) // try to unmarshal to a group share if the user share unmarshalling failed + if err != nil { + return err + } + s.Shares[id] = groupShare + } + + return nil +} + +// New returns a new Cache instance +func New(s metadata.Storage) Cache { + return Cache{ + Providers: map[string]*Spaces{}, + storage: s, + } +} + +// Add adds a share to the cache +func (c *Cache) Add(ctx context.Context, storageID, spaceID, shareID string, share *collaboration.Share) error { + c.initializeIfNeeded(storageID, spaceID) + c.Providers[storageID].Spaces[spaceID].Shares[shareID] = share + + return c.Persist(ctx, storageID, spaceID) +} + +// Remove removes a share from the cache +func (c *Cache) Remove(ctx context.Context, storageID, spaceID, shareID string) error { + if c.Providers[storageID] == nil || + c.Providers[storageID].Spaces[spaceID] == nil { + return nil + } + delete(c.Providers[storageID].Spaces[spaceID].Shares, shareID) + + return c.Persist(ctx, storageID, spaceID) +} + +// Get returns one entry from the cache +func (c *Cache) Get(storageID, spaceID, shareID string) *collaboration.Share { + if c.Providers[storageID] == nil || + c.Providers[storageID].Spaces[spaceID] == nil { + return nil + } + return c.Providers[storageID].Spaces[spaceID].Shares[shareID] +} + +// ListSpace returns the list of shares in a given space +func (c *Cache) ListSpace(storageID, spaceID string) *Shares { + if c.Providers[storageID] == nil { + return &Shares{} + } + return c.Providers[storageID].Spaces[spaceID] +} + +// PersistWithTime persists the data of one space if it has not been modified since the given mtime +func (c *Cache) PersistWithTime(ctx context.Context, storageID, spaceID string, mtime time.Time) error { + if c.Providers[storageID] == nil || c.Providers[storageID].Spaces[spaceID] == nil { + return nil + } + + oldMtime := c.Providers[storageID].Spaces[spaceID].Mtime + c.Providers[storageID].Spaces[spaceID].Mtime = mtime + + // FIXME there is a race when between this time now and the below Uploed another process also updates the file -> we need a lock + createdBytes, err := json.Marshal(c.Providers[storageID].Spaces[spaceID]) + if err != nil { + c.Providers[storageID].Spaces[spaceID].Mtime = oldMtime + return err + } + jsonPath := spaceJSONPath(storageID, spaceID) + if err := c.storage.MakeDirIfNotExist(ctx, path.Dir(jsonPath)); err != nil { + c.Providers[storageID].Spaces[spaceID].Mtime = oldMtime + return err + } + + if err = c.storage.Upload(ctx, metadata.UploadRequest{ + Path: jsonPath, + Content: createdBytes, + IfUnmodifiedSince: c.Providers[storageID].Spaces[spaceID].Mtime, + }); err != nil { + c.Providers[storageID].Spaces[spaceID].Mtime = oldMtime + return err + } + return nil +} + +// Persist persists the data of one space +func (c *Cache) Persist(ctx context.Context, storageID, spaceID string) error { + return c.PersistWithTime(ctx, storageID, spaceID, time.Now()) +} + +// Sync updates the in-memory data with the data from the storage if it is outdated +func (c *Cache) Sync(ctx context.Context, storageID, spaceID string) error { + log := appctx.GetLogger(ctx).With().Str("storageID", storageID).Str("spaceID", spaceID).Logger() + log.Debug().Msg("Syncing provider cache...") + + var mtime time.Time + if c.Providers[storageID] != nil && c.Providers[storageID].Spaces[spaceID] != nil { + mtime = c.Providers[storageID].Spaces[spaceID].Mtime + // - y: set If-Modified-Since header to only download if it changed + } else { + mtime = time.Time{} // Set zero time so that data from storage always takes precedence + } + + jsonPath := spaceJSONPath(storageID, spaceID) + info, err := c.storage.Stat(ctx, jsonPath) + if err != nil { + if _, ok := err.(errtypes.NotFound); ok { + return nil // Nothing to sync against + } + log.Error().Err(err).Msg("Failed to stat the provider cache") + return err + } + // check mtime of /users/{userid}/created.json + if utils.TSToTime(info.Mtime).After(mtime) { + log.Debug().Msg("Updating provider cache...") + // - update cached list of created shares for the user in memory if changed + createdBlob, err := c.storage.SimpleDownload(ctx, jsonPath) + if err != nil { + log.Error().Err(err).Msg("Failed to download the provider cache") + return err + } + newShares := &Shares{} + err = json.Unmarshal(createdBlob, newShares) + if err != nil { + log.Error().Err(err).Msg("Failed to unmarshal the provider cache") + return err + } + c.initializeIfNeeded(storageID, spaceID) + c.Providers[storageID].Spaces[spaceID] = newShares + } + log.Debug().Msg("Provider cache is up to date") + return nil +} + +func (c *Cache) initializeIfNeeded(storageID, spaceID string) { + if c.Providers[storageID] == nil { + c.Providers[storageID] = &Spaces{ + Spaces: map[string]*Shares{}, + } + } + if c.Providers[storageID].Spaces[spaceID] == nil { + c.Providers[storageID].Spaces[spaceID] = &Shares{ + Shares: map[string]*collaboration.Share{}, + } + } +} + +func spaceJSONPath(storageID, spaceID string) string { + return filepath.Join("/storages", storageID, spaceID+".json") +} diff --git a/pkg/share/manager/jsoncs3/providercache/providercache_suite_test.go b/pkg/share/manager/jsoncs3/providercache/providercache_suite_test.go new file mode 100644 index 0000000000..eb4a39b90e --- /dev/null +++ b/pkg/share/manager/jsoncs3/providercache/providercache_suite_test.go @@ -0,0 +1,31 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package providercache_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestProvidercache(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Providercache Suite") +} diff --git a/pkg/share/manager/jsoncs3/providercache/providercache_test.go b/pkg/share/manager/jsoncs3/providercache/providercache_test.go new file mode 100644 index 0000000000..9fdfdbc8f9 --- /dev/null +++ b/pkg/share/manager/jsoncs3/providercache/providercache_test.go @@ -0,0 +1,194 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package providercache_test + +import ( + "context" + "io/ioutil" + "os" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" + "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache" + "github.com/cs3org/reva/v2/pkg/storage/utils/metadata" +) + +var _ = Describe("Cache", func() { + var ( + c providercache.Cache + storage metadata.Storage + + storageID = "storageid" + spaceID = "spaceid" + shareID = "storageid$spaceid!share1" + share1 = &collaboration.Share{ + Id: &collaboration.ShareId{ + OpaqueId: "share1", + }, + } + ctx context.Context + tmpdir string + ) + + BeforeEach(func() { + ctx = context.Background() + + var err error + tmpdir, err = ioutil.TempDir("", "providercache-test") + Expect(err).ToNot(HaveOccurred()) + + err = os.MkdirAll(tmpdir, 0755) + Expect(err).ToNot(HaveOccurred()) + + storage, err = metadata.NewDiskStorage(tmpdir) + Expect(err).ToNot(HaveOccurred()) + + c = providercache.New(storage) + Expect(c).ToNot(BeNil()) + }) + + AfterEach(func() { + if tmpdir != "" { + os.RemoveAll(tmpdir) + } + }) + + Describe("Add", func() { + It("adds a share", func() { + s := c.Get(storageID, spaceID, shareID) + Expect(s).To(BeNil()) + + Expect(c.Add(ctx, storageID, spaceID, shareID, share1)).To(Succeed()) + + s = c.Get(storageID, spaceID, shareID) + Expect(s).ToNot(BeNil()) + Expect(s).To(Equal(share1)) + }) + + It("sets the mtime", func() { + Expect(c.Add(ctx, storageID, spaceID, shareID, share1)).To(Succeed()) + Expect(c.Providers[storageID].Spaces[spaceID].Mtime).ToNot(Equal(time.Time{})) + }) + + It("updates the mtime", func() { + Expect(c.Add(ctx, storageID, spaceID, shareID, share1)).To(Succeed()) + old := c.Providers[storageID].Spaces[spaceID].Mtime + Expect(c.Add(ctx, storageID, spaceID, shareID, share1)).To(Succeed()) + Expect(c.Providers[storageID].Spaces[spaceID].Mtime).ToNot(Equal(old)) + }) + }) + + Context("with an existing entry", func() { + BeforeEach(func() { + Expect(c.Add(ctx, storageID, spaceID, shareID, share1)).To(Succeed()) + }) + + Describe("Get", func() { + It("returns the entry", func() { + s := c.Get(storageID, spaceID, shareID) + Expect(s).ToNot(BeNil()) + }) + }) + + Describe("Remove", func() { + It("removes the entry", func() { + s := c.Get(storageID, spaceID, shareID) + Expect(s).ToNot(BeNil()) + Expect(s).To(Equal(share1)) + + Expect(c.Remove(ctx, storageID, spaceID, shareID)).To(Succeed()) + + s = c.Get(storageID, spaceID, shareID) + Expect(s).To(BeNil()) + }) + + It("updates the mtime", func() { + Expect(c.Add(ctx, storageID, spaceID, shareID, share1)).To(Succeed()) + old := c.Providers[storageID].Spaces[spaceID].Mtime + Expect(c.Remove(ctx, storageID, spaceID, shareID)).To(Succeed()) + Expect(c.Providers[storageID].Spaces[spaceID].Mtime).ToNot(Equal(old)) + }) + }) + + Describe("Persist", func() { + It("handles non-existent storages", func() { + Expect(c.Persist(ctx, "foo", "bar")).To(Succeed()) + }) + It("handles non-existent spaces", func() { + Expect(c.Persist(ctx, storageID, "bar")).To(Succeed()) + }) + + It("persists", func() { + Expect(c.Persist(ctx, storageID, spaceID)).To(Succeed()) + }) + + It("updates the mtime", func() { + oldMtime := c.Providers[storageID].Spaces[spaceID].Mtime + + Expect(c.Persist(ctx, storageID, spaceID)).To(Succeed()) + Expect(c.Providers[storageID].Spaces[spaceID].Mtime).ToNot(Equal(oldMtime)) + }) + + }) + + Describe("PersistWithTime", func() { + It("does not persist if the mtime on disk is more recent", func() { + Expect(c.PersistWithTime(ctx, storageID, spaceID, time.Now().Add(-3*time.Hour))).ToNot(Succeed()) + }) + }) + + Describe("Sync", func() { + BeforeEach(func() { + Expect(c.Persist(ctx, storageID, spaceID)).To(Succeed()) + // reset in-memory cache + c = providercache.New(storage) + }) + + It("downloads if needed", func() { + s := c.Get(storageID, spaceID, shareID) + Expect(s).To(BeNil()) + + Expect(c.Sync(ctx, storageID, spaceID)).To(Succeed()) + + s = c.Get(storageID, spaceID, shareID) + Expect(s).ToNot(BeNil()) + }) + + It("does not download if not needed", func() { + s := c.Get(storageID, spaceID, shareID) + Expect(s).To(BeNil()) + + c.Providers[storageID] = &providercache.Spaces{ + Spaces: map[string]*providercache.Shares{ + spaceID: { + Mtime: time.Now(), + }, + }, + } + Expect(c.Sync(ctx, storageID, spaceID)).To(Succeed()) // Sync from disk won't happen because in-memory mtime is later than on disk + + s = c.Get(storageID, spaceID, shareID) + Expect(s).To(BeNil()) + }) + }) + }) +}) diff --git a/pkg/share/manager/jsoncs3/receivedsharecache/receivedShareCache_suite_test.go b/pkg/share/manager/jsoncs3/receivedsharecache/receivedShareCache_suite_test.go new file mode 100644 index 0000000000..5877755c37 --- /dev/null +++ b/pkg/share/manager/jsoncs3/receivedsharecache/receivedShareCache_suite_test.go @@ -0,0 +1,31 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package receivedsharecache_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestReceivedShareCache(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "ReceivedShareCache Suite") +} diff --git a/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go b/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go new file mode 100644 index 0000000000..d2302f8ae1 --- /dev/null +++ b/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go @@ -0,0 +1,178 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package receivedsharecache + +import ( + "context" + "encoding/json" + "path" + "path/filepath" + "time" + + collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/pkg/appctx" + "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/storage/utils/metadata" + "github.com/cs3org/reva/v2/pkg/utils" +) + +// Cache stores the list of received shares and their states +// It functions as an in-memory cache with a persistence layer +// The storage is sharded by user +type Cache struct { + ReceivedSpaces map[string]*Spaces + + storage metadata.Storage +} + +// Spaces holds the received shares of one user per space +type Spaces struct { + Mtime time.Time + Spaces map[string]*Space +} + +// Space holds the received shares of one user in one space +type Space struct { + Mtime time.Time + States map[string]*State +} + +// State holds the state information of a received share +type State struct { + State collaboration.ShareState + MountPoint *provider.Reference +} + +// New returns a new Cache instance +func New(s metadata.Storage) Cache { + return Cache{ + ReceivedSpaces: map[string]*Spaces{}, + storage: s, + } +} + +// Add adds a new entry to the cache +func (c *Cache) Add(ctx context.Context, userID, spaceID string, rs *collaboration.ReceivedShare) error { + if c.ReceivedSpaces[userID] == nil { + c.ReceivedSpaces[userID] = &Spaces{ + Spaces: map[string]*Space{}, + } + } + if c.ReceivedSpaces[userID].Spaces[spaceID] == nil { + c.ReceivedSpaces[userID].Spaces[spaceID] = &Space{} + } + + receivedSpace := c.ReceivedSpaces[userID].Spaces[spaceID] + receivedSpace.Mtime = time.Now() + if receivedSpace.States == nil { + receivedSpace.States = map[string]*State{} + } + receivedSpace.States[rs.Share.Id.GetOpaqueId()] = &State{ + State: rs.State, + MountPoint: rs.MountPoint, + } + + return c.Persist(ctx, userID) +} + +// Get returns one entry from the cache +func (c *Cache) Get(userID, spaceID, shareID string) *State { + if c.ReceivedSpaces[userID] == nil || c.ReceivedSpaces[userID].Spaces[spaceID] == nil { + return nil + } + return c.ReceivedSpaces[userID].Spaces[spaceID].States[shareID] +} + +// Sync updates the in-memory data with the data from the storage if it is outdated +func (c *Cache) Sync(ctx context.Context, userID string) error { + log := appctx.GetLogger(ctx).With().Str("userID", userID).Logger() + log.Debug().Msg("Syncing received share cache...") + + var mtime time.Time + if c.ReceivedSpaces[userID] != nil { + mtime = c.ReceivedSpaces[userID].Mtime + } else { + mtime = time.Time{} // Set zero time so that data from storage always takes precedence + } + + jsonPath := userJSONPath(userID) + info, err := c.storage.Stat(ctx, jsonPath) + if err != nil { + if _, ok := err.(errtypes.NotFound); ok { + return nil // Nothing to sync against + } + log.Error().Err(err).Msg("Failed to stat the received share") + return err + } + // check mtime of /users/{userid}/created.json + if utils.TSToTime(info.Mtime).After(mtime) { + log.Debug().Msg("Updating received share cache...") + // - update cached list of created shares for the user in memory if changed + createdBlob, err := c.storage.SimpleDownload(ctx, jsonPath) + if err != nil { + log.Error().Err(err).Msg("Failed to download the received share") + return err + } + newSpaces := &Spaces{} + err = json.Unmarshal(createdBlob, newSpaces) + if err != nil { + log.Error().Err(err).Msg("Failed to unmarshal the received share") + return err + } + c.ReceivedSpaces[userID] = newSpaces + } + log.Debug().Msg("Received share cache is up to date") + return nil +} + +// Persist persists the data for one user to the storage +func (c *Cache) Persist(ctx context.Context, userID string) error { + if c.ReceivedSpaces[userID] == nil { + return nil + } + + oldMtime := c.ReceivedSpaces[userID].Mtime + c.ReceivedSpaces[userID].Mtime = time.Now() + + createdBytes, err := json.Marshal(c.ReceivedSpaces[userID]) + if err != nil { + c.ReceivedSpaces[userID].Mtime = oldMtime + return err + } + jsonPath := userJSONPath(userID) + if err := c.storage.MakeDirIfNotExist(ctx, path.Dir(jsonPath)); err != nil { + c.ReceivedSpaces[userID].Mtime = oldMtime + return err + } + + if err = c.storage.Upload(ctx, metadata.UploadRequest{ + Path: jsonPath, + Content: createdBytes, + IfUnmodifiedSince: c.ReceivedSpaces[userID].Mtime, + }); err != nil { + c.ReceivedSpaces[userID].Mtime = oldMtime + return err + } + return nil +} + +func userJSONPath(userID string) string { + return filepath.Join("/users", userID, "received.json") +} diff --git a/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache_test.go b/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache_test.go new file mode 100644 index 0000000000..bd3fcee34a --- /dev/null +++ b/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache_test.go @@ -0,0 +1,134 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package receivedsharecache_test + +import ( + "context" + "io/ioutil" + "os" + + collaboration "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" + collaborationv1beta1 "github.com/cs3org/go-cs3apis/cs3/sharing/collaboration/v1beta1" + "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/receivedsharecache" + "github.com/cs3org/reva/v2/pkg/storage/utils/metadata" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Cache", func() { + var ( + c receivedsharecache.Cache + storage metadata.Storage + + userID = "user" + spaceID = "spaceid" + shareID = "storageid$spaceid!share1" + share = &collaboration.Share{ + Id: &collaborationv1beta1.ShareId{ + OpaqueId: shareID, + }, + } + ctx context.Context + tmpdir string + ) + + BeforeEach(func() { + ctx = context.Background() + + var err error + tmpdir, err = ioutil.TempDir("", "providercache-test") + Expect(err).ToNot(HaveOccurred()) + + err = os.MkdirAll(tmpdir, 0755) + Expect(err).ToNot(HaveOccurred()) + + storage, err = metadata.NewDiskStorage(tmpdir) + Expect(err).ToNot(HaveOccurred()) + + c = receivedsharecache.New(storage) + Expect(c).ToNot(BeNil()) + }) + + AfterEach(func() { + if tmpdir != "" { + os.RemoveAll(tmpdir) + } + }) + + Describe("Add", func() { + It("adds an entry", func() { + rs := &collaboration.ReceivedShare{ + Share: share, + State: collaborationv1beta1.ShareState_SHARE_STATE_PENDING, + } + err := c.Add(ctx, userID, spaceID, rs) + Expect(err).ToNot(HaveOccurred()) + + s := c.Get(userID, spaceID, shareID) + Expect(s).ToNot(BeNil()) + }) + + It("persists the new entry", func() { + rs := &collaboration.ReceivedShare{ + Share: share, + State: collaborationv1beta1.ShareState_SHARE_STATE_PENDING, + } + err := c.Add(ctx, userID, spaceID, rs) + Expect(err).ToNot(HaveOccurred()) + + c = receivedsharecache.New(storage) + Expect(c.Sync(ctx, userID)).To(Succeed()) + s := c.Get(userID, spaceID, shareID) + Expect(s).ToNot(BeNil()) + }) + }) + + Describe("with an existing entry", func() { + BeforeEach(func() { + rs := &collaboration.ReceivedShare{ + Share: share, + State: collaborationv1beta1.ShareState_SHARE_STATE_PENDING, + } + Expect(c.Add(ctx, userID, spaceID, rs)).To(Succeed()) + }) + + Describe("Get", func() { + It("handles unknown users", func() { + s := c.Get("something", spaceID, shareID) + Expect(s).To(BeNil()) + }) + + It("handles unknown spaces", func() { + s := c.Get(userID, "something", shareID) + Expect(s).To(BeNil()) + }) + + It("handles unknown shares", func() { + s := c.Get(userID, spaceID, "something") + Expect(s).To(BeNil()) + }) + + It("gets the entry", func() { + s := c.Get(userID, spaceID, shareID) + Expect(s).ToNot(BeNil()) + }) + }) + }) +}) diff --git a/pkg/share/manager/jsoncs3/sharecache/sharecache.go b/pkg/share/manager/jsoncs3/sharecache/sharecache.go new file mode 100644 index 0000000000..82cb19b27e --- /dev/null +++ b/pkg/share/manager/jsoncs3/sharecache/sharecache.go @@ -0,0 +1,198 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package sharecache + +import ( + "context" + "encoding/json" + "path" + "path/filepath" + "time" + + "github.com/cs3org/reva/v2/pkg/appctx" + "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/shareid" + "github.com/cs3org/reva/v2/pkg/storage/utils/metadata" + "github.com/cs3org/reva/v2/pkg/utils" +) + +// Cache caches the list of share ids for users/groups +// It functions as an in-memory cache with a persistence layer +// The storage is sharded by user/group +type Cache struct { + UserShares map[string]*UserShareCache + + storage metadata.Storage + namespace string + filename string +} + +// UserShareCache holds the space/share map for one user +type UserShareCache struct { + Mtime time.Time + UserShares map[string]*SpaceShareIDs +} + +// SpaceShareIDs holds the unique list of share ids for a space +type SpaceShareIDs struct { + Mtime time.Time + IDs map[string]struct{} +} + +// New returns a new Cache instance +func New(s metadata.Storage, namespace, filename string) Cache { + return Cache{ + UserShares: map[string]*UserShareCache{}, + storage: s, + namespace: namespace, + filename: filename, + } +} + +// Add adds a share to the cache +func (c *Cache) Add(ctx context.Context, userid, shareID string) error { + storageid, spaceid, _ := shareid.Decode(shareID) + ssid := storageid + "^" + spaceid + + now := time.Now() + if c.UserShares[userid] == nil { + c.UserShares[userid] = &UserShareCache{ + UserShares: map[string]*SpaceShareIDs{}, + } + } + if c.UserShares[userid].UserShares[ssid] == nil { + c.UserShares[userid].UserShares[ssid] = &SpaceShareIDs{ + IDs: map[string]struct{}{}, + } + } + // add share id + c.UserShares[userid].Mtime = now + c.UserShares[userid].UserShares[ssid].Mtime = now + c.UserShares[userid].UserShares[ssid].IDs[shareID] = struct{}{} + return c.Persist(ctx, userid) +} + +// Remove removes a share for the given user +func (c *Cache) Remove(ctx context.Context, userid, shareID string) error { + storageid, spaceid, _ := shareid.Decode(shareID) + ssid := storageid + "^" + spaceid + + if c.UserShares[userid] != nil { + if c.UserShares[userid].UserShares[ssid] != nil { + // remove share id + now := time.Now() + c.UserShares[userid].Mtime = now + c.UserShares[userid].UserShares[ssid].Mtime = now + delete(c.UserShares[userid].UserShares[ssid].IDs, shareID) + } + } + + return c.Persist(ctx, userid) +} + +// List return the list of spaces/shares for the given user/group +func (c *Cache) List(userid string) map[string]SpaceShareIDs { + r := map[string]SpaceShareIDs{} + if c.UserShares[userid] == nil { + return r + } + + for ssid, cached := range c.UserShares[userid].UserShares { + r[ssid] = SpaceShareIDs{ + Mtime: cached.Mtime, + IDs: cached.IDs, + } + } + return r +} + +// Sync updates the in-memory data with the data from the storage if it is outdated +func (c *Cache) Sync(ctx context.Context, userID string) error { + log := appctx.GetLogger(ctx).With().Str("userID", userID).Logger() + log.Debug().Msg("Syncing share cache...") + + var mtime time.Time + // - do we have a cached list of created shares for the user in memory? + if usc := c.UserShares[userID]; usc != nil { + mtime = usc.Mtime + // - y: set If-Modified-Since header to only download if it changed + } else { + mtime = time.Time{} // Set zero time so that data from storage always takes precedence + } + + userCreatedPath := c.userCreatedPath(userID) + info, err := c.storage.Stat(ctx, userCreatedPath) + if err != nil { + if _, ok := err.(errtypes.NotFound); ok { + return nil // Nothing to sync against + } + log.Error().Err(err).Msg("Failed to stat the share cache") + return err + } + // check mtime of /users/{userid}/created.json + if utils.TSToTime(info.Mtime).After(mtime) { + log.Debug().Msg("Updating share cache...") + // - update cached list of created shares for the user in memory if changed + createdBlob, err := c.storage.SimpleDownload(ctx, userCreatedPath) + if err != nil { + log.Error().Err(err).Msg("Failed to download the share cache") + return err + } + newShareCache := &UserShareCache{} + err = json.Unmarshal(createdBlob, newShareCache) + if err != nil { + log.Error().Err(err).Msg("Failed to unmarshal the share cache") + return err + } + c.UserShares[userID] = newShareCache + } + log.Debug().Msg("Share cache is up to date") + return nil +} + +// Persist persists the data for one user/group to the storage +func (c *Cache) Persist(ctx context.Context, userid string) error { + oldMtime := c.UserShares[userid].Mtime + c.UserShares[userid].Mtime = time.Now() + + createdBytes, err := json.Marshal(c.UserShares[userid]) + if err != nil { + c.UserShares[userid].Mtime = oldMtime + return err + } + jsonPath := c.userCreatedPath(userid) + if err := c.storage.MakeDirIfNotExist(ctx, path.Dir(jsonPath)); err != nil { + c.UserShares[userid].Mtime = oldMtime + return err + } + + if err = c.storage.Upload(ctx, metadata.UploadRequest{ + Path: jsonPath, + Content: createdBytes, + IfUnmodifiedSince: c.UserShares[userid].Mtime, + }); err != nil { + c.UserShares[userid].Mtime = oldMtime + return err + } + return nil +} + +func (c *Cache) userCreatedPath(userid string) string { + return filepath.Join("/", c.namespace, userid, c.filename) +} diff --git a/pkg/share/manager/jsoncs3/sharecache/sharecache_suite_test.go b/pkg/share/manager/jsoncs3/sharecache/sharecache_suite_test.go new file mode 100644 index 0000000000..44026265b3 --- /dev/null +++ b/pkg/share/manager/jsoncs3/sharecache/sharecache_suite_test.go @@ -0,0 +1,31 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package sharecache_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestSharecache(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Sharecache Suite") +} diff --git a/pkg/share/manager/jsoncs3/sharecache/sharecache_test.go b/pkg/share/manager/jsoncs3/sharecache/sharecache_test.go new file mode 100644 index 0000000000..f6bd3b91f9 --- /dev/null +++ b/pkg/share/manager/jsoncs3/sharecache/sharecache_test.go @@ -0,0 +1,83 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package sharecache_test + +import ( + "context" + "io/ioutil" + "os" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/sharecache" + "github.com/cs3org/reva/v2/pkg/storage/utils/metadata" +) + +var _ = Describe("Sharecache", func() { + var ( + c sharecache.Cache + storage metadata.Storage + + userid = "user" + shareID = "storageid$spaceid!share1" + ctx context.Context + tmpdir string + ) + + BeforeEach(func() { + ctx = context.Background() + + var err error + tmpdir, err = ioutil.TempDir("", "providercache-test") + Expect(err).ToNot(HaveOccurred()) + + err = os.MkdirAll(tmpdir, 0755) + Expect(err).ToNot(HaveOccurred()) + + storage, err = metadata.NewDiskStorage(tmpdir) + Expect(err).ToNot(HaveOccurred()) + + c = sharecache.New(storage, "users", "created.json") + Expect(c).ToNot(BeNil()) + }) + + AfterEach(func() { + if tmpdir != "" { + os.RemoveAll(tmpdir) + } + }) + + Describe("Persist", func() { + Context("with an existing entry", func() { + BeforeEach(func() { + Expect(c.Add(ctx, userid, shareID)).To(Succeed()) + }) + + It("updates the mtime", func() { + oldMtime := c.UserShares[userid].Mtime + Expect(oldMtime).ToNot(Equal(time.Time{})) + + Expect(c.Persist(ctx, userid)).To(Succeed()) + Expect(c.UserShares[userid]).ToNot(Equal(oldMtime)) + }) + }) + }) +}) diff --git a/pkg/share/manager/jsoncs3/shareid/shareid.go b/pkg/share/manager/jsoncs3/shareid/shareid.go new file mode 100644 index 0000000000..65d774d198 --- /dev/null +++ b/pkg/share/manager/jsoncs3/shareid/shareid.go @@ -0,0 +1,43 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package shareid + +import "strings" + +// Encode encodes a share id +func Encode(providerID, spaceID, shareID string) string { + return providerID + "^" + spaceID + "°" + shareID +} + +// Decode decodes an encoded shareid +// share ids are of the format ^° +func Decode(id string) (string, string, string) { + parts := strings.SplitN(id, "^", 2) + if len(parts) == 1 { + return "", "", parts[0] + } + + storageid := parts[0] + parts = strings.SplitN(parts[1], "°", 2) + if len(parts) == 1 { + return storageid, parts[0], "" + } + + return storageid, parts[0], parts[1] +} diff --git a/pkg/share/manager/loader/loader.go b/pkg/share/manager/loader/loader.go index b29c6ad31a..e67719976d 100644 --- a/pkg/share/manager/loader/loader.go +++ b/pkg/share/manager/loader/loader.go @@ -22,6 +22,7 @@ import ( // Load core share manager drivers. _ "github.com/cs3org/reva/v2/pkg/share/manager/cs3" _ "github.com/cs3org/reva/v2/pkg/share/manager/json" + _ "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3" _ "github.com/cs3org/reva/v2/pkg/share/manager/memory" _ "github.com/cs3org/reva/v2/pkg/share/manager/owncloudsql" // Add your own here diff --git a/pkg/storage/fs/nextcloud/nextcloud_server_mock.go b/pkg/storage/fs/nextcloud/nextcloud_server_mock.go index dd083b899c..a7a4d2b1cc 100644 --- a/pkg/storage/fs/nextcloud/nextcloud_server_mock.go +++ b/pkg/storage/fs/nextcloud/nextcloud_server_mock.go @@ -111,6 +111,7 @@ var responses = map[string]Response{ `POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/GetPathByID {"storage_id":"00000000-0000-0000-0000-000000000000","opaque_id":"fileid-/some/path"} EMPTY`: {200, "/subdir", serverStateEmpty}, + `POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/GetMD {"ref":{"path":"/file"},"mdKeys":null}`: {404, ``, serverStateEmpty}, `POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/InitiateUpload {"ref":{"path":"/file"},"uploadLength":0,"metadata":{"providerID":""}}`: {200, `{"simple": "yes","tus": "yes"}`, serverStateEmpty}, `POST /apps/sciencemesh/~f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c/api/storage/InitiateUpload {"ref":{"resource_id":{"storage_id":"f7fbf8c8-139b-4376-b307-cf0a8c2d0d9c"},"path":"/versionedFile"},"uploadLength":0,"metadata":{}}`: {200, `{"simple": "yes","tus": "yes"}`, serverStateEmpty}, diff --git a/pkg/storage/utils/decomposedfs/grants.go b/pkg/storage/utils/decomposedfs/grants.go index c9268989bd..cca3996d20 100644 --- a/pkg/storage/utils/decomposedfs/grants.go +++ b/pkg/storage/utils/decomposedfs/grants.go @@ -139,6 +139,10 @@ func (fs *Decomposedfs) RemoveGrant(ctx context.Context, ref *provider.Reference return err } + if grant == nil { + return errtypes.NotFound("grant not found") + } + // you are allowed to remove grants if you created them yourself or have the proper permission if !utils.UserEqual(grant.Creator, ctxpkg.ContextMustGetUser(ctx).GetId()) { ok, err := fs.p.HasPermission(ctx, node, func(rp *provider.ResourcePermissions) bool { diff --git a/pkg/storage/utils/metadata/cs3.go b/pkg/storage/utils/metadata/cs3.go index f8ae1157ef..45f1401bb2 100644 --- a/pkg/storage/utils/metadata/cs3.go +++ b/pkg/storage/utils/metadata/cs3.go @@ -25,6 +25,7 @@ import ( "io/ioutil" "net/http" "os" + "time" gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" @@ -113,6 +114,14 @@ func (cs3 *CS3) Init(ctx context.Context, spaceid string) (err error) { // SimpleUpload uploads a file to the metadata storage func (cs3 *CS3) SimpleUpload(ctx context.Context, uploadpath string, content []byte) error { + return cs3.Upload(ctx, UploadRequest{ + Path: uploadpath, + Content: content, + }) +} + +// Upload uploads a file to the metadata storage +func (cs3 *CS3) Upload(ctx context.Context, req UploadRequest) error { client, err := cs3.providerClient() if err != nil { return err @@ -122,14 +131,25 @@ func (cs3 *CS3) SimpleUpload(ctx context.Context, uploadpath string, content []b return err } - ref := provider.InitiateFileUploadRequest{ + ifuReq := &provider.InitiateFileUploadRequest{ Ref: &provider.Reference{ ResourceId: cs3.SpaceRoot, - Path: utils.MakeRelativePath(uploadpath), + Path: utils.MakeRelativePath(req.Path), }, } - res, err := client.InitiateFileUpload(ctx, &ref) + if req.IfMatchEtag != "" { + ifuReq.Options = &provider.InitiateFileUploadRequest_IfMatch{ + IfMatch: req.IfMatchEtag, + } + } + if req.IfUnmodifiedSince != (time.Time{}) { + ifuReq.Options = &provider.InitiateFileUploadRequest_IfUnmodifiedSince{ + IfUnmodifiedSince: utils.TimeToTS(req.IfUnmodifiedSince), + } + } + + res, err := client.InitiateFileUpload(ctx, ifuReq) if err != nil { return err } @@ -149,20 +169,49 @@ func (cs3 *CS3) SimpleUpload(ctx context.Context, uploadpath string, content []b return errors.New("metadata storage doesn't support the simple upload protocol") } - req, err := http.NewRequest(http.MethodPut, endpoint, bytes.NewReader(content)) + httpReq, err := http.NewRequest(http.MethodPut, endpoint, bytes.NewReader(req.Content)) if err != nil { return err } md, _ := metadata.FromOutgoingContext(ctx) - req.Header.Add(ctxpkg.TokenHeader, md.Get(ctxpkg.TokenHeader)[0]) - resp, err := cs3.dataGatewayClient.Do(req) + httpReq.Header.Add(ctxpkg.TokenHeader, md.Get(ctxpkg.TokenHeader)[0]) + resp, err := cs3.dataGatewayClient.Do(httpReq) if err != nil { return err } return resp.Body.Close() } +// Stat returns the metadata for the given path +func (cs3 *CS3) Stat(ctx context.Context, path string) (*provider.ResourceInfo, error) { + client, err := cs3.providerClient() + if err != nil { + return nil, err + } + ctx, err = cs3.getAuthContext(ctx) + if err != nil { + return nil, err + } + + req := provider.StatRequest{ + Ref: &provider.Reference{ + ResourceId: cs3.SpaceRoot, + Path: utils.MakeRelativePath(path), + }, + } + + res, err := client.Stat(ctx, &req) + if err != nil { + return nil, err + } + if res.Status.Code != rpc.Code_CODE_OK { + return nil, errtypes.NewErrtypeFromStatus(res.Status) + } + + return res.Info, nil +} + // SimpleDownload reads a file from the metadata storage func (cs3 *CS3) SimpleDownload(ctx context.Context, downloadpath string) (content []byte, err error) { client, err := cs3.providerClient() @@ -255,6 +304,20 @@ func (cs3 *CS3) Delete(ctx context.Context, path string) error { // ReadDir returns the entries in a given directory func (cs3 *CS3) ReadDir(ctx context.Context, path string) ([]string, error) { + infos, err := cs3.ListDir(ctx, path) + if err != nil { + return nil, err + } + + entries := []string{} + for _, ri := range infos { + entries = append(entries, ri.Path) + } + return entries, nil +} + +// ListDir returns a list of ResourceInfos for the entries in a given directory +func (cs3 *CS3) ListDir(ctx context.Context, path string) ([]*provider.ResourceInfo, error) { client, err := cs3.providerClient() if err != nil { return nil, err @@ -279,11 +342,7 @@ func (cs3 *CS3) ReadDir(ctx context.Context, path string) ([]string, error) { return nil, errtypes.NewErrtypeFromStatus(res.Status) } - entries := []string{} - for _, ri := range res.Infos { - entries = append(entries, ri.Path) - } - return entries, nil + return res.Infos, nil } // MakeDirIfNotExist will create a root node in the metadata storage. Requires an authenticated context. diff --git a/pkg/storage/utils/metadata/disk.go b/pkg/storage/utils/metadata/disk.go index 81062562ce..816177a774 100644 --- a/pkg/storage/utils/metadata/disk.go +++ b/pkg/storage/utils/metadata/disk.go @@ -20,10 +20,17 @@ package metadata import ( "context" + "errors" + "fmt" "io/fs" "io/ioutil" "os" "path" + "time" + + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + "github.com/cs3org/reva/v2/pkg/errtypes" ) // Disk represents a disk metadata storage @@ -48,9 +55,64 @@ func (disk *Disk) Backend() string { return "disk" } +// Stat returns the metadata for the given path +func (disk *Disk) Stat(ctx context.Context, path string) (*provider.ResourceInfo, error) { + info, err := os.Stat(disk.targetPath(path)) + if err != nil { + return nil, err + } + entry := &provider.ResourceInfo{ + Type: provider.ResourceType_RESOURCE_TYPE_FILE, + Path: "./" + info.Name(), + Name: info.Name(), + Mtime: &typesv1beta1.Timestamp{Seconds: uint64(info.ModTime().Unix()), Nanos: uint32(info.ModTime().Nanosecond())}, + } + if info.IsDir() { + entry.Type = provider.ResourceType_RESOURCE_TYPE_CONTAINER + } + entry.Etag, err = calcEtag(info.ModTime(), info.Size()) + if err != nil { + return nil, err + } + return entry, nil +} + // SimpleUpload stores a file on disk -func (disk *Disk) SimpleUpload(_ context.Context, uploadpath string, content []byte) error { - return os.WriteFile(disk.targetPath(uploadpath), content, 0644) +func (disk *Disk) SimpleUpload(ctx context.Context, uploadpath string, content []byte) error { + return disk.Upload(ctx, UploadRequest{ + Path: uploadpath, + Content: content, + }) +} + +// Upload stores a file on disk +func (disk *Disk) Upload(_ context.Context, req UploadRequest) error { + p := disk.targetPath(req.Path) + if req.IfMatchEtag != "" { + info, err := os.Stat(p) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } else if err == nil { + etag, err := calcEtag(info.ModTime(), info.Size()) + if err != nil { + return err + } + if etag != req.IfMatchEtag { + return errtypes.PreconditionFailed("etag mismatch") + } + } + } + if req.IfUnmodifiedSince != (time.Time{}) { + info, err := os.Stat(p) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } else if err == nil { + if info.ModTime().After(req.IfUnmodifiedSince) { + return errtypes.PreconditionFailed(fmt.Sprintf("resource has been modified, mtime: %s > since %s", info.ModTime(), req.IfUnmodifiedSince)) + } + } + } + return os.WriteFile(p, req.Content, 0644) } // SimpleDownload reads a file from disk @@ -80,6 +142,32 @@ func (disk *Disk) ReadDir(_ context.Context, p string) ([]string, error) { return entries, nil } +// ListDir returns a list of ResourceInfos for the entries in a given directory +func (disk *Disk) ListDir(ctx context.Context, path string) ([]*provider.ResourceInfo, error) { + infos, err := ioutil.ReadDir(disk.targetPath(path)) + if err != nil { + if _, ok := err.(*fs.PathError); ok { + return []*provider.ResourceInfo{}, nil + } + return nil, err + } + + entries := make([]*provider.ResourceInfo, 0, len(infos)) + for _, info := range infos { + entry := &provider.ResourceInfo{ + Type: provider.ResourceType_RESOURCE_TYPE_FILE, + Path: "./" + info.Name(), + Name: info.Name(), + Mtime: &typesv1beta1.Timestamp{Seconds: uint64(info.ModTime().Unix()), Nanos: uint32(info.ModTime().Nanosecond())}, + } + if info.IsDir() { + entry.Type = provider.ResourceType_RESOURCE_TYPE_CONTAINER + } + entries = append(entries, entry) + } + return entries, nil +} + // MakeDirIfNotExist will create a root node in the metadata storage. Requires an authenticated context. func (disk *Disk) MakeDirIfNotExist(_ context.Context, path string) error { return os.MkdirAll(disk.targetPath(path), 0777) diff --git a/pkg/storage/utils/metadata/mocks/Storage.go b/pkg/storage/utils/metadata/mocks/Storage.go index f783ed274a..e4ad15e87b 100644 --- a/pkg/storage/utils/metadata/mocks/Storage.go +++ b/pkg/storage/utils/metadata/mocks/Storage.go @@ -23,8 +23,11 @@ package mocks import ( context "context" + metadata "github.com/cs3org/reva/v2/pkg/storage/utils/metadata" mock "github.com/stretchr/testify/mock" + providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + testing "testing" ) @@ -89,6 +92,29 @@ func (_m *Storage) Init(ctx context.Context, name string) error { return r0 } +// ListDir provides a mock function with given fields: ctx, path +func (_m *Storage) ListDir(ctx context.Context, path string) ([]*providerv1beta1.ResourceInfo, error) { + ret := _m.Called(ctx, path) + + var r0 []*providerv1beta1.ResourceInfo + if rf, ok := ret.Get(0).(func(context.Context, string) []*providerv1beta1.ResourceInfo); ok { + r0 = rf(ctx, path) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*providerv1beta1.ResourceInfo) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, path) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // MakeDirIfNotExist provides a mock function with given fields: ctx, name func (_m *Storage) MakeDirIfNotExist(ctx context.Context, name string) error { ret := _m.Called(ctx, name) @@ -184,6 +210,43 @@ func (_m *Storage) SimpleUpload(ctx context.Context, uploadpath string, content return r0 } +// Stat provides a mock function with given fields: ctx, path +func (_m *Storage) Stat(ctx context.Context, path string) (*providerv1beta1.ResourceInfo, error) { + ret := _m.Called(ctx, path) + + var r0 *providerv1beta1.ResourceInfo + if rf, ok := ret.Get(0).(func(context.Context, string) *providerv1beta1.ResourceInfo); ok { + r0 = rf(ctx, path) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*providerv1beta1.ResourceInfo) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, path) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Upload provides a mock function with given fields: ctx, req +func (_m *Storage) Upload(ctx context.Context, req metadata.UploadRequest) error { + ret := _m.Called(ctx, req) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, metadata.UploadRequest) error); ok { + r0 = rf(ctx, req) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // NewStorage creates a new instance of Storage. It also registers a cleanup function to assert the mocks expectations. func NewStorage(t testing.TB) *Storage { mock := &Storage{} diff --git a/pkg/storage/utils/metadata/storage.go b/pkg/storage/utils/metadata/storage.go index dda9bc6726..be695374b3 100644 --- a/pkg/storage/utils/metadata/storage.go +++ b/pkg/storage/utils/metadata/storage.go @@ -20,23 +20,52 @@ package metadata import ( "context" + "crypto/md5" + "encoding/binary" + "fmt" + "time" + + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" ) //go:generate make --no-print-directory -C ../../../.. mockery NAME=Storage +// UploadRequest represents an upload request and its options +type UploadRequest struct { + Path string + Content []byte + + IfMatchEtag string + IfUnmodifiedSince time.Time +} + // Storage is the interface to maintain metadata in a storage type Storage interface { Backend() string Init(ctx context.Context, name string) (err error) + Upload(ctx context.Context, req UploadRequest) error SimpleUpload(ctx context.Context, uploadpath string, content []byte) error SimpleDownload(ctx context.Context, path string) ([]byte, error) Delete(ctx context.Context, path string) error + Stat(ctx context.Context, path string) (*provider.ResourceInfo, error) ReadDir(ctx context.Context, path string) ([]string, error) + ListDir(ctx context.Context, path string) ([]*provider.ResourceInfo, error) CreateSymlink(ctx context.Context, oldname, newname string) error ResolveSymlink(ctx context.Context, name string) (string, error) MakeDirIfNotExist(ctx context.Context, name string) error } + +func calcEtag(mtime time.Time, size int64) (string, error) { + h := md5.New() + if err := binary.Write(h, binary.BigEndian, mtime.UnixNano()); err != nil { + return "", err + } + if err := binary.Write(h, binary.BigEndian, size); err != nil { + return "", err + } + return fmt.Sprintf("%x", h.Sum(nil)), nil +} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index f8f1cde42d..0d99af4939 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -143,6 +143,14 @@ func TSToTime(ts *types.Timestamp) time.Time { return time.Unix(int64(ts.Seconds), int64(ts.Nanos)) } +// TimeToTS converts Go's time.Time to a protobuf Timestamp. +func TimeToTS(t time.Time) *types.Timestamp { + return &types.Timestamp{ + Seconds: uint64(t.Unix()), // implicitly returns UTC + Nanos: uint32(t.Nanosecond()), + } +} + // LaterTS returns the timestamp which occurs later. func LaterTS(t1 *types.Timestamp, t2 *types.Timestamp) *types.Timestamp { if TSToUnixNano(t1) > TSToUnixNano(t2) {