Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

concurrent stat requests when listing shares #4812

Merged
merged 1 commit into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: concurrent stat requests when listing shares

The sharesstorageprovider now concurrently stats the accepted shares when listing the share jail. The default number of 5 workers can be changed by setting the `max_concurrency` value in the config map.

https://github.com/cs3org/reva/pull/4812
149 changes: 110 additions & 39 deletions internal/grpc/services/sharesstorageprovider/sharesstorageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strings"

"github.com/cs3org/reva/v2/pkg/storagespace"
"golang.org/x/sync/errgroup"
"google.golang.org/genproto/protobuf/field_mask"
"google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
Expand Down Expand Up @@ -60,11 +61,13 @@ func init() {
// config is the service configuration decoded from the config map
// (mapstructure tags give the config-map key names).
type config struct {
// GatewayAddr is the address of the gateway service — presumably used
// to select gateway clients for stat calls; confirm against the full file.
GatewayAddr string `mapstructure:"gateway_addr"`
// UserShareProviderEndpoint is the endpoint of the UserShareProvider
// service used to list received shares.
UserShareProviderEndpoint string `mapstructure:"usershareprovidersvc"`
// MaxConcurrency is the number of concurrent stat workers used when
// listing shares; values <= 0 are replaced with a default of 5 in NewDefault.
MaxConcurrency int `mapstructure:"max_concurrency"`
}

// service implements the SharesStorageProvider; it holds client selectors
// for the gateway and the collaboration (sharing) services plus the
// concurrency limit for stat requests.
type service struct {
// gatewaySelector yields gateway clients used to stat shared resources.
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
// sharingCollaborationSelector yields clients used to list received shares.
sharingCollaborationSelector pool.Selectable[collaboration.CollaborationAPIClient]
// maxConcurrency bounds the number of worker goroutines that stat
// accepted shares concurrently.
maxConcurrency int
}

func (s *service) Close() error {
Expand Down Expand Up @@ -98,14 +101,19 @@ func NewDefault(m map[string]interface{}, _ *grpc.Server) (rgrpc.Service, error)
return nil, errors.Wrap(err, "sharesstorageprovider: error getting UserShareProvider client")
}

return New(gatewaySelector, sharingCollaborationSelector)
if c.MaxConcurrency <= 0 {
c.MaxConcurrency = 5
}

return New(gatewaySelector, sharingCollaborationSelector, c.MaxConcurrency)
}

