Skip to content

Commit

Permalink
Merge pull request #72 from thibaultcha/fix/rpc_adress_bind_all
Browse files Browse the repository at this point in the history
fix(cluster) fallback on listen_address when rpc_address is "bind all"
  • Loading branch information
thibaultcha authored Oct 27, 2016
2 parents 3125e4c + 0c4a421 commit 68bee73
Showing 1 changed file with 34 additions and 13 deletions.
47 changes: 34 additions & 13 deletions lib/resty/cassandra/cluster.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ local requests = cql.requests
local tonumber = tonumber
local concat = table.concat
local shared = ngx.shared
local assert = assert
local pairs = pairs
local sub = string.sub
local now = ngx.now
local type = type
local log = ngx.log
local ERR = ngx.ERR
local WARN = ngx.WARN
local DEBUG = ngx.DEBUG
local NOTICE = ngx.NOTICE
Expand All @@ -31,6 +33,7 @@ local _log_prefix = '[lua-cassandra] '
local _rec_key = 'host:rec:'
local _prepared_key = 'prepared:id:'
local _protocol_version_key = 'protocol:version:'
local _bind_all_address = '0.0.0.0'

ffi.cdef [[
size_t strlen(const char *str);
Expand Down Expand Up @@ -475,6 +478,8 @@ function _Cluster:refresh()
]]
if not local_rows then return nil, err end

assert(local_rows[1] ~= nil, 'local host could not be found')

local rows, err = coordinator:execute [[
SELECT peer,data_center,rpc_address,release_version FROM system.peers
]]
Expand All @@ -483,25 +488,41 @@ function _Cluster:refresh()
coordinator:setkeepalive()

rows[#rows+1] = { -- local host
rpc_address = local_rows[1].rpc_address,
rpc_address = coordinator.host,
data_center = local_rows[1].data_center,
release_version = local_rows[1].release_version
}

for i = 1, #rows do
local host = rows[i].rpc_address
local old_peer = old_peers[host]
local reconn_delay, unhealthy_at = 0, 0
local up = true
if old_peer then
up = old_peer.up
reconn_delay = old_peer.reconn_delay
unhealthy_at = old_peer.unhealthy_at
end
local row = rows[i]
local host = row.rpc_address
if not host then
log(ERR, _log_prefix, 'no rpc_address found for host ', row.peer,
' in ', coordinator.host, '\'s peers system ',
'table. ', row.peer, ' will be ignored.')
else
if host == _bind_all_address then
log(WARN, _log_prefix, 'found host with 0.0.0.0 as rpc_address, ',
'using listen_address ', row.peer, ' to ',
'contact it instead. If this is ',
'incorrect you should avoid using 0.0.0.0 ',
'server-side.')
host = row.peer
end

local ok, err = set_peer(self, host, up, reconn_delay, unhealthy_at,
rows[i].data_center, rows[i].release_version)
if not ok then return nil, err end
local old_peer = old_peers[host]
local reconn_delay, unhealthy_at = 0, 0
local up = true
if old_peer then
up = old_peer.up
reconn_delay = old_peer.reconn_delay
unhealthy_at = old_peer.unhealthy_at
end

local ok, err = set_peer(self, host, up, reconn_delay, unhealthy_at,
rows[i].data_center, rows[i].release_version)
if not ok then return nil, err end
end
end

peers, err = get_peers(self)
Expand Down

0 comments on commit 68bee73

Please sign in to comment.