Skip to content

Commit

Permalink
fix(cluster) some minor logging messages and missing options table
Browse files Browse the repository at this point in the history
  • Loading branch information
thibaultcha committed Sep 28, 2016
1 parent 53c5fc5 commit 40fd870
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 32 deletions.
1 change: 1 addition & 0 deletions lib/cassandra/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ end
_Host.get_request_opts = get_opts

local function page_iterator(self, query, args, opts)
opts = opts or {}
local page = 0
return function(_, p_rows)
local meta = p_rows.meta
Expand Down
79 changes: 58 additions & 21 deletions lib/resty/cassandra/cluster.lua
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,23 @@ local log = ngx.log
local WARN = ngx.WARN
local DEBUG = ngx.DEBUG
local NOTICE = ngx.NOTICE
local C = ffi.C

local _log_prefix = '[lua-cassandra] '
local _rec_key = 'host:rec:'
local _prepared_key = 'prepared:id:'

ffi.cdef [[
size_t strlen(const char *str);

struct peer_rec {
uint64_t reconn_delay;
uint64_t unhealthy_at;
char *data_center;
char *release_version;
};
]]
local str_const = ffi.typeof('char *')
local rec_peer_const = ffi.typeof('const struct peer_rec*')
local rec_peer_size = ffi.sizeof('struct peer_rec')
local rec_peer_cdata = ffi.new('struct peer_rec')
Expand All @@ -43,33 +49,42 @@ local function get_now()
return now() * 1000
end

-----------------------------
-- Hosts health stored in shm
-----------------------------
-----------------------------------------
-- Hosts status+health+info stored in shm
-----------------------------------------

-- Stores a host's status and details in the shm.
-- Writes `up` under the key `host`, then fills the module-level
-- `rec_peer_cdata` struct with reconn_delay, unhealthy_at, data_center and
-- release_version and writes its raw bytes under `_rec_key..host`.
-- data_center and release_version fall back to '' when nil/false.
-- Returns true on success, or nil plus an error string when either
-- shm:set() call fails.
-- NOTE(review): this span is a diff rendering; both variants of the
-- signature, of the section comments and of the error strings appear
-- next to each other.
local function set_peer(self, host, up, reconn_delay, unhealthy_at,
data_center, release_version)
data_center = data_center or ''
release_version = release_version or ''

local function set_peer(self, host, up, reconn_delay, unhealthy_at)
-- status
local ok, err = self.shm:set(host, up)
if not ok then
return nil, 'could not set host status in shm: '..err
return nil, 'could not set host details in shm: '..err
end

-- health details
-- host health and info
rec_peer_cdata.reconn_delay = reconn_delay
rec_peer_cdata.unhealthy_at = unhealthy_at
-- NOTE(review): these two fields receive raw `char *` addresses of the Lua
-- strings, so the bytes written to the shm below contain pointers, not the
-- string contents. The struct does not keep the strings alive, and an
-- address is presumably only meaningful in the process that wrote it --
-- confirm that readers of this record can never see a collected or foreign
-- pointer; storing the string bytes themselves would avoid the question.
rec_peer_cdata.data_center = ffi_cast(str_const, data_center)
rec_peer_cdata.release_version = ffi_cast(str_const, release_version)

-- the whole struct is serialized as a rec_peer_size-byte Lua string
ok, err = self.shm:set(_rec_key..host, ffi_str(rec_peer_cdata, rec_peer_size))
if not ok then
return nil, 'could not set host info in shm: '..err
return nil, 'could not set host details in shm: '..err
end

return true
end

local function get_peer(self, host, status)
local v, err = self.shm:get(_rec_key .. host)
local rec_v, err = self.shm:get(_rec_key .. host)
if err then
return nil, 'could not get host details in shm: '..err
elseif type(v) ~= 'string' or #v ~= rec_peer_size then
elseif not rec_v then
return nil, 'no host details for '..host
elseif type(rec_v) ~= 'string' or #rec_v ~= rec_peer_size then
return nil, 'corrupted shm'
end

Expand All @@ -78,13 +93,15 @@ local function get_peer(self, host, status)
if err then return nil, 'could not get host status in shm: '..err end
end

local peer_rec = ffi_cast(rec_peer_const, v)
local peer = ffi_cast(rec_peer_const, rec_v)

return {
up = status,
host = host,
reconn_delay = tonumber(peer_rec.reconn_delay),
unhealthy_at = tonumber(peer_rec.unhealthy_at)
data_center = ffi_str(peer.data_center, C.strlen(peer.data_center)),
release_version = ffi_str(peer.release_version, C.strlen(peer.release_version)),
reconn_delay = tonumber(peer.reconn_delay),
unhealthy_at = tonumber(peer.unhealthy_at)
}
end

