diff --git a/changelog/unreleased/sharesstorageprovider-concurrent-stat.md b/changelog/unreleased/sharesstorageprovider-concurrent-stat.md new file mode 100644 index 0000000000..843039bc54 --- /dev/null +++ b/changelog/unreleased/sharesstorageprovider-concurrent-stat.md @@ -0,0 +1,5 @@ +Enhancement: concurrent stat requests when listing shares + +The sharesstorageprovider now concurrently stats the accepted shares when listing the share jail. The default number of 5 workers can be changed by setting the `max_concurrency` value in the config map. + +https://github.com/cs3org/reva/pull/4812 diff --git a/internal/grpc/services/sharesstorageprovider/sharesstorageprovider.go b/internal/grpc/services/sharesstorageprovider/sharesstorageprovider.go index 97de593b6d..61efbacefa 100644 --- a/internal/grpc/services/sharesstorageprovider/sharesstorageprovider.go +++ b/internal/grpc/services/sharesstorageprovider/sharesstorageprovider.go @@ -25,6 +25,7 @@ import ( "strings" "github.com/cs3org/reva/v2/pkg/storagespace" + "golang.org/x/sync/errgroup" "google.golang.org/genproto/protobuf/field_mask" "google.golang.org/grpc" codes "google.golang.org/grpc/codes" @@ -60,11 +61,13 @@ func init() { type config struct { GatewayAddr string `mapstructure:"gateway_addr"` UserShareProviderEndpoint string `mapstructure:"usershareprovidersvc"` + MaxConcurrency int `mapstructure:"max_concurrency"` } type service struct { gatewaySelector pool.Selectable[gateway.GatewayAPIClient] sharingCollaborationSelector pool.Selectable[collaboration.CollaborationAPIClient] + maxConcurrency int } func (s *service) Close() error { @@ -98,14 +101,19 @@ func NewDefault(m map[string]interface{}, _ *grpc.Server) (rgrpc.Service, error) return nil, errors.Wrap(err, "sharesstorageprovider: error getting UserShareProvider client") } - return New(gatewaySelector, sharingCollaborationSelector) + if c.MaxConcurrency <= 0 { + c.MaxConcurrency = 5 + } + + return New(gatewaySelector, sharingCollaborationSelector, c.MaxConcurrency) } // New returns a new instance of the SharesStorageProvider service -func New(gatewaySelector pool.Selectable[gateway.GatewayAPIClient], sharingCollaborationSelector pool.Selectable[collaboration.CollaborationAPIClient]) (rgrpc.Service, error) { +func New(gatewaySelector pool.Selectable[gateway.GatewayAPIClient], sharingCollaborationSelector pool.Selectable[collaboration.CollaborationAPIClient], maxConcurrency int) (rgrpc.Service, error) { s := &service{ gatewaySelector: gatewaySelector, sharingCollaborationSelector: sharingCollaborationSelector, + maxConcurrency: maxConcurrency, } return s, nil } @@ -399,7 +407,7 @@ func (s *service) ListStorageSpaces(ctx context.Context, req *provider.ListStora var shareInfo map[string]*provider.ResourceInfo var err error if fetchShares { - receivedShares, shareInfo, err = s.fetchShares(ctx, req.Opaque, []string{}, &fieldmaskpb.FieldMask{ /*TODO mtime and etag only?*/ }) + receivedShares, shareInfo, err = s.fetchAcceptedShares(ctx, req.Opaque, []string{}, &fieldmaskpb.FieldMask{ /*TODO mtime and etag only?*/ }) if err != nil { return nil, errors.Wrap(err, "sharesstorageprovider: error calling ListReceivedSharesRequest") } @@ -710,7 +718,7 @@ func (s *service) Stat(ctx context.Context, req *provider.StatRequest) (*provide if !ok { return nil, fmt.Errorf("missing user in context") } - receivedShares, shareMd, err := s.fetchShares(ctx, req.Opaque, req.ArbitraryMetadataKeys, req.FieldMask) + receivedShares, shareMd, err := s.fetchAcceptedShares(ctx, req.Opaque, req.ArbitraryMetadataKeys, req.FieldMask) if err != nil { return nil, err } @@ -806,7 +814,7 @@ func (s *service) ListContainer(ctx context.Context, req *provider.ListContainer // The root is empty, it is filled by mountpoints // so, when accessing the root via /dav/spaces, we need to list the accepted shares with their mountpoint - receivedShares, shareMd, err := s.fetchShares(ctx, req.Opaque, req.ArbitraryMetadataKeys, req.FieldMask) + receivedShares, shareMd, err := s.fetchAcceptedShares(ctx, req.Opaque, req.ArbitraryMetadataKeys, req.FieldMask) if err != nil { return nil, errors.Wrap(err, "sharesstorageprovider: error calling ListReceivedSharesRequest") } @@ -1143,14 +1151,21 @@ func (s *service) rejectReceivedShare(ctx context.Context, receivedShare *collab return errtypes.NewErrtypeFromStatus(res.Status) } -func (s *service) fetchShares(ctx context.Context, opaque *typesv1beta1.Opaque, arbitraryMetadataKeys []string, fieldMask *field_mask.FieldMask) ([]*collaboration.ReceivedShare, map[string]*provider.ResourceInfo, error) { +func (s *service) fetchAcceptedShares(ctx context.Context, opaque *typesv1beta1.Opaque, arbitraryMetadataKeys []string, fieldMask *field_mask.FieldMask) ([]*collaboration.ReceivedShare, map[string]*provider.ResourceInfo, error) { sharingCollaborationClient, err := s.sharingCollaborationSelector.Next() if err != nil { return nil, nil, err } lsRes, err := sharingCollaborationClient.ListReceivedShares(ctx, &collaboration.ListReceivedSharesRequest{ - // FIXME filter by received shares for resource id - listing all shares is tooo expensive! + Filters: []*collaboration.Filter{ + { + Type: collaboration.Filter_TYPE_STATE, + Term: &collaboration.Filter_State{ + State: collaboration.ShareState_SHARE_STATE_ACCEPTED, + }, + }, + }, }) if err != nil { return nil, nil, errors.Wrap(err, "sharesstorageprovider: error calling ListReceivedSharesRequest") @@ -1159,42 +1174,98 @@ func (s *service) fetchShares(ctx context.Context, opaque *typesv1beta1.Opaque, return nil, nil, fmt.Errorf("sharesstorageprovider: error calling ListReceivedSharesRequest") } - gatewayClient, err := s.gatewaySelector.Next() - if err != nil { - return nil, nil, err + numWorkers := s.maxConcurrency + if len(lsRes.Shares) < numWorkers { + numWorkers = len(lsRes.Shares) + } + type res struct { + shareid string + info *provider.ResourceInfo } + work := make(chan *collaboration.ReceivedShare, len(lsRes.Shares)) + results := make(chan res, len(lsRes.Shares)) - shareMetaData := make(map[string]*provider.ResourceInfo, len(lsRes.Shares)) - for _, rs := range lsRes.Shares { - // only stat accepted shares - if rs.State != collaboration.ShareState_SHARE_STATE_ACCEPTED { - continue - } - if rs.Share.ResourceId.SpaceId == "" { - // convert backwards compatible share id - rs.Share.ResourceId.StorageId, rs.Share.ResourceId.SpaceId = storagespace.SplitStorageID(rs.Share.ResourceId.StorageId) + g, ctx := errgroup.WithContext(ctx) + + // Distribute work + g.Go(func() error { + defer close(work) + for _, share := range lsRes.Shares { + select { + case work <- share: + case <-ctx.Done(): + return ctx.Err() + } } - sRes, err := gatewayClient.Stat(ctx, &provider.StatRequest{ - Opaque: opaque, - Ref: &provider.Reference{ResourceId: rs.Share.ResourceId}, - ArbitraryMetadataKeys: arbitraryMetadataKeys, - FieldMask: fieldMask, + return nil + }) + + // Spawn workers that'll concurrently work the queue + for i := 0; i < numWorkers; i++ { + g.Go(func() error { + for rs := range work { + + // only stat accepted shares + if rs.State != collaboration.ShareState_SHARE_STATE_ACCEPTED { + continue + } + if rs.Share.ResourceId.SpaceId == "" { + // convert backwards compatible share id + rs.Share.ResourceId.StorageId, rs.Share.ResourceId.SpaceId = storagespace.SplitStorageID(rs.Share.ResourceId.StorageId) + } + + gatewayClient, err := s.gatewaySelector.Next() + if err != nil { + appctx.GetLogger(ctx).Error(). + Err(err). + Interface("resourceID", rs.Share.ResourceId). + Msg("ListRecievedShares: failed to select next gateway client") + return err + } + sRes, err := gatewayClient.Stat(ctx, &provider.StatRequest{ + Opaque: opaque, + Ref: &provider.Reference{ResourceId: rs.Share.ResourceId}, + ArbitraryMetadataKeys: arbitraryMetadataKeys, + FieldMask: fieldMask, + }) + if err != nil { + appctx.GetLogger(ctx).Error(). + Err(err). + Interface("resourceID", rs.Share.ResourceId). + Msg("ListRecievedShares: failed to make stat call") + return err + } + if sRes.Status.Code != rpc.Code_CODE_OK { + appctx.GetLogger(ctx).Debug(). + Interface("resourceID", rs.Share.ResourceId). + Interface("status", sRes.Status). + Msg("ListRecievedShares: failed to stat the resource") + continue + } + select { + case results <- res{shareid: rs.Share.Id.OpaqueId, info: sRes.Info}: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil }) - if err != nil { - appctx.GetLogger(ctx).Error(). - Err(err). - Interface("resourceID", rs.Share.ResourceId). - Msg("ListRecievedShares: failed to make stat call") - continue - } - if sRes.Status.Code != rpc.Code_CODE_OK { - appctx.GetLogger(ctx).Debug(). - Interface("resourceID", rs.Share.ResourceId). - Interface("status", sRes.Status). - Msg("ListRecievedShares: failed to stat the resource") - continue - } - shareMetaData[rs.Share.Id.OpaqueId] = sRes.Info + } + + // Wait for things to settle down, then close results chan + go func() { + _ = g.Wait() // error is checked later + close(results) + }() + + // some results might have been skipped, so we cannot preallocate the map + shareMetaData := make(map[string]*provider.ResourceInfo) + for r := range results { + shareMetaData[r.shareid] = r.info + } + + if err := g.Wait(); err != nil { + return nil, nil, err } return lsRes.Shares, shareMetaData, nil diff --git a/internal/grpc/services/sharesstorageprovider/sharesstorageprovider_test.go b/internal/grpc/services/sharesstorageprovider/sharesstorageprovider_test.go index 6c303bf155..5ebb75e9a0 100644 --- a/internal/grpc/services/sharesstorageprovider/sharesstorageprovider_test.go +++ b/internal/grpc/services/sharesstorageprovider/sharesstorageprovider_test.go @@ -361,7 +361,7 @@ var _ = Describe("Sharesstorageprovider", func() { }) JustBeforeEach(func() { - p, err := provider.New(gatewaySelector, sharingCollaborationSelector) + p, err := provider.New(gatewaySelector, sharingCollaborationSelector, 5) Expect(err).ToNot(HaveOccurred()) s = p.(sprovider.ProviderAPIServer) Expect(s).ToNot(BeNil())