From 539c44e78c0b3433a9d4b10d9d70832a593e8745 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Fri, 16 Jun 2023 15:00:17 +0200 Subject: [PATCH 1/6] Add mtimesyncedcache package --- .../decomposedfs/mtimesyncedcache/map.go | 36 ++++++ .../mtimesyncedcache/mtimesyncedcache.go | 55 ++++++++ .../mtimesyncedcache_suite_test.go | 13 ++ .../mtimesyncedcache/mtimesyncedcache_test.go | 120 ++++++++++++++++++ 4 files changed, 224 insertions(+) create mode 100644 pkg/storage/utils/decomposedfs/mtimesyncedcache/map.go create mode 100644 pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache.go create mode 100644 pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache_suite_test.go create mode 100644 pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache_test.go diff --git a/pkg/storage/utils/decomposedfs/mtimesyncedcache/map.go b/pkg/storage/utils/decomposedfs/mtimesyncedcache/map.go new file mode 100644 index 0000000000..830634fc23 --- /dev/null +++ b/pkg/storage/utils/decomposedfs/mtimesyncedcache/map.go @@ -0,0 +1,36 @@ +package mtimesyncedcache + +import "sync" + +type Map[K comparable, V any] struct { + m sync.Map +} + +func (m *Map[K, V]) Delete(key K) { m.m.Delete(key) } + +func (m *Map[K, V]) Load(key K) (value V, ok bool) { + v, ok := m.m.Load(key) + if !ok { + return value, ok + } + return v.(V), ok +} + +func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool) { + v, loaded := m.m.LoadAndDelete(key) + if !loaded { + return value, loaded + } + return v.(V), loaded +} + +func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) { + a, loaded := m.m.LoadOrStore(key, value) + return a.(V), loaded +} + +func (m *Map[K, V]) Range(f func(key K, value V) bool) { + m.m.Range(func(key, value any) bool { return f(key.(K), value.(V)) }) +} + +func (m *Map[K, V]) Store(key K, value V) { m.m.Store(key, value) } diff --git a/pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache.go b/pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache.go new file mode 100644 index 0000000000..54e8d94959 --- /dev/null +++ b/pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache.go @@ -0,0 +1,55 @@ +package mtimesyncedcache + +import ( + "sync" + "time" +) + +type Cache[K comparable, T any] struct { + entries Map[K, *entry[T]] +} + +type entry[T any] struct { + mtime time.Time + value T + + mu sync.Mutex +} + +func New[K comparable, T any]() Cache[K, T] { + return Cache[K, T]{ + entries: Map[K, *entry[T]]{}, + } +} + +func (c *Cache[K, T]) Store(key K, mtime time.Time, value T) error { + c.entries.Store(key, &entry[T]{ + mtime: mtime, + value: value, + }) + return nil +} + +func (c *Cache[K, T]) Load(key K) T { + entry, _ := c.entries.Load(key) + return entry.value +} + +func (c *Cache[K, T]) LoadOrStore(key K, mtime time.Time, f func() (T, error)) (T, error) { + e, _ := c.entries.LoadOrStore(key, &entry[T]{}) + + e.mu.Lock() + defer e.mu.Unlock() + if mtime.After(e.mtime) { + e.mtime = mtime + v, err := f() + if err != nil { + var t T + return t, err + } + e.value = v + c.entries.Store(key, e) + } + + return e.value, nil +} diff --git a/pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache_suite_test.go b/pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache_suite_test.go new file mode 100644 index 0000000000..a4092721ed --- /dev/null +++ b/pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache_suite_test.go @@ -0,0 +1,13 @@ +package mtimesyncedcache_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestMtimesyncedcache(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Mtimesyncedcache Suite") +} diff --git a/pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache_test.go b/pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache_test.go new file mode 100644 index 0000000000..b6c8adfa46 --- /dev/null +++ b/pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache_test.go @@ -0,0 +1,120 @@ +package mtimesyncedcache_test + +import ( + "errors" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/mtimesyncedcache" +) + +var _ = Describe("Mtimesyncedcache", func() { + var ( + cache mtimesyncedcache.Cache[string, string] + + key = "key" + value = "value" + ) + + BeforeEach(func() { + cache = mtimesyncedcache.New[string, string]() + }) + + Describe("Store", func() { + It("stores a value", func() { + time := time.Now() + + err := cache.Store(key, time, value) + Expect(err).ToNot(HaveOccurred()) + }) + + PIt("returns an error when the mtime is older") + }) + + Describe("Load", func() { + It("loads the stored value", func() { + err := cache.Store(key, time.Now(), value) + Expect(err).ToNot(HaveOccurred()) + + v := cache.Load(key) + Expect(v).To(Equal(value)) + }) + + PIt("fails the value doesn't exist", func() { + v := cache.Load(key) + Expect(v).To(Equal(value)) + }) + }) + + Describe("LoadOrStore", func() { + It("does not update the cache if the cache is up to date", func() { + cachedTime := time.Now().Add(-1 * time.Hour) + err := cache.Store(key, cachedTime, value) + Expect(err).ToNot(HaveOccurred()) + + newvalue := "yaaay" + v, err := cache.LoadOrStore(key, cachedTime, func() (string, error) { + return newvalue, nil + }) + Expect(err).ToNot(HaveOccurred()) + Expect(v).To(Equal(value)) + + v, err = cache.LoadOrStore(key, time.Now().Add(-2*time.Hour), func() (string, error) { + return newvalue, nil + }) + Expect(err).ToNot(HaveOccurred()) + Expect(v).To(Equal(value)) + }) + + It("updates the cache if the cache is outdated", func() { + outdatedTime := time.Now().Add(-1 * time.Hour) + err := cache.Store(key, outdatedTime, value) + Expect(err).ToNot(HaveOccurred()) + + newvalue := "yaaay" + v, err := cache.LoadOrStore(key, time.Now(), func() (string, error) { + return newvalue, nil + }) + Expect(err).ToNot(HaveOccurred()) + Expect(v).To(Equal(newvalue)) + }) + + It("stores the value if the key doesn't exist yet", func() { + newvalue := "yaaay" + v, err := cache.LoadOrStore(key, time.Now(), func() (string, error) { + return newvalue, nil + }) + Expect(err).ToNot(HaveOccurred()) + Expect(v).To(Equal(newvalue)) + }) + + It("sets the mtime when storing the value", func() { + newTime := time.Now() + + newvalue := "yaaay" + v, err := cache.LoadOrStore(key, newTime, func() (string, error) { + return newvalue, nil + }) + Expect(err).ToNot(HaveOccurred()) + Expect(v).To(Equal(newvalue)) + + newvalue2 := "asdfasdf" + v, err = cache.LoadOrStore(key, newTime, func() (string, error) { + return newvalue2, nil + }) + Expect(err).ToNot(HaveOccurred()) + Expect(v).To(Equal(newvalue)) + }) + + It("passes on error from the store func", func() { + v, err := cache.LoadOrStore(key, time.Now(), func() (string, error) { + return "", errors.New("baa") + }) + Expect(v).To(Equal("")) + Expect(err.Error()).To(Equal("baa")) + + }) + }) +}) From 4e1cf9b846d63ca40d01cc309418faef85710b92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Fri, 16 Jun 2023 15:01:08 +0200 Subject: [PATCH 2/6] Cache space lookup indexes in memory --- .../utils/decomposedfs/decomposedfs.go | 4 +- pkg/storage/utils/decomposedfs/spaces.go | 98 +++++++++++++++---- 2 files changed, 82 insertions(+), 20 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index ebcafa4f3b..d105bea917 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -49,6 +49,7 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/migrator" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/mtimesyncedcache" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree" @@ -99,7 +100,8 @@ type Decomposedfs struct { stream events.Stream cache cache.StatCache - UserCache *ttlcache.Cache + UserCache *ttlcache.Cache + spaceIDCache mtimesyncedcache.Cache[string, map[string]string] } // NewDefault returns an instance with default components diff --git a/pkg/storage/utils/decomposedfs/spaces.go b/pkg/storage/utils/decomposedfs/spaces.go index b78ec624e6..c7b96bbaac 100644 --- a/pkg/storage/utils/decomposedfs/spaces.go +++ b/pkg/storage/utils/decomposedfs/spaces.go @@ -299,17 +299,37 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide matches := map[string]struct{}{} if requestedUserID != nil { - path := filepath.Join(fs.o.Root, "indexes", "by-user-id", requestedUserID.GetOpaqueId(), nodeID) - m, err := filepath.Glob(path) + indexPath := filepath.Join(fs.o.Root, "indexes", "by-user-id", requestedUserID.GetOpaqueId()) + fi, err := os.Stat(indexPath) if err != nil { return nil, err } - for _, match := range m { - link, err := os.Readlink(match) + allMatches, err := fs.spaceIDCache.LoadOrStore("by-user-id:"+requestedUserID.GetOpaqueId(), fi.ModTime(), func() (map[string]string, error) { + path := filepath.Join(fs.o.Root, "indexes", "by-user-id", requestedUserID.GetOpaqueId(), "*") + m, err := filepath.Glob(path) if err != nil { - continue + return nil, err + } + matches := map[string]string{} + for _, match := range m { + link, err := os.Readlink(match) + if err != nil { + continue + } + matches[match] = link } - matches[link] = struct{}{} + return matches, nil + }) + if err != nil { + return nil, err + } + + if nodeID == spaceIDAny { + for _, match := range allMatches { + matches[match] = struct{}{} + } + } else { + matches[allMatches[nodeID]] = struct{}{} } // get Groups for userid @@ -323,17 +343,37 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide } for _, group := range user.Groups { - path := filepath.Join(fs.o.Root, "indexes", "by-group-id", group, nodeID) - m, err := filepath.Glob(path) + indexPath := filepath.Join(fs.o.Root, "indexes", "by-group-id", group) + fi, err := os.Stat(indexPath) if err != nil { - return nil, err + continue } - for _, match := range m { - link, err := os.Readlink(match) + allMatches, err := fs.spaceIDCache.LoadOrStore("by-group-id:"+group, fi.ModTime(), func() (map[string]string, error) { + path := filepath.Join(fs.o.Root, "indexes", "by-group-id", group, "*") + m, err := filepath.Glob(path) if err != nil { - continue + return nil, err + } + matches := map[string]string{} + for _, match := range m { + link, err := os.Readlink(match) + if err != nil { + continue + } + matches[match] = link } - matches[link] = struct{}{} + return matches, nil + }) + if err != nil { + return nil, err + } + + if nodeID == spaceIDAny { + for _, match := range allMatches { + matches[match] = struct{}{} + } + } else { + matches[allMatches[nodeID]] = struct{}{} } } @@ -341,17 +381,37 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide if requestedUserID == nil { for spaceType := range spaceTypes { - path := filepath.Join(fs.o.Root, "indexes", "by-type", spaceType, nodeID) - m, err := filepath.Glob(path) + indexPath := filepath.Join(fs.o.Root, "indexes", "by-type", spaceType) + fi, err := os.Stat(indexPath) if err != nil { return nil, err } - for _, match := range m { - link, err := os.Readlink(match) + allMatches, err := fs.spaceIDCache.LoadOrStore("by-type:"+spaceType, fi.ModTime(), func() (map[string]string, error) { + path := filepath.Join(fs.o.Root, "indexes", "by-type", spaceType, "*") + m, err := filepath.Glob(path) if err != nil { - continue + return nil, err + } + matches := map[string]string{} + for _, match := range m { + link, err := os.Readlink(match) + if err != nil { + continue + } + matches[match] = link + } + return matches, nil + }) + if err != nil { + return nil, err + } + + if nodeID == spaceIDAny { + for _, match := range allMatches { + matches[match] = struct{}{} } - matches[link] = struct{}{} + } else { + matches[allMatches[nodeID]] = struct{}{} } } } From f3b4dc18e9a9c75ba423c639280baf3a6f2d95fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Mon, 19 Jun 2023 08:50:32 +0200 Subject: [PATCH 3/6] Handle missing corner case --- .../decomposedfs/mtimesyncedcache/mtimesyncedcache.go | 10 +++++++--- .../mtimesyncedcache/mtimesyncedcache_test.go | 11 +++++------ 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache.go b/pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache.go index 54e8d94959..65f7bd3f8d 100644 --- a/pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache.go +++ b/pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache.go @@ -30,9 +30,13 @@ func (c *Cache[K, T]) Store(key K, mtime time.Time, value T) error { return nil } -func (c *Cache[K, T]) Load(key K) T { - entry, _ := c.entries.Load(key) - return entry.value +func (c *Cache[K, T]) Load(key K) (T, bool) { + entry, ok := c.entries.Load(key) + if !ok { + var t T + return t, false + } + return entry.value, true } func (c *Cache[K, T]) LoadOrStore(key K, mtime time.Time, f func() (T, error)) (T, error) { diff --git a/pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache_test.go b/pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache_test.go index b6c8adfa46..1ad64cade1 100644 --- a/pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache_test.go +++ b/pkg/storage/utils/decomposedfs/mtimesyncedcache/mtimesyncedcache_test.go @@ -29,8 +29,6 @@ var _ = Describe("Mtimesyncedcache", func() { err := cache.Store(key, time, value) Expect(err).ToNot(HaveOccurred()) }) - - PIt("returns an error when the mtime is older") }) Describe("Load", func() { @@ -38,13 +36,14 @@ var _ = Describe("Mtimesyncedcache", func() { err := cache.Store(key, time.Now(), value) Expect(err).ToNot(HaveOccurred()) - v := cache.Load(key) + v, ok := cache.Load(key) + Expect(ok).To(BeTrue()) Expect(v).To(Equal(value)) }) - PIt("fails the value doesn't exist", func() { - v := cache.Load(key) - Expect(v).To(Equal(value)) + It("reports when the key doesn't exist", func() { + _, ok := cache.Load("doesnotexist") + Expect(ok).To(BeFalse()) }) }) From 51e1763b6f49ef996e18a0062bea1ad0cc4c1952 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Mon, 19 Jun 2023 09:23:25 +0200 Subject: [PATCH 4/6] Fix listing all spaces of all types. Do not choke on errors. --- pkg/storage/utils/decomposedfs/spaces.go | 41 +++++++++++++----------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/spaces.go b/pkg/storage/utils/decomposedfs/spaces.go index c7b96bbaac..dec5499129 100644 --- a/pkg/storage/utils/decomposedfs/spaces.go +++ b/pkg/storage/utils/decomposedfs/spaces.go @@ -299,27 +299,27 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide matches := map[string]struct{}{} if requestedUserID != nil { + allMatches := map[string]string{} indexPath := filepath.Join(fs.o.Root, "indexes", "by-user-id", requestedUserID.GetOpaqueId()) fi, err := os.Stat(indexPath) - if err != nil { - return nil, err - } - allMatches, err := fs.spaceIDCache.LoadOrStore("by-user-id:"+requestedUserID.GetOpaqueId(), fi.ModTime(), func() (map[string]string, error) { - path := filepath.Join(fs.o.Root, "indexes", "by-user-id", requestedUserID.GetOpaqueId(), "*") - m, err := filepath.Glob(path) - if err != nil { - return nil, err - } - matches := map[string]string{} - for _, match := range m { - link, err := os.Readlink(match) + if err == nil { + allMatches, err = fs.spaceIDCache.LoadOrStore("by-user-id:"+requestedUserID.GetOpaqueId(), fi.ModTime(), func() (map[string]string, error) { + path := filepath.Join(fs.o.Root, "indexes", "by-user-id", requestedUserID.GetOpaqueId(), "*") + m, err := filepath.Glob(path) if err != nil { - continue + return nil, err } - matches[match] = link - } - return matches, nil - }) + matches := map[string]string{} + for _, match := range m { + link, err := os.Readlink(match) + if err != nil { + continue + } + matches[match] = link + } + return matches, nil + }) + } if err != nil { return nil, err } @@ -381,10 +381,13 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide if requestedUserID == nil { for spaceType := range spaceTypes { - indexPath := filepath.Join(fs.o.Root, "indexes", "by-type", spaceType) + indexPath := filepath.Join(fs.o.Root, "indexes", "by-type") + if spaceType != spaceTypeAny { + indexPath = filepath.Join(indexPath, spaceType) + } fi, err := os.Stat(indexPath) if err != nil { - return nil, err + continue } allMatches, err := fs.spaceIDCache.LoadOrStore("by-type:"+spaceType, fi.ModTime(), func() (map[string]string, error) { path := filepath.Join(fs.o.Root, "indexes", "by-type", spaceType, "*") From c9bc7aa5894f0c9abb0bfa357149f36d928882c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Mon, 19 Jun 2023 09:28:30 +0200 Subject: [PATCH 5/6] Add changelog --- changelog/unreleased/cache-space-indexes.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog/unreleased/cache-space-indexes.md diff --git a/changelog/unreleased/cache-space-indexes.md b/changelog/unreleased/cache-space-indexes.md new file mode 100644 index 0000000000..ad85ca4aff --- /dev/null +++ b/changelog/unreleased/cache-space-indexes.md @@ -0,0 +1,5 @@ +Enhancement: cache space indexes + +decomposedfs now caches the different space indexes in memory which greatly improves the performance of ListStorageSpaces on slow storages. + +https://github.com/cs3org/reva/pull/3987 From 1991fec74d79be58965b75ea2ca91de5e106e316 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Mon, 19 Jun 2023 10:43:44 +0200 Subject: [PATCH 6/6] Touch the by-type index root when linking spaces to invalidate cache --- pkg/storage/utils/decomposedfs/spaces.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/spaces.go b/pkg/storage/utils/decomposedfs/spaces.go index dec5499129..0ddd32f335 100644 --- a/pkg/storage/utils/decomposedfs/spaces.go +++ b/pkg/storage/utils/decomposedfs/spaces.go @@ -840,14 +840,16 @@ func (fs *Decomposedfs) linkStorageSpaceType(ctx context.Context, spaceType stri if isAlreadyExists(err) { appctx.GetLogger(ctx).Debug().Err(err).Str("space", spaceID).Str("spacetype", spaceType).Msg("symlink already exists") // FIXME: is it ok to wipe this err if the symlink already exists? - err = nil } else { // TODO how should we handle error cases here? appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("spacetype", spaceType).Msg("could not create symlink") + return err } } - return err + // touch index root to invalidate caches + now := time.Now() + return os.Chtimes(filepath.Join(fs.o.Root, "indexes", "by-type"), now, now) } func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node, checkPermissions bool) (*provider.StorageSpace, error) {