diff --git a/changelog/unreleased/improve-posixfs.md b/changelog/unreleased/improve-posixfs.md new file mode 100644 index 0000000000..94468e55ed --- /dev/null +++ b/changelog/unreleased/improve-posixfs.md @@ -0,0 +1,5 @@ +Enhancement: Improve posixfs storage driver + +Improve the posixfs storage driver by fixing several issues and corner cases. + +https://github.com/cs3org/reva/pull/4763 diff --git a/go.mod b/go.mod index 16361ad003..3a0850b97d 100644 --- a/go.mod +++ b/go.mod @@ -62,7 +62,7 @@ require ( github.com/onsi/ginkgo/v2 v2.15.0 github.com/onsi/gomega v1.31.1 github.com/owncloud/ocis/v2 v2.0.0 - github.com/pablodz/inotifywaitgo v0.0.6 + github.com/pablodz/inotifywaitgo v0.0.7 github.com/pkg/errors v0.9.1 github.com/pkg/xattr v0.4.9 github.com/prometheus/alertmanager v0.26.0 diff --git a/go.sum b/go.sum index 28f7afbdce..3067622e9d 100644 --- a/go.sum +++ b/go.sum @@ -1467,8 +1467,8 @@ github.com/owncloud/ocis/v2 v2.0.0 h1:eHmUpW73dAT0X+JXRStYRzHt9gBUGlysnLg3vjJzac github.com/owncloud/ocis/v2 v2.0.0/go.mod h1:qH016gkfh/PNOv+xfiwD2weWY99nZTTghKhgajshYYk= github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw= github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0= -github.com/pablodz/inotifywaitgo v0.0.6 h1:BTjQfnixXwG7oYmlIiyhWA6iyO9BtxatB3YgiibOTFc= -github.com/pablodz/inotifywaitgo v0.0.6/go.mod h1:OtzRCsYTJlIr+vAzlOtauTkfQ1c25ebFuXq8tbbf8cw= +github.com/pablodz/inotifywaitgo v0.0.7 h1:1ii49dGBnRn0t1Sz7RGZS6/NberPEDQprwKHN49Bv6U= +github.com/pablodz/inotifywaitgo v0.0.7/go.mod h1:OtzRCsYTJlIr+vAzlOtauTkfQ1c25ebFuXq8tbbf8cw= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= diff --git a/pkg/storage/fs/posix/tree/assimilation.go b/pkg/storage/fs/posix/tree/assimilation.go index 966b2a61cf..a350175807 100644 --- a/pkg/storage/fs/posix/tree/assimilation.go +++ b/pkg/storage/fs/posix/tree/assimilation.go @@ -46,52 +46,89 @@ import ( type ScanDebouncer struct { after time.Duration f func(item scanItem) - pending map[string]*time.Timer + pending sync.Map inProgress sync.Map mutex sync.Mutex } +type EventAction int + +const ( + ActionCreate EventAction = iota + ActionUpdate + ActionMove + ActionDelete +) + +type queueItem struct { + item scanItem + timer *time.Timer +} + // NewScanDebouncer returns a new SpaceDebouncer instance func NewScanDebouncer(d time.Duration, f func(item scanItem)) *ScanDebouncer { return &ScanDebouncer{ after: d, f: f, - pending: map[string]*time.Timer{}, + pending: sync.Map{}, inProgress: sync.Map{}, } } -// Debounce restars the debounce timer for the given space +// Debounce restarts the debounce timer for the given space func (d *ScanDebouncer) Debounce(item scanItem) { d.mutex.Lock() defer d.mutex.Unlock() path := item.Path force := item.ForceRescan - if t := d.pending[item.Path]; t != nil { - force = force || item.ForceRescan - t.Stop() + recurse := item.Recurse + if i, ok := d.pending.Load(item.Path); ok { + queueItem := i.(*queueItem) + force = force || queueItem.item.ForceRescan + recurse = recurse || queueItem.item.Recurse + queueItem.timer.Stop() } - d.pending[item.Path] = time.AfterFunc(d.after, func() { - if _, ok := d.inProgress.Load(path); ok { - // Reschedule this run for when the previous run has finished - d.mutex.Lock() - d.pending[path].Reset(d.after) - d.mutex.Unlock() - return - } + d.pending.Store(item.Path, &queueItem{ + item: item, + timer: time.AfterFunc(d.after, func() { + if _, ok := d.inProgress.Load(path); ok { + // Reschedule this run for when the previous run has finished + d.mutex.Lock() + if i, ok := d.pending.Load(path); ok { + i.(*queueItem).timer.Reset(d.after) + } - d.inProgress.Store(path, true) - defer d.inProgress.Delete(path) - d.f(scanItem{ - Path: path, - ForceRescan: force, - }) + d.mutex.Unlock() + return + } + + d.pending.Delete(path) + d.inProgress.Store(path, true) + defer d.inProgress.Delete(path) + d.f(scanItem{ + Path: path, + ForceRescan: force, + Recurse: recurse, + }) + }), }) } +// InProgress returns true if the given path is currently being processed +func (d *ScanDebouncer) InProgress(path string) bool { + d.mutex.Lock() + defer d.mutex.Unlock() + if _, ok := d.pending.Load(path); ok { + return true + } + + _, ok := d.inProgress.Load(path) + return ok +} + func (t *Tree) workScanQueue() { for i := 0; i < t.options.MaxConcurrency; i++ { go func() { @@ -103,17 +140,73 @@ func (t *Tree) workScanQueue() { log.Error().Err(err).Str("path", item.Path).Msg("failed to assimilate item") continue } + + if item.Recurse { + err = t.WarmupIDCache(item.Path, true) + if err != nil { + log.Error().Err(err).Str("path", item.Path).Msg("failed to warmup id cache") + } + } } }() } } // Scan scans the given path and updates the id chache -func (t *Tree) Scan(path string, forceRescan bool) error { - t.scanDebouncer.Debounce(scanItem{ - Path: path, - ForceRescan: forceRescan, - }) +func (t *Tree) Scan(path string, action EventAction, isDir bool, recurse bool) error { + // cases: + switch action { + case ActionCreate: + if !isDir { + // 1. New file (could be emitted as part of a new directory) + // -> assimilate file + // -> scan parent directory recursively + if !t.scanDebouncer.InProgress(filepath.Dir(path)) { + t.scanDebouncer.Debounce(scanItem{ + Path: path, + ForceRescan: false, + }) + } + t.scanDebouncer.Debounce(scanItem{ + Path: filepath.Dir(path), + ForceRescan: true, + Recurse: true, + }) + } else { + // 2. New directory + // -> scan directory + t.scanDebouncer.Debounce(scanItem{ + Path: path, + ForceRescan: true, + Recurse: true, + }) + } + + case ActionUpdate: + // 3. Updated file + // -> update file unless parent directory is being rescanned + if !t.scanDebouncer.InProgress(filepath.Dir(path)) { + t.scanDebouncer.Debounce(scanItem{ + Path: path, + ForceRescan: true, + }) + } + + case ActionMove: + // 4. Moved file + // -> update file + // 5. Moved directory + // -> update directory and all children + t.scanDebouncer.Debounce(scanItem{ + Path: path, + ForceRescan: isDir, + Recurse: isDir, + }) + + case ActionDelete: + _ = t.HandleFileDelete(path) + } + return nil } @@ -183,44 +276,43 @@ func (t *Tree) getOwnerAndIDs(path string) (*userv1beta1.UserId, string, string, return owner, nodeID, spaceID, parentID, nil } -func (t *Tree) assimilate(item scanItem) error { - var err error +func (t *Tree) findSpaceId(path string) (string, node.Attributes, error) { // find the space id, scope by the according user - spaceID := []byte("") - spaceCandidate := item.Path + spaceCandidate := path spaceAttrs := node.Attributes{} for strings.HasPrefix(spaceCandidate, t.options.Root) { - spaceAttrs, err = t.lookup.MetadataBackend().All(context.Background(), spaceCandidate) - if err == nil && len(spaceAttrs[prefixes.SpaceIDAttr]) > 0 { - spaceID = spaceAttrs[prefixes.SpaceIDAttr] + spaceAttrs, err := t.lookup.MetadataBackend().All(context.Background(), spaceCandidate) + spaceID := spaceAttrs[prefixes.SpaceIDAttr] + if err == nil && len(spaceID) > 0 { if t.options.UseSpaceGroups { // set the uid and gid for the space fi, err := os.Stat(spaceCandidate) if err != nil { - return err + return "", spaceAttrs, err } sys := fi.Sys().(*syscall.Stat_t) gid := int(sys.Gid) _, err = t.userMapper.ScopeUserByIds(-1, gid) if err != nil { - return err + return "", spaceAttrs, err } } - break + + return string(spaceID), spaceAttrs, nil } spaceCandidate = filepath.Dir(spaceCandidate) } - if len(spaceID) == 0 { - return fmt.Errorf("did not find space id for path") - } + return "", spaceAttrs, fmt.Errorf("could not find space for path %s", path) +} +func (t *Tree) assimilate(item scanItem) error { var id []byte - if !item.ForceRescan { - // already assimilated? - id, err := t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.IDAttr) - if err == nil { - return t.lookup.(*lookup.Lookup).CacheID(context.Background(), string(spaceID), string(id), item.Path) - } + var err error + + // First find the space id + spaceID, spaceAttrs, err := t.findSpaceId(item.Path) + if err != nil { + return err } // lock the file for assimilation @@ -240,64 +332,62 @@ func (t *Tree) assimilate(item scanItem) error { // check for the id attribute again after grabbing the lock, maybe the file was assimilated/created by us in the meantime id, err = t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.IDAttr) if err == nil { - previousPath, ok := t.lookup.(*lookup.Lookup).GetCachedID(context.Background(), string(spaceID), string(id)) + previousPath, ok := t.lookup.(*lookup.Lookup).GetCachedID(context.Background(), spaceID, string(id)) - _ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), string(spaceID), string(id), item.Path) - if item.ForceRescan { - previousParentID, err := t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.ParentidAttr) - if err != nil { - return err - } + // This item had already been assimilated in the past. Update the path + _ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, string(id), item.Path) - fi, err := t.updateFile(item.Path, string(id), string(spaceID)) - if err != nil { - return err - } + previousParentID, _ := t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.ParentidAttr) - // was it moved? - if ok && previousPath != item.Path { - // purge original metadata. Only delete the path entry using DeletePath(reverse lookup), not the whole entry pair. - _ = t.lookup.(*lookup.Lookup).IDCache.DeletePath(context.Background(), previousPath) - _ = t.lookup.MetadataBackend().Purge(previousPath) + fi, err := t.updateFile(item.Path, string(id), spaceID) + if err != nil { + return err + } - if fi.IsDir() { - // if it was moved and it is a directory we need to propagate the move - go func() { _ = t.WarmupIDCache(item.Path, false) }() - } + // was it moved? + if ok && len(previousParentID) > 0 && previousPath != item.Path { + // purge original metadata. Only delete the path entry using DeletePath(reverse lookup), not the whole entry pair. + _ = t.lookup.(*lookup.Lookup).IDCache.DeletePath(context.Background(), previousPath) + _ = t.lookup.MetadataBackend().Purge(previousPath) - parentID, err := t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.ParentidAttr) - if err == nil && len(parentID) > 0 { - ref := &provider.Reference{ - ResourceId: &provider.ResourceId{ - StorageId: t.options.MountID, - SpaceId: string(spaceID), - OpaqueId: string(parentID), - }, - Path: filepath.Base(item.Path), - } - oldRef := &provider.Reference{ - ResourceId: &provider.ResourceId{ - StorageId: t.options.MountID, - SpaceId: string(spaceID), - OpaqueId: string(previousParentID), - }, - Path: filepath.Base(previousPath), - } - t.PublishEvent(events.ItemMoved{ - SpaceOwner: user, - Executant: user, - Owner: user, - Ref: ref, - OldReference: oldRef, - Timestamp: utils.TSNow(), - }) + if fi.IsDir() { + // if it was moved and it is a directory we need to propagate the move + go func() { _ = t.WarmupIDCache(item.Path, false) }() + } + + parentID, err := t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.ParentidAttr) + if err == nil && len(parentID) > 0 { + ref := &provider.Reference{ + ResourceId: &provider.ResourceId{ + StorageId: t.options.MountID, + SpaceId: spaceID, + OpaqueId: string(parentID), + }, + Path: filepath.Base(item.Path), } + oldRef := &provider.Reference{ + ResourceId: &provider.ResourceId{ + StorageId: t.options.MountID, + SpaceId: spaceID, + OpaqueId: string(previousParentID), + }, + Path: filepath.Base(previousPath), + } + t.PublishEvent(events.ItemMoved{ + SpaceOwner: user, + Executant: user, + Owner: user, + Ref: ref, + OldReference: oldRef, + Timestamp: utils.TSNow(), + }) } + // } } } else { // assimilate new file newId := uuid.New().String() - fi, err := t.updateFile(item.Path, newId, string(spaceID)) + fi, err := t.updateFile(item.Path, newId, spaceID) if err != nil { return err } @@ -305,7 +395,7 @@ func (t *Tree) assimilate(item scanItem) error { ref := &provider.Reference{ ResourceId: &provider.ResourceId{ StorageId: t.options.MountID, - SpaceId: string(spaceID), + SpaceId: spaceID, OpaqueId: newId, }, } @@ -455,17 +545,25 @@ func (t *Tree) WarmupIDCache(root string, assimilate bool) error { return err } - return filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + sizes := make(map[string]int64) + err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } - if strings.HasSuffix(path, ".flock") || strings.HasSuffix(path, ".mlock") { + // skip lock files + if isLockFile(path) { return nil } + // calculate tree sizes + if !info.IsDir() { + dir := filepath.Dir(path) + sizes[dir] += info.Size() + } + attribs, err := t.lookup.MetadataBackend().All(context.Background(), path) - if err == nil { + if err == nil && len(attribs[prefixes.IDAttr]) > 0 { nodeSpaceID := attribs[prefixes.SpaceIDAttr] if len(nodeSpaceID) > 0 { spaceID = nodeSpaceID @@ -496,10 +594,18 @@ func (t *Tree) WarmupIDCache(root string, assimilate bool) error { id, ok := attribs[prefixes.IDAttr] if ok { _ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), string(spaceID), string(id), path) - } else if assimilate { - _ = t.Scan(path, false) } + } else if assimilate { + _ = t.assimilate(scanItem{Path: path, ForceRescan: true}) } return nil }) + if err != nil { + return err + } + + for dir, size := range sizes { + _ = t.lookup.MetadataBackend().Set(context.Background(), dir, prefixes.TreesizeAttr, []byte(fmt.Sprintf("%d", size))) + } + return nil } diff --git a/pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go b/pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go index 4f9f173e55..a61c172802 100644 --- a/pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go +++ b/pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go @@ -61,15 +61,15 @@ start: } switch ev.Event { case "CREATE": - go func() { _ = w.tree.Scan(ev.Path, false) }() + go func() { _ = w.tree.Scan(ev.Path, ActionCreate, false, false) }() case "CLOSE": bytesWritten, err := strconv.Atoi(ev.BytesWritten) if err == nil && bytesWritten > 0 { - go func() { _ = w.tree.Scan(ev.Path, true) }() + go func() { _ = w.tree.Scan(ev.Path, ActionUpdate, false, true) }() } case "RENAME": go func() { - _ = w.tree.Scan(ev.Path, true) + _ = w.tree.Scan(ev.Path, ActionMove, false, true) _ = w.tree.WarmupIDCache(ev.Path, false) }() } diff --git a/pkg/storage/fs/posix/tree/gpfswatchfolderwatcher.go b/pkg/storage/fs/posix/tree/gpfswatchfolderwatcher.go index 67b4d5828a..6d1e295269 100644 --- a/pkg/storage/fs/posix/tree/gpfswatchfolderwatcher.go +++ b/pkg/storage/fs/posix/tree/gpfswatchfolderwatcher.go @@ -41,21 +41,21 @@ func (w *GpfsWatchFolderWatcher) Watch(topic string) { continue } - if strings.HasSuffix(lwev.Path, ".flock") || strings.HasSuffix(lwev.Path, ".mlock") { + if isLockFile(lwev.Path) { continue } switch { case strings.Contains(lwev.Event, "IN_CREATE"): - go func() { _ = w.tree.Scan(lwev.Path, false) }() + go func() { _ = w.tree.Scan(lwev.Path, ActionCreate, false, false) }() case strings.Contains(lwev.Event, "IN_CLOSE_WRITE"): bytesWritten, err := strconv.Atoi(lwev.BytesWritten) if err == nil && bytesWritten > 0 { - go func() { _ = w.tree.Scan(lwev.Path, true) }() + go func() { _ = w.tree.Scan(lwev.Path, ActionUpdate, false, true) }() } case strings.Contains(lwev.Event, "IN_MOVED_TO"): go func() { - _ = w.tree.Scan(lwev.Path, true) + _ = w.tree.Scan(lwev.Path, ActionMove, false, true) _ = w.tree.WarmupIDCache(lwev.Path, false) }() } diff --git a/pkg/storage/fs/posix/tree/inotifywatcher.go b/pkg/storage/fs/posix/tree/inotifywatcher.go index 9a6fa22a39..c50ddc1d46 100644 --- a/pkg/storage/fs/posix/tree/inotifywatcher.go +++ b/pkg/storage/fs/posix/tree/inotifywatcher.go @@ -2,7 +2,6 @@ package tree import ( "fmt" - "strings" "github.com/pablodz/inotifywaitgo/inotifywaitgo" ) @@ -43,20 +42,20 @@ func (iw *InotifyWatcher) Watch(path string) { select { case event := <-events: for _, e := range event.Events { - if strings.HasSuffix(event.Filename, ".flock") || strings.HasSuffix(event.Filename, ".mlock") { + if isLockFile(event.Filename) { continue } switch e { case inotifywaitgo.DELETE: go func() { _ = iw.tree.HandleFileDelete(event.Filename) }() case inotifywaitgo.CREATE: - go func() { _ = iw.tree.Scan(event.Filename, false) }() + go func() { _ = iw.tree.Scan(event.Filename, ActionCreate, event.IsDir, false) }() case inotifywaitgo.MOVED_TO: go func() { - _ = iw.tree.Scan(event.Filename, true) + _ = iw.tree.Scan(event.Filename, ActionMove, event.IsDir, true) }() case inotifywaitgo.CLOSE_WRITE: - go func() { _ = iw.tree.Scan(event.Filename, true) }() + go func() { _ = iw.tree.Scan(event.Filename, ActionUpdate, event.IsDir, true) }() } } diff --git a/pkg/storage/fs/posix/tree/tree.go b/pkg/storage/fs/posix/tree/tree.go index 68498f2070..bacceefe1c 100644 --- a/pkg/storage/fs/posix/tree/tree.go +++ b/pkg/storage/fs/posix/tree/tree.go @@ -75,6 +75,7 @@ type Watcher interface { type scanItem struct { Path string ForceRescan bool + Recurse bool } // Tree manages a hierarchical tree @@ -110,7 +111,7 @@ func New(lu node.PathLookup, bs Blobstore, um usermapper.Mapper, o *options.Opti idCache: cache, propagator: propagator.New(lu, &o.Options), scanQueue: scanQueue, - scanDebouncer: NewScanDebouncer(500*time.Millisecond, func(item scanItem) { + scanDebouncer: NewScanDebouncer(1000*time.Millisecond, func(item scanItem) { scanQueue <- item }), es: es, @@ -393,6 +394,10 @@ func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, erro g.Go(func() error { defer close(work) for _, name := range names { + if isLockFile(name) { + continue + } + select { case work <- name: case <-ctx.Done(): @@ -877,3 +882,7 @@ func (t *Tree) readRecycleItem(ctx context.Context, spaceID, key, path string) ( return } + +func isLockFile(path string) bool { + return strings.HasSuffix(path, ".lock") || strings.HasSuffix(path, ".flock") || strings.HasSuffix(path, ".mlock") +}