Skip to content

Commit

Permalink
concurrently invalidate mtime cache in jsoncs3 share manager
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic committed Jun 5, 2023
1 parent e841425 commit 14c8741
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 35 deletions.
3 changes: 3 additions & 0 deletions changelog/unreleased/jsoncs3-concurrency.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Bugfix: concurrently invalidate mtime cache in jsoncs3 share manager

https://github.com/cs3org/reva/pull/3933
125 changes: 90 additions & 35 deletions pkg/share/manager/jsoncs3/jsoncs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"
"google.golang.org/genproto/protobuf/field_mask"

gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
Expand Down Expand Up @@ -115,6 +116,7 @@ func init() {

type config struct {
GatewayAddr string `mapstructure:"gateway_addr"`
MaxConcurrency int `mapstructure:"max_concurrency"`
ProviderAddr string `mapstructure:"provider_addr"`
ServiceUserID string `mapstructure:"service_user_id"`
ServiceUserIdp string `mapstructure:"service_user_idp"`
Expand Down Expand Up @@ -145,6 +147,8 @@ type Manager struct {

initialized bool

MaxConcurrency int

gateway gatewayv1beta1.GatewayAPIClient
eventStream events.Stream
}
Expand Down Expand Up @@ -703,7 +707,6 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
m.Lock()
defer m.Unlock()

var rss []*collaboration.ReceivedShare
user := ctxpkg.ContextMustGetUser(ctx)

ssids := map[string]*receivedsharecache.Space{}
Expand Down Expand Up @@ -750,46 +753,98 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
}
}

for ssid, rspace := range ssids {
storageID, spaceID, _ := shareid.Decode(ssid)
err := m.Cache.Sync(ctx, storageID, spaceID)
if err != nil {
continue
}
for shareID, state := range rspace.States {
s := m.Cache.Get(storageID, spaceID, shareID)
if s == nil {
continue
numWorkers := m.MaxConcurrency
if len(ssids) < numWorkers {
numWorkers = len(ssids)
}

type w struct {
ssid string
rspace *receivedsharecache.Space
}
work := make(chan w)
results := make(chan *collaboration.ReceivedShare)

g, ctx := errgroup.WithContext(ctx)

// Distribute work
g.Go(func() error {
defer close(work)
for ssid, rspace := range ssids {
select {
case work <- w{ssid, rspace}:
case <-ctx.Done():
return ctx.Err()
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(m.eventStream, events.ShareExpired{
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
return nil
})

// Spawn workers that'll concurrently work the queue
for i := 0; i < numWorkers; i++ {
g.Go(func() error {
for w := range work {
storageID, spaceID, _ := shareid.Decode(w.ssid)
err := m.Cache.Sync(ctx, storageID, spaceID)
if err != nil {
continue
}
continue
}
for shareID, state := range w.rspace.States {
s := m.Cache.Get(storageID, spaceID, shareID)
if s == nil {
continue
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
log.Error().Err(err).
Msg("failed to unshare expired share")
}
if err := events.Publish(m.eventStream, events.ShareExpired{
ShareOwner: s.GetOwner(),
ItemID: s.GetResourceId(),
ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
GranteeUserID: s.GetGrantee().GetUserId(),
GranteeGroupID: s.GetGrantee().GetGroupId(),
}); err != nil {
log.Error().Err(err).
Msg("failed to publish share expired event")
}
continue
}

if share.IsGrantedToUser(s, user) {
if share.MatchesFiltersWithState(s, state.State, filters) {
rs := &collaboration.ReceivedShare{
Share: s,
State: state.State,
MountPoint: state.MountPoint,
if share.IsGrantedToUser(s, user) {
if share.MatchesFiltersWithState(s, state.State, filters) {
rs := &collaboration.ReceivedShare{
Share: s,
State: state.State,
MountPoint: state.MountPoint,
}
select {
case results <- rs:
case <-ctx.Done():
return ctx.Err()
}
}
}
rss = append(rss, rs)
}
}
}
return nil
})
}

// Wait for things to settle down, then close results chan
go func() {
_ = g.Wait() // error is checked later
close(results)
}()

rss := []*collaboration.ReceivedShare{}
for n := range results {
rss = append(rss, n)
}

if err := g.Wait(); err != nil {
return nil, err
}

return rss, nil
Expand Down

0 comments on commit 14c8741

Please sign in to comment.