
introduce a balancer interface #2543

Merged (4 commits) on May 28, 2018
4 changes: 3 additions & 1 deletion .luacheckrc
@@ -1,3 +1,5 @@
 std = 'ngx_lua'
-globals = {'_'}
+globals = {
+  '_TEST'
+}
 exclude_files = {'./rootfs/etc/nginx/lua/test/**/*.lua'}
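The new `_TEST` global is what balancer.lua (further down in this diff) checks before exporting its internal helpers to the test suite. As a purely hypothetical illustration (no spec file appears in this excerpt, and it assumes the busted harness stubs the ngx/OpenResty modules), a spec would flip the flag before requiring the module:

```lua
-- hypothetical busted spec; not part of this PR
_G._TEST = true                      -- enables the `if _TEST then ... end` exports in balancer.lua
local balancer = require("balancer") -- assumes ngx.*, cjson, etc. are stubbed by the test harness

describe("balancer", function()
  it("exposes internals only when _TEST is set", function()
    assert.is_function(balancer.get_implementation)
    assert.is_function(balancer.sync_backend)
  end)
end)
```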
1 change: 0 additions & 1 deletion .travis.yml
@@ -28,7 +28,6 @@ env:
- CHANGE_MINIKUBE_NONE_USER=true
- KUBERNETES_VERSION=v1.10.0
- DOCKER=docker
- BUSTED_VERSION=2.0.rc12
- GH_REF=github.com/kubernetes/ingress-nginx
- secure: LIS2XpZufWTcJ53jiRsSZy2Gi1EUJ1XmLg7z3f2ZHeMnyG2Jhk3GW4vod1FNru+PY4PWgddLdCdIl+jqOYXndFlbdAWF3/Oy5fEkYLXdYV7tdlHcPWDkqNFrfiyZ4guChN+b2Nk6FqU7o5fsZAIR7VAbgqNRF5XMo9Mhn/vhDCQRcnbXy7uq7JTrYUkqDbQoyYvT6b480GCY5gags1zp/xZfPDNZEe936o8i5IPTyiykRyNOXN/AH6kd3pR5e1xYgcvJ9KpSVPghcwFE7kJ4fOVMRhRG5ML+IyML+xD0jX43EMNoqRKZ/HS42kIMCInFbJEcxVde7DPNBZ7Y3GAqh7HO6qrE70Dn3ha6DID6zCoH2ArW39BxG4zempjn2VxYoMRGREyZszWQb++dwGoHmo5FHt6zvIrYBG0dA0H8ja9VkZkjFwtYTGHU1ooPzUfJK4O4VBayV8LqZibyZQR+GrmyQc0aagUY7J/fe4A2PJyI4DbkeZ7GX1ELj0ciDz4urQSzUc8l/T3aU3X+FuJItjgYtMLPmqcjA5uifDCtutE8Z9L2gSpanqUdvLSOozuxPho/KNl+2YlF7fXqPW3LnRf5mHD+NbOff306pvKlHJOb2Vmth+HBQ1XDzt/Cy5+sfwS3E0Vmh6UTq/NtkUXxwH10BDMF7FMVlQ4zdHQvyZ0=
- secure: rKDoy9IYYYy0fYBs4+9mwuBVq/TcxfFwMfE0ywYWhUUdgzrUYSJAwpoe/96EQ4YmESUefwC2nDNq4G3XzJKYOWf83PaIveb9Z//zmMrCQXjDuDBDLpwV3sXSh7evXiVDohJz4ogBCeMRUCMKYsyKBM9yWfa/iu+yI92dbphpK9peOKW6yBc0uspJlln4swN3GS2WT9LVuPY2Azv9U2UqrXufOPDKG/qEb/Vrn4yZ2lR/50r2k45e9nSvDoByvr10V8ubM5Zc0iP0vBuAUVRdByv6N53Q4gaBGapY6SxhIjIPC/h0rNnuT9EXp7MWaPT5FmBxLt9wnyleT9QhZJnFyaBYqFgcz/DKifYQkryY4M5dLMo/Rt3yATyAy8Y0df1TOoV2dKdqwOOwQ8bXB1wDfyrGxmQj9HY4Ffnphx3wPE1a+Sjuh+S5Epm7XJbPx5pZJqNO2hd4sTbk0Xp3gpPbihny2r/jtNwHl0wpFCfOM68RNrsVRlIwG3UhzbZvblbQ/M/mmWCdgzINjt07I2SGCJxfKG0e98Q49SKUoDoOgQTTRDqTC9IgOEDxyfAkT0Vr6BtlP88Nsgnf6kmboyigBrRAiaDQGTxn3SP6LnQI3CeopaRDYvFZe/rTwPXE9XlKoTn9FTWnAqF3MuWaLslDcDKYEh7OaYJjF01piu6g4Nc=
164 changes: 83 additions & 81 deletions rootfs/etc/nginx/lua/balancer.lua
@@ -1,86 +1,71 @@
local ngx_balancer = require("ngx.balancer")
local json = require("cjson")
local configuration = require("configuration")
local util = require("util")
local lrucache = require("resty.lrucache")
local round_robin = require("balancer.round_robin")
local chash = require("balancer.chash")
local sticky = require("balancer.sticky")
local ewma = require("balancer.ewma")
local resty_balancer = require("balancer.resty")

-- measured in seconds
-- for an Nginx worker to pick up the new list of upstream peers
-- it will take <the delay until controller POSTed the backend object to the Nginx endpoint> + BACKENDS_SYNC_INTERVAL
local BACKENDS_SYNC_INTERVAL = 1

local DEFAULT_LB_ALG = "round_robin"
local IMPLEMENTATIONS = {
round_robin = round_robin,
chash = chash,
sticky = sticky,
ewma = ewma,
}

local _M = {}
local balancers = {}

-- TODO(elvinefendi) we can probably avoid storing all backends here. We already store them in their respective
-- load balancer implementations
local backends, backends_err = lrucache.new(1024)
if not backends then
return error("failed to create the cache for backends: " .. (backends_err or "unknown"))
end

local function get_current_backend()
local backend_name = ngx.var.proxy_upstream_name
local backend = backends:get(backend_name)
local function get_implementation(backend)
local name = backend["load-balance"] or DEFAULT_LB_ALG

if not backend then
-- TODO(elvinefendi) maybe force backend sync here?
ngx.log(ngx.WARN, "no backend configuration found for " .. tostring(backend_name))
if backend["sessionAffinityConfig"] and backend["sessionAffinityConfig"]["name"] == "cookie" then
name = "sticky"
elseif backend["upstream-hash-by"] then
name = "chash"
end

return backend
end

local function get_balancer(backend)
if not backend then
return nil
local implementation = IMPLEMENTATIONS[name]
if not implementation then
ngx.log(ngx.WARN, string.format("%s is not supported, falling back to %s", backend["load-balance"], DEFAULT_LB_ALG))
implementation = IMPLEMENTATIONS[DEFAULT_LB_ALG]
end

local lb_alg = backend["load-balance"] or DEFAULT_LB_ALG
if resty_balancer.is_applicable(backend) then
return resty_balancer
elseif lb_alg ~= "ewma" then
if lb_alg ~= DEFAULT_LB_ALG then
ngx.log(ngx.WARN,
string.format("%s is not supported, falling back to %s", backend["load-balance"], DEFAULT_LB_ALG))
end
return resty_balancer
end

return ewma
end

local function balance()
local backend = get_current_backend()
local balancer = get_balancer(backend)
if not balancer then
return nil, nil
end

local endpoint = balancer.balance(backend)
if not endpoint then
return nil, nil
end

return endpoint.address, endpoint.port
return implementation
end

local function sync_backend(backend)
backends:set(backend.name, backend)
local implementation = get_implementation(backend)
local balancer = balancers[backend.name]

local balancer = get_balancer(backend)
if not balancer then
balancers[backend.name] = implementation:new(backend)
return
end

-- every implementation is the metatable of its instances (see .new(...) functions)
-- here we check if `balancer` is the instance of `implementation`
-- if it is not then we deduce LB algorithm has changed for the backend
if getmetatable(balancer) ~= implementation then
ngx.log(ngx.INFO,
string.format("LB algorithm changed from %s to %s, resetting the instance", balancer.name, implementation.name))
balancers[backend.name] = implementation:new(backend)
return
end
balancer.sync(backend)

balancer:sync(backend)
end

local function sync_backends()
local backends_data = configuration.get_backends_data()
if not backends_data then
balancers = {}
return
end

@@ -90,28 +75,22 @@ local function sync_backends()
return
end

for _, new_backend in pairs(new_backends) do
local backend = backends:get(new_backend.name)
local backend_changed = true

if backend then
backend_changed = not util.deep_compare(backend, new_backend)
end
local balancers_to_keep = {}
for _, new_backend in ipairs(new_backends) do
sync_backend(new_backend)
balancers_to_keep[new_backend.name] = balancers[new_backend.name]
end

if backend_changed then
sync_backend(new_backend)
for backend_name, _ in pairs(balancers) do
if not balancers_to_keep[backend_name] then
balancers[backend_name] = nil
end
end
end

local function after_balance()
local backend = get_current_backend()
local balancer = get_balancer(backend)
if not balancer then
return
end

balancer.after_balance()
local function get_balancer()
local backend_name = ngx.var.proxy_upstream_name
return balancers[backend_name]
end

function _M.init_worker()
@@ -122,28 +101,51 @@ function _M.init_worker()
end
end

function _M.call()
local phase = ngx.get_phase()
if phase == "log" then
after_balance()
return
function _M.rewrite()
local balancer = get_balancer()
if not balancer then
ngx.status = ngx.HTTP_SERVICE_UNAVAILABLE
return ngx.exit(ngx.status)

Review comment (csfrancis):
I'm not super familiar with the ingress-nginx config, but I'm guessing this means that every configured upstream will be configured to use Lua load balancing? Likewise, will rewrite_by_lua be called for every configured site? Why not move this logic into balance instead?

Reply (Member Author):
@csfrancis this was in balance() before, but it turns out modifying ngx.status is not allowed in the balancer phase.

> but I'm guessing this means that every configured upstream will be configured to use Lua load balancing

That's correct, it's either all or none.
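For readers unfamiliar with the phase split discussed here: `_M.rewrite()`, `_M.balance()` and `_M.log()` each run in a different nginx phase. The ingress-nginx template itself is not part of this diff, so the following is only a rough, assumed sketch of how those entry points are typically wired with lua-nginx-module directives (dict names are taken from ewma.lua below; sizes and the rest are assumptions):

```nginx
# illustrative wiring only; the real template lives elsewhere in ingress-nginx
http {
    lua_shared_dict balancer_ewma 1M;                    # sizes assumed
    lua_shared_dict balancer_ewma_last_touched_at 1M;
    lua_shared_dict locks 1M;

    init_by_lua_block        { balancer = require("balancer") }
    init_worker_by_lua_block { balancer.init_worker() }  # starts the backend sync timer

    upstream upstream_balancer {
        server 0.0.0.1;                                  # placeholder, never used directly
        balancer_by_lua_block { balancer.balance() }     # cannot change ngx.status in this phase
    }

    server {
        location / {
            rewrite_by_lua_block { balancer.rewrite() }  # can return 503 before proxying
            proxy_pass http://upstream_balancer;
            log_by_lua_block { balancer.log() }          # feeds EWMA with upstream response times
        }
    }
}
```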

end
if phase ~= "balancer" then
return error("must be called in balancer or log, but was called in: " .. phase)
end

function _M.balance()
local balancer = get_balancer()
if not balancer then
return
Review comment (Contributor):
Do we want to log something if not balancer here?

Reply (Member Author):
@andrewlouis93 https://github.com/kubernetes/ingress-nginx/pull/2543/files#diff-b00d77a6df9c8c05a483044b08e6bc50R103 short-circuits when the balancer does not exist, so this code does not even get executed, and we get a 503 response code.

Reply (andrewloux, Contributor, May 28, 2018):
Right, I thought about that - but consider the case where a sync_backends() that results in balancers = {} occurs between a request's rewrite and balance phases?
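As a concrete, purely hypothetical way to handle the race raised above, `_M.balance()` could log before bailing out when the balancer vanished between phases. This is not what the PR does, only a sketch of the suggestion:

```lua
-- hypothetical variant of _M.balance(); not part of this PR
function _M.balance()
  local balancer = get_balancer()
  if not balancer then
    -- the backend may have been dropped by sync_backends() after the rewrite phase ran
    ngx.log(ngx.ERR, "no balancer found for upstream: ", tostring(ngx.var.proxy_upstream_name))
    return
  end
  -- ... proceed with balancer:balance() as in the diff below
end
```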

end

local host, port = balance()
if not host then
ngx.status = ngx.HTTP_SERVICE_UNAVAILABLE
return ngx.exit(ngx.status)
local host, port = balancer:balance()
if not (host and port) then
ngx.log(ngx.WARN,
string.format("host or port is missing, balancer: %s, host: %s, port: %s", balancer.name, host, port))
return
end

ngx_balancer.set_more_tries(1)

local ok, err = ngx_balancer.set_current_peer(host, port)
if not ok then
ngx.log(ngx.ERR, string.format("error while setting current upstream peer to %s", tostring(err)))
ngx.log(ngx.ERR, "error while setting current upstream peer to " .. tostring(err))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment here ... should this return a 503?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

end
end

function _M.log()
local balancer = get_balancer()
if not balancer then
return
end

if not balancer.after_balance then
return
end

balancer:after_balance()
end

if _TEST then
_M.get_implementation = get_implementation
_M.sync_backend = sync_backend
end

return _M
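A minimal, generic sketch (not code from this PR) of the metatable relationship that the comment inside sync_backend() relies on: every implementation table is used as the metatable of the instances it creates, so `getmetatable()` reveals which algorithm an instance was built with.

```lua
-- standalone illustration of the instance/metatable check used in sync_backend()
local impl_a = { name = "impl_a" }
function impl_a.new(self, backend)
  local instance = { backend = backend }
  setmetatable(instance, self)
  self.__index = self
  return instance
end

local impl_b = { name = "impl_b" }

local instance = impl_a:new({ name = "some-backend" })
assert(getmetatable(instance) == impl_a)  -- instance was created by impl_a
assert(getmetatable(instance) ~= impl_b)  -- so an algorithm change is detectable
```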
21 changes: 21 additions & 0 deletions rootfs/etc/nginx/lua/balancer/chash.lua
@@ -0,0 +1,21 @@
local balancer_resty = require("balancer.resty")
local resty_chash = require("resty.chash")
local util = require("util")

local _M = balancer_resty:new({ factory = resty_chash, name = "chash" })

function _M.new(self, backend)
local nodes = util.get_nodes(backend.endpoints)
local o = { instance = self.factory:new(nodes), hash_by = backend["upstream-hash-by"] }
setmetatable(o, self)
self.__index = self
return o
end

function _M.balance(self)
local key = util.lua_ngx_var(self.hash_by)
local endpoint_string = self.instance:find(key)
return util.split_pair(endpoint_string, ":")
end

return _M
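To make the new chash module concrete, here is an illustrative construction. The field names follow this diff, but the backend table itself is hypothetical, and calling `balance()` additionally requires the ngx request context so `util.lua_ngx_var` can resolve the hash key:

```lua
-- illustrative only; the backend shape is assumed, not taken from a real cluster
local chash = require("balancer.chash")

local backend = {
  name = "default-my-service-80",          -- hypothetical backend name
  ["upstream-hash-by"] = "$request_uri",   -- annotation value used as the hash key
  endpoints = {
    { address = "10.0.0.1", port = "8080" },
    { address = "10.0.0.2", port = "8080" },
  },
}

local instance = chash:new(backend)
-- in the balancer phase, instance:balance() resolves "$request_uri", feeds it to
-- resty.chash and returns the address and port of the consistently chosen endpoint
```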
37 changes: 27 additions & 10 deletions rootfs/etc/nginx/lua/balancer/ewma.lua
@@ -14,7 +14,7 @@ local PICK_SET_SIZE = 2

local ewma_lock = resty_lock:new("locks", {timeout = 0, exptime = 0.1})

local _M = {}
local _M = { name = "ewma" }

local function lock(upstream)
local _, err = ewma_lock:lock(upstream .. LOCK_KEY)
@@ -117,17 +117,20 @@ local function pick_and_score(peers, k)
return peers[lowest_score_index]
end

function _M.balance(backend)
local peers = backend.endpoints
if #peers == 1 then
return peers[1]
function _M.balance(self)
local peers = self.peers
local endpoint = peers[1]

if #peers > 1 then
local k = (#peers < PICK_SET_SIZE) and #peers or PICK_SET_SIZE
local peer_copy = util.deepcopy(peers)
endpoint = pick_and_score(peer_copy, k)
end
local k = (#peers < PICK_SET_SIZE) and #peers or PICK_SET_SIZE
local peer_copy = util.deepcopy(peers)
return pick_and_score(peer_copy, k)

return endpoint.address, endpoint.port
end

function _M.after_balance()
function _M.after_balance(_)
local response_time = tonumber(util.get_first_value(ngx.var.upstream_response_time)) or 0
local connect_time = tonumber(util.get_first_value(ngx.var.upstream_connect_time)) or 0
local rtt = connect_time + response_time
@@ -139,10 +142,24 @@ function _M.after_balance()
get_or_update_ewma(upstream, rtt, true)
end

function _M.sync(_)
function _M.sync(self, backend)
local changed = not util.deep_compare(self.peers, backend.endpoints)
if not changed then
return
end

self.peers = backend.endpoints

-- TODO: Reset state of EWMA per backend
ngx.shared.balancer_ewma:flush_all()
ngx.shared.balancer_ewma_last_touched_at:flush_all()
end

function _M.new(self, backend)
local o = { peers = backend.endpoints }
setmetatable(o, self)
self.__index = self
return o
end

return _M
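An illustrative sequence (not from the PR) showing when the refactored ewma instance flushes its shared state. It assumes the `balancer_ewma` and `balancer_ewma_last_touched_at` shared dicts are declared in nginx.conf as in production, and the backend table is hypothetical:

```lua
local ewma = require("balancer.ewma")

local backend = {
  name = "default-my-service-80",   -- hypothetical backend name
  endpoints = {
    { address = "10.0.0.1", port = "8080" },
    { address = "10.0.0.2", port = "8080" },
  },
}

local instance = ewma:new(backend)

-- identical endpoints: util.deep_compare reports no change, EWMA stats are kept
instance:sync(backend)

-- changed endpoints: both shared dicts are flushed for every backend
-- (hence the TODO above about resetting EWMA state per backend)
instance:sync({
  name = backend.name,
  endpoints = { { address = "10.0.0.3", port = "8080" } },
})
```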