diff --git a/changelog/unreleased/ocis-synctime-accounting.md b/changelog/unreleased/ocis-synctime-accounting.md new file mode 100644 index 0000000000..2432e24fd7 --- /dev/null +++ b/changelog/unreleased/ocis-synctime-accounting.md @@ -0,0 +1,7 @@ +Enhancement: introduce ocis driver treetime accounting + +We added tree time accounting to the ocis storage driver which is modeled after [eos synctime accounting](http://eos-docs.web.cern.ch/eos-docs/configuration/namespace.html#enable-subtree-accounting). +It can be enabled using the new `treetime_accounting` option, which defaults to `false` +The `tmtime` is stored in an extended attribute `user.ocis.tmtime`. The treetime accounting is enabled for nodes which have the `user.ocis.propagation` extended attribute set to `"1"`. Currently, propagation is in sync. + +https://github.com/cs3org/reva/pull/1180 \ No newline at end of file diff --git a/pkg/storage/fs/ocis/metadata.go b/pkg/storage/fs/ocis/metadata.go index 88b0d7ed58..0d5859f3c0 100644 --- a/pkg/storage/fs/ocis/metadata.go +++ b/pkg/storage/fs/ocis/metadata.go @@ -41,6 +41,7 @@ func (fs *ocisfs) SetArbitraryMetadata(ctx context.Context, ref *provider.Refere } nodePath := filepath.Join(fs.pw.Root, "nodes", n.ID) for k, v := range md.Metadata { + // TODO set etag as temporary etag tmpEtagAttr attrName := metadataPrefix + k if err = xattr.Set(nodePath, attrName, []byte(v)); err != nil { return errors.Wrap(err, "ocisfs: could not set metadata attribute "+attrName+" to "+k) diff --git a/pkg/storage/fs/ocis/node.go b/pkg/storage/fs/ocis/node.go index eae9e4484b..97f463cb83 100644 --- a/pkg/storage/fs/ocis/node.go +++ b/pkg/storage/fs/ocis/node.go @@ -20,10 +20,13 @@ package ocis import ( "context" + "crypto/md5" "fmt" + "io" "os" "path/filepath" "strings" + "time" userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" @@ -47,17 +50,17 @@ type Node struct { func (n *Node) writeMetadata(owner *userpb.UserId) (err error) { nodePath := filepath.Join(n.pw.Root, "nodes", n.ID) - if err = xattr.Set(nodePath, "user.ocis.parentid", []byte(n.ParentID)); err != nil { + if err = xattr.Set(nodePath, parentidAttr, []byte(n.ParentID)); err != nil { return errors.Wrap(err, "ocisfs: could not set parentid attribute") } - if err = xattr.Set(nodePath, "user.ocis.name", []byte(n.Name)); err != nil { + if err = xattr.Set(nodePath, nameAttr, []byte(n.Name)); err != nil { return errors.Wrap(err, "ocisfs: could not set name attribute") } if owner != nil { - if err = xattr.Set(nodePath, "user.ocis.owner.id", []byte(owner.OpaqueId)); err != nil { + if err = xattr.Set(nodePath, ownerIDAttr, []byte(owner.OpaqueId)); err != nil { return errors.Wrap(err, "ocisfs: could not set owner id attribute") } - if err = xattr.Set(nodePath, "user.ocis.owner.idp", []byte(owner.Idp)); err != nil { + if err = xattr.Set(nodePath, ownerIDPAttr, []byte(owner.Idp)); err != nil { return errors.Wrap(err, "ocisfs: could not set owner idp attribute") } } @@ -75,13 +78,13 @@ func ReadNode(ctx context.Context, pw *Path, id string) (n *Node, err error) { // lookup parent id in extended attributes var attrBytes []byte - if attrBytes, err = xattr.Get(nodePath, "user.ocis.parentid"); err == nil { + if attrBytes, err = xattr.Get(nodePath, parentidAttr); err == nil { n.ParentID = string(attrBytes) } else { return } // lookup name in extended attributes - if attrBytes, err = xattr.Get(nodePath, "user.ocis.name"); err == nil { + if attrBytes, err = xattr.Get(nodePath, nameAttr); err == nil { n.Name = string(attrBytes) } else { return @@ -99,7 +102,7 @@ func ReadNode(ctx context.Context, pw *Path, id string) (n *Node, err error) { // walk to root to check node is not part of a deleted subtree parentPath := filepath.Join(n.pw.Root, "nodes", parentID) - if attrBytes, err = xattr.Get(parentPath, "user.ocis.parentid"); err == nil { + if attrBytes, err = xattr.Get(parentPath, parentidAttr); err == nil { parentID = string(attrBytes) log.Debug().Interface("node", n).Str("root.ID", root.ID).Str("parentID", parentID).Msg("ReadNode() found parent") } else { @@ -156,13 +159,13 @@ func (n *Node) Parent() (p *Node, err error) { // lookup parent id in extended attributes var attrBytes []byte - if attrBytes, err = xattr.Get(parentPath, "user.ocis.parentid"); err == nil { + if attrBytes, err = xattr.Get(parentPath, parentidAttr); err == nil { p.ParentID = string(attrBytes) } else { return } // lookup name in extended attributes - if attrBytes, err = xattr.Get(parentPath, "user.ocis.name"); err == nil { + if attrBytes, err = xattr.Get(parentPath, nameAttr); err == nil { p.Name = string(attrBytes) } else { return @@ -186,13 +189,13 @@ func (n *Node) Owner() (id string, idp string, err error) { // lookup parent id in extended attributes var attrBytes []byte // lookup name in extended attributes - if attrBytes, err = xattr.Get(nodePath, "user.ocis.owner.id"); err == nil { + if attrBytes, err = xattr.Get(nodePath, ownerIDAttr); err == nil { n.ownerID = string(attrBytes) } else { return } // lookup name in extended attributes - if attrBytes, err = xattr.Get(nodePath, "user.ocis.owner.idp"); err == nil { + if attrBytes, err = xattr.Get(nodePath, ownerIDPAttr); err == nil { n.ownerIDP = string(attrBytes) } else { return @@ -230,32 +233,22 @@ func (n *Node) AsResourceInfo(ctx context.Context) (ri *provider.ResourceInfo, e // nodeType = provider.ResourceType_RESOURCE_TYPE_REFERENCE } - var etag []byte - // TODO optionally store etag in new `root/attributes/` file - if etag, err = xattr.Get(nodePath, "user.ocis.etag"); err != nil { - log.Error().Err(err).Interface("node", n).Msg("could not read etag") - } - id := &provider.ResourceId{OpaqueId: n.ID} fn, err = n.pw.Path(ctx, n) if err != nil { return nil, err } + ri = &provider.ResourceInfo{ Id: id, Path: fn, Type: nodeType, - Etag: string(etag), MimeType: mime.Detect(nodeType == provider.ResourceType_RESOURCE_TYPE_CONTAINER, fn), Size: uint64(fi.Size()), // TODO fix permissions PermissionSet: &provider.ResourcePermissions{ListContainer: true, CreateContainer: true}, - Mtime: &types.Timestamp{ - Seconds: uint64(fi.ModTime().Unix()), - // TODO read nanos from where? Nanos: fi.MTimeNanos, - }, - Target: string(target), + Target: string(target), } if owner, idp, err := n.Owner(); err == nil { @@ -265,6 +258,41 @@ func (n *Node) AsResourceInfo(ctx context.Context) (ri *provider.ResourceInfo, e } } + // etag currently is a hash of fileid + tmtime (or mtime) + // TODO make etag of files use fileid and checksum + // TODO implment adding temporery etag in an attribute to restore backups + h := md5.New() + if _, err := io.WriteString(h, n.ID); err != nil { + return nil, err + } + var tmTime time.Time + if tmTime, err = n.GetTMTime(); err != nil { + // no tmtime, use mtime + tmTime = fi.ModTime() + } + if tb, err := tmTime.UTC().MarshalBinary(); err == nil { + if _, err := h.Write(tb); err != nil { + return nil, err + } + } else { + return nil, err + } + + // use temporary etag if it is set + if b, err := xattr.Get(nodePath, tmpEtagAttr); err == nil { + ri.Etag = string(b) + } else { + ri.Etag = fmt.Sprintf("%x", h.Sum(nil)) + } + + // mtime uses tmtime if present + // TODO expose mtime and tmtime separately? + un := tmTime.UnixNano() + ri.Mtime = &types.Timestamp{ + Seconds: uint64(un / 1000000000), + Nanos: uint32(un % 1000000000), + } + // TODO only read the requested metadata attributes if attrs, err := xattr.List(nodePath); err == nil { ri.ArbitraryMetadata = &provider.ArbitraryMetadata{ @@ -290,3 +318,39 @@ func (n *Node) AsResourceInfo(ctx context.Context) (ri *provider.ResourceInfo, e return ri, nil } + +// HasPropagation checks if the propagation attribute exists and is set to "1" +func (n *Node) HasPropagation() (propagation bool) { + nodePath := filepath.Join(n.pw.Root, "nodes", n.ID) + if b, err := xattr.Get(nodePath, propagationAttr); err == nil { + return string(b) == "1" + } + return false +} + +// GetTMTime reads the tmtime from the extended attributes +func (n *Node) GetTMTime() (tmTime time.Time, err error) { + nodePath := filepath.Join(n.pw.Root, "nodes", n.ID) + var b []byte + if b, err = xattr.Get(nodePath, treeMTimeAttr); err != nil { + return + } + return time.Parse(time.RFC3339Nano, string(b)) +} + +// SetTMTime writes the tmtime to the extended attributes +func (n *Node) SetTMTime(t time.Time) (err error) { + nodePath := filepath.Join(n.pw.Root, "nodes", n.ID) + return xattr.Set(nodePath, treeMTimeAttr, []byte(t.UTC().Format(time.RFC3339Nano))) +} + +// UnsetTempEtag removes the temporary etag attribute +func (n *Node) UnsetTempEtag() (err error) { + nodePath := filepath.Join(n.pw.Root, "nodes", n.ID) + if err = xattr.Remove(nodePath, tmpEtagAttr); err != nil { + if e, ok := err.(*xattr.Error); ok && e.Err.Error() == "no data available" { + return nil + } + } + return err +} diff --git a/pkg/storage/fs/ocis/ocis.go b/pkg/storage/fs/ocis/ocis.go index 2b614e8766..e19a22115b 100644 --- a/pkg/storage/fs/ocis/ocis.go +++ b/pkg/storage/fs/ocis/ocis.go @@ -27,6 +27,7 @@ import ( "strings" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/logger" "github.com/cs3org/reva/pkg/storage" @@ -47,16 +48,39 @@ const ( // collisions with other apps We are going to introduce a sub namespace // "user.ocis." + ocisPrefix string = "user.ocis." + parentidAttr string = ocisPrefix + "parentid" + ownerIDAttr string = ocisPrefix + "owner.id" + ownerIDPAttr string = ocisPrefix + "owner.idp" + // the base name of the node + // updated when the file is renamed or moved + nameAttr string = ocisPrefix + "name" + // SharePrefix is the prefix for sharing related extended attributes - sharePrefix string = "user.ocis.acl." - metadataPrefix string = "user.ocis.md." + sharePrefix string = ocisPrefix + "acl." + metadataPrefix string = ocisPrefix + "md." // TODO implement favorites metadata flag - //favPrefix string = "user.ocis.fav." // favorite flag, per user - // TODO use etag prefix instead of single etag property - //etagPrefix string = "user.ocis.etag." // allow overriding a calculated etag with one from the extended attributes - referenceAttr string = "user.ocis.cs3.ref" // arbitrary metadata - //checksumPrefix string = "user.ocis.cs." // TODO add checksum support - trashOriginAttr string = "user.ocis.trash.origin" // trash origin + //favPrefix string = ocisPrefix + "fav." // favorite flag, per user + + // a temporary etag for a folder that is removed when the mtime propagation happens + tmpEtagAttr string = ocisPrefix + "tmp.etag" + referenceAttr string = ocisPrefix + "cs3.ref" // arbitrary metadata + //checksumPrefix string = ocisPrefix + "cs." // TODO add checksum support + trashOriginAttr string = ocisPrefix + "trash.origin" // trash origin + + // we use a single attribute to enable or disable propagation of both: synctime and treesize + propagationAttr string = ocisPrefix + "propagation" + + // the tree modification time of the tree below this node, + // propagated when synctime_accounting is true and + // user.ocis.propagation=1 is set + // stored as a readable time.RFC3339Nano + treeMTimeAttr string = ocisPrefix + "tmtime" + + // the size of the tree below this node, + // propagated when treesize_accounting is true and + // user.ocis.propagation=1 is set + //treesizeAttr string = ocisPrefix + "treesize" ) func init() { @@ -149,11 +173,11 @@ func (fs *ocisfs) CreateHome(ctx context.Context) (err error) { return errtypes.NotSupported("ocisfs: CreateHome() home supported disabled") } - var n *Node + var n, h *Node if n, err = fs.pw.RootNode(ctx); err != nil { return } - _, err = fs.pw.WalkPath(ctx, n, fs.pw.mustGetUserLayout(ctx), func(ctx context.Context, n *Node) error { + h, err = fs.pw.WalkPath(ctx, n, fs.pw.mustGetUserLayout(ctx), func(ctx context.Context, n *Node) error { if !n.Exists { if err := fs.tp.CreateDir(ctx, n); err != nil { return err @@ -161,6 +185,15 @@ func (fs *ocisfs) CreateHome(ctx context.Context) (err error) { } return nil }) + + if fs.pw.TreeTimeAccounting { + homePath := filepath.Join(fs.pw.Root, "nodes", h.ID) + // mark the home node as the end of propagation + if err = xattr.Set(homePath, propagationAttr, []byte("1")); err != nil { + appctx.GetLogger(ctx).Error().Err(err).Interface("node", h).Msg("could not mark home as propagation root") + return + } + } return } @@ -190,7 +223,17 @@ func (fs *ocisfs) CreateDir(ctx context.Context, fn string) (err error) { if node.Exists { return errtypes.AlreadyExists(fn) } - return fs.tp.CreateDir(ctx, node) + err = fs.tp.CreateDir(ctx, node) + + if fs.pw.TreeTimeAccounting { + nodePath := filepath.Join(fs.pw.Root, "nodes", node.ID) + // mark the home node as the end of propagation + if err = xattr.Set(nodePath, propagationAttr, []byte("1")); err != nil { + appctx.GetLogger(ctx).Error().Err(err).Interface("node", node).Msg("could not mark node to propagate") + return + } + } + return } // CreateReference creates a reference as a node folder with the target stored in extended attributes @@ -347,7 +390,7 @@ func (fs *ocisfs) copyMD(s string, t string) (err error) { return err } for i := range attrs { - if strings.HasPrefix(attrs[i], "user.ocis.") { + if strings.HasPrefix(attrs[i], ocisPrefix) { var d []byte if d, err = xattr.Get(s, attrs[i]); err != nil { return err diff --git a/pkg/storage/fs/ocis/path.go b/pkg/storage/fs/ocis/path.go index 2a839dd51e..4573e41151 100644 --- a/pkg/storage/fs/ocis/path.go +++ b/pkg/storage/fs/ocis/path.go @@ -40,10 +40,16 @@ type Path struct { UserLayout string `mapstructure:"user_layout"` // TODO NodeLayout option to save nodes as eg. nodes/1d/d8/1dd84abf-9466-4e14-bb86-02fc4ea3abcf + ShareFolder string `mapstructure:"share_folder"` // EnableHome enables the creation of home directories. - EnableHome bool `mapstructure:"enable_home"` - ShareFolder string `mapstructure:"share_folder"` + EnableHome bool `mapstructure:"enable_home"` + + // propagate mtime changes as tmtime (tree modification time) to the parent directory when user.ocis.propagation=1 is set on a node + TreeTimeAccounting bool `mapstructure:"treetime_accounting"` + + // propagate size changes as treesize + TreeSizeAccounting bool `mapstructure:"treesize_accounting"` } // NodeFromResource takes in a request path or request id and converts it to a Node diff --git a/pkg/storage/fs/ocis/tree.go b/pkg/storage/fs/ocis/tree.go index beb9837a25..23d113689b 100644 --- a/pkg/storage/fs/ocis/tree.go +++ b/pkg/storage/fs/ocis/tree.go @@ -20,8 +20,6 @@ package ocis import ( "context" - "encoding/hex" - "math/rand" "os" "path/filepath" "time" @@ -34,6 +32,7 @@ import ( "github.com/google/uuid" "github.com/pkg/errors" "github.com/pkg/xattr" + "github.com/rs/zerolog/log" ) // Tree manages a hierarchical tree @@ -145,7 +144,7 @@ func (t *Tree) Move(ctx context.Context, oldNode *Node, newNode *Node) (err erro tgtPath := filepath.Join(t.pw.Root, "nodes", oldNode.ID) // update name attribute - if err := xattr.Set(tgtPath, "user.ocis.name", []byte(newNode.Name)); err != nil { + if err := xattr.Set(tgtPath, nameAttr, []byte(newNode.Name)); err != nil { return errors.Wrap(err, "ocisfs: could not set name attribute") } @@ -167,10 +166,10 @@ func (t *Tree) Move(ctx context.Context, oldNode *Node, newNode *Node) (err erro // update parentid and name tgtPath := filepath.Join(t.pw.Root, "nodes", newNode.ID) - if err := xattr.Set(tgtPath, "user.ocis.parentid", []byte(newNode.ParentID)); err != nil { + if err := xattr.Set(tgtPath, parentidAttr, []byte(newNode.ParentID)); err != nil { return errors.Wrap(err, "ocisfs: could not set parentid attribute") } - if err := xattr.Set(tgtPath, "user.ocis.name", []byte(newNode.Name)); err != nil { + if err := xattr.Set(tgtPath, nameAttr, []byte(newNode.Name)); err != nil { return errors.Wrap(err, "ocisfs: could not set name attribute") } @@ -286,33 +285,83 @@ func (t *Tree) Delete(ctx context.Context, node *Node) (err error) { } // Propagate propagates changes to the root of the tree -func (t *Tree) Propagate(ctx context.Context, node *Node) (err error) { - // generate an etag - bytes := make([]byte, 16) - if _, err := rand.Read(bytes); err != nil { - return err +func (t *Tree) Propagate(ctx context.Context, n *Node) (err error) { + if !t.pw.TreeTimeAccounting && !t.pw.TreeSizeAccounting { + // no propagation enabled + log.Debug().Msg("propagation disabled") + return } - // store in extended attribute - etag := hex.EncodeToString(bytes) - var root *Node + log := appctx.GetLogger(ctx) + nodePath := filepath.Join(t.pw.Root, "nodes", n.ID) + + // is propagation enabled for the parent node? + + var root *Node if root, err = t.pw.HomeOrRootNode(ctx); err != nil { return } - for err == nil && node.ID != root.ID { // TODO propagate up to where? - if err := xattr.Set(filepath.Join(t.pw.Root, "nodes", node.ID), "user.ocis.etag", []byte(etag)); err != nil { - log := appctx.GetLogger(ctx) - log.Error().Err(err).Msg("error storing file id") + + var fi os.FileInfo + if fi, err = os.Stat(nodePath); err != nil { + return err + } + + var b []byte + + for err == nil && n.ID != root.ID { + log.Debug().Interface("node", n).Msg("propagating") + + if n, err = n.Parent(); err != nil { + break } - // TODO propagate mtime - // TODO size accounting - if err != nil { - err = errors.Wrap(err, "ocisfs: Propagate: readlink error") - return + // TODO none, sync and async? + if !n.HasPropagation() { + log.Debug().Interface("node", n).Str("attr", propagationAttr).Msg("propagation attribute not set or unreadable, not propagating") + // if the attribute is not set treat it as false / none / no propagation + return nil + } + + if t.pw.TreeTimeAccounting { + // update the parent tree time if it is older than the nodes mtime + updateSyncTime := false + + var tmTime time.Time + tmTime, err = n.GetTMTime() + switch { + case err != nil: + // missing attribute, or invalid format, overwrite + log.Error().Err(err).Interface("node", n).Msg("could not read tmtime attribute, overwriting") + updateSyncTime = true + case tmTime.Before(fi.ModTime()): + log.Debug().Interface("node", n).Str("tmtime", string(b)).Str("mtime", fi.ModTime().UTC().Format(time.RFC3339Nano)).Msg("parent tmtime is older than node mtime, updating") + updateSyncTime = true + default: + log.Debug().Interface("node", n).Str("tmtime", string(b)).Str("mtime", fi.ModTime().UTC().Format(time.RFC3339Nano)).Msg("parent tmtime is younger than node mtime, not updating") + } + + if updateSyncTime { + // update the tree time of the parent node + if err = n.SetTMTime(fi.ModTime()); err != nil { + log.Error().Err(err).Interface("node", n).Time("tmtime", fi.ModTime().UTC()).Msg("could not update tmtime of parent node") + return + } + log.Debug().Interface("node", n).Time("tmtime", fi.ModTime().UTC()).Msg("updated tmtime of parent node") + } + + if err := n.UnsetTempEtag(); err != nil { + log.Error().Err(err).Interface("node", n).Msg("could not remove temporary etag attribute") + } + } - node, err = node.Parent() + // TODO size accounting + + } + if err != nil { + log.Error().Err(err).Interface("node", n).Msg("error propagating") + return } return } diff --git a/pkg/storage/fs/ocis/upload.go b/pkg/storage/fs/ocis/upload.go index d3678c11c1..e1c1c5ef91 100644 --- a/pkg/storage/fs/ocis/upload.go +++ b/pkg/storage/fs/ocis/upload.go @@ -35,6 +35,7 @@ import ( "github.com/cs3org/reva/pkg/user" "github.com/google/uuid" "github.com/pkg/errors" + "github.com/pkg/xattr" "github.com/rs/zerolog/log" tusd "github.com/tus/tusd/pkg/handler" ) @@ -73,6 +74,30 @@ func (fs *ocisfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read if err != nil { return err } + + if fs.pw.EnableHome { + if u, ok := user.ContextGetUser(ctx); ok { + err = node.writeMetadata(u.Id) + } else { + log := appctx.GetLogger(ctx) + log.Error().Msg("home support enabled but no user in context") + err = errors.Wrap(errtypes.UserRequired("userrequired"), "error getting user from ctx") + } + } else { + err = node.writeMetadata(nil) + } + if err != nil { + return err + } + + if fs.pw.TreeTimeAccounting { + // mark the home node as the end of propagation q + if err = xattr.Set(nodePath, propagationAttr, []byte("1")); err != nil { + appctx.GetLogger(ctx).Error().Err(err).Interface("node", node).Msg("could not mark node to propagate") + return err + } + } + return fs.tp.Propagate(ctx, node) } @@ -379,6 +404,13 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) { if err != nil { return } + if n.pw.TreeTimeAccounting { + // mark the home node as the end of propagation q + if err = xattr.Set(targetPath, propagationAttr, []byte("1")); err != nil { + appctx.GetLogger(ctx).Error().Err(err).Interface("node", n).Msg("could not mark node to propagate") + return + } + } // link child name to parent if it is new childNameLink := filepath.Join(upload.fs.pw.Root, "nodes", n.ParentID, n.Name)