Skip to content

Commit

Permalink
Merge pull request #125 from ttimonen/ttimonen/watch
Browse files Browse the repository at this point in the history
Fix watcher data race
  • Loading branch information
matthyx authored Jul 19, 2024
2 parents 62ef3ff + de95b6c commit a3044a6
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 58 deletions.
65 changes: 18 additions & 47 deletions pkg/registry/file/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package file

import (
"errors"
"strings"
"path"
"slices"
"sync"

"github.com/puzpuzpuz/xsync/v2"
Expand Down Expand Up @@ -65,48 +66,27 @@ type watchDispatcher struct {
}

func newWatchDispatcher() watchDispatcher {
wbk := xsync.NewMapOf[watchersList]()
return watchDispatcher{watchesByKey: wbk}
return watchDispatcher{xsync.NewMapOf[watchersList]()}
}

func extractKeysToNotify(key string) ([]string, error) {
resKeys := []string{}
func extractKeysToNotify(key string) []string {
if key[0] != '/' {
return resKeys, errInvalidKey
return []string{}
}

sep := '/'
currentKey := strings.Builder{}

for idx, char := range key {
consumed := false
last := idx == (len(key) - 1)

if char == sep {
resKeys = append(resKeys, currentKey.String())
consumed = true
}

currentKey.WriteRune(char)

if last && !consumed {
resKeys = append(resKeys, currentKey.String())
}
ret := []string{"/"}
for left := key; left != "/"; left = path.Dir(left) {
ret = append(ret, left)
}
resKeys[0] = "/"

return resKeys, nil
slices.Sort(ret)
return ret
}

// Register registers a watcher for a given key
func (wd *watchDispatcher) Register(key string, w *watcher) {
existingWatchers, ok := wd.watchesByKey.Load(key)
if ok {
existingWatchers = append(existingWatchers, w)
wd.watchesByKey.Store(key, existingWatchers)
} else {
wd.watchesByKey.Store(key, watchersList{w})
}
wd.watchesByKey.Compute(key, func(l watchersList, _ bool) (watchersList, bool) {
return append(l, w), false
})
}

// Added dispatches an "Added" event to appropriate watchers
Expand All @@ -127,22 +107,13 @@ func (wd *watchDispatcher) Modified(key string, obj runtime.Object) {
// notify notifies the listeners of a given key about an event of a given eventType about a given obj
func (wd *watchDispatcher) notify(key string, eventType watch.EventType, obj runtime.Object) {
// Don’t block callers by publishing in a separate goroutine
// TODO(ttimonen) This is kind of expensive way to manage queue, yet watchers might block each other.
go func() {
event := watch.Event{Type: eventType, Object: obj}
keysToNotify, err := extractKeysToNotify(key)
if err != nil {
return
}

for idx := range keysToNotify {
notifiedKey := keysToNotify[idx]
watchers, ok := wd.watchesByKey.Load(notifiedKey)
if !ok {
continue
}

for idx := range watchers {
watchers[idx].notify(event)
for _, part := range extractKeysToNotify(key) {
ws, _ := wd.watchesByKey.Load(part)
for _, w := range ws {
w.notify(event)
}
}
}()
Expand Down
15 changes: 4 additions & 11 deletions pkg/registry/file/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,19 @@ const (

func TestExtractKeysToNotify(t *testing.T) {
tt := []struct {
name string
inputKey string
expectedKeys []string
expectedError error
name string
inputKey string
expectedKeys []string
}{
{
"root key should produce only itself",
"/",
[]string{"/"},
nil,
},
{
"API resource key should produce root and itself",
"/spdx.softwarecomposition.kubescape.io",
[]string{"/", "/spdx.softwarecomposition.kubescape.io"},
nil,
},
{
"Full resource key should produce the full lineage",
Expand All @@ -47,22 +44,18 @@ func TestExtractKeysToNotify(t *testing.T) {
"/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3filtereds/kubescape",
"/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3filtereds/kubescape/titi",
},
nil,
},
{
"Missing leading slash should produce an error",
"spdx.softwarecomposition.kubescape.io/sbomspdxv2p3filtereds/kubescape/titi",
[]string{},
errInvalidKey,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
got, err := extractKeysToNotify(tc.inputKey)

got := extractKeysToNotify(tc.inputKey)
assert.Equal(t, tc.expectedKeys, got)
assert.ErrorIs(t, err, tc.expectedError)
})
}

Expand Down

0 comments on commit a3044a6

Please sign in to comment.