From e0c8517a98d18da856554bc96b5000a55955a0de Mon Sep 17 00:00:00 2001 From: Shriram Sharma Date: Mon, 6 Jun 2022 16:40:07 -0700 Subject: [PATCH 1/2] fixes 224 concurrent map issue Signed-off-by: Shriram Sharma --- admiral/pkg/controller/admiral/service.go | 10 +++- .../pkg/controller/admiral/service_test.go | 59 +++++++++++++++++-- 2 files changed, 62 insertions(+), 7 deletions(-) diff --git a/admiral/pkg/controller/admiral/service.go b/admiral/pkg/controller/admiral/service.go index 0798354a..fb9f4521 100644 --- a/admiral/pkg/controller/admiral/service.go +++ b/admiral/pkg/controller/admiral/service.go @@ -2,17 +2,19 @@ package admiral import ( "fmt" + "time" + "github.com/istio-ecosystem/admiral/admiral/pkg/controller/common" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/rest" - "time" + + "sync" k8sV1 "k8s.io/api/core/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" - "sync" ) // Handler interface contains the methods that are required @@ -69,6 +71,8 @@ func (s *serviceCache) getKey(service *k8sV1.Service) string { } func (s *serviceCache) Get(key string) *ServiceClusterEntry { + defer s.mutex.Unlock() + s.mutex.Lock() return s.cache[key] } @@ -146,7 +150,7 @@ func NewServiceController(stopCh <-chan struct{}, handler ServiceHandler, config &k8sV1.Service{}, resyncPeriod, cache.Indexers{}, ) - NewController("service-ctrl-" + config.Host , stopCh, &serviceController, serviceController.informer) + NewController("service-ctrl-"+config.Host, stopCh, &serviceController, serviceController.informer) return &serviceController, nil } diff --git a/admiral/pkg/controller/admiral/service_test.go b/admiral/pkg/controller/admiral/service_test.go index 72d74215..01ee1f4c 100644 --- a/admiral/pkg/controller/admiral/service_test.go +++ b/admiral/pkg/controller/admiral/service_test.go @@ -1,15 +1,20 @@ package admiral import ( + "context" + "sync" + "testing" + "time" + "github.com/google/go-cmp/cmp" "github.com/istio-ecosystem/admiral/admiral/pkg/controller/common" "github.com/istio-ecosystem/admiral/admiral/pkg/test" - "k8s.io/api/core/v1" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/tools/clientcmd" - "sync" - "testing" - "time" ) func TestNewServiceController(t *testing.T) { @@ -248,3 +253,49 @@ func TestServiceCache_GetLoadBalancer(t *testing.T) { }) } } + +func TestConcurrentGetAndPut(t *testing.T) { + serviceCache := serviceCache{} + serviceCache.cache = make(map[string]*ServiceClusterEntry) + serviceCache.mutex = &sync.Mutex{} + + serviceCache.Put(&v1.Service{ + ObjectMeta: metaV1.ObjectMeta{Name: "testname", Namespace: "testns"}, + }) + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(3*time.Second)) + defer cancel() + + var wg sync.WaitGroup + wg.Add(2) + // Producer go routine + go func(ctx context.Context) { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + serviceCache.Put(&v1.Service{ + ObjectMeta: metaV1.ObjectMeta{Name: "testname", Namespace: string(uuid.NewUUID())}, + }) + } + } + }(ctx) + + // Consumer go routine + go func(ctx context.Context) { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + assert.NotNil(t, serviceCache.Get("testns")) + } + } + }(ctx) + + wg.Wait() + +} From 7e90188ff5c358996d00e4b7589e003503d69f95 Mon Sep 17 00:00:00 2001 From: Shriram Sharma Date: Tue, 7 Jun 2022 09:05:44 -0700 Subject: [PATCH 2/2] made changes as per review comments Signed-off-by: Shriram Sharma --- admiral/pkg/controller/admiral/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/admiral/pkg/controller/admiral/service.go b/admiral/pkg/controller/admiral/service.go index fb9f4521..2dc6a1e2 100644 --- a/admiral/pkg/controller/admiral/service.go +++ b/admiral/pkg/controller/admiral/service.go @@ -71,8 +71,8 @@ func (s *serviceCache) getKey(service *k8sV1.Service) string { } func (s *serviceCache) Get(key string) *ServiceClusterEntry { - defer s.mutex.Unlock() s.mutex.Lock() + defer s.mutex.Unlock() return s.cache[key] }