From 0c4a421b054b9dfa6a2cd2390df8f34b2302eb4f Mon Sep 17 00:00:00 2001 From: Thibault Charbonnier Date: Thu, 27 Oct 2016 11:53:57 -0700 Subject: [PATCH] fix(cluster) fallback on listen_address when rpc_address is "bind all" --- lib/resty/cassandra/cluster.lua | 47 ++++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/lib/resty/cassandra/cluster.lua b/lib/resty/cassandra/cluster.lua index b592e59..923d865 100644 --- a/lib/resty/cassandra/cluster.lua +++ b/lib/resty/cassandra/cluster.lua @@ -16,11 +16,13 @@ local requests = cql.requests local tonumber = tonumber local concat = table.concat local shared = ngx.shared +local assert = assert local pairs = pairs local sub = string.sub local now = ngx.now local type = type local log = ngx.log +local ERR = ngx.ERR local WARN = ngx.WARN local DEBUG = ngx.DEBUG local NOTICE = ngx.NOTICE @@ -31,6 +33,7 @@ local _log_prefix = '[lua-cassandra] ' local _rec_key = 'host:rec:' local _prepared_key = 'prepared:id:' local _protocol_version_key = 'protocol:version:' +local _bind_all_address = '0.0.0.0' ffi.cdef [[ size_t strlen(const char *str); @@ -475,6 +478,8 @@ function _Cluster:refresh() ]] if not local_rows then return nil, err end + assert(local_rows[1] ~= nil, 'local host could not be found') + local rows, err = coordinator:execute [[ SELECT peer,data_center,rpc_address,release_version FROM system.peers ]] @@ -483,25 +488,41 @@ function _Cluster:refresh() coordinator:setkeepalive() rows[#rows+1] = { -- local host - rpc_address = local_rows[1].rpc_address, + rpc_address = coordinator.host, data_center = local_rows[1].data_center, release_version = local_rows[1].release_version } for i = 1, #rows do - local host = rows[i].rpc_address - local old_peer = old_peers[host] - local reconn_delay, unhealthy_at = 0, 0 - local up = true - if old_peer then - up = old_peer.up - reconn_delay = old_peer.reconn_delay - unhealthy_at = old_peer.unhealthy_at - end + local row = rows[i] + local host = row.rpc_address + if not host then + log(ERR, _log_prefix, 'no rpc_address found for host ', row.peer, + ' in ', coordinator.host, '\'s peers system ', + 'table. ', row.peer, ' will be ignored.') + else + if host == _bind_all_address then + log(WARN, _log_prefix, 'found host with 0.0.0.0 as rpc_address, ', + 'using listen_address ', row.peer, ' to ', + 'contact it instead. If this is ', + 'incorrect you should avoid using 0.0.0.0 ', + 'server-side.') + host = row.peer + end - local ok, err = set_peer(self, host, up, reconn_delay, unhealthy_at, - rows[i].data_center, rows[i].release_version) - if not ok then return nil, err end + local old_peer = old_peers[host] + local reconn_delay, unhealthy_at = 0, 0 + local up = true + if old_peer then + up = old_peer.up + reconn_delay = old_peer.reconn_delay + unhealthy_at = old_peer.unhealthy_at + end + + local ok, err = set_peer(self, host, up, reconn_delay, unhealthy_at, + rows[i].data_center, rows[i].release_version) + if not ok then return nil, err end + end end peers, err = get_peers(self)