Commit b53d5bd

concurrent stat requests when listing shares

Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
butonic committed Aug 16, 2024
1 parent eae71ca commit b53d5bd

Showing 2 changed files with 115 additions and 39 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/sharesstorageprovider-concurrent-stat.md
@@ -0,0 +1,5 @@
+Enhancement: concurrent stat requests when listing shares
+
+The sharesstorageprovider now concurrently stats the accepted shares when listing the share jail. The default of 5 workers can be changed by setting the `max_concurrency` value in the config map.
+
+https://github.com/cs3org/reva/pull/4812
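For illustration only, not part of the commit: a minimal, runnable sketch of how a config map carrying the new `max_concurrency` key decodes and defaults, assuming the mapstructure-based config loading reva services use. The endpoint addresses and the standalone main are invented for the example.

package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// config mirrors the service's config struct from the diff below.
type config struct {
	GatewayAddr               string `mapstructure:"gateway_addr"`
	UserShareProviderEndpoint string `mapstructure:"usershareprovidersvc"`
	MaxConcurrency            int    `mapstructure:"max_concurrency"`
}

func main() {
	// A config map as NewDefault would receive it; the addresses are placeholders.
	raw := map[string]interface{}{
		"gateway_addr":         "localhost:19000",
		"usershareprovidersvc": "localhost:17000",
		"max_concurrency":      10,
	}

	var c config
	if err := mapstructure.Decode(raw, &c); err != nil {
		panic(err)
	}

	// Same fallback as NewDefault in the diff: values <= 0 become 5 workers.
	if c.MaxConcurrency <= 0 {
		c.MaxConcurrency = 5
	}

	fmt.Printf("listing shares with up to %d concurrent stat requests\n", c.MaxConcurrency)
}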
149 changes: 110 additions & 39 deletions internal/grpc/services/sharesstorageprovider/sharesstorageprovider.go
@@ -25,6 +25,7 @@ import (
 	"strings"
 
 	"github.com/cs3org/reva/v2/pkg/storagespace"
+	"golang.org/x/sync/errgroup"
 	"google.golang.org/genproto/protobuf/field_mask"
 	"google.golang.org/grpc"
 	codes "google.golang.org/grpc/codes"
@@ -60,11 +61,13 @@ func init() {
 type config struct {
 	GatewayAddr               string `mapstructure:"gateway_addr"`
 	UserShareProviderEndpoint string `mapstructure:"usershareprovidersvc"`
+	MaxConcurrency            int    `mapstructure:"max_concurrency"`
 }
 
 type service struct {
 	gatewaySelector              pool.Selectable[gateway.GatewayAPIClient]
 	sharingCollaborationSelector pool.Selectable[collaboration.CollaborationAPIClient]
+	maxConcurrency               int
 }
 
 func (s *service) Close() error {
@@ -98,14 +101,19 @@ func NewDefault(m map[string]interface{}, _ *grpc.Server) (rgrpc.Service, error)
 		return nil, errors.Wrap(err, "sharesstorageprovider: error getting UserShareProvider client")
 	}
 
-	return New(gatewaySelector, sharingCollaborationSelector)
+	if c.MaxConcurrency <= 0 {
+		c.MaxConcurrency = 5
+	}
+
+	return New(gatewaySelector, sharingCollaborationSelector, c.MaxConcurrency)
 }
 
 // New returns a new instance of the SharesStorageProvider service
-func New(gatewaySelector pool.Selectable[gateway.GatewayAPIClient], sharingCollaborationSelector pool.Selectable[collaboration.CollaborationAPIClient]) (rgrpc.Service, error) {
+func New(gatewaySelector pool.Selectable[gateway.GatewayAPIClient], sharingCollaborationSelector pool.Selectable[collaboration.CollaborationAPIClient], maxConcurrency int) (rgrpc.Service, error) {
 	s := &service{
 		gatewaySelector:              gatewaySelector,
 		sharingCollaborationSelector: sharingCollaborationSelector,
+		maxConcurrency:               maxConcurrency,
 	}
 	return s, nil
 }
@@ -399,7 +407,7 @@ func (s *service) ListStorageSpaces(ctx context.Context, req *provider.ListStora
 	var shareInfo map[string]*provider.ResourceInfo
 	var err error
 	if fetchShares {
-		receivedShares, shareInfo, err = s.fetchShares(ctx, req.Opaque, []string{}, &fieldmaskpb.FieldMask{ /*TODO mtime and etag only?*/ })
+		receivedShares, shareInfo, err = s.fetchAcceptedShares(ctx, req.Opaque, []string{}, &fieldmaskpb.FieldMask{ /*TODO mtime and etag only?*/ })
 		if err != nil {
 			return nil, errors.Wrap(err, "sharesstorageprovider: error calling ListReceivedSharesRequest")
 		}
@@ -710,7 +718,7 @@ func (s *service) Stat(ctx context.Context, req *provider.StatRequest) (*provide
 	if !ok {
 		return nil, fmt.Errorf("missing user in context")
 	}
-	receivedShares, shareMd, err := s.fetchShares(ctx, req.Opaque, req.ArbitraryMetadataKeys, req.FieldMask)
+	receivedShares, shareMd, err := s.fetchAcceptedShares(ctx, req.Opaque, req.ArbitraryMetadataKeys, req.FieldMask)
 	if err != nil {
 		return nil, err
 	}
@@ -806,7 +814,7 @@ func (s *service) ListContainer(ctx context.Context, req *provider.ListContainer
 	// The root is empty, it is filled by mountpoints
 	// so, when accessing the root via /dav/spaces, we need to list the accepted shares with their mountpoint
 
-	receivedShares, shareMd, err := s.fetchShares(ctx, req.Opaque, req.ArbitraryMetadataKeys, req.FieldMask)
+	receivedShares, shareMd, err := s.fetchAcceptedShares(ctx, req.Opaque, req.ArbitraryMetadataKeys, req.FieldMask)
 	if err != nil {
 		return nil, errors.Wrap(err, "sharesstorageprovider: error calling ListReceivedSharesRequest")
 	}
@@ -1143,14 +1151,21 @@ func (s *service) rejectReceivedShare(ctx context.Context, receivedShare *collab
 	return errtypes.NewErrtypeFromStatus(res.Status)
 }
 
-func (s *service) fetchShares(ctx context.Context, opaque *typesv1beta1.Opaque, arbitraryMetadataKeys []string, fieldMask *field_mask.FieldMask) ([]*collaboration.ReceivedShare, map[string]*provider.ResourceInfo, error) {
+func (s *service) fetchAcceptedShares(ctx context.Context, opaque *typesv1beta1.Opaque, arbitraryMetadataKeys []string, fieldMask *field_mask.FieldMask) ([]*collaboration.ReceivedShare, map[string]*provider.ResourceInfo, error) {
 	sharingCollaborationClient, err := s.sharingCollaborationSelector.Next()
 	if err != nil {
 		return nil, nil, err
 	}
 
 	lsRes, err := sharingCollaborationClient.ListReceivedShares(ctx, &collaboration.ListReceivedSharesRequest{
 		// FIXME filter by received shares for resource id - listing all shares is too expensive!
+		Filters: []*collaboration.Filter{
+			{
+				Type: collaboration.Filter_TYPE_STATE,
+				Term: &collaboration.Filter_State{
+					State: collaboration.ShareState_SHARE_STATE_ACCEPTED,
+				},
+			},
+		},
 	})
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "sharesstorageprovider: error calling ListReceivedSharesRequest")
@@ -1159,42 +1174,98 @@ func (s *service) fetchShares(ctx context.Context, opaque *typesv1beta1.Opaque,
 		return nil, nil, fmt.Errorf("sharesstorageprovider: error calling ListReceivedSharesRequest")
 	}
 
-	gatewayClient, err := s.gatewaySelector.Next()
-	if err != nil {
-		return nil, nil, err
+	numWorkers := s.maxConcurrency
+	if len(lsRes.Shares) < numWorkers {
+		numWorkers = len(lsRes.Shares)
 	}
+	type res struct {
+		shareid string
+		info    *provider.ResourceInfo
+	}
+	work := make(chan *collaboration.ReceivedShare, len(lsRes.Shares))
+	results := make(chan res, len(lsRes.Shares))
 
-	shareMetaData := make(map[string]*provider.ResourceInfo, len(lsRes.Shares))
-	for _, rs := range lsRes.Shares {
-		// only stat accepted shares
-		if rs.State != collaboration.ShareState_SHARE_STATE_ACCEPTED {
-			continue
-		}
-		if rs.Share.ResourceId.SpaceId == "" {
-			// convert backwards compatible share id
-			rs.Share.ResourceId.StorageId, rs.Share.ResourceId.SpaceId = storagespace.SplitStorageID(rs.Share.ResourceId.StorageId)
+	g, ctx := errgroup.WithContext(ctx)
+
+	// Distribute work
+	g.Go(func() error {
+		defer close(work)
+		for _, share := range lsRes.Shares {
+			select {
+			case work <- share:
+			case <-ctx.Done():
+				return ctx.Err()
+			}
 		}
-		sRes, err := gatewayClient.Stat(ctx, &provider.StatRequest{
-			Opaque:                opaque,
-			Ref:                   &provider.Reference{ResourceId: rs.Share.ResourceId},
-			ArbitraryMetadataKeys: arbitraryMetadataKeys,
-			FieldMask:             fieldMask,
-		})
+		return nil
+	})
+
+	// Spawn workers that'll concurrently work the queue
+	for i := 0; i < numWorkers; i++ {
+		g.Go(func() error {
+			for rs := range work {
+
+				// only stat accepted shares
+				if rs.State != collaboration.ShareState_SHARE_STATE_ACCEPTED {
+					continue
+				}
+				if rs.Share.ResourceId.SpaceId == "" {
+					// convert backwards compatible share id
+					rs.Share.ResourceId.StorageId, rs.Share.ResourceId.SpaceId = storagespace.SplitStorageID(rs.Share.ResourceId.StorageId)
+				}
+
+				gatewayClient, err := s.gatewaySelector.Next()
+				if err != nil {
+					appctx.GetLogger(ctx).Error().
+						Err(err).
+						Interface("resourceID", rs.Share.ResourceId).
+						Msg("ListReceivedShares: failed to select next gateway client")
+					return err
+				}
+				sRes, err := gatewayClient.Stat(ctx, &provider.StatRequest{
+					Opaque:                opaque,
+					Ref:                   &provider.Reference{ResourceId: rs.Share.ResourceId},
+					ArbitraryMetadataKeys: arbitraryMetadataKeys,
+					FieldMask:             fieldMask,
+				})
+				if err != nil {
+					appctx.GetLogger(ctx).Error().
+						Err(err).
+						Interface("resourceID", rs.Share.ResourceId).
+						Msg("ListReceivedShares: failed to make stat call")
+					return err
+				}
+				if sRes.Status.Code != rpc.Code_CODE_OK {
+					appctx.GetLogger(ctx).Debug().
+						Interface("resourceID", rs.Share.ResourceId).
+						Interface("status", sRes.Status).
+						Msg("ListReceivedShares: failed to stat the resource")
+					continue
+				}
+				select {
+				case results <- res{shareid: rs.Share.Id.OpaqueId, info: sRes.Info}:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+			}
+			return nil
+		})
-		if err != nil {
-			appctx.GetLogger(ctx).Error().
-				Err(err).
-				Interface("resourceID", rs.Share.ResourceId).
-				Msg("ListReceivedShares: failed to make stat call")
-			continue
-		}
-		if sRes.Status.Code != rpc.Code_CODE_OK {
-			appctx.GetLogger(ctx).Debug().
-				Interface("resourceID", rs.Share.ResourceId).
-				Interface("status", sRes.Status).
-				Msg("ListReceivedShares: failed to stat the resource")
-			continue
-		}
-		shareMetaData[rs.Share.Id.OpaqueId] = sRes.Info
 	}
 
+	// Wait for things to settle down, then close results chan
+	go func() {
+		_ = g.Wait() // error is checked later
+		close(results)
+	}()
+
+	// some results might have been skipped, so we cannot preallocate the map
+	shareMetaData := make(map[string]*provider.ResourceInfo)
+	for r := range results {
+		shareMetaData[r.shareid] = r.info
+	}
+
+	if err := g.Wait(); err != nil {
+		return nil, nil, err
+	}
+
 	return lsRes.Shares, shareMetaData, nil
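The concurrency shape in fetchAcceptedShares can be read in isolation: one errgroup goroutine feeds shares into a buffered work channel, a bounded number of workers stat them, and a results channel is drained into the metadata map. Below is a minimal, self-contained sketch of that same pattern, with a stand-in stat function and string shares instead of the reva gateway client and CS3 types; none of the names here are reva APIs.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// stat stands in for the gateway Stat call; it only illustrates the shape.
func stat(ctx context.Context, share string) (string, error) {
	return "info:" + share, nil
}

func main() {
	shares := []string{"a", "b", "c", "d", "e", "f"}

	maxConcurrency := 5 // the service default
	numWorkers := maxConcurrency
	if len(shares) < numWorkers {
		numWorkers = len(shares)
	}

	type res struct {
		shareid string
		info    string
	}
	// Buffering both channels to len(shares) keeps producer and workers
	// from blocking each other once one side stops reading.
	work := make(chan string, len(shares))
	results := make(chan res, len(shares))

	g, ctx := errgroup.WithContext(context.Background())

	// Distribute work; the producer also honors cancellation.
	g.Go(func() error {
		defer close(work)
		for _, s := range shares {
			select {
			case work <- s:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// Spawn a bounded number of workers draining the queue.
	for i := 0; i < numWorkers; i++ {
		g.Go(func() error {
			for s := range work {
				info, err := stat(ctx, s)
				if err != nil {
					return err // cancels ctx for the other goroutines
				}
				select {
				case results <- res{shareid: s, info: info}:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}

	// Close results once every goroutine has returned, so the
	// collection loop below terminates.
	go func() {
		_ = g.Wait() // the error is re-checked after collecting
		close(results)
	}()

	collected := make(map[string]string)
	for r := range results {
		collected[r.shareid] = r.info
	}
	if err := g.Wait(); err != nil {
		fmt.Println("stat failed:", err)
		return
	}
	fmt.Println(collected)
}

Closing results only after g.Wait() returns guarantees the collection loop ends, and re-checking g.Wait() afterwards turns any worker failure into an error for the caller rather than a silently partial map; this is the same ordering the commit uses.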
