diff --git a/pkg/storage/fs/posix/tree/assimilation.go b/pkg/storage/fs/posix/tree/assimilation.go index 84962496544..40185ddc639 100644 --- a/pkg/storage/fs/posix/tree/assimilation.go +++ b/pkg/storage/fs/posix/tree/assimilation.go @@ -46,7 +46,7 @@ import ( type ScanDebouncer struct { after time.Duration f func(item scanItem) - pending map[string]*queueItem + pending sync.Map inProgress sync.Map mutex sync.Mutex @@ -71,7 +71,7 @@ func NewScanDebouncer(d time.Duration, f func(item scanItem)) *ScanDebouncer { return &ScanDebouncer{ after: d, f: f, - pending: map[string]*queueItem{}, + pending: sync.Map{}, inProgress: sync.Map{}, } } @@ -84,24 +84,28 @@ func (d *ScanDebouncer) Debounce(item scanItem) { path := item.Path force := item.ForceRescan recurse := item.Recurse - if t := d.pending[item.Path]; t != nil { - force = force || t.item.ForceRescan - recurse = recurse || t.item.Recurse - t.timer.Stop() + if i, ok := d.pending.Load(item.Path); ok { + queueItem := i.(*queueItem) + force = force || queueItem.item.ForceRescan + recurse = recurse || queueItem.item.Recurse + queueItem.timer.Stop() } - d.pending[item.Path] = &queueItem{ + d.pending.Store(item.Path, &queueItem{ item: item, timer: time.AfterFunc(d.after, func() { if _, ok := d.inProgress.Load(path); ok { // Reschedule this run for when the previous run has finished d.mutex.Lock() - d.pending[path].timer.Reset(d.after) + if i, ok := d.pending.Load(path); ok { + i.(*queueItem).timer.Reset(d.after) + } + d.mutex.Unlock() return } - delete(d.pending, path) + d.pending.Delete(path) d.inProgress.Store(path, true) defer d.inProgress.Delete(path) d.f(scanItem{ @@ -110,14 +114,14 @@ func (d *ScanDebouncer) Debounce(item scanItem) { Recurse: recurse, }) }), - } + }) } // InProgress returns true if the given path is currently being processed func (d *ScanDebouncer) InProgress(path string) bool { d.mutex.Lock() defer d.mutex.Unlock() - if _, ok := d.pending[path]; ok { + if _, ok := d.pending.Load(path); ok { return true }