From 41a359877ec8924065409b90e428d741844bdfc0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= <jfd@butonic.de>
Date: Tue, 25 Apr 2023 13:09:49 +0200
Subject: [PATCH] decomposedfs: write metadata once
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
---
 changelog/unreleased/write-metadata-once.md   |  5 ++
 .../utils/decomposedfs/decomposedfs.go        | 11 ----
 pkg/storage/utils/decomposedfs/node/node.go   | 54 ++++++-------------
 .../utils/decomposedfs/node/node_test.go      |  2 +-
 pkg/storage/utils/decomposedfs/spaces.go      | 33 ++++++------
 .../utils/decomposedfs/testhelpers/helpers.go |  2 +-
 pkg/storage/utils/decomposedfs/tree/tree.go   | 21 ++++----
 7 files changed, 48 insertions(+), 80 deletions(-)
 create mode 100644 changelog/unreleased/write-metadata-once.md

diff --git a/changelog/unreleased/write-metadata-once.md b/changelog/unreleased/write-metadata-once.md
new file mode 100644
index 00000000000..b11e7a85402
--- /dev/null
+++ b/changelog/unreleased/write-metadata-once.md
@@ -0,0 +1,5 @@
+Bugfix: Write Metadata once
+
+decomposedfs now aggregates metadata when creating directories and spaces into a single write.
+
+https://github.com/cs3org/reva/pull/3816
diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go
index 012b7af852f..fc051372ea0 100644
--- a/pkg/storage/utils/decomposedfs/decomposedfs.go
+++ b/pkg/storage/utils/decomposedfs/decomposedfs.go
@@ -48,7 +48,6 @@ import (
 	"github.com/cs3org/reva/v2/pkg/storage/utils/chunking"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
-	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/migrator"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
 	"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
@@ -589,16 +588,6 @@ func (fs *Decomposedfs) CreateDir(ctx context.Context, ref *provider.Reference)
 		return
 	}
 
-	if fs.o.TreeTimeAccounting || fs.o.TreeSizeAccounting {
-		// mark the home node as the end of propagation
-		if err = n.SetXattrString(prefixes.PropagationAttr, "1"); err != nil {
-			appctx.GetLogger(ctx).Error().Err(err).Interface("node", n).Msg("could not mark node to propagate")
-
-			// FIXME: This does not return an error at all, but results in a severe situation that the
-			// part tree is not marked for propagation
-			return
-		}
-	}
 	return
 }
 
diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go
index ea682ab22e3..c802ebfa1ec 100644
--- a/pkg/storage/utils/decomposedfs/node/node.go
+++ b/pkg/storage/utils/decomposedfs/node/node.go
@@ -164,48 +164,22 @@ func (n *Node) SetType(t provider.ResourceType) {
 	n.nodeType = &t
 }
 
