Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add user ID cache warmup to EOS storage driver #1774

Merged
merged 4 commits into from
Jun 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/unreleased/eos-cache-warmup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Enhancement: Add user ID cache warmup to EOS storage driver

https://github.com/cs3org/reva/pull/1774
10 changes: 10 additions & 0 deletions pkg/storage/utils/eosfs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,14 @@ type Config struct {
// URI of the EOS MGM grpc server
// Default is empty
GrpcURI string `mapstructure:"master_grpc_uri"`

// Size of the cache used to store user ID and UID resolution.
// Default value is 1000000.
UserIDCacheSize int `mapstructure:"user_id_cache_size"`

// The depth, starting from root, that we'll parse directories to lookup the
// owner and warm up the cache. For example, for a layout of {{substr 0 1 .Username}}/{{.Username}}
// and a depth of 2, we'll lookup each user's home directory.
// Default value is 2.
UserIDCacheWarmupDepth int `mapstructure:"user_id_cache_warmup_depth"`
}
84 changes: 58 additions & 26 deletions pkg/storage/utils/eosfs/eosfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ import (
"net/url"
"os"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"

"github.com/bluele/gcache"
grouppb "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1"
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
Expand Down Expand Up @@ -119,6 +118,14 @@ func (c *Config) init() {
c.UserLayout = "{{.Username}}" // TODO set better layout
}

if c.UserIDCacheSize == 0 {
c.UserIDCacheSize = 1000000
}

if c.UserIDCacheWarmupDepth == 0 {
c.UserIDCacheWarmupDepth = 2
ishank011 marked this conversation as resolved.
Show resolved Hide resolved
}

c.GatewaySvc = sharedconf.GetGatewaySVC(c.GatewaySvc)
}

Expand All @@ -128,7 +135,7 @@ type eosfs struct {
chunkHandler *chunking.ChunkHandler
singleUserUID string
singleUserGID string
userIDCache sync.Map
userIDCache gcache.Cache
}

// NewEOSFS returns a storage.FS interface implementation that connects to an EOS instance
Expand Down Expand Up @@ -179,12 +186,35 @@ func NewEOSFS(c *Config) (storage.FS, error) {
c: eosClient,
conf: c,
chunkHandler: chunking.NewChunkHandler(c.CacheDirectory),
userIDCache: sync.Map{},
userIDCache: gcache.New(c.UserIDCacheSize).LFU().Build(),
}

go eosfs.userIDcacheWarmup()

return eosfs, nil
}

func (fs *eosfs) userIDcacheWarmup() {
if !fs.conf.EnableHome {
ctx := context.Background()
paths := []string{fs.wrap(ctx, "/")}
uid, gid, _ := fs.getRootUIDAndGID(ctx)

for i := 0; i < fs.conf.UserIDCacheWarmupDepth; i++ {
var newPaths []string
for _, fn := range paths {
if eosFileInfos, err := fs.c.List(ctx, uid, gid, fn); err == nil {
for _, f := range eosFileInfos {
_, _ = fs.getUserIDGateway(ctx, strconv.FormatUint(f.UID, 10))
newPaths = append(newPaths, f.File)
}
}
}
paths = newPaths
}
}
}

func (fs *eosfs) Shutdown(ctx context.Context) error {
// TODO(labkode): in a grpc implementation we can close connections.
return nil
Expand Down Expand Up @@ -579,7 +609,7 @@ func (fs *eosfs) GetMD(ctx context.Context, ref *provider.Reference, mdKeys []st
return nil, err
}

return fs.convertToResourceInfo(ctx, eosFileInfo, false)
return fs.convertToResourceInfo(ctx, eosFileInfo)
}

