Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tests-only] Refactor gateway caching #2372

Merged
merged 25 commits into from
Dec 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c935e25
illustrate the problem
kobergj Dec 10, 2021
7718670
add changelog
kobergj Dec 10, 2021
f4d5b57
don't clean cache when creating new spaces
kobergj Dec 10, 2021
c9e98bc
use Contains instead HasPrefix for key invalidation
kobergj Dec 10, 2021
eccf4b9
don't cache shares for the moment
kobergj Dec 13, 2021
7f0a51f
clean by opaque and storageid seperately
kobergj Dec 13, 2021
e19bdb3
introduce caches struct
kobergj Dec 13, 2021
9370bda
move providercache to caches struct
kobergj Dec 13, 2021
70ffac8
Merge branch 'edge' into refactor-gateway-caching
kobergj Dec 14, 2021
4e63b7e
deactivate providercache
kobergj Dec 14, 2021
861d188
move create home caching
kobergj Dec 14, 2021
d62b8a4
remove etag cache
kobergj Dec 14, 2021
a67d9d2
Merge branch 'edge' into refactor-gateway-caching
kobergj Dec 14, 2021
d3445b1
Merge branch 'edge' into refactor-gateway-caching
kobergj Dec 15, 2021
4beff7b
fix linting
kobergj Dec 15, 2021
65f1bac
deactivate cashing temporarily
kobergj Dec 16, 2021
1f33fd7
reactivate statcache
kobergj Dec 16, 2021
737ee5a
refine logic for creating cache key
kobergj Dec 16, 2021
dc5e557
reactivate home cache
kobergj Dec 16, 2021
b4c4051
activate provider cache
kobergj Dec 16, 2021
ee228da
change type of Chaches struct
kobergj Dec 16, 2021
bec31cf
add comment for userkey function
kobergj Dec 17, 2021
32310fe
more PR comments
kobergj Dec 17, 2021
d9e2cda
Merge branch 'edge' into refactor-gateway-caching
kobergj Dec 23, 2021
0650343
readd wrongly deleted line
kobergj Dec 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 14 additions & 18 deletions internal/grpc/services/gateway/authprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (s *svc) Authenticate(ctx context.Context, req *gateway.AuthenticateRequest
// token obtained from the updated scope in the context.
token, err := s.tokenmgr.MintToken(ctx, &u, res.TokenScope)
if err != nil {
err = errors.Wrap(err, "authsvc: error in MintToken")
res := &gateway.AuthenticateResponse{
Status: status.NewUnauthenticated(ctx, err, "error creating access token"),
}
Expand Down Expand Up @@ -151,25 +152,20 @@ func (s *svc) Authenticate(ctx context.Context, req *gateway.AuthenticateRequest
ctx = metadata.AppendToOutgoingContext(ctx, ctxpkg.TokenHeader, token) // TODO(jfd): hardcoded metadata key. use PerRPCCredentials?

// create home directory
if _, err = s.createHomeCache.Get(res.User.Id.OpaqueId); err != nil {
createHomeRes, err := s.CreateHome(ctx, &storageprovider.CreateHomeRequest{})
if err != nil {
log.Err(err).Msg("error calling CreateHome")
return &gateway.AuthenticateResponse{
Status: status.NewInternal(ctx, "error creating user home"),
}, nil
}
createHomeRes, err := s.CreateHome(ctx, &storageprovider.CreateHomeRequest{})
if err != nil {
log.Err(err).Msg("error calling CreateHome")
return &gateway.AuthenticateResponse{
Status: status.NewInternal(ctx, "error creating user home"),
}, nil
}

if createHomeRes.Status.Code != rpc.Code_CODE_OK && createHomeRes.Status.Code != rpc.Code_CODE_ALREADY_EXISTS {
err := status.NewErrorFromCode(createHomeRes.Status.Code, "gateway")
log.Err(err).Msg("error calling Createhome")
return &gateway.AuthenticateResponse{
Status: status.NewInternal(ctx, "error creating user home"),
}, nil
}
if s.c.CreateHomeCacheTTL > 0 {
_ = s.createHomeCache.Set(res.User.Id.OpaqueId, true)
}
if createHomeRes.Status.Code != rpc.Code_CODE_OK && createHomeRes.Status.Code != rpc.Code_CODE_ALREADY_EXISTS {
err := status.NewErrorFromCode(createHomeRes.Status.Code, "gateway")
log.Err(err).Msg("error calling Createhome")
return &gateway.AuthenticateResponse{
Status: status.NewInternal(ctx, "error creating user home"),
}, nil
}

gwRes := &gateway.AuthenticateResponse{
Expand Down
49 changes: 9 additions & 40 deletions internal/grpc/services/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ import (
"fmt"
"net/url"
"strings"
"time"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"

"github.com/ReneKroon/ttlcache/v2"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/rgrpc"
"github.com/cs3org/reva/pkg/sharedconf"
Expand Down Expand Up @@ -122,14 +120,10 @@ func (c *config) init() {
}

type svc struct {
c *config
dataGatewayURL url.URL
tokenmgr token.Manager
etagCache *ttlcache.Cache `mapstructure:"etag_cache"`
createHomeCache *ttlcache.Cache `mapstructure:"create_home_cache"`
providerCache *ttlcache.Cache `mapstructure:"provider_cache"`
statCache *ttlcache.Cache `mapstructure:"stat_cache"`
// mountCache *ttlcache.Cache `mapstructure:"mount_cache"`
c *config
dataGatewayURL url.URL
tokenmgr token.Manager
cache Caches
}

// New creates a new gateway svc that acts as a proxy for any grpc operation.
Expand All @@ -154,36 +148,11 @@ func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
return nil, err
}

// if the ttl is 0, aka not set, the cache lib will default to an hour
etagCache := ttlcache.NewCache()
_ = etagCache.SetTTL(time.Duration(c.EtagCacheTTL) * time.Second)
etagCache.SkipTTLExtensionOnHit(true)

createHomeCache := ttlcache.NewCache()
_ = createHomeCache.SetTTL(time.Duration(c.CreateHomeCacheTTL) * time.Second)
createHomeCache.SkipTTLExtensionOnHit(true)

providerCache := ttlcache.NewCache()
_ = providerCache.SetTTL(time.Duration(c.ProviderCacheTTL) * time.Second)
providerCache.SkipTTLExtensionOnHit(true)

statCache := ttlcache.NewCache()
_ = statCache.SetTTL(time.Duration(c.StatCacheTTL) * time.Second)
statCache.SkipTTLExtensionOnHit(true)

// mountCache := ttlcache.NewCache()
// _ = mountCache.SetTTL(time.Duration(c.MountCacheTTL) * time.Second)
// mountCache.SkipTTLExtensionOnHit(true)

s := &svc{
c: c,
dataGatewayURL: *u,
tokenmgr: tokenManager,
etagCache: etagCache,
createHomeCache: createHomeCache,
providerCache: providerCache,
statCache: statCache,
// mountCache: mountCache,
c: c,
dataGatewayURL: *u,
tokenmgr: tokenManager,
cache: NewCaches(c.StatCacheTTL, c.CreateHomeCacheTTL, c.ProviderCacheTTL),
}

return s, nil
Expand All @@ -194,7 +163,7 @@ func (s *svc) Register(ss *grpc.Server) {
}

func (s *svc) Close() error {
s.etagCache.Close()
s.cache.Close()
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions internal/grpc/services/gateway/publicshareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *svc) CreatePublicShare(ctx context.Context, req *link.CreatePublicShare
return nil, err
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), res.Share.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), res.Share.ResourceId)
return res, nil
}

Expand All @@ -60,7 +60,7 @@ func (s *svc) RemovePublicShare(ctx context.Context, req *link.RemovePublicShare
return nil, err
}
// TODO: How to find out the resourceId?
RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), nil)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), nil)
return res, nil
}