// New returns a new instance of the SharesStorageProvider service
func New(gatewaySelector pool.Selectable[gateway.GatewayAPIClient], sharingCollaborationSelector pool.Selectable[collaboration.CollaborationAPIClient]) (rgrpc.Service, error) {
func New(gatewaySelector pool.Selectable[gateway.GatewayAPIClient], sharingCollaborationSelector pool.Selectable[collaboration.CollaborationAPIClient], maxConcurrency int) (rgrpc.Service, error) {
	// Assemble the service directly from the supplied selectors and the
	// worker limit; construction cannot fail, so the error is always nil.
	return &service{
		gatewaySelector:              gatewaySelector,
		sharingCollaborationSelector: sharingCollaborationSelector,
		maxConcurrency:               maxConcurrency,
	}, nil
}
Expand Down Expand Up @@ -399,7 +407,7 @@ func (s *service) ListStorageSpaces(ctx context.Context, req *provider.ListStora
var shareInfo map[string]*provider.ResourceInfo
var err error
if fetchShares {
receivedShares, shareInfo, err = s.fetchShares(ctx, req.Opaque, []string{}, &fieldmaskpb.FieldMask{ /*TODO mtime and etag only?*/ })
receivedShares, shareInfo, err = s.fetchAcceptedShares(ctx, req.Opaque, []string{}, &fieldmaskpb.FieldMask{ /*TODO mtime and etag only?*/ })
if err != nil {
return nil, errors.Wrap(err, "sharesstorageprovider: error calling ListReceivedSharesRequest")
}
Expand Down Expand Up @@ -710,7 +718,7 @@ func (s *service) Stat(ctx context.Context, req *provider.StatRequest) (*provide
if !ok {
return nil, fmt.Errorf("missing user in context")
}
receivedShares, shareMd, err := s.fetchShares(ctx, req.Opaque, req.ArbitraryMetadataKeys, req.FieldMask)
receivedShares, shareMd, err := s.fetchAcceptedShares(ctx, req.Opaque, req.ArbitraryMetadataKeys, req.FieldMask)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -806,7 +814,7 @@ func (s *service) ListContainer(ctx context.Context, req *provider.ListContainer
// The root is empty, it is filled by mountpoints
// so, when accessing the root via /dav/spaces, we need to list the accepted shares with their mountpoint

receivedShares, shareMd, err := s.fetchShares(ctx, req.Opaque, req.ArbitraryMetadataKeys, req.FieldMask)
receivedShares, shareMd, err := s.fetchAcceptedShares(ctx, req.Opaque, req.ArbitraryMetadataKeys, req.FieldMask)
if err != nil {
return nil, errors.Wrap(err, "sharesstorageprovider: error calling ListReceivedSharesRequest")
}
Expand Down Expand Up @@ -1143,14 +1151,21 @@ func (s *service) rejectReceivedShare(ctx context.Context, receivedShare *collab
return errtypes.NewErrtypeFromStatus(res.Status)
}

func (s *service) fetchShares(ctx context.Context, opaque *typesv1beta1.Opaque, arbitraryMetadataKeys []string, fieldMask *field_mask.FieldMask) ([]*collaboration.ReceivedShare, map[string]*provider.ResourceInfo, error) {
func (s *service) fetchAcceptedShares(ctx context.Context, opaque *typesv1beta1.Opaque, arbitraryMetadataKeys []string, fieldMask *field_mask.FieldMask) ([]*collaboration.ReceivedShare, map[string]*provider.ResourceInfo, error) {
sharingCollaborationClient, err := s.sharingCollaborationSelector.Next()
if err != nil {
return nil, nil, err
}

lsRes, err := sharingCollaborationClient.ListReceivedShares(ctx, &collaboration.ListReceivedSharesRequest{
// FIXME filter by received shares for resource id - listing all shares is too expensive!
Filters: []*collaboration.Filter{
{
Type: collaboration.Filter_TYPE_STATE,
Term: &collaboration.Filter_State{
State: collaboration.ShareState_SHARE_STATE_ACCEPTED,
},
},
},
})
if err != nil {
return nil, nil, errors.Wrap(err, "sharesstorageprovider: error calling ListReceivedSharesRequest")
Expand All @@ -1159,42 +1174,98 @@ func (s *service) fetchShares(ctx context.Context, opaque *typesv1beta1.Opaque,
return nil, nil, fmt.Errorf("sharesstorageprovider: error calling ListReceivedSharesRequest")
}

gatewayClient, err := s.gatewaySelector.Next()
if err != nil {
return nil, nil, err
numWorkers := s.maxConcurrency
if len(lsRes.Shares) < numWorkers {
numWorkers = len(lsRes.Shares)
}
type res struct {
shareid string
info *provider.ResourceInfo
}
work := make(chan *collaboration.ReceivedShare, len(lsRes.Shares))
results := make(chan res, len(lsRes.Shares))

shareMetaData := make(map[string]*provider.ResourceInfo, len(lsRes.Shares))
for _, rs := range lsRes.Shares {
// only stat accepted shares
if rs.State != collaboration.ShareState_SHARE_STATE_ACCEPTED {
continue
}
if rs.Share.ResourceId.SpaceId == "" {
// convert backwards compatible share id
rs.Share.ResourceId.StorageId, rs.Share.ResourceId.SpaceId = storagespace.SplitStorageID(rs.Share.ResourceId.StorageId)
g, ctx := errgroup.WithContext(ctx)

// Distribute work
g.Go(func() error {
defer close(work)
for _, share := range lsRes.Shares {
select {
case work <- share:
case <-ctx.Done():
return ctx.Err()
}
}
sRes, err := gatewayClient.Stat(ctx, &provider.StatRequest{
Opaque: opaque,
Ref: &provider.Reference{ResourceId: rs.Share.ResourceId},
ArbitraryMetadataKeys: arbitraryMetadataKeys,
FieldMask: fieldMask,
return nil
})

// Spawn workers that'll concurrently work the queue
for i := 0; i < numWorkers; i++ {
g.Go(func() error {
for rs := range work {

// only stat accepted shares
if rs.State != collaboration.ShareState_SHARE_STATE_ACCEPTED {
continue
}
if rs.Share.ResourceId.SpaceId == "" {
// convert backwards compatible share id
rs.Share.ResourceId.StorageId, rs.Share.ResourceId.SpaceId = storagespace.SplitStorageID(rs.Share.ResourceId.StorageId)
}

gatewayClient, err := s.gatewaySelector.Next()
if err != nil {
appctx.GetLogger(ctx).Error().
Err(err).
Interface("resourceID", rs.Share.ResourceId).
Msg("ListRecievedShares: failed to select next gateway client")
return err
}
sRes, err := gatewayClient.Stat(ctx, &provider.StatRequest{
Opaque: opaque,
Ref: &provider.Reference{ResourceId: rs.Share.ResourceId},
ArbitraryMetadataKeys: arbitraryMetadataKeys,
FieldMask: fieldMask,
})
if err != nil {
appctx.GetLogger(ctx).Error().
Err(err).
Interface("resourceID", rs.Share.ResourceId).
Msg("ListRecievedShares: failed to make stat call")
return err
}
if sRes.Status.Code != rpc.Code_CODE_OK {
appctx.GetLogger(ctx).Debug().
Interface("resourceID", rs.Share.ResourceId).
Interface("status", sRes.Status).
Msg("ListRecievedShares: failed to stat the resource")
continue
}
select {
case results <- res{shareid: rs.Share.Id.OpaqueId, info: sRes.Info}:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})
if err != nil {
appctx.GetLogger(ctx).Error().
Err(err).
Interface("resourceID", rs.Share.ResourceId).
Msg("ListRecievedShares: failed to make stat call")
continue
}
if sRes.Status.Code != rpc.Code_CODE_OK {
appctx.GetLogger(ctx).Debug().
Interface("resourceID", rs.Share.ResourceId).
Interface("status", sRes.Status).
Msg("ListRecievedShares: failed to stat the resource")
continue
}
shareMetaData[rs.Share.Id.OpaqueId] = sRes.Info
}

// Wait for things to settle down, then close results chan
go func() {
_ = g.Wait() // error is checked later
close(results)
}()

// some results might have been skipped, so we cannot preallocate the map
shareMetaData := make(map[string]*provider.ResourceInfo)
for r := range results {
shareMetaData[r.shareid] = r.info
}

if err := g.Wait(); err != nil {
return nil, nil, err
}

return lsRes.Shares, shareMetaData, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ var _ = Describe("Sharesstorageprovider", func() {
})

JustBeforeEach(func() {
p, err := provider.New(gatewaySelector, sharingCollaborationSelector)
p, err := provider.New(gatewaySelector, sharingCollaborationSelector, 5)
Expect(err).ToNot(HaveOccurred())
s = p.(sprovider.ProviderAPIServer)
Expect(s).ToNot(BeNil())
Expand Down
Loading