Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cluster) release lock on errors #140

Merged
merged 1 commit into from
Dec 18, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions lib/resty/cassandra/cluster.lua
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,14 @@ local function compare_peers(t1, t2, tc)
end
end

--- Release a lock and return an error in the `nil, err` convention.
-- Intended for early-exit error paths that run while a lock is held:
-- the lock is released first, then the original error is handed back to
-- the caller. If releasing the lock fails as well, the unlock error is
-- appended to the original error message so neither failure is lost.
-- @param lock object exposing an `unlock()` method that returns a truthy
-- value on success, or a falsy value plus an error on failure
-- (presumably a lua-resty-lock instance; verify against the caller)
-- @param err the original error to report (normally a string)
-- @return `nil`
-- @return the error; returned unchanged when the unlock succeeded
local function err_with_unlock(lock, err)
  local ok, unlock_err = lock:unlock()
  if not ok then
    -- tostring() keeps this helper from raising on a nil/non-string error,
    -- which would otherwise mask the failure being reported.
    err = tostring(err) .. ' (failed to unlock refresh lock: '
          .. tostring(unlock_err) .. ')'
  end
  return nil, err
end

--- Refresh the list of nodes in the cluster.
-- Queries one of the specified `contact_points` to retrieve the list of
-- available nodes in the cluster, and updates the configured policies.
Expand Down Expand Up @@ -573,21 +581,29 @@ function _Cluster:refresh(timeout)
' (ver_refresh=', ver_refresh, ')')

local coordinator, err, local_cp = first_coordinator(self)
if not coordinator then return nil, err end
if not coordinator then
return err_with_unlock(lock, err)
end

coordinator:settimeout(self.timeout_read)

local local_rows, err = coordinator:execute [[
SELECT data_center,rpc_address,release_version FROM system.local
]]
if not local_rows then return nil, err end
if not local_rows then
return err_with_unlock(lock, err)
end

assert(local_rows[1] ~= nil, 'local host could not be found')
if local_rows[1] == nil then
assert(err_with_unlock(lock, 'local host could not be found'))
end

local rows, err = coordinator:execute [[
SELECT peer,data_center,rpc_address,release_version FROM system.peers
]]
if not rows then return nil, err end
if not rows then
return err_with_unlock(lock, err)
end

coordinator:setkeepalive()

Expand Down Expand Up @@ -642,7 +658,7 @@ function _Cluster:refresh(timeout)
end
else
local old_peers, err = get_peers(self, ver_topo)
if err then return nil, err
if err then return err_with_unlock(lock, err)
elseif not old_peers then
log(WARN, _log_prefix, 'refresh: missing peers entry when comparing ',
'topologies (ver_refresh=', ver_refresh, ')')
Expand All @@ -669,17 +685,17 @@ function _Cluster:refresh(timeout)
local ok, err = add_peer(self, rows[i].host, true, 0, 0,
rows[i].data_center, nil,
rows[i].release_version)
if not ok then return nil, err end
if not ok then return err_with_unlock(lock, err) end
end
end

local ok, err = set_peers(self, ver_topo, rows,
coordinator.protocol_version)
if not ok then return nil, err end
if not ok then return err_with_unlock(lock, err) end
end

local ok, err = self.shm:set(_topo_version_key .. 'latest', ver_topo, 0, ver_refresh)
if not ok then return nil, 'failed to set topo and refresh versions: '..err end
if not ok then return err_with_unlock(lock, 'failed to set topo and refresh versions: '..err) end
end

local ok, err = lock:unlock()
Expand Down