Skip to content

Commit

Permalink
decomposedfs: write metadata once
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic committed Apr 25, 2023
1 parent ee89817 commit 41a3598
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 80 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/write-metadata-once.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Write Metadata once

decomposedfs now aggregates metadata when creating directories and spaces into a single write.

https://github.com/cs3org/reva/pull/3816
11 changes: 0 additions & 11 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/chunking"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/migrator"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
Expand Down Expand Up @@ -589,16 +588,6 @@ func (fs *Decomposedfs) CreateDir(ctx context.Context, ref *provider.Reference)
return
}

if fs.o.TreeTimeAccounting || fs.o.TreeSizeAccounting {
// mark the home node as the end of propagation
if err = n.SetXattrString(prefixes.PropagationAttr, "1"); err != nil {
appctx.GetLogger(ctx).Error().Err(err).Interface("node", n).Msg("could not mark node to propagate")

// FIXME: This does not return an error at all, but results in a severe situation that the
// part tree is not marked for propagation
return
}
}
return
}

Expand Down
54 changes: 15 additions & 39 deletions pkg/storage/utils/decomposedfs/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,48 +164,22 @@ func (n *Node) SetType(t provider.ResourceType) {
n.nodeType = &t
}

// ChangeOwner sets the owner of n to newOwner
func (n *Node) ChangeOwner(new *userpb.UserId) (err error) {
n.SpaceRoot.owner = new

attribs := Attributes{}
attribs.SetString(prefixes.OwnerIDAttr, new.OpaqueId)
attribs.SetString(prefixes.OwnerIDPAttr, new.Idp)
attribs.SetString(prefixes.OwnerTypeAttr, utils.UserTypeToString(new.Type))

if err := n.SpaceRoot.SetXattrs(attribs, true); err != nil {
return err
}

return
}

