diff --git a/changelog/unreleased/jsoncs3-concurrency.md b/changelog/unreleased/jsoncs3-concurrency.md new file mode 100644 index 0000000000..4c5d5128b9 --- /dev/null +++ b/changelog/unreleased/jsoncs3-concurrency.md @@ -0,0 +1,3 @@ +Bugfix: concurrently invalidate mtime cache in jsoncs3 share manager + +https://github.com/cs3org/reva/pull/3933 diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index 2d301ba830..5c3e7a62b5 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -33,6 +33,7 @@ import ( "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "github.com/rs/zerolog/log" + "golang.org/x/sync/errgroup" "google.golang.org/genproto/protobuf/field_mask" gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" @@ -115,6 +116,7 @@ func init() { type config struct { GatewayAddr string `mapstructure:"gateway_addr"` + MaxConcurrency int `mapstructure:"max_concurrency"` ProviderAddr string `mapstructure:"provider_addr"` ServiceUserID string `mapstructure:"service_user_id"` ServiceUserIdp string `mapstructure:"service_user_idp"` @@ -145,6 +147,8 @@ type Manager struct { initialized bool + MaxConcurrency int + gateway gatewayv1beta1.GatewayAPIClient eventStream events.Stream } @@ -703,7 +707,6 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati m.Lock() defer m.Unlock() - var rss []*collaboration.ReceivedShare user := ctxpkg.ContextMustGetUser(ctx) ssids := map[string]*receivedsharecache.Space{} @@ -750,46 +753,98 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati } } - for ssid, rspace := range ssids { - storageID, spaceID, _ := shareid.Decode(ssid) - err := m.Cache.Sync(ctx, storageID, spaceID) - if err != nil { - continue - } - for shareID, state := range rspace.States { - s := m.Cache.Get(storageID, spaceID, shareID) - if s == nil { - continue + numWorkers := m.MaxConcurrency + if len(ssids) < numWorkers { + numWorkers = len(ssids) + } + + type w struct { + ssid string + rspace *receivedsharecache.Space + } + work := make(chan w) + results := make(chan *collaboration.ReceivedShare) + + g, ctx := errgroup.WithContext(ctx) + + // Distribute work + g.Go(func() error { + defer close(work) + for ssid, rspace := range ssids { + select { + case work <- w{ssid, rspace}: + case <-ctx.Done(): + return ctx.Err() } - if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { - log.Error().Err(err). - Msg("failed to unshare expired share") - } - if err := events.Publish(m.eventStream, events.ShareExpired{ - ShareOwner: s.GetOwner(), - ItemID: s.GetResourceId(), - ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())), - GranteeUserID: s.GetGrantee().GetUserId(), - GranteeGroupID: s.GetGrantee().GetGroupId(), - }); err != nil { - log.Error().Err(err). - Msg("failed to publish share expired event") + } + return nil + }) + + // Spawn workers that'll concurrently work the queue + for i := 0; i < numWorkers; i++ { + g.Go(func() error { + for w := range work { + storageID, spaceID, _ := shareid.Decode(w.ssid) + err := m.Cache.Sync(ctx, storageID, spaceID) + if err != nil { + continue } - continue - } + for shareID, state := range w.rspace.States { + s := m.Cache.Get(storageID, spaceID, shareID) + if s == nil { + continue + } + if share.IsExpired(s) { + if err := m.removeShare(ctx, s); err != nil { + log.Error().Err(err). + Msg("failed to unshare expired share") + } + if err := events.Publish(m.eventStream, events.ShareExpired{ + ShareOwner: s.GetOwner(), + ItemID: s.GetResourceId(), + ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())), + GranteeUserID: s.GetGrantee().GetUserId(), + GranteeGroupID: s.GetGrantee().GetGroupId(), + }); err != nil { + log.Error().Err(err). + Msg("failed to publish share expired event") + } + continue + } - if share.IsGrantedToUser(s, user) { - if share.MatchesFiltersWithState(s, state.State, filters) { - rs := &collaboration.ReceivedShare{ - Share: s, - State: state.State, - MountPoint: state.MountPoint, + if share.IsGrantedToUser(s, user) { + if share.MatchesFiltersWithState(s, state.State, filters) { + rs := &collaboration.ReceivedShare{ + Share: s, + State: state.State, + MountPoint: state.MountPoint, + } + select { + case results <- rs: + case <-ctx.Done(): + return ctx.Err() + } + } } - rss = append(rss, rs) } } - } + return nil + }) + } + + // Wait for things to settle down, then close results chan + go func() { + _ = g.Wait() // error is checked later + close(results) + }() + + rss := []*collaboration.ReceivedShare{} + for n := range results { + rss = append(rss, n) + } + + if err := g.Wait(); err != nil { + return nil, err } return rss, nil