Expand Down Expand Up @@ -138,6 +138,6 @@ func (s *svc) UpdatePublicShare(ctx context.Context, req *link.UpdatePublicShare
if err != nil {
return nil, errors.Wrap(err, "error updating share")
}
RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), res.Share.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), res.Share.ResourceId)
return res, nil
}
88 changes: 31 additions & 57 deletions internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (s *svc) CreateStorageSpace(ctx context.Context, req *provider.CreateStorag
}
}

srClient, err := pool.GetStorageRegistryClient(s.c.StorageRegistryEndpoint)
srClient, err := s.getStorageRegistryClient(ctx, s.c.StorageRegistryEndpoint)
if err != nil {
return nil, errors.Wrap(err, "gateway: error getting storage registry client")
}
Expand Down Expand Up @@ -236,7 +236,7 @@ func (s *svc) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSp
}
}

c, err := pool.GetStorageRegistryClient(s.c.StorageRegistryEndpoint)
c, err := s.getStorageRegistryClient(ctx, s.c.StorageRegistryEndpoint)
if err != nil {
return nil, errors.Wrap(err, "gateway: error getting storage registry client")
}
Expand Down Expand Up @@ -337,7 +337,7 @@ func (s *svc) UpdateStorageSpace(ctx context.Context, req *provider.UpdateStorag
Status: status.NewInternal(ctx, "error calling UpdateStorageSpace"),
}, nil
}
RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), res.StorageSpace.Root)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), res.StorageSpace.Root)
return res, nil
}