// WriteAllNodeMetadata writes the Node metadata to disk
func (n *Node) WriteAllNodeMetadata(ctx context.Context) (err error) {
// NodeMetadata writes the Node metadata to disk and allows passing additional attributes
func (n *Node) NodeMetadata() Attributes {
attribs := Attributes{}
attribs.SetInt64(prefixes.TypeAttr, int64(n.Type()))
attribs.SetString(prefixes.ParentidAttr, n.ParentID)
attribs.SetString(prefixes.NameAttr, n.Name)
attribs.SetString(prefixes.BlobIDAttr, n.BlobID)
attribs.SetInt64(prefixes.BlobsizeAttr, n.Blobsize)

return n.SetXattrs(attribs, true)
if n.Type() == provider.ResourceType_RESOURCE_TYPE_FILE {
attribs.SetString(prefixes.BlobIDAttr, n.BlobID)
attribs.SetInt64(prefixes.BlobsizeAttr, n.Blobsize)
}
return attribs
}

// WriteOwner writes the space owner
func (n *Node) WriteOwner(owner *userpb.UserId) error {
// SetOwner sets the space owner on the node
func (n *Node) SetOwner(owner *userpb.UserId) {
n.SpaceRoot.owner = owner

attribs := Attributes{}
attribs.SetString(prefixes.OwnerIDAttr, owner.OpaqueId)
attribs.SetString(prefixes.OwnerIDPAttr, owner.Idp)
attribs.SetString(prefixes.OwnerTypeAttr, utils.UserTypeToString(owner.Type))

if err := n.SpaceRoot.SetXattrs(attribs, true); err != nil {
return err
}
n.SpaceRoot.owner = owner
return nil
}

// SpaceOwnerOrManager returns the space owner of the space. If no owner is set
Expand Down Expand Up @@ -354,11 +328,13 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis

if revisionSuffix == "" {
n.BlobID = attrs.String(prefixes.BlobIDAttr)
blobSize, err := attrs.Int64(prefixes.BlobsizeAttr)
if err != nil {
return nil, err
if n.BlobID != "" {
blobSize, err := attrs.Int64(prefixes.BlobsizeAttr)
if err != nil {
return nil, err
}
n.Blobsize = blobSize
}
n.Blobsize = blobSize
} else {
n.BlobID, err = lu.ReadBlobIDAttr(nodePath + revisionSuffix)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ var _ = Describe("Node", func() {
n.BlobID = "TestBlobID"
n.Blobsize = blobsize

err = n.WriteAllNodeMetadata(env.Ctx)
n.SetXattrs(n.NodeMetadata(), true)
Expect(err).ToNot(HaveOccurred())
n2, err := env.Lookup.NodeFromResource(env.Ctx, ref)
Expect(err).ToNot(HaveOccurred())
Expand Down
33 changes: 15 additions & 18 deletions pkg/storage/utils/decomposedfs/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,28 +101,18 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
return nil, errors.Wrap(err, "Decomposedfs: error creating node")
}

if err := root.WriteAllNodeMetadata(ctx); err != nil {
return nil, err
}
var owner *userv1beta1.UserId
if req.GetOwner() != nil && req.GetOwner().GetId() != nil {
owner = req.GetOwner().GetId()
root.SetOwner(req.GetOwner().GetId())
} else {
owner = &userv1beta1.UserId{OpaqueId: spaceID, Type: userv1beta1.UserType_USER_TYPE_SPACE_OWNER}
}
if err := root.WriteOwner(owner); err != nil {
return nil, err
}

err = fs.updateIndexes(ctx, req.GetOwner().GetId().GetOpaqueId(), req.Type, root.ID)
if err != nil {
return nil, err
root.SetOwner(&userv1beta1.UserId{OpaqueId: spaceID, Type: userv1beta1.UserType_USER_TYPE_SPACE_OWNER})
}

metadata := make(node.Attributes, 6)
metadata := node.Attributes{}
metadata.SetString(prefixes.OwnerIDAttr, root.Owner().GetOpaqueId())
metadata.SetString(prefixes.OwnerIDPAttr, root.Owner().GetIdp())
metadata.SetString(prefixes.OwnerTypeAttr, utils.UserTypeToString(root.Owner().GetType()))

// always enable propagation on the storage space root
// mark the space root node as the end of propagation
// always mark the space root node as the end of propagation
metadata.SetString(prefixes.PropagationAttr, "1")
metadata.SetString(prefixes.NameAttr, req.Name)
metadata.SetString(prefixes.SpaceNameAttr, req.Name)
Expand Down Expand Up @@ -151,14 +141,20 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
metadata.SetString(prefixes.SpaceAliasAttr, alias)
}

// Write node
if err := root.SetXattrs(metadata, true); err != nil {
return nil, err
}

// Write index
err = fs.updateIndexes(ctx, req.GetOwner().GetId().GetOpaqueId(), req.Type, root.ID)
if err != nil {
return nil, err
}

ctx = context.WithValue(ctx, utils.SpaceGrant, struct{ SpaceType string }{SpaceType: req.Type})

if req.Type != _spaceTypePersonal {
u := ctxpkg.ContextMustGetUser(ctx)
if err := fs.AddGrant(ctx, &provider.Reference{
ResourceId: &provider.ResourceId{
SpaceId: spaceID,
Expand Down Expand Up @@ -639,6 +635,7 @@ func (fs *Decomposedfs) DeleteStorageSpace(ctx context.Context, req *provider.De
}

// FIXME remove space blobs
// FIXME invalidate cache

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/testhelpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (t *TestEnv) CreateTestFile(name, blobID, parentID, spaceID string, blobSiz
if err != nil {
return nil, err
}
err = n.WriteAllNodeMetadata(context.Background())
err = n.SetXattrs(n.NodeMetadata(), true)
if err != nil {
return nil, err
}
Expand Down
21 changes: 11 additions & 10 deletions pkg/storage/utils/decomposedfs/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,15 @@ func (t *Tree) TouchFile(ctx context.Context, n *node.Node, markprocessing bool)
return errors.Wrap(err, "Decomposedfs: error creating node")
}

err = n.WriteAllNodeMetadata(ctx)
attributes := n.NodeMetadata()
if markprocessing {
attributes[prefixes.StatusPrefix] = []byte(node.ProcessingStatus)
}
err = n.SetXattrs(attributes, true)
if err != nil {
return err
}

if markprocessing {
_ = n.SetXattr(prefixes.StatusPrefix, []byte(node.ProcessingStatus))
}

// link child name to parent if it is new
childNameLink := filepath.Join(n.ParentPath(), n.Name)
var link string
Expand Down Expand Up @@ -196,10 +196,6 @@ func (t *Tree) CreateDir(ctx context.Context, n *node.Node) (err error) {
return
}

if err := n.SetTreeSize(0); err != nil {
return err
}

// make child appear in listings
relativeNodePath := filepath.Join("../../../../../", lookup.Pathify(n.ID, 4, 2))
err = os.Symlink(relativeNodePath, filepath.Join(n.ParentPath(), n.Name))
Expand Down Expand Up @@ -907,7 +903,12 @@ func (t *Tree) createDirNode(ctx context.Context, n *node.Node) (err error) {
return errors.Wrap(err, "Decomposedfs: error creating node")
}

return n.WriteAllNodeMetadata(ctx)
attributes := n.NodeMetadata()
attributes[prefixes.TreesizeAttr] = []byte("0") // initialize as empty, TODO why bother? if it is not set we could treat it as 0?
if t.options.TreeTimeAccounting || t.options.TreeSizeAccounting {
attributes[prefixes.PropagationAttr] = []byte("1") // mark the node for propagation
}
return n.SetXattrs(attributes, true)
}

var nodeIDRegep = regexp.MustCompile(`.*/nodes/([^.]*).*`)
Expand Down

0 comments on commit 41a3598

Please sign in to comment.