Skip to content

Commit

Permalink
[tests-only] Refactor gateway caching (#2372)
Browse files Browse the repository at this point in the history
* illustrate the problem

* add changelog

* don't clean cache when creating new spaces

* use Contains instead HasPrefix for key invalidation

* don't cache shares for the moment

* clean by opaque and storageid seperately

* introduce caches struct

* move providercache to caches struct

* deactivate providercache

* move create home caching

* remove etag cache

* fix linting

* deactivate cashing temporarily

* reactivate statcache

* refine logic for creating cache key

* reactivate home cache

* activate provider cache

* change type of Chaches struct

* add comment for userkey function

* more PR comments

* readd wrongly deleted line

Signed-off-by: jkoberg <[email protected]>
  • Loading branch information
kobergj authored Dec 23, 2021
1 parent 7f0a043 commit 06154a2
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 152 deletions.
32 changes: 14 additions & 18 deletions internal/grpc/services/gateway/authprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (s *svc) Authenticate(ctx context.Context, req *gateway.AuthenticateRequest
// token obtained from the updated scope in the context.
token, err := s.tokenmgr.MintToken(ctx, &u, res.TokenScope)
if err != nil {
err = errors.Wrap(err, "authsvc: error in MintToken")
res := &gateway.AuthenticateResponse{
Status: status.NewUnauthenticated(ctx, err, "error creating access token"),
}
Expand Down Expand Up @@ -151,25 +152,20 @@ func (s *svc) Authenticate(ctx context.Context, req *gateway.AuthenticateRequest
ctx = metadata.AppendToOutgoingContext(ctx, ctxpkg.TokenHeader, token) // TODO(jfd): hardcoded metadata key. use PerRPCCredentials?

// create home directory
if _, err = s.createHomeCache.Get(res.User.Id.OpaqueId); err != nil {
createHomeRes, err := s.CreateHome(ctx, &storageprovider.CreateHomeRequest{})
if err != nil {
log.Err(err).Msg("error calling CreateHome")
return &gateway.AuthenticateResponse{
Status: status.NewInternal(ctx, "error creating user home"),
}, nil
}
createHomeRes, err := s.CreateHome(ctx, &storageprovider.CreateHomeRequest{})
if err != nil {
log.Err(err).Msg("error calling CreateHome")
return &gateway.AuthenticateResponse{
Status: status.NewInternal(ctx, "error creating user home"),
}, nil
}

if createHomeRes.Status.Code != rpc.Code_CODE_OK && createHomeRes.Status.Code != rpc.Code_CODE_ALREADY_EXISTS {
err := status.NewErrorFromCode(createHomeRes.Status.Code, "gateway")
log.Err(err).Msg("error calling Createhome")
return &gateway.AuthenticateResponse{
Status: status.NewInternal(ctx, "error creating user home"),
}, nil
}
if s.c.CreateHomeCacheTTL > 0 {
_ = s.createHomeCache.Set(res.User.Id.OpaqueId, true)
}
if createHomeRes.Status.Code != rpc.Code_CODE_OK && createHomeRes.Status.Code != rpc.Code_CODE_ALREADY_EXISTS {
err := status.NewErrorFromCode(createHomeRes.Status.Code, "gateway")
log.Err(err).Msg("error calling Createhome")
return &gateway.AuthenticateResponse{
Status: status.NewInternal(ctx, "error creating user home"),
}, nil
}

gwRes := &gateway.AuthenticateResponse{
Expand Down
49 changes: 9 additions & 40 deletions internal/grpc/services/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ import (
"fmt"
"net/url"
"strings"
"time"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"

"github.com/ReneKroon/ttlcache/v2"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/rgrpc"
"github.com/cs3org/reva/pkg/sharedconf"
Expand Down Expand Up @@ -122,14 +120,10 @@ func (c *config) init() {
}

type svc struct {
c *config
dataGatewayURL url.URL
tokenmgr token.Manager
etagCache *ttlcache.Cache `mapstructure:"etag_cache"`
createHomeCache *ttlcache.Cache `mapstructure:"create_home_cache"`
providerCache *ttlcache.Cache `mapstructure:"provider_cache"`
statCache *ttlcache.Cache `mapstructure:"stat_cache"`
// mountCache *ttlcache.Cache `mapstructure:"mount_cache"`
c *config
dataGatewayURL url.URL
tokenmgr token.Manager
cache Caches
}

// New creates a new gateway svc that acts as a proxy for any grpc operation.
Expand All @@ -154,36 +148,11 @@ func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
return nil, err
}

// if the ttl is 0, aka not set, the cache lib will default to an hour
etagCache := ttlcache.NewCache()
_ = etagCache.SetTTL(time.Duration(c.EtagCacheTTL) * time.Second)
etagCache.SkipTTLExtensionOnHit(true)

createHomeCache := ttlcache.NewCache()
_ = createHomeCache.SetTTL(time.Duration(c.CreateHomeCacheTTL) * time.Second)
createHomeCache.SkipTTLExtensionOnHit(true)

providerCache := ttlcache.NewCache()
_ = providerCache.SetTTL(time.Duration(c.ProviderCacheTTL) * time.Second)
providerCache.SkipTTLExtensionOnHit(true)

statCache := ttlcache.NewCache()
_ = statCache.SetTTL(time.Duration(c.StatCacheTTL) * time.Second)
statCache.SkipTTLExtensionOnHit(true)

// mountCache := ttlcache.NewCache()
// _ = mountCache.SetTTL(time.Duration(c.MountCacheTTL) * time.Second)
// mountCache.SkipTTLExtensionOnHit(true)

s := &svc{
c: c,
dataGatewayURL: *u,
tokenmgr: tokenManager,
etagCache: etagCache,
createHomeCache: createHomeCache,
providerCache: providerCache,
statCache: statCache,
// mountCache: mountCache,
c: c,
dataGatewayURL: *u,
tokenmgr: tokenManager,
cache: NewCaches(c.StatCacheTTL, c.CreateHomeCacheTTL, c.ProviderCacheTTL),
}

return s, nil
Expand All @@ -194,7 +163,7 @@ func (s *svc) Register(ss *grpc.Server) {
}

func (s *svc) Close() error {
s.etagCache.Close()
s.cache.Close()
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions internal/grpc/services/gateway/publicshareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *svc) CreatePublicShare(ctx context.Context, req *link.CreatePublicShare
return nil, err
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), res.Share.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), res.Share.ResourceId)
return res, nil
}

Expand All @@ -60,7 +60,7 @@ func (s *svc) RemovePublicShare(ctx context.Context, req *link.RemovePublicShare
return nil, err
}
// TODO: How to find out the resourceId?
RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), nil)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), nil)
return res, nil
}

