Skip to content

Commit

Permalink
Merge pull request #3845 from aduffeck/fix-propagate
Browse files Browse the repository at this point in the history
Fix propagation in concurrency scenarios
  • Loading branch information
aduffeck authored May 4, 2023
2 parents 62a542c + e106f3f commit 756a843
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 90 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/fix-propagation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Fix propagation

Fix propagation in concurrency scenarios

https://github.com/cs3org/reva/pull/3845
178 changes: 88 additions & 90 deletions pkg/storage/utils/decomposedfs/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,9 +691,9 @@ func (t *Tree) removeNode(path string, n *node.Node) error {
// Propagate propagates changes to the root of the tree
func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err error) {
sublog := appctx.GetLogger(ctx).With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger()
if !t.options.TreeTimeAccounting && !t.options.TreeSizeAccounting {
if !t.options.TreeTimeAccounting && (!t.options.TreeSizeAccounting || sizeDiff == 0) {
// no propagation enabled
sublog.Debug().Msg("propagation disabled")
sublog.Debug().Msg("propagation disabled or nothing to propagate")
return
}

Expand All @@ -707,11 +707,33 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err
for err == nil && n.ID != root.ID {
sublog.Debug().Msg("propagating")

if n, err = n.Parent(); err != nil {
break
attrs := node.Attributes{}

var f *lockedfile.File
// lock parent before reading treesize or tree time
switch t.lookup.MetadataBackend().(type) {
case metadata.MessagePackBackend:
f, err = lockedfile.OpenFile(t.lookup.MetadataBackend().MetadataPath(n.ParentPath()), os.O_RDWR|os.O_CREATE, 0600)
case metadata.XattrsBackend:
// we have to use dedicated lockfiles to lock directories
// this only works because the xattr backend also locks folders with separate lock files
f, err = lockedfile.OpenFile(n.ParentPath()+filelocks.LockFileSuffix, os.O_RDWR|os.O_CREATE, 0600)
}
if err != nil {
return err
}
// always log error if closing node fails
defer func() {
// ignore already closed error
cerr := f.Close()
if err == nil && cerr != nil && !errors.Is(cerr, os.ErrClosed) {
err = cerr // only overwrite err with en error from close if the former was nil
}
}()

sublog = sublog.With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger()
if n, err = n.Parent(); err != nil {
return err
}

// TODO none, sync and async?
if !n.HasPropagation() {
Expand All @@ -720,102 +742,78 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err
return nil
}

if t.options.TreeTimeAccounting || (t.options.TreeSizeAccounting && sizeDiff != 0) {
attrs := node.Attributes{}
sublog = sublog.With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger()

var f *lockedfile.File
// lock node before reading treesize or tree time
switch t.lookup.MetadataBackend().(type) {
case metadata.MessagePackBackend:
f, err = lockedfile.OpenFile(t.lookup.MetadataBackend().MetadataPath(n.InternalPath()), os.O_RDWR|os.O_CREATE, 0600)
case metadata.XattrsBackend:
// we have to use dedicated lockfiles to lock directories
// this only works because the xattr backend also locks folders with separate lock files
f, err = lockedfile.OpenFile(n.InternalPath()+filelocks.LockFileSuffix, os.O_RDWR|os.O_CREATE, 0600)
}
if err != nil {
return err
if t.options.TreeTimeAccounting {
// update the parent tree time if it is older than the nodes mtime
updateSyncTime := false

var tmTime time.Time
tmTime, err = n.GetTMTime()
switch {
case err != nil:
// missing attribute, or invalid format, overwrite
sublog.Debug().Err(err).
Msg("could not read tmtime attribute, overwriting")
updateSyncTime = true
case tmTime.Before(sTime):
sublog.Debug().
Time("tmtime", tmTime).
Time("stime", sTime).
Msg("parent tmtime is older than node mtime, updating")
updateSyncTime = true
default:
sublog.Debug().
Time("tmtime", tmTime).
Time("stime", sTime).
Dur("delta", sTime.Sub(tmTime)).
Msg("parent tmtime is younger than node mtime, not updating")
}
// always log error if closing node fails
defer func() {
// ignore already closed error
cerr := f.Close()
if err == nil && cerr != nil && !errors.Is(cerr, os.ErrClosed) {
err = cerr // only overwrite err with en error from close if the former was nil
}
}()

if t.options.TreeTimeAccounting {
// update the parent tree time if it is older than the nodes mtime
updateSyncTime := false

var tmTime time.Time
tmTime, err = n.GetTMTime()
switch {
case err != nil:
// missing attribute, or invalid format, overwrite
sublog.Debug().Err(err).
Msg("could not read tmtime attribute, overwriting")
updateSyncTime = true
case tmTime.Before(sTime):
sublog.Debug().
Time("tmtime", tmTime).
Time("stime", sTime).
Msg("parent tmtime is older than node mtime, updating")
updateSyncTime = true
default:
sublog.Debug().
Time("tmtime", tmTime).
Time("stime", sTime).
Dur("delta", sTime.Sub(tmTime)).
Msg("parent tmtime is younger than node mtime, not updating")
}

if updateSyncTime {
// update the tree time of the parent node
attrs.SetString(prefixes.TreeMTimeAttr, sTime.UTC().Format(time.RFC3339Nano))
}

attrs.SetString(prefixes.TmpEtagAttr, "")
if updateSyncTime {
// update the tree time of the parent node
attrs.SetString(prefixes.TreeMTimeAttr, sTime.UTC().Format(time.RFC3339Nano))
}

// size accounting
if t.options.TreeSizeAccounting && sizeDiff != 0 {
var newSize uint64
attrs.SetString(prefixes.TmpEtagAttr, "")
}

// read treesize
treeSize, err := n.GetTreeSize()
switch {
case metadata.IsAttrUnset(err):
// fallback to calculating the treesize
newSize, err = t.calculateTreeSize(ctx, n.InternalPath())
if err != nil {
return err
}
case err != nil:
// size accounting
if t.options.TreeSizeAccounting && sizeDiff != 0 {
var newSize uint64

// read treesize
treeSize, err := n.GetTreeSize()
switch {
case metadata.IsAttrUnset(err):
// fallback to calculating the treesize
newSize, err = t.calculateTreeSize(ctx, n.InternalPath())
if err != nil {
return err
default:
if sizeDiff > 0 {
newSize = treeSize + uint64(sizeDiff)
} else {
newSize = treeSize - uint64(-sizeDiff)
}
}

// update the tree size of the node
attrs.SetString(prefixes.TreesizeAttr, strconv.FormatUint(newSize, 10))
sublog.Debug().Uint64("newSize", newSize).Msg("updated treesize of parent node")
}

if err = n.SetXattrs(attrs, false); err != nil {
case err != nil:
return err
default:
if sizeDiff > 0 {
newSize = treeSize + uint64(sizeDiff)
} else {
newSize = treeSize - uint64(-sizeDiff)
}
}

// Release node lock early, ignore already closed error
cerr := f.Close()
if cerr != nil && !errors.Is(cerr, os.ErrClosed) {
return cerr
}
// update the tree size of the node
attrs.SetString(prefixes.TreesizeAttr, strconv.FormatUint(newSize, 10))
sublog.Debug().Uint64("newSize", newSize).Msg("updated treesize of parent node")
}

if err = n.SetXattrs(attrs, false); err != nil {
return err
}

// Release node lock early, ignore already closed error
cerr := f.Close()
if cerr != nil && !errors.Is(cerr, os.ErrClosed) {
return cerr
}
}
if err != nil {
Expand Down

0 comments on commit 756a843

Please sign in to comment.