diff --git a/go.mod b/go.mod
index 7c0a0b58bb8..f43e43c8439 100644
--- a/go.mod
+++ b/go.mod
@@ -328,3 +328,5 @@ require (
 )
 
 replace github.com/cs3org/go-cs3apis => github.com/2403905/go-cs3apis v0.0.0-20230517122726-727045414fd1
+
+replace github.com/cs3org/reva/v2 => github.com/dragotin/reva/v2 v2.0.0-20230606110628-a2480a54cfc9
diff --git a/go.sum b/go.sum
index 1fadeecb701..6ed5a6328d9 100644
--- a/go.sum
+++ b/go.sum
@@ -629,8 +629,6 @@ github.com/crewjam/httperr v0.2.0 h1:b2BfXR8U3AlIHwNeFFvZ+BV1LFvKLlzMjzaTnZMybNo
 github.com/crewjam/httperr v0.2.0/go.mod h1:Jlz+Sg/XqBQhyMjdDiC+GNNRzZTD7x39Gu3pglZ5oH4=
 github.com/crewjam/saml v0.4.13 h1:TYHggH/hwP7eArqiXSJUvtOPNzQDyQ7vwmwEqlFWhMc=
 github.com/crewjam/saml v0.4.13/go.mod h1:igEejV+fihTIlHXYP8zOec3V5A8y3lws5bQBFsTm4gA=
-github.com/cs3org/reva/v2 v2.14.0 h1:X5da4chnEPzqUb76y/DDDawFloRAG7Gy/BMpeYh7vu8=
-github.com/cs3org/reva/v2 v2.14.0/go.mod h1:vMQqSn30fEPHO/GKC2WmGimlOPqvfSy4gdhRSpbvrWc=
 github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI=
 github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY=
 github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
@@ -663,6 +661,8 @@ github.com/dnaeon/go-vcr v1.0.1/go.mod h1:aBB1+wY4s93YsC3HHjMBMrwTj2R9FHDzUr9KyG
 github.com/dnsimple/dnsimple-go v0.63.0/go.mod h1:O5TJ0/U6r7AfT8niYNlmohpLbCSG+c71tQlGr9SeGrg=
 github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
 github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
+github.com/dragotin/reva/v2 v2.0.0-20230606110628-a2480a54cfc9 h1:dVSaoDTforscYbbOncgwnLqcXF2ukiuj3yPr4fRFZoM=
+github.com/dragotin/reva/v2 v2.0.0-20230606110628-a2480a54cfc9/go.mod h1:vMQqSn30fEPHO/GKC2WmGimlOPqvfSy4gdhRSpbvrWc=
 github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
 github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e h1:rcHHSQqzCgvlwP0I/fQ8rQMn/MpHE5gWSLdtpxtP6KQ=
diff --git a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go
index 2d301ba830c..e87b4be6a9a 100644
--- a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go
+++ b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/jsoncs3.go
@@ -33,6 +33,7 @@ import (
 	"github.com/mitchellh/mapstructure"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
+	"golang.org/x/sync/errgroup"
 	"google.golang.org/genproto/protobuf/field_mask"
 
 	gatewayv1beta1 "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
@@ -109,12 +110,16 @@ import (
   - if the mtime changed we download the file to update the local cache
 */
 
+// tracerName is the Tracer name used to identify this instrumentation library.
+const tracerName = "jsoncs3"
+
 func init() {
 	registry.Register("jsoncs3", NewDefault)
 }
 
 type config struct {
 	GatewayAddr    string `mapstructure:"gateway_addr"`
+	MaxConcurrency int    `mapstructure:"max_concurrency"`
 	ProviderAddr   string `mapstructure:"provider_addr"`
 	ServiceUserID  string `mapstructure:"service_user_id"`
 	ServiceUserIdp string `mapstructure:"service_user_idp"`
@@ -145,6 +150,8 @@ type Manager struct {
 
 	initialized bool
 
+	MaxConcurrency int
+
 	gateway     gatewayv1beta1.GatewayAPIClient
 	eventStream events.Stream
 }
@@ -205,11 +212,11 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) {
 		}
 	}
 
-	return New(s, gc, c.CacheTTL, es)
+	return New(s, gc, c.CacheTTL, es, c.MaxConcurrency)
 }
 
 // New returns a new manager instance.
-func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int, es events.Stream) (*Manager, error) {
+func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) {
 	ttl := time.Duration(ttlSeconds) * time.Second
 	return &Manager{
 		Cache: providercache.New(s, ttl),
@@ -219,6 +226,7 @@ func New(s metadata.Storage, gc gatewayv1beta1.GatewayAPIClient, ttlSeconds int,
 		storage:        s,
 		gateway:        gc,
 		eventStream:    es,
+		MaxConcurrency: maxconcurrency,
 	}, nil
 }
@@ -259,6 +267,8 @@ func (m *Manager) initialize() error {
 
 // Share creates a new share
 func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *collaboration.ShareGrant) (*collaboration.Share, error) {
+	ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Share")
+	defer span.End()
 	if err := m.initialize(); err != nil {
 		return nil, err
 	}
@@ -282,6 +292,9 @@ func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *colla
 
 	m.Lock()
 	defer m.Unlock()
+
+	m.invalidateCache(ctx, user)
+
 	_, err := m.getByKey(ctx, key)
 	if err == nil {
 		// share already exists
@@ -413,32 +426,20 @@ func (m *Manager) get(ctx context.Context, ref *collaboration.ShareReference) (s
 
 // GetShare gets the information for a share by the given ref.
 func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.Share, error) {
+	ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "GetShare")
+	defer span.End()
 	if err := m.initialize(); err != nil {
 		return nil, err
 	}
 
-	m.Lock()
-	defer m.Unlock()
+	m.RLock()
+	defer m.RUnlock()
 	s, err := m.get(ctx, ref)
 	if err != nil {
 		return nil, err
 	}
 	if share.IsExpired(s) {
-		if err := m.removeShare(ctx, s); err != nil {
-			log.Error().Err(err).
-				Msg("failed to unshare expired share")
-		}
-		if err := events.Publish(m.eventStream, events.ShareExpired{
-			ShareID:        s.GetId(),
-			ShareOwner:     s.GetOwner(),
-			ItemID:         s.GetResourceId(),
-			ExpiredAt:      time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
-			GranteeUserID:  s.GetGrantee().GetUserId(),
-			GranteeGroupID: s.GetGrantee().GetGroupId(),
-		}); err != nil {
-			log.Error().Err(err).
- Msg("failed to publish share expired event") - } + return nil, errors.Errorf("share is expired: %s", s.GetId()) } // check if we are the creator or the grantee // TODO allow manager to get shares in a space created by other users @@ -463,6 +464,9 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc // Unshare deletes a share func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Unshare") + defer span.End() + if err := m.initialize(); err != nil { return err } @@ -471,6 +475,8 @@ func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference defer m.Unlock() user := ctxpkg.ContextMustGetUser(ctx) + m.invalidateCache(ctx, user) + s, err := m.get(ctx, ref) if err != nil { return err @@ -486,6 +492,9 @@ func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference // UpdateShare updates the mode of the given share. func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareReference, p *collaboration.SharePermissions, updated *collaboration.Share, fieldMask *field_mask.FieldMask) (*collaboration.Share, error) { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "UpdateShare") + defer span.End() + if err := m.initialize(); err != nil { return nil, err } @@ -493,6 +502,10 @@ func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareRefer m.Lock() defer m.Unlock() + user := ctxpkg.ContextMustGetUser(ctx) + + m.invalidateCache(ctx, user) + var toUpdate *collaboration.Share if ref != nil { @@ -522,7 +535,6 @@ func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareRefer } } - user := ctxpkg.ContextMustGetUser(ctx) if !share.IsCreatedByUser(toUpdate, user) { req := &provider.StatRequest{ Ref: &provider.Reference{ResourceId: toUpdate.ResourceId}, @@ -565,12 +577,15 @@ func (m *Manager) UpdateShare(ctx context.Context, ref *collaboration.ShareRefer // ListShares returns the shares created by the user func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filter) ([]*collaboration.Share, error) { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListShares") + defer span.End() + if err := m.initialize(); err != nil { return nil, err } - m.Lock() - defer m.Unlock() + m.RLock() + defer m.RUnlock() user := ctxpkg.ContextMustGetUser(ctx) @@ -582,6 +597,9 @@ func (m *Manager) ListShares(ctx context.Context, filters []*collaboration.Filte } func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, filters []*collaboration.Filter) ([]*collaboration.Share, error) { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "listSharesByIDs") + defer span.End() + providerSpaces := make(map[string]map[string]struct{}) for _, f := range share.FilterFiltersByType(filters, collaboration.Filter_TYPE_RESOURCE_ID) { storageID := f.GetResourceId().GetStorageId() @@ -605,20 +623,6 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f for _, s := range shares.Shares { if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { - log.Error().Err(err). 
- Msg("failed to unshare expired share") - } - if err := events.Publish(m.eventStream, events.ShareExpired{ - ShareOwner: s.GetOwner(), - ItemID: s.GetResourceId(), - ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())), - GranteeUserID: s.GetGrantee().GetUserId(), - GranteeGroupID: s.GetGrantee().GetGroupId(), - }); err != nil { - log.Error().Err(err). - Msg("failed to publish share expired event") - } continue } if !share.MatchesFilters(s, filters) { @@ -649,6 +653,9 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f } func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User, filters []*collaboration.Filter) ([]*collaboration.Share, error) { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "listCreatedShares") + defer span.End() + var ss []*collaboration.Share if err := m.CreatedCache.Sync(ctx, user.Id.OpaqueId); err != nil { @@ -667,20 +674,6 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User, continue } if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { - log.Error().Err(err). - Msg("failed to unshare expired share") - } - if err := events.Publish(m.eventStream, events.ShareExpired{ - ShareOwner: s.GetOwner(), - ItemID: s.GetResourceId(), - ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())), - GranteeUserID: s.GetGrantee().GetUserId(), - GranteeGroupID: s.GetGrantee().GetGroupId(), - }); err != nil { - log.Error().Err(err). - Msg("failed to publish share expired event") - } continue } if utils.UserEqual(user.GetId(), s.GetCreator()) { @@ -694,16 +687,75 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User, return ss, nil } +// call in locked context (write lock) +func (m *Manager) invalidateCache(ctx context.Context, user *userv1beta1.User) { + ssids := map[string]*receivedsharecache.Space{} + + for _, group := range user.Groups { + if err := m.GroupReceivedCache.Sync(ctx, group); err != nil { + continue // ignore error, cache will be updated on next read + } + + for ssid, spaceShareIDs := range m.GroupReceivedCache.List(group) { + // add a pending entry, the state will be updated + // when reading the received shares below if they have already been accepted or denied + var rs *receivedsharecache.Space + var ok bool + if rs, ok = ssids[ssid]; !ok { + rs = &receivedsharecache.Space{ + Mtime: spaceShareIDs.Mtime, + States: make(map[string]*receivedsharecache.State, len(spaceShareIDs.IDs)), + } + ssids[ssid] = rs + } + + for shareid := range spaceShareIDs.IDs { + rs.States[shareid] = &receivedsharecache.State{ + State: collaboration.ShareState_SHARE_STATE_PENDING, + } + } + } + } + + for _, s := range ssids { + for sid := range s.States { + sh, err := m.getByID(ctx, &collaboration.ShareId{ + OpaqueId: sid, + }) + + if err != nil && share.IsExpired(sh) { + if err := m.removeShare(ctx, sh); err != nil { + log.Error().Err(err). + Msg("failed to unshare expired share") + } + if err := events.Publish(m.eventStream, events.ShareExpired{ + ShareID: sh.GetId(), + ShareOwner: sh.GetOwner(), + ItemID: sh.GetResourceId(), + ExpiredAt: time.Unix(int64(sh.GetExpiration().GetSeconds()), int64(sh.GetExpiration().GetNanos())), + GranteeUserID: sh.GetGrantee().GetUserId(), + GranteeGroupID: sh.GetGrantee().GetGroupId(), + }); err != nil { + log.Error().Err(err). 
+ Msg("failed to publish share expired event") + } + } + } + } +} + // ListReceivedShares returns the list of shares the user has access to. func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaboration.Filter) ([]*collaboration.ReceivedShare, error) { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListReceivedShares") + defer span.End() + if err := m.initialize(); err != nil { return nil, err } - m.Lock() - defer m.Unlock() + m.RLock() + defer m.RUnlock() - var rss []*collaboration.ReceivedShare user := ctxpkg.ContextMustGetUser(ctx) ssids := map[string]*receivedsharecache.Space{} @@ -750,46 +802,84 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati } } - for ssid, rspace := range ssids { - storageID, spaceID, _ := shareid.Decode(ssid) - err := m.Cache.Sync(ctx, storageID, spaceID) - if err != nil { - continue - } - for shareID, state := range rspace.States { - s := m.Cache.Get(storageID, spaceID, shareID) - if s == nil { - continue + numWorkers := m.MaxConcurrency + if numWorkers == 0 || len(ssids) < numWorkers { + numWorkers = len(ssids) + } + + type w struct { + ssid string + rspace *receivedsharecache.Space + } + work := make(chan w) + results := make(chan *collaboration.ReceivedShare) + + g, ctx := errgroup.WithContext(ctx) + + // Distribute work + g.Go(func() error { + defer close(work) + for ssid, rspace := range ssids { + select { + case work <- w{ssid, rspace}: + case <-ctx.Done(): + return ctx.Err() } - if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { - log.Error().Err(err). - Msg("failed to unshare expired share") - } - if err := events.Publish(m.eventStream, events.ShareExpired{ - ShareOwner: s.GetOwner(), - ItemID: s.GetResourceId(), - ExpiredAt: time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())), - GranteeUserID: s.GetGrantee().GetUserId(), - GranteeGroupID: s.GetGrantee().GetGroupId(), - }); err != nil { - log.Error().Err(err). - Msg("failed to publish share expired event") + } + return nil + }) + + // Spawn workers that'll concurrently work the queue + for i := 0; i < numWorkers; i++ { + g.Go(func() error { + for w := range work { + storageID, spaceID, _ := shareid.Decode(w.ssid) + err := m.Cache.Sync(ctx, storageID, spaceID) + if err != nil { + continue } - continue - } + for shareID, state := range w.rspace.States { + s := m.Cache.Get(storageID, spaceID, shareID) + if s == nil { + continue + } + if share.IsExpired(s) { + continue + } - if share.IsGrantedToUser(s, user) { - if share.MatchesFiltersWithState(s, state.State, filters) { - rs := &collaboration.ReceivedShare{ - Share: s, - State: state.State, - MountPoint: state.MountPoint, + if share.IsGrantedToUser(s, user) { + if share.MatchesFiltersWithState(s, state.State, filters) { + rs := &collaboration.ReceivedShare{ + Share: s, + State: state.State, + MountPoint: state.MountPoint, + } + select { + case results <- rs: + case <-ctx.Done(): + return ctx.Err() + } + } } - rss = append(rss, rs) } } - } + return nil + }) + } + + // Wait for things to settle down, then close results chan + go func() { + _ = g.Wait() // error is checked later + close(results) + }() + + rss := []*collaboration.ReceivedShare{} + for n := range results { + rss = append(rss, n) + } + + if err := g.Wait(); err != nil { + return nil, err } return rss, nil @@ -797,6 +887,9 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati // convert must be called in a lock-controlled block. 
 func (m *Manager) convert(ctx context.Context, userID string, s *collaboration.Share) *collaboration.ReceivedShare {
+	ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "convert")
+	defer span.End()
+
 	rs := &collaboration.ReceivedShare{
 		Share: s,
 		State: collaboration.ShareState_SHARE_STATE_PENDING,
@@ -823,8 +916,11 @@ func (m *Manager) GetReceivedShare(ctx context.Context, ref *collaboration.Share
 }
 
 func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareReference) (*collaboration.ReceivedShare, error) {
-	m.Lock()
-	defer m.Unlock()
+	ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "getReceived")
+	defer span.End()
+
+	m.RLock()
+	defer m.RUnlock()
 	s, err := m.get(ctx, ref)
 	if err != nil {
 		return nil, err
@@ -833,27 +929,19 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer
 	if !share.IsGrantedToUser(s, user) {
 		return nil, errtypes.NotFound(ref.String())
 	}
+
 	if share.IsExpired(s) {
-		if err := m.removeShare(ctx, s); err != nil {
-			log.Error().Err(err).
-				Msg("failed to unshare expired share")
-		}
-		if err := events.Publish(m.eventStream, events.ShareExpired{
-			ShareOwner:     s.GetOwner(),
-			ItemID:         s.GetResourceId(),
-			ExpiredAt:      time.Unix(int64(s.GetExpiration().GetSeconds()), int64(s.GetExpiration().GetNanos())),
-			GranteeUserID:  s.GetGrantee().GetUserId(),
-			GranteeGroupID: s.GetGrantee().GetGroupId(),
-		}); err != nil {
-			log.Error().Err(err).
-				Msg("failed to publish share expired event")
-		}
+		return nil, errors.Errorf("share is expired: %s", s.GetId())
 	}
+
 	return m.convert(ctx, user.Id.GetOpaqueId(), s), nil
 }
 
 // UpdateReceivedShare updates the received share with share state.
 func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collaboration.ReceivedShare, fieldMask *field_mask.FieldMask) (*collaboration.ReceivedShare, error) {
+	ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "UpdateReceivedShare")
+	defer span.End()
+
 	if err := m.initialize(); err != nil {
 		return nil, err
 	}
@@ -865,6 +953,9 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab
 
 	m.Lock()
 	defer m.Unlock()
+	userID := ctxpkg.ContextMustGetUser(ctx)
+
+	m.invalidateCache(ctx, userID)
 
 	for i := range fieldMask.Paths {
 		switch fieldMask.Paths[i] {
@@ -878,9 +969,6 @@ func (m *Manager) UpdateReceivedShare(ctx context.Context, receivedShare *collab
 	}
 
 	// write back
-
-	userID := ctxpkg.ContextMustGetUser(ctx)
-
 	err = m.UserReceivedStates.Add(ctx, userID.GetId().GetOpaqueId(), rs.Share.ResourceId.StorageId+shareid.IDDelimiter+rs.Share.ResourceId.SpaceId, rs)
 	if _, ok := err.(errtypes.IsPreconditionFailed); ok {
 		// when persisting fails, download, readd and persist again
@@ -964,6 +1052,9 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar
 }
 
 func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share) error {
+	ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "removeShare")
+	defer span.End()
+
 	storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId)
 	err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId)
 	if _, ok := err.(errtypes.IsPreconditionFailed); ok {
diff --git a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache/providercache.go b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache/providercache.go
index 528922ac491..2fc4ed18262 100644
--- a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache/providercache.go
+++ b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache/providercache.go
@@ -33,8 +33,13 @@ import (
 	"github.com/cs3org/reva/v2/pkg/errtypes"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
 	"github.com/cs3org/reva/v2/pkg/utils"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
 )
 
+// tracerName is the Tracer name used to identify this instrumentation library.
+const tracerName = "providercache"
+
 // Cache holds share information structured by provider and space
 type Cache struct {
 	Providers map[string]*Spaces
@@ -106,6 +111,10 @@ func New(s metadata.Storage, ttl time.Duration) Cache {
 
 // Add adds a share to the cache
 func (c *Cache) Add(ctx context.Context, storageID, spaceID, shareID string, share *collaboration.Share) error {
+	ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add")
+	defer span.End()
+	span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID), attribute.String("cs3.shareid", shareID))
+
 	switch {
 	case storageID == "":
 		return fmt.Errorf("missing storage id")
@@ -122,6 +131,10 @@ func (c *Cache) Add(ctx context.Context, storageID, spaceID, shareID string, sha
 
 // Remove removes a share from the cache
 func (c *Cache) Remove(ctx context.Context, storageID, spaceID, shareID string) error {
+	ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Remove")
+	defer span.End()
+	span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID), attribute.String("cs3.shareid", shareID))
+
 	if c.Providers[storageID] == nil ||
 		c.Providers[storageID].Spaces[spaceID] == nil {
 		return nil
@@ -150,6 +163,10 @@ func (c *Cache) ListSpace(storageID, spaceID string) *Shares {
 
 // PersistWithTime persists the data of one space if it has not been modified since the given mtime
 func (c *Cache) PersistWithTime(ctx context.Context, storageID, spaceID string, mtime time.Time) error {
+	ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "PersistWithTime")
+	defer span.End()
+	span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID))
+
 	if c.Providers[storageID] == nil || c.Providers[storageID].Spaces[spaceID] == nil {
 		return nil
 	}
@@ -187,15 +204,20 @@ func (c *Cache) Persist(ctx context.Context, storageID, spaceID string) error {
 
 // Sync updates the in-memory data with the data from the storage if it is outdated
 func (c *Cache) Sync(ctx context.Context, storageID, spaceID string) error {
+	ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Sync")
+	defer span.End()
+
+	span.SetAttributes(attribute.String("cs3.storageid", storageID), attribute.String("cs3.spaceid", spaceID))
+
 	log := appctx.GetLogger(ctx).With().Str("storageID", storageID).Str("spaceID", spaceID).Logger()
-	log.Debug().Msg("Syncing provider cache...")
 
 	var mtime time.Time
 	if c.Providers[storageID] != nil && c.Providers[storageID].Spaces[spaceID] != nil {
 		mtime = c.Providers[storageID].Spaces[spaceID].Mtime
 
 		if time.Now().Before(c.Providers[storageID].Spaces[spaceID].nextSync) {
-			log.Debug().Msg("Skipping provider cache sync, it was just recently synced...")
+			span.AddEvent("skip sync")
+			span.SetStatus(codes.Ok, "")
 			return nil
 		}
 		c.Providers[storageID].Spaces[spaceID].nextSync = time.Now().Add(c.ttl)
@@ -207,28 +229,33 @@ func (c *Cache) Sync(ctx context.Context, storageID, spaceID string) error {
 	info, err := c.storage.Stat(ctx, jsonPath)
 	if err != nil {
 		if _, ok := err.(errtypes.NotFound); ok {
-			log.Debug().Msg("no json file, nothing to sync")
+			span.AddEvent("no file")
+			span.SetStatus(codes.Ok, "")
 			return nil // Nothing to sync against
 		}
 		if _, ok := err.(*os.PathError); ok {
-			log.Debug().Msg("no storage dir, nothing to sync")
+			span.AddEvent("no dir")
+			span.SetStatus(codes.Ok, "")
 			return nil // Nothing to sync against
 		}
+		span.SetStatus(codes.Error, fmt.Sprintf("Failed to stat the provider cache: %s", err.Error()))
 		log.Error().Err(err).Msg("Failed to stat the provider cache")
 		return err
 	}
 	// check mtime of /users/{userid}/created.json
 	if utils.TSToTime(info.Mtime).After(mtime) {
-		log.Debug().Msg("Updating provider cache...")
+		span.AddEvent("updating cache")
 		// - update cached list of created shares for the user in memory if changed
 		createdBlob, err := c.storage.SimpleDownload(ctx, jsonPath)
 		if err != nil {
+			span.SetStatus(codes.Error, fmt.Sprintf("Failed to download the provider cache: %s", err.Error()))
 			log.Error().Err(err).Msg("Failed to download the provider cache")
 			return err
 		}
 		newShares := &Shares{}
 		err = json.Unmarshal(createdBlob, newShares)
 		if err != nil {
+			span.SetStatus(codes.Error, fmt.Sprintf("Failed to unmarshal the provider cache: %s", err.Error()))
 			log.Error().Err(err).Msg("Failed to unmarshal the provider cache")
 			return err
 		}
@@ -236,7 +263,7 @@ func (c *Cache) Sync(ctx context.Context, storageID, spaceID string) error {
 		c.initializeIfNeeded(storageID, spaceID)
 		c.Providers[storageID].Spaces[spaceID] = newShares
 	}
-	log.Debug().Msg("Provider cache is up to date")
+	span.SetStatus(codes.Ok, "")
 	return nil
 }
diff --git a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go
index 0693b802ce2..c58e761d86f 100644
--- a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go
+++ b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go
@@ -21,6 +21,7 @@ package receivedsharecache
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"path"
 	"path/filepath"
 	"time"
@@ -31,8 +32,13 @@ import (
 	"github.com/cs3org/reva/v2/pkg/errtypes"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
 	"github.com/cs3org/reva/v2/pkg/utils"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
 )
 
+// tracerName is the Tracer name used to identify this instrumentation library.
+const tracerName = "receivedsharecache" + // Cache stores the list of received shares and their states // It functions as an in-memory cache with a persistence layer // The storage is sharded by user @@ -74,6 +80,10 @@ func New(s metadata.Storage, ttl time.Duration) Cache { // Add adds a new entry to the cache func (c *Cache) Add(ctx context.Context, userID, spaceID string, rs *collaboration.ReceivedShare) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add") + defer span.End() + span.SetAttributes(attribute.String("cs3.userid", userID), attribute.String("cs3.spaceid", spaceID)) + if c.ReceivedSpaces[userID] == nil { c.ReceivedSpaces[userID] = &Spaces{ Spaces: map[string]*Space{}, @@ -106,13 +116,17 @@ func (c *Cache) Get(userID, spaceID, shareID string) *State { // Sync updates the in-memory data with the data from the storage if it is outdated func (c *Cache) Sync(ctx context.Context, userID string) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Sync") + defer span.End() + span.SetAttributes(attribute.String("cs3.userid", userID)) + log := appctx.GetLogger(ctx).With().Str("userID", userID).Logger() - log.Debug().Msg("Syncing received share cache...") var mtime time.Time if c.ReceivedSpaces[userID] != nil { if time.Now().Before(c.ReceivedSpaces[userID].nextSync) { - log.Debug().Msg("Skipping received share cache sync, it was just recently synced...") + span.AddEvent("skip sync") + span.SetStatus(codes.Ok, "") return nil } c.ReceivedSpaces[userID].nextSync = time.Now().Add(c.ttl) @@ -123,38 +137,47 @@ func (c *Cache) Sync(ctx context.Context, userID string) error { } jsonPath := userJSONPath(userID) - info, err := c.storage.Stat(ctx, jsonPath) + info, err := c.storage.Stat(ctx, jsonPath) // TODO we only need the mtime ... 
 	if err != nil {
 		if _, ok := err.(errtypes.NotFound); ok {
+			span.AddEvent("no file")
+			span.SetStatus(codes.Ok, "")
 			return nil // Nothing to sync against
 		}
+		span.SetStatus(codes.Error, fmt.Sprintf("Failed to stat the received share: %s", err.Error()))
 		log.Error().Err(err).Msg("Failed to stat the received share")
 		return err
 	}
 	// check mtime of /users/{userid}/created.json
 	if utils.TSToTime(info.Mtime).After(mtime) {
-		log.Debug().Msg("Updating received share cache...")
+		span.AddEvent("updating cache")
 		// - update cached list of created shares for the user in memory if changed
 		createdBlob, err := c.storage.SimpleDownload(ctx, jsonPath)
 		if err != nil {
+			span.SetStatus(codes.Error, fmt.Sprintf("Failed to download the received share: %s", err.Error()))
 			log.Error().Err(err).Msg("Failed to download the received share")
 			return err
 		}
 		newSpaces := &Spaces{}
 		err = json.Unmarshal(createdBlob, newSpaces)
 		if err != nil {
+			span.SetStatus(codes.Error, fmt.Sprintf("Failed to unmarshal the received share: %s", err.Error()))
 			log.Error().Err(err).Msg("Failed to unmarshal the received share")
 			return err
 		}
 		newSpaces.Mtime = utils.TSToTime(info.Mtime)
 		c.ReceivedSpaces[userID] = newSpaces
 	}
-	log.Debug().Msg("Received share cache is up to date")
+	span.SetStatus(codes.Ok, "")
 	return nil
 }
 
 // Persist persists the data for one user to the storage
 func (c *Cache) Persist(ctx context.Context, userID string) error {
+	ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Persist")
+	defer span.End()
+	span.SetAttributes(attribute.String("cs3.userid", userID))
+
 	if c.ReceivedSpaces[userID] == nil {
 		return nil
 	}
diff --git a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/sharecache/sharecache.go b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/sharecache/sharecache.go
index 7fdd7e392de..78ed5aacca0 100644
--- a/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/sharecache/sharecache.go
+++ b/vendor/github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/sharecache/sharecache.go
@@ -21,6 +21,7 @@ package sharecache
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"path"
 	"path/filepath"
 	"time"
@@ -30,8 +31,13 @@ import (
 	"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/shareid"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/metadata"
 	"github.com/cs3org/reva/v2/pkg/utils"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
 )
 
+// tracerName is the Tracer name used to identify this instrumentation library.
+const tracerName = "sharecache" + // Cache caches the list of share ids for users/groups // It functions as an in-memory cache with a persistence layer // The storage is sharded by user/group @@ -71,6 +77,10 @@ func New(s metadata.Storage, namespace, filename string, ttl time.Duration) Cach // Add adds a share to the cache func (c *Cache) Add(ctx context.Context, userid, shareID string) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add") + defer span.End() + span.SetAttributes(attribute.String("cs3.userid", userid), attribute.String("cs3.shareid", shareID)) + storageid, spaceid, _ := shareid.Decode(shareID) ssid := storageid + shareid.IDDelimiter + spaceid @@ -94,6 +104,10 @@ func (c *Cache) Add(ctx context.Context, userid, shareID string) error { // Remove removes a share for the given user func (c *Cache) Remove(ctx context.Context, userid, shareID string) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Remove") + defer span.End() + span.SetAttributes(attribute.String("cs3.userid", userid), attribute.String("cs3.shareid", shareID)) + storageid, spaceid, _ := shareid.Decode(shareID) ssid := storageid + shareid.IDDelimiter + spaceid @@ -133,14 +147,18 @@ func (c *Cache) List(userid string) map[string]SpaceShareIDs { // Sync updates the in-memory data with the data from the storage if it is outdated func (c *Cache) Sync(ctx context.Context, userID string) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Sync") + defer span.End() + span.SetAttributes(attribute.String("cs3.userid", userID)) + log := appctx.GetLogger(ctx).With().Str("userID", userID).Logger() - log.Debug().Msg("Syncing share cache...") var mtime time.Time // - do we have a cached list of created shares for the user in memory? 
 	if usc := c.UserShares[userID]; usc != nil {
 		if time.Now().Before(c.UserShares[userID].nextSync) {
-			log.Debug().Msg("Skipping share cache sync, it was just recently synced...")
+			span.AddEvent("skip sync")
+			span.SetStatus(codes.Ok, "")
 			return nil
 		}
 		c.UserShares[userID].nextSync = time.Now().Add(c.ttl)
@@ -155,35 +173,44 @@ func (c *Cache) Sync(ctx context.Context, userID string) error {
 	info, err := c.storage.Stat(ctx, userCreatedPath)
 	if err != nil {
 		if _, ok := err.(errtypes.NotFound); ok {
+			span.AddEvent("no file")
+			span.SetStatus(codes.Ok, "")
 			return nil // Nothing to sync against
 		}
+		span.SetStatus(codes.Error, fmt.Sprintf("Failed to stat the share cache: %s", err.Error()))
 		log.Error().Err(err).Msg("Failed to stat the share cache")
 		return err
 	}
 	// check mtime of /users/{userid}/created.json
 	if utils.TSToTime(info.Mtime).After(mtime) {
-		log.Debug().Msg("Updating share cache...")
+		span.AddEvent("updating cache")
 		// - update cached list of created shares for the user in memory if changed
 		createdBlob, err := c.storage.SimpleDownload(ctx, userCreatedPath)
 		if err != nil {
+			span.SetStatus(codes.Error, fmt.Sprintf("Failed to download the share cache: %s", err.Error()))
 			log.Error().Err(err).Msg("Failed to download the share cache")
 			return err
 		}
 		newShareCache := &UserShareCache{}
 		err = json.Unmarshal(createdBlob, newShareCache)
 		if err != nil {
+			span.SetStatus(codes.Error, fmt.Sprintf("Failed to unmarshal the share cache: %s", err.Error()))
 			log.Error().Err(err).Msg("Failed to unmarshal the share cache")
 			return err
 		}
 		newShareCache.Mtime = utils.TSToTime(info.Mtime)
 		c.UserShares[userID] = newShareCache
 	}
-	log.Debug().Msg("Share cache is up to date")
+	span.SetStatus(codes.Ok, "")
 	return nil
 }
 
 // Persist persists the data for one user/group to the storage
 func (c *Cache) Persist(ctx context.Context, userid string) error {
+	ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Persist")
+	defer span.End()
+	span.SetAttributes(attribute.String("cs3.userid", userid))
+
 	oldMtime := c.UserShares[userid].Mtime
 	c.UserShares[userid].Mtime = time.Now()
 
diff --git a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/metadata/cs3.go b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/metadata/cs3.go
index 76cff96d122..3cbd2f1dc30 100644
--- a/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/metadata/cs3.go
+++ b/vendor/github.com/cs3org/reva/v2/pkg/storage/utils/metadata/cs3.go
@@ -32,6 +32,7 @@ import (
 	rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
 	provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
 	types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
+	"github.com/cs3org/reva/v2/pkg/appctx"
 	ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
 	"github.com/cs3org/reva/v2/pkg/errtypes"
 	"github.com/cs3org/reva/v2/pkg/rgrpc/status"
@@ -40,6 +41,9 @@ import (
 	"google.golang.org/grpc/metadata"
 )
 
+// tracerName is the Tracer name used to identify this instrumentation library.
+const tracerName = "metadata.cs3" + // CS3 represents a metadata storage with a cs3 storage backend type CS3 struct { providerAddr string @@ -114,6 +118,9 @@ func (cs3 *CS3) Init(ctx context.Context, spaceid string) (err error) { // SimpleUpload uploads a file to the metadata storage func (cs3 *CS3) SimpleUpload(ctx context.Context, uploadpath string, content []byte) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "SimpleUpload") + defer span.End() + return cs3.Upload(ctx, UploadRequest{ Path: uploadpath, Content: content, @@ -122,6 +129,9 @@ func (cs3 *CS3) SimpleUpload(ctx context.Context, uploadpath string, content []b // Upload uploads a file to the metadata storage func (cs3 *CS3) Upload(ctx context.Context, req UploadRequest) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Upload") + defer span.End() + client, err := cs3.providerClient() if err != nil { return err @@ -185,6 +195,9 @@ func (cs3 *CS3) Upload(ctx context.Context, req UploadRequest) error { // Stat returns the metadata for the given path func (cs3 *CS3) Stat(ctx context.Context, path string) (*provider.ResourceInfo, error) { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Stat") + defer span.End() + client, err := cs3.providerClient() if err != nil { return nil, err @@ -214,6 +227,9 @@ func (cs3 *CS3) Stat(ctx context.Context, path string) (*provider.ResourceInfo, // SimpleDownload reads a file from the metadata storage func (cs3 *CS3) SimpleDownload(ctx context.Context, downloadpath string) (content []byte, err error) { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "SimpleDownload") + defer span.End() + client, err := cs3.providerClient() if err != nil { return nil, err @@ -277,6 +293,9 @@ func (cs3 *CS3) SimpleDownload(ctx context.Context, downloadpath string) (conten // Delete deletes a path func (cs3 *CS3) Delete(ctx context.Context, path string) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Delete") + defer span.End() + client, err := cs3.providerClient() if err != nil { return err @@ -304,6 +323,9 @@ func (cs3 *CS3) Delete(ctx context.Context, path string) error { // ReadDir returns the entries in a given directory func (cs3 *CS3) ReadDir(ctx context.Context, path string) ([]string, error) { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ReadDir") + defer span.End() + infos, err := cs3.ListDir(ctx, path) if err != nil { return nil, err @@ -318,6 +340,9 @@ func (cs3 *CS3) ReadDir(ctx context.Context, path string) ([]string, error) { // ListDir returns a list of ResourceInfos for the entries in a given directory func (cs3 *CS3) ListDir(ctx context.Context, path string) ([]*provider.ResourceInfo, error) { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ListDir") + defer span.End() + client, err := cs3.providerClient() if err != nil { return nil, err @@ -347,6 +372,9 @@ func (cs3 *CS3) ListDir(ctx context.Context, path string) ([]*provider.ResourceI // MakeDirIfNotExist will create a root node in the metadata storage. Requires an authenticated context. 
 func (cs3 *CS3) MakeDirIfNotExist(ctx context.Context, folder string) error {
+	ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "MakeDirIfNotExist")
+	defer span.End()
+
 	client, err := cs3.providerClient()
 	if err != nil {
 		return err
@@ -395,6 +423,9 @@ func (cs3 *CS3) MakeDirIfNotExist(ctx context.Context, folder string) error {
 
 // CreateSymlink creates a symlink
 func (cs3 *CS3) CreateSymlink(ctx context.Context, oldname, newname string) error {
+	ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "CreateSymlink")
+	defer span.End()
+
 	if _, err := cs3.ResolveSymlink(ctx, newname); err == nil {
 		return os.ErrExist
 	}
@@ -404,6 +435,9 @@ func (cs3 *CS3) CreateSymlink(ctx context.Context, oldname, newname string) erro
 
 // ResolveSymlink resolves a symlink
 func (cs3 *CS3) ResolveSymlink(ctx context.Context, name string) (string, error) {
+	ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "ResolveSymlink")
+	defer span.End()
+
 	b, err := cs3.SimpleDownload(ctx, name)
 	if err != nil {
 		if errors.Is(err, errtypes.NotFound("")) {
@@ -420,12 +454,15 @@ func (cs3 *CS3) providerClient() (provider.ProviderAPIClient, error) {
 }
 
 func (cs3 *CS3) getAuthContext(ctx context.Context) (context.Context, error) {
+	ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "getAuthContext")
+	defer span.End()
+
 	client, err := pool.GetGatewayServiceClient(cs3.gatewayAddr)
 	if err != nil {
 		return nil, err
 	}
 
-	authCtx := ctxpkg.ContextSetUser(context.Background(), cs3.serviceUser)
+	authCtx := ctxpkg.ContextSetUser(ctx, cs3.serviceUser)
 	authRes, err := client.Authenticate(authCtx, &gateway.AuthenticateRequest{
 		Type:     "machine",
 		ClientId: "userid:" + cs3.serviceUser.Id.OpaqueId,
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 2d2876e93e8..f86ac71cc1f 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -352,7 +352,7 @@ github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1
 github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1
 github.com/cs3org/go-cs3apis/cs3/tx/v1beta1
 github.com/cs3org/go-cs3apis/cs3/types/v1beta1
-# github.com/cs3org/reva/v2 v2.14.0
+# github.com/cs3org/reva/v2 v2.14.0 => github.com/dragotin/reva/v2 v2.0.0-20230606110628-a2480a54cfc9
 ## explicit; go 1.19
 github.com/cs3org/reva/v2/cmd/revad/internal/grace
 github.com/cs3org/reva/v2/cmd/revad/runtime
@@ -2164,3 +2164,4 @@ stash.kopano.io/kgol/oidc-go
 ## explicit; go 1.13
 stash.kopano.io/kgol/rndm
 # github.com/cs3org/go-cs3apis => github.com/2403905/go-cs3apis v0.0.0-20230517122726-727045414fd1
+# github.com/cs3org/reva/v2 => github.com/dragotin/reva/v2 v2.0.0-20230606110628-a2480a54cfc9
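
Note: the core change in this patch is the bounded fan-out/fan-in in ListReceivedShares: one producer feeds a work channel, MaxConcurrency workers sync and filter spaces, and a collector drains a results channel. Below is a minimal, self-contained sketch of that worker-pool pattern using golang.org/x/sync/errgroup; listConcurrently and syncSpace are illustrative placeholders, not functions from the patch.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// syncSpace stands in for the per-space cache sync and share filtering
// that each work item performs in the patched ListReceivedShares.
func syncSpace(ctx context.Context, ssid string) (string, error) {
	return "share-for-" + ssid, nil
}

func listConcurrently(ctx context.Context, ssids []string, maxConcurrency int) ([]string, error) {
	// Cap the worker count at the number of work items, as the patch does.
	numWorkers := maxConcurrency
	if numWorkers == 0 || len(ssids) < numWorkers {
		numWorkers = len(ssids)
	}

	work := make(chan string)
	results := make(chan string)

	g, ctx := errgroup.WithContext(ctx)

	// Producer: feed the work channel, bailing out if the group context is cancelled.
	g.Go(func() error {
		defer close(work)
		for _, ssid := range ssids {
			select {
			case work <- ssid:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// Workers: consume until the work channel is closed.
	for i := 0; i < numWorkers; i++ {
		g.Go(func() error {
			for ssid := range work {
				r, err := syncSpace(ctx, ssid)
				if err != nil {
					continue // like the patch: skip a failed space instead of failing the listing
				}
				select {
				case results <- r:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}

	// Close results once producer and workers are done; the error is re-checked below.
	go func() {
		_ = g.Wait()
		close(results)
	}()

	out := []string{}
	for r := range results {
		out = append(out, r)
	}
	return out, g.Wait()
}

func main() {
	shares, err := listConcurrently(context.Background(), []string{"s1", "s2", "s3"}, 2)
	fmt.Println(shares, err)
}

As in the patch, the results channel is closed from a separate goroutine once the first g.Wait() returns, and Wait() is called again afterwards to surface the first worker error; errgroup tolerates multiple Wait calls, so this double use is safe and avoids deadlocking the unbuffered results channel.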