From 5a77df5b831a4dd94c00d5b32287de51d64b0b8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Thu, 14 Nov 2024 10:20:25 +0100 Subject: [PATCH 1/8] Improve error logging --- pkg/storage/fs/posix/posix.go | 3 +- pkg/storage/fs/posix/trashbin/trashbin.go | 22 ++++--- pkg/storage/fs/posix/tree/assimilation.go | 60 ++++++++++++++----- .../posix/tree/gpfsfilauditloggingwatcher.go | 36 ++++++----- .../fs/posix/tree/gpfswatchfolderwatcher.go | 23 ++++--- pkg/storage/fs/posix/tree/inotifywatcher.go | 17 ++++-- pkg/storage/fs/posix/tree/tree.go | 28 ++++++--- 7 files changed, 131 insertions(+), 58 deletions(-) diff --git a/pkg/storage/fs/posix/posix.go b/pkg/storage/fs/posix/posix.go index 340f89b1fe..ef3bc4a336 100644 --- a/pkg/storage/fs/posix/posix.go +++ b/pkg/storage/fs/posix/posix.go @@ -32,6 +32,7 @@ import ( microstore "go-micro.dev/v4/store" "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/logger" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/posix/blobstore" @@ -84,7 +85,7 @@ func New(m map[string]interface{}, stream events.Stream, log *zerolog.Logger) (s return nil, fmt.Errorf("unknown metadata backend %s, only 'messagepack' or 'xattrs' (default) supported", o.MetadataBackend) } - trashbin, err := trashbin.New(o, lu) + trashbin, err := trashbin.New(o, lu, logger.New()) if err != nil { return nil, err } diff --git a/pkg/storage/fs/posix/trashbin/trashbin.go b/pkg/storage/fs/posix/trashbin/trashbin.go index f0fed08e16..02c89c6644 100644 --- a/pkg/storage/fs/posix/trashbin/trashbin.go +++ b/pkg/storage/fs/posix/trashbin/trashbin.go @@ -26,6 +26,9 @@ import ( "strings" "time" + "github.com/google/uuid" + "github.com/rs/zerolog" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" "github.com/cs3org/reva/v2/pkg/storage" @@ -34,13 +37,13 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/utils" - "github.com/google/uuid" ) type Trashbin struct { - fs storage.FS - o *options.Options - lu *lookup.Lookup + fs storage.FS + o *options.Options + lu *lookup.Lookup + log *zerolog.Logger } const ( @@ -49,10 +52,11 @@ const ( ) // New returns a new Trashbin -func New(o *options.Options, lu *lookup.Lookup) (*Trashbin, error) { +func New(o *options.Options, lu *lookup.Lookup, log *zerolog.Logger) (*Trashbin, error) { return &Trashbin{ - o: o, - lu: lu, + o: o, + lu: lu, + log: log, }, nil } @@ -261,7 +265,9 @@ func (tb *Trashbin) RestoreRecycleItem(ctx context.Context, ref *provider.Refere if err != nil { return err } - _ = tb.lu.CacheID(ctx, n.SpaceID, string(id), restorePath) + if err := tb.lu.CacheID(ctx, n.SpaceID, string(id), restorePath); err != nil { + tb.log.Error().Err(err).Str("spaceID", n.SpaceID).Str("id", string(id)).Str("path", restorePath).Msg("trashbin: error caching id") + } // cleanup trash info if relativePath == "." 
|| relativePath == "/" { diff --git a/pkg/storage/fs/posix/tree/assimilation.go b/pkg/storage/fs/posix/tree/assimilation.go index 687eb5ca47..ac80e0d125 100644 --- a/pkg/storage/fs/posix/tree/assimilation.go +++ b/pkg/storage/fs/posix/tree/assimilation.go @@ -227,7 +227,11 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error { go func() { _ = t.WarmupIDCache(filepath.Dir(path), false, true) }() case ActionDelete: - _ = t.HandleFileDelete(path) + err := t.HandleFileDelete(path) + if err != nil { + return err + } + // 7. Deleted file or directory // -> update parent and all children t.scanDebouncer.Debounce(scanItem{ @@ -242,8 +246,12 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error { func (t *Tree) HandleFileDelete(path string) error { // purge metadata - _ = t.lookup.(*lookup.Lookup).IDCache.DeleteByPath(context.Background(), path) - _ = t.lookup.MetadataBackend().Purge(context.Background(), path) + if err := t.lookup.(*lookup.Lookup).IDCache.DeleteByPath(context.Background(), path); err != nil { + t.log.Error().Err(err).Str("path", path).Msg("could not delete id cache entry by path") + } + if err := t.lookup.MetadataBackend().Purge(context.Background(), path); err != nil { + t.log.Error().Err(err).Str("path", path).Msg("could not purge metadata") + } // send event owner, spaceID, nodeID, parentID, err := t.getOwnerAndIDs(filepath.Dir(path)) @@ -371,21 +379,29 @@ func (t *Tree) assimilate(item scanItem) error { if err == nil { // this id clashes with an existing id -> clear metadata and re-assimilate - _ = t.lookup.MetadataBackend().Purge(context.Background(), item.Path) + if err := t.lookup.MetadataBackend().Purge(context.Background(), item.Path); err != nil { + t.log.Error().Err(err).Str("path", item.Path).Msg("could not purge metadata") + } go func() { _ = t.assimilate(scanItem{Path: item.Path, ForceRescan: true}) }() } else { // this is a move - _ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, string(id), item.Path) + if err := t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, string(id), item.Path); err != nil { + t.log.Error().Err(err).Str("spaceID", spaceID).Str("id", string(id)).Str("path", item.Path).Msg("could not cache id") + } _, err := t.updateFile(item.Path, string(id), spaceID) if err != nil { return err } // purge original metadata. Only delete the path entry using DeletePath(reverse lookup), not the whole entry pair. 
- _ = t.lookup.(*lookup.Lookup).IDCache.DeletePath(context.Background(), previousPath) - _ = t.lookup.MetadataBackend().Purge(context.Background(), previousPath) + if err := t.lookup.(*lookup.Lookup).IDCache.DeletePath(context.Background(), previousPath); err != nil { + t.log.Error().Err(err).Str("path", previousPath).Msg("could not delete id cache entry by path") + } + if err := t.lookup.MetadataBackend().Purge(context.Background(), previousPath); err != nil { + t.log.Error().Err(err).Str("path", previousPath).Msg("could not purge metadata") + } fi, err := os.Stat(item.Path) if err != nil { @@ -393,7 +409,11 @@ func (t *Tree) assimilate(item scanItem) error { } if fi.IsDir() { // if it was moved and it is a directory we need to propagate the move - go func() { _ = t.WarmupIDCache(item.Path, false, true) }() + go func() { + if err := t.WarmupIDCache(item.Path, false, true); err != nil { + t.log.Error().Err(err).Str("path", item.Path).Msg("could not warmup id cache") + } + }() } parentID, err := t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.ParentidAttr) @@ -426,7 +446,9 @@ func (t *Tree) assimilate(item scanItem) error { } } else { // This item had already been assimilated in the past. Update the path - _ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, string(id), item.Path) + if err := t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, string(id), item.Path); err != nil { + t.log.Error().Err(err).Str("spaceID", spaceID).Str("id", string(id)).Str("path", item.Path).Msg("could not cache id") + } _, err := t.updateFile(item.Path, string(id), spaceID) if err != nil { @@ -555,7 +577,9 @@ assimilate: return nil, errors.Wrap(err, "failed to set attributes") } - _ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, id, path) + if err := t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, id, path); err != nil { + t.log.Error().Err(err).Str("spaceID", spaceID).Str("id", id).Str("path", path).Msg("could not cache id") + } return fi, nil } @@ -654,10 +678,14 @@ func (t *Tree) WarmupIDCache(root string, assimilate, onlyDirty bool) error { _ = t.assimilate(scanItem{Path: path, ForceRescan: true}) } } - _ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), string(spaceID), string(id), path) + if err := t.lookup.(*lookup.Lookup).CacheID(context.Background(), string(spaceID), string(id), path); err != nil { + t.log.Error().Err(err).Str("spaceID", string(spaceID)).Str("id", string(id)).Str("path", path).Msg("could not cache id") + } } } else if assimilate { - _ = t.assimilate(scanItem{Path: path, ForceRescan: true}) + if err := t.assimilate(scanItem{Path: path, ForceRescan: true}); err != nil { + t.log.Error().Err(err).Str("path", path).Msg("could not assimilate item") + } } return t.setDirty(path, false) }) @@ -665,9 +693,13 @@ func (t *Tree) WarmupIDCache(root string, assimilate, onlyDirty bool) error { for dir, size := range sizes { if dir == root { // Propagate the size diff further up the tree - _ = t.propagateSizeDiff(dir, size) + if err := t.propagateSizeDiff(dir, size); err != nil { + t.log.Error().Err(err).Str("path", dir).Msg("could not propagate size diff") + } + } + if err := t.lookup.MetadataBackend().Set(context.Background(), dir, prefixes.TreesizeAttr, []byte(fmt.Sprintf("%d", size))); err != nil { + t.log.Error().Err(err).Str("path", dir).Int64("size", size).Msg("could not set tree size") } - _ = t.lookup.MetadataBackend().Set(context.Background(), dir, prefixes.TreesizeAttr, 
[]byte(fmt.Sprintf("%d", size))) } if err != nil { diff --git a/pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go b/pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go index ead3cf398f..2822702bea 100644 --- a/pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go +++ b/pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go @@ -25,10 +25,13 @@ import ( "os" "strconv" "time" + + "github.com/rs/zerolog" ) type GpfsFileAuditLoggingWatcher struct { tree *Tree + log *zerolog.Logger } type lwe struct { @@ -37,9 +40,10 @@ type lwe struct { BytesWritten string } -func NewGpfsFileAuditLoggingWatcher(tree *Tree, auditLogFile string) (*GpfsFileAuditLoggingWatcher, error) { +func NewGpfsFileAuditLoggingWatcher(tree *Tree, auditLogFile string, log *zerolog.Logger) (*GpfsFileAuditLoggingWatcher, error) { w := &GpfsFileAuditLoggingWatcher{ tree: tree, + log: log, } _, err := os.Stat(auditLogFile) @@ -80,20 +84,24 @@ start: if isLockFile(ev.Path) || isTrash(ev.Path) || w.tree.isUpload(ev.Path) { continue } - switch ev.Event { - case "CREATE": - go func() { _ = w.tree.Scan(ev.Path, ActionCreate, false) }() - case "CLOSE": - bytesWritten, err := strconv.Atoi(ev.BytesWritten) - if err == nil && bytesWritten > 0 { - go func() { _ = w.tree.Scan(ev.Path, ActionUpdate, false) }() + go func() { + switch ev.Event { + case "CREATE": + err = w.tree.Scan(ev.Path, ActionCreate, false) + case "CLOSE": + bytesWritten, err := strconv.Atoi(ev.BytesWritten) + if err == nil && bytesWritten > 0 { + err = w.tree.Scan(ev.Path, ActionUpdate, false) + } + case "RENAME": + err = w.tree.Scan(ev.Path, ActionMove, false) + if warmupErr := w.tree.WarmupIDCache(ev.Path, false, false); warmupErr != nil { + w.log.Error().Err(warmupErr).Str("path", ev.Path).Msg("error warming up id cache") + } } - case "RENAME": - go func() { - _ = w.tree.Scan(ev.Path, ActionMove, false) - _ = w.tree.WarmupIDCache(ev.Path, false, false) - }() - } + }() + w.log.Error().Err(err).Str("path", ev.Path).Msg("error scanning file") + case io.EOF: time.Sleep(1 * time.Second) default: diff --git a/pkg/storage/fs/posix/tree/gpfswatchfolderwatcher.go b/pkg/storage/fs/posix/tree/gpfswatchfolderwatcher.go index 5a0fb6a4bd..5d05bcd11c 100644 --- a/pkg/storage/fs/posix/tree/gpfswatchfolderwatcher.go +++ b/pkg/storage/fs/posix/tree/gpfswatchfolderwatcher.go @@ -25,18 +25,21 @@ import ( "strconv" "strings" + "github.com/rs/zerolog" kafka "github.com/segmentio/kafka-go" ) type GpfsWatchFolderWatcher struct { tree *Tree brokers []string + log *zerolog.Logger } -func NewGpfsWatchFolderWatcher(tree *Tree, kafkaBrokers []string) (*GpfsWatchFolderWatcher, error) { +func NewGpfsWatchFolderWatcher(tree *Tree, kafkaBrokers []string, log *zerolog.Logger) (*GpfsWatchFolderWatcher, error) { return &GpfsWatchFolderWatcher{ tree: tree, brokers: kafkaBrokers, + log: log, }, nil } @@ -66,23 +69,27 @@ func (w *GpfsWatchFolderWatcher) Watch(topic string) { go func() { isDir := strings.Contains(lwev.Event, "IN_ISDIR") + var err error switch { case strings.Contains(lwev.Event, "IN_DELETE"): - _ = w.tree.Scan(lwev.Path, ActionDelete, isDir) + err = w.tree.Scan(lwev.Path, ActionDelete, isDir) case strings.Contains(lwev.Event, "IN_MOVE_FROM"): - _ = w.tree.Scan(lwev.Path, ActionMoveFrom, isDir) + err = w.tree.Scan(lwev.Path, ActionMoveFrom, isDir) case strings.Contains(lwev.Event, "IN_CREATE"): - _ = w.tree.Scan(lwev.Path, ActionCreate, isDir) + err = w.tree.Scan(lwev.Path, ActionCreate, isDir) case strings.Contains(lwev.Event, "IN_CLOSE_WRITE"): - 
bytesWritten, err := strconv.Atoi(lwev.BytesWritten) - if err == nil && bytesWritten > 0 { - _ = w.tree.Scan(lwev.Path, ActionUpdate, isDir) + bytesWritten, convErr := strconv.Atoi(lwev.BytesWritten) + if convErr == nil && bytesWritten > 0 { + err = w.tree.Scan(lwev.Path, ActionUpdate, isDir) } case strings.Contains(lwev.Event, "IN_MOVED_TO"): - _ = w.tree.Scan(lwev.Path, ActionMove, isDir) + err = w.tree.Scan(lwev.Path, ActionMove, isDir) + } + if err != nil { + w.log.Error().Err(err).Str("path", lwev.Path).Msg("error scanning path") } }() } diff --git a/pkg/storage/fs/posix/tree/inotifywatcher.go b/pkg/storage/fs/posix/tree/inotifywatcher.go index 54b7157e0f..c3045f1b4f 100644 --- a/pkg/storage/fs/posix/tree/inotifywatcher.go +++ b/pkg/storage/fs/posix/tree/inotifywatcher.go @@ -22,15 +22,18 @@ import ( "fmt" "github.com/pablodz/inotifywaitgo/inotifywaitgo" + "github.com/rs/zerolog" ) type InotifyWatcher struct { tree *Tree + log *zerolog.Logger } -func NewInotifyWatcher(tree *Tree) *InotifyWatcher { +func NewInotifyWatcher(tree *Tree, log *zerolog.Logger) *InotifyWatcher { return &InotifyWatcher{ tree: tree, + log: log, } } @@ -65,15 +68,19 @@ func (iw *InotifyWatcher) Watch(path string) { continue } go func() { + var err error switch e { case inotifywaitgo.DELETE: - _ = iw.tree.Scan(event.Filename, ActionDelete, event.IsDir) + err = iw.tree.Scan(event.Filename, ActionDelete, event.IsDir) case inotifywaitgo.MOVED_FROM: - _ = iw.tree.Scan(event.Filename, ActionMoveFrom, event.IsDir) + err = iw.tree.Scan(event.Filename, ActionMoveFrom, event.IsDir) case inotifywaitgo.CREATE, inotifywaitgo.MOVED_TO: - _ = iw.tree.Scan(event.Filename, ActionCreate, event.IsDir) + err = iw.tree.Scan(event.Filename, ActionCreate, event.IsDir) case inotifywaitgo.CLOSE_WRITE: - _ = iw.tree.Scan(event.Filename, ActionUpdate, event.IsDir) + err = iw.tree.Scan(event.Filename, ActionUpdate, event.IsDir) + } + if err != nil { + iw.log.Error().Err(err).Str("path", event.Filename).Msg("error scanning file") } }() } diff --git a/pkg/storage/fs/posix/tree/tree.go b/pkg/storage/fs/posix/tree/tree.go index 7bf7e34a1b..cbe0f392e2 100644 --- a/pkg/storage/fs/posix/tree/tree.go +++ b/pkg/storage/fs/posix/tree/tree.go @@ -121,17 +121,17 @@ func New(lu node.PathLookup, bs Blobstore, um usermapper.Mapper, trashbin *trash var err error switch o.WatchType { case "gpfswatchfolder": - t.watcher, err = NewGpfsWatchFolderWatcher(t, strings.Split(o.WatchFolderKafkaBrokers, ",")) + t.watcher, err = NewGpfsWatchFolderWatcher(t, strings.Split(o.WatchFolderKafkaBrokers, ","), log) if err != nil { return nil, err } case "gpfsfileauditlogging": - t.watcher, err = NewGpfsFileAuditLoggingWatcher(t, o.WatchPath) + t.watcher, err = NewGpfsFileAuditLoggingWatcher(t, o.WatchPath, log) if err != nil { return nil, err } default: - t.watcher = NewInotifyWatcher(t) + t.watcher = NewInotifyWatcher(t, log) watchPath = o.Root } @@ -213,7 +213,9 @@ func (t *Tree) TouchFile(ctx context.Context, n *node.Node, markprocessing bool, n.SetType(provider.ResourceType_RESOURCE_TYPE_FILE) // Set id in cache - _ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), n.SpaceID, n.ID, nodePath) + if err := t.lookup.(*lookup.Lookup).CacheID(context.Background(), n.SpaceID, n.ID, nodePath); err != nil { + t.log.Error().Err(err).Str("spaceID", n.SpaceID).Str("id", n.ID).Str("path", nodePath).Msg("could not cache id") + } if err := os.MkdirAll(filepath.Dir(nodePath), 0700); err != nil { return errors.Wrap(err, "Decomposedfs: error creating node") @@ 
-312,7 +314,9 @@ func (t *Tree) Move(ctx context.Context, oldNode *node.Node, newNode *node.Node) if newNode.ID == "" { newNode.ID = oldNode.ID } - _ = t.lookup.(*lookup.Lookup).CacheID(ctx, newNode.SpaceID, newNode.ID, filepath.Join(newNode.ParentPath(), newNode.Name)) + if err := t.lookup.(*lookup.Lookup).CacheID(ctx, newNode.SpaceID, newNode.ID, filepath.Join(newNode.ParentPath(), newNode.Name)); err != nil { + t.log.Error().Err(err).Str("spaceID", newNode.SpaceID).Str("id", newNode.ID).Str("path", filepath.Join(newNode.ParentPath(), newNode.Name)).Msg("could not cache id") + } // rename the lock (if it exists) if _, err := os.Stat(oldNode.LockFilePath()); err == nil { @@ -476,7 +480,11 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) error { } // remove entry from cache immediately to avoid inconsistencies - defer func() { _ = t.idCache.Delete(path) }() + defer func() { + if err := t.idCache.Delete(path); err != nil { + log.Error().Err(err).Str("path", path).Msg("could not delete id from cache") + } + }() if appctx.DeletingSharedResourceFromContext(ctx) { src := filepath.Join(n.ParentPath(), n.Name) @@ -495,7 +503,9 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) error { } // Remove lock file if it exists - _ = os.Remove(n.LockFilePath()) + if err := os.Remove(n.LockFilePath()); err != nil { + log.Error().Err(err).Str("path", n.LockFilePath()).Msg("could not remove lock file") + } err := t.trashbin.MoveToTrash(ctx, n, path) if err != nil { @@ -748,7 +758,9 @@ func (t *Tree) createDirNode(ctx context.Context, n *node.Node) (err error) { return errors.Wrap(err, "Decomposedfs: error creating node") } - _ = idcache.Set(ctx, n.SpaceID, n.ID, path) + if err := idcache.Set(ctx, n.SpaceID, n.ID, path); err != nil { + log.Error().Err(err).Str("spaceID", n.SpaceID).Str("id", n.ID).Str("path", path).Msg("could not cache id") + } attributes := n.NodeMetadata(ctx) attributes[prefixes.IDAttr] = []byte(n.ID) From dd466eb8ca15f8ecc267a67eb5ab440dd25c0f86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Thu, 14 Nov 2024 10:33:08 +0100 Subject: [PATCH 2/8] Log warning for unhandled events --- pkg/storage/fs/posix/tree/inotifywatcher.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/storage/fs/posix/tree/inotifywatcher.go b/pkg/storage/fs/posix/tree/inotifywatcher.go index c3045f1b4f..b5d0905f62 100644 --- a/pkg/storage/fs/posix/tree/inotifywatcher.go +++ b/pkg/storage/fs/posix/tree/inotifywatcher.go @@ -78,6 +78,9 @@ func (iw *InotifyWatcher) Watch(path string) { err = iw.tree.Scan(event.Filename, ActionCreate, event.IsDir) case inotifywaitgo.CLOSE_WRITE: err = iw.tree.Scan(event.Filename, ActionUpdate, event.IsDir) + default: + iw.log.Warn().Interface("event", event).Msg("unhandled event") + return } if err != nil { iw.log.Error().Err(err).Str("path", event.Filename).Msg("error scanning file") From 8a09d0c9fdf0b60df9f82e492ee10f0a34df7eb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Thu, 14 Nov 2024 10:53:20 +0100 Subject: [PATCH 3/8] Improve logging --- pkg/storage/fs/posix/tree/assimilation.go | 23 +++++++++++++++---- .../posix/tree/gpfsfilauditloggingwatcher.go | 8 +++++-- pkg/storage/fs/posix/tree/inotifywatcher.go | 6 ++--- 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/pkg/storage/fs/posix/tree/assimilation.go b/pkg/storage/fs/posix/tree/assimilation.go index ac80e0d125..bd5e2d3440 100644 --- a/pkg/storage/fs/posix/tree/assimilation.go +++ b/pkg/storage/fs/posix/tree/assimilation.go @@ -165,6 +165,7 @@ 
func (t *Tree) Scan(path string, action EventAction, isDir bool) error { // cases: switch action { case ActionCreate: + t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionCreate)") if !isDir { // 1. New file (could be emitted as part of a new directory) // -> assimilate file @@ -197,6 +198,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error { } case ActionUpdate: + t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionUpdate)") // 3. Updated file // -> update file unless parent directory is being rescanned if !t.scanDebouncer.InProgress(filepath.Dir(path)) { @@ -207,6 +209,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error { } case ActionMove: + t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionMove)") // 4. Moved file // -> update file // 5. Moved directory @@ -218,6 +221,7 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error { }) case ActionMoveFrom: + t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("scanning path (ActionMoveFrom)") // 6. file/directory moved out of the watched directory // -> update directory if err := t.setDirty(filepath.Dir(path), true); err != nil { @@ -227,13 +231,16 @@ func (t *Tree) Scan(path string, action EventAction, isDir bool) error { go func() { _ = t.WarmupIDCache(filepath.Dir(path), false, true) }() case ActionDelete: + t.log.Debug().Str("path", path).Bool("isDir", isDir).Msg("handling deleted item") + + // 7. Deleted file or directory + // -> update parent and all children + err := t.HandleFileDelete(path) if err != nil { return err } - // 7. Deleted file or directory - // -> update parent and all children t.scanDebouncer.Debounce(scanItem{ Path: filepath.Dir(path), ForceRescan: true, @@ -377,16 +384,21 @@ func (t *Tree) assimilate(item scanItem) error { if ok && len(previousParentID) > 0 && previousPath != item.Path { _, err := os.Stat(previousPath) if err == nil { - // this id clashes with an existing id -> clear metadata and re-assimilate + // this id clashes with an existing item -> clear metadata and re-assimilate + t.log.Debug().Str("path", item.Path).Msg("ID clash detected, purging metadata and re-assimilating") if err := t.lookup.MetadataBackend().Purge(context.Background(), item.Path); err != nil { t.log.Error().Err(err).Str("path", item.Path).Msg("could not purge metadata") } go func() { - _ = t.assimilate(scanItem{Path: item.Path, ForceRescan: true}) + if err := t.assimilate(scanItem{Path: item.Path, ForceRescan: true}); err != nil { + t.log.Error().Err(err).Str("path", item.Path).Msg("could not re-assimilate") + } }() } else { // this is a move + t.log.Debug().Str("path", item.Path).Msg("move detected") + if err := t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, string(id), item.Path); err != nil { t.log.Error().Err(err).Str("spaceID", spaceID).Str("id", string(id)).Str("path", item.Path).Msg("could not cache id") } @@ -446,6 +458,7 @@ func (t *Tree) assimilate(item scanItem) error { } } else { // This item had already been assimilated in the past. 
Update the path + t.log.Debug().Str("path", item.Path).Msg("updating cached path") if err := t.lookup.(*lookup.Lookup).CacheID(context.Background(), spaceID, string(id), item.Path); err != nil { t.log.Error().Err(err).Str("spaceID", spaceID).Str("id", string(id)).Str("path", item.Path).Msg("could not cache id") } @@ -456,6 +469,7 @@ func (t *Tree) assimilate(item scanItem) error { } } } else { + t.log.Debug().Str("path", item.Path).Msg("new item detected") // assimilate new file newId := uuid.New().String() fi, err := t.updateFile(item.Path, newId, spaceID) @@ -572,6 +586,7 @@ assimilate: return nil, errors.Wrap(err, "failed to propagate") } + t.log.Debug().Str("path", path).Interface("attributes", attributes).Msg("setting attributes") err = t.lookup.MetadataBackend().SetMultiple(context.Background(), path, attributes, false) if err != nil { return nil, errors.Wrap(err, "failed to set attributes") diff --git a/pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go b/pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go index 2822702bea..681a7910c3 100644 --- a/pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go +++ b/pkg/storage/fs/posix/tree/gpfsfilauditloggingwatcher.go @@ -79,6 +79,7 @@ start: case nil: err := json.Unmarshal([]byte(line), ev) if err != nil { + w.log.Error().Err(err).Str("line", line).Msg("error unmarshalling line") continue } if isLockFile(ev.Path) || isTrash(ev.Path) || w.tree.isUpload(ev.Path) { @@ -89,7 +90,8 @@ start: case "CREATE": err = w.tree.Scan(ev.Path, ActionCreate, false) case "CLOSE": - bytesWritten, err := strconv.Atoi(ev.BytesWritten) + var bytesWritten int + bytesWritten, err = strconv.Atoi(ev.BytesWritten) if err == nil && bytesWritten > 0 { err = w.tree.Scan(ev.Path, ActionUpdate, false) } @@ -99,8 +101,10 @@ start: w.log.Error().Err(warmupErr).Str("path", ev.Path).Msg("error warming up id cache") } } + if err != nil { + w.log.Error().Err(err).Str("line", line).Msg("error unmarshalling line") + } }() - w.log.Error().Err(err).Str("path", ev.Path).Msg("error scanning file") case io.EOF: time.Sleep(1 * time.Second) diff --git a/pkg/storage/fs/posix/tree/inotifywatcher.go b/pkg/storage/fs/posix/tree/inotifywatcher.go index b5d0905f62..dc578ad457 100644 --- a/pkg/storage/fs/posix/tree/inotifywatcher.go +++ b/pkg/storage/fs/posix/tree/inotifywatcher.go @@ -63,10 +63,10 @@ func (iw *InotifyWatcher) Watch(path string) { for { select { case event := <-events: + if isLockFile(event.Filename) || isTrash(event.Filename) || iw.tree.isUpload(event.Filename) { + continue + } for _, e := range event.Events { - if isLockFile(event.Filename) || isTrash(event.Filename) || iw.tree.isUpload(event.Filename) { - continue - } go func() { var err error switch e { From a02b8d543dbb8cf1b0105d0f11ed0eca884295fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Mon, 18 Nov 2024 08:34:34 +0100 Subject: [PATCH 4/8] Handle (ignore) CLOSE events --- pkg/storage/fs/posix/tree/inotifywatcher.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/storage/fs/posix/tree/inotifywatcher.go b/pkg/storage/fs/posix/tree/inotifywatcher.go index dc578ad457..660cdbdacd 100644 --- a/pkg/storage/fs/posix/tree/inotifywatcher.go +++ b/pkg/storage/fs/posix/tree/inotifywatcher.go @@ -78,6 +78,8 @@ func (iw *InotifyWatcher) Watch(path string) { err = iw.tree.Scan(event.Filename, ActionCreate, event.IsDir) case inotifywaitgo.CLOSE_WRITE: err = iw.tree.Scan(event.Filename, ActionUpdate, event.IsDir) + case inotifywaitgo.CLOSE: + // ignore, already handled by CLOSE_WRITE 
default: iw.log.Warn().Interface("event", event).Msg("unhandled event") return From 82dd87bf9e9eb498de932eef45d60f1999acd2fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Tue, 19 Nov 2024 09:14:48 +0100 Subject: [PATCH 5/8] Pass on initialized logger --- pkg/storage/fs/posix/posix.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/storage/fs/posix/posix.go b/pkg/storage/fs/posix/posix.go index ef3bc4a336..e6faca47b1 100644 --- a/pkg/storage/fs/posix/posix.go +++ b/pkg/storage/fs/posix/posix.go @@ -32,7 +32,6 @@ import ( microstore "go-micro.dev/v4/store" "github.com/cs3org/reva/v2/pkg/events" - "github.com/cs3org/reva/v2/pkg/logger" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/fs/posix/blobstore" @@ -85,7 +84,7 @@ func New(m map[string]interface{}, stream events.Stream, log *zerolog.Logger) (s return nil, fmt.Errorf("unknown metadata backend %s, only 'messagepack' or 'xattrs' (default) supported", o.MetadataBackend) } - trashbin, err := trashbin.New(o, lu, logger.New()) + trashbin, err := trashbin.New(o, lu, log) if err != nil { return nil, err } From 201d555d015c4ab1559e6ecd75e64cfc1471e006 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Tue, 19 Nov 2024 09:20:11 +0100 Subject: [PATCH 6/8] Fix test --- pkg/storage/fs/posix/testhelpers/helpers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/fs/posix/testhelpers/helpers.go b/pkg/storage/fs/posix/testhelpers/helpers.go index 4cf8c8df70..2dff5e2807 100644 --- a/pkg/storage/fs/posix/testhelpers/helpers.go +++ b/pkg/storage/fs/posix/testhelpers/helpers.go @@ -182,7 +182,7 @@ func NewTestEnv(config map[string]interface{}) (*TestEnv, error) { if err != nil { return nil, err } - tb, err := trashbin.New(o, lu) + tb, err := trashbin.New(o, lu, nil) if err != nil { return nil, err } From e8d50bab7aad2643899c9786261a81bdd7eec5c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Tue, 19 Nov 2024 09:21:16 +0100 Subject: [PATCH 7/8] Add changelog --- changelog/unreleased/improve-posixfs-logging.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog/unreleased/improve-posixfs-logging.md diff --git a/changelog/unreleased/improve-posixfs-logging.md b/changelog/unreleased/improve-posixfs-logging.md new file mode 100644 index 0000000000..3e4ac64004 --- /dev/null +++ b/changelog/unreleased/improve-posixfs-logging.md @@ -0,0 +1,5 @@ +Enhancement: Improve posixfs error handling and logging + +We improved error handling and logging in the posixfs storage driver. 
+ +https://github.com/cs3org/reva/pull/4956 From 06123ec688fccdce90c093a7456a84f48bb153bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Tue, 19 Nov 2024 09:34:59 +0100 Subject: [PATCH 8/8] Fix tests --- pkg/storage/fs/posix/testhelpers/helpers.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/storage/fs/posix/testhelpers/helpers.go b/pkg/storage/fs/posix/testhelpers/helpers.go index 2dff5e2807..0f00229765 100644 --- a/pkg/storage/fs/posix/testhelpers/helpers.go +++ b/pkg/storage/fs/posix/testhelpers/helpers.go @@ -25,6 +25,7 @@ import ( "path/filepath" "github.com/google/uuid" + "github.com/rs/zerolog" "github.com/stretchr/testify/mock" "google.golang.org/grpc" @@ -177,12 +178,14 @@ func NewTestEnv(config map[string]interface{}) (*TestEnv, error) { }, ) + logger := zerolog.New(os.Stderr).With().Logger() + bs := &treemocks.Blobstore{} - tree, err := tree.New(lu, bs, um, &trashbin.Trashbin{}, o, nil, store.Create(), nil) + tree, err := tree.New(lu, bs, um, &trashbin.Trashbin{}, o, nil, store.Create(), &logger) if err != nil { return nil, err } - tb, err := trashbin.New(o, lu, nil) + tb, err := trashbin.New(o, lu, &logger) if err != nil { return nil, err }
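
The recurring pattern across this series replaces discarded errors (`_ = ...`) with either a returned error or a contextual log entry on the injected `*zerolog.Logger`. A minimal, self-contained sketch of the logging side of that pattern — `cacheID` and its signature are hypothetical stand-ins for illustration, not the actual reva lookup API:

```go
package main

import (
	"errors"
	"os"

	"github.com/rs/zerolog"
)

// cacheID is a hypothetical stand-in for the id-cache calls touched by this
// series; the real lookup.Lookup.CacheID has a different signature.
func cacheID(spaceID, id, path string) error {
	return errors.New("cache backend unavailable")
}

func main() {
	// The watchers and the trashbin now receive an initialized logger
	// (PATCH 1/8, 5/8) instead of building their own.
	log := zerolog.New(os.Stderr).With().Timestamp().Logger()

	spaceID, id, path := "space-1", "node-1", "/data/space-1/file.txt"

	// Before: `_ = cacheID(spaceID, id, path)` silently dropped the error.
	// After: log it with enough context to trace id-cache inconsistencies.
	if err := cacheID(spaceID, id, path); err != nil {
		log.Error().Err(err).
			Str("spaceID", spaceID).
			Str("id", id).
			Str("path", path).
			Msg("could not cache id")
	}
}
```

Logging rather than returning keeps best-effort cache maintenance from failing the surrounding operation; returning the error is reserved for calls such as `HandleFileDelete`, where the caller can act on it.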