diff --git a/changelog/unreleased/use-rlock.md b/changelog/unreleased/use-rlock.md new file mode 100644 index 0000000000..37649286a1 --- /dev/null +++ b/changelog/unreleased/use-rlock.md @@ -0,0 +1,5 @@ +Bugfix: We use read locks when listing shares + +We use read locks when listing shares + +https://github.com/cs3org/reva/pull/3935 diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index 6943605f08..e87b4be6a9 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -292,6 +292,9 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla m.Lock() defer m.Unlock() + + m.invalidateCache(ctx, user) + _, err := m.getByKey(ctx, key) if err == nil { // share already exists @@ -429,28 +432,14 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc return nil, err } - m.Lock() - defer m.Unlock() + m.RLock() + defer m.RUnlock() s, err := m.get(ctx, ref) if err != nil { return nil, err } if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { - log.Error().Err(err). - Msg("failed to unshare expired share") - } - if err := events.Publish(m.eventStream, events.ShareExpired{ - ShareID: s.GetId(), - ShareOwner: s.GetOwner(), - ItemID: s.GetResourceId(), - ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())), - GranteeUserID: s.GetGrantee().GetUserId(), - GranteeGroupID: s.GetGrantee().GetGroupId(), - }); err != nil { - log.Error().Err(err). - Msg("failed to publish share expired event") - } + return nil, errors.Errorf("share is expired: %s", s.GetId()) } // check if we are the creator or the grantee // TODO allow manager to get shares in a space created by other users @@ -486,6 +475,8 @@ func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference defer m.Unlock() user := ctxpkg.ContextMustGetUser(ctx) + m.invalidateCache(ctx, user) + s, err := m.get(ctx, ref) if err != nil { return err @@ -511,6 +502,10 @@ func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareRefer m.Lock() defer m.Unlock() + user := ctxpkg.ContextMustGetUser(ctx) + + m.invalidateCache(ctx, user) + var toUpdate *collaboration.Share if ref != nil { @@ -540,7 +535,6 @@ func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareRefer } } - user := ctxpkg.ContextMustGetUser(ctx) if !share.IsCreatedByUser(toUpdate, user) { req := &provider.StatRequest{ Ref: &provider.Reference{ResourceId: toUpdate.ResourceId}, @@ -590,8 +584,8 @@ func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filte return nil, err } - m.Lock() - defer m.Unlock() + m.RLock() + defer m.RUnlock() user := ctxpkg.ContextMustGetUser(ctx) @@ -629,20 +623,6 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f for _, s := range shares.Shares { if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { - log.Error().Err(err). - Msg("failed to unshare expired share") - } - if err := events.Publish(m.eventStream, events.ShareExpired{ - ShareOwner: s.GetOwner(), - ItemID: s.GetResourceId(), - ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())), - GranteeUserID: s.GetGrantee().GetUserId(), - GranteeGroupID: s.GetGrantee().GetGroupId(), - }); err != nil { - log.Error().Err(err). - Msg("failed to publish share expired event") - } continue } if !share.MatchesFilters(s, filters) { @@ -694,20 +674,6 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User, continue } if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { - log.Error().Err(err). - Msg("failed to unshare expired share") - } - if err := events.Publish(m.eventStream, events.ShareExpired{ - ShareOwner: s.GetOwner(), - ItemID: s.GetResourceId(), - ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())), - GranteeUserID: s.GetGrantee().GetUserId(), - GranteeGroupID: s.GetGrantee().GetGroupId(), - }); err != nil { - log.Error().Err(err). - Msg("failed to publish share expired event") - } continue } if utils.UserEqual(user.GetId(), s.GetCreator()) { @@ -721,6 +687,63 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User, return ss, nil } +// call in locked context (write lock) +func (m *Manager) invalidateCache(ctx context.Context, user *userv1beta1.User) { + ssids := map[string]*receivedsharecache.Space{} + + for _, group := range user.Groups { + if err := m.GroupReceivedCache.Sync(ctx, group); err != nil { + continue // ignore error, cache will be updated on next read + } + + for ssid, spaceShareIDs := range m.GroupReceivedCache.List(group) { + // add a pending entry, the state will be updated + // when reading the received shares below if they have already been accepted or denied + var rs *receivedsharecache.Space + var ok bool + if rs, ok = ssids[ssid]; !ok { + rs = &receivedsharecache.Space{ + Mtime: spaceShareIDs.Mtime, + States: make(map[string]*receivedsharecache.State, len(spaceShareIDs.IDs)), + } + ssids[ssid] = rs + } + + for shareid := range spaceShareIDs.IDs { + rs.States[shareid] = &receivedsharecache.State{ + State: collaboration.ShareState_SHARE_STATE_PENDING, + } + } + } + } + + for _, s := range ssids { + for sid := range s.States { + sh, err := m.getByID(ctx, &collaboration.ShareId{ + OpaqueId: sid, + }) + + if err != nil && share.IsExpired(sh) { + if err := m.removeShare(ctx, sh); err != nil { + log.Error().Err(err). + Msg("failed to unshare expired share") + } + if err := events.Publish(m.eventStream, events.ShareExpired{ + ShareID: sh.GetId(), + ShareOwner: sh.GetOwner(), + ItemID: sh.GetResourceId(), + ExpiredAt: time.Unix(int64(sh.GetExpiration().GetSeconds()), int64(sh.GetExpiration().GetNanos())), + GranteeUserID: sh.GetGrantee().GetUserId(), + GranteeGroupID: sh.GetGrantee().GetGroupId(), + }); err != nil { + log.Error().Err(err). + Msg("failed to publish share expired event") + } + } + } + } +} + // ListReceivedShares returns the list of shares the user has access to. func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaboration.Filter) ([]*collaboration.ReceivedShare, error) { ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListReceivedShares") @@ -730,8 +753,8 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati return nil, err } - m.Lock() - defer m.Unlock() + m.RLock() + defer m.RUnlock() user := ctxpkg.ContextMustGetUser(ctx) @@ -821,20 +844,6 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati continue } if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { - log.Error().Err(err). - Msg("failed to unshare expired share") - } - if err := events.Publish(m.eventStream, events.ShareExpired{ - ShareOwner: s.GetOwner(), - ItemID: s.GetResourceId(), - ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())), - GranteeUserID: s.GetGrantee().GetUserId(), - GranteeGroupID: s.GetGrantee().GetGroupId(), - }); err != nil { - log.Error().Err(err). - Msg("failed to publish share expired event") - } continue } @@ -910,8 +919,8 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "getReceived") defer span.End() - m.Lock() - defer m.Unlock() + m.RLock() + defer m.RUnlock() s, err := m.get(ctx, ref) if err != nil { return nil, err @@ -920,22 +929,11 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer if !share.IsGrantedToUser(s, user) { return nil, errtypes.NotFound(ref.String()) } + if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { - log.Error().Err(err). - Msg("failed to unshare expired share") - } - if err := events.Publish(m.eventStream, events.ShareExpired{ - ShareOwner: s.GetOwner(), - ItemID: s.GetResourceId(), - ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())), - GranteeUserID: s.GetGrantee().GetUserId(), - GranteeGroupID: s.GetGrantee().GetGroupId(), - }); err != nil { - log.Error().Err(err). - Msg("failed to publish share expired event") - } + return nil, errors.Errorf("share is expired: %s", s.GetId()) } + return m.convert(ctx, user.Id.GetOpaqueId(), s), nil } @@ -955,6 +953,9 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab m.Lock() defer m.Unlock() + userID := ctxpkg.ContextMustGetUser(ctx) + + m.invalidateCache(ctx, userID) for i := range fieldMask.Paths { switch fieldMask.Paths[i] { @@ -968,9 +969,6 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab } // write back - - userID := ctxpkg.ContextMustGetUser(ctx) - err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+shareid.IDDelimiter+rs.Share.ResourceId.SpaceId, rs) if _, ok := err.(errtypes.IsPreconditionFailed); ok { // when persisting fails, download, readd and persist again diff --git a/pkg/share/share.go b/pkg/share/share.go index 67e58ab0d6..3fb48c3b09 100644 --- a/pkg/share/share.go +++ b/pkg/share/share.go @@ -242,6 +242,9 @@ func FilterFiltersByType(f []*collaboration.Filter, t collaboration.Filter_Type) // IsExpired tests whether a share is expired func IsExpired(s *collaboration.Share) bool { - expiration := time.Unix(int64(s.Expiration.GetSeconds()), int64(s.Expiration.GetNanos())) - return s.Expiration != nil && expiration.Before(time.Now()) + if e := s.GetExpiration(); e != nil { + expiration := time.Unix(int64(e.Seconds), int64(e.Nanos)) + return expiration.Before(time.Now()) + } + return false }