From d9bd9aabdb617d83010efa9c901483115460b9d2 Mon Sep 17 00:00:00 2001 From: Sean O'Neill Date: Fri, 19 Mar 2021 12:17:34 +0000 Subject: [PATCH] Add load balancing method to transport server config --- cmd/nginx-ingress/main.go | 2 +- .../k8s.nginx.org_transportservers.yaml | 147 ++++++++++++++++++ .../crds/k8s.nginx.org_transportservers.yaml | 2 + .../crds/k8s.nginx.org_transportservers.yaml | 2 + .../configuration/transportserver-resource.md | 5 + internal/configs/parsing_helpers.go | 4 +- internal/configs/transportserver.go | 18 ++- internal/configs/transportserver_test.go | 4 + .../version2/nginx-plus.transportserver.tmpl | 4 +- .../version2/nginx.transportserver.tmpl | 4 +- internal/configs/version2/stream.go | 7 +- internal/k8s/configuration_test.go | 2 +- pkg/apis/configuration/v1alpha1/types.go | 13 +- .../validation/transportserver.go | 78 +++++++++- .../validation/transportserver_test.go | 97 +++++++++++- 15 files changed, 366 insertions(+), 23 deletions(-) create mode 100644 deployments/common/crds-v1beta1/k8s.nginx.org_transportservers.yaml diff --git a/cmd/nginx-ingress/main.go b/cmd/nginx-ingress/main.go index c7d8637549..1727df9ef7 100644 --- a/cmd/nginx-ingress/main.go +++ b/cmd/nginx-ingress/main.go @@ -605,7 +605,7 @@ func main() { templateExecutorV2, *nginxPlus, isWildcardEnabled, plusCollector, *enablePrometheusMetrics, latencyCollector, *enableLatencyMetrics) controllerNamespace := os.Getenv("POD_NAMESPACE") - transportServerValidator := cr_validation.NewTransportServerValidator(*enableTLSPassthrough, *enableSnippets) + transportServerValidator := cr_validation.NewTransportServerValidator(*enableTLSPassthrough, *enableSnippets, *nginxPlus) virtualServerValidator := cr_validation.NewVirtualServerValidator(*nginxPlus) lbcInput := k8s.NewLoadBalancerControllerInput{ diff --git a/deployments/common/crds-v1beta1/k8s.nginx.org_transportservers.yaml b/deployments/common/crds-v1beta1/k8s.nginx.org_transportservers.yaml new file mode 100644 index 0000000000..6020412936 --- /dev/null +++ b/deployments/common/crds-v1beta1/k8s.nginx.org_transportservers.yaml @@ -0,0 +1,147 @@ +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.5.0 + creationTimestamp: null + name: transportservers.k8s.nginx.org +spec: + additionalPrinterColumns: + - JSONPath: .status.state + description: Current state of the TransportServer. If the resource has a valid status, it means it has been validated and accepted by the Ingress Controller. + name: State + type: string + - JSONPath: .status.reason + name: Reason + type: string + - JSONPath: .metadata.creationTimestamp + name: Age + type: date + group: k8s.nginx.org + names: + kind: TransportServer + listKind: TransportServerList + plural: transportservers + shortNames: + - ts + singular: transportserver + preserveUnknownFields: false + scope: Namespaced + subresources: + status: {} + validation: + openAPIV3Schema: + description: TransportServer defines the TransportServer resource. + type: object + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: TransportServerSpec is the spec of the TransportServer resource. + type: object + properties: + action: + description: Action defines an action. + type: object + properties: + pass: + type: string + host: + type: string + ingressClassName: + type: string + listener: + description: TransportServerListener defines a listener for a TransportServer. + type: object + properties: + name: + type: string + protocol: + type: string + serverSnippets: + type: string + sessionParameters: + description: SessionParameters defines session parameters. + type: object + properties: + timeout: + type: string + upstreamParameters: + description: UpstreamParameters defines parameters for an upstream. + type: object + properties: + connectTimeout: + type: string + nextUpstream: + type: boolean + nextUpstreamTimeout: + type: string + nextUpstreamTries: + type: integer + udpRequests: + type: integer + udpResponses: + type: integer + upstreams: + type: array + items: + description: Upstream defines an upstream. + type: object + properties: + failTimeout: + type: string + healthCheck: + description: HealthCheck defines the parameters for active Upstream HealthChecks. + type: object + properties: + enable: + type: boolean + fails: + type: integer + interval: + type: string + jitter: + type: string + passes: + type: integer + port: + type: integer + timeout: + type: string + loadBalancingMethod: + type: string + maxFails: + type: integer + name: + type: string + port: + type: integer + service: + type: string + status: + description: TransportServerStatus defines the status for the TransportServer resource. + type: object + properties: + message: + type: string + reason: + type: string + state: + type: string + version: v1alpha1 + versions: + - name: v1alpha1 + served: true + storage: true +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] diff --git a/deployments/common/crds/k8s.nginx.org_transportservers.yaml b/deployments/common/crds/k8s.nginx.org_transportservers.yaml index d1a291ff51..6cc427eb1c 100644 --- a/deployments/common/crds/k8s.nginx.org_transportservers.yaml +++ b/deployments/common/crds/k8s.nginx.org_transportservers.yaml @@ -113,6 +113,8 @@ spec: type: integer timeout: type: string + loadBalancingMethod: + type: string maxFails: type: integer name: diff --git a/deployments/helm-chart/crds/k8s.nginx.org_transportservers.yaml b/deployments/helm-chart/crds/k8s.nginx.org_transportservers.yaml index d1a291ff51..6cc427eb1c 100644 --- a/deployments/helm-chart/crds/k8s.nginx.org_transportservers.yaml +++ b/deployments/helm-chart/crds/k8s.nginx.org_transportservers.yaml @@ -113,6 +113,8 @@ spec: type: integer timeout: type: string + loadBalancingMethod: + type: string maxFails: type: integer name: diff --git a/docs-web/configuration/transportserver-resource.md b/docs-web/configuration/transportserver-resource.md index 1d1dfa971a..4f2e6b5709 100644 --- a/docs-web/configuration/transportserver-resource.md +++ b/docs-web/configuration/transportserver-resource.md @@ -169,6 +169,7 @@ service: secure-app port: 8443 maxFails: 3 failTimeout: 30s +loadBalancingMethod: least_conn ``` ```eval_rst @@ -203,6 +204,10 @@ failTimeout: 30s - The health check configuration for the Upstream. See the `health_check `_ directive. Note: this feature is supported only in NGINX Plus. - `healthcheck <#upstream-healthcheck>`_ - No + * - ``loadBalancingMethod`` + - The method used to load balance the upstream servers. By default, connections are distributed between the servers using a weighted round-robin balancing method. See the `upstream `_ section for available methods and their details. + - ``string`` + - No ``` diff --git a/internal/configs/parsing_helpers.go b/internal/configs/parsing_helpers.go index d198f080f6..68f1e9a280 100644 --- a/internal/configs/parsing_helpers.go +++ b/internal/configs/parsing_helpers.go @@ -154,12 +154,12 @@ func validateHashLBMethod(method string) (string, error) { keyWords := strings.Split(method, " ") if keyWords[0] == "hash" { - if len(keyWords) == 2 || len(keyWords) == 3 && keyWords[2] == "consistent" { + if len(keyWords) == 2 || (len(keyWords) == 3 && keyWords[2] == "consistent") { return method, nil } } - return "", fmt.Errorf("Invalid load balancing method: %q", method) + return "", fmt.Errorf("invalid load balancing method: %q", method) } // ParseBool ensures that the string value is a valid bool diff --git a/internal/configs/transportserver.go b/internal/configs/transportserver.go index b01b13b159..c60e98518b 100644 --- a/internal/configs/transportserver.go +++ b/internal/configs/transportserver.go @@ -190,7 +190,21 @@ func generateStreamUpstream(upstream *conf_v1alpha1.Upstream, upstreamNamer *ups } return version2.StreamUpstream{ - Name: name, - Servers: upsServers, + Name: name, + Servers: upsServers, + LoadBalancingMethod: generateLoadBalancingMethod(upstream.LoadBalancingMethod), } } + +func generateLoadBalancingMethod(method string) string { + if method == "" { + // By default, if unspecified, Nginx uses the 'round_robin' load balancing method. + // We override this default which suits the Ingress Controller better. + return "random two least_conn" + } + if method == "round_robin" { + // By default, Nginx uses round robin. We select this method by not specifying any method. + return "" + } + return method +} diff --git a/internal/configs/transportserver_test.go b/internal/configs/transportserver_test.go index 3ea91322f5..b1e6c5b28e 100644 --- a/internal/configs/transportserver_test.go +++ b/internal/configs/transportserver_test.go @@ -112,6 +112,7 @@ func TestGenerateTransportServerConfigForTCPSnippets(t *testing.T) { ResourceNamespace: "default", Service: "tcp-app-svc", }, + LoadBalancingMethod: "random two least_conn", }, }, Server: version2.StreamServer{ @@ -196,6 +197,7 @@ func TestGenerateTransportServerConfigForTCP(t *testing.T) { ResourceNamespace: "default", Service: "tcp-app-svc", }, + LoadBalancingMethod: "random two least_conn", }, }, Server: version2.StreamServer{ @@ -279,6 +281,7 @@ func TestGenerateTransportServerConfigForTLSPasstrhough(t *testing.T) { ResourceNamespace: "default", Service: "tcp-app-svc", }, + LoadBalancingMethod: "random two least_conn", }, }, Server: version2.StreamServer{ @@ -368,6 +371,7 @@ func TestGenerateTransportServerConfigForUDP(t *testing.T) { ResourceNamespace: "default", Service: "udp-app-svc", }, + LoadBalancingMethod: "random two least_conn", }, }, Server: version2.StreamServer{ diff --git a/internal/configs/version2/nginx-plus.transportserver.tmpl b/internal/configs/version2/nginx-plus.transportserver.tmpl index 1bed8a9d55..e62d68227c 100644 --- a/internal/configs/version2/nginx-plus.transportserver.tmpl +++ b/internal/configs/version2/nginx-plus.transportserver.tmpl @@ -2,7 +2,9 @@ upstream {{ $u.Name }} { zone {{ $u.Name }} 256k; - random two least_conn; + {{ if $u.LoadBalancingMethod }} + {{ $u.LoadBalancingMethod }}; + {{ end }} {{ range $s := $u.Servers }} server {{ $s.Address }} max_fails={{ $s.MaxFails }} fail_timeout={{ $s.FailTimeout }}; diff --git a/internal/configs/version2/nginx.transportserver.tmpl b/internal/configs/version2/nginx.transportserver.tmpl index 014f5ee2a2..6f2c42b5ce 100644 --- a/internal/configs/version2/nginx.transportserver.tmpl +++ b/internal/configs/version2/nginx.transportserver.tmpl @@ -2,7 +2,9 @@ upstream {{ $u.Name }} { zone {{ $u.Name }} 256k; - random two least_conn; + {{ if $u.LoadBalancingMethod }} + {{ $u.LoadBalancingMethod }}; + {{ end }} {{ range $s := $u.Servers }} server {{ $s.Address }} max_fails={{ $s.MaxFails }} fail_timeout={{ $s.FailTimeout }}; diff --git a/internal/configs/version2/stream.go b/internal/configs/version2/stream.go index 4ef2ceaeed..02aeb9589e 100644 --- a/internal/configs/version2/stream.go +++ b/internal/configs/version2/stream.go @@ -8,9 +8,10 @@ type TransportServerConfig struct { // StreamUpstream defines a stream upstream. type StreamUpstream struct { - Name string - Servers []StreamUpstreamServer - UpstreamLabels UpstreamLabels + Name string + Servers []StreamUpstreamServer + UpstreamLabels UpstreamLabels + LoadBalancingMethod string } // StreamUpstreamServer defines a stream upstream server. diff --git a/internal/k8s/configuration_test.go b/internal/k8s/configuration_test.go index 9927a37e45..6c53667f1a 100644 --- a/internal/k8s/configuration_test.go +++ b/internal/k8s/configuration_test.go @@ -33,7 +33,7 @@ func createTestConfiguration() *Configuration { 80: true, 443: true, }), - validation.NewTransportServerValidator(isTLSPassthroughEnabled, snippetsEnabled), + validation.NewTransportServerValidator(isTLSPassthroughEnabled, snippetsEnabled, isPlus), isTLSPassthroughEnabled, ) } diff --git a/pkg/apis/configuration/v1alpha1/types.go b/pkg/apis/configuration/v1alpha1/types.go index 72bc48fb19..e42cf364a1 100644 --- a/pkg/apis/configuration/v1alpha1/types.go +++ b/pkg/apis/configuration/v1alpha1/types.go @@ -84,12 +84,13 @@ type TransportServerListener struct { // Upstream defines an upstream. type Upstream struct { - Name string `json:"name"` - Service string `json:"service"` - Port int `json:"port"` - FailTimeout string `json:"failTimeout"` - MaxFails *int `json:"maxFails"` - HealthCheck *HealthCheck `json:"healthCheck"` + Name string `json:"name"` + Service string `json:"service"` + Port int `json:"port"` + FailTimeout string `json:"failTimeout"` + MaxFails *int `json:"maxFails"` + HealthCheck *HealthCheck `json:"healthCheck"` + LoadBalancingMethod string `json:"loadBalancingMethod"` } // HealthCheck defines the parameters for active Upstream HealthChecks. diff --git a/pkg/apis/configuration/validation/transportserver.go b/pkg/apis/configuration/validation/transportserver.go index 55ba51be07..486e0e045b 100644 --- a/pkg/apis/configuration/validation/transportserver.go +++ b/pkg/apis/configuration/validation/transportserver.go @@ -2,8 +2,9 @@ package validation import ( "fmt" + "strings" - v1alpha1 "github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1alpha1" + "github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1alpha1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/util/validation/field" @@ -13,13 +14,15 @@ import ( type TransportServerValidator struct { tlsPassthrough bool snippetsEnabled bool + isPlus bool } // NewTransportServerValidator creates a new TransportServerValidator. -func NewTransportServerValidator(tlsPassthrough bool, snippetsEnabled bool) *TransportServerValidator { +func NewTransportServerValidator(tlsPassthrough bool, snippetsEnabled bool, isPlus bool) *TransportServerValidator { return &TransportServerValidator{ tlsPassthrough: tlsPassthrough, snippetsEnabled: snippetsEnabled, + isPlus: isPlus, } } @@ -37,7 +40,7 @@ func (tsv *TransportServerValidator) validateTransportServerSpec(spec *v1alpha1. isTLSPassthroughListener := isPotentialTLSPassthroughListener(&spec.Listener) allErrs = append(allErrs, validateTransportServerHost(spec.Host, fieldPath.Child("host"), isTLSPassthroughListener)...) - upstreamErrs, upstreamNames := validateTransportServerUpstreams(spec.Upstreams, fieldPath.Child("upstreams")) + upstreamErrs, upstreamNames := validateTransportServerUpstreams(spec.Upstreams, fieldPath.Child("upstreams"), tsv.isPlus) allErrs = append(allErrs, upstreamErrs...) allErrs = append(allErrs, validateTransportServerUpstreamParameters(spec.UpstreamParameters, fieldPath.Child("upstreamParameters"), spec.Listener.Protocol)...) @@ -144,7 +147,7 @@ func validateListenerProtocol(protocol string, fieldPath *field.Path) field.Erro return allErrs } -func validateTransportServerUpstreams(upstreams []v1alpha1.Upstream, fieldPath *field.Path) (allErrs field.ErrorList, upstreamNames sets.String) { +func validateTransportServerUpstreams(upstreams []v1alpha1.Upstream, fieldPath *field.Path, isPlus bool) (allErrs field.ErrorList, upstreamNames sets.String) { allErrs = field.ErrorList{} upstreamNames = sets.String{} @@ -169,11 +172,78 @@ func validateTransportServerUpstreams(upstreams []v1alpha1.Upstream, fieldPath * } allErrs = append(allErrs, validateTSUpstreamHealthChecks(u.HealthCheck, idxPath.Child("healthChecks"))...) + + allErrs = append(allErrs, validateLoadBalancingMethod(u.LoadBalancingMethod, idxPath.Child("loadBalancingMethod"), isPlus)...) } return allErrs, upstreamNames } +func validateLoadBalancingMethod(method string, fieldPath *field.Path, isPlus bool) field.ErrorList { + allErrs := field.ErrorList{} + if method == "" { + return allErrs + } + + method = strings.TrimSpace(method) + + if strings.HasPrefix(method, "hash") { + err := validateHashLoadBalancingMethod(method) + if err != nil { + allErrs = append(allErrs, field.Invalid(fieldPath, method, err.Error())) + } + return allErrs + } + + validMethodValues := nginxStreamLoadBalanceValidInput + if isPlus { + validMethodValues = nginxPlusStreamLoadBalanceValidInput + } + + if _, exists := validMethodValues[method]; !exists { + return append(allErrs, field.Invalid(fieldPath, method, fmt.Sprintf("load balancing method is not valid: %v", method))) + } + + return allErrs +} + +var nginxStreamLoadBalanceValidInput = map[string]bool{ + "round_robin": true, + "least_conn": true, + "random": true, + "random two": true, + "random two least_conn": true, +} + +var nginxPlusStreamLoadBalanceValidInput = map[string]bool{ + "round_robin": true, + "least_conn": true, + "random": true, + "random two": true, + "random two least_conn": true, + "random least_conn": true, + "least_time connect": true, + "least_time first_byte": true, + "least_time last_byte": true, + "least_time last_byte inflight": true, +} + +func validateHashLoadBalancingMethod(method string) error { + keyWords := strings.Split(method, " ") + if len(keyWords) > 0 && keyWords[0] == "hash" { + if len(keyWords) == 2 || (len(keyWords) == 3 && keyWords[2] == "consistent") { + value := keyWords[1] + if !escapedStringsFmtRegexp.MatchString(value) { + return fmt.Errorf("invalid value for hash: %v", validation.RegexError(escapedStringsErrMsg, escapedStringsFmt)) + } + + return nil + } + } + + return fmt.Errorf("invalid load balancing method: %q", method) +} + func validateTSUpstreamHealthChecks(hc *v1alpha1.HealthCheck, fieldPath *field.Path) field.ErrorList { allErrs := field.ErrorList{} diff --git a/pkg/apis/configuration/validation/transportserver_test.go b/pkg/apis/configuration/validation/transportserver_test.go index 75be6769c8..efccf61e7c 100644 --- a/pkg/apis/configuration/validation/transportserver_test.go +++ b/pkg/apis/configuration/validation/transportserver_test.go @@ -99,7 +99,7 @@ func TestValidateTransportServerUpstreams(t *testing.T) { } for _, test := range tests { - allErrs, resultUpstreamNames := validateTransportServerUpstreams(test.upstreams, field.NewPath("upstreams")) + allErrs, resultUpstreamNames := validateTransportServerUpstreams(test.upstreams, field.NewPath("upstreams"), true) if len(allErrs) > 0 { t.Errorf("validateTransportServerUpstreams() returned errors %v for valid input for the case of %s", allErrs, test.msg) } @@ -173,7 +173,7 @@ func TestValidateTransportServerUpstreamsFails(t *testing.T) { } for _, test := range tests { - allErrs, resultUpstreamNames := validateTransportServerUpstreams(test.upstreams, field.NewPath("upstreams")) + allErrs, resultUpstreamNames := validateTransportServerUpstreams(test.upstreams, field.NewPath("upstreams"), true) if len(allErrs) == 0 { t.Errorf("validateTransportServerUpstreams() returned no errors for the case of %s", test.msg) } @@ -206,6 +206,99 @@ func TestValidateTransportServerHost(t *testing.T) { } } +func TestValidateTransportServerLoadBalancingMethod(t *testing.T) { + tests := []struct { + method string + isPlus bool + hasError bool + }{ + { + method: "", + isPlus: false, + hasError: false, + }, + { + method: "", + isPlus: true, + hasError: false, + }, + { + method: "hash", + isPlus: false, + hasError: true, + }, + { + method: "hash test", + isPlus: false, + hasError: false, + }, + { + method: "hash test toomany", + isPlus: false, + hasError: true, + }, + { + method: "hash test consistent", + isPlus: false, + hasError: false, + }, + { + method: "hash test toomany consistent", + isPlus: false, + hasError: true, + }, + { + method: "invalid", + isPlus: false, + hasError: true, + }, + { + method: "least_conn", + isPlus: false, + hasError: false, + }, + { + method: "random", + isPlus: false, + hasError: false, + }, + { + method: "random two", + isPlus: false, + hasError: false, + }, + { + method: "random two least_conn", + isPlus: false, + hasError: false, + }, + { + method: "random two least_time", + isPlus: false, + hasError: true, + }, + { + method: "random two least_time", + isPlus: true, + hasError: true, + }, { + method: "random two least_time=connect", + isPlus: true, + hasError: true, + }, + } + + for _, test := range tests { + allErrs := validateLoadBalancingMethod(test.method, field.NewPath("method"), test.isPlus) + if !test.hasError && len(allErrs) > 0 { + t.Errorf("validateLoadBalancingMethod(%q, %v) returned errors %v for valid input", test.method, test.isPlus, allErrs) + } + if test.hasError && len(allErrs) < 1 { + t.Errorf("validateLoadBalancingMethod(%q, %v) failed to return an error for invalid input", test.method, test.isPlus) + } + } +} + func TestValidateTransportServerSnippet(t *testing.T) { tests := []struct { snippet string