diff --git a/kong/core/balancer.lua b/kong/core/balancer.lua index a882de88681d..b58c8761582e 100644 --- a/kong/core/balancer.lua +++ b/kong/core/balancer.lua @@ -8,7 +8,11 @@ local table_concat = table.concat local crc32 = ngx.crc32_short local toip = dns_client.toip local log = ngx.log +local sleep = ngx.sleep +local min = math.min +local max = math.max +local CRIT = ngx.CRIT local ERR = ngx.ERR local WARN = ngx.WARN local DEBUG = ngx.DEBUG @@ -289,9 +293,49 @@ do end end + local creating = {} + + local function wait(id) + local timeout = 30 + local step = 0.001 + local ratio = 2 + local max_step = 0.5 + while timeout > 0 do + sleep(step) + timeout = timeout - step + if not creating[id] then + return true + end + if timeout <= 0 then + break + end + step = min(max(0.001, step * ratio), timeout, max_step) + end + return nil, "timeout" + end + ------------------------------------------------------------------------------ + -- @param upstream (table) The upstream data + -- @param recreate (boolean, optional) create new balancer even if one exists + -- @param history (table, optional) history of target updates + -- @param start (integer, optional) from where to start reading the history -- @return The new balancer object, or nil+error - create_balancer = function(upstream, history, start) + create_balancer = function(upstream, recreate, history, start) + + if balancers[upstream.id] and not recreate then + return balancers[upstream.id] + end + + if creating[upstream.id] then + local ok = wait(upstream.id) + if not ok then + return nil, "timeout waiting for balancer for " .. upstream.id + end + return balancers[upstream.id] + end + + creating[upstream.id] = true + local balancer, err = ring_balancer.new({ wheelSize = upstream.slots, order = upstream.orderlist, @@ -319,6 +363,8 @@ do -- is fully set up. balancers[upstream.id] = balancer + creating[upstream.id] = nil + return balancer end end @@ -374,7 +420,7 @@ local function check_target_history(upstream, balancer) stop_healthchecker(balancer) - local new_balancer, err = create_balancer(upstream, new_history, 1) + local new_balancer, err = create_balancer(upstream, true, new_history, 1) if not new_balancer then return nil, err end @@ -470,7 +516,7 @@ local function get_balancer(target, no_create) return nil, "balancer not found" else log(ERR, "balancer not found for ", upstream.name, ", will create it") - return create_balancer(upstream) + return create_balancer(upstream), upstream end end @@ -523,7 +569,7 @@ local function on_upstream_event(operation, upstream) local _, err = create_balancer(upstream) if err then - log(ERR, "failed creating balancer for ", upstream.name, ": ", err) + log(CRIT, "failed creating balancer for ", upstream.name, ": ", err) end elseif operation == "delete" or operation == "update" then @@ -540,7 +586,7 @@ local function on_upstream_event(operation, upstream) if operation == "delete" then balancers[upstream.id] = nil else - local _, err = create_balancer(upstream) + local _, err = create_balancer(upstream, true) if err then log(ERR, "failed recreating balancer for ", upstream.name, ": ", err) end @@ -603,9 +649,10 @@ end local function init() + local upstreams, err = get_all_upstreams() if not upstreams then - log(ngx.STDERR, "failed loading initial list of upstreams: ", err) + log(CRIT, "failed loading initial list of upstreams: " .. err) return end @@ -616,11 +663,12 @@ local function init() if ok ~= nil then oks = oks + 1 else - log(ngx.STDERR, "failed creating balancer for ", name, ": ", err) + log(CRIT, "failed creating balancer for ", name, ": ", err) errs = errs + 1 end end log(DEBUG, "initialized ", oks, " balancer(s), ", errs, " error(s)") + end diff --git a/kong/core/handler.lua b/kong/core/handler.lua index f23a52218895..b42e18ced7d2 100644 --- a/kong/core/handler.lua +++ b/kong/core/handler.lua @@ -85,12 +85,6 @@ return { local cluster_events = singletons.cluster_events - -- initialize balancers - - - balancer.init() - - -- events dispatcher @@ -303,6 +297,12 @@ return { end end) + + -- initialize balancers for active healthchecks + ngx.timer.at(0, function() + balancer.init() + end) + end }, certificate = { @@ -320,6 +320,7 @@ return { }, access = { before = function(ctx) + -- ensure router is up-to-date local version, err = singletons.cache:get("router:version", { diff --git a/spec/01-unit/011-balancer_spec.lua b/spec/01-unit/011-balancer_spec.lua index bb63b6b486da..0d5d58ca4554 100644 --- a/spec/01-unit/011-balancer_spec.lua +++ b/spec/01-unit/011-balancer_spec.lua @@ -203,6 +203,35 @@ describe("Balancer", function() assert.truthy(hc) hc:stop() end) + + it("reuses a balancer by default", function() + local b1 = balancer._create_balancer(UPSTREAMS_FIXTURES[1]) + assert.truthy(b1) + local hc1 = balancer._get_healthchecker(b1) + local b2 = balancer._create_balancer(UPSTREAMS_FIXTURES[1]) + assert.equal(b1, b2) + assert(hc1:stop()) + end) + + it("re-creates a balancer if told to", function() + local b1 = balancer._create_balancer(UPSTREAMS_FIXTURES[1], true) + assert.truthy(b1) + local hc1 = balancer._get_healthchecker(b1) + assert(hc1:stop()) + local b2 = balancer._create_balancer(UPSTREAMS_FIXTURES[1], true) + assert.truthy(b1) + local hc2 = balancer._get_healthchecker(b2) + assert(hc2:stop()) + local target_history = { + { name = "mashape.com", port = 80, order = "001:a3", weight = 10 }, + { name = "mashape.com", port = 80, order = "002:a2", weight = 10 }, + { name = "mashape.com", port = 80, order = "002:a4", weight = 10 }, + { name = "mashape.com", port = 80, order = "003:a1", weight = 10 }, + } + assert.not_same(b1, b2) + assert.same(target_history, balancer._get_target_history(b1)) + assert.same(target_history, balancer._get_target_history(b2)) + end) end) describe("get_balancer()", function()