diff --git a/go.mod b/go.mod
index 13629c3625..6ef8082acb 100644
--- a/go.mod
+++ b/go.mod
@@ -12,7 +12,7 @@ require (
 	github.com/spf13/cobra v1.6.0
 	github.com/spf13/pflag v1.0.5
 	github.com/stretchr/testify v1.8.0
-	golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b
+	golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
 	google.golang.org/api v0.63.0
 	gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
 	gopkg.in/gcfg.v1 v1.2.0
@@ -129,9 +129,8 @@ require (
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 	k8s.io/apiextensions-apiserver v0.24.2 // indirect
-	k8s.io/gengo v0.0.0-20220902162205-c0856e24416d // indirect
-	k8s.io/kms v0.26.0 // indirect
-	k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 // indirect
+	k8s.io/gengo v0.0.0-20211129171323-c02415ce4185 // indirect
+	k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 // indirect
 	sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.33 // indirect
 	sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
 	sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
@@ -191,3 +190,7 @@ replace (
 	k8s.io/pod-security-admission => k8s.io/pod-security-admission v0.26.0
 	k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.26.0
 )
+
+// https://github.com/kubernetes/kubernetes/issues/112793
+// Remove after k8s.io/cloud-provider v0.25.3 will be released, updated upstream and rebased.
+replace k8s.io/cloud-provider v0.25.2 => github.com/openshift/kubernetes-cloud-provider v0.0.0-20221007081959-e07817829a38
diff --git a/go.sum b/go.sum
index e811274bec..9a33799a8e 100644
--- a/go.sum
+++ b/go.sum
@@ -299,9 +299,8 @@ github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E=
 github.com/onsi/gomega v1.24.1/go.mod h1:3AOiACssS3/MajrniINInwbfOOtfZvplPzuRSmvt1jM=
 github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
-github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
-github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
-github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/openshift/kubernetes-cloud-provider v0.0.0-20221007081959-e07817829a38 h1:/4N1vluQjoe+C2ic9w2rc6nruPJR+FqP/G4QAX49PXg=
+github.com/openshift/kubernetes-cloud-provider v0.0.0-20221007081959-e07817829a38/go.mod h1:5iKXFWrW3xc/xYOh8WgzjDZ3pDQzUcPoEh4dxEsshgk=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -420,10 +419,8 @@ go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI
 go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
-go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
+go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
 go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
-go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
-go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
 go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
 go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
 go.uber.org/zap v1.17.0 h1:MTjgFu6ZLKvY6Pvaqk97GlxNBuMpV4Hy/3P6tRGlI2U=
@@ -672,53 +669,44 @@ gotest.tools/v3 v3.0.3 h1:4AuOwCGf4lLR9u3YOe2awrHygurzhO/HeQ6laiA6Sx0=
 gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
 honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
-k8s.io/api v0.26.0 h1:IpPlZnxBpV1xl7TGk/X6lFtpgjgntCg8PJ+qrPHAC7I=
-k8s.io/api v0.26.0/go.mod h1:k6HDTaIFC8yn1i6pSClSqIwLABIcLV9l5Q4EcngKnQg=
-k8s.io/apiextensions-apiserver v0.26.0 h1:Gy93Xo1eg2ZIkNX/8vy5xviVSxwQulsnUdQ00nEdpDo=
-k8s.io/apiextensions-apiserver v0.26.0/go.mod h1:7ez0LTiyW5nq3vADtK6C3kMESxadD51Bh6uz3JOlqWQ=
-k8s.io/apimachinery v0.26.0 h1:1feANjElT7MvPqp0JT6F3Ss6TWDwmcjLypwoPpEf7zg=
-k8s.io/apimachinery v0.26.0/go.mod h1:tnPmbONNJ7ByJNz9+n9kMjNP8ON+1qoAIIC70lztu74=
-k8s.io/apiserver v0.26.0 h1:q+LqIK5EZwdznGZb8bq0+a+vCqdeEEe4Ux3zsOjbc4o=
-k8s.io/apiserver v0.26.0/go.mod h1:aWhlLD+mU+xRo+zhkvP/gFNbShI4wBDHS33o0+JGI84=
-k8s.io/client-go v0.26.0 h1:lT1D3OfO+wIi9UFolCrifbjUUgu7CpLca0AD8ghRLI8=
-k8s.io/client-go v0.26.0/go.mod h1:I2Sh57A79EQsDmn7F7ASpmru1cceh3ocVT9KlX2jEZg=
-k8s.io/cloud-provider v0.26.0 h1:kO2BIgCou71QNRHGkpFi/8lnas9UIr+fJz1l/nuiOMo=
-k8s.io/cloud-provider v0.26.0/go.mod h1:JwfUAH67C8f7t6tOC4v4ty+DuvIYVjNF6bGVYSDCqqs=
-k8s.io/cloud-provider-gcp/crd v0.0.0-20221216171017-7050bed673bb h1:/VR4G07ibUHVF3oSGBpsU8VNkhPcXxoB+ursiDef8aQ=
-k8s.io/cloud-provider-gcp/crd v0.0.0-20221216171017-7050bed673bb/go.mod h1:sYxsuNMb/rnrVIbGgG66UJmOlb2UoQlKgxg2Kfs0Qkw=
-k8s.io/code-generator v0.26.0 h1:ZDY+7Gic9p/lACgD1G72gQg2CvNGeAYZTPIncv+iALM=
-k8s.io/code-generator v0.26.0/go.mod h1:OMoJ5Dqx1wgaQzKgc+ZWaZPfGjdRq/Y3WubFrZmeI3I=
-k8s.io/component-base v0.26.0 h1:0IkChOCohtDHttmKuz+EP3j3+qKmV55rM9gIFTXA7Vs=
-k8s.io/component-base v0.26.0/go.mod h1:lqHwlfV1/haa14F/Z5Zizk5QmzaVf23nQzCwVOQpfC8=
-k8s.io/component-helpers v0.26.0 h1:KNgwqs3EUdK0HLfW4GhnbD+q/Zl9U021VfIU7qoVYFk=
-k8s.io/component-helpers v0.26.0/go.mod h1:jHN01qS/Jdj95WCbTe9S2VZ9yxpxXNY488WjF+yW4fo=
-k8s.io/controller-manager v0.26.0 h1:6xIWxs3+Xhj/hoyzELCk871PrIed4J4QqEh57LzQe+8=
-k8s.io/controller-manager v0.26.0/go.mod h1:GxUYtQDBE/RHh7AnZSZqwi2xBPIXlOaWsnDLflKGYrE=
-k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
-k8s.io/gengo v0.0.0-20220902162205-c0856e24416d h1:U9tB195lKdzwqicbJvyJeOXV7Klv+wNAWENRnXEGi08=
-k8s.io/gengo v0.0.0-20220902162205-c0856e24416d/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
+k8s.io/api v0.25.2 h1:v6G8RyFcwf0HR5jQGIAYlvtRNrxMJQG1xJzaSeVnIS8=
+k8s.io/api v0.25.2/go.mod h1:qP1Rn4sCVFwx/xIhe+we2cwBLTXNcheRyYXwajonhy0=
+k8s.io/apiextensions-apiserver v0.25.2 h1:8uOQX17RE7XL02ngtnh3TgifY7EhekpK+/piwzQNnBo=
+k8s.io/apiextensions-apiserver v0.25.2/go.mod h1:iRwwRDlWPfaHhuBfQ0WMa5skdQfrE18QXJaJvIDLvE8=
+k8s.io/apimachinery v0.25.2 h1:WbxfAjCx+AeN8Ilp9joWnyJ6xu9OMeS/fsfjK/5zaQs=
+k8s.io/apimachinery v0.25.2/go.mod h1:hqqA1X0bsgsxI6dXsJ4HnNTBOmJNxyPp8dw3u2fSHwA=
+k8s.io/apiserver v0.25.2 h1:YePimobk187IMIdnmsMxsfIbC5p4eX3WSOrS9x6FEYw=
+k8s.io/apiserver v0.25.2/go.mod h1:30r7xyQTREWCkG2uSjgjhQcKVvAAlqoD+YyrqR6Cn+I=
+k8s.io/client-go v0.25.2 h1:SUPp9p5CwM0yXGQrwYurw9LWz+YtMwhWd0GqOsSiefo=
+k8s.io/client-go v0.25.2/go.mod h1:i7cNU7N+yGQmJkewcRD2+Vuj4iz7b30kI8OcL3horQ4=
+k8s.io/code-generator v0.25.2 h1:qEHux0+E1c+j1MhsWn9+4Z6av8zrZBixOTPW064rSiY=
+k8s.io/code-generator v0.25.2/go.mod h1:f61OcU2VqVQcjt/6TrU0sta1TA5hHkOO6ZZPwkL9Eys=
+k8s.io/component-base v0.25.2 h1:Nve/ZyHLUBHz1rqwkjXm/Re6IniNa5k7KgzxZpTfSQY=
+k8s.io/component-base v0.25.2/go.mod h1:90W21YMr+Yjg7MX+DohmZLzjsBtaxQDDwaX4YxDkl60=
+k8s.io/component-helpers v0.25.2 h1:A4xQEFq7tbnhB3CTwZTLcQtyEhFFZN2TyQjNgziuSEI=
+k8s.io/component-helpers v0.25.2/go.mod h1:iuyfZG2jGWYvR5F/yGFUYNdL/IFz2smcwpNaOqP+YNM=
+k8s.io/controller-manager v0.25.2 h1:+5fn/tmPFo6w2yMIK8Y229gzEMUy+Htc/5xFKP1OZl4=
+k8s.io/controller-manager v0.25.2/go.mod h1:JIccNEOdh0JSNsWANn0xJ9nkYMiyHeaoSOLrB/iLYD8=
+k8s.io/gengo v0.0.0-20211129171323-c02415ce4185 h1:TT1WdmqqXareKxZ/oNXEUSwKlLiHzPMyB0t8BaFeBYI=
+k8s.io/gengo v0.0.0-20211129171323-c02415ce4185/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
 k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
 k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
-k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4=
-k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
-k8s.io/kms v0.26.0 h1:5+GOQLvUajSd0z5ODF52RzB2rHo1HJUSYsVC3Ri3VgI=
-k8s.io/kms v0.26.0/go.mod h1:ReC1IEGuxgfN+PDCIpR6w8+XMmDE7uJhxcCwMZFdIYc=
-k8s.io/kube-controller-manager v0.26.0 h1:Ex/Pz57e+9cRO6/LC/jUz6U6apSmaqjVa2A3YQ1ad5Y=
-k8s.io/kube-controller-manager v0.26.0/go.mod h1:e7C5Xk68HhOUprH6CE55jt7PhTUeEaBNwnwGeoe3OEo=
-k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 h1:+70TFaan3hfJzs+7VK2o+OGxg8HsuBr/5f6tVAjDu6E=
-k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4=
-k8s.io/kubelet v0.26.0 h1:08bDb5IoUH/1K1t2NUwnGIIWxjm9LSqn6k3FWw1tJGI=
-k8s.io/kubelet v0.26.0/go.mod h1:DluF+d8jS2nE/Hs7CC3QM+OZlIEb22NTOihQ3EDwCQ4=
-k8s.io/kubernetes v1.26.0 h1:fL8VMr4xlfTazPORLhz5fsvO5I3bsFpmynVxZTH1ItQ=
-k8s.io/kubernetes v1.26.0/go.mod h1:z0aCJwn6DxzB/dDiWLbQaJO5jWOR2qoaCMnmSAx45XM=
-k8s.io/metrics v0.26.0 h1:U/NzZHKDrIVGL93AUMRkqqXjOah3wGvjSnKmG/5NVCs=
-k8s.io/metrics v0.26.0/go.mod h1:cf5MlG4ZgWaEFZrR9+sOImhZ2ICMpIdNurA+D8snIs8=
-k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
-k8s.io/utils v0.0.0-20221107191617-1a15be271d1d h1:0Smp/HP1OH4Rvhe+4B8nWGERtlqAGSftbSbbmm45oFs=
-k8s.io/utils v0.0.0-20221107191617-1a15be271d1d/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
+k8s.io/klog/v2 v2.70.1 h1:7aaoSdahviPmR+XkS7FyxlkkXs6tHISSG03RxleQAVQ=
+k8s.io/klog/v2 v2.70.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
+k8s.io/kube-controller-manager v0.25.2 h1:TFfe6efK0+X8EU0wwzkCBNoqU5ZlouRLG0F9BSM8KMA=
+k8s.io/kube-controller-manager v0.25.2/go.mod h1:87mw+tGY3jcCmiDBHlxqu69ZHMFR9vzFxAIrky2/XYA=
+k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 h1:MQ8BAZPZlWk3S9K4a9NCkIFQtZShWqoha7snGixVgEA=
+k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1/go.mod h1:C/N6wCaBHeBHkHUesQOQy2/MZqGgMAFPqGsGQLdbZBU=
+k8s.io/kubelet v0.25.2 h1:L0PXLc2kTfIf6bm+wv4/1dIWwgXWDRTxTErxqFR4nqc=
+k8s.io/kubelet v0.25.2/go.mod h1:/ASc/pglUA3TeRMG4hRKSjTa7arT0D6yqLzwqSxwMlY=
+k8s.io/kubernetes v1.25.2 h1:khlfsw0tr0YIbs0fs4uNAy4TcQJuBwhjpyHpkrCNWvs=
+k8s.io/kubernetes v1.25.2/go.mod h1:OlhMy4mww2SLr1KtdV1vC014/MwfYlts/C66gtnDkKE=
+k8s.io/metrics v0.25.2 h1:105TuPaIFfr4EHzN56WwZJO7r1UesuDytNTzeMqGySo=
+k8s.io/metrics v0.25.2/go.mod h1:4NDAauOuEJ+NWO2+hWkhFE4rWBx/plLWJOYU3vGl0sA=
+k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed h1:jAne/RjBTyawwAy0utX5eqigAwz/lQhTmy+Hr/Cpue4=
+k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
 sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.33 h1:LYqFq+6Cj2D0gFfrJvL7iElD4ET6ir3VDdhDdTK7rgc=
 sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.33/go.mod h1:soWkSNf2tZC7aMibXEqVhCd73GOY5fJikn8qbdzemB0=
-sigs.k8s.io/controller-tools v0.6.0/go.mod h1:baRMVPrctU77F+rfAuH2uPqW93k6yQnZA2dhUOr7ihc=
 sigs.k8s.io/controller-tools v0.8.0 h1:uUkfTGEwrguqYYfcI2RRGUnC8mYdCFDqfwPKUcNJh1o=
 sigs.k8s.io/controller-tools v0.8.0/go.mod h1:qE2DXhVOiEq5ijmINcFbqi9GZrrUjzB1TuJU0xa6eoY=
 sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k=
diff --git a/vendor/k8s.io/cloud-provider/controllers/service/controller.go b/vendor/k8s.io/cloud-provider/controllers/service/controller.go
index 2a67e5f716..1a554463b7 100644
--- a/vendor/k8s.io/cloud-provider/controllers/service/controller.go
+++ b/vendor/k8s.io/cloud-provider/controllers/service/controller.go
@@ -85,14 +85,16 @@ type Controller struct {
 	eventRecorder record.EventRecorder
 	nodeLister corelisters.NodeLister
 	nodeListerSynced cache.InformerSynced
-	// services and nodes that need to be synced
-	serviceQueue workqueue.RateLimitingInterface
-	nodeQueue workqueue.RateLimitingInterface
-	// lastSyncedNodes is used when reconciling node state and keeps track of
-	// the last synced set of nodes. This field is concurrently safe because the
-	// nodeQueue is serviced by only one go-routine, so node events are not
-	// processed concurrently.
-	lastSyncedNodes []*v1.Node
+	// services that need to be synced
+	queue workqueue.RateLimitingInterface
+
+	// nodeSyncLock ensures there is only one instance of triggerNodeSync getting executed at one time
+	// and protects internal states (needFullSync) of nodeSync
+	nodeSyncLock sync.Mutex
+	// nodeSyncCh triggers nodeSyncLoop to run
+	nodeSyncCh chan interface{}
+	// needFullSync indicates if the nodeSyncInternal will do a full node sync on all LB services.
+	needFullSync bool
 }
 
 // New returns a new service controller to keep cloud provider service resources
@@ -118,9 +120,9 @@ func New(
 		eventRecorder:    recorder,
 		nodeLister:       nodeInformer.Lister(),
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
-		serviceQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
-		nodeQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
-		lastSyncedNodes:  []*v1.Node{},
+		queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
+		// nodeSyncCh has a size 1 buffer. Only one pending sync signal would be cached.
+		nodeSyncCh: make(chan interface{}, 1),
 	}
 
 	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -164,7 +166,7 @@ func New(
 				return
 			}
 
-			if !shouldSyncUpdatedNode(oldNode, curNode) {
+			if !shouldSyncNode(oldNode, curNode) {
 				return
 			}
 
@@ -244,10 +246,33 @@ func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetr
 	<-ctx.Done()
 }
 
-// worker runs a worker thread that just dequeues items, processes them, and marks them done.
-// It enforces that the syncHandler is never invoked concurrently with the same key.
-func (c *Controller) serviceWorker(ctx context.Context) {
-	for c.processNextServiceItem(ctx) {
+// triggerNodeSync triggers a nodeSync asynchronously
+func (c *Controller) triggerNodeSync() {
+	c.nodeSyncLock.Lock()
+	defer c.nodeSyncLock.Unlock()
+	newHosts, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate())
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err))
+		// if node list cannot be retrieve, trigger full node sync to be safe.
+		c.needFullSync = true
+	} else if !nodeSlicesEqualForLB(newHosts, c.knownHosts) {
+		// Here the last known state is recorded as knownHosts. For each
+		// LB update, the latest node list is retrieved. This is to prevent
+		// a stale set of nodes were used to be update loadbalancers when
+		// there are many loadbalancers in the clusters. nodeSyncInternal
+		// would be triggered until all loadbalancers are updated to the new state.
+		klog.V(2).Infof("Node changes detected, triggering a full node sync on all loadbalancer services")
+		c.needFullSync = true
+		c.knownHosts = newHosts
+	}
+
+	select {
+	case c.nodeSyncCh <- struct{}{}:
+		klog.V(4).Info("Triggering nodeSync")
+		return
+	default:
+		klog.V(4).Info("A pending nodeSync is already in queue")
+		return
 	}
 }
 