Expand Down Expand Up @@ -369,14 +369,14 @@ func (s *svc) DeleteStorageSpace(ctx context.Context, req *provider.DeleteStorag
}, nil
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), &provider.ResourceId{OpaqueId: req.Id.OpaqueId})
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), &provider.ResourceId{OpaqueId: req.Id.OpaqueId})
return res, nil
}

func (s *svc) GetHome(ctx context.Context, _ *provider.GetHomeRequest) (*provider.GetHomeResponse, error) {
currentUser := ctxpkg.ContextMustGetUser(ctx)

srClient, err := pool.GetStorageRegistryClient(s.c.StorageRegistryEndpoint)
srClient, err := s.getStorageRegistryClient(ctx, s.c.StorageRegistryEndpoint)
if err != nil {
return nil, errors.Wrap(err, "gateway: error getting storage registry client")
}
Expand Down Expand Up @@ -543,7 +543,7 @@ func (s *svc) InitiateFileUpload(ctx context.Context, req *provider.InitiateFile
}
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
return &gateway.InitiateFileUploadResponse{
Opaque: storageRes.Opaque,
Status: storageRes.Status,
Expand Down Expand Up @@ -586,7 +586,7 @@ func (s *svc) CreateContainer(ctx context.Context, req *provider.CreateContainer
return nil, errors.Wrap(err, "gateway: error calling CreateContainer")
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
return res, nil
}

Expand Down Expand Up @@ -630,7 +630,7 @@ func (s *svc) Delete(ctx context.Context, req *provider.DeleteRequest) (*provide
return nil, errors.Wrap(err, "gateway: error calling Delete")
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
return res, nil
}

Expand Down Expand Up @@ -672,8 +672,8 @@ func (s *svc) Move(ctx context.Context, req *provider.MoveRequest) (*provider.Mo
}
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Source.ResourceId)
RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Destination.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Source.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Destination.ResourceId)
return c.Move(ctx, req)
}

Expand All @@ -696,7 +696,7 @@ func (s *svc) SetArbitraryMetadata(ctx context.Context, req *provider.SetArbitra
return nil, errors.Wrap(err, "gateway: error calling SetArbitraryMetadata")
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
return res, nil
}

Expand All @@ -719,7 +719,7 @@ func (s *svc) UnsetArbitraryMetadata(ctx context.Context, req *provider.UnsetArb
return nil, errors.Wrap(err, "gateway: error calling UnsetArbitraryMetadata")
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
return res, nil
}

Expand Down Expand Up @@ -1092,7 +1092,7 @@ func (s *svc) RestoreFileVersion(ctx context.Context, req *provider.RestoreFileV
return nil, errors.Wrap(err, "gateway: error calling RestoreFileVersion")
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
return res, nil
}

Expand Down Expand Up @@ -1315,9 +1315,9 @@ func (s *svc) RestoreRecycleItem(ctx context.Context, req *provider.RestoreRecyc
return nil, errors.Wrap(err, "gateway: error calling RestoreRecycleItem")
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
if req.RestoreRef != nil {
RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.RestoreRef.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.RestoreRef.ResourceId)
}

