diff --git a/lib/resty/cassandra/cluster.lua b/lib/resty/cassandra/cluster.lua index 1862fd3..9eeab81 100644 --- a/lib/resty/cassandra/cluster.lua +++ b/lib/resty/cassandra/cluster.lua @@ -19,6 +19,7 @@ local pairs = pairs local fmt = string.format local sub = string.sub local find = string.find +local gmatch = string.gmatch local now = ngx.now local type = type local log = ngx.log @@ -31,7 +32,8 @@ local empty_t = {} local _log_prefix = '[lua-cassandra] ' local _rec_key = 'host:rec:' local _prepared_key = 'prepared:id:' -local _protocol_version_key = 'protocol:version:' +local _topo_version_key = 'topo:' +local _refresh_lock_key = 'refresh:' local function get_now() return now() * 1000 @@ -42,14 +44,16 @@ end ----------------------------------------- local function set_peer(self, host, up, reconn_delay, unhealthy_at, - data_center, connect_err, release_version) + data_center, connect_err, release_version, add) data_center = data_center or '' connect_err = connect_err or '' release_version = release_version or '' + local method = add and 'safe_add' or 'safe_set' + -- host status - local ok, err = self.shm:safe_set(host, up) - if not ok then + local ok, err = self.shm[method](self.shm, host, up) + if not ok and err ~= "exists" then return nil, 'could not set host details in shm: '..err end @@ -58,16 +62,18 @@ local function set_peer(self, host, up, reconn_delay, unhealthy_at, #data_center, #connect_err, data_center, connect_err, release_version) - ok, err = self.shm:safe_set(_rec_key..host, marshalled) - if not ok then + ok, err = self.shm[method](self.shm, _rec_key..host, marshalled) + if not ok and err ~= "exists" then return nil, 'could not set host details in shm: '..err end return true end -local function add_peer(self, host, data_center) - return set_peer(self, host, true, 0, 0, data_center, nil, nil) +local function add_peer(self, host, up, reconn_delay, unhealthy_at, + data_center, connect_err, release_version) + return set_peer(self, host, up, reconn_delay, unhealthy_at, data_center, nil, + release_version, true) end local function get_peer(self, host, status) @@ -75,7 +81,8 @@ local function get_peer(self, host, status) if err then return nil, 'could not get host details in shm: '..err elseif marshalled == nil then - return nil, 'no host details for '..host + local tb = debug.traceback("") + return nil, 'no host details for '..host.."\n"..tb elseif type(marshalled) ~= 'string' then return nil, 'corrupted shm' end @@ -113,23 +120,56 @@ local function get_peer(self, host, status) } end -local function get_peers(self) - local peers = {} - local keys = self.shm:get_keys() -- 1024 keys - -- we shall have a relatively small number of keys, but in any case this - -- function is not to be called in hot paths anyways. - for i = 1, #keys do - if sub(keys[i], 1, #_rec_key) == _rec_key then - local host = sub(keys[i], #_rec_key + 1) - local peer, err = get_peer(self, host) - if not peer then return nil, err end - peers[#peers+1] = peer - end +local function set_peers(self, topo_version, peers, protocol_version) + local marshalled = {} + + for i = 1, #peers do + marshalled[i] = peers[i].host + end + + marshalled = concat(marshalled, ",") + + if protocol_version then + marshalled = protocol_version .. "|" .. marshalled + end + + local ok, err = self.shm:safe_set(_topo_version_key .. 
topo_version, + marshalled) + if not ok then return nil, 'could not set peers in shm: '..err end + + return true +end + +local function get_peers(self, topo_version) + if not topo_version then + topo_version = self.shm:get(_topo_version_key .. 'latest') + if not topo_version then return end + end + + local marshalled, err = self.shm:get(_topo_version_key .. topo_version) + if err then return nil, 'could not get peers from shm: '..err + elseif not marshalled then return end + + local peers = { + topology_version = topo_version, + } + + local sep_1 = find(marshalled, "|", 1, true) + if sep_1 then + peers.protocol_version = tonumber(sub(marshalled, 1, sep_1 - 1), 10) + marshalled = sub(marshalled, sep_1 + 1) end - if #peers > 0 then - return peers + for host in gmatch(marshalled, '([^,]+)') do + local peer, err = get_peer(self, host) + if not peer then + return nil, 'could not get peer from shm: '..err + end + + peers[#peers + 1] = peer end + + return peers end local function delete_peer(self, host) @@ -374,6 +414,7 @@ function _Cluster.new(opts) end return setmetatable({ + topo_ver = 0, shm = shared[dict_name], dict_name = dict_name, prepared_ids = {}, @@ -460,136 +501,221 @@ local function next_coordinator(self, coordinator_options) return nil, no_host_available_error(errors) end +local function compare_peers(t1, t2, tc) + for i = 1, #t1 do + local found + + for j = 1, #t2 do + if t1[i].host == t2[j].host then + found = true + break + end + end + + if not found then + table.insert(tc, t1[i].host) + end + end +end + --- Refresh the list of nodes in the cluster. -- Queries one of the specified `contact_points` to retrieve the list of -- available nodes in the cluster, and update the configured policies. +-- The query will use the timeout threshold specified in the `read_timeout` +-- option of the `new` method. +-- This method is safe be called at runtime, by multiple workers at the same +-- time, which can be useful to refresh the cluster topology when nodes are +-- added or removed from the cluster. -- This method is automatically called upon the first query made to the -- cluster (from `execute`, `batch` or `iterate`), but needs to be manually -- called if further updates are required. +-- @param[type=number] timeout Timeout threshold (in seconds) for a given +-- worker when another worker is already refreshing the topology (defaults to +-- the `lock_timeout` option of the `new` method). -- @treturn boolean `ok`: `true` if success, `nil` if failure. -- @treturn string `err`: String describing the error if failure. -function _Cluster:refresh() - local old_peers, err = get_peers(self) - if err then return nil, err - elseif old_peers then - -- we first need to flush the existing peers from the shm, - -- so that our lock can work properly. we keep old peers in - -- our local for later. - for i = 1, #old_peers do - local host = old_peers[i].host - old_peers[host] = old_peers[i] -- alias as a hash - self.shm:delete(_rec_key .. host) -- details - self.shm:delete(host) -- status bool - end - else - old_peers = {} -- empty shm +-- @treturn table `topology`: A table containing the topology changes if any. +-- This value will only be returned when the worker acquired the lock. +function _Cluster:refresh(timeout) + local ver_topo, ver_refresh = self.shm:get(_topo_version_key .. 
'latest') + if not ver_topo then + ver_topo = 0 + ver_refresh = 0 end - local lock = resty_lock:new(self.dict_name, self.lock_opts) - local elapsed, err = lock:lock('refresh') - if not elapsed then return nil, 'failed to acquire refresh lock: '..err end + local topo_changes - -- did someone else got the hosts? - local peers, err = get_peers(self) - if err then return nil, err - elseif not peers then - -- we are the first ones to get there - local coordinator, err, local_cp = first_coordinator(self) - if not coordinator then return nil, err end + if self.topo_ver == ver_topo then + -- we already have the latest known cluster topology, try to + -- acquire lock to fetch a new one - local local_rows, err = coordinator:execute [[ - SELECT data_center,rpc_address,release_version FROM system.local - ]] - if not local_rows then return nil, err end + if not timeout then + timeout = self.lock_opts.timeout + end - assert(local_rows[1] ~= nil, 'local host could not be found') + log(DEBUG, _log_prefix, 'refresh: attempting to acquire lock...', + ' (ver_refresh=', ver_refresh, ', timeout=', timeout, ')') - local rows, err = coordinator:execute [[ - SELECT peer,data_center,rpc_address,release_version FROM system.peers - ]] - if not rows then return nil, err end + local lock = resty_lock:new(self.dict_name, { + timeout = timeout, + exptime = timeout and timeout + 1 + }) + local elapsed, err = lock:lock(_refresh_lock_key .. ver_refresh) + if not elapsed then + return nil, 'failed to acquire refresh lock: '..err.. + ' (ver_refresh='..ver_refresh..')' + end - coordinator:setkeepalive() + if elapsed == 0 then + ver_refresh = ver_refresh + 1 - local local_addr = local_rows[1].rpc_address - if local_addr == "0.0.0.0" or local_addr == "::" then - if self.logging then - log(WARN, _log_prefix, 'found contact point with \'', local_addr, '\' ', - 'as rpc_address, using \'', local_cp, '\' to ', - 'contact it instead. If this is incorrect ', - 'you should avoid using \'', local_addr, '\' ', - 'in rpc_address') - end + log(DEBUG, _log_prefix, 'refresh: lock acquired, fetching topology...', + ' (ver_refresh=', ver_refresh, ')') - local_addr = local_cp - end + local coordinator, err, local_cp = first_coordinator(self) + if not coordinator then return nil, err end - rows[#rows+1] = { -- local host - rpc_address = local_addr, - data_center = local_rows[1].data_center, - release_version = local_rows[1].release_version - } - - for i = 1, #rows do - local row = rows[i] - local host = row.rpc_address - if not host then - log(ERR, _log_prefix, 'no rpc_address found for host ', row.peer, - ' in ', coordinator.host, '\'s peers system ', - 'table. ', row.peer, ' will be ignored.') - else - if host == "0.0.0.0" or host == "::" then - if self.logging then - log(WARN, _log_prefix, 'found host with \'', host, '\' as ', - 'rpc_address, using \'', row.peer, '\' ', - 'to contact it instead. 
If this is ', - 'incorrect you should avoid using \'', host, - '\' in rpc_address') + coordinator:settimeout(self.timeout_read) + + local local_rows, err = coordinator:execute [[ + SELECT data_center,rpc_address,release_version FROM system.local + ]] + if not local_rows then return nil, err end + + assert(local_rows[1] ~= nil, 'local host could not be found') + + local rows, err = coordinator:execute [[ + SELECT peer,data_center,rpc_address,release_version FROM system.peers + ]] + if not rows then return nil, err end + + coordinator:setkeepalive() + + local local_addr = local_rows[1].rpc_address + if local_addr == "0.0.0.0" or local_addr == "::" then + if self.logging then + log(WARN, _log_prefix, 'found contact point with \'', local_addr, '\' ', + 'as rpc_address, using \'', local_cp, '\' to ', + 'contact it instead. If this is incorrect ', + 'you should avoid using \'', local_addr, '\' ', + 'in rpc_address') + end + + local_addr = local_cp + end + + rows[#rows+1] = { -- local host + rpc_address = local_addr, + data_center = local_rows[1].data_center, + release_version = local_rows[1].release_version + } + + log(DEBUG, _log_prefix, 'refresh: cluster topology fetched ', + '(ver_refresh=', ver_refresh, ', n_peers=', #rows, ')') + + for i = 1, #rows do + if rows[i].rpc_address then + if rows[i].rpc_address == "0.0.0.0" or rows[i].rpc_address == "::" then + if self.logging then + log(WARN, _log_prefix, 'found host with \'', rows[i].rpc_address, '\',', + ' as rpc_address, using \'', rows[i].peer, '\'', + ' to contact it instead. If this is ', + 'incorrect you should avoid using \'', + rows[i].rpc_address, '\' in rpc_address') + end + + rows[i].host = rows[i].peer + else + rows[i].host = rows[i].rpc_address end + end + end - host = row.peer + topo_changes = { + added = {}, + removed = {}, + } + + if ver_refresh == 1 then + for i = 1, #rows do + table.insert(topo_changes.added, rows[i].host) end + else + local old_peers, err = get_peers(self, ver_topo) + if err then return nil, err + elseif not old_peers then + log(WARN, _log_prefix, 'refresh: missing peers entry when comparing ', + 'topologies (ver_refresh=', ver_refresh, ')') + else + compare_peers(rows, old_peers, topo_changes.added) + compare_peers(old_peers, rows, topo_changes.removed) + end + end + + local rebuild = #topo_changes.added > 0 or #topo_changes.removed > 0 - local old_peer = old_peers[host] - local reconn_delay, unhealthy_at = 0, 0 - local up = true - if old_peer then - up = old_peer.up - reconn_delay = old_peer.reconn_delay - unhealthy_at = old_peer.unhealthy_at + log(DEBUG, _log_prefix, 'refresh: changes detected in topology: ', + rebuild and 'yes' or 'no', ' (ver_refresh=', ver_refresh, ')') + + if rebuild then + ver_topo = ver_topo + 1 + + for i = 1, #rows do + if not rows[i].rpc_address then + log(ERR, _log_prefix, 'no rpc_address found for host ', rows[i].peer, + ' in ', coordinator.host, '\'s peers system ', + 'table. ', rows[i].peer, ' will be ignored.') + else + local ok, err = add_peer(self, rows[i].host, true, 0, 0, + rows[i].data_center, nil, + rows[i].release_version) + if not ok then return nil, err end + end end - local ok, err = set_peer(self, host, up, reconn_delay, unhealthy_at, - rows[i].data_center, nil, - rows[i].release_version) + local ok, err = set_peers(self, ver_topo, rows, + coordinator.protocol_version) if not ok then return nil, err end end + + local ok, err = self.shm:set(_topo_version_key .. 
'latest', ver_topo, 0, ver_refresh) + if not ok then return nil, 'failed to set topo and refresh versions: '..err end end - peers, err = get_peers(self) - if err then return nil, err end + local ok, err = lock:unlock() + if not ok then return nil, 'failed to unlock refresh lock: '..err end - local ok, err = self.shm:safe_set(_protocol_version_key, coordinator.protocol_version) - if not ok then return nil, 'could not set protocol_version in shm: '..err end + ver_topo = self.shm:get(_topo_version_key .. 'latest') + elseif self.topo_ver < ver_topo then + log(DEBUG, _log_prefix, 'refresh: cluster topology already fetched, ', + 'rebuilding policies (ver_topo=', ver_topo, ')') + elseif self.topo_ver > ver_topo then + log(WARN, _log_prefix, 'refresh: cluster topology version ahead,', + ' rebuilding policies (cluster.topo_ver=', self.topo_ver, ')') end - local ok, err = lock:unlock() - if not ok then return nil, 'failed to unlock refresh lock: '..err end + if ver_topo ~= self.topo_ver then + local peers, err = get_peers(self, ver_topo) + if err then return nil, err + elseif not peers then return nil, 'no peers for topology version: ' .. ver_topo end - local protocol_version, err = self.shm:get(_protocol_version_key) - if not protocol_version then return nil, 'could not get protocol_version: '..err end + -- setting protocol_version early so we don't always attempt a connection + -- with an incompatible one, triggerring more round trips + self.peers_opts.protocol_version = peers.protocol_version - -- setting protocol_version early so we don't always attempt a connection - -- with an incompatible one, triggerring more round trips - self.peers_opts.protocol_version = protocol_version + -- initiate the load balancing policy + self.lb_policy:init(peers) - -- initiate the load balancing policy - self.lb_policy:init(peers) + self.topo_ver = ver_topo - -- cluster is ready to be queried - self.init = true + log(DEBUG, _log_prefix, 'refresh: cluster topology refreshed: yes', + ' (ver_refresh=', ver_refresh, ', ver_topo=', ver_topo, ')') + else + log(DEBUG, _log_prefix, 'refresh: cluster topology refreshed: no', + ' (ver_refresh=', ver_refresh, ', ver_topo=', ver_topo, ')') + end - return true + return true, nil, topo_changes end -------------------- @@ -865,7 +991,7 @@ do -- @treturn string `err`: String describing the error if failure. -- @treturn number `cql_err`: If a server-side error occurred, the CQL error code. function _Cluster:execute(query, args, options, coordinator_options) - if not self.init then + if self.topo_ver == 0 then local ok, err = self:refresh() if not ok then return nil, 'could not refresh cluster: '..err end end @@ -920,7 +1046,7 @@ do -- @treturn string `err`: String describing the error if failure. -- @treturn number `cql_err`: If a server-side error occurred, the CQL error code. 
function _Cluster:batch(queries, options, coordinator_options) - if not self.init then + if self.topo_ver == 0 then local ok, err = self:refresh() if not ok then return nil, 'could not refresh cluster: '..err end end @@ -977,6 +1103,7 @@ _Cluster.set_peer = set_peer _Cluster.get_peer = get_peer _Cluster.add_peer = add_peer _Cluster.get_peers = get_peers +_Cluster.set_peers = set_peers _Cluster.delete_peer = delete_peer _Cluster.set_peer_up = set_peer_up _Cluster.can_try_peer = can_try_peer diff --git a/t/06-cluster.t b/t/06-cluster.t index da08140..0aa9829 100644 --- a/t/06-cluster.t +++ b/t/06-cluster.t @@ -5,7 +5,7 @@ use t::Util; our $HttpConfig = $t::Util::HttpConfig; -plan tests => repeat_each() * blocks() * 3; +plan tests => repeat_each() * (blocks() * 3 + 3); run_tests(); @@ -364,13 +364,13 @@ GET /t return end - local protocol_version, err = ngx.shared.cassandra:get('protocol:version:') - if err then - ngx.log(ngx.ERR, 'could not get protocol_version: ', err) + local peers, err = cluster:get_peers() + if not peers then + ngx.log(ngx.ERR, 'could not get shm peers: ', err) return end - ngx.say('protocol_version: ', protocol_version) + ngx.say('protocol_version: ', peers.protocol_version) } } --- request @@ -382,7 +382,7 @@ protocol_version: \d -=== TEST 11: cluster.refresh() inits cluster +=== TEST 11: cluster.refresh() inits cluster topology --- http_config eval: $::HttpConfig --- config location /t { @@ -402,19 +402,19 @@ protocol_version: \d ngx.log(ngx.ERR, 'could not refresh: ', err) end - ngx.say('init: ', cluster.init) + ngx.say('topo_ver: ', cluster.topo_ver) } } --- request GET /t --- response_body -init: true +topo_ver: 1 --- no_error_log [error] -=== TEST 12: cluster.refresh() removes old peers details/status +=== TEST 12: cluster.refresh() does not remove old peers details/status --- http_config eval: $::HttpConfig --- config location /t { @@ -450,10 +450,10 @@ init: true ngx.say('status: ', cluster.shm:get('127.0.0.253')) ngx.say('status: ', cluster.shm:get('127.0.0.254')) - local _, err = cluster:get_peer('127.0.0.253') - ngx.say('info: ', err) - local _, err = cluster:get_peer('127.0.0.254') - ngx.say('info: ', err) + local peer, err = cluster:get_peer('127.0.0.253') + ngx.say('peer.host: ', peer.host) + local peer, err = cluster:get_peer('127.0.0.254') + ngx.say('peer.host: ', peer.host) } } --- request @@ -462,10 +462,10 @@ GET /t 127.0.0.3 true 127.0.0.2 true 127.0.0.1 true -status: nil -status: nil -info: no host details for 127.0.0.253 -info: no host details for 127.0.0.254 +status: true +status: true +peer.host: 127.0.0.253 +peer.host: 127.0.0.254 --- no_error_log [error] @@ -524,7 +524,67 @@ up: false -=== TEST 14: get_peers() corrupted shm +=== TEST 14: cluster.refresh() can be called while executing queries +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local Cluster = require 'resty.cassandra.cluster' + local cluster, err = Cluster.new() + if not cluster then + ngx.log(ngx.ERR, err) + end + + local ok, err = cluster:refresh() + if not ok then + ngx.log(ngx.ERR, err) + return + end + + ngx.say('topo_ver: ', cluster.topo_ver) + + local t, err = ngx.thread.spawn(function() + while true do + local res, err = cluster:execute("SELECT * FROM system.local") + if not res then + ngx.log(ngx.ERR, err) + end + ngx.sleep(0.001) + end + end) + if not t then + ngx.log(ngx.ERR, err) + return + end + + for i = 1, 3 do + local ok, err = cluster:refresh() + if not ok then + ngx.log(ngx.ERR, err) + return + end + end + + 
ngx.say('topo_ver: ', cluster.topo_ver) + + ngx.thread.kill(t) + } + } +--- request +GET /t +--- response_body +topo_ver: 1 +topo_ver: 1 +--- error_log +changes detected in topology: no (ver_refresh=2) +changes detected in topology: no (ver_refresh=3) +changes detected in topology: no (ver_refresh=4) +--- no_error_log +[error] + + + +=== TEST 15: cluster.refresh() returns topology changes --- http_config eval: $::HttpConfig --- config location /t { @@ -535,6 +595,54 @@ up: false ngx.log(ngx.ERR, err) end + local ok, err, topology = cluster:refresh() + if not ok then + ngx.log(ngx.ERR, err) + return + end + + ngx.say("added peers: ", #topology.added) + ngx.say("removed peers: ", #topology.removed) + + local ok, err, topology = cluster:refresh() + if not ok then + ngx.log(ngx.ERR, err) + return + end + + ngx.say("added peers: ", #topology.added) + ngx.say("removed peers: ", #topology.removed) + } + } +--- request +GET /t +--- response_body +added peers: 3 +removed peers: 0 +added peers: 0 +removed peers: 0 +--- no_error_log +[error] + + + +=== TEST 16: get_peers() corrupted shm +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local Cluster = require 'resty.cassandra.cluster' + local cluster, err = Cluster.new() + if not cluster then + ngx.log(ngx.ERR, err) + end + + local ok, err = cluster:refresh() + if not ok then + ngx.log(ngx.ERR, err) + return + end + cluster.shm:set('host:rec:127.0.0.1', false) local peers, err = cluster:get_peers() @@ -552,7 +660,7 @@ corrupted shm -=== TEST 15: get_peers() returns nil if no peers +=== TEST 17: get_peers() returns nil if no peers --- http_config eval: $::HttpConfig --- config location /t { @@ -582,7 +690,7 @@ is nil: true -=== TEST 16: set_peer_down()/set_peer_up()/can_try_peer() set shm booleans for nodes status +=== TEST 18: set_peer_down()/set_peer_up()/can_try_peer() set shm booleans for nodes status --- http_config eval: $::HttpConfig --- config location /t { @@ -644,7 +752,7 @@ GET /t -=== TEST 17: set_peer_down()/set_peer_up() use existing host details if exists +=== TEST 19: set_peer_down()/set_peer_up() use existing host details if exists --- http_config eval: $::HttpConfig --- config location /t { @@ -713,7 +821,7 @@ GET /t -=== TEST 18: set_peer_down()/set_peer_up() defaults hosts details if not exists +=== TEST 20: set_peer_down()/set_peer_up() defaults hosts details if not exists --- http_config eval: $::HttpConfig --- config location /t { @@ -772,7 +880,7 @@ GET /t -=== TEST 19: set_peer_down()/set_peer_up() use reconnection policy (update peer_rec delays) +=== TEST 21: set_peer_down()/set_peer_up() use reconnection policy (update peer_rec delays) --- http_config eval: $::HttpConfig --- config location /t { @@ -858,7 +966,7 @@ reconn_delay: true -=== TEST 20: can_try_peer() use reconnection policy to decide when node is down +=== TEST 22: can_try_peer() use reconnection policy to decide when node is down --- http_config eval: $::HttpConfig --- config location /t { @@ -922,7 +1030,7 @@ after delay: true true -=== TEST 21: next_coordinator() uses load balancing policy +=== TEST 23: next_coordinator() uses load balancing policy --- http_config eval: $::HttpConfig --- config location /t { @@ -959,7 +1067,7 @@ coordinator 3: 127.0.0.1 -=== TEST 22: next_coordinator() returns no host available errors +=== TEST 24: next_coordinator() returns no host available errors --- http_config eval: $::HttpConfig --- config location /t { @@ -1005,7 +1113,7 @@ all hosts tried for query failed\. 
127\.0\.0\.\d+: host still considered down fo -=== TEST 23: next_coordinator() returns no host available errors with recorded errors +=== TEST 25: next_coordinator() returns no host available errors with recorded errors --- http_config eval: $::HttpConfig --- config location /t { @@ -1049,7 +1157,7 @@ all hosts tried for query failed\. 127\.0\.0\.\d+: host still considered down fo -=== TEST 24: next_coordinator() avoids down hosts +=== TEST 26: next_coordinator() avoids down hosts --- http_config eval: $::HttpConfig --- config location /t { @@ -1093,7 +1201,7 @@ GET /t -=== TEST 25: next_coordinator() marks nodes as down +=== TEST 27: next_coordinator() marks nodes as down --- http_config eval qq { lua_socket_log_errors off; @@ -1122,7 +1230,16 @@ qq { return end - local peers, err = cluster:get_peers() + local ok, err = cluster:set_peers(1, { + { host = "255.255.255.254" }, + { host = "255.255.255.253" }, + }) + if not ok then + ngx.log(ngx.ERR, err) + return + end + + local peers, err = cluster:get_peers(1) if not peers then ngx.log(ngx.ERR, err) return @@ -1130,7 +1247,6 @@ qq { -- init cluster as if it was refreshed cluster.lb_policy:init(peers) - cluster.init = true -- attempt to get next coordinator local coordinator, err = cluster:next_coordinator() @@ -1161,7 +1277,7 @@ can try peer 255.255.255.253: false -=== TEST 26: next_coordinator() retries down host as per reconnection policy and ups them back +=== TEST 28: next_coordinator() retries down host as per reconnection policy and ups them back --- http_config eval: $::HttpConfig --- config location /t { @@ -1236,7 +1352,7 @@ GET /t -=== TEST 27: next_coordinator() sets coordinator keyspace on connect +=== TEST 29: next_coordinator() sets coordinator keyspace on connect --- http_config eval: $::HttpConfig --- config location /t { diff --git a/t/08-execute.t b/t/08-execute.t index aad7206..6cf8f29 100644 --- a/t/08-execute.t +++ b/t/08-execute.t @@ -23,20 +23,23 @@ __DATA__ return end + ngx.say('topo_ver: ', cluster.topo_ver) + local rows, err = cluster:execute("SELECT * FROM system.peers") if not rows then ngx.log(ngx.ERR, err) return end - ngx.say('init: ', cluster.init) + ngx.say('topo_ver: ', cluster.topo_ver) ngx.say(#rows) } } --- request GET /t --- response_body -init: true +topo_ver: 0 +topo_ver: 1 2 --- no_error_log [error]
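
For reference, below is a minimal sketch (not taken from this patch) of how the new return values of `refresh()` might be consumed from a recurring timer, so that workers pick up topology changes at runtime. It assumes the default `lua_shared_dict cassandra` used by the test suite, a single local contact point, and an illustrative 30-second interval; adjust these to your deployment. Only the worker that acquires the refresh lock receives the `topology` table, per the docstring above.

    init_worker_by_lua_block {
        local Cluster = require "resty.cassandra.cluster"

        -- assumes `lua_shared_dict cassandra 1m;` is configured, as in t/
        local cluster, err = Cluster.new {
            shm = "cassandra",
            contact_points = { "127.0.0.1" },
        }
        if not cluster then
            ngx.log(ngx.ERR, "could not create cluster: ", err)
            return
        end

        local ok, err = ngx.timer.every(30, function()
            -- refresh() may be called concurrently by several workers; only
            -- the worker that acquired the lock gets the `topology` table
            local ok, err, topology = cluster:refresh()
            if not ok then
                ngx.log(ngx.ERR, "could not refresh cluster topology: ", err)

            elseif topology then
                if #topology.added > 0 then
                    ngx.log(ngx.NOTICE, "hosts added to cluster: ",
                            table.concat(topology.added, ", "))
                end

                if #topology.removed > 0 then
                    ngx.log(ngx.NOTICE, "hosts removed from cluster: ",
                            table.concat(topology.removed, ", "))
                end
            end
        end)
        if not ok then
            ngx.log(ngx.ERR, "could not create refresh timer: ", err)
        end
    }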