refactor(timers) cluster and reports timers use raw shm #1783

Merged
merged 1 commit on Nov 1, 2016
3 changes: 1 addition & 2 deletions kong/api/routes/apis.lua
@@ -46,8 +46,7 @@ return {

     POST = function(self, dao_factory)
       crud.post(self.params, dao_factory.plugins, function(data)
-        data.signal = reports.api_signal
-        reports.send(data)
+        reports.send("api", data)
       end)
     end,

3 changes: 1 addition & 2 deletions kong/api/routes/plugins.lua
@@ -29,8 +29,7 @@ return {

     POST = function(self, dao_factory)
       crud.post(self.params, dao_factory.plugins, function(data)
-        data.signal = reports.api_signal
-        reports.send(data)
+        reports.send("api", data)
       end)
     end
   },
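Note on the two route changes above: the signal name ("api") now travels as the first argument of reports.send() instead of being written into the payload as data.signal. The reports module itself is not part of this diff, so the following is only a hedged sketch of the calling convention the change implies; the function body and serialization are assumptions, not Kong's actual implementation.

local reports = {}

-- hypothetical send(signal, data): the signal name is passed explicitly
-- instead of being embedded in the payload table
function reports.send(signal, data)
  local parts = {}
  for k, v in pairs(data) do
    parts[#parts + 1] = k .. "=" .. tostring(v)
  end

  ngx.log(ngx.DEBUG, "[reports] ", signal, ": ", table.concat(parts, ";"))
end

-- usage, mirroring the handlers above:
-- reports.send("api", data)

return reports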
183 changes: 114 additions & 69 deletions kong/core/cluster.lua
@@ -1,100 +1,145 @@
-local cache = require "kong.tools.database_cache"
 local singletons = require "kong.singletons"
 
-local resty_lock
-local status, res = pcall(require, "resty.lock")
-if status then
-  resty_lock = res
-end
+
+local kong_dict = ngx.shared.kong
+local timer_at = ngx.timer.at
+local ngx_log = ngx.log
+local ERR = ngx.ERR
+local DEBUG = ngx.DEBUG
+
 
 local KEEPALIVE_INTERVAL = 30
-local ASYNC_AUTOJOIN_INTERVAL = 3
-local ASYNC_AUTOJOIN_RETRIES = 20 -- Try for max a minute (3s * 20)
+local KEEPALIVE_KEY = "events:keepalive"
+local AUTOJOIN_INTERVAL = 3
+local AUTOJOIN_KEY = "events:autojoin"
+local AUTOJOIN_MAX_RETRIES = 20 -- Try for max a minute (3s * 20)
+local AUTOJOIN_MAX_RETRIES_KEY = "autojoin_retries"
+
+
+local function log(lvl, ...)
+  ngx_log(lvl, "[cluster] ", ...)
+end
+
+
+-- Hold a lock for the whole interval (exptime) to prevent multiple
+-- worker processes from sending the test request simultaneously.
+-- Other workers do not need to wait until this lock is released,
+-- and can ignore the event, knowing another worker is handling it.
+-- We substract 1ms to the exp time to prevent a race condition
+-- with the next timer event.
+local function get_lock(key, exptime)
+  local ok, err = kong_dict:safe_add(key, true, exptime - 0.001)
+  if not ok and err ~= "exists" then
+    log(ERR, "could not get lock from 'kong' shm: ", err)
+  end
+
+  return ok
+end
 
-local function create_timer(at, cb)
-  local ok, err = ngx.timer.at(at, cb)
+
+local function create_timer(...)
+  local ok, err = timer_at(...)
   if not ok then
-    ngx_log(ngx.ERR, "[cluster] failed to create timer: ", err)
+    log(ERR, "could not create timer: ", err)
   end
 end
 
-local function async_autojoin(premature)
-  if premature then return end
-
-  -- If this node is the only node in the cluster, but other nodes are present, then try to join them
-  -- This usually happens when two nodes are started very fast, and the first node didn't write his
-  -- information into the datastore yet. When the second node starts up, there is nothing to join yet.
-  local lock, err = resty_lock:new("cluster_autojoin_locks", {
-    exptime = ASYNC_AUTOJOIN_INTERVAL - 0.001
-  })
-  if not lock then
-    ngx_log(ngx.ERR, "could not create lock: ", err)
-    return
-  end
-  local elapsed = lock:lock("async_autojoin")
-  if elapsed and elapsed == 0 then
-    -- If the current member count on this node's cluster is 1, but there are more than 1 active nodes in
-    -- the DAO, then try to join them
-    local count, err = singletons.dao.nodes:count()
-    if err then
-      ngx_log(ngx.ERR, tostring(err))
-    elseif count > 1 then
-      local members, err = singletons.serf:members()
-      if err then
-        ngx_log(ngx.ERR, tostring(err))
-      elseif #members < 2 then
-        -- Trigger auto-join
-        local _, err = singletons.serf:autojoin()
-        if err then
-          ngx_log(ngx.ERR, tostring(err))
-        end
-      else
-        return -- The node is already in the cluster and no need to continue
-      end
-    end
-
-    -- Create retries counter key if it doesn't exist
-    if not cache.get(cache.autojoin_retries_key()) then
-      cache.rawset(cache.autojoin_retries_key(), 0)
-    end
-
-    local autojoin_retries = cache.incr(cache.autojoin_retries_key(), 1) -- Increment retries counter
-    if (autojoin_retries < ASYNC_AUTOJOIN_RETRIES) then
-      create_timer(ASYNC_AUTOJOIN_INTERVAL, async_autojoin)
-    else
-      return -- The node is already in the cluster and no need to continue
-    end
-  end
-end
+
+local function autojoin_handler(premature)
+  if premature then
+    return
+  end
+
+  -- increase retry count by 1
+
+  local n_retries, err = kong_dict:incr(AUTOJOIN_MAX_RETRIES_KEY, 1, 0)
+  if err then
+    log(ERR, "could not increment number of auto-join retries in 'kong' ",
+             "shm: ", err)
+    return
+  end
+
+  -- register recurring retry timer
+
+  if n_retries < AUTOJOIN_MAX_RETRIES then
+    -- all workers need to register a recurring timer, in case one of them
+    -- crashes. Hence, this must be called before the `get_lock()` call.
+    create_timer(AUTOJOIN_INTERVAL, autojoin_handler)
+  end
+
+  if not get_lock(AUTOJOIN_KEY, AUTOJOIN_INTERVAL) then
+    return
+  end
+
+  -- auto-join nodes table
+
+  log(DEBUG, "auto-joining")
+
+  -- If the current member count on this node's cluster is 1, but there are more than 1 active nodes in
+  -- the DAO, then try to join them
+  local count, err = singletons.dao.nodes:count()
+  if err then
+    log(ERR, err)
+
+  elseif count > 1 then
+    local members, err = singletons.serf:members()
+    if err then
+      log(ERR, err)
+
+    elseif #members < 2 then
+      -- Trigger auto-join
+      local _, err = singletons.serf:autojoin()
+      if err then
+        log(ERR, err)
+      end
+    end
+  end
+end
 
-local function send_keepalive(premature)
-  if premature then return end
-
-  local lock = resty_lock:new("cluster_locks", {
-    exptime = KEEPALIVE_INTERVAL - 0.001
-  })
-  local elapsed = lock:lock("keepalive")
-  if elapsed and elapsed == 0 then
-    -- Send keepalive
-    local nodes, err = singletons.dao.nodes:find_all {
-      name = singletons.serf.node_name
-    }
-    if err then
-      ngx_log(ngx.ERR, tostring(err))
-    elseif #nodes == 1 then
-      local node = nodes[1]
-      local _, err = singletons.dao.nodes:update(node, node, {ttl=singletons.configuration.cluster_ttl_on_failure})
-      if err then
-        ngx_log(ngx.ERR, tostring(err))
-      end
-    end
-
-    create_timer(KEEPALIVE_INTERVAL, send_keepalive)
-  end
-end
+
+local function keepalive_handler(premature)
+  if premature then
+    return
+  end
+
+  -- all workers need to register a recurring timer, in case one of them
+  -- crashes. Hence, this must be called before the `get_lock()` call.
+  create_timer(KEEPALIVE_INTERVAL, keepalive_handler)
+
+  if not get_lock(KEEPALIVE_KEY, KEEPALIVE_INTERVAL) then
+    return
+  end
+
+  log(DEBUG, "sending keepalive event to datastore")
+
+  local nodes, err = singletons.dao.nodes:find_all {
+    name = singletons.serf.node_name
+  }
+  if err then
+    log(ERR, "could not retrieve nodes from datastore: ", err)
+
+  elseif #nodes == 1 then
+    local node = nodes[1]
+    local _, err = singletons.dao.nodes:update(node, node, {
+      ttl = singletons.configuration.cluster_ttl_on_failure,
+      quiet = true
+    })
+    if err then
+      log(ERR, "could not update node in datastore:", err)
+    end
+  end
+end
 
 
 return {
   init_worker = function()
-    create_timer(KEEPALIVE_INTERVAL, send_keepalive)
-    create_timer(ASYNC_AUTOJOIN_INTERVAL, async_autojoin) -- Only execute one time
+    create_timer(KEEPALIVE_INTERVAL, keepalive_handler)
+    create_timer(AUTOJOIN_INTERVAL, autojoin_handler)
   end
 }
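The heart of this refactor is get_lock(): instead of resty.lock (which other workers would block on), each handler does a non-blocking safe_add() into the "kong" shared dict. Exactly one worker per interval succeeds; the others get an "exists" error and skip the work, and the key expires 1ms before the next tick to avoid racing with it. A minimal standalone sketch of the same pattern, using a placeholder dictionary name rather than anything from this diff:

-- nginx.conf (placeholder): lua_shared_dict my_locks 1m
local dict = ngx.shared.my_locks

-- true for exactly one worker per interval: safe_add() only succeeds when the
-- key is absent, and it never evicts other keys to make room
local function get_lock(key, interval)
  local ok, err = dict:safe_add(key, true, interval - 0.001)
  if not ok and err ~= "exists" then
    ngx.log(ngx.ERR, "could not get lock from shm: ", err)
  end

  return ok
end

-- only one worker per 30s window runs the guarded block
if get_lock("events:keepalive", 30) then
  -- periodic work goes here
end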
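Two smaller patterns in the new handlers are also worth noting: every worker re-arms the recurring timer before competing for the lock, so the schedule survives a worker crash, and the auto-join retry counter lives in the same shared dict via incr(key, 1, 0), whose third argument initializes the key to 0 when it does not exist yet. A condensed, self-contained sketch of that handler skeleton (names are placeholders, not taken from this diff):

local dict = ngx.shared.my_locks  -- placeholder lua_shared_dict

local INTERVAL = 3
local MAX_RETRIES = 20

local handler
handler = function(premature)
  if premature then
    return -- nginx is shutting down
  end

  -- atomic bump; the init argument (0) removes the need for a separate
  -- "create the key if missing" step
  local n, err = dict:incr("autojoin_retries", 1, 0)
  if err then
    ngx.log(ngx.ERR, "could not increment retry counter: ", err)
    return
  end

  -- re-arm the timer in every worker *before* trying to take the lock, so the
  -- schedule keeps running even if the worker holding the lock crashes
  if n < MAX_RETRIES then
    local ok, terr = ngx.timer.at(INTERVAL, handler)
    if not ok then
      ngx.log(ngx.ERR, "could not create timer: ", terr)
    end
  end

  -- non-blocking shm lock, as in the get_lock() sketch above
  if not dict:safe_add("autojoin_lock", true, INTERVAL - 0.001) then
    return
  end

  -- actual periodic work goes here
end

-- init_worker phase: each worker schedules the first run
local ok, err = ngx.timer.at(INTERVAL, handler)
if not ok then
  ngx.log(ngx.ERR, "could not create timer: ", err)
end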