fix(balancer) async initialization of balancer objects
When upstreams include targets that use hostnames instead of IPs, the
initialization of balancer objects needs to perform DNS resolution, which
requires opening a socket, an operation that is not available in the
init_worker phase.

This patch performs balancer initialization in a timer, making it
asynchronous with respect to worker initialization. Because requests may
arrive while the balancers are still being initialized, it also adds a
locking system to create_balancer to avoid race conditions.
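As an illustration of the general pattern (a minimal sketch, not the Kong code itself; `init` stands in for whatever work needs to open sockets):

local function init()
  -- resolve hostnames, build balancer objects, etc.
end

-- init_worker_by_lua: cosockets are unavailable in this phase, so defer
-- the socket-dependent work to a zero-delay timer, which runs in a
-- context where sockets are allowed
local ok, err = ngx.timer.at(0, function(premature)
  if premature then
    return
  end
  init()
end)
if not ok then
  ngx.log(ngx.ERR, "failed to schedule balancer initialization: ", err)
end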

The locking system uses a simplified version of `resty.lock` that does
not use the shm: balancers are per-worker, so we are only concerned with
intra-worker concurrency control.
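In isolation, the idea looks roughly like this (a simplified sketch of the approach; the actual implementation is in kong/core/balancer.lua below, and `in_flight`, `acquire_or_wait`, and `release` are illustrative names):

local in_flight = {}  -- per-worker set of ids currently being initialized

local function acquire_or_wait(id)
  if not in_flight[id] then
    in_flight[id] = true  -- we own the critical section; caller must release it
    return "acquired"
  end
  -- another coroutine in this worker is initializing; poll with backoff
  -- (ngx.sleep yields, so other requests keep being served meanwhile)
  local timeout = 30
  local step = 0.001
  while timeout > 0 do
    ngx.sleep(step)
    timeout = timeout - step
    if not in_flight[id] then
      return "done"
    end
    if timeout <= 0 then
      break
    end
    step = math.min(math.max(0.001, step * 2), timeout, 0.5)
  end
  return nil, "timeout"
end

local function release(id)
  in_flight[id] = nil
end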
hishamhm committed Feb 14, 2018
1 parent fcdcfd5 commit 1d63abe
Showing 3 changed files with 91 additions and 13 deletions.
62 changes: 55 additions & 7 deletions kong/core/balancer.lua
@@ -8,7 +8,11 @@ local table_concat = table.concat
local crc32 = ngx.crc32_short
local toip = dns_client.toip
local log = ngx.log
local sleep = ngx.sleep
local min = math.min
local max = math.max

local CRIT = ngx.CRIT
local ERR = ngx.ERR
local WARN = ngx.WARN
local DEBUG = ngx.DEBUG
@@ -291,9 +295,49 @@ do
end
end

-- upstream ids whose balancer is currently being created in this worker
local creating = {}

-- wait (sleeping in exponentially growing steps, up to 30 seconds total)
-- until the balancer for the given upstream id has finished being created
local function wait(id)
local timeout = 30
local step = 0.001
local ratio = 2
local max_step = 0.5
while timeout > 0 do
sleep(step)
timeout = timeout - step
if not creating[id] then
return true
end
if timeout <= 0 then
break
end
step = min(max(0.001, step * ratio), timeout, max_step)
end
return nil, "timeout"
end

------------------------------------------------------------------------------
-- @param upstream (table) The upstream data
-- @param recreate (boolean, optional) create new balancer even if one exists
-- @param history (table, optional) history of target updates
-- @param start (integer, optional) from where to start reading the history
-- @return The new balancer object, or nil+error
- create_balancer = function(upstream, history, start)
+ create_balancer = function(upstream, recreate, history, start)

if balancers[upstream.id] and not recreate then
return balancers[upstream.id]
end

if creating[upstream.id] then
local ok = wait(upstream.id)
if not ok then
return nil, "timeout waiting for balancer for " .. upstream.id
end
return balancers[upstream.id]
end

creating[upstream.id] = true

local balancer, err = ring_balancer.new({
wheelSize = upstream.slots,
order = upstream.orderlist,
@@ -321,6 +365,8 @@ do
-- is fully set up.
balancers[upstream.id] = balancer

creating[upstream.id] = nil

return balancer
end
end
@@ -376,7 +422,7 @@ local function check_target_history(upstream, balancer)

stop_healthchecker(balancer)

- local new_balancer, err = create_balancer(upstream, new_history, 1)
+ local new_balancer, err = create_balancer(upstream, true, new_history, 1)
if not new_balancer then
return nil, err
end
@@ -472,7 +518,7 @@ local function get_balancer(target, no_create)
return nil, "balancer not found"
else
log(ERR, "balancer not found for ", upstream.name, ", will create it")
- return create_balancer(upstream)
+ return create_balancer(upstream), upstream
end
end

@@ -525,7 +571,7 @@ local function on_upstream_event(operation, upstream)

local _, err = create_balancer(upstream)
if err then
log(ERR, "failed creating balancer for ", upstream.name, ": ", err)
log(CRIT, "failed creating balancer for ", upstream.name, ": ", err)
end

elseif operation == "delete" or operation == "update" then
@@ -542,7 +588,7 @@ local function on_upstream_event(operation, upstream)
if operation == "delete" then
balancers[upstream.id] = nil
else
- local _, err = create_balancer(upstream)
+ local _, err = create_balancer(upstream, true)
if err then
log(ERR, "failed recreating balancer for ", upstream.name, ": ", err)
end
@@ -605,9 +651,10 @@ end


local function init()

local upstreams, err = get_all_upstreams()
if not upstreams then
log(ngx.STDERR, "failed loading initial list of upstreams: ", err)
log(CRIT, "failed loading initial list of upstreams: " .. err)
return
end

@@ -618,11 +665,12 @@ local function init()
if ok ~= nil then
oks = oks + 1
else
log(ngx.STDERR, "failed creating balancer for ", name, ": ", err)
log(CRIT, "failed creating balancer for ", name, ": ", err)
errs = errs + 1
end
end
log(DEBUG, "initialized ", oks, " balancer(s), ", errs, " error(s)")

end


13 changes: 7 additions & 6 deletions kong/core/handler.lua
@@ -85,12 +85,6 @@ return {
local cluster_events = singletons.cluster_events


- -- initialize balancers
-
-
- balancer.init()
-
-
-- events dispatcher


@@ -303,6 +297,12 @@ return {
end
end)


+ -- initialize balancers for active healthchecks
+ ngx.timer.at(0, function()
+   balancer.init()
+ end)

end
},
certificate = {
@@ -320,6 +320,7 @@ return {
},
access = {
before = function(ctx)

-- ensure router is up-to-date

local version, err = singletons.cache:get("router:version", {
29 changes: 29 additions & 0 deletions spec/01-unit/011-balancer_spec.lua
@@ -182,6 +182,35 @@ describe("Balancer", function()
assert.truthy(hc)
hc:stop()
end)

it("reuses a balancer by default", function()
local b1 = balancer._create_balancer(UPSTREAMS_FIXTURES[1])
assert.truthy(b1)
local hc1 = balancer._get_healthchecker(b1)
local b2 = balancer._create_balancer(UPSTREAMS_FIXTURES[1])
assert.equal(b1, b2)
assert(hc1:stop())
end)

it("re-creates a balancer if told to", function()
local b1 = balancer._create_balancer(UPSTREAMS_FIXTURES[1], true)
assert.truthy(b1)
local hc1 = balancer._get_healthchecker(b1)
assert(hc1:stop())
local b2 = balancer._create_balancer(UPSTREAMS_FIXTURES[1], true)
assert.truthy(b2)
local hc2 = balancer._get_healthchecker(b2)
assert(hc2:stop())
local target_history = {
{ name = "mashape.com", port = 80, order = "001:a3", weight = 10 },
{ name = "mashape.com", port = 80, order = "002:a2", weight = 10 },
{ name = "mashape.com", port = 80, order = "002:a4", weight = 10 },
{ name = "mashape.com", port = 80, order = "003:a1", weight = 10 },
}
assert.not_same(b1, b2)
assert.same(target_history, balancer._get_target_history(b1))
assert.same(target_history, balancer._get_target_history(b2))
end)
end)

describe("get_balancer()", function()