@@ -422,7 +447,7 @@ func (c *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S
 }
 
 func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service) (*v1.LoadBalancerStatus, error) {
-	nodes, err := listWithPredicates(c.nodeLister, getNodePredicatesForService(service)...)
+	nodes, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate())
 	if err != nil {
 		return nil, err
 	}
@@ -640,15 +665,6 @@ func portEqualForLB(x, y *v1.ServicePort) bool {
 	return true
 }
 
-func serviceKeys(services []*v1.Service) sets.String {
-	ret := sets.NewString()
-	for _, service := range services {
-		key, _ := cache.MetaNamespaceKeyFunc(service)
-		ret.Insert(key)
-	}
-	return ret
-}
-
 func nodeNames(nodes []*v1.Node) sets.String {
 	ret := sets.NewString()
 	for _, node := range nodes {
@@ -657,21 +673,65 @@ func nodeNames(nodes []*v1.Node) sets.String {
 	return ret
 }
 
-func shouldSyncUpdatedNode(oldNode, newNode *v1.Node) bool {
-	// Evaluate the individual node exclusion predicate before evaluating the
-	// compounded result of all predicates. We don't sync ETP=local services
-	// for changes on the readiness condition, hence if a node remains NotReady
-	// and a user adds the exclusion label we will need to sync as to make sure
-	// this change is reflected correctly on ETP=local services. The sync
-	// function compares lastSyncedNodes with the new (existing) set of nodes
-	// for each service, so services which are synced with the same set of nodes
-	// should be skipped internally in the sync function. This is needed as to
-	// trigger a global sync for all services and make sure no service gets
-	// skipped due to a changing node predicate.
-	if respectsPredicates(oldNode, nodeIncludedPredicate) != respectsPredicates(newNode, nodeIncludedPredicate) {
+func nodeSlicesEqualForLB(x, y []*v1.Node) bool {
+	if len(x) != len(y) {
+		return false
+	}
+	return nodeNames(x).Equal(nodeNames(y))
+}
+
+func (c *Controller) getNodeConditionPredicate() NodeConditionPredicate {
+	return func(node *v1.Node) bool {
+		if _, hasExcludeBalancerLabel := node.Labels[v1.LabelNodeExcludeBalancers]; hasExcludeBalancerLabel {
+			return false
+		}
+
+		// Remove nodes that are about to be deleted by the cluster autoscaler.
+		for _, taint := range node.Spec.Taints {
+			if taint.Key == ToBeDeletedTaint {
+				klog.V(4).Infof("Ignoring node %v with autoscaler taint %+v", node.Name, taint)
+				return false
+			}
+		}
+
+		// If we have no info, don't accept
+		if len(node.Status.Conditions) == 0 {
+			return false
+		}
+		for _, cond := range node.Status.Conditions {
+			// We consider the node for load balancing only when its NodeReady condition status
+			// is ConditionTrue
+			if cond.Type == v1.NodeReady && cond.Status != v1.ConditionTrue {
+				klog.V(4).Infof("Ignoring node %v with %v condition status %v", node.Name, cond.Type, cond.Status)
+				return false
+			}
+		}
+		return true
+	}
+}
+
+func shouldSyncNode(oldNode, newNode *v1.Node) bool {
+	if oldNode.Spec.Unschedulable != newNode.Spec.Unschedulable {
 		return true
 	}
-	return respectsPredicates(oldNode, allNodePredicates...) != respectsPredicates(newNode, allNodePredicates...)
+
+	if !reflect.DeepEqual(oldNode.Labels, newNode.Labels) {
+		return true
+	}
+
+	return nodeReadyConditionStatus(oldNode) != nodeReadyConditionStatus(newNode)
+}
+
+func nodeReadyConditionStatus(node *v1.Node) v1.ConditionStatus {
+	for _, condition := range node.Status.Conditions {
+		if condition.Type != v1.NodeReady {
+			continue
+		}
+
+		return condition.Status
+	}
+
+	return ""
 }
 
 // syncNodes handles updating the hosts pointed to by all load
@@ -693,29 +753,24 @@ func (c *Controller) syncNodes(ctx context.Context, workers int) sets.String {
 	return servicesToRetry
 }
 
-// nodeSyncService syncs the nodes for one load balancer type service. The return value
-// indicates if we should retry. Hence, this functions returns false if we've updated
-// load balancers and finished doing it successfully, or didn't try to at all because
-// there's no need. This function returns true if we tried to update load balancers and
-// failed, indicating to the caller that we should try again.
-func (c *Controller) nodeSyncService(svc *v1.Service, oldNodes, newNodes []*v1.Node) bool {
-	retSuccess := false
-	retNeedRetry := true
+// nodeSyncService syncs the nodes for one load balancer type service
+func (c *Controller) nodeSyncService(svc *v1.Service) bool {
 	if svc == nil || !wantsLoadBalancer(svc) {
-		return retSuccess
-	}
-	newNodes = filterWithPredicates(newNodes, getNodePredicatesForService(svc)...)
-	oldNodes = filterWithPredicates(oldNodes, getNodePredicatesForService(svc)...)
-	if nodeNames(newNodes).Equal(nodeNames(oldNodes)) {
-		return retSuccess
+		return false
 	}
 	klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name)
-	if err := c.lockedUpdateLoadBalancerHosts(svc, newNodes); err != nil {
+	hosts, err := listWithPredicate(c.nodeLister, c.getNodeConditionPredicate())
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
+		return true
+	}
+
+	if err := c.lockedUpdateLoadBalancerHosts(svc, hosts); err != nil {
 		runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, err))
-		return retNeedRetry
+		return true
 	}
 	klog.V(4).Infof("nodeSyncService finished successfully for service %s/%s", svc.Namespace, svc.Name)
-	return retSuccess
+	return false
 }
 
 // updateLoadBalancerHosts updates all existing load balancers so that
@@ -724,20 +779,11 @@ func (c *Controller) nodeSyncService(svc *v1.Service, oldNodes, newNodes []*v1.N
 func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) {
 	klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers)
 
-	// Include all nodes and let nodeSyncService filter and figure out if
-	// the update is relevant for the service in question.
-	nodes, err := listWithPredicates(c.nodeLister)
-	if err != nil {
-		runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
-		return serviceKeys(services)
-	}
-
 	// lock for servicesToRetry
 	servicesToRetry = sets.NewString()
 	lock := sync.Mutex{}
-
 	doWork := func(piece int) {
-		if shouldRetry := c.nodeSyncService(services[piece], c.lastSyncedNodes, nodes); !shouldRetry {
+		if shouldRetry := c.nodeSyncService(services[piece]); !shouldRetry {
 			return
 		}
 		lock.Lock()
@@ -746,7 +792,6 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1
 		servicesToRetry.Insert(key)
 	}
 	workqueue.ParallelizeUntil(ctx, workers, len(services), doWork)
-	c.lastSyncedNodes = nodes
 	klog.V(4).Infof("Finished updateLoadBalancerHosts")
 	return servicesToRetry
 }
@@ -922,75 +967,19 @@ func (c *Controller) patchStatus(service *v1.Service, previousStatus, newStatus
 // some set of criteria defined by the function.
 type NodeConditionPredicate func(node *v1.Node) bool
 
-var (
-	allNodePredicates []NodeConditionPredicate = []NodeConditionPredicate{
-		nodeIncludedPredicate,
-		nodeUnTaintedPredicate,
-		nodeReadyPredicate,
-	}
-	etpLocalNodePredicates []NodeConditionPredicate = []NodeConditionPredicate{
-		nodeIncludedPredicate,
-		nodeUnTaintedPredicate,
-	}
-)
-
-func getNodePredicatesForService(service *v1.Service) []NodeConditionPredicate {
-	if service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
-		return etpLocalNodePredicates
-	}
-	return allNodePredicates
-}
-
-// We consider the node for load balancing only when the node is not labelled for exclusion.
-func nodeIncludedPredicate(node *v1.Node) bool {
-	_, hasExcludeBalancerLabel := node.Labels[v1.LabelNodeExcludeBalancers]
-	return !hasExcludeBalancerLabel
-}
-
-// We consider the node for load balancing only when its not tainted for deletion by the cluster autoscaler.
-func nodeUnTaintedPredicate(node *v1.Node) bool {
-	for _, taint := range node.Spec.Taints {
-		if taint.Key == ToBeDeletedTaint {
-			return false
-		}
-	}
-	return true
-}
-
-// We consider the node for load balancing only when its NodeReady condition status is ConditionTrue
-func nodeReadyPredicate(node *v1.Node) bool {
-	for _, cond := range node.Status.Conditions {
-		if cond.Type == v1.NodeReady {
-			return cond.Status == v1.ConditionTrue
-		}
-	}
-	return false
-}
-
-// listWithPredicate gets nodes that matches all predicate functions.
-func listWithPredicates(nodeLister corelisters.NodeLister, predicates ...NodeConditionPredicate) ([]*v1.Node, error) {
+// listWithPredicate gets nodes that matches predicate function.
+func listWithPredicate(nodeLister corelisters.NodeLister, predicate NodeConditionPredicate) ([]*v1.Node, error) {
 	nodes, err := nodeLister.List(labels.Everything())
 	if err != nil {
 		return nil, err
 	}
-	return filterWithPredicates(nodes, predicates...), nil
-}
 
-func filterWithPredicates(nodes []*v1.Node, predicates ...NodeConditionPredicate) []*v1.Node {
 	var filtered []*v1.Node
 	for i := range nodes {
-		if respectsPredicates(nodes[i], predicates...) {
+		if predicate(nodes[i]) {
 			filtered = append(filtered, nodes[i])
 		}
 	}
-	return filtered
-}
 
-func respectsPredicates(node *v1.Node, predicates ...NodeConditionPredicate) bool {
-	for _, p := range predicates {
-		if !p(node) {
-			return false
-		}
-	}
-	return true
+	return filtered, nil
 }
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 084ecd1f98..98e7a6b814 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -247,9 +247,6 @@ github.com/spf13/cobra
 # github.com/spf13/pflag v1.0.5 => github.com/spf13/pflag v1.0.5
 ## explicit; go 1.12
 github.com/spf13/pflag
-# github.com/stoewer/go-strcase v1.2.0
-## explicit; go 1.11
-github.com/stoewer/go-strcase
 # github.com/stretchr/testify v1.8.0 => github.com/stretchr/testify v1.7.0
 ## explicit; go 1.13
 github.com/stretchr/testify/assert
@@ -1181,7 +1178,7 @@ k8s.io/client-go/util/jsonpath
 k8s.io/client-go/util/keyutil
 k8s.io/client-go/util/retry
 k8s.io/client-go/util/workqueue
-# k8s.io/cloud-provider v0.26.0 => k8s.io/cloud-provider v0.26.0
+# k8s.io/cloud-provider v0.25.2 => github.com/openshift/kubernetes-cloud-provider v0.0.0-20221007081959-e07817829a38
 ## explicit; go 1.19
 k8s.io/cloud-provider
 k8s.io/cloud-provider/api