Skip to content

Commit

Permalink
Fix data race
Browse files Browse the repository at this point in the history
  • Loading branch information
aduffeck committed Jul 11, 2024
1 parent a626272 commit 4e2d6c5
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions pkg/storage/fs/posix/tree/assimilation.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
type ScanDebouncer struct {
after time.Duration
f func(item scanItem)
pending map[string]*queueItem
pending sync.Map
inProgress sync.Map

mutex sync.Mutex
Expand All @@ -71,7 +71,7 @@ func NewScanDebouncer(d time.Duration, f func(item scanItem)) *ScanDebouncer {
return &ScanDebouncer{
after: d,
f: f,
pending: map[string]*queueItem{},
pending: sync.Map{},
inProgress: sync.Map{},
}
}
Expand All @@ -84,24 +84,28 @@ func (d *ScanDebouncer) Debounce(item scanItem) {
path := item.Path
force := item.ForceRescan
recurse := item.Recurse
if t := d.pending[item.Path]; t != nil {
force = force || t.item.ForceRescan
recurse = recurse || t.item.Recurse
t.timer.Stop()
if i, ok := d.pending.Load(item.Path); ok {
queueItem := i.(*queueItem)
force = force || queueItem.item.ForceRescan
recurse = recurse || queueItem.item.Recurse
queueItem.timer.Stop()
}

d.pending[item.Path] = &queueItem{
d.pending.Store(item.Path, &queueItem{
item: item,
timer: time.AfterFunc(d.after, func() {
if _, ok := d.inProgress.Load(path); ok {
// Reschedule this run for when the previous run has finished
d.mutex.Lock()
d.pending[path].timer.Reset(d.after)
if i, ok := d.pending.Load(path); ok {
i.(*queueItem).timer.Reset(d.after)
}

d.mutex.Unlock()
return
}

delete(d.pending, path)
d.pending.Delete(path)
d.inProgress.Store(path, true)
defer d.inProgress.Delete(path)
d.f(scanItem{
Expand All @@ -110,14 +114,14 @@ func (d *ScanDebouncer) Debounce(item scanItem) {
Recurse: recurse,
})
}),
}
})
}

// InProgress returns true if the given path is currently being processed
func (d *ScanDebouncer) InProgress(path string) bool {
d.mutex.Lock()
defer d.mutex.Unlock()
if _, ok := d.pending[path]; ok {
if _, ok := d.pending.Load(path); ok {
return true
}

Expand Down

0 comments on commit 4e2d6c5

Please sign in to comment.