func (fs *eosfs) getMDShareFolder(ctx context.Context, p string, mdKeys []string) (*provider.ResourceInfo, error) {
Expand All @@ -602,7 +632,7 @@ func (fs *eosfs) getMDShareFolder(ctx context.Context, p string, mdKeys []string
// TODO(labkode): diff between root (dir) and children (ref)

if fs.isShareFolderRoot(ctx, p) {
return fs.convertToResourceInfo(ctx, eosFileInfo, false)
return fs.convertToResourceInfo(ctx, eosFileInfo)
}
return fs.convertToFileReference(ctx, eosFileInfo)
}
Expand Down Expand Up @@ -644,10 +674,6 @@ func (fs *eosfs) listWithNominalHome(ctx context.Context, p string) (finfos []*p
}

fn := fs.wrap(ctx, p)
virtualView := false
if !fs.conf.EnableHome && filepath.Dir(fn) == filepath.Clean(fs.conf.Namespace) {
virtualView = true
}

eosFileInfos, err := fs.c.List(ctx, uid, gid, fn)
if err != nil {
Expand All @@ -665,7 +691,7 @@ func (fs *eosfs) listWithNominalHome(ctx context.Context, p string) (finfos []*p
}

// Remove the hidden folders in the topmost directory
if finfo, err := fs.convertToResourceInfo(ctx, eosFileInfo, virtualView); err == nil && finfo.Path != "/" && !strings.HasPrefix(finfo.Path, "/.") {
if finfo, err := fs.convertToResourceInfo(ctx, eosFileInfo); err == nil && finfo.Path != "/" && !strings.HasPrefix(finfo.Path, "/.") {
finfos = append(finfos, finfo)
}
}
Expand Down Expand Up @@ -719,7 +745,7 @@ func (fs *eosfs) listHome(ctx context.Context, home string) ([]*provider.Resourc
}
}

if finfo, err := fs.convertToResourceInfo(ctx, eosFileInfo, false); err == nil && finfo.Path != "/" && !strings.HasPrefix(finfo.Path, "/.") {
if finfo, err := fs.convertToResourceInfo(ctx, eosFileInfo); err == nil && finfo.Path != "/" && !strings.HasPrefix(finfo.Path, "/.") {
finfos = append(finfos, finfo)
}
}
Expand Down Expand Up @@ -1324,7 +1350,7 @@ func (fs *eosfs) convertToRecycleItem(ctx context.Context, eosDeletedItem *eoscl
}

func (fs *eosfs) convertToRevision(ctx context.Context, eosFileInfo *eosclient.FileInfo) (*provider.FileVersion, error) {
md, err := fs.convertToResourceInfo(ctx, eosFileInfo, false)
md, err := fs.convertToResourceInfo(ctx, eosFileInfo)
if err != nil {
return nil, err
}
Expand All @@ -1337,12 +1363,12 @@ func (fs *eosfs) convertToRevision(ctx context.Context, eosFileInfo *eosclient.F
return revision, nil
}

func (fs *eosfs) convertToResourceInfo(ctx context.Context, eosFileInfo *eosclient.FileInfo, virtualView bool) (*provider.ResourceInfo, error) {
return fs.convert(ctx, eosFileInfo, virtualView)
func (fs *eosfs) convertToResourceInfo(ctx context.Context, eosFileInfo *eosclient.FileInfo) (*provider.ResourceInfo, error) {
return fs.convert(ctx, eosFileInfo)
}

func (fs *eosfs) convertToFileReference(ctx context.Context, eosFileInfo *eosclient.FileInfo) (*provider.ResourceInfo, error) {
info, err := fs.convert(ctx, eosFileInfo, false)
info, err := fs.convert(ctx, eosFileInfo)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1432,7 +1458,7 @@ func mergePermissions(l *provider.ResourcePermissions, r *provider.ResourcePermi
l.UpdateGrant = l.UpdateGrant || r.UpdateGrant
}

func (fs *eosfs) convert(ctx context.Context, eosFileInfo *eosclient.FileInfo, virtualView bool) (*provider.ResourceInfo, error) {
func (fs *eosfs) convert(ctx context.Context, eosFileInfo *eosclient.FileInfo) (*provider.ResourceInfo, error) {
path, err := fs.unwrap(ctx, eosFileInfo.File)
if err != nil {
return nil, err
Expand All @@ -1443,13 +1469,10 @@ func (fs *eosfs) convert(ctx context.Context, eosFileInfo *eosclient.FileInfo, v
size = eosFileInfo.TreeSize
}

owner := &userpb.UserId{}
if !virtualView {
owner, err = fs.getUserIDGateway(ctx, strconv.FormatUint(eosFileInfo.UID, 10))
if err != nil {
sublog := appctx.GetLogger(ctx).With().Logger()
sublog.Warn().Uint64("uid", eosFileInfo.UID).Msg("could not lookup userid, leaving empty")
}
owner, err := fs.getUserIDGateway(ctx, strconv.FormatUint(eosFileInfo.UID, 10))
if err != nil {
sublog := appctx.GetLogger(ctx).With().Logger()
sublog.Warn().Uint64("uid", eosFileInfo.UID).Msg("could not lookup userid, leaving empty")
}

var xs provider.ResourceChecksum
Expand Down Expand Up @@ -1542,9 +1565,18 @@ func (fs *eosfs) getUIDGateway(ctx context.Context, u *userpb.UserId) (string, s
}

func (fs *eosfs) getUserIDGateway(ctx context.Context, uid string) (*userpb.UserId, error) {
if userIDInterface, ok := fs.userIDCache.Load(uid); ok {
log := appctx.GetLogger(ctx)
// Handle the case of root
if uid == "0" {
return nil, errtypes.BadRequest("eosfs: cannot return root user")
}

if userIDInterface, err := fs.userIDCache.Get(uid); err == nil {
log.Debug().Msg("eosfs: found cached uid " + uid)
return userIDInterface.(*userpb.UserId), nil
}

log.Debug().Msg("eosfs: retrieving user from gateway for uid " + uid)
client, err := pool.GetGatewayServiceClient(fs.conf.GatewaySvc)
if err != nil {
return nil, errors.Wrap(err, "eos: error getting gateway grpc client")
Expand All @@ -1560,7 +1592,7 @@ func (fs *eosfs) getUserIDGateway(ctx context.Context, uid string) (*userpb.User
return nil, errors.Wrap(err, "eos: grpc get user failed")
}

fs.userIDCache.Store(uid, getUserResp.User.Id)
_ = fs.userIDCache.Set(uid, getUserResp.User.Id)
return getUserResp.User.Id, nil
}

Expand Down