Expand All @@ -108,14 +125,24 @@ local function get_peers(self)
end

-- Marks `host` as DOWN in the shm.
-- Logs at WARN level, then rewrites the host's record through set_peer()
-- with up = false, the delay returned by self.reconn_policy:next_delay(host)
-- and the current get_now() value as unhealthy_at.
-- Returns whatever set_peer() returns.
-- NOTE(review): this span is a diff rendering; both variants of the log
-- call and of the set_peer() call appear next to each other.
local function set_peer_down(self, host)
log(WARN, _log_prefix, 'setting host at ', host.coordinator, ' DOWN')
return set_peer(self, host, false, self.reconn_policy:next_delay(host), get_now())
log(WARN, _log_prefix, 'setting host at ', host, ' DOWN')

-- carry over the data_center/release_version already recorded for this
-- host; a failed lookup is ignored (its error is dropped) and both fields
-- are then passed as nil
local peer = get_peer(self, host, false)
peer = peer or {}

return set_peer(self, host, false, self.reconn_policy:next_delay(host), get_now(),
peer.data_center, peer.release_version)
end

-- Marks `host` as UP in the shm.
-- Logs at NOTICE level, resets the reconnection policy for `host`, then
-- rewrites the host's record through set_peer() with up = true and both
-- reconn_delay and unhealthy_at set to 0.
-- Returns whatever set_peer() returns.
-- NOTE(review): this span is a diff rendering; both variants of the log
-- call and of the set_peer() call appear next to each other.
local function set_peer_up(self, host)
log(NOTICE, _log_prefix, 'setting host at ', host.coordinator, ' UP')
log(NOTICE, _log_prefix, 'setting host at ', host, ' UP')
self.reconn_policy:reset(host)
return set_peer(self, host, true, 0, 0)

-- carry over the data_center/release_version already recorded for this
-- host; a failed lookup is ignored (its error is dropped) and both fields
-- are then passed as nil
local peer = get_peer(self, host, true)
peer = peer or {}

return set_peer(self, host, true, 0, 0,
peer.data_center, peer.release_version)
end

local function can_try_peer(self, host)
Expand Down Expand Up @@ -387,8 +414,8 @@ function _Cluster:refresh()
for i = 1, #old_peers do
local host = old_peers[i].host
old_peers[host] = old_peers[i] -- alias as a hash
self.shm:delete(_rec_key .. host)
self.shm:delete(host)
self.shm:delete(_rec_key .. host) -- details
self.shm:delete(host) -- status bool
end
else
old_peers = {} -- empty shm
Expand All @@ -406,14 +433,23 @@ function _Cluster:refresh()
local coordinator, err = first_coordinator(self)
if not coordinator then return nil, err end

local local_rows, err = coordinator:execute [[
SELECT data_center,rpc_address,release_version FROM system.local
]]
if not local_rows then return nil, err end

local rows, err = coordinator:execute [[
SELECT peer,data_center,rpc_address FROM system.peers
SELECT peer,data_center,rpc_address,release_version FROM system.peers
]]
if not rows then return nil, err end

coordinator:setkeepalive()

