Skip to content

Commit

Permalink
Add load balancing method to transport server config
Browse files Browse the repository at this point in the history
  • Loading branch information
soneillf5 committed May 19, 2021
1 parent 12a0296 commit 61c5b76
Show file tree
Hide file tree
Showing 16 changed files with 338 additions and 24 deletions.
2 changes: 1 addition & 1 deletion cmd/nginx-ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ func main() {
templateExecutorV2, *nginxPlus, isWildcardEnabled, plusCollector, *enablePrometheusMetrics, latencyCollector, *enableLatencyMetrics)
controllerNamespace := os.Getenv("POD_NAMESPACE")

transportServerValidator := cr_validation.NewTransportServerValidator(*enableTLSPassthrough, *enableSnippets)
transportServerValidator := cr_validation.NewTransportServerValidator(*enableTLSPassthrough, *enableSnippets, *nginxPlus)
virtualServerValidator := cr_validation.NewVirtualServerValidator(*nginxPlus)

lbcInput := k8s.NewLoadBalancerControllerInput{
Expand Down
2 changes: 2 additions & 0 deletions deployments/common/crds/k8s.nginx.org_transportservers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ spec:
type: integer
timeout:
type: string
loadBalancingMethod:
type: string
maxConns:
type: integer
maxFails:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ spec:
type: integer
timeout:
type: string
loadBalancingMethod:
type: string
maxConns:
type: integer
maxFails:
Expand Down
5 changes: 5 additions & 0 deletions docs-web/configuration/transportserver-resource.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ port: 8443
maxFails: 3
maxConns: 100
failTimeout: 30s
loadBalancingMethod: least_conn
```

```eval_rst
Expand Down Expand Up @@ -212,6 +213,10 @@ failTimeout: 30s
- The health check configuration for the Upstream. See the `health_check <https://nginx.org/en/docs/stream/ngx_stream_upstream_hc_module.html#health_check>`_ directive. Note: this feature is supported only in NGINX Plus.
- `healthcheck <#upstream-healthcheck>`_
- No
* - ``loadBalancingMethod``
- The method used to load balance the upstream servers. By default, connections are distributed between the servers using a weighted round-robin balancing method. See the `upstream <http://nginx.org/en/docs/stream/ngx_stream_upstream_module.html#upstream>`_ section for available methods and their details.
- ``string``
- No

```

Expand Down
4 changes: 2 additions & 2 deletions internal/configs/parsing_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,12 @@ func validateHashLBMethod(method string) (string, error) {
keyWords := strings.Split(method, " ")

if keyWords[0] == "hash" {
if len(keyWords) == 2 || len(keyWords) == 3 && keyWords[2] == "consistent" {
if len(keyWords) == 2 || (len(keyWords) == 3 && keyWords[2] == "consistent") {
return method, nil
}
}

return "", fmt.Errorf("Invalid load balancing method: %q", method)
return "", fmt.Errorf("invalid load balancing method: %q", method)
}

// ParseBool ensures that the string value is a valid bool
Expand Down
18 changes: 16 additions & 2 deletions internal/configs/transportserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,21 @@ func generateStreamUpstream(upstream *conf_v1alpha1.Upstream, upstreamNamer *ups
}

return version2.StreamUpstream{
Name: name,
Servers: upsServers,
Name: name,
Servers: upsServers,
LoadBalancingMethod: generateLoadBalancingMethod(upstream.LoadBalancingMethod),
}
}

func generateLoadBalancingMethod(method string) string {
if method == "" {
// By default, if unspecified, Nginx uses the 'round_robin' load balancing method.
// We override this default which suits the Ingress Controller better.
return "random two least_conn"
}
if method == "round_robin" {
// By default, Nginx uses round robin. We select this method by not specifying any method.
return ""
}
return method
}
5 changes: 5 additions & 0 deletions internal/configs/transportserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func TestGenerateTransportServerConfigForTCPSnippets(t *testing.T) {
ResourceNamespace: "default",
Service: "tcp-app-svc",
},
LoadBalancingMethod: "random two least_conn",
},
},
Server: version2.StreamServer{
Expand Down Expand Up @@ -198,6 +199,7 @@ func TestGenerateTransportServerConfigForTCP(t *testing.T) {
ResourceNamespace: "default",
Service: "tcp-app-svc",
},
LoadBalancingMethod: "random two least_conn",
},
},
Server: version2.StreamServer{
Expand Down Expand Up @@ -286,6 +288,7 @@ func TestGenerateTransportServerConfigForTCPMaxConnections(t *testing.T) {
ResourceNamespace: "default",
Service: "tcp-app-svc",
},
LoadBalancingMethod: "random two least_conn",
},
},
Server: version2.StreamServer{
Expand Down Expand Up @@ -370,6 +373,7 @@ func TestGenerateTransportServerConfigForTLSPasstrhough(t *testing.T) {
ResourceNamespace: "default",
Service: "tcp-app-svc",
},
LoadBalancingMethod: "random two least_conn",
},
},
Server: version2.StreamServer{
Expand Down Expand Up @@ -460,6 +464,7 @@ func TestGenerateTransportServerConfigForUDP(t *testing.T) {
ResourceNamespace: "default",
Service: "udp-app-svc",
},
LoadBalancingMethod: "random two least_conn",
},
},
Server: version2.StreamServer{
Expand Down
4 changes: 3 additions & 1 deletion internal/configs/version2/nginx-plus.transportserver.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
upstream {{ $u.Name }} {
zone {{ $u.Name }} 256k;

random two least_conn;
{{ if $u.LoadBalancingMethod }}
{{ $u.LoadBalancingMethod }};
{{ end }}

{{ range $s := $u.Servers }}
server {{ $s.Address }} max_fails={{ $s.MaxFails }} fail_timeout={{ $s.FailTimeout }} max_conns={{ $s.MaxConnections }};
Expand Down
4 changes: 3 additions & 1 deletion internal/configs/version2/nginx.transportserver.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
upstream {{ $u.Name }} {
zone {{ $u.Name }} 256k;

random two least_conn;
{{ if $u.LoadBalancingMethod }}
{{ $u.LoadBalancingMethod }};
{{ end }}

{{ range $s := $u.Servers }}
server {{ $s.Address }} max_fails={{ $s.MaxFails }} fail_timeout={{ $s.FailTimeout }} max_conns={{ $s.MaxConnections }};
Expand Down
7 changes: 4 additions & 3 deletions internal/configs/version2/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ type TransportServerConfig struct {

// StreamUpstream defines a stream upstream.
type StreamUpstream struct {
Name string
Servers []StreamUpstreamServer
UpstreamLabels UpstreamLabels
Name string
Servers []StreamUpstreamServer
UpstreamLabels UpstreamLabels
LoadBalancingMethod string
}

// StreamUpstreamServer defines a stream upstream server.
Expand Down
2 changes: 1 addition & 1 deletion internal/k8s/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func createTestConfiguration() *Configuration {
80: true,
443: true,
}),
validation.NewTransportServerValidator(isTLSPassthroughEnabled, snippetsEnabled),
validation.NewTransportServerValidator(isTLSPassthroughEnabled, snippetsEnabled, isPlus),
isTLSPassthroughEnabled,
)
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/apis/configuration/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,14 @@ type TransportServerListener struct {

// Upstream defines an upstream.
type Upstream struct {
Name string `json:"name"`
Service string `json:"service"`
Port int `json:"port"`
FailTimeout string `json:"failTimeout"`
MaxFails *int `json:"maxFails"`
MaxConns *int `json:"maxConns"`
HealthCheck *HealthCheck `json:"healthCheck"`
Name string `json:"name"`
Service string `json:"service"`
Port int `json:"port"`
FailTimeout string `json:"failTimeout"`
MaxFails *int `json:"maxFails"`
MaxConns *int `json:"maxConns"`
HealthCheck *HealthCheck `json:"healthCheck"`
LoadBalancingMethod string `json:"loadBalancingMethod"`
}

// HealthCheck defines the parameters for active Upstream HealthChecks.
Expand Down
87 changes: 83 additions & 4 deletions pkg/apis/configuration/validation/transportserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package validation

import (
"fmt"
"regexp"
"strings"

v1alpha1 "github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1alpha1"
"github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1alpha1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/validation/field"
Expand All @@ -13,13 +15,15 @@ import (
type TransportServerValidator struct {
tlsPassthrough bool
snippetsEnabled bool
isPlus bool
}

// NewTransportServerValidator creates a new TransportServerValidator.
func NewTransportServerValidator(tlsPassthrough bool, snippetsEnabled bool) *TransportServerValidator {
func NewTransportServerValidator(tlsPassthrough bool, snippetsEnabled bool, isPlus bool) *TransportServerValidator {
return &TransportServerValidator{
tlsPassthrough: tlsPassthrough,
snippetsEnabled: snippetsEnabled,
isPlus: isPlus,
}
}

Expand All @@ -37,7 +41,7 @@ func (tsv *TransportServerValidator) validateTransportServerSpec(spec *v1alpha1.
isTLSPassthroughListener := isPotentialTLSPassthroughListener(&spec.Listener)
allErrs = append(allErrs, validateTransportServerHost(spec.Host, fieldPath.Child("host"), isTLSPassthroughListener)...)

upstreamErrs, upstreamNames := validateTransportServerUpstreams(spec.Upstreams, fieldPath.Child("upstreams"))
upstreamErrs, upstreamNames := validateTransportServerUpstreams(spec.Upstreams, fieldPath.Child("upstreams"), tsv.isPlus)
allErrs = append(allErrs, upstreamErrs...)

allErrs = append(allErrs, validateTransportServerUpstreamParameters(spec.UpstreamParameters, fieldPath.Child("upstreamParameters"), spec.Listener.Protocol)...)
Expand Down Expand Up @@ -146,7 +150,7 @@ func validateListenerProtocol(protocol string, fieldPath *field.Path) field.Erro
return allErrs
}

func validateTransportServerUpstreams(upstreams []v1alpha1.Upstream, fieldPath *field.Path) (allErrs field.ErrorList, upstreamNames sets.String) {
func validateTransportServerUpstreams(upstreams []v1alpha1.Upstream, fieldPath *field.Path, isPlus bool) (allErrs field.ErrorList, upstreamNames sets.String) {
allErrs = field.ErrorList{}
upstreamNames = sets.String{}

Expand All @@ -172,11 +176,86 @@ func validateTransportServerUpstreams(upstreams []v1alpha1.Upstream, fieldPath *
}

allErrs = append(allErrs, validateTSUpstreamHealthChecks(u.HealthCheck, idxPath.Child("healthChecks"))...)

allErrs = append(allErrs, validateLoadBalancingMethod(u.LoadBalancingMethod, idxPath.Child("loadBalancingMethod"), isPlus)...)
}

return allErrs, upstreamNames
}

func validateLoadBalancingMethod(method string, fieldPath *field.Path, isPlus bool) field.ErrorList {
allErrs := field.ErrorList{}
if method == "" {
return allErrs
}

method = strings.TrimSpace(method)

if strings.HasPrefix(method, "hash") {
return validateHashLoadBalancingMethod(method, fieldPath, isPlus)
}

validMethodValues := nginxStreamLoadBalanceValidInput
if isPlus {
validMethodValues = nginxPlusStreamLoadBalanceValidInput
}

if _, exists := validMethodValues[method]; !exists {
return append(allErrs, field.Invalid(fieldPath, method, fmt.Sprintf("load balancing method is not valid: %v", method)))
}

return allErrs
}

var nginxStreamLoadBalanceValidInput = map[string]bool{
"round_robin": true,
"least_conn": true,
"random": true,
"random two": true,
"random two least_conn": true,
}

var nginxPlusStreamLoadBalanceValidInput = map[string]bool{
"round_robin": true,
"least_conn": true,
"random": true,
"random two": true,
"random two least_conn": true,
"random least_conn": true,
"least_time connect": true,
"least_time first_byte": true,
"least_time last_byte": true,
"least_time last_byte inflight": true,
}

var loadBalancingVariables = map[string]bool{
"remote_addr": true,
}

var hashMethodRegexp = regexp.MustCompile(`^hash (\S+)(?: consistent)?$`)

func validateHashLoadBalancingMethod(method string, fieldPath *field.Path, isPlus bool) field.ErrorList {
allErrs := field.ErrorList{}
matches := hashMethodRegexp.FindStringSubmatch(method)
if len(matches) != 2 {
msg := fmt.Sprintf("invalid value for load balancing method: %v", method)
return append(allErrs, field.Invalid(fieldPath, method, msg))
}

hashKey := matches[1]
if strings.Contains(hashKey, "$") {
varErrs := validateStringWithVariables(hashKey, fieldPath, []string{}, loadBalancingVariables, isPlus)
allErrs = append(allErrs, varErrs...)
}

if !escapedStringsFmtRegexp.MatchString(method) {
msg := fmt.Sprintf("invalid value for hash: %v", validation.RegexError(escapedStringsErrMsg, escapedStringsFmt))
return append(allErrs, field.Invalid(fieldPath, method, msg))
}

return allErrs
}

func validateTSUpstreamHealthChecks(hc *v1alpha1.HealthCheck, fieldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}

Expand Down
Loading

0 comments on commit 61c5b76

Please sign in to comment.