From 5e378d7447635ede6444b11bc5274390f8d4a99e Mon Sep 17 00:00:00 2001 From: Jayant Jain Date: Wed, 25 Jan 2023 16:52:50 +0000 Subject: [PATCH] update vendor to v1.26.1 due to #5372 --- cluster-autoscaler/go.mod | 84 ++--- cluster-autoscaler/go.sum | 82 ++--- .../github.com/google/cel-go/cel/program.go | 5 +- .../vendor/k8s.io/api/core/v1/generated.proto | 3 +- .../vendor/k8s.io/api/core/v1/types.go | 3 +- .../pkg/util/httpstream/spdy/roundtripper.go | 12 +- .../server/egressselector/egress_selector.go | 5 + .../applyconfigurations/internal/internal.go | 2 + .../client-go/discovery/discovery_client.go | 2 +- .../vendor/k8s.io/client-go/rest/request.go | 35 +- .../k8s.io/client-go/rest/with_retry.go | 17 +- .../k8s.io/component-base/metrics/registry.go | 14 + .../controller/daemon/daemon_controller.go | 60 +-- .../desired_state_of_world_populator.go | 10 +- .../kubernetes/pkg/proxy/winkernel/hns.go | 21 +- .../kubernetes/pkg/proxy/winkernel/proxier.go | 341 +++++++++++++++--- .../framework/preemption/preemption.go | 5 +- .../scheduler/framework/runtime/framework.go | 5 +- cluster-autoscaler/vendor/modules.txt | 100 ++--- .../konnectivity-client/pkg/client/client.go | 119 ++++-- .../konnectivity-client/pkg/client/conn.go | 9 + .../pkg/client/metrics/metrics.go | 162 +++++++++ .../pkg/common/metrics/metrics.go | 78 ++++ 23 files changed, 910 insertions(+), 264 deletions(-) create mode 100644 cluster-autoscaler/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics/metrics.go create mode 100644 cluster-autoscaler/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics/metrics.go diff --git a/cluster-autoscaler/go.mod b/cluster-autoscaler/go.mod index c63f28750925..b04293a53f04 100644 --- a/cluster-autoscaler/go.mod +++ b/cluster-autoscaler/go.mod @@ -35,15 +35,15 @@ require ( google.golang.org/protobuf v1.28.1 gopkg.in/gcfg.v1 v1.2.0 gopkg.in/yaml.v2 v2.4.0 - k8s.io/api v0.26.0 - k8s.io/apimachinery v0.26.0 - k8s.io/apiserver v0.26.0 - k8s.io/client-go v0.26.0 - k8s.io/cloud-provider v0.26.0 - k8s.io/component-base v0.26.0 - k8s.io/component-helpers v0.26.0 + k8s.io/api v0.26.1 + k8s.io/apimachinery v0.26.1 + k8s.io/apiserver v0.26.1 + k8s.io/client-go v0.26.1 + k8s.io/cloud-provider v0.26.1 + k8s.io/component-base v0.26.1 + k8s.io/component-helpers v0.26.1 k8s.io/klog/v2 v2.80.1 - k8s.io/kubernetes v1.26.0 + k8s.io/kubernetes v1.26.1 k8s.io/legacy-cloud-providers v0.0.0 k8s.io/utils v0.0.0-20221107191617-1a15be271d1d sigs.k8s.io/cloud-provider-azure v1.24.2 @@ -95,7 +95,7 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/cadvisor v0.46.0 // indirect - github.com/google/cel-go v0.12.5 // indirect + github.com/google/cel-go v0.12.6 // indirect github.com/google/gnostic v0.5.7-v3refs // indirect github.com/google/gofuzz v1.1.0 // indirect github.com/googleapis/gax-go/v2 v2.1.1 // indirect @@ -169,16 +169,16 @@ require ( gopkg.in/warnings.v0 v0.1.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/cri-api v0.0.0 // indirect - k8s.io/csi-translation-lib v0.26.0 // indirect + k8s.io/csi-translation-lib v0.26.1 // indirect k8s.io/dynamic-resource-allocation v0.0.0 // indirect - k8s.io/kms v0.26.0 // indirect + k8s.io/kms v0.26.1 // indirect k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 // indirect k8s.io/kube-proxy v0.0.0 // indirect k8s.io/kube-scheduler v0.0.0 // indirect k8s.io/kubectl v0.0.0 // indirect - k8s.io/kubelet v0.26.0 // indirect + k8s.io/kubelet v0.26.1 // indirect k8s.io/mount-utils v0.26.0-alpha.0 // indirect - sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.33 // indirect + sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35 // indirect sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect sigs.k8s.io/yaml v1.3.0 // indirect @@ -190,60 +190,60 @@ replace github.com/digitalocean/godo => github.com/digitalocean/godo v1.27.0 replace github.com/rancher/go-rancher => github.com/rancher/go-rancher v0.1.0 -replace k8s.io/api => k8s.io/api v0.26.0 +replace k8s.io/api => k8s.io/api v0.26.1 -replace k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.26.0 +replace k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.26.1 -replace k8s.io/apimachinery => k8s.io/apimachinery v0.26.1-rc.0 +replace k8s.io/apimachinery => k8s.io/apimachinery v0.26.2-rc.0 -replace k8s.io/apiserver => k8s.io/apiserver v0.26.0 +replace k8s.io/apiserver => k8s.io/apiserver v0.26.1 -replace k8s.io/cli-runtime => k8s.io/cli-runtime v0.26.0 +replace k8s.io/cli-runtime => k8s.io/cli-runtime v0.26.1 -replace k8s.io/client-go => k8s.io/client-go v0.26.0 +replace k8s.io/client-go => k8s.io/client-go v0.26.1 -replace k8s.io/cloud-provider => k8s.io/cloud-provider v0.26.0 +replace k8s.io/cloud-provider => k8s.io/cloud-provider v0.26.1 -replace k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.26.0 +replace k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.26.1 -replace k8s.io/code-generator => k8s.io/code-generator v0.26.1-rc.0 +replace k8s.io/code-generator => k8s.io/code-generator v0.26.2-rc.0 -replace k8s.io/component-base => k8s.io/component-base v0.26.0 +replace k8s.io/component-base => k8s.io/component-base v0.26.1 -replace k8s.io/component-helpers => k8s.io/component-helpers v0.26.0 +replace k8s.io/component-helpers => k8s.io/component-helpers v0.26.1 -replace k8s.io/controller-manager => k8s.io/controller-manager v0.26.0 +replace k8s.io/controller-manager => k8s.io/controller-manager v0.26.1 -replace k8s.io/cri-api => k8s.io/cri-api v0.26.1-rc.0 +replace k8s.io/cri-api => k8s.io/cri-api v0.26.2-rc.0 -replace k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.26.0 +replace k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.26.1 -replace k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.26.0 +replace k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.26.1 -replace k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.26.0 +replace k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.26.1 -replace k8s.io/kube-proxy => k8s.io/kube-proxy v0.26.0 +replace k8s.io/kube-proxy => k8s.io/kube-proxy v0.26.1 -replace k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.26.0 +replace k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.26.1 -replace k8s.io/kubectl => k8s.io/kubectl v0.26.0 +replace k8s.io/kubectl => k8s.io/kubectl v0.26.1 -replace k8s.io/kubelet => k8s.io/kubelet v0.26.0 +replace k8s.io/kubelet => k8s.io/kubelet v0.26.1 -replace k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.26.0 +replace k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.26.1 -replace k8s.io/metrics => k8s.io/metrics v0.26.0 +replace k8s.io/metrics => k8s.io/metrics v0.26.1 -replace k8s.io/mount-utils => k8s.io/mount-utils v0.26.1-rc.0 +replace k8s.io/mount-utils => k8s.io/mount-utils v0.26.2-rc.0 -replace k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.26.0 +replace k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.26.1 -replace k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.26.0 +replace k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.26.1 -replace k8s.io/sample-controller => k8s.io/sample-controller v0.26.0 +replace k8s.io/sample-controller => k8s.io/sample-controller v0.26.1 -replace k8s.io/pod-security-admission => k8s.io/pod-security-admission v0.26.0 +replace k8s.io/pod-security-admission => k8s.io/pod-security-admission v0.26.1 -replace k8s.io/dynamic-resource-allocation => k8s.io/dynamic-resource-allocation v0.26.0 +replace k8s.io/dynamic-resource-allocation => k8s.io/dynamic-resource-allocation v0.26.1 -replace k8s.io/kms => k8s.io/kms v0.26.1-rc.0 +replace k8s.io/kms => k8s.io/kms v0.26.2-rc.0 diff --git a/cluster-autoscaler/go.sum b/cluster-autoscaler/go.sum index e0faca473abc..f5b0953bbeb2 100644 --- a/cluster-autoscaler/go.sum +++ b/cluster-autoscaler/go.sum @@ -319,8 +319,8 @@ github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= github.com/google/cadvisor v0.46.0 h1:ryTIniqhN8/wR8UA1RuYSXHvsAtdpk/01XwTZtYHekY= github.com/google/cadvisor v0.46.0/go.mod h1:YnCDnR8amaS0HoMEjheOI0TMPzFKCBLc30mciLEjwGI= -github.com/google/cel-go v0.12.5 h1:DmzaiSgoaqGCjtpPQWl26/gND+yRpim56H1jCVev6d8= -github.com/google/cel-go v0.12.5/go.mod h1:Jk7ljRzLBhkmiAwBoUxB1sZSCVBAzkqPF25olK/iRDw= +github.com/google/cel-go v0.12.6 h1:kjeKudqV0OygrAqA9fX6J55S8gj+Jre2tckIm5RoG4M= +github.com/google/cel-go v0.12.6/go.mod h1:Jk7ljRzLBhkmiAwBoUxB1sZSCVBAzkqPF25olK/iRDw= github.com/google/gnostic v0.5.7-v3refs h1:FhTMOKj2VhjpouxvWJAV1TL304uMlb9zcDqkl6cEI54= github.com/google/gnostic v0.5.7-v3refs/go.mod h1:73MKFl6jIHelAJNaBGFzt3SPtZULs9dYrGFt8OiIsHQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -650,7 +650,7 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= -go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= @@ -1130,55 +1130,55 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= -k8s.io/api v0.26.0 h1:IpPlZnxBpV1xl7TGk/X6lFtpgjgntCg8PJ+qrPHAC7I= -k8s.io/api v0.26.0/go.mod h1:k6HDTaIFC8yn1i6pSClSqIwLABIcLV9l5Q4EcngKnQg= -k8s.io/apimachinery v0.26.1-rc.0 h1:3eoGy0bC3cheClZ1RfKsZRsHdTOjAM//aqhxVlSLxKE= -k8s.io/apimachinery v0.26.1-rc.0/go.mod h1:tnPmbONNJ7ByJNz9+n9kMjNP8ON+1qoAIIC70lztu74= -k8s.io/apiserver v0.26.0 h1:q+LqIK5EZwdznGZb8bq0+a+vCqdeEEe4Ux3zsOjbc4o= -k8s.io/apiserver v0.26.0/go.mod h1:aWhlLD+mU+xRo+zhkvP/gFNbShI4wBDHS33o0+JGI84= -k8s.io/client-go v0.26.0 h1:lT1D3OfO+wIi9UFolCrifbjUUgu7CpLca0AD8ghRLI8= -k8s.io/client-go v0.26.0/go.mod h1:I2Sh57A79EQsDmn7F7ASpmru1cceh3ocVT9KlX2jEZg= -k8s.io/cloud-provider v0.26.0 h1:kO2BIgCou71QNRHGkpFi/8lnas9UIr+fJz1l/nuiOMo= -k8s.io/cloud-provider v0.26.0/go.mod h1:JwfUAH67C8f7t6tOC4v4ty+DuvIYVjNF6bGVYSDCqqs= -k8s.io/component-base v0.26.0 h1:0IkChOCohtDHttmKuz+EP3j3+qKmV55rM9gIFTXA7Vs= -k8s.io/component-base v0.26.0/go.mod h1:lqHwlfV1/haa14F/Z5Zizk5QmzaVf23nQzCwVOQpfC8= -k8s.io/component-helpers v0.26.0 h1:KNgwqs3EUdK0HLfW4GhnbD+q/Zl9U021VfIU7qoVYFk= -k8s.io/component-helpers v0.26.0/go.mod h1:jHN01qS/Jdj95WCbTe9S2VZ9yxpxXNY488WjF+yW4fo= -k8s.io/cri-api v0.26.1-rc.0 h1:mq430QOK1tA1/tN24/AZq50b3SuS5ngnYUph5WCzDS0= -k8s.io/cri-api v0.26.1-rc.0/go.mod h1:I5TGOn/ziMzqIcUvsYZzVE8xDAB1JBkvcwvR0yDreuw= -k8s.io/csi-translation-lib v0.26.0 h1:bCvlfw53Kmyn7cvXeYGe9aqqzR1b0xrGs2XEWHFW+es= -k8s.io/csi-translation-lib v0.26.0/go.mod h1:zRKLRqER6rA8NCKQBhVIdkyDHKgNlu2BK1RKTHjcw+8= -k8s.io/dynamic-resource-allocation v0.26.0 h1:zljrsqa0PxrIwNklTnGBA/az6+33SUQwsNNNKdVTzwg= -k8s.io/dynamic-resource-allocation v0.26.0/go.mod h1:K+hO5A+QsSknRjlhfbUtvZVYUblOldvYyT51eGrZyWI= +k8s.io/api v0.26.1 h1:f+SWYiPd/GsiWwVRz+NbFyCgvv75Pk9NK6dlkZgpCRQ= +k8s.io/api v0.26.1/go.mod h1:xd/GBNgR0f707+ATNyPmQ1oyKSgndzXij81FzWGsejg= +k8s.io/apimachinery v0.26.2-rc.0 h1:f9BARTuEy0MguW4KGK6VwEBT9BCe03lYde0wnWxBilk= +k8s.io/apimachinery v0.26.2-rc.0/go.mod h1:tnPmbONNJ7ByJNz9+n9kMjNP8ON+1qoAIIC70lztu74= +k8s.io/apiserver v0.26.1 h1:6vmnAqCDO194SVCPU3MU8NcDgSqsUA62tBUSWrFXhsc= +k8s.io/apiserver v0.26.1/go.mod h1:wr75z634Cv+sifswE9HlAo5FQ7UoUauIICRlOE+5dCg= +k8s.io/client-go v0.26.1 h1:87CXzYJnAMGaa/IDDfRdhTzxk/wzGZ+/HUQpqgVSZXU= +k8s.io/client-go v0.26.1/go.mod h1:IWNSglg+rQ3OcvDkhY6+QLeasV4OYHDjdqeWkDQZwGE= +k8s.io/cloud-provider v0.26.1 h1:qEZmsGWGptOtVSpeMdTsapHX2BEqIk7rc5MA4caBqE0= +k8s.io/cloud-provider v0.26.1/go.mod h1:6PheIxRySYuRBBxtTUADya8S2rbr18xKi+fhGbLkduc= +k8s.io/component-base v0.26.1 h1:4ahudpeQXHZL5kko+iDHqLj/FSGAEUnSVO0EBbgDd+4= +k8s.io/component-base v0.26.1/go.mod h1:VHrLR0b58oC035w6YQiBSbtsf0ThuSwXP+p5dD/kAWU= +k8s.io/component-helpers v0.26.1 h1:Y5h1OYUJTGyHZlSAsc7mcfNsWF08S/MlrQyF/vn93mU= +k8s.io/component-helpers v0.26.1/go.mod h1:jxNTnHb1axLe93MyVuvKj9T/+f4nxBVrj/xf01/UNFk= +k8s.io/cri-api v0.26.2-rc.0 h1:9UF9xL3xqVaHNjr71lSn6lGiNgiY8iDt852l9kkOoqw= +k8s.io/cri-api v0.26.2-rc.0/go.mod h1:I5TGOn/ziMzqIcUvsYZzVE8xDAB1JBkvcwvR0yDreuw= +k8s.io/csi-translation-lib v0.26.1 h1:GQT88qX4e903HlFne1ovGFilvsd7kJUVi6SWOkOg2SQ= +k8s.io/csi-translation-lib v0.26.1/go.mod h1:tbcXKaVAS3G9iIAi+8Ujp+LPLetZ+vZ2AsZj+c1yXd8= +k8s.io/dynamic-resource-allocation v0.26.1 h1:rQKykCT4c/TWDhIMGk91Azn3lvQznIRySaoTmFeeC4U= +k8s.io/dynamic-resource-allocation v0.26.1/go.mod h1:EDVbmVRc5fdAG+ezTdVyolgBWI2wgsUByhv8PnIxPKc= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4= k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= -k8s.io/kms v0.26.1-rc.0 h1:JB4/QyqY/dBX991K1FxGDIzhmG7g4Pw3FEBmzweDPG8= -k8s.io/kms v0.26.1-rc.0/go.mod h1:ReC1IEGuxgfN+PDCIpR6w8+XMmDE7uJhxcCwMZFdIYc= +k8s.io/kms v0.26.2-rc.0 h1:KYSckRzaCJwR2aCNia6ptLTOSfGhPkyO3li6ZM8B2Ho= +k8s.io/kms v0.26.2-rc.0/go.mod h1:ReC1IEGuxgfN+PDCIpR6w8+XMmDE7uJhxcCwMZFdIYc= k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 h1:+70TFaan3hfJzs+7VK2o+OGxg8HsuBr/5f6tVAjDu6E= k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4= -k8s.io/kube-proxy v0.26.0 h1:VBC83bWr5L4GKSxRFz0YBbwGgQITc0+p8avGzw0LNKo= -k8s.io/kube-proxy v0.26.0/go.mod h1:4kz3dPdMUnspJnFgoJG9lWn1UCiho85Gyn1WLInK0XA= -k8s.io/kube-scheduler v0.26.0 h1:PjSF4cF9X7cAMj5MZ9ZSq2RJ2VkcKKCKj6fy/EbxtA0= -k8s.io/kube-scheduler v0.26.0/go.mod h1:FmptJbq36ATKYxeR+UqAvUtFaLeoFWgoDk1cdCpVPYQ= -k8s.io/kubectl v0.26.0 h1:xmrzoKR9CyNdzxBmXV7jW9Ln8WMrwRK6hGbbf69o4T0= -k8s.io/kubectl v0.26.0/go.mod h1:eInP0b+U9XUJWSYeU9XZnTA+cVYuWyl3iYPGtru0qhQ= -k8s.io/kubelet v0.26.0 h1:08bDb5IoUH/1K1t2NUwnGIIWxjm9LSqn6k3FWw1tJGI= -k8s.io/kubelet v0.26.0/go.mod h1:DluF+d8jS2nE/Hs7CC3QM+OZlIEb22NTOihQ3EDwCQ4= -k8s.io/kubernetes v1.26.0 h1:fL8VMr4xlfTazPORLhz5fsvO5I3bsFpmynVxZTH1ItQ= -k8s.io/kubernetes v1.26.0/go.mod h1:z0aCJwn6DxzB/dDiWLbQaJO5jWOR2qoaCMnmSAx45XM= -k8s.io/legacy-cloud-providers v0.26.0 h1:PudIbGlvQLwSkden/vSfRE4YtsyKCRjQs0OAiP1w8M8= -k8s.io/legacy-cloud-providers v0.26.0/go.mod h1:dOOgYhHiMNWNla/XyM4Ppgjcrn3HulGa93/0vUO5/z4= -k8s.io/mount-utils v0.26.1-rc.0 h1:v7GKm3S5IdmcZvd7gM0QtANdVJRIPpycvgiT/o9y85I= -k8s.io/mount-utils v0.26.1-rc.0/go.mod h1:au99w4FWU5ZWelLb3Yx6kJc8RZ387IyWVM9tN65Yhxo= +k8s.io/kube-proxy v0.26.1 h1:uYt22aiLhIYKxMfmP0mxOMZn0co9UXwlA2uV0uJTDt4= +k8s.io/kube-proxy v0.26.1/go.mod h1:z7TSAvTeD8xmEzNGgwoiXZ0BCE13IPKXp/tSoBBNzaM= +k8s.io/kube-scheduler v0.26.1 h1:OsNOWNPYUeMIkm+LpX5V3Z7EhhZ8wYnOPIoL4dz2g4U= +k8s.io/kube-scheduler v0.26.1/go.mod h1:9SZcwHMANGrfXCJUOw/rPi8Iwd0X4HXx4czCHcR4HiU= +k8s.io/kubectl v0.26.1 h1:K8A0Jjlwg8GqrxOXxAbjY5xtmXYeYjLU96cHp2WMQ7s= +k8s.io/kubectl v0.26.1/go.mod h1:miYFVzldVbdIiXMrHZYmL/EDWwJKM+F0sSsdxsATFPo= +k8s.io/kubelet v0.26.1 h1:wQyCQYmLW6GN3v7gVTxnc3jAE4zMYDlzdF3FZV4rKas= +k8s.io/kubelet v0.26.1/go.mod h1:gFVZ1Ab4XdjtnYdVRATwGwku7FhTxo6LVEZwYoQaDT8= +k8s.io/kubernetes v1.26.1 h1:N+qxlptxpSU/VSLvqBGWyyw/kNhJRpEn1b5YP57+5rk= +k8s.io/kubernetes v1.26.1/go.mod h1:dEfAfGVZBOr2uZLeVazLPj/8E+t8jYFbQqCiBudkB8o= +k8s.io/legacy-cloud-providers v0.26.1 h1:1uWqF3CkFVzwOQyFObJRjTMkIe8bMBthxrJLoYx00l8= +k8s.io/legacy-cloud-providers v0.26.1/go.mod h1:Jw8kvGYWOKSuuefb6ascH5z/YvTZ/ouwzVqUKmm5upk= +k8s.io/mount-utils v0.26.2-rc.0 h1:4OSQU30WVXG4oZfWnXDlMMTBiyEaWBXwNeaYOHWb7Do= +k8s.io/mount-utils v0.26.2-rc.0/go.mod h1:au99w4FWU5ZWelLb3Yx6kJc8RZ387IyWVM9tN65Yhxo= k8s.io/utils v0.0.0-20211116205334-6203023598ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20221107191617-1a15be271d1d h1:0Smp/HP1OH4Rvhe+4B8nWGERtlqAGSftbSbbmm45oFs= k8s.io/utils v0.0.0-20221107191617-1a15be271d1d/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= -sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.33 h1:LYqFq+6Cj2D0gFfrJvL7iElD4ET6ir3VDdhDdTK7rgc= -sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.33/go.mod h1:soWkSNf2tZC7aMibXEqVhCd73GOY5fJikn8qbdzemB0= +sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35 h1:+xBL5uTc+BkPBwmMi3vYfUJjq+N3K+H6PXeETwf5cPI= +sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35/go.mod h1:WxjusMwXlKzfAs4p9km6XJRndVt2FROgMVCE4cdohFo= sigs.k8s.io/cloud-provider-azure v1.24.2 h1:t0c3Q7GAGQ0oqyl/KiHLtkS4obEYJpAMRYUuhEtgs/k= sigs.k8s.io/cloud-provider-azure v1.24.2/go.mod h1:uKqonMQbC2zqwq7NIWOfQLgrsMzD02Wj5UFFl1te1GY= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k= diff --git a/cluster-autoscaler/vendor/github.com/google/cel-go/cel/program.go b/cluster-autoscaler/vendor/github.com/google/cel-go/cel/program.go index 672c83ef7167..6219a4da588e 100644 --- a/cluster-autoscaler/vendor/github.com/google/cel-go/cel/program.go +++ b/cluster-autoscaler/vendor/github.com/google/cel-go/cel/program.go @@ -213,7 +213,10 @@ func newProgram(e *Env, ast *Ast, opts []ProgramOption) (Program, error) { factory := func(state interpreter.EvalState, costTracker *interpreter.CostTracker) (Program, error) { costTracker.Estimator = p.callCostEstimator costTracker.Limit = p.costLimit - decs := decorators + // Limit capacity to guarantee a reallocation when calling 'append(decs, ...)' below. This + // prevents the underlying memory from being shared between factory function calls causing + // undesired mutations. + decs := decorators[:len(decorators):len(decorators)] var observers []interpreter.EvalObserver if p.evalOpts&(OptExhaustiveEval|OptTrackState) != 0 { diff --git a/cluster-autoscaler/vendor/k8s.io/api/core/v1/generated.proto b/cluster-autoscaler/vendor/k8s.io/api/core/v1/generated.proto index 854bcdeba029..9264bfd98b8e 100644 --- a/cluster-autoscaler/vendor/k8s.io/api/core/v1/generated.proto +++ b/cluster-autoscaler/vendor/k8s.io/api/core/v1/generated.proto @@ -4514,7 +4514,8 @@ message ResourceRequirements { // // This field is immutable. // - // +listType=set + // +listType=map + // +listMapKey=name // +featureGate=DynamicResourceAllocation // +optional repeated ResourceClaim claims = 3; diff --git a/cluster-autoscaler/vendor/k8s.io/api/core/v1/types.go b/cluster-autoscaler/vendor/k8s.io/api/core/v1/types.go index 87230fd91812..4be1df0c1dbc 100644 --- a/cluster-autoscaler/vendor/k8s.io/api/core/v1/types.go +++ b/cluster-autoscaler/vendor/k8s.io/api/core/v1/types.go @@ -2322,7 +2322,8 @@ type ResourceRequirements struct { // // This field is immutable. // - // +listType=set + // +listType=map + // +listMapKey=name // +featureGate=DynamicResourceAllocation // +optional Claims []ResourceClaim `json:"claims,omitempty" protobuf:"bytes,3,opt,name=claims"` diff --git a/cluster-autoscaler/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go b/cluster-autoscaler/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go index ea0481799b7e..27c3d2d56451 100644 --- a/cluster-autoscaler/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go +++ b/cluster-autoscaler/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go @@ -184,12 +184,15 @@ func (s *SpdyRoundTripper) dialWithHttpProxy(req *http.Request, proxyURL *url.UR //nolint:staticcheck // SA1019 ignore deprecated httputil.NewProxyClientConn proxyClientConn := httputil.NewProxyClientConn(proxyDialConn, nil) - _, err = proxyClientConn.Do(&proxyReq) + response, err := proxyClientConn.Do(&proxyReq) //nolint:staticcheck // SA1019 ignore deprecated httputil.ErrPersistEOF: it might be // returned from the invocation of proxyClientConn.Do if err != nil && err != httputil.ErrPersistEOF { return nil, err } + if response != nil && response.StatusCode >= 300 || response.StatusCode < 200 { + return nil, fmt.Errorf("CONNECT request to %s returned response: %s", proxyURL.Redacted(), response.Status) + } rwc, _ := proxyClientConn.Hijack() @@ -294,9 +297,10 @@ func (s *SpdyRoundTripper) proxyAuth(proxyURL *url.URL) string { if proxyURL == nil || proxyURL.User == nil { return "" } - credentials := proxyURL.User.String() - encodedAuth := base64.StdEncoding.EncodeToString([]byte(credentials)) - return fmt.Sprintf("Basic %s", encodedAuth) + username := proxyURL.User.Username() + password, _ := proxyURL.User.Password() + auth := username + ":" + password + return "Basic " + base64.StdEncoding.EncodeToString([]byte(auth)) } // RoundTrip executes the Request and upgrades it. After a successful upgrade, diff --git a/cluster-autoscaler/vendor/k8s.io/apiserver/pkg/server/egressselector/egress_selector.go b/cluster-autoscaler/vendor/k8s.io/apiserver/pkg/server/egressselector/egress_selector.go index 661c5ad9a737..6d27633971e8 100644 --- a/cluster-autoscaler/vendor/k8s.io/apiserver/pkg/server/egressselector/egress_selector.go +++ b/cluster-autoscaler/vendor/k8s.io/apiserver/pkg/server/egressselector/egress_selector.go @@ -36,6 +36,7 @@ import ( utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apiserver/pkg/apis/apiserver" egressmetrics "k8s.io/apiserver/pkg/server/egressselector/metrics" + compbasemetrics "k8s.io/component-base/metrics" "k8s.io/component-base/tracing" "k8s.io/klog/v2" client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client" @@ -43,6 +44,10 @@ import ( var directDialer utilnet.DialFunc = http.DefaultTransport.(*http.Transport).DialContext +func init() { + client.Metrics.RegisterMetrics(compbasemetrics.NewKubeRegistry().Registerer()) +} + // EgressSelector is the map of network context type to context dialer, for network egress. type EgressSelector struct { egressToDialer map[EgressType]utilnet.DialFunc diff --git a/cluster-autoscaler/vendor/k8s.io/client-go/applyconfigurations/internal/internal.go b/cluster-autoscaler/vendor/k8s.io/client-go/applyconfigurations/internal/internal.go index afad3b12e182..4f3636b27da5 100644 --- a/cluster-autoscaler/vendor/k8s.io/client-go/applyconfigurations/internal/internal.go +++ b/cluster-autoscaler/vendor/k8s.io/client-go/applyconfigurations/internal/internal.go @@ -6553,6 +6553,8 @@ var schemaYAML = typed.YAMLObject(`types: elementType: namedType: io.k8s.api.core.v1.ResourceClaim elementRelationship: associative + keys: + - name - name: limits type: map: diff --git a/cluster-autoscaler/vendor/k8s.io/client-go/discovery/discovery_client.go b/cluster-autoscaler/vendor/k8s.io/client-go/discovery/discovery_client.go index 9025e888ec1f..43906190fb7b 100644 --- a/cluster-autoscaler/vendor/k8s.io/client-go/discovery/discovery_client.go +++ b/cluster-autoscaler/vendor/k8s.io/client-go/discovery/discovery_client.go @@ -196,7 +196,7 @@ func (d *DiscoveryClient) GroupsAndMaybeResources() (*metav1.APIGroupList, map[s } // Discovery groups and (possibly) resources downloaded from /apis. apiGroups, apiResources, aerr := d.downloadAPIs() - if err != nil { + if aerr != nil { return nil, nil, aerr } // Merge apis groups into the legacy groups. diff --git a/cluster-autoscaler/vendor/k8s.io/client-go/rest/request.go b/cluster-autoscaler/vendor/k8s.io/client-go/rest/request.go index 560f73f0020a..96e725692d37 100644 --- a/cluster-autoscaler/vendor/k8s.io/client-go/rest/request.go +++ b/cluster-autoscaler/vendor/k8s.io/client-go/rest/request.go @@ -34,6 +34,7 @@ import ( "time" "golang.org/x/net/http2" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -116,8 +117,11 @@ type Request struct { subresource string // output - err error - body io.Reader + err error + + // only one of body / bodyBytes may be set. requests using body are not retriable. + body io.Reader + bodyBytes []byte retryFn requestRetryFunc } @@ -443,12 +447,15 @@ func (r *Request) Body(obj interface{}) *Request { return r } glogBody("Request Body", data) - r.body = bytes.NewReader(data) + r.body = nil + r.bodyBytes = data case []byte: glogBody("Request Body", t) - r.body = bytes.NewReader(t) + r.body = nil + r.bodyBytes = t case io.Reader: r.body = t + r.bodyBytes = nil case runtime.Object: // callers may pass typed interface pointers, therefore we must check nil with reflection if reflect.ValueOf(t).IsNil() { @@ -465,7 +472,8 @@ func (r *Request) Body(obj interface{}) *Request { return r } glogBody("Request Body", data) - r.body = bytes.NewReader(data) + r.body = nil + r.bodyBytes = data r.SetHeader("Content-Type", r.c.content.ContentType) default: r.err = fmt.Errorf("unknown type used for body: %+v", obj) @@ -825,9 +833,6 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) { if err != nil { return nil, err } - if r.body != nil { - req.Body = io.NopCloser(r.body) - } resp, err := client.Do(req) updateURLMetrics(ctx, r, resp, err) retry.After(ctx, r, resp, err) @@ -889,8 +894,20 @@ func (r *Request) requestPreflightCheck() error { } func (r *Request) newHTTPRequest(ctx context.Context) (*http.Request, error) { + var body io.Reader + switch { + case r.body != nil && r.bodyBytes != nil: + return nil, fmt.Errorf("cannot set both body and bodyBytes") + case r.body != nil: + body = r.body + case r.bodyBytes != nil: + // Create a new reader specifically for this request. + // Giving each request a dedicated reader allows retries to avoid races resetting the request body. + body = bytes.NewReader(r.bodyBytes) + } + url := r.URL().String() - req, err := http.NewRequest(r.verb, url, r.body) + req, err := http.NewRequest(r.verb, url, body) if err != nil { return nil, err } diff --git a/cluster-autoscaler/vendor/k8s.io/client-go/rest/with_retry.go b/cluster-autoscaler/vendor/k8s.io/client-go/rest/with_retry.go index b04e3e9eff7b..207060a5cc2a 100644 --- a/cluster-autoscaler/vendor/k8s.io/client-go/rest/with_retry.go +++ b/cluster-autoscaler/vendor/k8s.io/client-go/rest/with_retry.go @@ -153,6 +153,11 @@ func (r *withRetry) IsNextRetry(ctx context.Context, restReq *Request, httpReq * return false } + if restReq.body != nil { + // we have an opaque reader, we can't safely reset it + return false + } + r.attempts++ r.retryAfter = &RetryAfter{Attempt: r.attempts} if r.attempts > r.maxRetries { @@ -209,18 +214,6 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error { return nil } - // At this point we've made atleast one attempt, post which the response - // body should have been fully read and closed in order for it to be safe - // to reset the request body before we reconnect, in order for us to reuse - // the same TCP connection. - if seeker, ok := request.body.(io.Seeker); ok && request.body != nil { - if _, err := seeker.Seek(0, io.SeekStart); err != nil { - err = fmt.Errorf("failed to reset the request body while retrying a request: %v", err) - r.trackPreviousError(err) - return err - } - } - // if we are here, we have made attempt(s) at least once before. if request.backoff != nil { delay := request.backoff.CalculateBackoff(url) diff --git a/cluster-autoscaler/vendor/k8s.io/component-base/metrics/registry.go b/cluster-autoscaler/vendor/k8s.io/component-base/metrics/registry.go index af7d1b8bffd8..9a7138c11f8e 100644 --- a/cluster-autoscaler/vendor/k8s.io/component-base/metrics/registry.go +++ b/cluster-autoscaler/vendor/k8s.io/component-base/metrics/registry.go @@ -157,6 +157,10 @@ type KubeRegistry interface { Reset() // RegisterMetaMetrics registers metrics about the number of registered metrics. RegisterMetaMetrics() + // Registerer exposes the underlying prometheus registerer + Registerer() prometheus.Registerer + // Gatherer exposes the underlying prometheus gatherer + Gatherer() prometheus.Gatherer } // kubeRegistry is a wrapper around a prometheus registry-type object. Upon initialization @@ -188,6 +192,16 @@ func (kr *kubeRegistry) Register(c Registerable) error { return nil } +// Registerer exposes the underlying prometheus.Registerer +func (kr *kubeRegistry) Registerer() prometheus.Registerer { + return kr.PromRegistry +} + +// Gatherer exposes the underlying prometheus.Gatherer +func (kr *kubeRegistry) Gatherer() prometheus.Gatherer { + return kr.PromRegistry +} + // MustRegister works like Register but registers any number of // Collectors and panics upon the first registration that causes an // error. diff --git a/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go b/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go index 1263214d329c..573654d382d1 100644 --- a/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go +++ b/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go @@ -887,6 +887,32 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode( return nodesNeedingDaemonPods, podsToDelete } +func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ds *apps.DaemonSet, nodeList []*v1.Node, hash, key string, old []*apps.ControllerRevision) error { + err := dsc.manage(ctx, ds, nodeList, hash) + if err != nil { + return err + } + + // Process rolling updates if we're ready. + if dsc.expectations.SatisfiedExpectations(key) { + switch ds.Spec.UpdateStrategy.Type { + case apps.OnDeleteDaemonSetStrategyType: + case apps.RollingUpdateDaemonSetStrategyType: + err = dsc.rollingUpdate(ctx, ds, nodeList, hash) + } + if err != nil { + return err + } + } + + err = dsc.cleanupHistory(ctx, ds, old) + if err != nil { + return fmt.Errorf("failed to clean up revisions of DaemonSet: %w", err) + } + + return nil +} + // manage manages the scheduling and running of Pods of ds on nodes. // After figuring out which nodes should run a Pod of ds but not yet running one and // which nodes should not run a Pod of ds but currently running one, it calls function @@ -1136,7 +1162,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ds * err = storeDaemonSetStatus(ctx, dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen) if err != nil { - return fmt.Errorf("error storing status for daemon set %#v: %v", ds, err) + return fmt.Errorf("error storing status for daemon set %#v: %w", ds, err) } // Resync the DaemonSet after MinReadySeconds as a last line of defense to guard against clock-skew. @@ -1210,29 +1236,21 @@ func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string) return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, false) } - err = dsc.manage(ctx, ds, nodeList, hash) - if err != nil { + err = dsc.updateDaemonSet(ctx, ds, nodeList, hash, dsKey, old) + statusErr := dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true) + switch { + case err != nil && statusErr != nil: + // If there was an error, and we failed to update status, + // log it and return the original error. + klog.ErrorS(statusErr, "Failed to update status", "daemonSet", klog.KObj(ds)) return err + case err != nil: + return err + case statusErr != nil: + return statusErr } - // Process rolling updates if we're ready. - if dsc.expectations.SatisfiedExpectations(dsKey) { - switch ds.Spec.UpdateStrategy.Type { - case apps.OnDeleteDaemonSetStrategyType: - case apps.RollingUpdateDaemonSetStrategyType: - err = dsc.rollingUpdate(ctx, ds, nodeList, hash) - } - if err != nil { - return err - } - } - - err = dsc.cleanupHistory(ctx, ds, old) - if err != nil { - return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err) - } - - return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true) + return nil } // NodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a diff --git a/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go b/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go index a18df61916d9..a1c2f034401d 100644 --- a/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go +++ b/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go @@ -205,10 +205,18 @@ func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() { } for _, pod := range dswp.podManager.GetPods() { - if dswp.podStateProvider.ShouldPodContainersBeTerminating(pod.UID) { + // Keep consistency of adding pod during reconstruction + if dswp.hasAddedPods && dswp.podStateProvider.ShouldPodContainersBeTerminating(pod.UID) { // Do not (re)add volumes for pods that can't also be starting containers continue } + + if !dswp.hasAddedPods && dswp.podStateProvider.ShouldPodRuntimeBeRemoved(pod.UID) { + // When kubelet restarts, we need to add pods to dsw if there is a possibility + // that the container may still be running + continue + } + dswp.processPodVolumes(pod, mountedVolumesForPod) } } diff --git a/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/proxy/winkernel/hns.go b/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/proxy/winkernel/hns.go index 9d3889d60ccb..db0f835c55af 100644 --- a/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/proxy/winkernel/hns.go +++ b/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/proxy/winkernel/hns.go @@ -48,6 +48,8 @@ type hns struct{} var ( // LoadBalancerFlagsIPv6 enables IPV6. LoadBalancerFlagsIPv6 hcn.LoadBalancerFlags = 2 + // LoadBalancerPortMappingFlagsVipExternalIP enables VipExternalIP. + LoadBalancerPortMappingFlagsVipExternalIP hcn.LoadBalancerPortMappingFlags = 16 ) func (hns hns) getNetworkByName(name string) (*hnsNetworkInfo, error) { @@ -242,6 +244,20 @@ func (hns hns) deleteEndpoint(hnsID string) error { return err } +// findLoadBalancerID will construct a id from the provided loadbalancer fields +func findLoadBalancerID(endpoints []endpointsInfo, vip string, protocol, internalPort, externalPort uint16) (loadBalancerIdentifier, error) { + // Compute hash from backends (endpoint IDs) + hash, err := hashEndpoints(endpoints) + if err != nil { + klog.V(2).ErrorS(err, "Error hashing endpoints", "endpoints", endpoints) + return loadBalancerIdentifier{}, err + } + if len(vip) > 0 { + return loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, vip: vip, endpointsHash: hash}, nil + } + return loadBalancerIdentifier{protocol: protocol, internalPort: internalPort, externalPort: externalPort, endpointsHash: hash}, nil +} + func (hns hns) getAllLoadBalancers() (map[loadBalancerIdentifier]*loadBalancerInfo, error) { lbs, err := hcn.ListLoadBalancers() var id loadBalancerIdentifier @@ -305,6 +321,9 @@ func (hns hns) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancerFlag if flags.localRoutedVIP { lbPortMappingFlags |= hcn.LoadBalancerPortMappingFlagsLocalRoutedVIP } + if flags.isVipExternalIP { + lbPortMappingFlags |= LoadBalancerPortMappingFlagsVipExternalIP + } lbFlags := hcn.LoadBalancerFlagsNone if flags.isDSR { @@ -391,7 +410,7 @@ func hashEndpoints[T string | endpointsInfo](endpoints []T) (hash [20]byte, err for _, ep := range endpoints { switch x := any(ep).(type) { case endpointsInfo: - id = x.hnsID + id = strings.ToUpper(x.hnsID) case string: id = x } diff --git a/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/proxy/winkernel/proxier.go b/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/proxy/winkernel/proxier.go index 887c9adece30..5c6f41ef7119 100644 --- a/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/proxy/winkernel/proxier.go +++ b/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/proxy/winkernel/proxier.go @@ -107,6 +107,7 @@ type loadBalancerIdentifier struct { type loadBalancerFlags struct { isILB bool isDSR bool + isVipExternalIP bool localRoutedVIP bool useMUX bool preserveDIP bool @@ -127,6 +128,8 @@ type serviceInfo struct { hns HostNetworkService preserveDIP bool localTrafficDSR bool + winProxyOptimization bool + internalTrafficLocal bool } type hnsNetworkInfo struct { @@ -143,7 +146,12 @@ type remoteSubnetInfo struct { drMacAddress string } -const NETWORK_TYPE_OVERLAY = "overlay" +const ( + NETWORK_TYPE_OVERLAY = "overlay" + // MAX_COUNT_STALE_LOADBALANCERS is the maximum number of stale loadbalancers which cleanedup in single syncproxyrules. + // If there are more stale loadbalancers to clean, it will go to next iteration of syncproxyrules. + MAX_COUNT_STALE_LOADBALANCERS = 20 +) func newHostNetworkService() (HostNetworkService, hcn.SupportedFeatures) { var h HostNetworkService @@ -156,6 +164,44 @@ func newHostNetworkService() (HostNetworkService, hcn.SupportedFeatures) { return h, supportedFeatures } +// logFormattedEndpoints will log all endpoints and its states which are taking part in endpointmap change. +// This mostly for debugging purpose and verbosity is set to 5. +func logFormattedEndpoints(logMsg string, logLevel klog.Level, svcPortName proxy.ServicePortName, eps []proxy.Endpoint) { + if klog.V(logLevel).Enabled() { + var epInfo string + for _, v := range eps { + epInfo = epInfo + fmt.Sprintf("\n %s={Ready:%v,Serving:%v,Terminating:%v,IsRemote:%v}", v.String(), v.IsReady(), v.IsServing(), v.IsTerminating(), !v.GetIsLocal()) + } + klog.V(logLevel).InfoS(logMsg, "svcPortName", svcPortName, "endpoints", epInfo) + } +} + +// This will cleanup stale load balancers which are pending delete +// in last iteration. This function will act like a self healing of stale +// loadbalancer entries. +func (proxier *Proxier) cleanupStaleLoadbalancers() { + i := 0 + countStaleLB := len(proxier.mapStaleLoadbalancers) + if countStaleLB == 0 { + return + } + klog.V(3).InfoS("Cleanup of stale loadbalancers triggered", "LB Count", countStaleLB) + for lbID := range proxier.mapStaleLoadbalancers { + i++ + if err := proxier.hns.deleteLoadBalancer(lbID); err == nil { + delete(proxier.mapStaleLoadbalancers, lbID) + } + if i == MAX_COUNT_STALE_LOADBALANCERS { + // The remaining stale loadbalancers will be cleaned up in next iteration + break + } + } + countStaleLB = len(proxier.mapStaleLoadbalancers) + if countStaleLB > 0 { + klog.V(3).InfoS("Stale loadbalancers still remaining", "LB Count", countStaleLB, "stale_lb_ids", proxier.mapStaleLoadbalancers) + } +} + func getNetworkName(hnsNetworkName string) (string, error) { if len(hnsNetworkName) == 0 { klog.V(3).InfoS("Flag --network-name not set, checking environment variable") @@ -312,16 +358,24 @@ func conjureMac(macPrefix string, ip net.IP) string { } func (proxier *Proxier) endpointsMapChange(oldEndpointsMap, newEndpointsMap proxy.EndpointsMap) { - for svcPortName := range oldEndpointsMap { - proxier.onEndpointsMapChange(&svcPortName) + // This will optimize remote endpoint and loadbalancer deletion based on the annotation + var svcPortMap = make(map[proxy.ServicePortName]bool) + var logLevel klog.Level = 5 + for svcPortName, eps := range oldEndpointsMap { + logFormattedEndpoints("endpointsMapChange oldEndpointsMap", logLevel, svcPortName, eps) + svcPortMap[svcPortName] = true + proxier.onEndpointsMapChange(&svcPortName, false) } - for svcPortName := range newEndpointsMap { - proxier.onEndpointsMapChange(&svcPortName) + for svcPortName, eps := range newEndpointsMap { + logFormattedEndpoints("endpointsMapChange newEndpointsMap", logLevel, svcPortName, eps) + // redundantCleanup true means cleanup is called second time on the same svcPort + redundantCleanup := svcPortMap[svcPortName] + proxier.onEndpointsMapChange(&svcPortName, redundantCleanup) } } -func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName) { +func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName, redundantCleanup bool) { svc, exists := proxier.svcPortMap[*svcPortName] @@ -333,8 +387,15 @@ func (proxier *Proxier) onEndpointsMapChange(svcPortName *proxy.ServicePortName) return } + if svcInfo.winProxyOptimization && redundantCleanup { + // This is a second cleanup call. + // Second cleanup on the same svcPort will be ignored if the + // winProxyOptimization is Enabled + return + } + klog.V(3).InfoS("Endpoints are modified. Service is stale", "servicePortName", svcPortName) - svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName]) + svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName], proxier.mapStaleLoadbalancers, true) } else { // If no service exists, just cleanup the remote endpoints klog.V(3).InfoS("Endpoints are orphaned, cleaning up") @@ -381,7 +442,7 @@ func (proxier *Proxier) onServiceMapChange(svcPortName *proxy.ServicePortName) { } klog.V(3).InfoS("Updating existing service port", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP(), "port", svcInfo.Port(), "protocol", svcInfo.Protocol()) - svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName]) + svcInfo.cleanupAllPolicies(proxier.endpointsMap[*svcPortName], proxier.mapStaleLoadbalancers, false) } } @@ -426,6 +487,13 @@ func newSourceVIP(hns HostNetworkService, network string, ip string, mac string, return ep, err } +func (ep *endpointsInfo) DecrementRefCount() { + klog.V(3).InfoS("Decrementing Endpoint RefCount", "endpointsInfo", ep) + if !ep.GetIsLocal() && ep.refCount != nil && *ep.refCount > 0 { + *ep.refCount-- + } +} + func (ep *endpointsInfo) Cleanup() { klog.V(3).InfoS("Endpoint cleanup", "endpointsInfo", ep) if !ep.GetIsLocal() && ep.refCount != nil { @@ -461,7 +529,13 @@ func (refCountMap endPointsReferenceCountMap) getRefCount(hnsID string) *uint16 func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service, bsvcPortInfo *proxy.BaseServicePortInfo) proxy.ServicePort { info := &serviceInfo{BaseServicePortInfo: bsvcPortInfo} preserveDIP := service.Annotations["preserve-destination"] == "true" + // Annotation introduced to enable optimized loadbalancing + winProxyOptimization := !(strings.ToUpper(service.Annotations["winProxyOptimization"]) == "DISABLED") localTrafficDSR := service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal + var internalTrafficLocal bool + if service.Spec.InternalTrafficPolicy != nil { + internalTrafficLocal = *service.Spec.InternalTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal + } err := hcn.DSRSupported() if err != nil { preserveDIP = false @@ -478,6 +552,9 @@ func (proxier *Proxier) newServiceInfo(port *v1.ServicePort, service *v1.Service info.targetPort = targetPort info.hns = proxier.hns info.localTrafficDSR = localTrafficDSR + info.winProxyOptimization = winProxyOptimization + info.internalTrafficLocal = internalTrafficLocal + klog.V(3).InfoS("Flags enabled for service", "service", service.Name, "localTrafficDSR", localTrafficDSR, "winProxyOptimization", winProxyOptimization, "internalTrafficLocal", internalTrafficLocal, "preserveDIP", preserveDIP) for _, eip := range service.Spec.ExternalIPs { info.externalIPs = append(info.externalIPs, &externalIPInfo{ip: eip}) @@ -561,6 +638,7 @@ type Proxier struct { forwardHealthCheckVip bool rootHnsEndpointName string + mapStaleLoadbalancers map[string]bool // This maintains entries of stale load balancers which are pending delete in last iteration } type localPort struct { @@ -719,6 +797,7 @@ func NewProxier( healthzPort: healthzPort, rootHnsEndpointName: config.RootHnsEndpointName, forwardHealthCheckVip: config.ForwardHealthCheckVip, + mapStaleLoadbalancers: make(map[string]bool), } ipFamily := v1.IPv4Protocol @@ -780,15 +859,25 @@ func CleanupLeftovers() (encounteredError bool) { return encounteredError } -func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint) { +func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint, mapStaleLoadbalancers map[string]bool, isEndpointChange bool) { klog.V(3).InfoS("Service cleanup", "serviceInfo", svcInfo) - // Skip the svcInfo.policyApplied check to remove all the policies - svcInfo.deleteLoadBalancerPolicy() + // if it's an endpoint change and winProxyOptimization annotation enable, skip lb deletion and remoteEndpoint deletion + winProxyOptimization := isEndpointChange && svcInfo.winProxyOptimization + if winProxyOptimization { + klog.V(3).InfoS("Skipped loadbalancer deletion.", "hnsID", svcInfo.hnsID, "nodePorthnsID", svcInfo.nodePorthnsID, "winProxyOptimization", svcInfo.winProxyOptimization, "isEndpointChange", isEndpointChange) + } else { + // Skip the svcInfo.policyApplied check to remove all the policies + svcInfo.deleteLoadBalancerPolicy(mapStaleLoadbalancers) + } // Cleanup Endpoints references for _, ep := range endpoints { epInfo, ok := ep.(*endpointsInfo) if ok { - epInfo.Cleanup() + if winProxyOptimization { + epInfo.DecrementRefCount() + } else { + epInfo.Cleanup() + } } } if svcInfo.remoteEndpoint != nil { @@ -798,10 +887,11 @@ func (svcInfo *serviceInfo) cleanupAllPolicies(endpoints []proxy.Endpoint) { svcInfo.policyApplied = false } -func (svcInfo *serviceInfo) deleteLoadBalancerPolicy() { +func (svcInfo *serviceInfo) deleteLoadBalancerPolicy(mapStaleLoadbalancer map[string]bool) { // Remove the Hns Policy corresponding to this service hns := svcInfo.hns if err := hns.deleteLoadBalancer(svcInfo.hnsID); err != nil { + mapStaleLoadbalancer[svcInfo.hnsID] = true klog.V(1).ErrorS(err, "Error deleting Hns loadbalancer policy resource.", "hnsID", svcInfo.hnsID, "ClusterIP", svcInfo.ClusterIP()) } else { // On successful delete, remove hnsId @@ -809,6 +899,7 @@ func (svcInfo *serviceInfo) deleteLoadBalancerPolicy() { } if err := hns.deleteLoadBalancer(svcInfo.nodePorthnsID); err != nil { + mapStaleLoadbalancer[svcInfo.nodePorthnsID] = true klog.V(1).ErrorS(err, "Error deleting Hns NodePort policy resource.", "hnsID", svcInfo.nodePorthnsID, "NodePort", svcInfo.NodePort()) } else { // On successful delete, remove hnsId @@ -816,6 +907,7 @@ func (svcInfo *serviceInfo) deleteLoadBalancerPolicy() { } for _, externalIP := range svcInfo.externalIPs { + mapStaleLoadbalancer[externalIP.hnsID] = true if err := hns.deleteLoadBalancer(externalIP.hnsID); err != nil { klog.V(1).ErrorS(err, "Error deleting Hns ExternalIP policy resource.", "hnsID", externalIP.hnsID, "IP", externalIP.ip) } else { @@ -824,7 +916,9 @@ func (svcInfo *serviceInfo) deleteLoadBalancerPolicy() { } } for _, lbIngressIP := range svcInfo.loadBalancerIngressIPs { + klog.V(3).InfoS("Loadbalancer Hns LoadBalancer delete triggered for loadBalancer Ingress resources in cleanup", "lbIngressIP", lbIngressIP) if err := hns.deleteLoadBalancer(lbIngressIP.hnsID); err != nil { + mapStaleLoadbalancer[lbIngressIP.hnsID] = true klog.V(1).ErrorS(err, "Error deleting Hns IngressIP policy resource.", "hnsID", lbIngressIP.hnsID, "IP", lbIngressIP.ip) } else { // On successful delete, remove hnsId @@ -833,6 +927,7 @@ func (svcInfo *serviceInfo) deleteLoadBalancerPolicy() { if lbIngressIP.healthCheckHnsID != "" { if err := hns.deleteLoadBalancer(lbIngressIP.healthCheckHnsID); err != nil { + mapStaleLoadbalancer[lbIngressIP.healthCheckHnsID] = true klog.V(1).ErrorS(err, "Error deleting Hns IngressIP HealthCheck policy resource.", "hnsID", lbIngressIP.healthCheckHnsID, "IP", lbIngressIP.ip) } else { // On successful delete, remove hnsId @@ -992,7 +1087,7 @@ func (proxier *Proxier) cleanupAllPolicies() { klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName) continue } - svcInfo.cleanupAllPolicies(proxier.endpointsMap[svcName]) + svcInfo.cleanupAllPolicies(proxier.endpointsMap[svcName], proxier.mapStaleLoadbalancers, false) } } @@ -1009,6 +1104,56 @@ func isNetworkNotFoundError(err error) bool { return false } +// isAllEndpointsTerminating function will return true if all the endpoints are terminating. +// If atleast one is not terminating, then return false +func (proxier *Proxier) isAllEndpointsTerminating(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool { + for _, epInfo := range proxier.endpointsMap[svcName] { + ep, ok := epInfo.(*endpointsInfo) + if !ok { + continue + } + if isLocalTrafficDSR && !ep.GetIsLocal() { + // KEP-1669: Ignore remote endpoints when the ExternalTrafficPolicy is Local (DSR Mode) + continue + } + // If Readiness Probe fails and pod is not under delete, then + // the state of the endpoint will be - Ready:False, Serving:False, Terminating:False + if !ep.IsReady() && !ep.IsTerminating() { + // Ready:false, Terminating:False, ignore + continue + } + if !ep.IsTerminating() { + return false + } + } + return true +} + +// isAllEndpointsNonServing function will return true if all the endpoints are non serving. +// If atleast one is serving, then return false +func (proxier *Proxier) isAllEndpointsNonServing(svcName proxy.ServicePortName, isLocalTrafficDSR bool) bool { + for _, epInfo := range proxier.endpointsMap[svcName] { + ep, ok := epInfo.(*endpointsInfo) + if !ok { + continue + } + if isLocalTrafficDSR && !ep.GetIsLocal() { + continue + } + if ep.IsServing() { + return false + } + } + return true +} + +// updateQueriedEndpoints updates the queriedEndpoints map with newly created endpoint details +func updateQueriedEndpoints(newHnsEndpoint *endpointsInfo, queriedEndpoints map[string]*endpointsInfo) { + // store newly created endpoints in queriedEndpoints + queriedEndpoints[newHnsEndpoint.hnsID] = newHnsEndpoint + queriedEndpoints[newHnsEndpoint.ip] = newHnsEndpoint +} + // This is where all of the hns save/restore calls happen. // assumes proxier.mu is held func (proxier *Proxier) syncProxyRules() { @@ -1125,9 +1270,7 @@ func (proxier *Proxier) syncProxyRules() { newHnsEndpoint.refCount = proxier.endPointsRefCount.getRefCount(newHnsEndpoint.hnsID) *newHnsEndpoint.refCount++ svcInfo.remoteEndpoint = newHnsEndpoint - // store newly created endpoints in queriedEndpoints - queriedEndpoints[newHnsEndpoint.hnsID] = newHnsEndpoint - queriedEndpoints[newHnsEndpoint.ip] = newHnsEndpoint + updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints) } } @@ -1137,6 +1280,19 @@ func (proxier *Proxier) syncProxyRules() { // Create Remote endpoints for every endpoint, corresponding to the service containsPublicIP := false containsNodeIP := false + var allEndpointsTerminating, allEndpointsNonServing bool + someEndpointsServing := true + + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ProxyTerminatingEndpoints) && len(svcInfo.loadBalancerIngressIPs) > 0 { + // Check should be done only if comes under the feature gate or enabled + // The check should be done only if Spec.Type == Loadbalancer. + allEndpointsTerminating = proxier.isAllEndpointsTerminating(svcName, svcInfo.localTrafficDSR) + allEndpointsNonServing = proxier.isAllEndpointsNonServing(svcName, svcInfo.localTrafficDSR) + someEndpointsServing = !allEndpointsNonServing + klog.V(4).InfoS("Terminating status checked for all endpoints", "svcClusterIP", svcInfo.ClusterIP(), "allEndpointsTerminating", allEndpointsTerminating, "allEndpointsNonServing", allEndpointsNonServing, "localTrafficDSR", svcInfo.localTrafficDSR) + } else { + klog.V(4).InfoS("Skipped terminating status check for all endpoints", "svcClusterIP", svcInfo.ClusterIP(), "proxyEndpointsFeatureGateEnabled", utilfeature.DefaultFeatureGate.Enabled(kubefeatures.ProxyTerminatingEndpoints), "ingressLBCount", len(svcInfo.loadBalancerIngressIPs)) + } for _, epInfo := range proxier.endpointsMap[svcName] { ep, ok := epInfo.(*endpointsInfo) @@ -1145,9 +1301,25 @@ func (proxier *Proxier) syncProxyRules() { continue } - if !ep.IsReady() { + if svcInfo.internalTrafficLocal && svcInfo.localTrafficDSR && !ep.GetIsLocal() { + // No need to use or create remote endpoint when internal and external traffic policy is remote + klog.V(3).InfoS("Skipping the endpoint. Both internalTraffic and external traffic policies are local", "EpIP", ep.ip, " EpPort", ep.port) continue } + + if someEndpointsServing { + + if !allEndpointsTerminating && !ep.IsReady() { + klog.V(3).InfoS("Skipping the endpoint for LB creation. Endpoint is either not ready or all not all endpoints are terminating", "EpIP", ep.ip, " EpPort", ep.port, "allEndpointsTerminating", allEndpointsTerminating, "IsEpReady", ep.IsReady()) + continue + } + if !ep.IsServing() { + klog.V(3).InfoS("Skipping the endpoint for LB creation. Endpoint is not serving", "EpIP", ep.ip, " EpPort", ep.port, "IsEpServing", ep.IsServing()) + continue + } + + } + var newHnsEndpoint *endpointsInfo hnsNetworkName := proxier.network.name var err error @@ -1205,6 +1377,7 @@ func (proxier *Proxier) syncProxyRules() { klog.ErrorS(err, "Remote endpoint creation failed", "endpointsInfo", hnsEndpoint) continue } + updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints) } else { hnsEndpoint := &endpointsInfo{ @@ -1218,6 +1391,7 @@ func (proxier *Proxier) syncProxyRules() { klog.ErrorS(err, "Remote endpoint creation failed") continue } + updateQueriedEndpoints(newHnsEndpoint, queriedEndpoints) } } // For Overlay networks 'SourceVIP' on an Load balancer Policy can either be chosen as @@ -1266,7 +1440,14 @@ func (proxier *Proxier) syncProxyRules() { klog.InfoS("Load Balancer already exists -- Debug ", "hnsID", svcInfo.hnsID) } + // In ETP:Cluster, if all endpoints are under termination, + // it will have serving and terminating, else only ready and serving if len(hnsEndpoints) == 0 { + if svcInfo.winProxyOptimization { + // Deleting loadbalancers when there are no endpoints to serve. + klog.V(3).InfoS("Cleanup existing ", "endpointsInfo", hnsEndpoints, "serviceName", svcName) + svcInfo.deleteLoadBalancerPolicy(proxier.mapStaleLoadbalancers) + } klog.ErrorS(nil, "Endpoint information not available for service, not applying any policy", "serviceName", svcName) continue } @@ -1283,23 +1464,41 @@ func (proxier *Proxier) syncProxyRules() { klog.InfoS("Session Affinity is not supported on this version of Windows") } - hnsLoadBalancer, err := hns.getLoadBalancer( - hnsEndpoints, - loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.isIPv6Mode, sessionAffinity: sessionAffinityClientIP}, - sourceVip, - svcInfo.ClusterIP().String(), - Enum(svcInfo.Protocol()), - uint16(svcInfo.targetPort), - uint16(svcInfo.Port()), - queriedLoadBalancers, - ) - if err != nil { - klog.ErrorS(err, "Policy creation failed") - continue - } + endpointsAvailableForLB := !allEndpointsTerminating && !allEndpointsNonServing + proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), hnsEndpoints, queriedLoadBalancers) + + if endpointsAvailableForLB { + + // clusterIPEndpoints is the endpoint list used for creating ClusterIP loadbalancer. + clusterIPEndpoints := hnsEndpoints + if svcInfo.internalTrafficLocal { + // Take local endpoints for clusterip loadbalancer when internal traffic policy is local. + clusterIPEndpoints = hnsLocalEndpoints + } + + // If all endpoints are terminating, then no need to create Cluster IP LoadBalancer + // Cluster IP LoadBalancer creation + hnsLoadBalancer, err := hns.getLoadBalancer( + clusterIPEndpoints, + loadBalancerFlags{isDSR: proxier.isDSR, isIPv6: proxier.isIPv6Mode, sessionAffinity: sessionAffinityClientIP}, + sourceVip, + svcInfo.ClusterIP().String(), + Enum(svcInfo.Protocol()), + uint16(svcInfo.targetPort), + uint16(svcInfo.Port()), + queriedLoadBalancers, + ) + if err != nil { + klog.ErrorS(err, "Policy creation failed") + continue + } + + svcInfo.hnsID = hnsLoadBalancer.hnsID + klog.V(3).InfoS("Hns LoadBalancer resource created for cluster ip resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID) - svcInfo.hnsID = hnsLoadBalancer.hnsID - klog.V(3).InfoS("Hns LoadBalancer resource created for cluster ip resources", "clusterIP", svcInfo.ClusterIP(), "hnsID", hnsLoadBalancer.hnsID) + } else { + klog.V(3).InfoS("Skipped creating Hns LoadBalancer for cluster ip resources. Reason : all endpoints are terminating", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating) + } // If nodePort is specified, user should be able to use nodeIP:nodePort to reach the backend endpoints if svcInfo.NodePort() > 0 { @@ -1310,10 +1509,13 @@ func (proxier *Proxier) syncProxyRules() { nodePortEndpoints = hnsLocalEndpoints } - if len(nodePortEndpoints) > 0 { + proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &svcInfo.nodePorthnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), nodePortEndpoints, queriedLoadBalancers) + + if len(nodePortEndpoints) > 0 && endpointsAvailableForLB { + // If all endpoints are in terminating stage, then no need to create Node Port LoadBalancer hnsLoadBalancer, err := hns.getLoadBalancer( nodePortEndpoints, - loadBalancerFlags{isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode}, + loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, localRoutedVIP: true, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode}, sourceVip, "", Enum(svcInfo.Protocol()), @@ -1329,7 +1531,7 @@ func (proxier *Proxier) syncProxyRules() { svcInfo.nodePorthnsID = hnsLoadBalancer.hnsID klog.V(3).InfoS("Hns LoadBalancer resource created for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "hnsID", hnsLoadBalancer.hnsID) } else { - klog.V(3).InfoS("Skipped creating Hns LoadBalancer for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "hnsID", hnsLoadBalancer.hnsID) + klog.V(3).InfoS("Skipped creating Hns LoadBalancer for nodePort resources", "clusterIP", svcInfo.ClusterIP(), "nodeport", svcInfo.NodePort(), "allEndpointsTerminating", allEndpointsTerminating) } } @@ -1341,11 +1543,14 @@ func (proxier *Proxier) syncProxyRules() { externalIPEndpoints = hnsLocalEndpoints } - if len(externalIPEndpoints) > 0 { + proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &externalIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), externalIPEndpoints, queriedLoadBalancers) + + if len(externalIPEndpoints) > 0 && endpointsAvailableForLB { + // If all endpoints are in terminating stage, then no need to External IP LoadBalancer // Try loading existing policies, if already available hnsLoadBalancer, err = hns.getLoadBalancer( externalIPEndpoints, - loadBalancerFlags{isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode}, + loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.localTrafficDSR, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode}, sourceVip, externalIP.ip, Enum(svcInfo.Protocol()), @@ -1360,7 +1565,7 @@ func (proxier *Proxier) syncProxyRules() { externalIP.hnsID = hnsLoadBalancer.hnsID klog.V(3).InfoS("Hns LoadBalancer resource created for externalIP resources", "externalIP", externalIP, "hnsID", hnsLoadBalancer.hnsID) } else { - klog.V(3).InfoS("Skipped creating Hns LoadBalancer for externalIP resources", "externalIP", externalIP, "hnsID", hnsLoadBalancer.hnsID) + klog.V(3).InfoS("Skipped creating Hns LoadBalancer for externalIP resources", "externalIP", externalIP, "allEndpointsTerminating", allEndpointsTerminating) } } // Create a Load Balancer Policy for each loadbalancer ingress @@ -1371,10 +1576,12 @@ func (proxier *Proxier) syncProxyRules() { lbIngressEndpoints = hnsLocalEndpoints } + proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.hnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), lbIngressEndpoints, queriedLoadBalancers) + if len(lbIngressEndpoints) > 0 { hnsLoadBalancer, err := hns.getLoadBalancer( lbIngressEndpoints, - loadBalancerFlags{isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode}, + loadBalancerFlags{isVipExternalIP: true, isDSR: svcInfo.preserveDIP || svcInfo.localTrafficDSR, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP, sessionAffinity: sessionAffinityClientIP, isIPv6: proxier.isIPv6Mode}, sourceVip, lbIngressIP.ip, Enum(svcInfo.Protocol()), @@ -1392,11 +1599,15 @@ func (proxier *Proxier) syncProxyRules() { klog.V(3).InfoS("Skipped creating Hns LoadBalancer for loadBalancer Ingress resources", "lbIngressIP", lbIngressIP) } - if proxier.forwardHealthCheckVip && gatewayHnsendpoint != nil { + if proxier.forwardHealthCheckVip && gatewayHnsendpoint != nil && endpointsAvailableForLB { + // Avoid creating health check loadbalancer if all the endpoints are terminating nodeport := proxier.healthzPort if svcInfo.HealthCheckNodePort() != 0 { nodeport = svcInfo.HealthCheckNodePort() } + + proxier.deleteExistingLoadBalancer(hns, svcInfo.winProxyOptimization, &lbIngressIP.healthCheckHnsID, sourceVip, Enum(svcInfo.Protocol()), uint16(svcInfo.targetPort), uint16(svcInfo.Port()), []endpointsInfo{*gatewayHnsendpoint}, queriedLoadBalancers) + hnsHealthCheckLoadBalancer, err := hns.getLoadBalancer( []endpointsInfo{*gatewayHnsendpoint}, loadBalancerFlags{isDSR: false, useMUX: svcInfo.preserveDIP, preserveDIP: svcInfo.preserveDIP}, @@ -1413,6 +1624,8 @@ func (proxier *Proxier) syncProxyRules() { } lbIngressIP.healthCheckHnsID = hnsHealthCheckLoadBalancer.hnsID klog.V(3).InfoS("Hns Health Check LoadBalancer resource created for loadBalancer Ingress resources", "ip", lbIngressIP) + } else { + klog.V(3).InfoS("Skipped creating Hns Health Check LoadBalancer for loadBalancer Ingress resources", "ip", lbIngressIP, "allEndpointsTerminating", allEndpointsTerminating) } } svcInfo.policyApplied = true @@ -1444,7 +1657,51 @@ func (proxier *Proxier) syncProxyRules() { // remove stale endpoint refcount entries for hnsID, referenceCount := range proxier.endPointsRefCount { if *referenceCount <= 0 { + klog.V(3).InfoS("Deleting unreferenced remote endpoint", "hnsID", hnsID) + proxier.hns.deleteEndpoint(hnsID) delete(proxier.endPointsRefCount, hnsID) } } + // This will cleanup stale load balancers which are pending delete + // in last iteration + proxier.cleanupStaleLoadbalancers() +} + +// deleteExistingLoadBalancer checks whether loadbalancer delete is needed or not. +// If it is needed, the function will delete the existing loadbalancer and return true, else false. +func (proxier *Proxier) deleteExistingLoadBalancer(hns HostNetworkService, winProxyOptimization bool, lbHnsID *string, sourceVip string, protocol, intPort, extPort uint16, endpoints []endpointsInfo, queriedLoadBalancers map[loadBalancerIdentifier]*loadBalancerInfo) bool { + + if !winProxyOptimization || *lbHnsID == "" { + // Loadbalancer delete not needed + return false + } + + lbID, lbIdErr := findLoadBalancerID( + endpoints, + sourceVip, + protocol, + intPort, + extPort, + ) + + if lbIdErr != nil { + return proxier.deleteLoadBalancer(hns, lbHnsID) + } + + if _, ok := queriedLoadBalancers[lbID]; ok { + // The existing loadbalancer in the system is same as what we try to delete and recreate. So we skip deleting. + return false + } + + return proxier.deleteLoadBalancer(hns, lbHnsID) +} + +func (proxier *Proxier) deleteLoadBalancer(hns HostNetworkService, lbHnsID *string) bool { + klog.V(3).InfoS("Hns LoadBalancer delete triggered for loadBalancer resources", "lbHnsID", *lbHnsID) + if err := hns.deleteLoadBalancer(*lbHnsID); err != nil { + // This will be cleanup by cleanupStaleLoadbalancer fnction. + proxier.mapStaleLoadbalancers[*lbHnsID] = true + } + *lbHnsID = "" + return true } diff --git a/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/scheduler/framework/preemption/preemption.go b/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/scheduler/framework/preemption/preemption.go index 32e1e1ba7434..26a5c20541bc 100644 --- a/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/scheduler/framework/preemption/preemption.go +++ b/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/scheduler/framework/preemption/preemption.go @@ -362,7 +362,7 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1. WithType(v1.DisruptionTarget). WithStatus(v1.ConditionTrue). WithReason("PreemptionByKubeScheduler"). - WithMessage(fmt.Sprintf("Kube-scheduler: preempting to accommodate a higher priority pod: %s", klog.KObj(pod))). + WithMessage(fmt.Sprintf("%s: preempting to accommodate a higher priority pod", pod.Spec.SchedulerName)). WithLastTransitionTime(metav1.Now()), ) @@ -378,8 +378,7 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1. return } } - fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", - pod.Namespace, pod.Name, c.Name()) + fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by a pod on node %v", c.Name()) } fh.Parallelizer().Until(ctx, len(c.Victims().Pods), preemptPod, ev.PluginName) diff --git a/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/scheduler/framework/runtime/framework.go b/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/scheduler/framework/runtime/framework.go index 53e270c2ab7d..3a17cb28dd73 100644 --- a/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/scheduler/framework/runtime/framework.go +++ b/cluster-autoscaler/vendor/k8s.io/kubernetes/pkg/scheduler/framework/runtime/framework.go @@ -730,11 +730,10 @@ func (f *frameworkImpl) RunFilterPlugins( if !pluginStatus.IsUnschedulable() { // Filter plugins are not supposed to return any status other than // Success or Unschedulable. - errStatus := framework.AsStatus(fmt.Errorf("running %q filter plugin: %w", pl.Name(), pluginStatus.AsError())).WithFailedPlugin(pl.Name()) - return map[string]*framework.Status{pl.Name(): errStatus} + pluginStatus = framework.AsStatus(fmt.Errorf("running %q filter plugin: %w", pl.Name(), pluginStatus.AsError())) } pluginStatus.SetFailedPlugin(pl.Name()) - statuses[pl.Name()] = pluginStatus + return map[string]*framework.Status{pl.Name(): pluginStatus} } } diff --git a/cluster-autoscaler/vendor/modules.txt b/cluster-autoscaler/vendor/modules.txt index 0a4a681206d9..672c0e8328af 100644 --- a/cluster-autoscaler/vendor/modules.txt +++ b/cluster-autoscaler/vendor/modules.txt @@ -336,7 +336,7 @@ github.com/google/cadvisor/utils/sysfs github.com/google/cadvisor/utils/sysinfo github.com/google/cadvisor/version github.com/google/cadvisor/watcher -# github.com/google/cel-go v0.12.5 +# github.com/google/cel-go v0.12.6 ## explicit; go 1.17 github.com/google/cel-go/cel github.com/google/cel-go/checker @@ -939,7 +939,7 @@ gopkg.in/yaml.v2 # gopkg.in/yaml.v3 v3.0.1 ## explicit gopkg.in/yaml.v3 -# k8s.io/api v0.26.0 => k8s.io/api v0.26.0 +# k8s.io/api v0.26.1 => k8s.io/api v0.26.1 ## explicit; go 1.19 k8s.io/api/admission/v1 k8s.io/api/admission/v1beta1 @@ -994,7 +994,7 @@ k8s.io/api/scheduling/v1beta1 k8s.io/api/storage/v1 k8s.io/api/storage/v1alpha1 k8s.io/api/storage/v1beta1 -# k8s.io/apimachinery v0.26.0 => k8s.io/apimachinery v0.26.1-rc.0 +# k8s.io/apimachinery v0.26.1 => k8s.io/apimachinery v0.26.2-rc.0 ## explicit; go 1.19 k8s.io/apimachinery/pkg/api/equality k8s.io/apimachinery/pkg/api/errors @@ -1054,7 +1054,7 @@ k8s.io/apimachinery/pkg/watch k8s.io/apimachinery/third_party/forked/golang/json k8s.io/apimachinery/third_party/forked/golang/netutil k8s.io/apimachinery/third_party/forked/golang/reflect -# k8s.io/apiserver v0.26.0 => k8s.io/apiserver v0.26.0 +# k8s.io/apiserver v0.26.1 => k8s.io/apiserver v0.26.1 ## explicit; go 1.19 k8s.io/apiserver/pkg/admission k8s.io/apiserver/pkg/admission/cel @@ -1193,7 +1193,7 @@ k8s.io/apiserver/plugin/pkg/audit/truncate k8s.io/apiserver/plugin/pkg/audit/webhook k8s.io/apiserver/plugin/pkg/authenticator/token/webhook k8s.io/apiserver/plugin/pkg/authorizer/webhook -# k8s.io/client-go v0.26.0 => k8s.io/client-go v0.26.0 +# k8s.io/client-go v0.26.1 => k8s.io/client-go v0.26.1 ## explicit; go 1.19 k8s.io/client-go/applyconfigurations/admissionregistration/v1 k8s.io/client-go/applyconfigurations/admissionregistration/v1alpha1 @@ -1513,7 +1513,7 @@ k8s.io/client-go/util/homedir k8s.io/client-go/util/keyutil k8s.io/client-go/util/retry k8s.io/client-go/util/workqueue -# k8s.io/cloud-provider v0.26.0 => k8s.io/cloud-provider v0.26.0 +# k8s.io/cloud-provider v0.26.1 => k8s.io/cloud-provider v0.26.1 ## explicit; go 1.19 k8s.io/cloud-provider k8s.io/cloud-provider/api @@ -1523,7 +1523,7 @@ k8s.io/cloud-provider/service/helpers k8s.io/cloud-provider/volume k8s.io/cloud-provider/volume/errors k8s.io/cloud-provider/volume/helpers -# k8s.io/component-base v0.26.0 => k8s.io/component-base v0.26.0 +# k8s.io/component-base v0.26.1 => k8s.io/component-base v0.26.1 ## explicit; go 1.19 k8s.io/component-base/cli/flag k8s.io/component-base/codec @@ -1550,7 +1550,7 @@ k8s.io/component-base/tracing k8s.io/component-base/tracing/api/v1 k8s.io/component-base/version k8s.io/component-base/version/verflag -# k8s.io/component-helpers v0.26.0 => k8s.io/component-helpers v0.26.0 +# k8s.io/component-helpers v0.26.1 => k8s.io/component-helpers v0.26.1 ## explicit; go 1.19 k8s.io/component-helpers/apimachinery/lease k8s.io/component-helpers/node/topology @@ -1560,16 +1560,16 @@ k8s.io/component-helpers/scheduling/corev1 k8s.io/component-helpers/scheduling/corev1/nodeaffinity k8s.io/component-helpers/storage/ephemeral k8s.io/component-helpers/storage/volume -# k8s.io/cri-api v0.0.0 => k8s.io/cri-api v0.26.1-rc.0 +# k8s.io/cri-api v0.0.0 => k8s.io/cri-api v0.26.2-rc.0 ## explicit; go 1.19 k8s.io/cri-api/pkg/apis k8s.io/cri-api/pkg/apis/runtime/v1 k8s.io/cri-api/pkg/errors -# k8s.io/csi-translation-lib v0.26.0 => k8s.io/csi-translation-lib v0.26.0 +# k8s.io/csi-translation-lib v0.26.1 => k8s.io/csi-translation-lib v0.26.1 ## explicit; go 1.19 k8s.io/csi-translation-lib k8s.io/csi-translation-lib/plugins -# k8s.io/dynamic-resource-allocation v0.0.0 => k8s.io/dynamic-resource-allocation v0.26.0 +# k8s.io/dynamic-resource-allocation v0.0.0 => k8s.io/dynamic-resource-allocation v0.26.1 ## explicit; go 1.19 k8s.io/dynamic-resource-allocation/resourceclaim # k8s.io/klog/v2 v2.80.1 @@ -1580,7 +1580,7 @@ k8s.io/klog/v2/internal/clock k8s.io/klog/v2/internal/dbg k8s.io/klog/v2/internal/serialize k8s.io/klog/v2/internal/severity -# k8s.io/kms v0.26.0 => k8s.io/kms v0.26.1-rc.0 +# k8s.io/kms v0.26.1 => k8s.io/kms v0.26.2-rc.0 ## explicit; go 1.19 k8s.io/kms/apis/v1beta1 k8s.io/kms/apis/v2alpha1 @@ -1603,19 +1603,19 @@ k8s.io/kube-openapi/pkg/spec3 k8s.io/kube-openapi/pkg/util k8s.io/kube-openapi/pkg/util/proto k8s.io/kube-openapi/pkg/validation/spec -# k8s.io/kube-proxy v0.0.0 => k8s.io/kube-proxy v0.26.0 +# k8s.io/kube-proxy v0.0.0 => k8s.io/kube-proxy v0.26.1 ## explicit; go 1.19 k8s.io/kube-proxy/config/v1alpha1 -# k8s.io/kube-scheduler v0.0.0 => k8s.io/kube-scheduler v0.26.0 +# k8s.io/kube-scheduler v0.0.0 => k8s.io/kube-scheduler v0.26.1 ## explicit; go 1.19 k8s.io/kube-scheduler/config/v1 k8s.io/kube-scheduler/config/v1beta2 k8s.io/kube-scheduler/config/v1beta3 k8s.io/kube-scheduler/extender/v1 -# k8s.io/kubectl v0.0.0 => k8s.io/kubectl v0.26.0 +# k8s.io/kubectl v0.0.0 => k8s.io/kubectl v0.26.1 ## explicit; go 1.19 k8s.io/kubectl/pkg/scale -# k8s.io/kubelet v0.26.0 => k8s.io/kubelet v0.26.0 +# k8s.io/kubelet v0.26.1 => k8s.io/kubelet v0.26.1 ## explicit; go 1.19 k8s.io/kubelet/config/v1 k8s.io/kubelet/config/v1alpha1 @@ -1632,7 +1632,7 @@ k8s.io/kubelet/pkg/apis/pluginregistration/v1 k8s.io/kubelet/pkg/apis/podresources/v1 k8s.io/kubelet/pkg/apis/podresources/v1alpha1 k8s.io/kubelet/pkg/apis/stats/v1alpha1 -# k8s.io/kubernetes v1.26.0 +# k8s.io/kubernetes v1.26.1 ## explicit; go 1.19 k8s.io/kubernetes/cmd/kube-proxy/app k8s.io/kubernetes/cmd/kubelet/app @@ -1902,7 +1902,7 @@ k8s.io/kubernetes/pkg/volume/vsphere_volume k8s.io/kubernetes/pkg/windows/service k8s.io/kubernetes/test/utils k8s.io/kubernetes/third_party/forked/golang/expansion -# k8s.io/legacy-cloud-providers v0.0.0 => k8s.io/legacy-cloud-providers v0.26.0 +# k8s.io/legacy-cloud-providers v0.0.0 => k8s.io/legacy-cloud-providers v0.26.1 ## explicit; go 1.19 k8s.io/legacy-cloud-providers/aws k8s.io/legacy-cloud-providers/azure @@ -1945,7 +1945,7 @@ k8s.io/legacy-cloud-providers/gce/gcpcredential k8s.io/legacy-cloud-providers/vsphere k8s.io/legacy-cloud-providers/vsphere/vclib k8s.io/legacy-cloud-providers/vsphere/vclib/diskmanagers -# k8s.io/mount-utils v0.26.0-alpha.0 => k8s.io/mount-utils v0.26.1-rc.0 +# k8s.io/mount-utils v0.26.0-alpha.0 => k8s.io/mount-utils v0.26.2-rc.0 ## explicit; go 1.19 k8s.io/mount-utils # k8s.io/utils v0.0.0-20221107191617-1a15be271d1d @@ -1968,9 +1968,11 @@ k8s.io/utils/pointer k8s.io/utils/strings k8s.io/utils/strings/slices k8s.io/utils/trace -# sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.33 +# sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35 ## explicit; go 1.17 sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client +sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics +sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client # sigs.k8s.io/cloud-provider-azure v1.24.2 ## explicit; go 1.18 @@ -2041,32 +2043,32 @@ sigs.k8s.io/yaml # github.com/aws/aws-sdk-go/service/eks => github.com/aws/aws-sdk-go/service/eks v1.38.49 # github.com/digitalocean/godo => github.com/digitalocean/godo v1.27.0 # github.com/rancher/go-rancher => github.com/rancher/go-rancher v0.1.0 -# k8s.io/api => k8s.io/api v0.26.0 -# k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.26.0 -# k8s.io/apimachinery => k8s.io/apimachinery v0.26.1-rc.0 -# k8s.io/apiserver => k8s.io/apiserver v0.26.0 -# k8s.io/cli-runtime => k8s.io/cli-runtime v0.26.0 -# k8s.io/client-go => k8s.io/client-go v0.26.0 -# k8s.io/cloud-provider => k8s.io/cloud-provider v0.26.0 -# k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.26.0 -# k8s.io/code-generator => k8s.io/code-generator v0.26.1-rc.0 -# k8s.io/component-base => k8s.io/component-base v0.26.0 -# k8s.io/component-helpers => k8s.io/component-helpers v0.26.0 -# k8s.io/controller-manager => k8s.io/controller-manager v0.26.0 -# k8s.io/cri-api => k8s.io/cri-api v0.26.1-rc.0 -# k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.26.0 -# k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.26.0 -# k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.26.0 -# k8s.io/kube-proxy => k8s.io/kube-proxy v0.26.0 -# k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.26.0 -# k8s.io/kubectl => k8s.io/kubectl v0.26.0 -# k8s.io/kubelet => k8s.io/kubelet v0.26.0 -# k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.26.0 -# k8s.io/metrics => k8s.io/metrics v0.26.0 -# k8s.io/mount-utils => k8s.io/mount-utils v0.26.1-rc.0 -# k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.26.0 -# k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.26.0 -# k8s.io/sample-controller => k8s.io/sample-controller v0.26.0 -# k8s.io/pod-security-admission => k8s.io/pod-security-admission v0.26.0 -# k8s.io/dynamic-resource-allocation => k8s.io/dynamic-resource-allocation v0.26.0 -# k8s.io/kms => k8s.io/kms v0.26.1-rc.0 +# k8s.io/api => k8s.io/api v0.26.1 +# k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.26.1 +# k8s.io/apimachinery => k8s.io/apimachinery v0.26.2-rc.0 +# k8s.io/apiserver => k8s.io/apiserver v0.26.1 +# k8s.io/cli-runtime => k8s.io/cli-runtime v0.26.1 +# k8s.io/client-go => k8s.io/client-go v0.26.1 +# k8s.io/cloud-provider => k8s.io/cloud-provider v0.26.1 +# k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.26.1 +# k8s.io/code-generator => k8s.io/code-generator v0.26.2-rc.0 +# k8s.io/component-base => k8s.io/component-base v0.26.1 +# k8s.io/component-helpers => k8s.io/component-helpers v0.26.1 +# k8s.io/controller-manager => k8s.io/controller-manager v0.26.1 +# k8s.io/cri-api => k8s.io/cri-api v0.26.2-rc.0 +# k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.26.1 +# k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.26.1 +# k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.26.1 +# k8s.io/kube-proxy => k8s.io/kube-proxy v0.26.1 +# k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.26.1 +# k8s.io/kubectl => k8s.io/kubectl v0.26.1 +# k8s.io/kubelet => k8s.io/kubelet v0.26.1 +# k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.26.1 +# k8s.io/metrics => k8s.io/metrics v0.26.1 +# k8s.io/mount-utils => k8s.io/mount-utils v0.26.2-rc.0 +# k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.26.1 +# k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.26.1 +# k8s.io/sample-controller => k8s.io/sample-controller v0.26.1 +# k8s.io/pod-security-admission => k8s.io/pod-security-admission v0.26.1 +# k8s.io/dynamic-resource-allocation => k8s.io/dynamic-resource-allocation v0.26.1 +# k8s.io/kms => k8s.io/kms v0.26.2-rc.0 diff --git a/cluster-autoscaler/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/client.go b/cluster-autoscaler/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/client.go index c55070ae6378..cb186cefc263 100644 --- a/cluster-autoscaler/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/client.go +++ b/cluster-autoscaler/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/client.go @@ -29,6 +29,9 @@ import ( "google.golang.org/grpc" "k8s.io/klog/v2" + + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics" + commonmetrics "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics" "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" ) @@ -131,6 +134,9 @@ type grpcTunnel struct { // closing should only be accessed through atomic methods. // TODO: switch this to an atomic.Bool once the client is exclusively buit with go1.19+ closing uint32 + + // Stores the current metrics.ClientConnectionStatus + prevStatus atomic.Value } type clientConn interface { @@ -139,6 +145,11 @@ type clientConn interface { var _ clientConn = &grpc.ClientConn{} +var ( + // Expose metrics for client to register. + Metrics = metrics.Metrics +) + // CreateSingleUseGrpcTunnel creates a Tunnel to dial to a remote server through a // gRPC based proxy service. // Currently, a single tunnel supports a single connection, and the tunnel is closed when the connection is terminated @@ -177,7 +188,7 @@ func CreateSingleUseGrpcTunnelWithContext(createCtx, tunnelCtx context.Context, } func newUnstartedTunnel(stream client.ProxyService_ProxyClient, c clientConn) *grpcTunnel { - return &grpcTunnel{ + t := grpcTunnel{ stream: stream, clientConn: c, pendingDial: pendingDialManager{pendingDials: make(map[int64]pendingDial)}, @@ -185,6 +196,36 @@ func newUnstartedTunnel(stream client.ProxyService_ProxyClient, c clientConn) *g readTimeoutSeconds: 10, done: make(chan struct{}), } + s := metrics.ClientConnectionStatusCreated + t.prevStatus.Store(s) + metrics.Metrics.GetClientConnectionsMetric().WithLabelValues(string(s)).Inc() + return &t +} + +func (t *grpcTunnel) updateMetric(status metrics.ClientConnectionStatus) { + select { + case <-t.Done(): + return + default: + } + + prevStatus := t.prevStatus.Swap(status).(metrics.ClientConnectionStatus) + + m := metrics.Metrics.GetClientConnectionsMetric() + m.WithLabelValues(string(prevStatus)).Dec() + m.WithLabelValues(string(status)).Inc() +} + +// closeMetric should be called exactly once to finalize client_connections metric. +func (t *grpcTunnel) closeMetric() { + select { + case <-t.Done(): + return + default: + } + prevStatus := t.prevStatus.Load().(metrics.ClientConnectionStatus) + + metrics.Metrics.GetClientConnectionsMetric().WithLabelValues(string(prevStatus)).Dec() } func (t *grpcTunnel) serve(tunnelCtx context.Context) { @@ -196,19 +237,29 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) { // close any channels remaining for these connections. t.conns.closeAll() + t.closeMetric() + close(t.done) }() for { pkt, err := t.stream.Recv() - if err == io.EOF || t.isClosing() { + if err == io.EOF { return } + const segment = commonmetrics.SegmentToClient + isClosing := t.isClosing() if err != nil || pkt == nil { - klog.ErrorS(err, "stream read failure") + if !isClosing { + klog.ErrorS(err, "stream read failure") + } + metrics.Metrics.ObserveStreamErrorNoPacket(segment, err) + return + } + metrics.Metrics.ObservePacket(segment, pkt.Type) + if isClosing { return } - klog.V(5).InfoS("[tracing] recv packet", "type", pkt.Type) switch pkt.Type { @@ -222,13 +273,19 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) { // 2. grpcTunnel.DialContext() returned early due to a dial timeout or the client canceling the context // // In either scenario, we should return here and close the tunnel as it is no longer needed. - klog.V(1).InfoS("DialResp not recognized; dropped", "connectionID", resp.ConnectID, "dialID", resp.Random) + kvs := []interface{}{"dialID", resp.Random, "connectID", resp.ConnectID} + if resp.Error != "" { + kvs = append(kvs, "error", resp.Error) + } + klog.V(1).InfoS("DialResp not recognized; dropped", kvs...) return } result := dialResult{connid: resp.ConnectID} if resp.Error != "" { - result.err = &dialFailure{resp.Error, DialFailureEndpoint} + result.err = &dialFailure{resp.Error, metrics.DialFailureEndpoint} + } else { + t.updateMetric(metrics.ClientConnectionStatusOk) } select { // try to send to the result channel @@ -263,7 +320,7 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) { klog.V(1).InfoS("DIAL_CLS after dial finished", "dialID", resp.Random) } else { result := dialResult{ - err: &dialFailure{"dial closed", DialFailureDialClosed}, + err: &dialFailure{"dial closed", metrics.DialFailureDialClosed}, } select { case pendingDial.resultCh <- result: @@ -316,6 +373,15 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) { // Dial connects to the address on the named network, similar to // what net.Dial does. The only supported protocol is tcp. func (t *grpcTunnel) DialContext(requestCtx context.Context, protocol, address string) (net.Conn, error) { + conn, err := t.dialContext(requestCtx, protocol, address) + if err != nil { + _, reason := GetDialFailureReason(err) + metrics.Metrics.ObserveDialFailure(reason) + } + return conn, err +} + +func (t *grpcTunnel) dialContext(requestCtx context.Context, protocol, address string) (net.Conn, error) { select { case <-t.done: return nil, errors.New("tunnel is closed") @@ -326,6 +392,8 @@ func (t *grpcTunnel) DialContext(requestCtx context.Context, protocol, address s return nil, errors.New("protocol not supported") } + t.updateMetric(metrics.ClientConnectionStatusDialing) + random := rand.Int63() /* #nosec G404 */ // This channel is closed once we're returning and no longer waiting on resultCh @@ -350,8 +418,11 @@ func (t *grpcTunnel) DialContext(requestCtx context.Context, protocol, address s } klog.V(5).InfoS("[tracing] send packet", "type", req.Type) + const segment = commonmetrics.SegmentFromClient + metrics.Metrics.ObservePacket(segment, req.Type) err := t.stream.Send(req) if err != nil { + metrics.Metrics.ObserveStreamError(segment, err, req.Type) return nil, err } @@ -375,14 +446,14 @@ func (t *grpcTunnel) DialContext(requestCtx context.Context, protocol, address s case <-time.After(30 * time.Second): klog.V(5).InfoS("Timed out waiting for DialResp", "dialID", random) go t.closeDial(random) - return nil, &dialFailure{"dial timeout, backstop", DialFailureTimeout} + return nil, &dialFailure{"dial timeout, backstop", metrics.DialFailureTimeout} case <-requestCtx.Done(): klog.V(5).InfoS("Context canceled waiting for DialResp", "ctxErr", requestCtx.Err(), "dialID", random) go t.closeDial(random) - return nil, &dialFailure{"dial timeout, context", DialFailureContext} + return nil, &dialFailure{"dial timeout, context", metrics.DialFailureContext} case <-t.done: klog.V(5).InfoS("Tunnel closed while waiting for DialResp", "dialID", random) - return nil, &dialFailure{"tunnel closed", DialFailureTunnelClosed} + return nil, &dialFailure{"tunnel closed", metrics.DialFailureTunnelClosed} } return c, nil @@ -402,7 +473,10 @@ func (t *grpcTunnel) closeDial(dialID int64) { }, }, } + const segment = commonmetrics.SegmentFromClient + metrics.Metrics.ObservePacket(segment, req.Type) if err := t.stream.Send(req); err != nil { + metrics.Metrics.ObserveStreamError(segment, err, req.Type) klog.V(5).InfoS("Failed to send DIAL_CLS", "err", err, "dialID", dialID) } t.closeTunnel() @@ -417,38 +491,19 @@ func (t *grpcTunnel) isClosing() bool { return atomic.LoadUint32(&t.closing) != 0 } -func GetDialFailureReason(err error) (isDialFailure bool, reason DialFailureReason) { +func GetDialFailureReason(err error) (isDialFailure bool, reason metrics.DialFailureReason) { var df *dialFailure if errors.As(err, &df) { return true, df.reason } - return false, DialFailureUnknown + return false, metrics.DialFailureUnknown } type dialFailure struct { msg string - reason DialFailureReason + reason metrics.DialFailureReason } func (df *dialFailure) Error() string { return df.msg } - -type DialFailureReason string - -const ( - DialFailureUnknown DialFailureReason = "unknown" - // DialFailureTimeout indicates the hard 30 second timeout was hit. - DialFailureTimeout DialFailureReason = "timeout" - // DialFailureContext indicates that the context was cancelled or reached it's deadline before - // the dial response was returned. - DialFailureContext DialFailureReason = "context" - // DialFailureEndpoint indicates that the konnectivity-agent was unable to reach the backend endpoint. - DialFailureEndpoint DialFailureReason = "endpoint" - // DialFailureDialClosed indicates that the client received a CloseDial response, indicating the - // connection was closed before the dial could complete. - DialFailureDialClosed DialFailureReason = "dialclosed" - // DialFailureTunnelClosed indicates that the client connection was closed before the dial could - // complete. - DialFailureTunnelClosed DialFailureReason = "tunnelclosed" -) diff --git a/cluster-autoscaler/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/conn.go b/cluster-autoscaler/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/conn.go index f76b1e37a474..14384a62cb55 100644 --- a/cluster-autoscaler/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/conn.go +++ b/cluster-autoscaler/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/conn.go @@ -23,6 +23,9 @@ import ( "time" "k8s.io/klog/v2" + + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics" + commonmetrics "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics" "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" ) @@ -62,8 +65,11 @@ func (c *conn) Write(data []byte) (n int, err error) { klog.V(5).InfoS("[tracing] send req", "type", req.Type) + const segment = commonmetrics.SegmentFromClient + metrics.Metrics.ObservePacket(segment, req.Type) err = c.stream.Send(req) if err != nil { + metrics.Metrics.ObserveStreamError(segment, err, req.Type) return 0, err } return len(data), err @@ -147,7 +153,10 @@ func (c *conn) Close() error { klog.V(5).InfoS("[tracing] send req", "type", req.Type) + const segment = commonmetrics.SegmentFromClient + metrics.Metrics.ObservePacket(segment, req.Type) if err := c.stream.Send(req); err != nil { + metrics.Metrics.ObserveStreamError(segment, err, req.Type) return err } diff --git a/cluster-autoscaler/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics/metrics.go b/cluster-autoscaler/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics/metrics.go new file mode 100644 index 000000000000..03e9d94da896 --- /dev/null +++ b/cluster-autoscaler/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics/metrics.go @@ -0,0 +1,162 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" + + commonmetrics "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics" + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" +) + +const ( + Namespace = "konnectivity_network_proxy" + Subsystem = "client" +) + +var ( + // Metrics provides access to all client metrics. The client + // application is responsible for registering (via Metrics.RegisterMetrics). + Metrics = newMetrics() +) + +// ClientMetrics includes all the metrics of the konnectivity-client. +type ClientMetrics struct { + registerOnce sync.Once + streamPackets *prometheus.CounterVec + streamErrors *prometheus.CounterVec + dialFailures *prometheus.CounterVec + clientConns *prometheus.GaugeVec +} + +type DialFailureReason string + +const ( + DialFailureUnknown DialFailureReason = "unknown" + // DialFailureTimeout indicates the hard 30 second timeout was hit. + DialFailureTimeout DialFailureReason = "timeout" + // DialFailureContext indicates that the context was cancelled or reached it's deadline before + // the dial response was returned. + DialFailureContext DialFailureReason = "context" + // DialFailureEndpoint indicates that the konnectivity-agent was unable to reach the backend endpoint. + DialFailureEndpoint DialFailureReason = "endpoint" + // DialFailureDialClosed indicates that the client received a CloseDial response, indicating the + // connection was closed before the dial could complete. + DialFailureDialClosed DialFailureReason = "dialclosed" + // DialFailureTunnelClosed indicates that the client connection was closed before the dial could + // complete. + DialFailureTunnelClosed DialFailureReason = "tunnelclosed" +) + +type ClientConnectionStatus string + +const ( + // The connection is created but has not yet been dialed. + ClientConnectionStatusCreated ClientConnectionStatus = "created" + // The connection is pending dial response. + ClientConnectionStatusDialing ClientConnectionStatus = "dialing" + // The connection is established. + ClientConnectionStatusOk ClientConnectionStatus = "ok" + // The connection is closing. + ClientConnectionStatusClosing ClientConnectionStatus = "closing" +) + +func newMetrics() *ClientMetrics { + // The denominator (total dials started) for both + // dial_failure_total and dial_duration_seconds is the + // stream_packets_total (common metric), where segment is + // "from_client" and packet_type is "DIAL_REQ". + dialFailures := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "dial_failure_total", + Help: "Number of dial failures observed, by reason (example: remote endpoint error)", + }, + []string{ + "reason", + }, + ) + clientConns := prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "client_connections", + Help: "Number of open client connections, by status (Example: dialing)", + }, + []string{ + "status", + }, + ) + return &ClientMetrics{ + streamPackets: commonmetrics.MakeStreamPacketsTotalMetric(Namespace, Subsystem), + streamErrors: commonmetrics.MakeStreamErrorsTotalMetric(Namespace, Subsystem), + dialFailures: dialFailures, + clientConns: clientConns, + } +} + +// RegisterMetrics registers all metrics with the client application. +func (c *ClientMetrics) RegisterMetrics(r prometheus.Registerer) { + c.registerOnce.Do(func() { + r.MustRegister(c.streamPackets) + r.MustRegister(c.streamErrors) + r.MustRegister(c.dialFailures) + r.MustRegister(c.clientConns) + }) +} + +// LegacyRegisterMetrics registers all metrics via MustRegister func. +// TODO: remove this once https://github.com/kubernetes/kubernetes/pull/114293 is available. +func (c *ClientMetrics) LegacyRegisterMetrics(mustRegisterFn func(...prometheus.Collector)) { + c.registerOnce.Do(func() { + mustRegisterFn(c.streamPackets) + mustRegisterFn(c.streamErrors) + mustRegisterFn(c.dialFailures) + mustRegisterFn(c.clientConns) + }) +} + +// Reset resets the metrics. +func (c *ClientMetrics) Reset() { + c.streamPackets.Reset() + c.streamErrors.Reset() + c.dialFailures.Reset() + c.clientConns.Reset() +} + +func (c *ClientMetrics) ObserveDialFailure(reason DialFailureReason) { + c.dialFailures.WithLabelValues(string(reason)).Inc() +} + +func (c *ClientMetrics) GetClientConnectionsMetric() *prometheus.GaugeVec { + return c.clientConns +} + +func (c *ClientMetrics) ObservePacket(segment commonmetrics.Segment, packetType client.PacketType) { + commonmetrics.ObservePacket(c.streamPackets, segment, packetType) +} + +func (c *ClientMetrics) ObserveStreamErrorNoPacket(segment commonmetrics.Segment, err error) { + commonmetrics.ObserveStreamErrorNoPacket(c.streamErrors, segment, err) +} + +func (c *ClientMetrics) ObserveStreamError(segment commonmetrics.Segment, err error, packetType client.PacketType) { + commonmetrics.ObserveStreamError(c.streamErrors, segment, err, packetType) +} diff --git a/cluster-autoscaler/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics/metrics.go b/cluster-autoscaler/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics/metrics.go new file mode 100644 index 000000000000..e8619f472022 --- /dev/null +++ b/cluster-autoscaler/vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics/metrics.go @@ -0,0 +1,78 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package metrics provides metric definitions and helpers used +// across konnectivity client, server, and agent. +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc/status" + + "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" +) + +// Segment identifies one of four tunnel segments (e.g. from server to agent). +type Segment string + +const ( + // SegmentFromClient indicates a packet from client to server. + SegmentFromClient Segment = "from_client" + // SegmentToClient indicates a packet from server to client. + SegmentToClient Segment = "to_client" + // SegmentFromAgent indicates a packet from agent to server. + SegmentFromAgent Segment = "from_agent" + // SegmentToAgent indicates a packet from server to agent. + SegmentToAgent Segment = "to_agent" +) + +func MakeStreamPacketsTotalMetric(namespace, subsystem string) *prometheus.CounterVec { + return prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "stream_packets_total", + Help: "Count of packets processed, by segment and packet type (example: from_client, DIAL_REQ)", + }, + []string{"segment", "packet_type"}, + ) +} + +func MakeStreamErrorsTotalMetric(namespace, subsystem string) *prometheus.CounterVec { + return prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "stream_errors_total", + Help: "Count of gRPC stream errors, by segment, grpc Code, packet type. (example: from_agent, Code.Unavailable, DIAL_RSP)", + }, + []string{"segment", "code", "packet_type"}, + ) +} + +func ObservePacket(m *prometheus.CounterVec, segment Segment, packetType client.PacketType) { + m.WithLabelValues(string(segment), packetType.String()).Inc() +} + +func ObserveStreamErrorNoPacket(m *prometheus.CounterVec, segment Segment, err error) { + code := status.Code(err) + m.WithLabelValues(string(segment), code.String(), "Unknown").Inc() +} + +func ObserveStreamError(m *prometheus.CounterVec, segment Segment, err error, packetType client.PacketType) { + code := status.Code(err) + m.WithLabelValues(string(segment), code.String(), packetType.String()).Inc() +}