rows[#rows+1] = {rpc_address = coordinator.host} -- local host
rows[#rows+1] = { -- local host
rpc_address = local_rows[1].rpc_address,
data_center = local_rows[1].data_center,
release_version = local_rows[1].release_version
}

for i = 1, #rows do
local host = rows[i].rpc_address
Expand All @@ -426,7 +462,8 @@ function _Cluster:refresh()
unhealthy_at = old_peer.unhealthy_at
end

local ok, err = set_peer(self, host, up, reconn_delay, unhealthy_at)
local ok, err = set_peer(self, host, up, reconn_delay, unhealthy_at,
rows[i].data_center, rows[i].release_version)
if not ok then return nil, err end
end

Expand Down
74 changes: 63 additions & 11 deletions t/06-cluster.t
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,52 @@ GET /t



=== TEST 9: cluster.refresh() inits cluster
=== TEST 9: cluster.refresh() sets data_center/release_version of each host
--- http_config eval: $::HttpConfig
--- config
location /t {
content_by_lua_block {
local Cluster = require 'resty.cassandra.cluster'
local cluster, err = Cluster.new()
if not cluster then
ngx.log(ngx.ERR, 'could not spawn cluster: ', err)
return
end

local ok, err = cluster:refresh()
if not ok then
ngx.log(ngx.ERR, 'could not refresh: ', err)
return
end

local shm = ngx.shared.cassandra
local keys = shm:get_keys()
assert(#keys > 0)

local peers, err = cluster:get_peers()
if not peers then
ngx.log(ngx.ERR, 'could not get shm peers: ', err)
return
end

for i = 1, #peers do
local p = peers[i]
ngx.say(p.host, ' ', p.data_center, ' ', p.release_version)
end
}
}
--- request
GET /t
--- response_body_like
\d+\.\d+\.\d+\.\d+.*?\S+.*?\d+\.\d+\.?\d?
\d+\.\d+\.\d+\.\d+.*?\S+.*?\d+\.\d+\.?\d?
\d+\.\d+\.\d+\.\d+.*?\S+.*?\d+\.\d+\.?\d?
--- no_error_log
[error]



=== TEST 10: cluster.refresh() inits cluster
--- http_config eval: $::HttpConfig
--- config
location /t {
Expand Down Expand Up @@ -318,7 +363,7 @@ init: true



=== TEST 10: cluster.refresh() removes old peers records and status
=== TEST 11: cluster.refresh() removes old peers details/status
--- http_config eval: $::HttpConfig
--- config
location /t {
Expand All @@ -330,8 +375,8 @@ init: true
end

-- insert fake peers
cluster:set_peer('127.0.0.253', true, 0, 0)
cluster:set_peer('127.0.0.254', true, 0, 0)
cluster:set_peer('127.0.0.253', true, 0, 0, 'foocenter1', '0.0')
cluster:set_peer('127.0.0.254', true, 0, 0, 'foocenter1', '0.0')

local ok, err = cluster:refresh()
if not ok then
Expand All @@ -352,6 +397,11 @@ init: true

ngx.say('status: ', cluster.shm:get('127.0.0.253'))
ngx.say('status: ', cluster.shm:get('127.0.0.254'))

local _, err = cluster:get_peer('127.0.0.253')
ngx.say('info: ', err)
local _, err = cluster:get_peer('127.0.0.254')
ngx.say('info: ', err)
}
}
--- request
Expand All @@ -362,6 +412,8 @@ GET /t
127.0.0.1 true
status: nil
status: nil
info: no host details for 127.0.0.253
info: no host details for 127.0.0.254
--- no_error_log
[error]

Expand All @@ -379,7 +431,7 @@ status: nil
end

-- insert previous peers with some info
cluster:set_peer('127.0.0.1', false, 1000, 1461030739000)
cluster:set_peer('127.0.0.1', false, 1000, 1461030739000, '', '')

local ok, err = cluster:refresh()
if not ok then
Expand Down Expand Up @@ -420,7 +472,7 @@ up: false



=== TEST 12: get_peers() corrupted shm
=== TEST 14: get_peers() corrupted shm
--- http_config eval: $::HttpConfig
--- config
location /t {
Expand Down Expand Up @@ -448,7 +500,7 @@ corrupted shm



=== TEST 13: set_peer_down()/set_peer_up()/can_try_peer() set shm booleans for nodes health
=== TEST 15: set_peer_down()/set_peer_up()/can_try_peer() set shm booleans for nodes status
--- http_config eval: $::HttpConfig
--- config
location /t {
Expand Down Expand Up @@ -510,7 +562,7 @@ GET /t



=== TEST 14: set_peer_down()/set_peer_up() use reconnection policy (update peer_rec delays)
=== TEST 16: set_peer_down()/set_peer_up() use reconnection policy (update peer_rec delays)
--- http_config eval: $::HttpConfig
--- config
location /t {
Expand Down Expand Up @@ -596,7 +648,7 @@ reconn_delay: true



=== TEST 15: can_try_peer() use reconnection policy to decide when node is down
=== TEST 17: can_try_peer() use reconnection policy to decide when node is down
--- http_config eval: $::HttpConfig
--- config
location /t {
Expand Down Expand Up @@ -635,7 +687,7 @@ reconn_delay: true
ngx.say('until delay: ', ok, ' ', is_retry)

-- still down but speed up reconnection delay
ok, err = cluster:set_peer('127.0.0.1', false, 1000, 1460780710809)
ok, err = cluster:set_peer('127.0.0.1', false, 1000, 1460780710809, '', '')
if not ok then
ngx.log(ngx.ERR, 'could not set peer_rec: ', err)
return
Expand Down Expand Up @@ -884,7 +936,7 @@ can try peer 255.255.255.253: false
end

-- still down, but simulate delay for retry from reconnection policy
ok, err = cluster:set_peer(peers[i].host, false, 1000, 1460780710809)
ok, err = cluster:set_peer(peers[i].host, false, 1000, 1460780710809, '', '')
if not ok then
ngx.log(ngx.ERR, 'could not set peer_rec: ', err)
return
Expand Down

0 comments on commit 40fd870

Please sign in to comment.