Skip to content

Commit

Permalink
Merge pull request #3396 from flugel-it/master
Browse files Browse the repository at this point in the history
New balancer implementation: consistent hash subset
  • Loading branch information
k8s-ci-robot authored Jan 4, 2019
2 parents fabf902 + 60b9835 commit 2c3ce07
Show file tree
Hide file tree
Showing 17 changed files with 434 additions and 17 deletions.
70 changes: 70 additions & 0 deletions docs/examples/chashsubset/deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx-deployment
labels:
app: nginxhello
spec:
replicas: 10
selector:
matchLabels:
app: nginxhello
template:
metadata:
labels:
app: nginxhello
spec:
containers:
- name: nginxhello
image: gcr.io/kubernetes-e2e-test-images/echoserver:2.2
ports:
- containerPort: 8080
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP

---
kind: Service
apiVersion: v1
metadata:
name: nginxhello
labels:
app: nginxhello
spec:
selector:
app: nginxhello
ports:
- name: http
port: 80
targetPort: 8080

---
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
annotations:
nginx.ingress.kubernetes.io/upstream-hash-by: "$arg_predictorid"
nginx.ingress.kubernetes.io/upstream-hash-by-subset: "true"
nginx.ingress.kubernetes.io/upstream-hash-by-subset-size: "3"
name: nginxhello-ingress
namespace: default
spec:
backend:
serviceName: nginxhello
servicePort: 80

6 changes: 6 additions & 0 deletions docs/user-guide/nginx-configuration/annotations.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,16 @@ nginx.ingress.kubernetes.io/auth-realm: "realm string"

