Skip to content

Commit

Permalink
Merge pull request #4916 from aduffeck/posixfs-fixes
Browse files Browse the repository at this point in the history
Posixfs improvements
  • Loading branch information
butonic authored Nov 8, 2024
2 parents 8b3179d + 9eaa5bb commit 2ee6db9
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 38 deletions.
6 changes: 6 additions & 0 deletions changelog/unreleased/improve-posixfs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Enhancement: Improve posixfs stability and performance

The posixfs storage driver saw a number of bugfixes and optimizations.

https://github.com/cs3org/reva/pull/4916

22 changes: 20 additions & 2 deletions pkg/storage/fs/posix/tree/assimilation.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ const (
ActionUpdate
ActionMove
ActionDelete
ActionMoveFrom
)

type queueItem struct {
Expand Down Expand Up @@ -160,14 +161,14 @@ func (t *Tree) workScanQueue() {
}

// Scan scans the given path and updates the id chache
func (t *Tree) Scan(path string, action EventAction, isDir bool, recurse bool) error {
func (t *Tree) Scan(path string, action EventAction, isDir bool) error {
// cases:
switch action {
case ActionCreate:
if !isDir {
// 1. New file (could be emitted as part of a new directory)
// -> assimilate file
// -> scan parent directory recursively
// -> scan parent directory recursively to update tree size and catch nodes that weren't covered by an event
if !t.scanDebouncer.InProgress(filepath.Dir(path)) {
t.scanDebouncer.Debounce(scanItem{
Path: path,
Expand Down Expand Up @@ -216,8 +217,24 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool, recurse bool) e
Recurse: isDir,
})

case ActionMoveFrom:
// 6. file/directory moved out of the watched directory
// -> update directory
if err := t.setDirty(filepath.Dir(path), true); err != nil {
return err
}

go func() { _ = t.WarmupIDCache(filepath.Dir(path), false, true) }()

case ActionDelete:
_ = t.HandleFileDelete(path)
// 7. Deleted file or directory
// -> update parent and all children
t.scanDebouncer.Debounce(scanItem{
Path: filepath.Dir(path),
ForceRescan: true,
Recurse: true,
})
}

return nil
Expand Down Expand Up @@ -593,6 +610,7 @@ func (t *Tree) WarmupIDCache(root string, assimilate, onlyDirty bool) error {
if !dirty {
return filepath.SkipDir
}
sizes[path] += 0 // Make sure to set the size to 0 for empty directories
}

attribs, err := t.lookup.MetadataBackend().All(context.Background(), path)
Expand Down
24 changes: 21 additions & 3 deletions pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
// Copyright 2018-2024 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package tree

import (
Expand Down Expand Up @@ -64,15 +82,15 @@ start:
}
switch ev.Event {
case "CREATE":
go func() { _ = w.tree.Scan(ev.Path, ActionCreate, false, false) }()
go func() { _ = w.tree.Scan(ev.Path, ActionCreate, false) }()
case "CLOSE":
bytesWritten, err := strconv.Atoi(ev.BytesWritten)
if err == nil && bytesWritten > 0 {
go func() { _ = w.tree.Scan(ev.Path, ActionUpdate, false, true) }()
go func() { _ = w.tree.Scan(ev.Path, ActionUpdate, false) }()
}
case "RENAME":
go func() {
_ = w.tree.Scan(ev.Path, ActionMove, false, true)
_ = w.tree.Scan(ev.Path, ActionMove, false)
_ = w.tree.WarmupIDCache(ev.Path, false, false)
}()
}
Expand Down
52 changes: 38 additions & 14 deletions pkg/storage/fs/posix/tree/gpfswatchfolderwatcher.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
// Copyright 2018-2024 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package tree

import (
Expand Down Expand Up @@ -29,13 +47,13 @@ func (w *GpfsWatchFolderWatcher) Watch(topic string) {
Topic: topic,
})

lwev := &lwe{}
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}

lwev := &lwe{}
err = json.Unmarshal(m.Value, lwev)
if err != nil {
continue
Expand All @@ -45,22 +63,28 @@ func (w *GpfsWatchFolderWatcher) Watch(topic string) {
continue
}

isDir := strings.Contains(lwev.Event, "IN_ISDIR")
go func() {
isDir := strings.Contains(lwev.Event, "IN_ISDIR")

switch {
case strings.Contains(lwev.Event, "IN_DELETE"):
_ = w.tree.Scan(lwev.Path, ActionDelete, isDir)

switch {
case strings.Contains(lwev.Event, "IN_CREATE"):
go func() { _ = w.tree.Scan(lwev.Path, ActionCreate, isDir, false) }()
case strings.Contains(lwev.Event, "IN_MOVE_FROM"):
_ = w.tree.Scan(lwev.Path, ActionMoveFrom, isDir)

case strings.Contains(lwev.Event, "IN_CLOSE_WRITE"):
bytesWritten, err := strconv.Atoi(lwev.BytesWritten)
if err == nil && bytesWritten > 0 {
go func() { _ = w.tree.Scan(lwev.Path, ActionUpdate, isDir, true) }()
case strings.Contains(lwev.Event, "IN_CREATE"):
_ = w.tree.Scan(lwev.Path, ActionCreate, isDir)

case strings.Contains(lwev.Event, "IN_CLOSE_WRITE"):
bytesWritten, err := strconv.Atoi(lwev.BytesWritten)
if err == nil && bytesWritten > 0 {
_ = w.tree.Scan(lwev.Path, ActionUpdate, isDir)
}
case strings.Contains(lwev.Event, "IN_MOVED_TO"):
_ = w.tree.Scan(lwev.Path, ActionMove, isDir)
}
case strings.Contains(lwev.Event, "IN_MOVED_TO"):
go func() {
_ = w.tree.Scan(lwev.Path, ActionMove, isDir, true)
}()
}
}()
}
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
Expand Down
43 changes: 31 additions & 12 deletions pkg/storage/fs/posix/tree/inotifywatcher.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
// Copyright 2018-2024 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package tree

import (
Expand Down Expand Up @@ -30,6 +48,7 @@ func (iw *InotifyWatcher) Watch(path string) {
Events: []inotifywaitgo.EVENT{
inotifywaitgo.CREATE,
inotifywaitgo.MOVED_TO,
inotifywaitgo.MOVED_FROM,
inotifywaitgo.CLOSE_WRITE,
inotifywaitgo.DELETE,
},
Expand All @@ -45,18 +64,18 @@ func (iw *InotifyWatcher) Watch(path string) {
if isLockFile(event.Filename) || isTrash(event.Filename) || iw.tree.isUpload(event.Filename) {
continue
}
switch e {
case inotifywaitgo.DELETE:
go func() { _ = iw.tree.HandleFileDelete(event.Filename) }()
case inotifywaitgo.CREATE:
go func() { _ = iw.tree.Scan(event.Filename, ActionCreate, event.IsDir, false) }()
case inotifywaitgo.MOVED_TO:
go func() {
_ = iw.tree.Scan(event.Filename, ActionMove, event.IsDir, true)
}()
case inotifywaitgo.CLOSE_WRITE:
go func() { _ = iw.tree.Scan(event.Filename, ActionUpdate, event.IsDir, true) }()
}
go func() {
switch e {
case inotifywaitgo.DELETE:
_ = iw.tree.Scan(event.Filename, ActionDelete, event.IsDir)
case inotifywaitgo.MOVED_FROM:
_ = iw.tree.Scan(event.Filename, ActionMoveFrom, event.IsDir)
case inotifywaitgo.CREATE, inotifywaitgo.MOVED_TO:
_ = iw.tree.Scan(event.Filename, ActionCreate, event.IsDir)
case inotifywaitgo.CLOSE_WRITE:
_ = iw.tree.Scan(event.Filename, ActionUpdate, event.IsDir)
}
}()
}

case err := <-errors:
Expand Down
27 changes: 20 additions & 7 deletions pkg/storage/fs/posix/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,12 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node)
return errors.Wrap(err, "Decomposedfs: could not move child")
}

// update the id cache
if newNode.ID == "" {
newNode.ID = oldNode.ID
}
_ = t.lookup.(*lookup.Lookup).CacheID(ctx, newNode.SpaceID, newNode.ID, filepath.Join(newNode.ParentPath(), newNode.Name))

// rename the lock (if it exists)
if _, err := os.Stat(oldNode.LockFilePath()); err == nil {
err = os.Rename(
Expand All @@ -321,11 +327,6 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node)
}
}

// update the id cache
if newNode.ID == "" {
newNode.ID = oldNode.ID
}
_ = t.lookup.(*lookup.Lookup).CacheID(ctx, newNode.SpaceID, newNode.ID, filepath.Join(newNode.ParentPath(), newNode.Name))
// update id cache for the moved subtree.
if oldNode.IsDir(ctx) {
err = t.WarmupIDCache(filepath.Join(newNode.ParentPath(), newNode.Name), false, false)
Expand All @@ -334,11 +335,23 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node)
}
}

err = t.Propagate(ctx, oldNode, 0)
// the size diff is the current treesize or blobsize of the old/source node
var sizeDiff int64
if oldNode.IsDir(ctx) {
treeSize, err := oldNode.GetTreeSize(ctx)
if err != nil {
return err
}
sizeDiff = int64(treeSize)
} else {
sizeDiff = oldNode.Blobsize
}

err = t.Propagate(ctx, oldNode, -sizeDiff)
if err != nil {
return errors.Wrap(err, "Decomposedfs: Move: could not propagate old node")
}
err = t.Propagate(ctx, newNode, 0)
err = t.Propagate(ctx, newNode, sizeDiff)
if err != nil {
return errors.Wrap(err, "Decomposedfs: Move: could not propagate new node")
}
Expand Down

0 comments on commit 2ee6db9

Please sign in to comment.