Skip to content

Commit

Permalink
Fix corner cases when assimilating new files/directories
Browse files Browse the repository at this point in the history
  • Loading branch information
aduffeck committed Jul 11, 2024
1 parent 1324a67 commit 1622428
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 102 deletions.
286 changes: 194 additions & 92 deletions pkg/storage/fs/posix/tree/assimilation.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,50 +46,83 @@ import (
// ScanDebouncer aggregates scan requests per path: repeated requests for the
// same path within the debounce interval are merged (flags OR-ed together)
// and trigger the callback only once.
type ScanDebouncer struct {
	after      time.Duration       // debounce interval
	f          func(item scanItem) // callback invoked once a path's timer fires
	pending    map[string]*queueItem
	inProgress sync.Map // paths currently being processed by f

	mutex sync.Mutex // guards pending
}

// EventAction denotes the kind of filesystem event that triggered a scan.
type EventAction int

const (
	// ActionCreate: a new file or directory appeared.
	ActionCreate EventAction = iota
	// ActionUpdate: an existing file was modified.
	ActionUpdate
	// ActionMove: a file or directory was moved or renamed.
	ActionMove
	// ActionDelete: a file or directory was removed.
	ActionDelete
)

// queueItem couples a pending scan request with the timer that will fire it,
// so a newer request for the same path can merge its flags and restart the
// timer.
type queueItem struct {
	item  scanItem
	timer *time.Timer
}

// NewScanDebouncer returns a new ScanDebouncer instance that waits for d
// of quiet time per path before invoking f.
func NewScanDebouncer(d time.Duration, f func(item scanItem)) *ScanDebouncer {
	return &ScanDebouncer{
		after:      d,
		f:          f,
		pending:    map[string]*queueItem{},
		inProgress: sync.Map{},
	}
}

// Debounce restars the debounce timer for the given space
// Debounce restarts the debounce timer for the given space
func (d *ScanDebouncer) Debounce(item scanItem) {
d.mutex.Lock()
defer d.mutex.Unlock()

path := item.Path
force := item.ForceRescan
recurse := item.Recurse
if t := d.pending[item.Path]; t != nil {
force = force || item.ForceRescan
t.Stop()
force = force || t.item.ForceRescan
recurse = recurse || t.item.Recurse
t.timer.Stop()
}

d.pending[item.Path] = time.AfterFunc(d.after, func() {
if _, ok := d.inProgress.Load(path); ok {
// Reschedule this run for when the previous run has finished
d.mutex.Lock()
d.pending[path].Reset(d.after)
d.mutex.Unlock()
return
}
d.pending[item.Path] = &queueItem{
item: item,
timer: time.AfterFunc(d.after, func() {
if _, ok := d.inProgress.Load(path); ok {
// Reschedule this run for when the previous run has finished
d.mutex.Lock()
d.pending[path].timer.Reset(d.after)
d.mutex.Unlock()
return
}

d.inProgress.Store(path, true)
defer d.inProgress.Delete(path)
d.f(scanItem{
Path: path,
ForceRescan: force,
})
})
delete(d.pending, path)
d.inProgress.Store(path, true)
defer d.inProgress.Delete(path)
d.f(scanItem{
Path: path,
ForceRescan: force,
Recurse: recurse,
})
}),
}
}

// InProgress returns true if the given path is currently being processed,
// either queued for a scan or already handed to the callback.
func (d *ScanDebouncer) InProgress(path string) bool {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	if _, queued := d.pending[path]; queued {
		return true
	}
	_, running := d.inProgress.Load(path)
	return running
}

func (t *Tree) workScanQueue() {
Expand All @@ -103,17 +136,73 @@ func (t *Tree) workScanQueue() {
log.Error().Err(err).Str("path", item.Path).Msg("failed to assimilate item")
continue
}

if item.Recurse {
err = t.WarmupIDCache(item.Path, true)
if err != nil {
log.Error().Err(err).Str("path", item.Path).Msg("failed to warmup id cache")
}
}
}
}()
}
}