Expand Down Expand Up @@ -138,6 +138,6 @@ func (s *svc) UpdatePublicShare(ctx context.Context, req *link.UpdatePublicShare
if err != nil {
return nil, errors.Wrap(err, "error updating share")
}
RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), res.Share.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), res.Share.ResourceId)
return res, nil
}
88 changes: 31 additions & 57 deletions internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (s *svc) CreateStorageSpace(ctx context.Context, req *provider.CreateStorag
}
}

srClient, err := pool.GetStorageRegistryClient(s.c.StorageRegistryEndpoint)
srClient, err := s.getStorageRegistryClient(ctx, s.c.StorageRegistryEndpoint)
if err != nil {
return nil, errors.Wrap(err, "gateway: error getting storage registry client")
}
Expand Down Expand Up @@ -236,7 +236,7 @@ func (s *svc) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSp
}
}

c, err := pool.GetStorageRegistryClient(s.c.StorageRegistryEndpoint)
c, err := s.getStorageRegistryClient(ctx, s.c.StorageRegistryEndpoint)
if err != nil {
return nil, errors.Wrap(err, "gateway: error getting storage registry client")
}
Expand Down Expand Up @@ -337,7 +337,7 @@ func (s *svc) UpdateStorageSpace(ctx context.Context, req *provider.UpdateStorag
Status: status.NewInternal(ctx, "error calling UpdateStorageSpace"),
}, nil
}
RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), res.StorageSpace.Root)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), res.StorageSpace.Root)
return res, nil
}

Expand Down Expand Up @@ -369,14 +369,14 @@ func (s *svc) DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorag
}, nil
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), &provider.ResourceId{OpaqueId: req.Id.OpaqueId})
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), &provider.ResourceId{OpaqueId: req.Id.OpaqueId})
return res, nil
}

func (s *svc) GetHome(ctx context.Context, _ *provider.GetHomeRequest) (*provider.GetHomeResponse, error) {
currentUser := ctxpkg.ContextMustGetUser(ctx)

srClient, err := pool.GetStorageRegistryClient(s.c.StorageRegistryEndpoint)
srClient, err := s.getStorageRegistryClient(ctx, s.c.StorageRegistryEndpoint)
if err != nil {
return nil, errors.Wrap(err, "gateway: error getting storage registry client")
}
Expand Down Expand Up @@ -543,7 +543,7 @@ func (s *svc) InitiateFileUpload(ctx context.Context, req *provider.InitiateFile
}
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
return &gateway.InitiateFileUploadResponse{
Opaque: storageRes.Opaque,
Status: storageRes.Status,
Expand Down Expand Up @@ -586,7 +586,7 @@ func (s *svc) CreateContainer(ctx context.Context, req *provider.CreateContainer
return nil, errors.Wrap(err, "gateway: error calling CreateContainer")
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
return res, nil
}

Expand Down Expand Up @@ -630,7 +630,7 @@ func (s *svc) Delete(ctx context.Context, req *provider.DeleteRequest) (*provide
return nil, errors.Wrap(err, "gateway: error calling Delete")
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
return res, nil
}

Expand Down Expand Up @@ -672,8 +672,8 @@ func (s *svc) Move(ctx context.Context, req *provider.MoveRequest) (*provider.Mo
}
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Source.ResourceId)
RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Destination.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Source.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Destination.ResourceId)
return c.Move(ctx, req)
}

