Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] Jsoncs3 investigate #3935

Closed
wants to merge 14 commits into from
5 changes: 5 additions & 0 deletions changelog/unreleased/use-rlock.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: We use read locks when listing shares

We use read locks when listing shares

https://github.com/cs3org/reva/pull/3935
164 changes: 81 additions & 83 deletions pkg/share/manager/jsoncs3/jsoncs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla

m.Lock()
defer m.Unlock()

m.invalidateCache(ctx, user)

_, err := m.getByKey(ctx, key)
if err == nil {
// share already exists
Expand Down Expand Up @@ -429,28 +432,14 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc
return nil, err
}

m.Lock()
defer m.Unlock()
m.RLock()
defer m.RUnlock()
s, err := m.get(ctx, ref)
if err != nil {
return nil, err
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(m.eventStream, events.ShareExpired{
ShareID: s.GetId(),
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
return nil, errors.Errorf("share is expired: %s", s.GetId())
}
// check if we are the creator or the grantee
// TODO allow manager to get shares in a space created by other users
Expand Down Expand Up @@ -486,6 +475,8 @@ func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference
defer m.Unlock()
user := ctxpkg.ContextMustGetUser(ctx)

m.invalidateCache(ctx, user)

s, err := m.get(ctx, ref)
if err != nil {
return err
Expand All @@ -511,6 +502,10 @@ func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareRefer
m.Lock()
defer m.Unlock()

user := ctxpkg.ContextMustGetUser(ctx)

m.invalidateCache(ctx, user)
dragonchaser marked this conversation as resolved.
Show resolved Hide resolved

var toUpdate *collaboration.Share

if ref != nil {
Expand Down Expand Up @@ -540,7 +535,6 @@ func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareRefer
}
}

user := ctxpkg.ContextMustGetUser(ctx)
if !share.IsCreatedByUser(toUpdate, user) {
req := &provider.StatRequest{
Ref: &provider.Reference{ResourceId: toUpdate.ResourceId},
Expand Down Expand Up @@ -590,8 +584,8 @@ func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filte
return nil, err
}

m.Lock()
defer m.Unlock()
m.RLock()
defer m.RUnlock()

user := ctxpkg.ContextMustGetUser(ctx)

Expand Down Expand Up @@ -629,20 +623,6 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f

for _, s := range shares.Shares {
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(m.eventStream, events.ShareExpired{
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
continue
}
if !share.MatchesFilters(s, filters) {
Expand Down Expand Up @@ -694,20 +674,6 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
continue
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(m.eventStream, events.ShareExpired{
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
continue
}
if utils.UserEqual(user.GetId(), s.GetCreator()) {
Expand All @@ -721,6 +687,63 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
return ss, nil
}

// call in locked context (write lock)
func (m *Manager) invalidateCache(ctx context.Context, user *userv1beta1.User) {
dragonchaser marked this conversation as resolved.
Show resolved Hide resolved
ssids := map[string]*receivedsharecache.Space{}

for _, group := range user.Groups {
if err := m.GroupReceivedCache.Sync(ctx, group); err != nil {
continue // ignore error, cache will be updated on next read
}

for ssid, spaceShareIDs := range m.GroupReceivedCache.List(group) {
// add a pending entry, the state will be updated
// when reading the received shares below if they have already been accepted or denied
var rs *receivedsharecache.Space
var ok bool
if rs, ok = ssids[ssid]; !ok {
rs = &receivedsharecache.Space{
Mtime: spaceShareIDs.Mtime,
States: make(map[string]*receivedsharecache.State, len(spaceShareIDs.IDs)),
}
ssids[ssid] = rs
}

for shareid := range spaceShareIDs.IDs {
rs.States[shareid] = &receivedsharecache.State{
State: collaboration.ShareState_SHARE_STATE_PENDING,
}
}
}
}

for _, s := range ssids {
for sid := range s.States {
sh, err := m.getByID(ctx, &collaboration.ShareId{
OpaqueId: sid,
})

if err != nil && share.IsExpired(sh) {
if err := m.removeShare(ctx, sh); err != nil {
log.Error().Err(err).
Msg("failed to unshare expired share")
}
dragonchaser marked this conversation as resolved.
Show resolved Hide resolved
if err := events.Publish(m.eventStream, events.ShareExpired{
ShareID: sh.GetId(),
ShareOwner: sh.GetOwner(),
ItemID: sh.GetResourceId(),
ExpiredAt: time.Unix(int64(sh.GetExpiration().GetSeconds()), int64(sh.GetExpiration().GetNanos())),
GranteeUserID: sh.GetGrantee().GetUserId(),
GranteeGroupID: sh.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
}
}
}
}

// ListReceivedShares returns the list of shares the user has access to.
func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaboration.Filter) ([]*collaboration.ReceivedShare, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListReceivedShares")
Expand All @@ -730,8 +753,8 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
return nil, err
}

m.Lock()
defer m.Unlock()
m.RLock()
defer m.RUnlock()

user := ctxpkg.ContextMustGetUser(ctx)

Expand Down Expand Up @@ -821,20 +844,6 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
continue
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(m.eventStream, events.ShareExpired{
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
continue
}

Expand Down Expand Up @@ -910,8 +919,8 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "getReceived")
defer span.End()

m.Lock()
defer m.Unlock()
m.RLock()
defer m.RUnlock()
s, err := m.get(ctx, ref)
if err != nil {
return nil, err
kobergj marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -920,22 +929,11 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer
if !share.IsGrantedToUser(s, user) {
return nil, errtypes.NotFound(ref.String())
kobergj marked this conversation as resolved.
Show resolved Hide resolved
}

if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(m.eventStream, events.ShareExpired{
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
return nil, errors.Errorf("share is expired: %s", s.GetId())
}

return m.convert(ctx, user.Id.GetOpaqueId(), s), nil
}

Expand All @@ -955,6 +953,9 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab

m.Lock()
defer m.Unlock()
userID := ctxpkg.ContextMustGetUser(ctx)

m.invalidateCache(ctx, userID)

for i := range fieldMask.Paths {
switch fieldMask.Paths[i] {
Expand All @@ -968,9 +969,6 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab
}

// write back

userID := ctxpkg.ContextMustGetUser(ctx)

err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+shareid.IDDelimiter+rs.Share.ResourceId.SpaceId, rs)
if _, ok := err.(errtypes.IsPreconditionFailed); ok {
// when persisting fails, download, readd and persist again
Expand Down
7 changes: 5 additions & 2 deletions pkg/share/share.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ func FilterFiltersByType(f []*collaboration.Filter, t collaboration.Filter_Type)

// IsExpired tests whether a share is expired
func IsExpired(s *collaboration.Share) bool {
expiration := time.Unix(int64(s.Expiration.GetSeconds()), int64(s.Expiration.GetNanos()))
return s.Expiration != nil && expiration.Before(time.Now())
if e := s.GetExpiration(); e != nil {
expiration := time.Unix(int64(e.Seconds), int64(e.Nanos))
return expiration.Before(time.Now())
}
return false
}