// Scan scans the given path and updates the id chache
func (t *Tree) Scan(path string, forceRescan bool) error {
t.scanDebouncer.Debounce(scanItem{
Path: path,
ForceRescan: forceRescan,
})
func (t *Tree) Scan(path string, action EventAction, isDir bool, recurse bool) error {
// cases:
switch action {
case ActionCreate:
if !isDir {
// 1. New file (could be emitted as part of a new directory)
// -> assimilate file
// -> scan parent directory recursively
if !t.scanDebouncer.InProgress(filepath.Dir(path)) {
t.scanDebouncer.Debounce(scanItem{
Path: path,
ForceRescan: false,
})
}
t.scanDebouncer.Debounce(scanItem{
Path: filepath.Dir(path),
ForceRescan: true,
Recurse: true,
})
} else {
// 2. New directory
// -> scan directory
t.scanDebouncer.Debounce(scanItem{
Path: path,
ForceRescan: true,
Recurse: true,
})
}

case ActionUpdate:
// 3. Updated file
// -> update file unless parent directory is being rescanned
if !t.scanDebouncer.InProgress(filepath.Dir(path)) {
t.scanDebouncer.Debounce(scanItem{
Path: path,
ForceRescan: true,
})
}

case ActionMove:
// 4. Moved file
// -> update file
// 5. Moved directory
// -> update directory and all children
t.scanDebouncer.Debounce(scanItem{
Path: path,
ForceRescan: isDir,
Recurse: isDir,
})

case ActionDelete:
t.HandleFileDelete(path)
}

return nil
}

Expand Down Expand Up @@ -183,44 +272,43 @@ func (t *Tree) getOwnerAndIDs(path string) (*userv1beta1.UserId, string, string,
return owner, nodeID, spaceID, parentID, nil
}

func (t *Tree) assimilate(item scanItem) error {
var err error
func (t *Tree) findSpaceId(path string) (string, node.Attributes, error) {
// find the space id, scope by the according user
spaceID := []byte("")
spaceCandidate := item.Path
spaceCandidate := path
spaceAttrs := node.Attributes{}
for strings.HasPrefix(spaceCandidate, t.options.Root) {
spaceAttrs, err = t.lookup.MetadataBackend().All(context.Background(), spaceCandidate)
if err == nil && len(spaceAttrs[prefixes.SpaceIDAttr]) > 0 {
spaceID = spaceAttrs[prefixes.SpaceIDAttr]
spaceAttrs, err := t.lookup.MetadataBackend().All(context.Background(), spaceCandidate)
spaceID := spaceAttrs[prefixes.SpaceIDAttr]
if err == nil && len(spaceID) > 0 {
if t.options.UseSpaceGroups {
// set the uid and gid for the space
fi, err := os.Stat(spaceCandidate)
if err != nil {
return err
return "", spaceAttrs, err
}
sys := fi.Sys().(*syscall.Stat_t)
gid := int(sys.Gid)
_, err = t.userMapper.ScopeUserByIds(-1, gid)
if err != nil {
return err
return "", spaceAttrs, err
}
}
break

return string(spaceID), spaceAttrs, nil
}
spaceCandidate = filepath.Dir(spaceCandidate)
}
if len(spaceID) == 0 {
return fmt.Errorf("did not find space id for path")
}
return "", spaceAttrs, fmt.Errorf("could not find space for path %s", path)
}

func (t *Tree) assimilate(item scanItem) error {
var id []byte
if !item.ForceRescan {
// already assimilated?
id, err := t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.IDAttr)
if err == nil {
return t.lookup.(*lookup.Lookup).CacheID(context.Background(), string(spaceID), string(id), item.Path)
}
var err error

// First find the space id
spaceID, spaceAttrs, err := t.findSpaceId(item.Path)
if err != nil {
return err
}

// lock the file for assimilation
Expand All @@ -242,57 +330,55 @@ func (t *Tree) assimilate(item scanItem) error {
if err == nil {
previousPath, ok := t.lookup.(*lookup.Lookup).GetCachedID(context.Background(), string(spaceID), string(id))

// This item had already been assimilated in the past. Update the path
_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), string(spaceID), string(id), item.Path)
if item.ForceRescan {
previousParentID, err := t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.ParentidAttr)
if err != nil {
return err
}

fi, err := t.updateFile(item.Path, string(id), string(spaceID))
if err != nil {
return err
}
previousParentID, _ := t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.ParentidAttr)

// was it moved?
if ok && previousPath != item.Path {
// purge original metadata. Only delete the path entry using DeletePath(reverse lookup), not the whole entry pair.
_ = t.lookup.(*lookup.Lookup).IDCache.DeletePath(context.Background(), previousPath)
_ = t.lookup.MetadataBackend().Purge(previousPath)
fi, err := t.updateFile(item.Path, string(id), string(spaceID))
if err != nil {
return err
}

if fi.IsDir() {
// if it was moved and it is a directory we need to propagate the move
go func() { _ = t.WarmupIDCache(item.Path, false) }()
}
// was it moved?
if ok && len(previousParentID) > 0 && previousPath != item.Path {
// purge original metadata. Only delete the path entry using DeletePath(reverse lookup), not the whole entry pair.
_ = t.lookup.(*lookup.Lookup).IDCache.DeletePath(context.Background(), previousPath)
_ = t.lookup.MetadataBackend().Purge(previousPath)

parentID, err := t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.ParentidAttr)
if err == nil && len(parentID) > 0 {
ref := &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: t.options.MountID,
SpaceId: string(spaceID),
OpaqueId: string(parentID),
},
Path: filepath.Base(item.Path),
}
oldRef := &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: t.options.MountID,
SpaceId: string(spaceID),
OpaqueId: string(previousParentID),
},
Path: filepath.Base(previousPath),
}
t.PublishEvent(events.ItemMoved{
SpaceOwner: user,
Executant: user,
Owner: user,
Ref: ref,
OldReference: oldRef,
Timestamp: utils.TSNow(),
})
if fi.IsDir() {
// if it was moved and it is a directory we need to propagate the move
go func() { _ = t.WarmupIDCache(item.Path, false) }()
}

parentID, err := t.lookup.MetadataBackend().Get(context.Background(), item.Path, prefixes.ParentidAttr)
if err == nil && len(parentID) > 0 {
ref := &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: t.options.MountID,
SpaceId: string(spaceID),
OpaqueId: string(parentID),
},
Path: filepath.Base(item.Path),
}
oldRef := &provider.Reference{
ResourceId: &provider.ResourceId{
StorageId: t.options.MountID,
SpaceId: string(spaceID),
OpaqueId: string(previousParentID),
},
Path: filepath.Base(previousPath),
}
t.PublishEvent(events.ItemMoved{
SpaceOwner: user,
Executant: user,
Owner: user,
Ref: ref,
OldReference: oldRef,
Timestamp: utils.TSNow(),
})
}
// }
}
} else {
// assimilate new file
Expand Down Expand Up @@ -455,17 +541,25 @@ func (t *Tree) WarmupIDCache(root string, assimilate bool) error {
return err
}

return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
sizes := make(map[string]int64)
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}