Expand All @@ -696,7 +696,7 @@ func (s *svc) SetArbitraryMetadata(ctx context.Context, req *provider.SetArbitra
return nil, errors.Wrap(err, "gateway: error calling SetArbitraryMetadata")
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
return res, nil
}

Expand All @@ -719,7 +719,7 @@ func (s *svc) UnsetArbitraryMetadata(ctx context.Context, req *provider.UnsetArb
return nil, errors.Wrap(err, "gateway: error calling UnsetArbitraryMetadata")
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
return res, nil
}

Expand Down Expand Up @@ -1092,7 +1092,7 @@ func (s *svc) RestoreFileVersion(ctx context.Context, req *provider.RestoreFileV
return nil, errors.Wrap(err, "gateway: error calling RestoreFileVersion")
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
return res, nil
}

Expand Down Expand Up @@ -1315,9 +1315,9 @@ func (s *svc) RestoreRecycleItem(ctx context.Context, req *provider.RestoreRecyc
return nil, errors.Wrap(err, "gateway: error calling RestoreRecycleItem")
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
if req.RestoreRef != nil {
RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.RestoreRef.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.RestoreRef.ResourceId)
}

return res, nil
Expand All @@ -1340,7 +1340,7 @@ func (s *svc) PurgeRecycle(ctx context.Context, req *provider.PurgeRecycleReques
return nil, errors.Wrap(err, "gateway: error calling PurgeRecycle")
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
return res, nil
}

Expand Down Expand Up @@ -1418,7 +1418,17 @@ func (s *svc) getStorageProviderClient(_ context.Context, p *registry.ProviderIn
return nil, err
}

return Cached(c, s.statCache), nil
return s.cache.StorageProviderClient(c), nil
}

func (s *svc) getStorageRegistryClient(_ context.Context, address string) (registry.RegistryAPIClient, error) {
c, err := pool.GetStorageRegistryClient(address)
if err != nil {
err = errors.Wrap(err, "gateway: error getting a storage provider client")
return nil, err
}

return s.cache.StorageRegistryClient(c), nil
}

/*
Expand All @@ -1441,45 +1451,16 @@ func (s *svc) findProviders(ctx context.Context, ref *provider.Reference) ([]*re
switch {
case ref == nil:
return nil, errtypes.BadRequest("missing reference")
case ref.ResourceId != nil: // can we use the provider cache?
// only the StorageId is used to look up the provider. the opaqueid can only be a share and as such part of a storage
if value, exists := s.providerCache.Get(ref.ResourceId.StorageId); exists == nil {
if providers, ok := value.([]*registry.ProviderInfo); ok {
return providers, nil
}
}
case ref.ResourceId != nil:
// no action needed in that case
case ref.Path != "": // TODO implement a mount path cache in the registry?
/*
// path / mount point lookup from cache
if value, exists := s.mountCache.Get(userKey(ctx)); exists == nil {
if m, ok := value.(map[string][]*registry.ProviderInfo); ok {
providers := make([]*registry.ProviderInfo, 0, len(m))
deepestMountPath := ""
for mountPath, providerInfos := range m {
switch {
case strings.HasPrefix(mountPath, ref.Path):
// and add all providers below and exactly matching the path
// requested /foo, mountPath /foo/sub
providers = append(providers, providerInfos...)
case strings.HasPrefix(ref.Path, mountPath) && len(mountPath) > len(deepestMountPath):
// eg. three providers: /foo, /foo/sub, /foo/sub/bar
// requested /foo/sub/mob
deepestMountPath = mountPath
}
}
if deepestMountPath != "" {
providers = append(providers, m[deepestMountPath]...)
}
return providers, nil
}
}
*/
// nothing to do here either
default:
return nil, errtypes.BadRequest("invalid reference, at least path or id must be set")
}

// lookup
c, err := pool.GetStorageRegistryClient(s.c.StorageRegistryEndpoint)
c, err := s.getStorageRegistryClient(ctx, s.c.StorageRegistryEndpoint)
if err != nil {
return nil, errors.Wrap(err, "gateway: error getting storage registry client")
}
Expand All @@ -1497,7 +1478,6 @@ func (s *svc) findProviders(ctx context.Context, ref *provider.Reference) ([]*re
}
sdk.EncodeOpaqueMap(listReq.Opaque, filters)
res, err := c.ListStorageProviders(ctx, listReq)

if err != nil {
return nil, errors.Wrap(err, "gateway: error calling ListStorageProviders")
}
Expand All @@ -1522,12 +1502,6 @@ func (s *svc) findProviders(ctx context.Context, ref *provider.Reference) ([]*re
return nil, errtypes.NotFound("gateway: provider is nil")
}

if ref.ResourceId != nil {
if err = s.providerCache.Set(ref.ResourceId.StorageId, res.Providers); err != nil {
appctx.GetLogger(ctx).Warn().Err(err).Interface("reference", ref).Msg("gateway: could not cache providers")
}
}

return res.Providers, nil
}

Expand Down
Loading

0 comments on commit 06154a2

Please sign in to comment.