From a24887e1e7a9dc71c0eaed7525c73756c0ed5c86 Mon Sep 17 00:00:00 2001 From: Igor Karpukhin Date: Thu, 18 Apr 2024 12:26:31 +0200 Subject: [PATCH] CLOUDP-244156: Fix the concurrent writes for the ResourceWatcher (#1520) Fix the concurrent writes for the ResourceWatcher --- pkg/controller/watch/resource_watcher.go | 8 ++++++++ pkg/controller/watch/resource_watcher_test.go | 16 +++++++++++++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/pkg/controller/watch/resource_watcher.go b/pkg/controller/watch/resource_watcher.go index f6946f3c7c..7259f4f2f6 100644 --- a/pkg/controller/watch/resource_watcher.go +++ b/pkg/controller/watch/resource_watcher.go @@ -1,24 +1,30 @@ package watch import ( + "sync" + "go.uber.org/zap" "sigs.k8s.io/controller-runtime/pkg/client" ) func NewResourceWatcher() ResourceWatcher { return ResourceWatcher{ + mtx: &sync.Mutex{}, WatchedResources: map[WatchedObject]map[client.ObjectKey]bool{}, } } // ResourceWatcher is the object containing the map of watched_resource -> []dependant_resource. type ResourceWatcher struct { + mtx *sync.Mutex WatchedResources map[WatchedObject]map[client.ObjectKey]bool } // EnsureResourcesAreWatched registers a dependant for the watched objects. // This will let the controller to react on the events for the watched objects and trigger reconciliation for dependants. func (r ResourceWatcher) EnsureResourcesAreWatched(dependant client.ObjectKey, resourceKind string, log *zap.SugaredLogger, watchedObjectsKeys ...client.ObjectKey) { + r.mtx.Lock() + defer r.mtx.Unlock() for _, watchedObjectKey := range watchedObjectsKeys { r.addWatchedResourceIfNotAdded(watchedObjectKey, resourceKind, dependant, log) } @@ -28,6 +34,8 @@ func (r ResourceWatcher) EnsureResourcesAreWatched(dependant client.ObjectKey, r } func (r ResourceWatcher) EnsureMultiplesResourcesAreWatched(dependant client.ObjectKey, log *zap.SugaredLogger, resources ...WatchedObject) { + r.mtx.Lock() + defer r.mtx.Unlock() for _, res := range resources { r.addWatchedResourceIfNotAdded(res.Resource, res.ResourceKind, dependant, log) log.Debugf("resource watcher: watching %v to trigger reconciliation for %v", res.Resource, dependant) diff --git a/pkg/controller/watch/resource_watcher_test.go b/pkg/controller/watch/resource_watcher_test.go index 9453aafdc7..98348194fb 100644 --- a/pkg/controller/watch/resource_watcher_test.go +++ b/pkg/controller/watch/resource_watcher_test.go @@ -1,6 +1,7 @@ package watch import ( + "sync" "testing" "github.com/stretchr/testify/assert" @@ -17,9 +18,18 @@ func TestEnsureResourcesAreWatched(t *testing.T) { project2 := kube.ObjectKey("test", "project2") connectionSecret := kube.ObjectKey("test", "connectionSecret") - watcher.EnsureResourcesAreWatched(project1, "Secret", zap.S(), connectionSecret) - watcher.EnsureResourcesAreWatched(project2, "Secret", zap.S(), connectionSecret) - + var wg sync.WaitGroup + + wg.Add(2) + go func() { + defer wg.Done() + watcher.EnsureResourcesAreWatched(project1, "Secret", zap.S(), connectionSecret) + }() + go func() { + defer wg.Done() + watcher.EnsureResourcesAreWatched(project2, "Secret", zap.S(), connectionSecret) + }() + wg.Wait() expectedWatched := map[WatchedObject]map[client.ObjectKey]bool{ {ResourceKind: "Secret", Resource: connectionSecret}: {project1: true, project2: true}, }