diff --git a/lib/cassandra/init.lua b/lib/cassandra/init.lua index ac4c60b..cff3fe1 100644 --- a/lib/cassandra/init.lua +++ b/lib/cassandra/init.lua @@ -383,6 +383,7 @@ end _Host.get_request_opts = get_opts local function page_iterator(self, query, args, opts) + opts = opts or {} local page = 0 return function(_, p_rows) local meta = p_rows.meta diff --git a/lib/resty/cassandra/cluster.lua b/lib/resty/cassandra/cluster.lua index 10d6eeb..e9fb5ea 100644 --- a/lib/resty/cassandra/cluster.lua +++ b/lib/resty/cassandra/cluster.lua @@ -24,17 +24,23 @@ local log = ngx.log local WARN = ngx.WARN local DEBUG = ngx.DEBUG local NOTICE = ngx.NOTICE +local C = ffi.C local _log_prefix = '[lua-cassandra] ' local _rec_key = 'host:rec:' local _prepared_key = 'prepared:id:' ffi.cdef [[ + size_t strlen(const char *str); + struct peer_rec { uint64_t reconn_delay; uint64_t unhealthy_at; + char *data_center; + char *release_version; }; ]] +local str_const = ffi.typeof('char *') local rec_peer_const = ffi.typeof('const struct peer_rec*') local rec_peer_size = ffi.sizeof('struct peer_rec') local rec_peer_cdata = ffi.new('struct peer_rec') @@ -43,33 +49,42 @@ local function get_now() return now() * 1000 end ------------------------------ --- Hosts health stored in shm ------------------------------ +----------------------------------------- +-- Hosts status+health+info stored in shm +----------------------------------------- + +local function set_peer(self, host, up, reconn_delay, unhealthy_at, + data_center, release_version) + data_center = data_center or '' + release_version = release_version or '' -local function set_peer(self, host, up, reconn_delay, unhealthy_at) -- status local ok, err = self.shm:set(host, up) if not ok then - return nil, 'could not set host status in shm: '..err + return nil, 'could not set host details in shm: '..err end - -- health details + -- host health and info rec_peer_cdata.reconn_delay = reconn_delay rec_peer_cdata.unhealthy_at = unhealthy_at + rec_peer_cdata.data_center = ffi_cast(str_const, data_center) + rec_peer_cdata.release_version = ffi_cast(str_const, release_version) + ok, err = self.shm:set(_rec_key..host, ffi_str(rec_peer_cdata, rec_peer_size)) if not ok then - return nil, 'could not set host info in shm: '..err + return nil, 'could not set host details in shm: '..err end return true end local function get_peer(self, host, status) - local v, err = self.shm:get(_rec_key .. host) + local rec_v, err = self.shm:get(_rec_key .. host) if err then return nil, 'could not get host details in shm: '..err - elseif type(v) ~= 'string' or #v ~= rec_peer_size then + elseif not rec_v then + return nil, 'no host details for '..host + elseif type(rec_v) ~= 'string' or #rec_v ~= rec_peer_size then return nil, 'corrupted shm' end @@ -78,13 +93,15 @@ local function get_peer(self, host, status) if err then return nil, 'could not get host status in shm: '..err end end - local peer_rec = ffi_cast(rec_peer_const, v) + local peer = ffi_cast(rec_peer_const, rec_v) return { up = status, host = host, - reconn_delay = tonumber(peer_rec.reconn_delay), - unhealthy_at = tonumber(peer_rec.unhealthy_at) + data_center = ffi_str(peer.data_center, C.strlen(peer.data_center)), + release_version = ffi_str(peer.release_version, C.strlen(peer.release_version)), + reconn_delay = tonumber(peer.reconn_delay), + unhealthy_at = tonumber(peer.unhealthy_at) } end @@ -108,14 +125,24 @@ local function get_peers(self) end local function set_peer_down(self, host) - log(WARN, _log_prefix, 'setting host at ', host.coordinator, ' DOWN') - return set_peer(self, host, false, self.reconn_policy:next_delay(host), get_now()) + log(WARN, _log_prefix, 'setting host at ', host, ' DOWN') + + local peer = get_peer(self, host, false) + peer = peer or {} + + return set_peer(self, host, false, self.reconn_policy:next_delay(host), get_now(), + peer.data_center, peer.release_version) end local function set_peer_up(self, host) - log(NOTICE, _log_prefix, 'setting host at ', host.coordinator, ' UP') + log(NOTICE, _log_prefix, 'setting host at ', host, ' UP') self.reconn_policy:reset(host) - return set_peer(self, host, true, 0, 0) + + local peer = get_peer(self, host, true) + peer = peer or {} + + return set_peer(self, host, true, 0, 0, + peer.data_center, peer.release_version) end local function can_try_peer(self, host) @@ -387,8 +414,8 @@ function _Cluster:refresh() for i = 1, #old_peers do local host = old_peers[i].host old_peers[host] = old_peers[i] -- alias as a hash - self.shm:delete(_rec_key .. host) - self.shm:delete(host) + self.shm:delete(_rec_key .. host) -- details + self.shm:delete(host) -- status bool end else old_peers = {} -- empty shm @@ -406,14 +433,23 @@ function _Cluster:refresh() local coordinator, err = first_coordinator(self) if not coordinator then return nil, err end + local local_rows, err = coordinator:execute [[ + SELECT data_center,rpc_address,release_version FROM system.local + ]] + if not local_rows then return nil, err end + local rows, err = coordinator:execute [[ - SELECT peer,data_center,rpc_address FROM system.peers + SELECT peer,data_center,rpc_address,release_version FROM system.peers ]] if not rows then return nil, err end coordinator:setkeepalive() - rows[#rows+1] = {rpc_address = coordinator.host} -- local host + rows[#rows+1] = { -- local host + rpc_address = local_rows[1].rpc_address, + data_center = local_rows[1].data_center, + release_version = local_rows[1].release_version + } for i = 1, #rows do local host = rows[i].rpc_address @@ -426,7 +462,8 @@ function _Cluster:refresh() unhealthy_at = old_peer.unhealthy_at end - local ok, err = set_peer(self, host, up, reconn_delay, unhealthy_at) + local ok, err = set_peer(self, host, up, reconn_delay, unhealthy_at, + rows[i].data_center, rows[i].release_version) if not ok then return nil, err end end diff --git a/t/06-cluster.t b/t/06-cluster.t index e26c4e6..12ed9e4 100644 --- a/t/06-cluster.t +++ b/t/06-cluster.t @@ -286,7 +286,52 @@ GET /t -=== TEST 9: cluster.refresh() inits cluster +=== TEST 9: cluster.refresh() sets data_center/release_version of each host +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local Cluster = require 'resty.cassandra.cluster' + local cluster, err = Cluster.new() + if not cluster then + ngx.log(ngx.ERR, 'could not spawn cluster: ', err) + return + end + + local ok, err = cluster:refresh() + if not ok then + ngx.log(ngx.ERR, 'could not refresh: ', err) + return + end + + local shm = ngx.shared.cassandra + local keys = shm:get_keys() + assert(#keys > 0) + + local peers, err = cluster:get_peers() + if not peers then + ngx.log(ngx.ERR, 'could not get shm peers: ', err) + return + end + + for i = 1, #peers do + local p = peers[i] + ngx.say(p.host, ' ', p.data_center, ' ', p.release_version) + end + } + } +--- request +GET /t +--- response_body_like +\d+\.\d+\.\d+\.\d+.*?\S+.*?\d+\.\d+\.?\d? +\d+\.\d+\.\d+\.\d+.*?\S+.*?\d+\.\d+\.?\d? +\d+\.\d+\.\d+\.\d+.*?\S+.*?\d+\.\d+\.?\d? +--- no_error_log +[error] + + + +=== TEST 10: cluster.refresh() inits cluster --- http_config eval: $::HttpConfig --- config location /t { @@ -318,7 +363,7 @@ init: true -=== TEST 10: cluster.refresh() removes old peers records and status +=== TEST 11: cluster.refresh() removes old peers details/status --- http_config eval: $::HttpConfig --- config location /t { @@ -330,8 +375,8 @@ init: true end -- insert fake peers - cluster:set_peer('127.0.0.253', true, 0, 0) - cluster:set_peer('127.0.0.254', true, 0, 0) + cluster:set_peer('127.0.0.253', true, 0, 0, 'foocenter1', '0.0') + cluster:set_peer('127.0.0.254', true, 0, 0, 'foocenter1', '0.0') local ok, err = cluster:refresh() if not ok then @@ -352,6 +397,11 @@ init: true ngx.say('status: ', cluster.shm:get('127.0.0.253')) ngx.say('status: ', cluster.shm:get('127.0.0.254')) + + local _, err = cluster:get_peer('127.0.0.253') + ngx.say('info: ', err) + local _, err = cluster:get_peer('127.0.0.254') + ngx.say('info: ', err) } } --- request @@ -362,6 +412,8 @@ GET /t 127.0.0.1 true status: nil status: nil +info: no host details for 127.0.0.253 +info: no host details for 127.0.0.254 --- no_error_log [error] @@ -379,7 +431,7 @@ status: nil end -- insert previous peers with some infos - cluster:set_peer('127.0.0.1', false, 1000, 1461030739000) + cluster:set_peer('127.0.0.1', false, 1000, 1461030739000, '', '') local ok, err = cluster:refresh() if not ok then @@ -420,7 +472,7 @@ up: false -=== TEST 12: get_peers() corrupted shm +=== TEST 14: get_peers() corrupted shm --- http_config eval: $::HttpConfig --- config location /t { @@ -448,7 +500,7 @@ corrupted shm -=== TEST 13: set_peer_down()/set_peer_up()/can_try_peer() set shm booleans for nodes health +=== TEST 15: set_peer_down()/set_peer_up()/can_try_peer() set shm booleans for nodes status --- http_config eval: $::HttpConfig --- config location /t { @@ -510,7 +562,7 @@ GET /t -=== TEST 14: set_peer_down()/set_peer_up() use reconnection policy (update peer_rec delays) +=== TEST 16: set_peer_down()/set_peer_up() use reconnection policy (update peer_rec delays) --- http_config eval: $::HttpConfig --- config location /t { @@ -596,7 +648,7 @@ reconn_delay: true -=== TEST 15: can_try_peer() use reconnection policy to decide when node is down +=== TEST 17: can_try_peer() use reconnection policy to decide when node is down --- http_config eval: $::HttpConfig --- config location /t { @@ -635,7 +687,7 @@ reconn_delay: true ngx.say('until delay: ', ok, ' ', is_retry) -- still down but speed up reconnection delay - ok, err = cluster:set_peer('127.0.0.1', false, 1000, 1460780710809) + ok, err = cluster:set_peer('127.0.0.1', false, 1000, 1460780710809, '', '') if not ok then ngx.log(ngx.ERR, 'could not set peer_rec: ', err) return @@ -884,7 +936,7 @@ can try peer 255.255.255.253: false end -- still down, but simulate delay for retry from reconnection policy - ok, err = cluster:set_peer(peers[i].host, false, 1000, 1460780710809) + ok, err = cluster:set_peer(peers[i].host, false, 1000, 1460780710809, '', '') if not ok then ngx.log(ngx.ERR, 'could not set peer_rec: ', err) return