// skip lock files
if strings.HasSuffix(path, ".flock") || strings.HasSuffix(path, ".mlock") {
return nil
}

// calculate tree sizes
if !info.IsDir() {
dir := filepath.Dir(path)
sizes[dir] += info.Size()
}

attribs, err := t.lookup.MetadataBackend().All(context.Background(), path)
if err == nil {
if err == nil && len(attribs[prefixes.IDAttr]) > 0 {
nodeSpaceID := attribs[prefixes.SpaceIDAttr]
if len(nodeSpaceID) > 0 {
spaceID = nodeSpaceID
Expand Down Expand Up @@ -496,10 +590,18 @@ func (t *Tree) WarmupIDCache(root string, assimilate bool) error {
id, ok := attribs[prefixes.IDAttr]
if ok {
_ = t.lookup.(*lookup.Lookup).CacheID(context.Background(), string(spaceID), string(id), path)
} else if assimilate {
_ = t.Scan(path, false)
}
} else if assimilate {
_ = t.assimilate(scanItem{Path: path, ForceRescan: true})
}
return nil
})
if err != nil {
return err
}

for dir, size := range sizes {
_ = t.lookup.MetadataBackend().Set(context.Background(), dir, prefixes.TreesizeAttr, []byte(fmt.Sprintf("%d", size)))
}
return nil
}
Loading

0 comments on commit 1622428

Please sign in to comment.