diff --git a/go.mod b/go.mod index 1f995b54d0..7b29e9f5dd 100644 --- a/go.mod +++ b/go.mod @@ -124,9 +124,12 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/kms v0.26.2 // indirect - k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 // indirect - sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35 // indirect + k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 // indirect + sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.33 // indirect sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect ) + +// https://github.com/kubernetes/kubernetes/issues/112793 +// Remove after k8s.io/cloud-provider v0.25.3 will be released, updated upstream and rebased. +replace k8s.io/cloud-provider v0.25.2 => github.com/openshift/kubernetes-cloud-provider v0.0.0-20221007081959-e07817829a38 diff --git a/go.sum b/go.sum index 7aab16ce3d..5f3b6c0fe0 100644 --- a/go.sum +++ b/go.sum @@ -317,12 +317,12 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= -github.com/onsi/ginkgo/v2 v2.8.1 h1:xFTEVwOFa1D/Ty24Ws1npBWkDYEV9BqZrsDxVrVkrrU= -github.com/onsi/ginkgo/v2 v2.8.1/go.mod h1:N1/NbDngAFcSLdyZ+/aYTYGSlq9qMCS/cNKGJjy+csc= -github.com/onsi/gomega v1.27.1 h1:rfztXRbg6nv/5f+Raen9RcGoSecHIFgBBLQK3Wdj754= -github.com/onsi/gomega v1.27.1/go.mod h1:aHX5xOykVYzWOV4WqQy0sy8BQptgukenXpCXfadcIAw= -github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU= -github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= +github.com/onsi/ginkgo/v2 v2.2.0 h1:3ZNA3L1c5FYDFTTxbFeVGGD8jYvjYauHD30YgLxVsNI= +github.com/onsi/ginkgo/v2 v2.2.0/go.mod h1:MEH45j8TBi6u9BMogfbp0stKC5cdGjumZj5Y7AG4VIk= +github.com/onsi/gomega v1.20.2 h1:8uQq0zMgLEfa0vRrrBgaJF2gyW9Da9BmfGV+OyUzfkY= +github.com/onsi/gomega v1.20.2/go.mod h1:iYAIXgPSaDHak0LCMA+AWBpIKBr8WZicMxnE8luStNc= +github.com/openshift/kubernetes-cloud-provider v0.0.0-20221007081959-e07817829a38 h1:/4N1vluQjoe+C2ic9w2rc6nruPJR+FqP/G4QAX49PXg= +github.com/openshift/kubernetes-cloud-provider v0.0.0-20221007081959-e07817829a38/go.mod h1:5iKXFWrW3xc/xYOh8WgzjDZ3pDQzUcPoEh4dxEsshgk= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -789,37 +789,34 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= -k8s.io/api v0.26.2 h1:dM3cinp3PGB6asOySalOZxEG4CZ0IAdJsrYZXE/ovGQ= -k8s.io/api v0.26.2/go.mod h1:1kjMQsFE+QHPfskEcVNgL3+Hp88B80uj0QtSOlj8itU= -k8s.io/apimachinery v0.26.2 h1:da1u3D5wfR5u2RpLhE/ZtZS2P7QvDgLZTi9wrNZl/tQ= -k8s.io/apimachinery v0.26.2/go.mod h1:ats7nN1LExKHvJ9TmwootT00Yz05MuYqPXEXaVeOy5I= -k8s.io/apiserver v0.26.2 h1:Pk8lmX4G14hYqJd1poHGC08G03nIHVqdJMR0SD3IH3o= -k8s.io/apiserver v0.26.2/go.mod h1:GHcozwXgXsPuOJ28EnQ/jXEM9QeG6HT22YxSNmpYNh8= -k8s.io/client-go v0.26.2 h1:s1WkVujHX3kTp4Zn4yGNFK+dlDXy1bAAkIl+cFAiuYI= -k8s.io/client-go v0.26.2/go.mod h1:u5EjOuSyBa09yqqyY7m3abZeovO/7D/WehVVlZ2qcqU= -k8s.io/cloud-provider v0.26.2 h1:VlLGDayUV5VBpvMSBFqmpz2HHTjBLUw02wuZzNeEsW0= -k8s.io/cloud-provider v0.26.2/go.mod h1:/Am9R0merLIZgVqPTE4Z1JkBcCrp2uXImHCxnvVARxc= -k8s.io/component-base v0.26.2 h1:IfWgCGUDzrD6wLLgXEstJKYZKAFS2kO+rBRi0p3LqcI= -k8s.io/component-base v0.26.2/go.mod h1:DxbuIe9M3IZPRxPIzhch2m1eT7uFrSBJUBuVCQEBivs= -k8s.io/component-helpers v0.26.2 h1:+JJ1gwyVsqSwZCJVLJotx/IPq2pMpo0kifeAzfo6i3U= -k8s.io/component-helpers v0.26.2/go.mod h1:PRvoduZ5/IeKGGbZRki3J2cTQVwZLD+EUxIEbvvX0W4= -k8s.io/controller-manager v0.26.2 h1:Y4g50VqaXkr02v5FNTWDQ47ZPFNM1ls00F0+FoKKaTM= -k8s.io/controller-manager v0.26.2/go.mod h1:h8yv0MO3jjo9px49uResC9laZekvOmQRmrRLwe9n6Zw= -k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw= -k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= -k8s.io/kms v0.26.2 h1:GM1gg3tFK3OUU/QQFi93yGjG3lJT8s8l3Wkn2+VxBLM= -k8s.io/kms v0.26.2/go.mod h1:69qGnf1NsFOQP07fBYqNLZklqEHSJF024JqYCaeVxHg= -k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 h1:+70TFaan3hfJzs+7VK2o+OGxg8HsuBr/5f6tVAjDu6E= -k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4= -k8s.io/kubelet v0.26.2 h1:egg7YfhCpH9wvLwQdL2Mzuy4/kC6hO91azY0jgdYPWA= -k8s.io/kubelet v0.26.2/go.mod h1:IXthU5hcJQE6+K33LuaYYO0wUcYO8glhl/ip1Hzux44= -k8s.io/utils v0.0.0-20221128185143-99ec85e7a448 h1:KTgPnR10d5zhztWptI952TNtt/4u5h3IzDXkdIMuo2Y= -k8s.io/utils v0.0.0-20221128185143-99ec85e7a448/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/api v0.25.2 h1:v6G8RyFcwf0HR5jQGIAYlvtRNrxMJQG1xJzaSeVnIS8= +k8s.io/api v0.25.2/go.mod h1:qP1Rn4sCVFwx/xIhe+we2cwBLTXNcheRyYXwajonhy0= +k8s.io/apimachinery v0.25.2 h1:WbxfAjCx+AeN8Ilp9joWnyJ6xu9OMeS/fsfjK/5zaQs= +k8s.io/apimachinery v0.25.2/go.mod h1:hqqA1X0bsgsxI6dXsJ4HnNTBOmJNxyPp8dw3u2fSHwA= +k8s.io/apiserver v0.25.2 h1:YePimobk187IMIdnmsMxsfIbC5p4eX3WSOrS9x6FEYw= +k8s.io/apiserver v0.25.2/go.mod h1:30r7xyQTREWCkG2uSjgjhQcKVvAAlqoD+YyrqR6Cn+I= +k8s.io/client-go v0.25.2 h1:SUPp9p5CwM0yXGQrwYurw9LWz+YtMwhWd0GqOsSiefo= +k8s.io/client-go v0.25.2/go.mod h1:i7cNU7N+yGQmJkewcRD2+Vuj4iz7b30kI8OcL3horQ4= +k8s.io/component-base v0.25.2 h1:Nve/ZyHLUBHz1rqwkjXm/Re6IniNa5k7KgzxZpTfSQY= +k8s.io/component-base v0.25.2/go.mod h1:90W21YMr+Yjg7MX+DohmZLzjsBtaxQDDwaX4YxDkl60= +k8s.io/component-helpers v0.25.2 h1:A4xQEFq7tbnhB3CTwZTLcQtyEhFFZN2TyQjNgziuSEI= +k8s.io/component-helpers v0.25.2/go.mod h1:iuyfZG2jGWYvR5F/yGFUYNdL/IFz2smcwpNaOqP+YNM= +k8s.io/controller-manager v0.25.2 h1:+5fn/tmPFo6w2yMIK8Y229gzEMUy+Htc/5xFKP1OZl4= +k8s.io/controller-manager v0.25.2/go.mod h1:JIccNEOdh0JSNsWANn0xJ9nkYMiyHeaoSOLrB/iLYD8= +k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= +k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4= +k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= +k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 h1:MQ8BAZPZlWk3S9K4a9NCkIFQtZShWqoha7snGixVgEA= +k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1/go.mod h1:C/N6wCaBHeBHkHUesQOQy2/MZqGgMAFPqGsGQLdbZBU= +k8s.io/kubelet v0.25.2 h1:L0PXLc2kTfIf6bm+wv4/1dIWwgXWDRTxTErxqFR4nqc= +k8s.io/kubelet v0.25.2/go.mod h1:/ASc/pglUA3TeRMG4hRKSjTa7arT0D6yqLzwqSxwMlY= +k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed h1:jAne/RjBTyawwAy0utX5eqigAwz/lQhTmy+Hr/Cpue4= +k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= -sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35 h1:+xBL5uTc+BkPBwmMi3vYfUJjq+N3K+H6PXeETwf5cPI= -sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35/go.mod h1:WxjusMwXlKzfAs4p9km6XJRndVt2FROgMVCE4cdohFo= +sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.33 h1:LYqFq+6Cj2D0gFfrJvL7iElD4ET6ir3VDdhDdTK7rgc= +sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.33/go.mod h1:soWkSNf2tZC7aMibXEqVhCd73GOY5fJikn8qbdzemB0= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE= diff --git a/vendor/k8s.io/cloud-provider/controllers/service/controller.go b/vendor/k8s.io/cloud-provider/controllers/service/controller.go index 2a67e5f716..1a554463b7 100644 --- a/vendor/k8s.io/cloud-provider/controllers/service/controller.go +++ b/vendor/k8s.io/cloud-provider/controllers/service/controller.go @@ -85,14 +85,16 @@ type Controller struct { eventRecorder record.EventRecorder nodeLister corelisters.NodeLister nodeListerSynced cache.InformerSynced - // services and nodes that need to be synced - serviceQueue workqueue.RateLimitingInterface - nodeQueue workqueue.RateLimitingInterface - // lastSyncedNodes is used when reconciling node state and keeps track of - // the last synced set of nodes. This field is concurrently safe because the - // nodeQueue is serviced by only one go-routine, so node events are not - // processed concurrently. - lastSyncedNodes []*v1.Node + // services that need to be synced + queue workqueue.RateLimitingInterface + + // nodeSyncLock ensures there is only one instance of triggerNodeSync getting executed at one time + // and protects internal states (needFullSync) of nodeSync + nodeSyncLock sync.Mutex + // nodeSyncCh triggers nodeSyncLoop to run + nodeSyncCh chan interface{} + // needFullSync indicates if the nodeSyncInternal will do a full node sync on all LB services. + needFullSync bool } // New returns a new service controller to keep cloud provider service resources @@ -118,9 +120,9 @@ func New( eventRecorder: recorder, nodeLister: nodeInformer.Lister(), nodeListerSynced: nodeInformer.Informer().HasSynced, - serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), - nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"), - lastSyncedNodes: []*v1.Node{}, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), + // nodeSyncCh has a size 1 buffer. Only one pending sync signal would be cached. + nodeSyncCh: make(chan interface{}, 1), } serviceInformer.Informer().AddEventHandlerWithResyncPeriod( @@ -164,7 +166,7 @@ func New( return } - if !shouldSyncUpdatedNode(oldNode, curNode) { + if !shouldSyncNode(oldNode, curNode) { return } @@ -244,10 +246,33 @@ func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetr <-ctx.Done() } -// worker runs a worker thread that just dequeues items, processes them, and marks them done. -// It enforces that the syncHandler is never invoked concurrently with the same key. -func (c *Controller) serviceWorker(ctx context.Context) { - for c.processNextServiceItem(ctx) { +// triggerNodeSync triggers a nodeSync asynchronously +func (c *Controller) triggerNodeSync() { + c.nodeSyncLock.Lock() + defer c.nodeSyncLock.Unlock() + newHosts, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate()) + if err != nil { + runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)) + // if node list cannot be retrieve, trigger full node sync to be safe. + c.needFullSync = true + } else if !nodeSlicesEqualForLB(newHosts, c.knownHosts) { + // Here the last known state is recorded as knownHosts. For each + // LB update, the latest node list is retrieved. This is to prevent + // a stale set of nodes were used to be update loadbalancers when + // there are many loadbalancers in the clusters. nodeSyncInternal + // would be triggered until all loadbalancers are updated to the new state. + klog.V(2).Infof("Node changes detected, triggering a full node sync on all loadbalancer services") + c.needFullSync = true + c.knownHosts = newHosts + } + + select { + case c.nodeSyncCh <- struct{}{}: + klog.V(4).Info("Triggering nodeSync") + return + default: + klog.V(4).Info("A pending nodeSync is already in queue") + return } } @@ -422,7 +447,7 @@ func (c *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S } func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service) (*v1.LoadBalancerStatus, error) { - nodes, err := listWithPredicates(c.nodeLister, getNodePredicatesForService(service)...) + nodes, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate()) if err != nil { return nil, err } @@ -640,15 +665,6 @@ func portEqualForLB(x, y *v1.ServicePort) bool { return true } -func serviceKeys(services []*v1.Service) sets.String { - ret := sets.NewString() - for _, service := range services { - key, _ := cache.MetaNamespaceKeyFunc(service) - ret.Insert(key) - } - return ret -} - func nodeNames(nodes []*v1.Node) sets.String { ret := sets.NewString() for _, node := range nodes { @@ -657,21 +673,65 @@ func nodeNames(nodes []*v1.Node) sets.String { return ret } -func shouldSyncUpdatedNode(oldNode, newNode *v1.Node) bool { - // Evaluate the individual node exclusion predicate before evaluating the - // compounded result of all predicates. We don't sync ETP=local services - // for changes on the readiness condition, hence if a node remains NotReady - // and a user adds the exclusion label we will need to sync as to make sure - // this change is reflected correctly on ETP=local services. The sync - // function compares lastSyncedNodes with the new (existing) set of nodes - // for each service, so services which are synced with the same set of nodes - // should be skipped internally in the sync function. This is needed as to - // trigger a global sync for all services and make sure no service gets - // skipped due to a changing node predicate. - if respectsPredicates(oldNode, nodeIncludedPredicate) != respectsPredicates(newNode, nodeIncludedPredicate) { +func nodeSlicesEqualForLB(x, y []*v1.Node) bool { + if len(x) != len(y) { + return false + } + return nodeNames(x).Equal(nodeNames(y)) +} + +func (c *Controller) getNodeConditionPredicate() NodeConditionPredicate { + return func(node *v1.Node) bool { + if _, hasExcludeBalancerLabel := node.Labels[v1.LabelNodeExcludeBalancers]; hasExcludeBalancerLabel { + return false + } + + // Remove nodes that are about to be deleted by the cluster autoscaler. + for _, taint := range node.Spec.Taints { + if taint.Key == ToBeDeletedTaint { + klog.V(4).Infof("Ignoring node %v with autoscaler taint %+v", node.Name, taint) + return false + } + } + + // If we have no info, don't accept + if len(node.Status.Conditions) == 0 { + return false + } + for _, cond := range node.Status.Conditions { + // We consider the node for load balancing only when its NodeReady condition status + // is ConditionTrue + if cond.Type == v1.NodeReady && cond.Status != v1.ConditionTrue { + klog.V(4).Infof("Ignoring node %v with %v condition status %v", node.Name, cond.Type, cond.Status) + return false + } + } + return true + } +} + +func shouldSyncNode(oldNode, newNode *v1.Node) bool { + if oldNode.Spec.Unschedulable != newNode.Spec.Unschedulable { return true } - return respectsPredicates(oldNode, allNodePredicates...) != respectsPredicates(newNode, allNodePredicates...) + + if !reflect.DeepEqual(oldNode.Labels, newNode.Labels) { + return true + } + + return nodeReadyConditionStatus(oldNode) != nodeReadyConditionStatus(newNode) +} + +func nodeReadyConditionStatus(node *v1.Node) v1.ConditionStatus { + for _, condition := range node.Status.Conditions { + if condition.Type != v1.NodeReady { + continue + } + + return condition.Status + } + + return "" } // syncNodes handles updating the hosts pointed to by all load @@ -693,29 +753,24 @@ func (c *Controller) syncNodes(ctx context.Context, workers int) sets.String { return servicesToRetry } -// nodeSyncService syncs the nodes for one load balancer type service. The return value -// indicates if we should retry. Hence, this functions returns false if we've updated -// load balancers and finished doing it successfully, or didn't try to at all because -// there's no need. This function returns true if we tried to update load balancers and -// failed, indicating to the caller that we should try again. -func (c *Controller) nodeSyncService(svc *v1.Service, oldNodes, newNodes []*v1.Node) bool { - retSuccess := false - retNeedRetry := true +// nodeSyncService syncs the nodes for one load balancer type service +func (c *Controller) nodeSyncService(svc *v1.Service) bool { if svc == nil || !wantsLoadBalancer(svc) { - return retSuccess - } - newNodes = filterWithPredicates(newNodes, getNodePredicatesForService(svc)...) - oldNodes = filterWithPredicates(oldNodes, getNodePredicatesForService(svc)...) - if nodeNames(newNodes).Equal(nodeNames(oldNodes)) { - return retSuccess + return false } klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name) - if err := c.lockedUpdateLoadBalancerHosts(svc, newNodes); err != nil { + hosts, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate()) + if err != nil { + runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err)) + return true + } + + if err := c.lockedUpdateLoadBalancerHosts(svc, hosts); err != nil { runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, err)) - return retNeedRetry + return true } klog.V(4).Infof("nodeSyncService finished successfully for service %s/%s", svc.Namespace, svc.Name) - return retSuccess + return false } // updateLoadBalancerHosts updates all existing load balancers so that @@ -724,20 +779,11 @@ func (c *Controller) nodeSyncService(svc *v1.Service, oldNodes, newNodes []*v1.N func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) { klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers) - // Include all nodes and let nodeSyncService filter and figure out if - // the update is relevant for the service in question. - nodes, err := listWithPredicates(c.nodeLister) - if err != nil { - runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err)) - return serviceKeys(services) - } - // lock for servicesToRetry servicesToRetry = sets.NewString() lock := sync.Mutex{} - doWork := func(piece int) { - if shouldRetry := c.nodeSyncService(services[piece], c.lastSyncedNodes, nodes); !shouldRetry { + if shouldRetry := c.nodeSyncService(services[piece]); !shouldRetry { return } lock.Lock() @@ -746,7 +792,6 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1 servicesToRetry.Insert(key) } workqueue.ParallelizeUntil(ctx, workers, len(services), doWork) - c.lastSyncedNodes = nodes klog.V(4).Infof("Finished updateLoadBalancerHosts") return servicesToRetry } @@ -922,75 +967,19 @@ func (c *Controller) patchStatus(service *v1.Service, previousStatus, newStatus // some set of criteria defined by the function. type NodeConditionPredicate func(node *v1.Node) bool -var ( - allNodePredicates []NodeConditionPredicate = []NodeConditionPredicate{ - nodeIncludedPredicate, - nodeUnTaintedPredicate, - nodeReadyPredicate, - } - etpLocalNodePredicates []NodeConditionPredicate = []NodeConditionPredicate{ - nodeIncludedPredicate, - nodeUnTaintedPredicate, - } -) - -func getNodePredicatesForService(service *v1.Service) []NodeConditionPredicate { - if service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal { - return etpLocalNodePredicates - } - return allNodePredicates -} - -// We consider the node for load balancing only when the node is not labelled for exclusion. -func nodeIncludedPredicate(node *v1.Node) bool { - _, hasExcludeBalancerLabel := node.Labels[v1.LabelNodeExcludeBalancers] - return !hasExcludeBalancerLabel -} - -// We consider the node for load balancing only when its not tainted for deletion by the cluster autoscaler. -func nodeUnTaintedPredicate(node *v1.Node) bool { - for _, taint := range node.Spec.Taints { - if taint.Key == ToBeDeletedTaint { - return false - } - } - return true -} - -// We consider the node for load balancing only when its NodeReady condition status is ConditionTrue -func nodeReadyPredicate(node *v1.Node) bool { - for _, cond := range node.Status.Conditions { - if cond.Type == v1.NodeReady { - return cond.Status == v1.ConditionTrue - } - } - return false -} - -// listWithPredicate gets nodes that matches all predicate functions. -func listWithPredicates(nodeLister corelisters.NodeLister, predicates ...NodeConditionPredicate) ([]*v1.Node, error) { +// listWithPredicate gets nodes that matches predicate function. +func listWithPredicate(nodeLister corelisters.NodeLister, predicate NodeConditionPredicate) ([]*v1.Node, error) { nodes, err := nodeLister.List(labels.Everything()) if err != nil { return nil, err } - return filterWithPredicates(nodes, predicates...), nil -} -func filterWithPredicates(nodes []*v1.Node, predicates ...NodeConditionPredicate) []*v1.Node { var filtered []*v1.Node for i := range nodes { - if respectsPredicates(nodes[i], predicates...) { + if predicate(nodes[i]) { filtered = append(filtered, nodes[i]) } } - return filtered -} -func respectsPredicates(node *v1.Node, predicates ...NodeConditionPredicate) bool { - for _, p := range predicates { - if !p(node) { - return false - } - } - return true + return filtered, nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index 46118414ad..21c98a2d7b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1216,7 +1216,7 @@ k8s.io/client-go/util/homedir k8s.io/client-go/util/keyutil k8s.io/client-go/util/retry k8s.io/client-go/util/workqueue -# k8s.io/cloud-provider v0.26.2 +# k8s.io/cloud-provider v0.25.2 => github.com/openshift/kubernetes-cloud-provider v0.0.0-20221007081959-e07817829a38 ## explicit; go 1.19 k8s.io/cloud-provider k8s.io/cloud-provider/api @@ -1337,7 +1337,7 @@ k8s.io/utils/path k8s.io/utils/pointer k8s.io/utils/strings/slices k8s.io/utils/trace -# sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35 +# sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.33 ## explicit; go 1.17 sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics diff --git a/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/client.go b/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/client.go index cb186cefc2..d1d99e68e3 100644 --- a/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/client.go +++ b/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/client.go @@ -134,9 +134,6 @@ type grpcTunnel struct { // closing should only be accessed through atomic methods. // TODO: switch this to an atomic.Bool once the client is exclusively buit with go1.19+ closing uint32 - - // Stores the current metrics.ClientConnectionStatus - prevStatus atomic.Value } type clientConn interface { @@ -184,11 +181,13 @@ func CreateSingleUseGrpcTunnelWithContext(createCtx, tunnelCtx context.Context, go tunnel.serve(tunnelCtx) - return tunnel, nil + m := metrics.Metrics.GetClientConnectionsMetric() + m.WithLabelValues(string(prevStatus)).Dec() + m.WithLabelValues(string(status)).Inc() } func newUnstartedTunnel(stream client.ProxyService_ProxyClient, c clientConn) *grpcTunnel { - t := grpcTunnel{ + return &grpcTunnel{ stream: stream, clientConn: c, pendingDial: pendingDialManager{pendingDials: make(map[int64]pendingDial)}, @@ -196,36 +195,6 @@ func newUnstartedTunnel(stream client.ProxyService_ProxyClient, c clientConn) *g readTimeoutSeconds: 10, done: make(chan struct{}), } - s := metrics.ClientConnectionStatusCreated - t.prevStatus.Store(s) - metrics.Metrics.GetClientConnectionsMetric().WithLabelValues(string(s)).Inc() - return &t -} - -func (t *grpcTunnel) updateMetric(status metrics.ClientConnectionStatus) { - select { - case <-t.Done(): - return - default: - } - - prevStatus := t.prevStatus.Swap(status).(metrics.ClientConnectionStatus) - - m := metrics.Metrics.GetClientConnectionsMetric() - m.WithLabelValues(string(prevStatus)).Dec() - m.WithLabelValues(string(status)).Inc() -} - -// closeMetric should be called exactly once to finalize client_connections metric. -func (t *grpcTunnel) closeMetric() { - select { - case <-t.Done(): - return - default: - } - prevStatus := t.prevStatus.Load().(metrics.ClientConnectionStatus) - - metrics.Metrics.GetClientConnectionsMetric().WithLabelValues(string(prevStatus)).Dec() } func (t *grpcTunnel) serve(tunnelCtx context.Context) { @@ -237,14 +206,12 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) { // close any channels remaining for these connections. t.conns.closeAll() - t.closeMetric() - close(t.done) }() for { pkt, err := t.stream.Recv() - if err == io.EOF { + if err == io.EOF || t.isClosing() { return } const segment = commonmetrics.SegmentToClient @@ -273,19 +240,13 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) { // 2. grpcTunnel.DialContext() returned early due to a dial timeout or the client canceling the context // // In either scenario, we should return here and close the tunnel as it is no longer needed. - kvs := []interface{}{"dialID", resp.Random, "connectID", resp.ConnectID} - if resp.Error != "" { - kvs = append(kvs, "error", resp.Error) - } - klog.V(1).InfoS("DialResp not recognized; dropped", kvs...) + klog.V(1).InfoS("DialResp not recognized; dropped", "connectionID", resp.ConnectID, "dialID", resp.Random) return } result := dialResult{connid: resp.ConnectID} if resp.Error != "" { - result.err = &dialFailure{resp.Error, metrics.DialFailureEndpoint} - } else { - t.updateMetric(metrics.ClientConnectionStatusOk) + result.err = &dialFailure{resp.Error, DialFailureEndpoint} } select { // try to send to the result channel @@ -320,7 +281,7 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) { klog.V(1).InfoS("DIAL_CLS after dial finished", "dialID", resp.Random) } else { result := dialResult{ - err: &dialFailure{"dial closed", metrics.DialFailureDialClosed}, + err: &dialFailure{"dial closed", DialFailureDialClosed}, } select { case pendingDial.resultCh <- result: @@ -373,15 +334,6 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) { // Dial connects to the address on the named network, similar to // what net.Dial does. The only supported protocol is tcp. func (t *grpcTunnel) DialContext(requestCtx context.Context, protocol, address string) (net.Conn, error) { - conn, err := t.dialContext(requestCtx, protocol, address) - if err != nil { - _, reason := GetDialFailureReason(err) - metrics.Metrics.ObserveDialFailure(reason) - } - return conn, err -} - -func (t *grpcTunnel) dialContext(requestCtx context.Context, protocol, address string) (net.Conn, error) { select { case <-t.done: return nil, errors.New("tunnel is closed") @@ -446,14 +398,14 @@ func (t *grpcTunnel) dialContext(requestCtx context.Context, protocol, address s case <-time.After(30 * time.Second): klog.V(5).InfoS("Timed out waiting for DialResp", "dialID", random) go t.closeDial(random) - return nil, &dialFailure{"dial timeout, backstop", metrics.DialFailureTimeout} + return nil, &dialFailure{"dial timeout, backstop", DialFailureTimeout} case <-requestCtx.Done(): klog.V(5).InfoS("Context canceled waiting for DialResp", "ctxErr", requestCtx.Err(), "dialID", random) go t.closeDial(random) - return nil, &dialFailure{"dial timeout, context", metrics.DialFailureContext} + return nil, &dialFailure{"dial timeout, context", DialFailureContext} case <-t.done: klog.V(5).InfoS("Tunnel closed while waiting for DialResp", "dialID", random) - return nil, &dialFailure{"tunnel closed", metrics.DialFailureTunnelClosed} + return nil, &dialFailure{"tunnel closed", DialFailureTunnelClosed} } return c, nil @@ -473,10 +425,7 @@ func (t *grpcTunnel) closeDial(dialID int64) { }, }, } - const segment = commonmetrics.SegmentFromClient - metrics.Metrics.ObservePacket(segment, req.Type) if err := t.stream.Send(req); err != nil { - metrics.Metrics.ObserveStreamError(segment, err, req.Type) klog.V(5).InfoS("Failed to send DIAL_CLS", "err", err, "dialID", dialID) } t.closeTunnel() @@ -491,19 +440,38 @@ func (t *grpcTunnel) isClosing() bool { return atomic.LoadUint32(&t.closing) != 0 } -func GetDialFailureReason(err error) (isDialFailure bool, reason metrics.DialFailureReason) { +func GetDialFailureReason(err error) (isDialFailure bool, reason DialFailureReason) { var df *dialFailure if errors.As(err, &df) { return true, df.reason } - return false, metrics.DialFailureUnknown + return false, DialFailureUnknown } type dialFailure struct { msg string - reason metrics.DialFailureReason + reason DialFailureReason } func (df *dialFailure) Error() string { return df.msg } + +type DialFailureReason string + +const ( + DialFailureUnknown DialFailureReason = "unknown" + // DialFailureTimeout indicates the hard 30 second timeout was hit. + DialFailureTimeout DialFailureReason = "timeout" + // DialFailureContext indicates that the context was cancelled or reached it's deadline before + // the dial response was returned. + DialFailureContext DialFailureReason = "context" + // DialFailureEndpoint indicates that the konnectivity-agent was unable to reach the backend endpoint. + DialFailureEndpoint DialFailureReason = "endpoint" + // DialFailureDialClosed indicates that the client received a CloseDial response, indicating the + // connection was closed before the dial could complete. + DialFailureDialClosed DialFailureReason = "dialclosed" + // DialFailureTunnelClosed indicates that the client connection was closed before the dial could + // complete. + DialFailureTunnelClosed DialFailureReason = "tunnelclosed" +)