
Commit

Merge pull request #212 from chengxiangdong/feat_elb
Automatically add the IP used for ELB health check to the security group
k8s-ci-robot authored Apr 3, 2023
2 parents 4362924 + 86d1d3a commit 4581e69
Showing 281 changed files with 13,794 additions and 28 deletions.
6 changes: 6 additions & 0 deletions docs/huawei-cloud-controller-manager-configuration.md
@@ -161,6 +161,12 @@ The following arguments are supported:
* `health-check-flag` Specifies whether to enable health check for a backend server group.
Valid values are `on` and `off`, defaults to `on`.

> When health check is enabled, the CCM adds an inbound rule to one of the security groups of the backend service,
> allowing traffic from `100.125.0.0/16`.
> This rule is removed once all LoadBalancer services have been deleted
> (a simplified sketch of this check follows this file's excerpt).
>
> `100.125.0.0/16` is the internal IP address range used by ELB to check the health of backend servers.

* `health-check-option` Specifies the health check.

This parameter is mandatory when `health-check-flag` is `on`.
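The note above describes the inbound rule the CCM manages for ELB health checks. As a rough, non-authoritative sketch of the idempotency check it implies — look for an all-ports IPv4 ingress rule whose source is the health-check range before creating a new one — here is a minimal example with simplified local types standing in for the Huawei Cloud SDK models:

```go
package main

import "fmt"

// healthCheckCidr is the ELB source range documented above.
const healthCheckCidr = "100.125.0.0/16"

// secGroupRule is a simplified stand-in for the SDK's security group rule type.
type secGroupRule struct {
	Direction      string
	Ethertype      string
	RemoteIpPrefix string
	PortRangeMin   int32
	PortRangeMax   int32
}

// hasHealthCheckRule reports whether an all-ports IPv4 ingress rule for the
// ELB health-check range already exists, so the controller can skip creating it.
func hasHealthCheckRule(rules []secGroupRule) bool {
	for _, r := range rules {
		if r.Direction == "ingress" && r.Ethertype == "IPv4" &&
			r.RemoteIpPrefix == healthCheckCidr &&
			r.PortRangeMin == 0 && r.PortRangeMax == 0 {
			return true
		}
	}
	return false
}

func main() {
	rules := []secGroupRule{
		{Direction: "ingress", Ethertype: "IPv4", RemoteIpPrefix: healthCheckCidr},
	}
	fmt.Println(hasHealthCheckRule(rules)) // true: no new rule needed
}
```

The real controller performs the same comparison against `vpcmodel.SecurityGroupRule` values, as the Go changes later in this commit show.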
6 changes: 6 additions & 0 deletions docs/usage-guide.md
@@ -120,6 +120,12 @@ will be used, otherwise use the set value.
* `kubernetes.io/elb.health-check-flag` Optional. Specifies whether to enable health check for a backend server group.
Valid values are `on` and `off`, defaults to `on`.

> When health check is enabled, the CCM adds an inbound rule to one of the security groups of the backend service,
> allowing traffic from `100.125.0.0/16`.
> This rule is removed once all LoadBalancer services have been deleted.
>
> `100.125.0.0/16` is the internal IP address range used by ELB to check the health of backend servers.

* `kubernetes.io/elb.health-check-option` Optional. Specifies health check parameters.
This parameter is mandatory when `kubernetes.io/elb.health-check-flag` is `on`.
This is a JSON string, such as `{"delay": 3, "timeout": 15, "max_retries": 3}` (see the parsing sketch after this excerpt).
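A minimal sketch of decoding the `kubernetes.io/elb.health-check-option` value; only the JSON keys (`delay`, `timeout`, `max_retries`) come from the documentation above, while the struct and function names here are hypothetical:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// healthCheckOption is a hypothetical container for the annotation's JSON payload.
type healthCheckOption struct {
	Delay      int `json:"delay"`
	Timeout    int `json:"timeout"`
	MaxRetries int `json:"max_retries"`
}

// parseHealthCheckOption decodes the annotation value into the struct above.
func parseHealthCheckOption(raw string) (*healthCheckOption, error) {
	opt := &healthCheckOption{}
	if err := json.Unmarshal([]byte(raw), opt); err != nil {
		return nil, fmt.Errorf("invalid health-check-option %q: %w", raw, err)
	}
	return opt, nil
}

func main() {
	opt, err := parseHealthCheckOption(`{"delay": 3, "timeout": 15, "max_retries": 3}`)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", opt) // &{Delay:3 Timeout:15 MaxRetries:3}
}
```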
18 changes: 13 additions & 5 deletions pkg/cloudprovider/huaweicloud/dedicatedloadbalancer.go
@@ -177,7 +177,7 @@ func (d *DedicatedLoadBalancer) EnsureLoadBalancer(ctx context.Context, clusterN
}

// add or remove health monitor
if err = d.addOrRemoveHealthMonitor(loadbalancer.Id, pool, port, service); err != nil {
if err = d.ensureHealthCheck(loadbalancer.Id, pool, port, service, nodes[0]); err != nil {
return nil, err
}
}
@@ -672,11 +672,19 @@ func (d *DedicatedLoadBalancer) getSessionAffinity(service *v1.Service) *elbmode
}
}

func (d *DedicatedLoadBalancer) addOrRemoveHealthMonitor(loadbalancerID string, pool *elbmodel.Pool,
port v1.ServicePort, service *v1.Service) error {
// ensureHealthCheck adds, updates, or removes the health check
func (d *DedicatedLoadBalancer) ensureHealthCheck(loadbalancerID string, pool *elbmodel.Pool,
port v1.ServicePort, service *v1.Service, node *v1.Node) error {
healthCheckOpts := getHealthCheckOptionFromAnnotation(service, d.loadbalancerOpts)
monitorID := pool.HealthmonitorId
klog.Infof("add or remove health check: %s : %#v", monitorID, healthCheckOpts)
klog.Infof("add or update or remove health check: %s : %#v", monitorID, healthCheckOpts)

if healthCheckOpts.Enable {
err := d.allowHealthCheckRule(node)
if err != nil {
return err
}
}

// create health monitor
if monitorID == "" && healthCheckOpts.Enable {
@@ -775,7 +783,7 @@ func (d *DedicatedLoadBalancer) UpdateLoadBalancer(ctx context.Context, clusterN
}

// add or remove health monitor
if err = d.addOrRemoveHealthMonitor(loadbalancer.Id, pool, port, service); err != nil {
if err = d.ensureHealthCheck(loadbalancer.Id, pool, port, service, nodes[0]); err != nil {
return err
}
}
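The hunks above rename `addOrRemoveHealthMonitor` to `ensureHealthCheck` and make it open the security group before reconciling the health monitor. A condensed, hedged sketch of that decision flow, using a hypothetical narrow interface in place of the real ELB/VPC clients (the branch conditions mirror the `monitorID == ""` / `healthCheckOpts.Enable` checks visible in the diff):

```go
package main

import "fmt"

// elbBackend is a hypothetical, narrowed-down view of what ensureHealthCheck
// needs; in the real code these calls go to the ELB and VPC clients.
type elbBackend interface {
	AllowHealthCheckRule(nodeName string) error
	CreateMonitor() (string, error)
	UpdateMonitor(id string) error
	DeleteMonitor(id string) error
}

// fakeBackend records which path was taken, for demonstration only.
type fakeBackend struct{ last string }

func (f *fakeBackend) AllowHealthCheckRule(string) error { f.last = "allow-rule"; return nil }
func (f *fakeBackend) CreateMonitor() (string, error)    { f.last = "create"; return "m-1", nil }
func (f *fakeBackend) UpdateMonitor(string) error        { f.last = "update"; return nil }
func (f *fakeBackend) DeleteMonitor(string) error        { f.last = "delete"; return nil }

// ensureHealthCheck mirrors the flow in the diff: open the security group when
// health checks are enabled, then reconcile the monitor against the desired state.
func ensureHealthCheck(b elbBackend, monitorID string, enabled bool, node string) error {
	if enabled {
		if err := b.AllowHealthCheckRule(node); err != nil {
			return err
		}
	}
	switch {
	case monitorID == "" && enabled: // no monitor yet, health check wanted
		_, err := b.CreateMonitor()
		return err
	case monitorID != "" && enabled: // keep the existing monitor in sync
		return b.UpdateMonitor(monitorID)
	case monitorID != "" && !enabled: // health check turned off, clean up
		return b.DeleteMonitor(monitorID)
	}
	return nil
}

func main() {
	b := &fakeBackend{}
	_ = ensureHealthCheck(b, "", true, "node-1")
	fmt.Println(b.last) // "create": a new monitor would be created
}
```

Passing `nodes[0]` into `ensureHealthCheck`, as the call sites above do, gives the helper a node from which to resolve the security groups.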
198 changes: 186 additions & 12 deletions pkg/cloudprovider/huaweicloud/huaweicloud.go
@@ -21,9 +21,11 @@ import (
"fmt"
"io"
"os"
"sync"
"time"

ecsmodel "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/ecs/v2/model"
vpcmodel "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/vpc/v2/model"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/api/core/v1"
@@ -43,6 +45,7 @@ import (
"k8s.io/cloud-provider"
"k8s.io/cloud-provider/options"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"

"sigs.k8s.io/cloud-provider-huaweicloud/pkg/cloudprovider/huaweicloud/wrapper"
"sigs.k8s.io/cloud-provider-huaweicloud/pkg/config"
@@ -96,11 +99,15 @@ const (
ProtocolHTTP = "HTTP"
ProtocolHTTPS = "HTTPS"
ProtocolTerminatedHTTPS = "TERMINATED_HTTPS"

healthCheckCidr = "100.125.0.0/16"
)

type ELBProtocol string
type ELBAlgorithm string

var healthCheckCidrOptLock = &sync.Mutex{}

type Basic struct {
cloudControllerManagerOpts *options.CloudControllerManagerOptions
cloudConfig *config.CloudConfig
@@ -114,6 +121,7 @@ type Basic struct {
dedicatedELBClient *wrapper.DedicatedLoadBalanceClient
eipClient *wrapper.EIpClient
ecsClient *wrapper.EcsClient
vpcClient *wrapper.VpcClient

restConfig *rest.Config
kubeClient *corev1.CoreV1Client
@@ -171,6 +179,99 @@ func (b Basic) getNodeSubnetID(node *v1.Node) (string, error) {
return "", fmt.Errorf("failed to get node subnet ID")
}

func (b Basic) allowHealthCheckRule(node *v1.Node) error {
// Avoid adding security group rules in parallel.
healthCheckCidrOptLock.Lock()
defer func() {
healthCheckCidrOptLock.Unlock()
}()

instance, err := b.ecsClient.GetByName(node.Name)
if err != nil {
return err
}

secGroups, err := b.ecsClient.ListSecurityGroups(instance.Id)
if err != nil {
return err
}
if len(secGroups) == 0 {
klog.Warningf("not found any security groups on %s", node.Name)
return nil
}

for _, sg := range secGroups {
rules, err := b.vpcClient.ListSecurityGroupRules(sg.Id)
if err != nil {
return fmt.Errorf("failed to list security group[%s] rules: %s", sg.Id, err)
}

for _, r := range rules {
if r.Direction == "ingress" && r.RemoteIpPrefix == healthCheckCidr &&
r.Ethertype == "IPv4" && r.PortRangeMin == 0 && r.PortRangeMax == 0 {
klog.Infof("the health check IP is already in the security group, no need to add rules")
return nil
}
}
}

desc := fmt.Sprintf("DO NOT EDIT. %s are internal IP addresses used by ELB to check the health of backend"+
" servers. Created by K8s CCM.", healthCheckCidr)

securityGroupID := secGroups[0].Id
_, err = b.vpcClient.CreateSecurityGroupRule(&vpcmodel.CreateSecurityGroupRuleOption{
SecurityGroupId: securityGroupID,
Description: &desc,
Direction: "ingress",
Ethertype: pointer.String("IPv4"),
RemoteIpPrefix: pointer.String(healthCheckCidr),
})

if err != nil {
return fmt.Errorf("failed to create security group[%s] rules: %s", securityGroupID, err)
}

return err
}

func (b Basic) removeHealthCheckRules(node *v1.Node) error {
instance, err := b.ecsClient.GetByName(node.Name)
if err != nil {
return err
}

secGroups, err := b.ecsClient.ListSecurityGroups(instance.Id)
if err != nil {
return err
}
if len(secGroups) == 0 {
klog.Warningf("not found any security groups on %s", node.Name)
return nil
}

desc := fmt.Sprintf("DO NOT EDIT. %s are internal IP addresses used by ELB to check the health of backend"+
" servers. Created by K8s CCM.", healthCheckCidr)

for _, sg := range secGroups {
rules, err := b.vpcClient.ListSecurityGroupRules(sg.Id)
if err != nil {
return fmt.Errorf("failed to list security group[%s] rules: %s", sg.Id, err)
}

for _, r := range rules {
if r.Direction == "ingress" && r.RemoteIpPrefix == healthCheckCidr &&
r.Ethertype == "IPv4" && r.PortRangeMin == 0 && r.PortRangeMax == 0 && r.Description == desc {
err := b.vpcClient.DeleteSecurityGroupRule(r.Id)
if err != nil {
klog.Errorf("failed to delete security group[%s] rule[%s]", sg.Id, r.Id)
}
}
}
}

return nil
}
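Both helpers above follow a check-then-act pattern against the VPC API, and `allowHealthCheckRule` serializes it with the package-level `healthCheckCidrOptLock` so concurrent `EnsureLoadBalancer` calls do not race to create duplicate rules. A stripped-down sketch of that pattern, with an in-memory stand-in for the rule store instead of the VPC client:

```go
package main

import (
	"fmt"
	"sync"
)

// ruleStore is an in-memory stand-in for the VPC security group rules.
type ruleStore struct {
	rules map[string]bool
}

var opLock sync.Mutex // serializes check-then-create, like healthCheckCidrOptLock

// ensureRule adds the rule only if it is missing; the mutex makes the
// check and the create atomic with respect to other goroutines.
func (s *ruleStore) ensureRule(cidr string) {
	opLock.Lock()
	defer opLock.Unlock()
	if s.rules[cidr] {
		return // already present, nothing to do
	}
	s.rules[cidr] = true
}

func main() {
	s := &ruleStore{rules: map[string]bool{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); s.ensureRule("100.125.0.0/16") }()
	}
	wg.Wait()
	fmt.Println(len(s.rules)) // 1: the rule is created exactly once
}
```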

type CloudProvider struct {
Basic
providers map[LoadBalanceVersion]cloudprovider.LoadBalancer
@@ -240,6 +341,7 @@ func NewHWSCloud(cfg io.Reader) (*CloudProvider, error) {
dedicatedELBClient: &wrapper.DedicatedLoadBalanceClient{AuthOpts: &cloudConfig.AuthOpts},
eipClient: &wrapper.EIpClient{AuthOpts: &cloudConfig.AuthOpts},
ecsClient: &wrapper.EcsClient{AuthOpts: &cloudConfig.AuthOpts},
vpcClient: &wrapper.VpcClient{AuthOpts: &cloudConfig.AuthOpts},

restConfig: restConfig,
kubeClient: kubeClient,
@@ -454,18 +556,20 @@ func IsPodActive(p v1.Pod) bool {
return false
}

type EndpointSliceListener struct {
type LoadBalancerServiceListener struct {
stopChannel chan struct{}
kubeClient *corev1.CoreV1Client
mutexLock *mutexkv.MutexKV

serviceCache map[string]*v1.Service
}

func (e *EndpointSliceListener) stopListenerSlice() {
func (e *LoadBalancerServiceListener) stopListenerSlice() {
klog.Warningf("Stop listening to Endpoints")
close(e.stopChannel)
}

func (e *EndpointSliceListener) startEndpointListener(handle func(*v1.Service)) {
func (e *LoadBalancerServiceListener) startEndpointListener(handle func(*v1.Service)) {
klog.Infof("starting EndpointListener")
for {
endpointsList, err := e.kubeClient.Endpoints(metav1.NamespaceAll).
@@ -534,7 +638,7 @@ func (e *EndpointSliceListener) startEndpointListener(handle func(*v1.Service))
klog.Infof("EndpointListener started")
}

func (e *EndpointSliceListener) dispatcher(namespace, name string, handle func(*v1.Service)) {
func (e *LoadBalancerServiceListener) dispatcher(namespace, name string, handle func(*v1.Service)) {
key := fmt.Sprintf("%s/%s", namespace, name)
e.mutexLock.Lock(key)
defer e.mutexLock.Unlock(key)
@@ -546,10 +650,78 @@ func (e *EndpointSliceListener) dispatcher(namespace, name string, handle func(*
handle(svc)
}

func (e *LoadBalancerServiceListener) autoRemoveHealthCheckRule(handle func(node *v1.Node) error) {
informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return e.kubeClient.Services(metav1.NamespaceAll).List(context.Background(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return e.kubeClient.Services(metav1.NamespaceAll).Watch(context.Background(), options)
},
},
&v1.Service{},
0,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)

_, err := informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
service := obj.(*v1.Service)
if service.Spec.Type != v1.ServiceTypeLoadBalancer {
return
}

key := fmt.Sprintf("%s/%s", service.Namespace, service.Name)
e.serviceCache[key] = service
klog.V(6).Infof("new LoadBalancer service %s/%s added, cache size: %v",
service.Namespace, service.Name, len(e.serviceCache))
},
UpdateFunc: func(oldObj, newObj interface{}) {},
DeleteFunc: func(obj interface{}) {
service := obj.(*v1.Service)
if service.Spec.Type != v1.ServiceTypeLoadBalancer {
return
}
key := fmt.Sprintf("%s/%s", service.Namespace, service.Name)
delete(e.serviceCache, key)
klog.V(6).Infof("found LoadBalancer service %s/%s deleted, cache size: %v",
service.Namespace, service.Name, len(e.serviceCache))

if len(e.serviceCache) > 0 {
klog.V(6).Infof("found %v LoadBalancer service(s), "+
"skip clearing the security group rules for ELB health check", len(e.serviceCache))
return
}

nodes, err := e.kubeClient.Nodes().List(context.TODO(), metav1.ListOptions{
Limit: 1,
})
if err != nil {
klog.Errorf("failed to query a list of nodes in autoRemoveHealthCheckRule: %s", err)
return
}

if len(nodes.Items) <= 0 {
klog.Warningf("not found any nodes, skip clearing the security group rules for ELB health check")
return
}
klog.Infof("all LoadBalancer services has been deleted, start to clean health check rules")
n := nodes.Items[0]
handle(&n) //nolint:errcheck
},
}, 5*time.Second)
if err != nil {
klog.Errorf("failed to watch service: %s", err)
}

informer.Run(e.stopChannel)
}

func (h *CloudProvider) listenerDeploy() error {
listener := EndpointSliceListener{
kubeClient: h.kubeClient,
mutexLock: mutexkv.NewMutexKV(),
listener := LoadBalancerServiceListener{
kubeClient: h.kubeClient,
mutexLock: mutexkv.NewMutexKV(),
serviceCache: make(map[string]*v1.Service, 0),
}

clusterName := h.cloudControllerManagerOpts.KubeCloudShared.ClusterName
@@ -559,6 +731,8 @@ func (h *CloudProvider) listenerDeploy() error {
}

go leaderElection(id, h.restConfig, h.eventRecorder, func(ctx context.Context) {
go listener.autoRemoveHealthCheckRule(h.removeHealthCheckRules)

listener.startEndpointListener(func(service *v1.Service) {
if service.Spec.Type != v1.ServiceTypeLoadBalancer {
return
@@ -589,9 +763,9 @@ func (h *CloudProvider) listenerDeploy() error {

func leaderElection(id string, restConfig *rest.Config, recorder record.EventRecorder, onSuccess func(context.Context), onStop func()) {
leaseName := "endpoint-slice-listener"
leaseDuration := 30 * time.Second
renewDeadline := 20 * time.Second
retryPeriod := 10 * time.Second
leaseDuration := 60 * time.Second
renewDeadline := 50 * time.Second
retryPeriod := 30 * time.Second

configmapLock, err := resourcelock.NewFromKubeconfig(resourcelock.ConfigMapsLeasesResourceLock,
config.ProviderNamespace,
@@ -613,11 +787,11 @@
RetryPeriod: retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
klog.V(4).Infof("[Listener EndpointSlices] leader election got: %s", id)
klog.V(6).Infof("[Listener EndpointSlices] leader election got: %s", id)
onSuccess(ctx)
},
OnStoppedLeading: func() {
klog.V(4).Infof("[Listener EndpointSlices] leader election lost: %s", id)
klog.Infof("[Listener EndpointSlices] leader election lost: %s", id)
onStop()
},
},
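The lease timings above are relaxed from 30/20/10 to 60/50/30 seconds; client-go validates (roughly) that LeaseDuration exceeds RenewDeadline and that RenewDeadline exceeds RetryPeriod times a 1.2 jitter factor, which the new values still satisfy. A hedged sketch of wiring such timings into client-go's leader election follows — the namespace is an assumption, and the lock type here uses plain Leases rather than the ConfigMapsLeases lock used in the diff:

```go
package main

import (
	"context"
	"time"

	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
)

// runWithLeaderElection shows how the relaxed 60/50/30 timings from the diff
// plug into client-go; the namespace and lock type here are illustrative.
func runWithLeaderElection(restConfig *rest.Config, id string, run func(context.Context)) error {
	lock, err := resourcelock.NewFromKubeconfig(
		resourcelock.LeasesResourceLock,
		"kube-system",             // assumption: any namespace the CCM can write to
		"endpoint-slice-listener", // lease name from the diff
		resourcelock.ResourceLockConfig{Identity: id},
		restConfig,
		50*time.Second, // renew deadline, matching the config below
	)
	if err != nil {
		return err
	}

	leaderelection.RunOrDie(context.Background(), leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 60 * time.Second, // must exceed RenewDeadline
		RenewDeadline: 50 * time.Second, // must exceed RetryPeriod * 1.2
		RetryPeriod:   30 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() { klog.Infof("leader election lost: %s", id) },
		},
	})
	return nil
}

func main() {
	cfg, err := rest.InClusterConfig() // assumes running inside a cluster
	if err != nil {
		klog.Fatalf("no in-cluster config: %s", err)
	}
	_ = runWithLeaderElection(cfg, "ccm-sketch", func(ctx context.Context) {
		klog.Info("became leader")
	})
}
```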
