diff --git a/changelog/unreleased/ocs-first-req-warmup.md b/changelog/unreleased/ocs-first-req-warmup.md new file mode 100644 index 0000000000..ac3c9a9070 --- /dev/null +++ b/changelog/unreleased/ocs-first-req-warmup.md @@ -0,0 +1,3 @@ +Enhancement: Add ocs cache warmup strategy for first request from the user + +https://github.com/cs3org/reva/pull/2117 diff --git a/cmd/revad/runtime/loader.go b/cmd/revad/runtime/loader.go index 241f625bae..ed50b3913e 100644 --- a/cmd/revad/runtime/loader.go +++ b/cmd/revad/runtime/loader.go @@ -40,6 +40,7 @@ import ( _ "github.com/cs3org/reva/pkg/ocm/share/manager/loader" _ "github.com/cs3org/reva/pkg/publicshare/manager/loader" _ "github.com/cs3org/reva/pkg/rhttp/datatx/manager/loader" + _ "github.com/cs3org/reva/pkg/share/cache/loader" _ "github.com/cs3org/reva/pkg/share/manager/loader" _ "github.com/cs3org/reva/pkg/storage/fs/loader" _ "github.com/cs3org/reva/pkg/storage/registry/loader" diff --git a/internal/grpc/services/gateway/storageprovider.go b/internal/grpc/services/gateway/storageprovider.go index 3ea0d881f4..da46ffbd8e 100644 --- a/internal/grpc/services/gateway/storageprovider.go +++ b/internal/grpc/services/gateway/storageprovider.go @@ -1945,7 +1945,10 @@ func (s *svc) getPath(ctx context.Context, ref *provider.Reference, keys ...stri if ref.ResourceId != nil { req := &provider.StatRequest{Ref: ref, ArbitraryMetadataKeys: keys} res, err := s.stat(ctx, req) - if (res != nil && res.Status.Code != rpc.Code_CODE_OK) || err != nil { + if err != nil { + return "", status.NewStatusFromErrType(ctx, "getPath ref="+ref.String(), err) + } + if res != nil && res.Status.Code != rpc.Code_CODE_OK { return "", res.Status } diff --git a/internal/http/services/owncloud/ocs/cache.go b/internal/http/services/owncloud/ocs/cache.go new file mode 100644 index 0000000000..8381f6150e --- /dev/null +++ b/internal/http/services/owncloud/ocs/cache.go @@ -0,0 +1,69 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package ocs + +import ( + "context" + "net/http" + "net/http/httptest" + + "github.com/cs3org/reva/pkg/appctx" + ctxpkg "github.com/cs3org/reva/pkg/ctx" + "google.golang.org/grpc/metadata" +) + +func (s *svc) cacheWarmup(w http.ResponseWriter, r *http.Request) { + if s.warmupCacheTracker != nil { + u := ctxpkg.ContextMustGetUser(r.Context()) + tkn := ctxpkg.ContextMustGetToken(r.Context()) + log := appctx.GetLogger(r.Context()) + + // We make a copy of the context because the original one comes with its cancel channel, + // so once the initial request is finished, this ctx gets cancelled as well. + // And in most of the cases, the warmup takes a longer amount of time to complete than the original request. + // TODO: Check if we can come up with a better solution, eg, https://stackoverflow.com/a/54132324 + ctx := context.Background() + ctx = appctx.WithLogger(ctx, log) + ctx = ctxpkg.ContextSetUser(ctx, u) + ctx = ctxpkg.ContextSetToken(ctx, tkn) + ctx = metadata.AppendToOutgoingContext(ctx, ctxpkg.TokenHeader, tkn) + + req, _ := http.NewRequest("GET", "", nil) + req = req.WithContext(ctx) + req.URL = r.URL + + id := u.Id.OpaqueId + if _, err := s.warmupCacheTracker.Get(id); err != nil { + p := httptest.NewRecorder() + _ = s.warmupCacheTracker.Set(id, true) + + log.Info().Msgf("cache warmup getting created shares for user %s", id) + req.URL.Path = "/v1.php/apps/files_sharing/api/v1/shares" + s.router.ServeHTTP(p, req) + + log.Info().Msgf("cache warmup getting received shares for user %s", id) + req.URL.Path = "/v1.php/apps/files_sharing/api/v1/shares" + q := req.URL.Query() + q.Set("shared_with_me", "true") + q.Set("state", "all") + req.URL.RawQuery = q.Encode() + s.router.ServeHTTP(p, req) + } + } +} diff --git a/internal/http/services/owncloud/ocs/config/config.go b/internal/http/services/owncloud/ocs/config/config.go index 20d785c011..266c5308f6 100644 --- a/internal/http/services/owncloud/ocs/config/config.go +++ b/internal/http/services/owncloud/ocs/config/config.go @@ -38,6 +38,7 @@ type Config struct { CacheWarmupDrivers map[string]map[string]interface{} `mapstructure:"cache_warmup_drivers"` ResourceInfoCacheSize int `mapstructure:"resource_info_cache_size"` ResourceInfoCacheTTL int `mapstructure:"resource_info_cache_ttl"` + UserIdentifierCacheTTL int `mapstructure:"user_identifier_cache_ttl"` } // Init sets sane defaults @@ -66,5 +67,9 @@ func (c *Config) Init() { c.ResourceInfoCacheSize = 1000000 } + if c.UserIdentifierCacheTTL == 0 { + c.UserIdentifierCacheTTL = 60 + } + c.GatewaySvc = sharedconf.GetGatewaySVC(c.GatewaySvc) } diff --git a/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go b/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go index afc3252811..475842c242 100644 --- a/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go +++ b/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go @@ -100,7 +100,7 @@ func (h *Handler) Init(c *config.Config) { h.additionalInfoTemplate, _ = template.New("additionalInfo").Parse(c.AdditionalInfoAttribute) h.userIdentifierCache = ttlcache.NewCache() - _ = h.userIdentifierCache.SetTTL(time.Second * 60) + _ = h.userIdentifierCache.SetTTL(time.Second * time.Duration(c.UserIdentifierCacheTTL)) if h.resourceInfoCacheTTL > 0 { cwm, err := getCacheWarmupManager(c) @@ -111,13 +111,14 @@ func (h *Handler) Init(c *config.Config) { } func (h *Handler) startCacheWarmup(c cache.Warmup) { + time.Sleep(2 * time.Second) infos, err := c.GetResourceInfos() if err != nil { return } for _, r := range infos { key := wrapResourceID(r.Id) - _ = h.resourceInfoCache.SetWithExpire(key, r, time.Second*h.resourceInfoCacheTTL) + _ = h.resourceInfoCache.SetWithExpire(key, r, h.resourceInfoCacheTTL) } } @@ -511,6 +512,7 @@ func (h *Handler) listSharesWithMe(w http.ResponseWriter, r *http.Request) { // which pending state to list stateFilter := getStateFilter(r.FormValue("state")) + log := appctx.GetLogger(r.Context()) client, err := pool.GetGatewayServiceClient(h.gatewayAddr) if err != nil { response.WriteOCSError(w, r, response.MetaServerError.StatusCode, "error getting grpc gateway client", err) @@ -1021,6 +1023,7 @@ func (h *Handler) getResourceInfo(ctx context.Context, client gateway.GatewayAPI pinfo = infoIf.(*provider.ResourceInfo) status = &rpc.Status{Code: rpc.Code_CODE_OK} } else { + logger.Debug().Msgf("cache miss for resource %+v, statting", key) statReq := &provider.StatRequest{ Ref: ref, } diff --git a/internal/http/services/owncloud/ocs/ocs.go b/internal/http/services/owncloud/ocs/ocs.go index cb2b993135..d411f4dace 100644 --- a/internal/http/services/owncloud/ocs/ocs.go +++ b/internal/http/services/owncloud/ocs/ocs.go @@ -20,7 +20,9 @@ package ocs import ( "net/http" + "time" + "github.com/ReneKroon/ttlcache/v2" "github.com/cs3org/reva/internal/http/services/owncloud/ocs/config" "github.com/cs3org/reva/internal/http/services/owncloud/ocs/handlers/apps/sharing/sharees" "github.com/cs3org/reva/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares" @@ -41,8 +43,9 @@ func init() { } type svc struct { - c *config.Config - router *chi.Mux + c *config.Config + router *chi.Mux + warmupCacheTracker *ttlcache.Cache } func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error) { @@ -63,6 +66,11 @@ func New(m map[string]interface{}, log *zerolog.Logger) (global.Service, error) return nil, err } + if conf.CacheWarmupDriver == "first-request" && conf.ResourceInfoCacheTTL > 0 { + s.warmupCacheTracker = ttlcache.NewCache() + _ = s.warmupCacheTracker.SetTTL(time.Second * time.Duration(conf.ResourceInfoCacheTTL)) + } + return s, nil } @@ -138,6 +146,10 @@ func (s *svc) Handler() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { log := appctx.GetLogger(r.Context()) log.Debug().Str("path", r.URL.Path).Msg("ocs routing") + + // Warmup the share cache for the user + go s.cacheWarmup(w, r) + // unset raw path, otherwise chi uses it to route and then fails to match percent encoded path segments r.URL.RawPath = "" s.router.ServeHTTP(w, r) diff --git a/pkg/share/cache/cbox/cbox.go b/pkg/share/cache/cbox/cbox.go index 5a629b7d48..6f3d42295f 100644 --- a/pkg/share/cache/cbox/cbox.go +++ b/pkg/share/cache/cbox/cbox.go @@ -16,7 +16,7 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -package eos +package cbox import ( "context" @@ -24,14 +24,17 @@ import ( "fmt" userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" - types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + "github.com/cs3org/reva/pkg/auth/scope" ctxpkg "github.com/cs3org/reva/pkg/ctx" + "github.com/cs3org/reva/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/pkg/share/cache" "github.com/cs3org/reva/pkg/share/cache/registry" - "github.com/cs3org/reva/pkg/storage/fs/eos" + "github.com/cs3org/reva/pkg/token/manager/jwt" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" + "google.golang.org/grpc/metadata" // Provides mysql drivers _ "github.com/go-sql-driver/mysql" @@ -49,6 +52,7 @@ type config struct { DbName string `mapstructure:"db_name"` EOSNamespace string `mapstructure:"namespace"` GatewaySvc string `mapstructure:"gatewaysvc"` + JWTSecret string `mapstructure:"jwt_secret"` } type manager struct { @@ -90,6 +94,36 @@ func (m *manager) GetResourceInfos() ([]*provider.ResourceInfo, error) { } defer rows.Close() + tokenManager, err := jwt.New(map[string]interface{}{ + "secret": m.conf.JWTSecret, + }) + if err != nil { + return nil, err + } + + u := &userpb.User{ + Id: &userpb.UserId{ + OpaqueId: "root", + }, + UidNumber: 0, + GidNumber: 0, + } + scope, err := scope.AddOwnerScope(nil) + if err != nil { + return nil, err + } + + tkn, err := tokenManager.MintToken(context.Background(), u, scope) + if err != nil { + return nil, err + } + ctx := metadata.AppendToOutgoingContext(context.Background(), ctxpkg.TokenHeader, tkn) + + client, err := pool.GetGatewayServiceClient(m.conf.GatewaySvc) + if err != nil { + return nil, err + } + infos := []*provider.ResourceInfo{} for rows.Next() { var storageID, nodeID string @@ -97,45 +131,19 @@ func (m *manager) GetResourceInfos() ([]*provider.ResourceInfo, error) { continue } - eosOpts := map[string]interface{}{ - "namespace": m.conf.EOSNamespace, - "master_url": fmt.Sprintf("root://%s.cern.ch", storageID), - "version_invariant": true, - "gatewaysvc": m.conf.GatewaySvc, - } - eos, err := eos.New(eosOpts) - if err != nil { - return nil, err - } - - ctx := ctxpkg.ContextSetUser(context.Background(), &userpb.User{ - Id: &userpb.UserId{ - OpaqueId: "root", - }, - Opaque: &types.Opaque{ - Map: map[string]*types.OpaqueEntry{ - "uid": { - Decoder: "plain", - Value: []byte("0"), - }, - "gid": { - Decoder: "plain", - Value: []byte("0"), - }, - }, - }, - }) - - inf, err := eos.GetMD(ctx, &provider.Reference{ + statReq := provider.StatRequest{Ref: &provider.Reference{ ResourceId: &provider.ResourceId{ StorageId: storageID, OpaqueId: nodeID, }, - }, []string{}) - if err != nil { - return nil, err + }} + + statRes, err := client.Stat(ctx, &statReq) + if err != nil || statRes.Status.Code != rpc.Code_CODE_OK { + continue } - infos = append(infos, inf) + + infos = append(infos, statRes.Info) } if err = rows.Err(); err != nil { diff --git a/pkg/share/cache/loader/loader.go b/pkg/share/cache/loader/loader.go new file mode 100644 index 0000000000..a3d8ec7cbb --- /dev/null +++ b/pkg/share/cache/loader/loader.go @@ -0,0 +1,25 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package loader + +import ( + // Load share cache drivers. + _ "github.com/cs3org/reva/pkg/share/cache/cbox" + // Add your own here +)