From 611a74dbd9307f5259ee627b6b410755159c39c5 Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Wed, 28 Aug 2019 18:01:05 +0100 Subject: [PATCH 1/7] Add service subselector support in vs/vsr --- deployments/helm-chart/templates/rbac.yaml | 1 + deployments/rbac/rbac.yaml | 1 + docs/virtualserver-and-virtualserverroute.md | 3 + internal/k8s/controller.go | 153 ++++++++++++++---- internal/k8s/controller_test.go | 92 ++++++++++- internal/k8s/utils.go | 17 +- pkg/apis/configuration/v1alpha1/types.go | 45 +++--- .../v1alpha1/zz_generated.deepcopy.go | 7 + .../configuration/validation/validation.go | 29 +++- .../validation/validation_test.go | 29 ++++ 10 files changed, 319 insertions(+), 58 deletions(-) diff --git a/deployments/helm-chart/templates/rbac.yaml b/deployments/helm-chart/templates/rbac.yaml index 1646184888..4b8ba12a7e 100644 --- a/deployments/helm-chart/templates/rbac.yaml +++ b/deployments/helm-chart/templates/rbac.yaml @@ -41,6 +41,7 @@ rules: - pods verbs: - list + - watch - apiGroups: - "" resources: diff --git a/deployments/rbac/rbac.yaml b/deployments/rbac/rbac.yaml index c333b6df20..c8c795de85 100644 --- a/deployments/rbac/rbac.yaml +++ b/deployments/rbac/rbac.yaml @@ -36,6 +36,7 @@ rules: - pods verbs: - list + - watch - apiGroups: - "" resources: diff --git a/docs/virtualserver-and-virtualserverroute.md b/docs/virtualserver-and-virtualserverroute.md index 77ee5ee859..5f21b23271 100644 --- a/docs/virtualserver-and-virtualserverroute.md +++ b/docs/virtualserver-and-virtualserverroute.md @@ -181,6 +181,8 @@ The upstream defines a destination for the routing configuration. For example: ```yaml name: tea service: tea-svc +subselector: + version: canary port: 80 lb-method: round_robin fail-timeout: 10s @@ -204,6 +206,7 @@ tls: | ----- | ----------- | ---- | -------- | | `name` | The name of the upstream. Must be a valid DNS label as defined in RFC 1035. For example, `hello` and `upstream-123` are valid. The name must be unique among all upstreams of the resource. | `string` | Yes | | `service` | The name of a [service](https://kubernetes.io/docs/concepts/services-networking/service/). The service must belong to the same namespace as the resource. If the service doesn't exist, NGINX will assume the service has zero endpoints and return a `502` response for requests for this upstream. For NGINX Plus only, services of type [ExternalName](https://kubernetes.io/docs/concepts/services-networking/service/#externalname) are also supported (check the [prerequisites](../examples/externalname-services#prerequisites)). | `string` | Yes | +| `subselector` | Selects the pods within the service using label keys and values. By default, all pods of the service are selected. Note: the specified labels are expected to be present in the pods when they are created. If the pod labels are updated, the Ingress Controller will not see that change until the number of the pods is changed. | `map[string]string` | No | | `port` | The port of the service. If the service doesn't define that port, NGINX will assume the service has zero endpoints and return a `502` response for requests for this upstream. The port must fall into the range `1..65553`. | `uint16` | Yes | | `lb-method` | The load [balancing method](https://docs.nginx.com/nginx/admin-guide/load-balancer/http-load-balancer/#choosing-a-load-balancing-method). To use the round-robin method, specify `round_robin`. The default is specified in the `lb-method` ConfigMap key. | `string` | No | | `fail-timeout` | The time during which the specified number of unsuccessful attempts to communicate with an upstream server should happen to consider the server unavailable. See the [fail_timeout](https://nginx.org/en/docs/http/ngx_http_upstream_module.html#fail_timeout) parameter of the server directive. The default is set in the `fail-timeout` ConfigMap key. | `string` | No | diff --git a/internal/k8s/controller.go b/internal/k8s/controller.go index 345794b166..8e3ec433ea 100644 --- a/internal/k8s/controller.go +++ b/internal/k8s/controller.go @@ -64,10 +64,12 @@ type LoadBalancerController struct { secretController cache.Controller virtualServerController cache.Controller virtualServerRouteController cache.Controller + podController cache.Controller ingressLister storeToIngressLister svcLister cache.Store endpointLister storeToEndpointLister configMapLister storeToConfigMapLister + podLister indexerToPodLister secretLister storeToSecretLister virtualServerLister cache.Store virtualServerRouteLister cache.Store @@ -164,6 +166,7 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc lbc.addIngressHandler(createIngressHandlers(lbc)) lbc.addServiceHandler(createServiceHandlers(lbc)) lbc.addEndpointHandler(createEndpointHandlers(lbc)) + lbc.addPodHandler() if lbc.areCustomResourcesEnabled { lbc.addVirtualServerHandler(createVirtualServerHandlers(lbc)) @@ -276,6 +279,20 @@ func (lbc *LoadBalancerController) addConfigMapHandler(handlers cache.ResourceEv ) } +func (lbc *LoadBalancerController) addPodHandler() { + lbc.podLister.Indexer, lbc.podController = cache.NewIndexerInformer( + cache.NewListWatchFromClient( + lbc.client.CoreV1().RESTClient(), + "pods", + lbc.namespace, + fields.Everything()), + &api_v1.Pod{}, + lbc.resync, + cache.ResourceEventHandlerFuncs{}, + cache.Indexers{}, + ) +} + func (lbc *LoadBalancerController) addVirtualServerHandler(handlers cache.ResourceEventHandlerFuncs) { lbc.virtualServerLister, lbc.virtualServerController = cache.NewInformer( cache.NewListWatchFromClient( @@ -310,6 +327,7 @@ func (lbc *LoadBalancerController) Run() { go lbc.leaderElector.Run(lbc.ctx) } go lbc.svcController.Run(lbc.ctx.Done()) + go lbc.podController.Run(lbc.ctx.Done()) go lbc.endpointController.Run(lbc.ctx.Done()) go lbc.secretController.Run(lbc.ctx.Done()) if lbc.watchNginxConfigMaps { @@ -1555,13 +1573,23 @@ func (lbc *LoadBalancerController) createVirtualServer(virtualServer *conf_v1alp for _, u := range virtualServer.Spec.Upstreams { endpointsKey := configs.GenerateEndpointsKey(virtualServer.Namespace, u.Service, u.Port) - endps, external, err := lbc.getEndpointsForUpstream(virtualServer.Namespace, u) - if err != nil { - glog.Warningf("Error getting Endpoints for Upstream %v: %v", u.Name, err) + + var endps []string + var err error + + if len(u.Subselector) > 0 { + endps, err = lbc.getEndpointsForSubselector(virtualServer.Namespace, u) + } else { + var external bool + endps, external, err = lbc.getEndpointsForUpstream(virtualServer.Namespace, u) + + if err == nil && external && lbc.isNginxPlus { + externalNameSvcs[configs.GenerateExternalNameSvcKey(virtualServer.Namespace, u.Service)] = true + } } - if err == nil && external && lbc.isNginxPlus { - externalNameSvcs[configs.GenerateExternalNameSvcKey(virtualServer.Namespace, u.Service)] = true + if err != nil { + glog.Warningf("Error getting Endpoints for Upstream %v: %v", u.Name, err) } endpoints[endpointsKey] = endps @@ -1609,15 +1637,25 @@ func (lbc *LoadBalancerController) createVirtualServer(virtualServer *conf_v1alp for _, u := range vsr.Spec.Upstreams { endpointsKey := configs.GenerateEndpointsKey(vsr.Namespace, u.Service, u.Port) - endps, external, err := lbc.getEndpointsForUpstream(vsr.Namespace, u) - if err != nil { - glog.Warningf("Error getting Endpoints for Upstream %v: %v", u.Name, err) - } - if err == nil && external && lbc.isNginxPlus { - externalNameSvcs[configs.GenerateExternalNameSvcKey(vsr.Namespace, u.Service)] = true - } + var endps []string + var err error + if len(u.Subselector) > 0 { + endps, err = lbc.getEndpointsForSubselector(vsr.Namespace, u) + if err != nil { + glog.Warningf("Error getting Endpoints for Upstream %v: %v", u.Name, err) + } + } else { + var external bool + endps, external, err = lbc.getEndpointsForUpstream(vsr.Namespace, u) + if err != nil { + glog.Warningf("Error getting Endpoints for Upstream %v: %v", u.Name, err) + } + if err == nil && external && lbc.isNginxPlus { + externalNameSvcs[configs.GenerateExternalNameSvcKey(vsr.Namespace, u.Service)] = true + } + } endpoints[endpointsKey] = endps } } @@ -1629,7 +1667,7 @@ func (lbc *LoadBalancerController) createVirtualServer(virtualServer *conf_v1alp return &virtualServerEx, virtualServerRouteErrors } -func (lbc *LoadBalancerController) getEndpointsForUpstream(namespace string, upstream conf_v1alpha1.Upstream) (result []string, isExternal bool, err error) { +func (lbc *LoadBalancerController) getEndpointsForUpstream(namespace string, upstream conf_v1alpha1.Upstream) (endps []string, isExternal bool, err error) { svc, err := lbc.getServiceForUpstream(upstream, namespace) if err != nil { return nil, false, fmt.Errorf("Error getting service %v: %v", upstream.Service, err) @@ -1640,7 +1678,7 @@ func (lbc *LoadBalancerController) getEndpointsForUpstream(namespace string, ups ServicePort: intstr.FromInt(int(upstream.Port)), } - endps, isExternal, err := lbc.getEndpointsForIngressBackend(backend, svc) + endps, isExternal, err = lbc.getEndpointsForIngressBackend(backend, svc) if err != nil { return nil, false, fmt.Errorf("Error retrieving endpoints for the service %v: %v", upstream.Service, err) } @@ -1648,13 +1686,71 @@ func (lbc *LoadBalancerController) getEndpointsForUpstream(namespace string, ups return endps, isExternal, err } -func (lbc *LoadBalancerController) getPodsForIngressBackend(svc *api_v1.Service) *api_v1.PodList { - pods, err := lbc.client.CoreV1().Pods(svc.Namespace).List(meta_v1.ListOptions{LabelSelector: labels.Set(svc.Spec.Selector).String()}) +func (lbc *LoadBalancerController) getEndpointsForSubselector(namespace string, upstream conf_v1alpha1.Upstream) (endps []string, err error) { + svc, err := lbc.getServiceForUpstream(upstream, namespace) if err != nil { - glog.V(3).Infof("Error fetching pods for namespace %v: %v", svc.Namespace, err) - return nil + return nil, fmt.Errorf("Error getting service %v: %v", upstream.Service, err) } - return pods + + var targetPort int32 + + for _, port := range svc.Spec.Ports { + if port.Port == int32(upstream.Port) { + targetPort, err = lbc.getTargetPort(&port, svc) + if err != nil { + return nil, fmt.Errorf("Error determining target port for port %v in Ingress: %v", upstream.Port, err) + } + break + } + } + + if targetPort == 0 { + return nil, fmt.Errorf("No port %v in service %s", upstream.Port, svc.Name) + } + + endps, err = lbc.getEndpointsForServiceWithSubselector(targetPort, upstream.Subselector, svc) + if err != nil { + return nil, fmt.Errorf("Error retrieving endpoints for the service %v: %v", upstream.Service, err) + } + + return endps, err +} + +func (lbc *LoadBalancerController) getEndpointsForServiceWithSubselector(targetPort int32, subselector map[string]string, svc *api_v1.Service) (endps []string, err error) { + pods, err := lbc.podLister.ListByNamespace(svc.Namespace, labels.Merge(svc.Spec.Selector, subselector).AsSelector()) + if err != nil { + return nil, fmt.Errorf("Error getting pods in namespace %v that match the selector %v: %v", svc.Namespace, labels.Merge(svc.Spec.Selector, subselector), err) + } + + svcEps, err := lbc.endpointLister.GetServiceEndpoints(svc) + if err != nil { + glog.V(3).Infof("Error getting endpoints for service %s from the cache: %v", svc.Name, err) + return nil, err + } + + endps = getEndpointsBySubselectedPods(targetPort, pods, svcEps) + return endps, nil +} + +func getEndpointsBySubselectedPods(targetPort int32, pods []*api_v1.Pod, svcEps api_v1.Endpoints) (endps []string) { + for _, pod := range pods { + podEp := fmt.Sprintf("%v:%v", pod.Status.PodIP, targetPort) + + for _, subset := range svcEps.Subsets { + for _, port := range subset.Ports { + if port.Port != targetPort { + continue + } + for _, address := range subset.Addresses { + svcEp := fmt.Sprintf("%v:%v", address.IP, port.Port) + if svcEp == podEp { + endps = append(endps, podEp) + } + } + } + } + } + return endps } func (lbc *LoadBalancerController) getHealthChecksForIngressBackend(backend *extensions.IngressBackend, namespace string) *api_v1.Probe { @@ -1667,14 +1763,15 @@ func (lbc *LoadBalancerController) getHealthChecksForIngressBackend(backend *ext if svcPort == nil { return nil } - pods := lbc.getPodsForIngressBackend(svc) - if pods == nil { + pods, err := lbc.podLister.ListByNamespace(svc.Namespace, labels.Set(svc.Spec.Selector).AsSelector()) + if err != nil { + glog.V(3).Infof("Error fetching pods for namespace %v: %v", svc.Namespace, err) return nil } - return findProbeForPods(pods.Items, svcPort) + return findProbeForPods(pods, svcPort) } -func findProbeForPods(pods []api_v1.Pod, svcPort *api_v1.ServicePort) *api_v1.Probe { +func findProbeForPods(pods []*api_v1.Pod, svcPort *api_v1.ServicePort) *api_v1.Probe { if len(pods) > 0 { pod := pods[0] for _, container := range pod.Spec.Containers { @@ -1736,7 +1833,6 @@ func (lbc *LoadBalancerController) getEndpointsForIngressBackend(backend *extens func (lbc *LoadBalancerController) getEndpointsForPort(endps api_v1.Endpoints, ingSvcPort intstr.IntOrString, svc *api_v1.Service) ([]string, error) { var targetPort int32 var err error - found := false for _, port := range svc.Spec.Ports { if (ingSvcPort.Type == intstr.Int && port.Port == int32(ingSvcPort.IntValue())) || (ingSvcPort.Type == intstr.String && port.Name == ingSvcPort.String()) { @@ -1744,12 +1840,11 @@ func (lbc *LoadBalancerController) getEndpointsForPort(endps api_v1.Endpoints, i if err != nil { return nil, fmt.Errorf("Error determining target port for port %v in Ingress: %v", ingSvcPort, err) } - found = true break } } - if !found { + if targetPort == 0 { return nil, fmt.Errorf("No port %v in service %s", ingSvcPort, svc.Name) } @@ -1787,16 +1882,16 @@ func (lbc *LoadBalancerController) getTargetPort(svcPort *api_v1.ServicePort, sv return int32(svcPort.TargetPort.IntValue()), nil } - pods, err := lbc.client.CoreV1().Pods(svc.Namespace).List(meta_v1.ListOptions{LabelSelector: labels.Set(svc.Spec.Selector).String()}) + pods, err := lbc.podLister.ListByNamespace(svc.Namespace, labels.Set(svc.Spec.Selector).AsSelector()) if err != nil { return 0, fmt.Errorf("Error getting pod information: %v", err) } - if len(pods.Items) == 0 { + if len(pods) == 0 { return 0, fmt.Errorf("No pods of service %s", svc.Name) } - pod := &pods.Items[0] + pod := pods[0] portNum, err := findPort(pod, svcPort) if err != nil { diff --git a/internal/k8s/controller_test.go b/internal/k8s/controller_test.go index 0039ad22a2..7a1ff31edb 100644 --- a/internal/k8s/controller_test.go +++ b/internal/k8s/controller_test.go @@ -7,11 +7,10 @@ import ( "time" "unsafe" - "github.com/nginxinc/kubernetes-ingress/internal/configs/version2" - "github.com/nginxinc/kubernetes-ingress/internal/metrics/collectors" - "github.com/nginxinc/kubernetes-ingress/internal/configs" "github.com/nginxinc/kubernetes-ingress/internal/configs/version1" + "github.com/nginxinc/kubernetes-ingress/internal/configs/version2" + "github.com/nginxinc/kubernetes-ingress/internal/metrics/collectors" "github.com/nginxinc/kubernetes-ingress/internal/nginx" conf_v1alpha1 "github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1alpha1" v1 "k8s.io/api/core/v1" @@ -755,7 +754,7 @@ func TestComparePorts(t *testing.T) { } func TestFindProbeForPods(t *testing.T) { - pods := []v1.Pod{ + pods := []*v1.Pod{ { Spec: v1.PodSpec{ Containers: []v1.Container{ @@ -1518,3 +1517,88 @@ func TestFormatWarningsMessages(t *testing.T) { t.Errorf("formatWarningMessages(%v) returned %v but expected %v", warnings, result, expected) } } + +func TestGetHealthyEndpointsBySubselectedPods(t *testing.T) { + type args struct { + targetPort int32 + pods []*v1.Pod + svcEps v1.Endpoints + } + tests := []struct { + name string + args args + wantEndps []string + }{ + { + name: "find one endpoint", + args: args{ + targetPort: 80, + pods: []*v1.Pod{ + { + Status: v1.PodStatus{ + PodIP: "1.2.3.4", + }, + }, + }, + svcEps: v1.Endpoints{ + Subsets: []v1.EndpointSubset{ + { + Addresses: []v1.EndpointAddress{ + { + IP: "1.2.3.4", + Hostname: "asdf.com", + }, + }, + Ports: []v1.EndpointPort{ + { + Port: 80, + }, + }, + }, + }, + }, + }, + wantEndps: []string{ + "1.2.3.4:80", + }, + }, + { + name: "targetPort mismatch", + args: args{ + targetPort: 21, + pods: []*v1.Pod{ + { + Status: v1.PodStatus{ + PodIP: "1.2.3.4", + }, + }, + }, + svcEps: v1.Endpoints{ + Subsets: []v1.EndpointSubset{ + { + Addresses: []v1.EndpointAddress{ + { + IP: "1.2.3.4", + Hostname: "asdf.com", + }, + }, + Ports: []v1.EndpointPort{ + { + Port: 80, + }, + }, + }, + }, + }, + }, + wantEndps: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if gotEndps := getEndpointsBySubselectedPods(tt.args.targetPort, tt.args.pods, tt.args.svcEps); !reflect.DeepEqual(gotEndps, tt.wantEndps) { + t.Errorf("getEndpointsBySubselectedPods() = %v, want %v", gotEndps, tt.wantEndps) + } + }) + } +} diff --git a/internal/k8s/utils.go b/internal/k8s/utils.go index ce9bddc6fe..e62d85ffb5 100644 --- a/internal/k8s/utils.go +++ b/internal/k8s/utils.go @@ -23,6 +23,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/api/extensions/v1beta1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/tools/cache" ) @@ -95,6 +96,19 @@ func (s *storeToConfigMapLister) List() (cfgm v1.ConfigMapList, err error) { return cfgm, nil } +// indexerToPodLister makes a Indexer that lists Pods. +type indexerToPodLister struct { + cache.Indexer +} + +// ListByNamespace lists all Pods in the indexer for a given namespace that match the provided selector. +func (ipl indexerToPodLister) ListByNamespace(ns string, selector labels.Selector) (pods []*v1.Pod, err error) { + err = cache.ListAllByNamespace(ipl.Indexer, ns, selector, func(m interface{}) { + pods = append(pods, m.(*v1.Pod)) + }) + return pods, err +} + // storeToEndpointLister makes a Store that lists Endponts type storeToEndpointLister struct { cache.Store @@ -108,8 +122,7 @@ func (s *storeToEndpointLister) GetServiceEndpoints(svc *v1.Service) (ep v1.Endp return ep, nil } } - err = fmt.Errorf("could not find endpoints for service: %v", svc.Name) - return + return ep, fmt.Errorf("could not find endpoints for service: %v", svc.Name) } // findPort locates the container port for the given pod and portName. If the diff --git a/pkg/apis/configuration/v1alpha1/types.go b/pkg/apis/configuration/v1alpha1/types.go index de9cefcb75..21b36d680b 100644 --- a/pkg/apis/configuration/v1alpha1/types.go +++ b/pkg/apis/configuration/v1alpha1/types.go @@ -25,28 +25,29 @@ type VirtualServerSpec struct { // Upstream defines an upstream. type Upstream struct { - Name string `json:"name"` - Service string `json:"service"` - Port uint16 `json:"port"` - LBMethod string `json:"lb-method"` - FailTimeout string `json:"fail-timeout"` - MaxFails *int `json:"max-fails"` - MaxConns *int `json:"max-conns"` - Keepalive *int `json:"keepalive"` - ProxyConnectTimeout string `json:"connect-timeout"` - ProxyReadTimeout string `json:"read-timeout"` - ProxySendTimeout string `json:"send-timeout"` - ProxyNextUpstream string `json:"next-upstream"` - ProxyNextUpstreamTimeout string `json:"next-upstream-timeout"` - ProxyNextUpstreamTries int `json:"next-upstream-tries"` - ProxyBuffering *bool `json:"buffering"` - ProxyBuffers *UpstreamBuffers `json:"buffers"` - ProxyBufferSize string `json:"buffer-size"` - ClientMaxBodySize string `json:"client-max-body-size"` - TLS UpstreamTLS `json:"tls"` - HealthCheck *HealthCheck `json:"healthCheck"` - SlowStart string `json:"slow-start"` - Queue *UpstreamQueue `json:"queue"` + Name string `json:"name"` + Service string `json:"service"` + Subselector map[string]string `json:"subselector"` + Port uint16 `json:"port"` + LBMethod string `json:"lb-method"` + FailTimeout string `json:"fail-timeout"` + MaxFails *int `json:"max-fails"` + MaxConns *int `json:"max-conns"` + Keepalive *int `json:"keepalive"` + ProxyConnectTimeout string `json:"connect-timeout"` + ProxyReadTimeout string `json:"read-timeout"` + ProxySendTimeout string `json:"send-timeout"` + ProxyNextUpstream string `json:"next-upstream"` + ProxyNextUpstreamTimeout string `json:"next-upstream-timeout"` + ProxyNextUpstreamTries int `json:"next-upstream-tries"` + ProxyBuffering *bool `json:"buffering"` + ProxyBuffers *UpstreamBuffers `json:"buffers"` + ProxyBufferSize string `json:"buffer-size"` + ClientMaxBodySize string `json:"client-max-body-size"` + TLS UpstreamTLS `json:"tls"` + HealthCheck *HealthCheck `json:"healthCheck"` + SlowStart string `json:"slow-start"` + Queue *UpstreamQueue `json:"queue"` } // UpstreamBuffers defines Buffer Configuration for an Upstream diff --git a/pkg/apis/configuration/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/configuration/v1alpha1/zz_generated.deepcopy.go index 4b4be791ca..a9c2c14910 100644 --- a/pkg/apis/configuration/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/configuration/v1alpha1/zz_generated.deepcopy.go @@ -176,6 +176,13 @@ func (in *TLS) DeepCopy() *TLS { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Upstream) DeepCopyInto(out *Upstream) { *out = *in + if in.Subselector != nil { + in, out := &in.Subselector, &out.Subselector + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } if in.MaxFails != nil { in, out := &in.MaxFails, &out.MaxFails *out = new(int) diff --git a/pkg/apis/configuration/validation/validation.go b/pkg/apis/configuration/validation/validation.go index 807a6e6b4b..2927e33a80 100644 --- a/pkg/apis/configuration/validation/validation.go +++ b/pkg/apis/configuration/validation/validation.go @@ -356,7 +356,7 @@ func validateUpstreams(upstreams []v1alpha1.Upstream, fieldPath *field.Path, isP } allErrs = append(allErrs, validateServiceName(u.Service, idxPath.Child("service"))...) - + allErrs = append(allErrs, validateLabels(u.Subselector, idxPath.Child("subselector"))...) allErrs = append(allErrs, validateTime(u.ProxyConnectTimeout, idxPath.Child("connect-timeout"))...) allErrs = append(allErrs, validateTime(u.ProxyReadTimeout, idxPath.Child("read-timeout"))...) allErrs = append(allErrs, validateTime(u.ProxySendTimeout, idxPath.Child("send-timeout"))...) @@ -836,3 +836,30 @@ func validateQueue(queue *v1alpha1.UpstreamQueue, fieldPath *field.Path, isPlus return allErrs } + +// isValidLabelName checks if a label name is valid. +// It performs the same validation as ValidateLabelName from k8s.io/apimachinery/pkg/apis/meta/v1/validation/validation.go. +func isValidLabelName(labelName string, fieldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + + for _, msg := range validation.IsQualifiedName(labelName) { + allErrs = append(allErrs, field.Invalid(fieldPath, labelName, msg)) + } + + return allErrs +} + +// validateLabels validates that a set of labels are correctly defined. +// It performs the same validation as ValidateLabels from k8s.io/apimachinery/pkg/apis/meta/v1/validation/validation.go. +func validateLabels(labels map[string]string, fieldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + + for labelName, labelValue := range labels { + allErrs = append(allErrs, isValidLabelName(labelName, fieldPath)...) + for _, msg := range validation.IsValidLabelValue(labelValue) { + allErrs = append(allErrs, field.Invalid(fieldPath, labelValue, msg)) + } + } + + return allErrs +} diff --git a/pkg/apis/configuration/validation/validation_test.go b/pkg/apis/configuration/validation/validation_test.go index bbefc13fa2..5634450887 100644 --- a/pkg/apis/configuration/validation/validation_test.go +++ b/pkg/apis/configuration/validation/validation_test.go @@ -144,6 +144,7 @@ func TestValidateUpstreams(t *testing.T) { }, { Name: "upstream2", + Subselector: map[string]string{"version": "test"}, Service: "test-2", Port: 80, ProxyNextUpstream: "error timeout", @@ -353,6 +354,34 @@ func TestValidateUpstreamsFails(t *testing.T) { }, msg: "invalid value for ProxyBufferSize", }, + { + upstreams: []v1alpha1.Upstream{ + { + Name: "upstream1", + Service: "test-1", + Subselector: map[string]string{"\\$invalidkey": "test"}, + Port: 80, + }, + }, + expectedUpstreamNames: map[string]sets.Empty{ + "upstream1": {}, + }, + msg: "invalid key for subselector", + }, + { + upstreams: []v1alpha1.Upstream{ + { + Name: "upstream1", + Service: "test-1", + Subselector: map[string]string{"version": "test=fail"}, + Port: 80, + }, + }, + expectedUpstreamNames: map[string]sets.Empty{ + "upstream1": {}, + }, + msg: "invalid value for subselector", + }, } isPlus := false From 4465f33d5fa523d4c5080284b5f318347153610e Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Mon, 30 Sep 2019 09:55:37 +0100 Subject: [PATCH 2/7] Simplify code. Refactor unit test style --- internal/k8s/controller.go | 19 +++--- internal/k8s/controller_test.go | 104 ++++++++++++-------------------- 2 files changed, 44 insertions(+), 79 deletions(-) diff --git a/internal/k8s/controller.go b/internal/k8s/controller.go index 8e3ec433ea..140c080f22 100644 --- a/internal/k8s/controller.go +++ b/internal/k8s/controller.go @@ -1642,20 +1642,17 @@ func (lbc *LoadBalancerController) createVirtualServer(virtualServer *conf_v1alp var err error if len(u.Subselector) > 0 { endps, err = lbc.getEndpointsForSubselector(vsr.Namespace, u) - if err != nil { - glog.Warningf("Error getting Endpoints for Upstream %v: %v", u.Name, err) - } } else { var external bool endps, external, err = lbc.getEndpointsForUpstream(vsr.Namespace, u) - if err != nil { - glog.Warningf("Error getting Endpoints for Upstream %v: %v", u.Name, err) - } if err == nil && external && lbc.isNginxPlus { externalNameSvcs[configs.GenerateExternalNameSvcKey(vsr.Namespace, u.Service)] = true } } + if err != nil { + glog.Warningf("Error getting Endpoints for Upstream %v: %v", u.Name, err) + } endpoints[endpointsKey] = endps } } @@ -1698,7 +1695,7 @@ func (lbc *LoadBalancerController) getEndpointsForSubselector(namespace string, if port.Port == int32(upstream.Port) { targetPort, err = lbc.getTargetPort(&port, svc) if err != nil { - return nil, fmt.Errorf("Error determining target port for port %v in Ingress: %v", upstream.Port, err) + return nil, fmt.Errorf("Error determining target port for port %v in service %v: %v", upstream.Port, svc.Name, err) } break } @@ -1734,17 +1731,15 @@ func (lbc *LoadBalancerController) getEndpointsForServiceWithSubselector(targetP func getEndpointsBySubselectedPods(targetPort int32, pods []*api_v1.Pod, svcEps api_v1.Endpoints) (endps []string) { for _, pod := range pods { - podEp := fmt.Sprintf("%v:%v", pod.Status.PodIP, targetPort) - for _, subset := range svcEps.Subsets { for _, port := range subset.Ports { if port.Port != targetPort { continue } for _, address := range subset.Addresses { - svcEp := fmt.Sprintf("%v:%v", address.IP, port.Port) - if svcEp == podEp { - endps = append(endps, podEp) + if address.IP == pod.Status.PodIP { + podEndpoint := fmt.Sprintf("%v:%v", pod.Status.PodIP, targetPort) + endps = append(endps, podEndpoint) } } } diff --git a/internal/k8s/controller_test.go b/internal/k8s/controller_test.go index 7a1ff31edb..b137031e90 100644 --- a/internal/k8s/controller_test.go +++ b/internal/k8s/controller_test.go @@ -1518,86 +1518,56 @@ func TestFormatWarningsMessages(t *testing.T) { } } -func TestGetHealthyEndpointsBySubselectedPods(t *testing.T) { - type args struct { - targetPort int32 - pods []*v1.Pod - svcEps v1.Endpoints - } +func TestGetEndpointsBySubselectedPods(t *testing.T) { tests := []struct { - name string - args args - wantEndps []string + desc string + targetPort int32 + svcEps v1.Endpoints + expectedEps []string }{ { - name: "find one endpoint", - args: args{ - targetPort: 80, - pods: []*v1.Pod{ - { - Status: v1.PodStatus{ - PodIP: "1.2.3.4", - }, - }, - }, - svcEps: v1.Endpoints{ - Subsets: []v1.EndpointSubset{ - { - Addresses: []v1.EndpointAddress{ - { - IP: "1.2.3.4", - Hostname: "asdf.com", - }, - }, - Ports: []v1.EndpointPort{ - { - Port: 80, - }, - }, - }, - }, - }, - }, - wantEndps: []string{ - "1.2.3.4:80", - }, + desc: "find one endpoint", + targetPort: 80, + expectedEps: []string{"1.2.3.4:80"}, + }, + { + desc: "targetPort mismatch", + targetPort: 21, + expectedEps: nil, }, + } + + pods := []*v1.Pod{ { - name: "targetPort mismatch", - args: args{ - targetPort: 21, - pods: []*v1.Pod{ + Status: v1.PodStatus{ + PodIP: "1.2.3.4", + }, + }, + } + + svcEps := v1.Endpoints{ + Subsets: []v1.EndpointSubset{ + { + Addresses: []v1.EndpointAddress{ { - Status: v1.PodStatus{ - PodIP: "1.2.3.4", - }, + IP: "1.2.3.4", + Hostname: "asdf.com", }, }, - svcEps: v1.Endpoints{ - Subsets: []v1.EndpointSubset{ - { - Addresses: []v1.EndpointAddress{ - { - IP: "1.2.3.4", - Hostname: "asdf.com", - }, - }, - Ports: []v1.EndpointPort{ - { - Port: 80, - }, - }, - }, + Ports: []v1.EndpointPort{ + { + Port: 80, }, }, }, - wantEndps: nil, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if gotEndps := getEndpointsBySubselectedPods(tt.args.targetPort, tt.args.pods, tt.args.svcEps); !reflect.DeepEqual(gotEndps, tt.wantEndps) { - t.Errorf("getEndpointsBySubselectedPods() = %v, want %v", gotEndps, tt.wantEndps) + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + gotEndps := getEndpointsBySubselectedPods(test.targetPort, pods, svcEps) + if !reflect.DeepEqual(gotEndps, test.expectedEps) { + t.Errorf("getEndpointsBySubselectedPods() = %v, want %v", gotEndps, test.expectedEps) } }) } From 857892c5fcec637302f972e5deba9918d6a5517a Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Tue, 1 Oct 2019 11:26:42 +0100 Subject: [PATCH 3/7] Give Upstreams with subselector unique endpointKey --- internal/configs/virtualserver.go | 9 ++++ internal/configs/virtualserver_test.go | 60 ++++++++++++++++++++++++++ internal/k8s/controller.go | 2 + 3 files changed, 71 insertions(+) diff --git a/internal/configs/virtualserver.go b/internal/configs/virtualserver.go index 897dfd4ff4..02e50c6979 100644 --- a/internal/configs/virtualserver.go +++ b/internal/configs/virtualserver.go @@ -7,6 +7,7 @@ import ( "github.com/golang/glog" "github.com/nginxinc/kubernetes-ingress/internal/nginx" api_v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "github.com/nginxinc/kubernetes-ingress/internal/configs/version2" @@ -50,6 +51,11 @@ func GenerateEndpointsKey(serviceNamespace string, serviceName string, port uint return fmt.Sprintf("%s/%s:%d", serviceNamespace, serviceName, port) } +// GenerateEndpointsKeyWithSubselector generates a key for the Endpoints map in VirtualServerEx. +func GenerateEndpointsKeyWithSubselector(serviceNamespace, serviceName, subselector string, port uint16) string { + return fmt.Sprintf("%s/%s_%s:%d", serviceNamespace, serviceName, subselector, port) +} + type upstreamNamer struct { prefix string } @@ -138,6 +144,9 @@ func newVirtualServerConfigurator(cfgParams *ConfigParams, isPlus bool, isResolv func (vsc *virtualServerConfigurator) generateEndpointsForUpstream(owner runtime.Object, namespace string, upstream conf_v1alpha1.Upstream, virtualServerEx *VirtualServerEx) []string { endpointsKey := GenerateEndpointsKey(namespace, upstream.Service, upstream.Port) + if len(upstream.Subselector) > 0 { + endpointsKey = GenerateEndpointsKeyWithSubselector(namespace, upstream.Service, labels.Set(upstream.Subselector).String(), upstream.Port) + } externalNameSvcKey := GenerateExternalNameSvcKey(namespace, upstream.Service) endpoints := virtualServerEx.Endpoints[endpointsKey] if !vsc.isPlus && len(endpoints) == 0 { diff --git a/internal/configs/virtualserver_test.go b/internal/configs/virtualserver_test.go index 0be964f59b..a941c18df5 100644 --- a/internal/configs/virtualserver_test.go +++ b/internal/configs/virtualserver_test.go @@ -9,6 +9,7 @@ import ( "github.com/nginxinc/kubernetes-ingress/internal/nginx" conf_v1alpha1 "github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1alpha1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" ) func TestVirtualServerExString(t *testing.T) { @@ -58,6 +59,21 @@ func TestGenerateEndpointsKey(t *testing.T) { } } +func TestGenerateEndpointsKeyWithSubselector(t *testing.T) { + serviceNamespace := "default" + serviceName := "test" + subselector := map[string]string{"version": "v1"} + subselectorKey := labels.Set(subselector) + var port uint16 = 80 + + expected := "default/test_version=v1:80" + + result := GenerateEndpointsKeyWithSubselector(serviceNamespace, serviceName, subselectorKey.String(), port) + if result != expected { + t.Errorf("GenerateEndpointsKey() returned %q but expected %q", result, expected) + } +} + func TestUpstreamNamerForVirtualServer(t *testing.T) { virtualServer := conf_v1alpha1.VirtualServer{ ObjectMeta: meta_v1.ObjectMeta{ @@ -2039,6 +2055,50 @@ func TestGenerateEndpointsForUpstream(t *testing.T) { expected: nil, msg: "Service with no endpoints", }, + { + upstream: conf_v1alpha1.Upstream{ + Service: name, + Port: 8080, + Subselector: map[string]string{"version": "test"}, + }, + vsEx: &VirtualServerEx{ + VirtualServer: &conf_v1alpha1.VirtualServer{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + }, + Endpoints: map[string][]string{ + "test-namespace/test_version=test:8080": {"192.168.10.10:8080"}, + }, + }, + isPlus: false, + isResolverConfigured: false, + expected: []string{"192.168.10.10:8080"}, + msg: "Upstream with subselector, with a matching endpoint", + }, + { + upstream: conf_v1alpha1.Upstream{ + Service: name, + Port: 8080, + Subselector: map[string]string{"version": "test"}, + }, + vsEx: &VirtualServerEx{ + VirtualServer: &conf_v1alpha1.VirtualServer{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + }, + Endpoints: map[string][]string{ + "test-namespace/test:8080": {"192.168.10.10:8080"}, + }, + }, + isPlus: false, + isResolverConfigured: false, + expected: []string{nginx502Server}, + msg: "Upstream with subselector, without a matching endpoint", + }, } for _, test := range tests { diff --git a/internal/k8s/controller.go b/internal/k8s/controller.go index 140c080f22..4124a794d8 100644 --- a/internal/k8s/controller.go +++ b/internal/k8s/controller.go @@ -1578,6 +1578,7 @@ func (lbc *LoadBalancerController) createVirtualServer(virtualServer *conf_v1alp var err error if len(u.Subselector) > 0 { + endpointsKey = configs.GenerateEndpointsKeyWithSubselector(virtualServer.Namespace, u.Service, labels.Set(u.Subselector).String(), u.Port) endps, err = lbc.getEndpointsForSubselector(virtualServer.Namespace, u) } else { var external bool @@ -1641,6 +1642,7 @@ func (lbc *LoadBalancerController) createVirtualServer(virtualServer *conf_v1alp var endps []string var err error if len(u.Subselector) > 0 { + endpointsKey = configs.GenerateEndpointsKeyWithSubselector(vsr.Namespace, u.Service, labels.Set(u.Subselector).String(), u.Port) endps, err = lbc.getEndpointsForSubselector(vsr.Namespace, u) } else { var external bool From e557f16962c2b6e1f8d0a3463e72e98a59b1c2cc Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Tue, 1 Oct 2019 14:43:35 +0100 Subject: [PATCH 4/7] Fix subselector endpoints dynamic updates --- internal/configs/virtualserver.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/configs/virtualserver.go b/internal/configs/virtualserver.go index 02e50c6979..4c151a7b1a 100644 --- a/internal/configs/virtualserver.go +++ b/internal/configs/virtualserver.go @@ -766,6 +766,9 @@ func createUpstreamsForPlus(virtualServerEx *VirtualServerEx, baseCfgParams *Con upstreamNamespace := virtualServerEx.VirtualServer.Namespace endpointsKey := GenerateEndpointsKey(upstreamNamespace, u.Service, u.Port) + if len(u.Subselector) > 0 { + endpointsKey = GenerateEndpointsKeyWithSubselector(upstreamNamespace, u.Service, labels.Set(u.Subselector).String(), u.Port) + } endpoints := virtualServerEx.Endpoints[endpointsKey] ups := vsc.generateUpstream(virtualServerEx.VirtualServer, upstreamName, u, isExternalNameSvc, endpoints) @@ -785,6 +788,9 @@ func createUpstreamsForPlus(virtualServerEx *VirtualServerEx, baseCfgParams *Con upstreamNamespace := vsr.Namespace endpointsKey := GenerateEndpointsKey(upstreamNamespace, u.Service, u.Port) + if len(u.Subselector) > 0 { + endpointsKey = GenerateEndpointsKeyWithSubselector(upstreamNamespace, u.Service, labels.Set(u.Subselector).String(), u.Port) + } endpoints := virtualServerEx.Endpoints[endpointsKey] ups := vsc.generateUpstream(vsr, upstreamName, u, isExternalNameSvc, endpoints) From c823a46aa39d64edb5d20ff9a1847ec4f0d1e0f5 Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Tue, 1 Oct 2019 15:29:15 +0100 Subject: [PATCH 5/7] GenerateEndpointsKey refactor to include subselector --- internal/configs/virtualserver.go | 25 ++++++------------ internal/configs/virtualserver_test.go | 35 +++++++++++++------------- internal/k8s/controller.go | 6 ++--- 3 files changed, 26 insertions(+), 40 deletions(-) diff --git a/internal/configs/virtualserver.go b/internal/configs/virtualserver.go index 4c151a7b1a..b23094d964 100644 --- a/internal/configs/virtualserver.go +++ b/internal/configs/virtualserver.go @@ -47,15 +47,13 @@ func (vsx *VirtualServerEx) String() string { } // GenerateEndpointsKey generates a key for the Endpoints map in VirtualServerEx. -func GenerateEndpointsKey(serviceNamespace string, serviceName string, port uint16) string { +func GenerateEndpointsKey(serviceNamespace string, serviceName string, subselector map[string]string, port uint16) string { + if len(subselector) > 0 { + return fmt.Sprintf("%s/%s_%s:%d", serviceNamespace, serviceName, labels.Set(subselector).String(), port) + } return fmt.Sprintf("%s/%s:%d", serviceNamespace, serviceName, port) } -// GenerateEndpointsKeyWithSubselector generates a key for the Endpoints map in VirtualServerEx. -func GenerateEndpointsKeyWithSubselector(serviceNamespace, serviceName, subselector string, port uint16) string { - return fmt.Sprintf("%s/%s_%s:%d", serviceNamespace, serviceName, subselector, port) -} - type upstreamNamer struct { prefix string } @@ -143,10 +141,7 @@ func newVirtualServerConfigurator(cfgParams *ConfigParams, isPlus bool, isResolv } func (vsc *virtualServerConfigurator) generateEndpointsForUpstream(owner runtime.Object, namespace string, upstream conf_v1alpha1.Upstream, virtualServerEx *VirtualServerEx) []string { - endpointsKey := GenerateEndpointsKey(namespace, upstream.Service, upstream.Port) - if len(upstream.Subselector) > 0 { - endpointsKey = GenerateEndpointsKeyWithSubselector(namespace, upstream.Service, labels.Set(upstream.Subselector).String(), upstream.Port) - } + endpointsKey := GenerateEndpointsKey(namespace, upstream.Service, upstream.Subselector, upstream.Port) externalNameSvcKey := GenerateExternalNameSvcKey(namespace, upstream.Service) endpoints := virtualServerEx.Endpoints[endpointsKey] if !vsc.isPlus && len(endpoints) == 0 { @@ -765,10 +760,7 @@ func createUpstreamsForPlus(virtualServerEx *VirtualServerEx, baseCfgParams *Con upstreamName := upstreamNamer.GetNameForUpstream(u.Name) upstreamNamespace := virtualServerEx.VirtualServer.Namespace - endpointsKey := GenerateEndpointsKey(upstreamNamespace, u.Service, u.Port) - if len(u.Subselector) > 0 { - endpointsKey = GenerateEndpointsKeyWithSubselector(upstreamNamespace, u.Service, labels.Set(u.Subselector).String(), u.Port) - } + endpointsKey := GenerateEndpointsKey(upstreamNamespace, u.Service, u.Subselector, u.Port) endpoints := virtualServerEx.Endpoints[endpointsKey] ups := vsc.generateUpstream(virtualServerEx.VirtualServer, upstreamName, u, isExternalNameSvc, endpoints) @@ -787,10 +779,7 @@ func createUpstreamsForPlus(virtualServerEx *VirtualServerEx, baseCfgParams *Con upstreamName := upstreamNamer.GetNameForUpstream(u.Name) upstreamNamespace := vsr.Namespace - endpointsKey := GenerateEndpointsKey(upstreamNamespace, u.Service, u.Port) - if len(u.Subselector) > 0 { - endpointsKey = GenerateEndpointsKeyWithSubselector(upstreamNamespace, u.Service, labels.Set(u.Subselector).String(), u.Port) - } + endpointsKey := GenerateEndpointsKey(upstreamNamespace, u.Service, u.Subselector, u.Port) endpoints := virtualServerEx.Endpoints[endpointsKey] ups := vsc.generateUpstream(vsr, upstreamName, u, isExternalNameSvc, endpoints) diff --git a/internal/configs/virtualserver_test.go b/internal/configs/virtualserver_test.go index a941c18df5..fd7d40dd7a 100644 --- a/internal/configs/virtualserver_test.go +++ b/internal/configs/virtualserver_test.go @@ -9,7 +9,6 @@ import ( "github.com/nginxinc/kubernetes-ingress/internal/nginx" conf_v1alpha1 "github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1alpha1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" ) func TestVirtualServerExString(t *testing.T) { @@ -51,26 +50,26 @@ func TestGenerateEndpointsKey(t *testing.T) { serviceName := "test" var port uint16 = 80 - expected := "default/test:80" - - result := GenerateEndpointsKey(serviceNamespace, serviceName, port) - if result != expected { - t.Errorf("GenerateEndpointsKey() returned %q but expected %q", result, expected) + tests := []struct { + subselector map[string]string + expected string + }{ + { + subselector: nil, + expected: "default/test:80", + }, + { + subselector: map[string]string{"version": "v1"}, + expected: "default/test_version=v1:80", + }, } -} -func TestGenerateEndpointsKeyWithSubselector(t *testing.T) { - serviceNamespace := "default" - serviceName := "test" - subselector := map[string]string{"version": "v1"} - subselectorKey := labels.Set(subselector) - var port uint16 = 80 - - expected := "default/test_version=v1:80" + for _, test := range tests { + result := GenerateEndpointsKey(serviceNamespace, serviceName, test.subselector, port) + if result != test.expected { + t.Errorf("GenerateEndpointsKey() returned %q but expected %q", result, test.expected) + } - result := GenerateEndpointsKeyWithSubselector(serviceNamespace, serviceName, subselectorKey.String(), port) - if result != expected { - t.Errorf("GenerateEndpointsKey() returned %q but expected %q", result, expected) } } diff --git a/internal/k8s/controller.go b/internal/k8s/controller.go index 4124a794d8..11ecc43c70 100644 --- a/internal/k8s/controller.go +++ b/internal/k8s/controller.go @@ -1572,13 +1572,12 @@ func (lbc *LoadBalancerController) createVirtualServer(virtualServer *conf_v1alp externalNameSvcs := make(map[string]bool) for _, u := range virtualServer.Spec.Upstreams { - endpointsKey := configs.GenerateEndpointsKey(virtualServer.Namespace, u.Service, u.Port) + endpointsKey := configs.GenerateEndpointsKey(virtualServer.Namespace, u.Service, u.Subselector, u.Port) var endps []string var err error if len(u.Subselector) > 0 { - endpointsKey = configs.GenerateEndpointsKeyWithSubselector(virtualServer.Namespace, u.Service, labels.Set(u.Subselector).String(), u.Port) endps, err = lbc.getEndpointsForSubselector(virtualServer.Namespace, u) } else { var external bool @@ -1637,12 +1636,11 @@ func (lbc *LoadBalancerController) createVirtualServer(virtualServer *conf_v1alp virtualServerRoutes = append(virtualServerRoutes, vsr) for _, u := range vsr.Spec.Upstreams { - endpointsKey := configs.GenerateEndpointsKey(vsr.Namespace, u.Service, u.Port) + endpointsKey := configs.GenerateEndpointsKey(vsr.Namespace, u.Service, u.Subselector, u.Port) var endps []string var err error if len(u.Subselector) > 0 { - endpointsKey = configs.GenerateEndpointsKeyWithSubselector(vsr.Namespace, u.Service, labels.Set(u.Subselector).String(), u.Port) endps, err = lbc.getEndpointsForSubselector(vsr.Namespace, u) } else { var external bool From f50d601acee5562da852c9b28b7f99d63f94e8ca Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Wed, 2 Oct 2019 15:27:45 +0100 Subject: [PATCH 6/7] Add test coverage for dynamic updates --- internal/configs/virtualserver_test.go | 55 ++++++++++++++++++++++++-- 1 file changed, 51 insertions(+), 4 deletions(-) diff --git a/internal/configs/virtualserver_test.go b/internal/configs/virtualserver_test.go index fd7d40dd7a..3ba72c354c 100644 --- a/internal/configs/virtualserver_test.go +++ b/internal/configs/virtualserver_test.go @@ -189,12 +189,22 @@ func TestGenerateVirtualServerConfig(t *testing.T) { Service: "tea-svc", Port: 80, }, + { + Name: "tea-latest", + Service: "tea-svc", + Subselector: map[string]string{"version": "v1"}, + Port: 80, + }, }, Routes: []conf_v1alpha1.Route{ { Path: "/tea", Upstream: "tea", }, + { + Path: "/tea-latest", + Upstream: "tea-latest", + }, { Path: "/coffee", Route: "default/coffee", @@ -206,9 +216,12 @@ func TestGenerateVirtualServerConfig(t *testing.T) { "default/tea-svc:80": { "10.0.0.20:80", }, - "default/coffee-svc:80": { + "default/tea-svc_version=v1:80": { "10.0.0.30:80", }, + "default/coffee-svc:80": { + "10.0.0.40:80", + }, }, VirtualServerRoutes: []*conf_v1alpha1.VirtualServerRoute{ { @@ -259,7 +272,7 @@ func TestGenerateVirtualServerConfig(t *testing.T) { Keepalive: 16, }, { - Name: "vs_default_cafe_vsr_default_coffee_coffee", + Name: "vs_default_cafe_tea-latest", Servers: []version2.UpstreamServer{ { Address: "10.0.0.30:80", @@ -267,6 +280,15 @@ func TestGenerateVirtualServerConfig(t *testing.T) { }, Keepalive: 16, }, + { + Name: "vs_default_cafe_vsr_default_coffee_coffee", + Servers: []version2.UpstreamServer{ + { + Address: "10.0.0.40:80", + }, + }, + Keepalive: 16, + }, }, Server: version2.Server{ ServerName: "cafe.example.com", @@ -287,6 +309,14 @@ func TestGenerateVirtualServerConfig(t *testing.T) { ProxyNextUpstreamTries: 0, HasKeepalive: true, }, + { + Path: "/tea-latest", + ProxyPass: "http://vs_default_cafe_tea-latest", + ProxyNextUpstream: "error timeout", + ProxyNextUpstreamTimeout: "0s", + ProxyNextUpstreamTries: 0, + HasKeepalive: true, + }, { Path: "/coffee", ProxyPass: "http://vs_default_cafe_vsr_default_coffee_coffee", @@ -1149,6 +1179,12 @@ func TestCreateUpstreamServersForPlus(t *testing.T) { Service: "test-svc", Port: 80, }, + { + Name: "subselector-test", + Service: "test-svc", + Subselector: map[string]string{"it": "works"}, + Port: 80, + }, { Name: "external", Service: "external-svc", @@ -1176,9 +1212,12 @@ func TestCreateUpstreamServersForPlus(t *testing.T) { "10.0.0.20:80", }, "default/test-svc:80": {}, - "default/coffee-svc:80": { + "default/test-svc_it=works:80": { "10.0.0.30:80", }, + "default/coffee-svc:80": { + "10.0.0.40:80", + }, "default/external-svc:80": { "example.com:80", }, @@ -1226,13 +1265,21 @@ func TestCreateUpstreamServersForPlus(t *testing.T) { Servers: nil, }, { - Name: "vs_default_cafe_vsr_default_coffee_coffee", + Name: "vs_default_cafe_subselector-test", Servers: []version2.UpstreamServer{ { Address: "10.0.0.30:80", }, }, }, + { + Name: "vs_default_cafe_vsr_default_coffee_coffee", + Servers: []version2.UpstreamServer{ + { + Address: "10.0.0.40:80", + }, + }, + }, } result := createUpstreamsForPlus(&virtualServerEx, &ConfigParams{}) From 325f43824c3ec471620dd5f9131c05035278b110 Mon Sep 17 00:00:00 2001 From: Dean Coakley Date: Fri, 4 Oct 2019 10:42:26 +0100 Subject: [PATCH 7/7] Add test coverage for subselected routes --- internal/configs/virtualserver_test.go | 47 ++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/internal/configs/virtualserver_test.go b/internal/configs/virtualserver_test.go index 3ba72c354c..d7018e421b 100644 --- a/internal/configs/virtualserver_test.go +++ b/internal/configs/virtualserver_test.go @@ -209,6 +209,10 @@ func TestGenerateVirtualServerConfig(t *testing.T) { Path: "/coffee", Route: "default/coffee", }, + { + Path: "/subtea", + Route: "default/subtea", + }, }, }, }, @@ -222,6 +226,9 @@ func TestGenerateVirtualServerConfig(t *testing.T) { "default/coffee-svc:80": { "10.0.0.40:80", }, + "default/sub-tea-svc_version=v1:80": { + "10.0.0.50:80", + }, }, VirtualServerRoutes: []*conf_v1alpha1.VirtualServerRoute{ { @@ -246,6 +253,29 @@ func TestGenerateVirtualServerConfig(t *testing.T) { }, }, }, + { + ObjectMeta: meta_v1.ObjectMeta{ + Name: "subtea", + Namespace: "default", + }, + Spec: conf_v1alpha1.VirtualServerRouteSpec{ + Host: "cafe.example.com", + Upstreams: []conf_v1alpha1.Upstream{ + { + Name: "subtea", + Service: "sub-tea-svc", + Port: 80, + Subselector: map[string]string{"version": "v1"}, + }, + }, + Subroutes: []conf_v1alpha1.Route{ + { + Path: "/subtea", + Upstream: "subtea", + }, + }, + }, + }, }, } @@ -289,6 +319,15 @@ func TestGenerateVirtualServerConfig(t *testing.T) { }, Keepalive: 16, }, + { + Name: "vs_default_cafe_vsr_default_subtea_subtea", + Servers: []version2.UpstreamServer{ + { + Address: "10.0.0.50:80", + }, + }, + Keepalive: 16, + }, }, Server: version2.Server{ ServerName: "cafe.example.com", @@ -325,6 +364,14 @@ func TestGenerateVirtualServerConfig(t *testing.T) { ProxyNextUpstreamTries: 0, HasKeepalive: true, }, + { + Path: "/subtea", + ProxyPass: "http://vs_default_cafe_vsr_default_subtea_subtea", + ProxyNextUpstream: "error timeout", + ProxyNextUpstreamTimeout: "0s", + ProxyNextUpstreamTries: 0, + HasKeepalive: true, + }, }, }, }