Skip to content

Commit

Permalink
always find unique providers
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic committed Jul 16, 2024
1 parent ab146ae commit af606e7
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 72 deletions.
7 changes: 7 additions & 0 deletions changelog/unreleased/always-find-unique-providers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Bugfix: Always find unique providers

The gateway will now always try to find a single unique provider. It has stopped aggregating multiple ListContainer responses when we removed any business logic from it.

https://github.com/cs3org/reva/pull/4741
https://github.com/cs3org/reva/pull/4740
https://github.com/cs3org/reva/pull/2394
74 changes: 4 additions & 70 deletions internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ func (s *svc) Unlock(ctx context.Context, req *provider.UnlockRequest) (*provide

// Stat returns the Resoure info for a given resource by forwarding the request to the responsible provider.
func (s *svc) Stat(ctx context.Context, req *provider.StatRequest) (*provider.StatResponse, error) {
c, _, ref, err := s.findAndUnwrapUnique(ctx, req.Ref)
c, _, ref, err := s.findAndUnwrap(ctx, req.Ref)
if err != nil {
return &provider.StatResponse{
Status: status.NewNotFound(ctx, fmt.Sprintf("gateway could not find space for ref=%+v", req.Ref)),
Expand Down Expand Up @@ -1019,7 +1019,7 @@ func (s *svc) GetQuota(ctx context.Context, req *gateway.GetQuotaRequest) (*prov
// - contains the provider path, which is the mount point of the provider
// - may contain a list of storage spaces with their id and space path
func (s *svc) find(ctx context.Context, ref *provider.Reference) (provider.ProviderAPIClient, *registry.ProviderInfo, error) {
p, err := s.findSpaces(ctx, ref)
p, err := s.findSingleSpace(ctx, ref)
if err != nil {
return nil, nil, err
}
Expand All @@ -1028,32 +1028,20 @@ func (s *svc) find(ctx context.Context, ref *provider.Reference) (provider.Provi
return client, p[0], err
}

// findSpacesProvider looks up the provider that is responsible for the given request
// findSpacesProvider looks up the spaces provider that is responsible for the given request
// It will return a client that the caller can use to make the call, as well as the ProviderInfo. It:
// - contains the provider path, which is the mount point of the provider
// - may contain a list of storage spaces with their id and space path
func (s *svc) findSpacesProvider(ctx context.Context, ref *provider.Reference) (provider.SpacesAPIClient, *registry.ProviderInfo, error) {
p, err := s.findSpaces(ctx, ref)
if err != nil {
return nil, nil, err
}

client, err := s.getSpacesProviderClient(ctx, p[0])
return client, p[0], err
}

func (s *svc) findUnique(ctx context.Context, ref *provider.Reference) (provider.ProviderAPIClient, *registry.ProviderInfo, error) {
p, err := s.findSingleSpace(ctx, ref)
if err != nil {
return nil, nil, err
}

client, err := s.getStorageProviderClient(ctx, p[0])
client, err := s.getSpacesProviderClient(ctx, p[0])
return client, p[0], err
}

// FIXME findAndUnwrap currently just returns the first provider ... which may not be what is needed.
// for the ListRecycle call we need an exact match, for Stat and List we need to query all related providers
func (s *svc) findAndUnwrap(ctx context.Context, ref *provider.Reference) (provider.ProviderAPIClient, *registry.ProviderInfo, *provider.Reference, error) {
c, p, err := s.find(ctx, ref)
if err != nil {
Expand All @@ -1075,27 +1063,6 @@ func (s *svc) findAndUnwrap(ctx context.Context, ref *provider.Reference) (provi
return c, p, relativeReference, nil
}

func (s *svc) findAndUnwrapUnique(ctx context.Context, ref *provider.Reference) (provider.ProviderAPIClient, *registry.ProviderInfo, *provider.Reference, error) {
c, p, err := s.findUnique(ctx, ref)
if err != nil {
return nil, nil, nil, err
}

var (
root *provider.ResourceId
mountPath string
)
for _, space := range decodeSpaces(p) {
mountPath = decodePath(space)
root = space.Root
break // TODO can there be more than one space for a path?
}

relativeReference := unwrap(ref, mountPath, root)

return c, p, relativeReference, nil
}

func (s *svc) getSpacesProviderClient(_ context.Context, p *registry.ProviderInfo) (provider.SpacesAPIClient, error) {
c, err := pool.GetSpacesProviderServiceClient(p.Address)
if err != nil {
Expand Down Expand Up @@ -1131,39 +1098,6 @@ func (s *svc) getStorageRegistryClient(_ context.Context, address string) (regis
}, nil
}

func (s *svc) findSpaces(ctx context.Context, ref *provider.Reference) ([]*registry.ProviderInfo, error) {
switch {
case ref == nil:
return nil, errtypes.BadRequest("missing reference")
case ref.ResourceId != nil:
if ref.ResourceId.OpaqueId == "" {
ref.ResourceId.OpaqueId = ref.ResourceId.SpaceId
}
case ref.Path != "": // TODO implement a mount path cache in the registry?
// nothing to do here either
default:
return nil, errtypes.BadRequest("invalid reference, at least path or id must be set")
}

filters := map[string]string{
"mask": "root", // we only need the root for routing
"path": ref.Path,
}
if ref.ResourceId != nil {
filters["storage_id"] = ref.ResourceId.StorageId
filters["space_id"] = ref.ResourceId.SpaceId
filters["opaque_id"] = ref.ResourceId.OpaqueId
}

listReq := &registry.ListStorageProvidersRequest{
Opaque: &typesv1beta1.Opaque{Map: make(map[string]*typesv1beta1.OpaqueEntry)},
}

sdk.EncodeOpaqueMap(listReq.Opaque, filters)

return s.findProvider(ctx, listReq)
}

func (s *svc) findSingleSpace(ctx context.Context, ref *provider.Reference) ([]*registry.ProviderInfo, error) {
switch {
case ref == nil:
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/registry/spaces/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (r *registry) GetProvider(ctx context.Context, space *providerpb.StorageSpa
// matches = /foo/bar <=> /foo/bar -> list(spaceid, .)
// below = /foo/bar/bif <=> /foo/bar -> list(spaceid, ./bif)
func (r *registry) ListProviders(ctx context.Context, filters map[string]string) ([]*registrypb.ProviderInfo, error) {
b, _ := strconv.ParseBool(filters["unique"])
unique, _ := strconv.ParseBool(filters["unique"])
unrestricted, _ := strconv.ParseBool(filters["unrestricted"])
mask := filters["mask"]
switch {
Expand All @@ -284,7 +284,7 @@ func (r *registry) ListProviders(ctx context.Context, filters map[string]string)

return r.findProvidersForResource(ctx, id, findMountpoint, findGrant, unrestricted, mask), nil
case filters["path"] != "":
return r.findProvidersForAbsolutePathReference(ctx, filters["path"], b, unrestricted, mask), nil
return r.findProvidersForAbsolutePathReference(ctx, filters["path"], unique, unrestricted, mask), nil
case len(filters) == 0:
// return all providers
return r.findAllProviders(ctx, mask), nil
Expand Down

0 comments on commit af606e7

Please sign in to comment.