diff --git a/changelog/unreleased/write-metadata-once.md b/changelog/unreleased/write-metadata-once.md new file mode 100644 index 00000000000..b11e7a85402 --- /dev/null +++ b/changelog/unreleased/write-metadata-once.md @@ -0,0 +1,5 @@ +Bugfix: Write Metadata once + +decomposedfs now aggregates metadata when creating directories and spaces into a single write. + +https://github.com/cs3org/reva/pull/3816 diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 012b7af852f..fc051372ea0 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -48,7 +48,6 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata" - "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/migrator" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" @@ -589,16 +588,6 @@ func (fs *Decomposedfs) CreateDir(ctx context.Context, ref *provider.Reference) return } - if fs.o.TreeTimeAccounting || fs.o.TreeSizeAccounting { - // mark the home node as the end of propagation - if err = n.SetXattrString(prefixes.PropagationAttr, "1"); err != nil { - appctx.GetLogger(ctx).Error().Err(err).Interface("node", n).Msg("could not mark node to propagate") - - // FIXME: This does not return an error at all, but results in a severe situation that the - // part tree is not marked for propagation - return - } - } return } diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index ea682ab22e3..c802ebfa1ec 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -164,48 +164,22 @@ func (n *Node) SetType(t provider.ResourceType) { n.nodeType = &t } -// ChangeOwner sets the owner of n to newOwner -func (n *Node) ChangeOwner(new *userpb.UserId) (err error) { - n.SpaceRoot.owner = new - - attribs := Attributes{} - attribs.SetString(prefixes.OwnerIDAttr, new.OpaqueId) - attribs.SetString(prefixes.OwnerIDPAttr, new.Idp) - attribs.SetString(prefixes.OwnerTypeAttr, utils.UserTypeToString(new.Type)) - - if err := n.SpaceRoot.SetXattrs(attribs, true); err != nil { - return err - } - - return -} - -// WriteAllNodeMetadata writes the Node metadata to disk -func (n *Node) WriteAllNodeMetadata(ctx context.Context) (err error) { +// NodeMetadata writes the Node metadata to disk and allows passing additional attributes +func (n *Node) NodeMetadata() Attributes { attribs := Attributes{} attribs.SetInt64(prefixes.TypeAttr, int64(n.Type())) attribs.SetString(prefixes.ParentidAttr, n.ParentID) attribs.SetString(prefixes.NameAttr, n.Name) - attribs.SetString(prefixes.BlobIDAttr, n.BlobID) - attribs.SetInt64(prefixes.BlobsizeAttr, n.Blobsize) - - return n.SetXattrs(attribs, true) + if n.Type() == provider.ResourceType_RESOURCE_TYPE_FILE { + attribs.SetString(prefixes.BlobIDAttr, n.BlobID) + attribs.SetInt64(prefixes.BlobsizeAttr, n.Blobsize) + } + return attribs } -// WriteOwner writes the space owner -func (n *Node) WriteOwner(owner *userpb.UserId) error { +// SetOwner sets the space owner on the node +func (n *Node) SetOwner(owner *userpb.UserId) { n.SpaceRoot.owner = owner - - attribs := Attributes{} - attribs.SetString(prefixes.OwnerIDAttr, owner.OpaqueId) - attribs.SetString(prefixes.OwnerIDPAttr, owner.Idp) - attribs.SetString(prefixes.OwnerTypeAttr, utils.UserTypeToString(owner.Type)) - - if err := n.SpaceRoot.SetXattrs(attribs, true); err != nil { - return err - } - n.SpaceRoot.owner = owner - return nil } // SpaceOwnerOrManager returns the space owner of the space. If no owner is set @@ -354,11 +328,13 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis if revisionSuffix == "" { n.BlobID = attrs.String(prefixes.BlobIDAttr) - blobSize, err := attrs.Int64(prefixes.BlobsizeAttr) - if err != nil { - return nil, err + if n.BlobID != "" { + blobSize, err := attrs.Int64(prefixes.BlobsizeAttr) + if err != nil { + return nil, err + } + n.Blobsize = blobSize } - n.Blobsize = blobSize } else { n.BlobID, err = lu.ReadBlobIDAttr(nodePath + revisionSuffix) if err != nil { diff --git a/pkg/storage/utils/decomposedfs/node/node_test.go b/pkg/storage/utils/decomposedfs/node/node_test.go index 81172f5d429..b0e76027c30 100644 --- a/pkg/storage/utils/decomposedfs/node/node_test.go +++ b/pkg/storage/utils/decomposedfs/node/node_test.go @@ -94,7 +94,7 @@ var _ = Describe("Node", func() { n.BlobID = "TestBlobID" n.Blobsize = blobsize - err = n.WriteAllNodeMetadata(env.Ctx) + n.SetXattrs(n.NodeMetadata(), true) Expect(err).ToNot(HaveOccurred()) n2, err := env.Lookup.NodeFromResource(env.Ctx, ref) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/storage/utils/decomposedfs/spaces.go b/pkg/storage/utils/decomposedfs/spaces.go index 96a8841324f..1b3245917be 100644 --- a/pkg/storage/utils/decomposedfs/spaces.go +++ b/pkg/storage/utils/decomposedfs/spaces.go @@ -101,28 +101,18 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr return nil, errors.Wrap(err, "Decomposedfs: error creating node") } - if err := root.WriteAllNodeMetadata(ctx); err != nil { - return nil, err - } - var owner *userv1beta1.UserId if req.GetOwner() != nil && req.GetOwner().GetId() != nil { - owner = req.GetOwner().GetId() + root.SetOwner(req.GetOwner().GetId()) } else { - owner = &userv1beta1.UserId{OpaqueId: spaceID, Type: userv1beta1.UserType_USER_TYPE_SPACE_OWNER} - } - if err := root.WriteOwner(owner); err != nil { - return nil, err - } - - err = fs.updateIndexes(ctx, req.GetOwner().GetId().GetOpaqueId(), req.Type, root.ID) - if err != nil { - return nil, err + root.SetOwner(&userv1beta1.UserId{OpaqueId: spaceID, Type: userv1beta1.UserType_USER_TYPE_SPACE_OWNER}) } - metadata := make(node.Attributes, 6) + metadata := node.Attributes{} + metadata.SetString(prefixes.OwnerIDAttr, root.Owner().GetOpaqueId()) + metadata.SetString(prefixes.OwnerIDPAttr, root.Owner().GetIdp()) + metadata.SetString(prefixes.OwnerTypeAttr, utils.UserTypeToString(root.Owner().GetType())) - // always enable propagation on the storage space root - // mark the space root node as the end of propagation + // always mark the space root node as the end of propagation metadata.SetString(prefixes.PropagationAttr, "1") metadata.SetString(prefixes.NameAttr, req.Name) metadata.SetString(prefixes.SpaceNameAttr, req.Name) @@ -151,14 +141,20 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr metadata.SetString(prefixes.SpaceAliasAttr, alias) } + // Write node if err := root.SetXattrs(metadata, true); err != nil { return nil, err } + // Write index + err = fs.updateIndexes(ctx, req.GetOwner().GetId().GetOpaqueId(), req.Type, root.ID) + if err != nil { + return nil, err + } + ctx = context.WithValue(ctx, utils.SpaceGrant, struct{ SpaceType string }{SpaceType: req.Type}) if req.Type != _spaceTypePersonal { - u := ctxpkg.ContextMustGetUser(ctx) if err := fs.AddGrant(ctx, &provider.Reference{ ResourceId: &provider.ResourceId{ SpaceId: spaceID, @@ -639,6 +635,7 @@ func (fs *Decomposedfs) DeleteStorageSpace(ctx context.Context, req *provider.De } // FIXME remove space blobs + // FIXME invalidate cache return nil } diff --git a/pkg/storage/utils/decomposedfs/testhelpers/helpers.go b/pkg/storage/utils/decomposedfs/testhelpers/helpers.go index 25f9c7e02b6..adef14791aa 100644 --- a/pkg/storage/utils/decomposedfs/testhelpers/helpers.go +++ b/pkg/storage/utils/decomposedfs/testhelpers/helpers.go @@ -231,7 +231,7 @@ func (t *TestEnv) CreateTestFile(name, blobID, parentID, spaceID string, blobSiz if err != nil { return nil, err } - err = n.WriteAllNodeMetadata(context.Background()) + err = n.SetXattrs(n.NodeMetadata(), true) if err != nil { return nil, err } diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go index 96ecf1e8634..ed2d8a11227 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree.go +++ b/pkg/storage/utils/decomposedfs/tree/tree.go @@ -151,15 +151,15 @@ func (t *Tree) TouchFile(ctx context.Context, n *node.Node, markprocessing bool) return errors.Wrap(err, "Decomposedfs: error creating node") } - err = n.WriteAllNodeMetadata(ctx) + attributes := n.NodeMetadata() + if markprocessing { + attributes[prefixes.StatusPrefix] = []byte(node.ProcessingStatus) + } + err = n.SetXattrs(attributes, true) if err != nil { return err } - if markprocessing { - _ = n.SetXattr(prefixes.StatusPrefix, []byte(node.ProcessingStatus)) - } - // link child name to parent if it is new childNameLink := filepath.Join(n.ParentPath(), n.Name) var link string @@ -196,10 +196,6 @@ func (t *Tree) CreateDir(ctx context.Context, n *node.Node) (err error) { return } - if err := n.SetTreeSize(0); err != nil { - return err - } - // make child appear in listings relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2)) err = os.Symlink(relativeNodePath, filepath.Join(n.ParentPath(), n.Name)) @@ -907,7 +903,12 @@ func (t *Tree) createDirNode(ctx context.Context, n *node.Node) (err error) { return errors.Wrap(err, "Decomposedfs: error creating node") } - return n.WriteAllNodeMetadata(ctx) + attributes := n.NodeMetadata() + attributes[prefixes.TreesizeAttr] = []byte("0") // initialize as empty, TODO why bother? if it is not set we could treat it as 0? + if t.options.TreeTimeAccounting || t.options.TreeSizeAccounting { + attributes[prefixes.PropagationAttr] = []byte("1") // mark the node for propagation + } + return n.SetXattrs(attributes, true) } var nodeIDRegep = regexp.MustCompile(`.*/nodes/([^.]*).*`)