From 0b7f9161258b6e34a0fdb57a6094444fddf8cec7 Mon Sep 17 00:00:00 2001 From: ChengXiangdong Date: Mon, 23 Oct 2023 19:49:15 +0800 Subject: [PATCH] fix thread pool issue --- pkg/cloudprovider/huaweicloud/huaweicloud.go | 80 ++++++++------------ pkg/common/common.go | 64 ++++++++++++++++ 2 files changed, 94 insertions(+), 50 deletions(-) diff --git a/pkg/cloudprovider/huaweicloud/huaweicloud.go b/pkg/cloudprovider/huaweicloud/huaweicloud.go index 897f4847a..1fd1401ac 100644 --- a/pkg/cloudprovider/huaweicloud/huaweicloud.go +++ b/pkg/cloudprovider/huaweicloud/huaweicloud.go @@ -640,22 +640,24 @@ func IsPodActive(p v1.Pod) bool { type LoadBalancerServiceListener struct { Basic - stopChannel chan struct{} kubeClient *corev1.CoreV1Client + stopChannel chan struct{} + goroutinePool *common.ExecutePool serviceCache map[string]*v1.Service invalidServiceCache *gocache.Cache } func (e *LoadBalancerServiceListener) stopListenerSlice() { klog.Warningf("Stop listening to Endpoints") + e.stopChannel <- struct{}{} close(e.stopChannel) } -var queue = make(chan v1.Service, 3) - func (e *LoadBalancerServiceListener) startEndpointListener(handle func(*v1.Service, bool)) { klog.Infof("starting EndpointListener") + e.goroutinePool.Start() + for { endpointsList, err := e.kubeClient.Endpoints(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{Limit: 1}) @@ -693,17 +695,11 @@ func (e *LoadBalancerServiceListener) startEndpointListener(handle func(*v1.Serv return } klog.V(6).Infof("New Endpoints added, namespace: %s, name: %s", newEndpoint.Namespace, newEndpoint.Name) - queue <- v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: newEndpoint.Namespace, - Name: newEndpoint.Name, - }, - } - go func() { - s := <-queue - klog.V(6).Infof("process endpoints: %s / %s", s.Namespace, s.Name) - e.dispatcher(s.Namespace, s.Name, endpointAdded, handle) - }() + + e.goroutinePool.Submit(func() { + klog.V(6).Infof("process endpoints: %s / %s", newEndpoint.Namespace, newEndpoint.Name) + e.dispatcher(newEndpoint.Namespace, newEndpoint.Name, endpointAdded, handle) + }) }, UpdateFunc: func(oldObj, newObj interface{}) { newEndpoint := newObj.(*v1.Endpoints) @@ -713,17 +709,10 @@ func (e *LoadBalancerServiceListener) startEndpointListener(handle func(*v1.Serv } klog.V(6).Infof("Endpoint update, namespace: %s, name: %s", newEndpoint.Namespace, newEndpoint.Name) - queue <- v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: newEndpoint.Namespace, - Name: newEndpoint.Name, - }, - } - go func() { - s := <-queue - klog.V(6).Infof("process endpoints: %s / %s", s.Namespace, s.Name) - e.dispatcher(s.Namespace, s.Name, endpointUpdate, handle) - }() + e.goroutinePool.Submit(func() { + klog.V(6).Infof("process endpoints: %s / %s", newEndpoint.Namespace, newEndpoint.Name) + e.dispatcher(newEndpoint.Namespace, newEndpoint.Name, endpointUpdate, handle) + }) }, DeleteFunc: func(obj interface{}) {}, }, 5*time.Second) @@ -762,17 +751,11 @@ func (e *LoadBalancerServiceListener) startEndpointListener(handle func(*v1.Serv } klog.Infof("Found service was updated, namespace: %s, name: %s", svs.Namespace, svs.Name) - queue <- v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: svs.Namespace, - Name: svs.Name, - }, - } - go func() { - s := <-queue - klog.V(4).Infof("process endpoints: %s / %s", s.Namespace, s.Name) + + e.goroutinePool.Submit(func() { + klog.V(4).Infof("process endpoints: %s / %s", svs.Namespace, svs.Name) handle(svs, false) - }() + }) }, DeleteFunc: func(obj interface{}) { svs, _ := obj.(*v1.Service) @@ -781,17 +764,10 @@ func (e *LoadBalancerServiceListener) startEndpointListener(handle func(*v1.Serv } klog.Infof("Found service was deleted, namespace: %s, name: %s", svs.Namespace, svs.Name) - queue <- v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: svs.Namespace, - Name: svs.Name, - }, - } - go func() { - s := <-queue - klog.V(4).Infof("process endpoints: %s / %s", s.Namespace, s.Name) + e.goroutinePool.Submit(func() { + klog.V(4).Infof("process endpoints: %s / %s", svs.Namespace, svs.Name) handle(svs, true) - }() + }) }, }, 5*time.Second) if err != nil { @@ -808,7 +784,8 @@ func (e *LoadBalancerServiceListener) startEndpointListener(handle func(*v1.Serv func (e *LoadBalancerServiceListener) dispatcher(namespace, name, eType string, handle func(*v1.Service, bool)) { key := fmt.Sprintf("%s/%s", namespace, name) - if _, ok := e.invalidServiceCache.Get(key); ok { + if v, ok := e.invalidServiceCache.Get(key); ok { + klog.V(6).Infof("Service %s/%s not found, will not try again within 10 minutes: %s", namespace, name, v) return } @@ -816,7 +793,7 @@ func (e *LoadBalancerServiceListener) dispatcher(namespace, name, eType string, if err != nil { klog.Errorf("failed to query service, error: %s", err) if strings.Contains(err.Error(), "not found") { - e.invalidServiceCache.Set(key, "", gocache.DefaultExpiration) + e.invalidServiceCache.Set(key, err.Error(), 10*time.Minute) } return } @@ -902,10 +879,12 @@ func (e *LoadBalancerServiceListener) autoRemoveHealthCheckRule(handle func(node func (h *CloudProvider) listenerDeploy() error { listener := LoadBalancerServiceListener{ - Basic: h.Basic, + Basic: h.Basic, + kubeClient: h.kubeClient, + stopChannel: make(chan struct{}, 1), - kubeClient: h.kubeClient, - serviceCache: make(map[string]*v1.Service, 0), + goroutinePool: common.NewExecutePool(5), + serviceCache: make(map[string]*v1.Service), invalidServiceCache: gocache.New(5*time.Minute, 10*time.Minute), } @@ -979,6 +958,7 @@ func (h *CloudProvider) listenerDeploy() error { }) }, func() { listener.stopListenerSlice() + listener.goroutinePool.Stop() }) return nil } diff --git a/pkg/common/common.go b/pkg/common/common.go index 0f062c437..49c7bae13 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -17,6 +17,10 @@ limitations under the License. package common import ( + "k8s.io/klog/v2" + "os" + "os/signal" + "syscall" "time" "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/sdkerr" @@ -53,3 +57,63 @@ func WaitForCompleted(condition wait.ConditionFunc) error { } return wait.ExponentialBackoff(backoff, condition) } + +type JobHandle func() +type ExecutePool struct { + workerNum int + queueCh chan JobHandle + stopCh chan struct{} +} + +func NewExecutePool(size int) *ExecutePool { + return &ExecutePool{ + workerNum: size, + queueCh: make(chan JobHandle, 2000), + } +} + +func (w *ExecutePool) Start() { + // Make sure it is not started repeatedly. + if w.stopCh != nil { + w.stopCh <- struct{}{} + close(w.stopCh) + } + stopCh := make(chan struct{}, 1) + w.stopCh = stopCh + + for i := 0; i < w.workerNum; i++ { + klog.Infof("start goroutine pool handler: %v/%v", i, w.workerNum) + go func() { + for { + select { + case handler, ok := <-w.queueCh: + if !ok { + klog.Errorf("goroutine pool exiting") + return + } + handler() + case <-stopCh: + klog.Info("goroutine pool stopping") + return + } + } + }() + } + + go func() { + exit := make(chan os.Signal, 1) + signal.Notify(exit, os.Interrupt, syscall.SIGTERM) + <-exit + w.Stop() + }() +} + +func (w *ExecutePool) Stop() { + w.stopCh <- struct{}{} + close(w.stopCh) + close(w.queueCh) +} + +func (w *ExecutePool) Submit(work JobHandle) { + w.queueCh <- work +}