return res, nil
Expand All @@ -1340,7 +1340,7 @@ func (s *svc) PurgeRecycle(ctx context.Context, req *provider.PurgeRecycleReques
return nil, errors.Wrap(err, "gateway: error calling PurgeRecycle")
}

RemoveFromCache(s.statCache, ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
s.cache.RemoveStat(ctxpkg.ContextMustGetUser(ctx), req.Ref.ResourceId)
return res, nil
}

Expand Down Expand Up @@ -1418,7 +1418,17 @@ func (s *svc) getStorageProviderClient(_ context.Context, p *registry.ProviderIn
return nil, err
}

return Cached(c, s.statCache), nil
return s.cache.StorageProviderClient(c), nil
}

func (s *svc) getStorageRegistryClient(_ context.Context, address string) (registry.RegistryAPIClient, error) {
c, err := pool.GetStorageRegistryClient(address)
if err != nil {
err = errors.Wrap(err, "gateway: error getting a storage provider client")
return nil, err
}

return s.cache.StorageRegistryClient(c), nil
}

/*
Expand All @@ -1441,45 +1451,16 @@ func (s *svc) findProviders(ctx context.Context, ref *provider.Reference) ([]*re
switch {
case ref == nil:
return nil, errtypes.BadRequest("missing reference")
case ref.ResourceId != nil: // can we use the provider cache?
// only the StorageId is used to look up the provider. the opaqueid can only be a share and as such part of a storage
if value, exists := s.providerCache.Get(ref.ResourceId.StorageId); exists == nil {
if providers, ok := value.([]*registry.ProviderInfo); ok {
return providers, nil
}
}
case ref.ResourceId != nil:
kobergj marked this conversation as resolved.
Show resolved Hide resolved
// no action needed in that case
case ref.Path != "": // TODO implement a mount path cache in the registry?
/*
// path / mount point lookup from cache
if value, exists := s.mountCache.Get(userKey(ctx)); exists == nil {
if m, ok := value.(map[string][]*registry.ProviderInfo); ok {
providers := make([]*registry.ProviderInfo, 0, len(m))
deepestMountPath := ""
for mountPath, providerInfos := range m {
switch {
case strings.HasPrefix(mountPath, ref.Path):
// and add all providers below and exactly matching the path
// requested /foo, mountPath /foo/sub
providers = append(providers, providerInfos...)
case strings.HasPrefix(ref.Path, mountPath) && len(mountPath) > len(deepestMountPath):
// eg. three providers: /foo, /foo/sub, /foo/sub/bar
// requested /foo/sub/mob
deepestMountPath = mountPath
}
}
if deepestMountPath != "" {
providers = append(providers, m[deepestMountPath]...)
}
return providers, nil
}
}
*/
// nothing to do here either
default:
return nil, errtypes.BadRequest("invalid reference, at least path or id must be set")
}

// lookup
c, err := pool.GetStorageRegistryClient(s.c.StorageRegistryEndpoint)
c, err := s.getStorageRegistryClient(ctx, s.c.StorageRegistryEndpoint)
if err != nil {
return nil, errors.Wrap(err, "gateway: error getting storage registry client")
}
Expand All @@ -1497,7 +1478,6 @@ func (s *svc) findProviders(ctx context.Context, ref *provider.Reference) ([]*re
}
sdk.EncodeOpaqueMap(listReq.Opaque, filters)
res, err := c.ListStorageProviders(ctx, listReq)

if err != nil {
return nil, errors.Wrap(err, "gateway: error calling ListStorageProviders")
}
Expand All @@ -1522,12 +1502,6 @@ func (s *svc) findProviders(ctx context.Context, ref *provider.Reference) ([]*re
return nil, errtypes.NotFound("gateway: provider is nil")
}

if ref.ResourceId != nil {
if err = s.providerCache.Set(ref.ResourceId.StorageId, res.Providers); err != nil {
appctx.GetLogger(ctx).Warn().Err(err).Interface("reference", ref).Msg("gateway: could not cache providers")
}
}

return res.Providers, nil
}

Expand Down
Loading