NGINX supports load balancing by client-server mapping based on [consistent hashing](http://nginx.org/en/docs/http/ngx_http_upstream_module.html#hash) for a given key. The key can contain text, variables or any combination thereof. This feature allows for request stickiness other than client IP or cookies. The [ketama](http://www.last.fm/user/RJ/journal/2007/04/10/392555/) consistent hashing method will be used which ensures only a few keys would be remapped to different servers on upstream group changes.

There is a special mode of upstream hashing called subset. In this mode, upstream servers are grouped into subsets, and stickiness works by mapping keys to a subset instead of individual upstream servers. Specific server is chosen uniformly at random from the selected sticky subset. It provides a balance between stickiness and load distribution.

To enable consistent hashing for a backend:

`nginx.ingress.kubernetes.io/upstream-hash-by`: the nginx variable, text value or any combination thereof to use for consistent hashing. For example `nginx.ingress.kubernetes.io/upstream-hash-by: "$request_uri"` to consistently hash upstream requests by the current request URI.

"subset" hashing can be enabled setting `nginx.ingress.kubernetes.io/upstream-hash-by-subset`: "true". This maps requests to subset of nodes instead of a single one. `upstream-hash-by-subset-size` determines the size of each subset (default 3).

Please check the [chashsubset](../../examples/chashsubset/deployment.yaml) example.

### Custom NGINX load balancing

This is similar to [`load-balance` in ConfigMap](./configmap.md#load-balance), but configures load balancing algorithm per ingress.
Expand Down
2 changes: 1 addition & 1 deletion internal/ingress/annotations/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type Ingress struct {
SessionAffinity sessionaffinity.Config
SSLPassthrough bool
UsePortInRedirects bool
UpstreamHashBy string
UpstreamHashBy upstreamhashby.Config
LoadBalancing string
UpstreamVhost string
Whitelist ipwhitelist.SourceRange
Expand Down
2 changes: 1 addition & 1 deletion internal/ingress/annotations/annotations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func TestUpstreamHashBy(t *testing.T) {

for _, foo := range fooAnns {
ing.SetAnnotations(foo.annotations)
r := ec.Extract(ing).UpstreamHashBy
r := ec.Extract(ing).UpstreamHashBy.UpstreamHashBy
if r != foo.er {
t.Errorf("Returned %v but expected %v", r, foo.er)
}
Expand Down
21 changes: 17 additions & 4 deletions internal/ingress/annotations/upstreamhashby/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,27 @@ type upstreamhashby struct {
r resolver.Resolver
}

// NewParser creates a new CORS annotation parser
// Config contains the Consistent hash configuration to be used in the Ingress
type Config struct {
UpstreamHashBy string `json:"upstream-hash-by,omitempty"`
UpstreamHashBySubset bool `json:"upstream-hash-by-subset,omitempty"`
UpstreamHashBySubsetSize int `json:"upstream-hash-by-subset-size,omitempty"`
}

// NewParser creates a new UpstreamHashBy annotation parser
func NewParser(r resolver.Resolver) parser.IngressAnnotation {
return upstreamhashby{r}
}

// Parse parses the annotations contained in the ingress rule
// used to indicate if the location/s contains a fragment of
// configuration to be included inside the paths of the rules
func (a upstreamhashby) Parse(ing *extensions.Ingress) (interface{}, error) {
return parser.GetStringAnnotation("upstream-hash-by", ing)
upstreamHashBy, _ := parser.GetStringAnnotation("upstream-hash-by", ing)
upstreamHashBySubset, _ := parser.GetBoolAnnotation("upstream-hash-by-subset", ing)
upstreamHashbySubsetSize, _ := parser.GetIntAnnotation("upstream-hash-by-subset-size", ing)

if upstreamHashbySubsetSize == 0 {
upstreamHashbySubsetSize = 3
}

return &Config{upstreamHashBy, upstreamHashBySubset, upstreamHashbySubsetSize}, nil
}
7 changes: 6 additions & 1 deletion internal/ingress/annotations/upstreamhashby/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ func TestParse(t *testing.T) {
for _, testCase := range testCases {
ing.SetAnnotations(testCase.annotations)
result, _ := ap.Parse(ing)
if result != testCase.expected {
uc, ok := result.(*Config)
if !ok {
t.Fatalf("expected a Config type")
}

if uc.UpstreamHashBy != testCase.expected {
t.Errorf("expected %v but returned %v, annotations: %s", testCase.expected, result, testCase.annotations)
}
}
Expand Down
14 changes: 10 additions & 4 deletions internal/ingress/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,9 +661,13 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B
if upstreams[defBackend].SecureCACert.Secret == "" {
upstreams[defBackend].SecureCACert = anns.SecureUpstream.CACert
}
if upstreams[defBackend].UpstreamHashBy == "" {
upstreams[defBackend].UpstreamHashBy = anns.UpstreamHashBy

if upstreams[defBackend].UpstreamHashBy.UpstreamHashBy == "" {
upstreams[defBackend].UpstreamHashBy.UpstreamHashBy = anns.UpstreamHashBy.UpstreamHashBy
upstreams[defBackend].UpstreamHashBy.UpstreamHashBySubset = anns.UpstreamHashBy.UpstreamHashBySubset
upstreams[defBackend].UpstreamHashBy.UpstreamHashBySubsetSize = anns.UpstreamHashBy.UpstreamHashBySubsetSize
}

if upstreams[defBackend].LoadBalancing == "" {
upstreams[defBackend].LoadBalancing = anns.LoadBalancing
}
Expand Down Expand Up @@ -725,8 +729,10 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B
upstreams[name].SecureCACert = anns.SecureUpstream.CACert
}

if upstreams[name].UpstreamHashBy == "" {
upstreams[name].UpstreamHashBy = anns.UpstreamHashBy
if upstreams[name].UpstreamHashBy.UpstreamHashBy == "" {
upstreams[name].UpstreamHashBy.UpstreamHashBy = anns.UpstreamHashBy.UpstreamHashBy
upstreams[name].UpstreamHashBy.UpstreamHashBySubset = anns.UpstreamHashBy.UpstreamHashBySubset
upstreams[name].UpstreamHashBy.UpstreamHashBySubsetSize = anns.UpstreamHashBy.UpstreamHashBySubsetSize
}

if upstreams[name].LoadBalancing == "" {
Expand Down
8 changes: 8 additions & 0 deletions internal/ingress/defaults/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ type Backend struct {
// http://nginx.org/en/docs/http/ngx_http_upstream_module.html#hash
UpstreamHashBy string `json:"upstream-hash-by"`

// Consistent hashing subset flag.
// Default: false
UpstreamHashBySubset bool `json:"upstream-hash-by-subset"`

// Subset consistent hashing, subset size.
// Default 3
UpstreamHashBySubsetSize int `json:"upstream-hash-by-subset-size"`

// Let's us choose a load balancing algorithm per ingress
LoadBalancing string `json:"load-balance"`

Expand Down
9 changes: 8 additions & 1 deletion internal/ingress/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type Backend struct {
// StickySessionAffinitySession contains the StickyConfig object with stickyness configuration
SessionAffinity SessionAffinityConfig `json:"sessionAffinityConfig"`
// Consistent hashing by NGINX variable
UpstreamHashBy string `json:"upstream-hash-by,omitempty"`
UpstreamHashBy UpstreamHashByConfig `json:"upstreamHashByConfig,omitempty"`
// LB algorithm configuration per ingress
LoadBalancing string `json:"load-balance,omitempty"`
// Denotes if a backend has no server. The backend instead shares a server with another backend and acts as an
Expand Down Expand Up @@ -150,6 +150,13 @@ type CookieSessionAffinity struct {
Path string `json:"path,omitempty"`
}

// UpstreamHashByConfig described setting from the upstream-hash-by* annotations.
type UpstreamHashByConfig struct {
UpstreamHashBy string `json:"upstream-hash-by,omitempty"`
UpstreamHashBySubset bool `json:"upstream-hash-by-subset,omitempty"`
UpstreamHashBySubsetSize int `json:"upstream-hash-by-subset-size,omitempty"`
}

// Endpoint describes a kubernetes endpoint in a backend
// +k8s:deepcopy-gen=true
type Endpoint struct {
Expand Down
21 changes: 21 additions & 0 deletions internal/ingress/types_equals.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,27 @@ func (csa1 *CookieSessionAffinity) Equal(csa2 *CookieSessionAffinity) bool {
return true
}

//Equal checks the equality between UpstreamByConfig types
func (u1 *UpstreamHashByConfig) Equal(u2 *UpstreamHashByConfig) bool {
if u1 == u2 {
return true
}
if u1 == nil || u2 == nil {
return false
}
if u1.UpstreamHashBy != u2.UpstreamHashBy {
return false
}
if u1.UpstreamHashBySubset != u2.UpstreamHashBySubset {
return false
}
if u1.UpstreamHashBySubsetSize != u2.UpstreamHashBySubsetSize {
return false
}

return true
}

// Equal checks the equality against an Endpoint
func (e1 *Endpoint) Equal(e2 *Endpoint) bool {
if e1 == e2 {
Expand Down
10 changes: 8 additions & 2 deletions rootfs/etc/nginx/lua/balancer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ local dns_util = require("util.dns")
local configuration = require("configuration")
local round_robin = require("balancer.round_robin")
local chash = require("balancer.chash")
local chashsubset = require("balancer.chashsubset")
local sticky = require("balancer.sticky")
local ewma = require("balancer.ewma")

Expand All @@ -17,6 +18,7 @@ local DEFAULT_LB_ALG = "round_robin"
local IMPLEMENTATIONS = {
round_robin = round_robin,
chash = chash,
chashsubset = chashsubset,
sticky = sticky,
ewma = ewma,
}
Expand All @@ -29,8 +31,12 @@ local function get_implementation(backend)

if backend["sessionAffinityConfig"] and backend["sessionAffinityConfig"]["name"] == "cookie" then
name = "sticky"
elseif backend["upstream-hash-by"] then
name = "chash"
elseif backend["upstreamHashByConfig"] and backend["upstreamHashByConfig"]["upstream-hash-by"] then
if backend["upstreamHashByConfig"]["upstream-hash-by-subset"] then
name = "chashsubset"
else
name = "chash"
end
end

local implementation = IMPLEMENTATIONS[name]
Expand Down
2 changes: 1 addition & 1 deletion rootfs/etc/nginx/lua/balancer/chash.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ function _M.new(self, backend)
local nodes = util.get_nodes(backend.endpoints)
local o = {
instance = self.factory:new(nodes),
hash_by = backend["upstream-hash-by"],
hash_by = backend["upstreamHashByConfig"]["upstream-hash-by"],
traffic_shaping_policy = backend.trafficShapingPolicy,
alternative_backends = backend.alternativeBackends,
}
Expand Down
84 changes: 84 additions & 0 deletions rootfs/etc/nginx/lua/balancer/chashsubset.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
-- Consistent hashing to a subset of nodes. Instead of returning the same node
-- always, we return the same subset always.

local resty_chash = require("resty.chash")
local util = require("util")

local _M = { name = "chashsubset" }

local function build_subset_map(backend)
local endpoints = {}
local subset_map = {}
local subsets = {}
local subset_size = backend["upstreamHashByConfig"]["upstream-hash-by-subset-size"]

for _, endpoint in pairs(backend.endpoints) do
table.insert(endpoints, endpoint)
end

local set_count = math.ceil(#endpoints/subset_size)
local node_count = set_count * subset_size

-- if we don't have enough endpoints, we reuse endpoints in the last set to
-- keep the same number on all of them.
local j = 1
for _ = #endpoints+1, node_count do
table.insert(endpoints, endpoints[j])
j = j+1
end

local k = 1
for i = 1, set_count do
local subset = {}
local subset_id = "set" .. tostring(i)
for _ = 1, subset_size do
table.insert(subset, endpoints[k])
k = k+1
end
subsets[subset_id] = subset
subset_map[subset_id] = 1
end

return subset_map, subsets
end

function _M.new(self, backend)
local subset_map, subsets = build_subset_map(backend)

local o = {
instance = resty_chash:new(subset_map),
hash_by = backend["upstreamHashByConfig"]["upstream-hash-by"],
subsets = subsets,
current_endpoints = backend.endpoints
}
setmetatable(o, self)
self.__index = self
return o
end

function _M.balance(self)
local key = util.lua_ngx_var(self.hash_by)
local subset_id = self.instance:find(key)
local endpoints = self.subsets[subset_id]
local endpoint = endpoints[math.random(#endpoints)]
return endpoint.address .. ":" .. endpoint.port
end

function _M.sync(self, backend)
local subset_map

local changed = not util.deep_compare(self.current_endpoints, backend.endpoints)
if not changed then
return
end

self.current_endpoints = backend.endpoints

subset_map, self.subsets = build_subset_map(backend)

self.instance:reinit(subset_map)

return
end

return _M
2 changes: 1 addition & 1 deletion rootfs/etc/nginx/lua/test/balancer/chash_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ describe("Balancer chash", function()
end

local backend = {
name = "my-dummy-backend", ["upstream-hash-by"] = "$request_uri",
name = "my-dummy-backend", upstreamHashByConfig = { ["upstream-hash-by"] = "$request_uri" },
endpoints = { { address = "10.184.7.40", port = "8080", maxFails = 0, failTimeout = 0 } }
}
local instance = balancer_chash:new(backend)
Expand Down
Loading

0 comments on commit 2c3ce07

Please sign in to comment.