-// ChangeOwner sets the owner of n to newOwner
-func (n *Node) ChangeOwner(new *userpb.UserId) (err error) {
-	n.SpaceRoot.owner = new
-
-	attribs := Attributes{}
-	attribs.SetString(prefixes.OwnerIDAttr, new.OpaqueId)
-	attribs.SetString(prefixes.OwnerIDPAttr, new.Idp)
-	attribs.SetString(prefixes.OwnerTypeAttr, utils.UserTypeToString(new.Type))
-
-	if err := n.SpaceRoot.SetXattrs(attribs, true); err != nil {
-		return err
-	}
-
-	return
-}
-
-// WriteAllNodeMetadata writes the Node metadata to disk
-func (n *Node) WriteAllNodeMetadata(ctx context.Context) (err error) {
+// NodeMetadata writes the Node metadata to disk and allows passing additional attributes
+func (n *Node) NodeMetadata() Attributes {
 	attribs := Attributes{}
 	attribs.SetInt64(prefixes.TypeAttr, int64(n.Type()))
 	attribs.SetString(prefixes.ParentidAttr, n.ParentID)
 	attribs.SetString(prefixes.NameAttr, n.Name)
-	attribs.SetString(prefixes.BlobIDAttr, n.BlobID)
-	attribs.SetInt64(prefixes.BlobsizeAttr, n.Blobsize)
-
-	return n.SetXattrs(attribs, true)
+	if n.Type() == provider.ResourceType_RESOURCE_TYPE_FILE {
+		attribs.SetString(prefixes.BlobIDAttr, n.BlobID)
+		attribs.SetInt64(prefixes.BlobsizeAttr, n.Blobsize)
+	}
+	return attribs
 }
 
-// WriteOwner writes the space owner
-func (n *Node) WriteOwner(owner *userpb.UserId) error {
+// SetOwner sets the space owner on the node
+func (n *Node) SetOwner(owner *userpb.UserId) {
 	n.SpaceRoot.owner = owner
-
-	attribs := Attributes{}
-	attribs.SetString(prefixes.OwnerIDAttr, owner.OpaqueId)
-	attribs.SetString(prefixes.OwnerIDPAttr, owner.Idp)
-	attribs.SetString(prefixes.OwnerTypeAttr, utils.UserTypeToString(owner.Type))
-
-	if err := n.SpaceRoot.SetXattrs(attribs, true); err != nil {
-		return err
-	}
-	n.SpaceRoot.owner = owner
-	return nil
 }
 
 // SpaceOwnerOrManager returns the space owner of the space. If no owner is set
@@ -354,11 +328,13 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis
 
 	if revisionSuffix == "" {
 		n.BlobID = attrs.String(prefixes.BlobIDAttr)
-		blobSize, err := attrs.Int64(prefixes.BlobsizeAttr)
-		if err != nil {
-			return nil, err
+		if n.BlobID != "" {
+			blobSize, err := attrs.Int64(prefixes.BlobsizeAttr)
+			if err != nil {
+				return nil, err
+			}
+			n.Blobsize = blobSize
 		}
-		n.Blobsize = blobSize
 	} else {
 		n.BlobID, err = lu.ReadBlobIDAttr(nodePath + revisionSuffix)
 		if err != nil {
diff --git a/pkg/storage/utils/decomposedfs/node/node_test.go b/pkg/storage/utils/decomposedfs/node/node_test.go
index 81172f5d429..b0e76027c30 100644
--- a/pkg/storage/utils/decomposedfs/node/node_test.go
+++ b/pkg/storage/utils/decomposedfs/node/node_test.go
@@ -94,7 +94,7 @@ var _ = Describe("Node", func() {
 			n.BlobID = "TestBlobID"
 			n.Blobsize = blobsize
 
-			err = n.WriteAllNodeMetadata(env.Ctx)
+			n.SetXattrs(n.NodeMetadata(), true)
 			Expect(err).ToNot(HaveOccurred())
 			n2, err := env.Lookup.NodeFromResource(env.Ctx, ref)
 			Expect(err).ToNot(HaveOccurred())
diff --git a/pkg/storage/utils/decomposedfs/spaces.go b/pkg/storage/utils/decomposedfs/spaces.go
index 96a8841324f..1b3245917be 100644
--- a/pkg/storage/utils/decomposedfs/spaces.go
+++ b/pkg/storage/utils/decomposedfs/spaces.go
@@ -101,28 +101,18 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
 		return nil, errors.Wrap(err, "Decomposedfs: error creating node")
 	}
 
-	if err := root.WriteAllNodeMetadata(ctx); err != nil {
-		return nil, err
-	}
-	var owner *userv1beta1.UserId
 	if req.GetOwner() != nil && req.GetOwner().GetId() != nil {
-		owner = req.GetOwner().GetId()
+		root.SetOwner(req.GetOwner().GetId())
 	} else {
-		owner = &userv1beta1.UserId{OpaqueId: spaceID, Type: userv1beta1.UserType_USER_TYPE_SPACE_OWNER}
-	}
-	if err := root.WriteOwner(owner); err != nil {
-		return nil, err
-	}
-
-	err = fs.updateIndexes(ctx, req.GetOwner().GetId().GetOpaqueId(), req.Type, root.ID)
-	if err != nil {
-		return nil, err
+		root.SetOwner(&userv1beta1.UserId{OpaqueId: spaceID, Type: userv1beta1.UserType_USER_TYPE_SPACE_OWNER})
 	}
 
-	metadata := make(node.Attributes, 6)
+	metadata := node.Attributes{}
+	metadata.SetString(prefixes.OwnerIDAttr, root.Owner().GetOpaqueId())
+	metadata.SetString(prefixes.OwnerIDPAttr, root.Owner().GetIdp())
+	metadata.SetString(prefixes.OwnerTypeAttr, utils.UserTypeToString(root.Owner().GetType()))
 
-	// always enable propagation on the storage space root
-	// mark the space root node as the end of propagation
+	// always mark the space root node as the end of propagation
 	metadata.SetString(prefixes.PropagationAttr, "1")
 	metadata.SetString(prefixes.NameAttr, req.Name)
 	metadata.SetString(prefixes.SpaceNameAttr, req.Name)
@@ -151,14 +141,20 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
 		metadata.SetString(prefixes.SpaceAliasAttr, alias)
 	}
 
+	// Write node
 	if err := root.SetXattrs(metadata, true); err != nil {
 		return nil, err
 	}
 
+	// Write index
+	err = fs.updateIndexes(ctx, req.GetOwner().GetId().GetOpaqueId(), req.Type, root.ID)
+	if err != nil {
+		return nil, err
+	}
+
 	ctx = context.WithValue(ctx, utils.SpaceGrant, struct{ SpaceType string }{SpaceType: req.Type})
 
 	if req.Type != _spaceTypePersonal {
-		u := ctxpkg.ContextMustGetUser(ctx)
 		if err := fs.AddGrant(ctx, &provider.Reference{
 			ResourceId: &provider.ResourceId{
 				SpaceId:  spaceID,
@@ -639,6 +635,7 @@ func (fs *Decomposedfs) DeleteStorageSpace(ctx context.Context, req *provider.De
 		}
 
 		// FIXME remove space blobs
+		// FIXME invalidate cache
 
 		return nil
 	}
diff --git a/pkg/storage/utils/decomposedfs/testhelpers/helpers.go b/pkg/storage/utils/decomposedfs/testhelpers/helpers.go
index 25f9c7e02b6..adef14791aa 100644
--- a/pkg/storage/utils/decomposedfs/testhelpers/helpers.go
+++ b/pkg/storage/utils/decomposedfs/testhelpers/helpers.go
@@ -231,7 +231,7 @@ func (t *TestEnv) CreateTestFile(name, blobID, parentID, spaceID string, blobSiz
 	if err != nil {
 		return nil, err
 	}
-	err = n.WriteAllNodeMetadata(context.Background())
+	err = n.SetXattrs(n.NodeMetadata(), true)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go
index 96ecf1e8634..ed2d8a11227 100644
--- a/pkg/storage/utils/decomposedfs/tree/tree.go
+++ b/pkg/storage/utils/decomposedfs/tree/tree.go
@@ -151,15 +151,15 @@ func (t *Tree) TouchFile(ctx context.Context, n *node.Node, markprocessing bool)
 		return errors.Wrap(err, "Decomposedfs: error creating node")
 	}
 
-	err = n.WriteAllNodeMetadata(ctx)
+	attributes := n.NodeMetadata()
+	if markprocessing {
+		attributes[prefixes.StatusPrefix] = []byte(node.ProcessingStatus)
+	}
+	err = n.SetXattrs(attributes, true)
 	if err != nil {
 		return err
 	}
 
-	if markprocessing {
-		_ = n.SetXattr(prefixes.StatusPrefix, []byte(node.ProcessingStatus))
-	}
-
 	// link child name to parent if it is new
 	childNameLink := filepath.Join(n.ParentPath(), n.Name)
 	var link string
@@ -196,10 +196,6 @@ func (t *Tree) CreateDir(ctx context.Context, n *node.Node) (err error) {
 		return
 	}
 
-	if err := n.SetTreeSize(0); err != nil {
-		return err
-	}
-
 	// make child appear in listings
 	relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2))
 	err = os.Symlink(relativeNodePath, filepath.Join(n.ParentPath(), n.Name))
@@ -907,7 +903,12 @@ func (t *Tree) createDirNode(ctx context.Context, n *node.Node) (err error) {
 		return errors.Wrap(err, "Decomposedfs: error creating node")
 	}
 
-	return n.WriteAllNodeMetadata(ctx)
+	attributes := n.NodeMetadata()
+	attributes[prefixes.TreesizeAttr] = []byte("0") // initialize as empty, TODO why bother? if it is not set we could treat it as 0?
+	if t.options.TreeTimeAccounting || t.options.TreeSizeAccounting {
+		attributes[prefixes.PropagationAttr] = []byte("1") // mark the node for propagation
+	}
+	return n.SetXattrs(attributes, true)
 }
 
 var nodeIDRegep = regexp.MustCompile(`.*/nodes/([^.]*).*`)