Skip to content

Commit

Permalink
feat(cluster) better support for keyspace settings
Browse files Browse the repository at this point in the history
* do not set keyspace when spawning a coordinator from the load
balancing policy
* allow to override a cluster's `keyspace` setting from
`execute`/`batch`/`iterate`
* document new `coordinator_options` argument
* add the necessary tests
  • Loading branch information
thibaultcha committed Sep 22, 2016
1 parent cd604bb commit 53c5fc5
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 11 deletions.
9 changes: 8 additions & 1 deletion lib/cassandra/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ function _Host:connect()
if not ok then return nil, err end
end

local reused, err = self.sock:getreusedtimes()
local reused, err = self:getreusedtimes()
if not reused then return nil, err end

if reused < 1 then
Expand Down Expand Up @@ -265,6 +265,13 @@ function _Host:connect()
return true
end

function _Host:getreusedtimes(...)
if not self.sock then
return nil, 'no socket created'
end
return self.sock:getreusedtimes(...)
end

--- Set the timeout value.
-- @see https://github.com/openresty/lua-nginx-module#tcpsocksettimeout
-- @param[type=number] timeout Value in milliseconds (for connect/read/write).
Expand Down
49 changes: 43 additions & 6 deletions lib/resty/cassandra/cluster.lua
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ local function spawn_peer(host, port, opts)
end

local function check_peer_health(self, host, retry)
-- TODO: maybe we ought not to care about the keyspace set in
-- peers_opts when simply checking the connction to a node.
local peer, err = spawn_peer(host, self.default_port, self.peers_opts)
if not peer then return nil, err
else
Expand Down Expand Up @@ -257,10 +255,9 @@ function _Cluster.new(opts)

for k, v in pairs(opts) do
if k == 'keyspace' then
if type(v) ~= 'string' then
if v and type(v) ~= 'string' then
return nil, 'keyspace must be a string'
end
peers_opts.keyspace = v
elseif k == 'ssl' then
if type(v) ~= 'boolean' then
return nil, 'ssl must be a boolean'
Expand Down Expand Up @@ -308,6 +305,7 @@ function _Cluster.new(opts)
dict_name = dict_name,
prepared_ids = {},
peers_opts = peers_opts,
keyspace = opts.keyspace,
default_port = opts.default_port or 9042,
contact_points = opts.contact_points or {'127.0.0.1'},
timeout_read = opts.timeout_read or 2000,
Expand Down Expand Up @@ -641,6 +639,38 @@ do
local batch_req = requests.batch.new
local prep_req = requests.execute_prepared.new

--- Coordinator options.
-- Options to pass to coordinators chosen by the load balancing policy
-- on `execute`/`batch`/`iterate`.
-- @field keyspace Keyspace to use for the current request.
-- (`string`, optional)
-- @table `coordinator_options`

local function prepare_coordinator(self, coordinator, coordinator_options)
local reused, err = coordinator:getreusedtimes()
if not reused then return nil, err end

local keyspace

if coordinator_options and coordinator_options.keyspace then
keyspace = coordinator_options.keyspace
elseif self.keyspace then
--elseif self.keyspace and reused < 1 then
-- Note: ideally we would not set the keyspace on each query, but for now there
-- is no way to know if a reused connection has its keyspace set or not,
-- so we must set the keyspace regardless of if the connection is coming
-- from the pool or is a new one.
keyspace = self.keyspace
end

if keyspace then
local res, err = coordinator:set_keyspace(keyspace)
if not res then return nil, err end
end

return true
end

--- Execute a query.
-- Sends a request to the coordinator chosen by the configured load
-- balancing policy. The policy always chooses nodes that are considered
Expand Down Expand Up @@ -672,12 +702,13 @@ do
--
-- @param[type=string] query CQL query to execute.
-- @param[type=table] args (optional) Arguments to bind to the query.
-- @param[type=table] options (optional) Options from `query_options`
-- @param[type=table] options (optional) Options from `query_options`.
-- @param[type=table] coordinator_options (optional) Options from `coordinator_options`.
-- for this query.
-- @treturn table `res`: Table holding the query result if success, `nil` if failure.
-- @treturn string `err`: String describing the error if failure.
-- @treturn number `cql_err`: If a server-side error occurred, the CQL error code.
function _Cluster:execute(query, args, options)
function _Cluster:execute(query, args, options, coordinator_options)
if not self.init then
local ok, err = self:refresh()
if not ok then return nil, 'could not refresh cluster: '..err end
Expand All @@ -686,6 +717,9 @@ do
local coordinator, err = next_coordinator(self)
if not coordinator then return nil, err end

local ok, err = prepare_coordinator(self, coordinator, coordinator_options)
if not ok then return nil, err end

local request
local opts = get_request_opts(options)

Expand Down Expand Up @@ -735,6 +769,9 @@ do
local coordinator, err = next_coordinator(self)
if not coordinator then return nil, err end

local ok, err = prepare_coordinator(self, coordinator, options)
if not ok then return nil, err end

local opts = get_request_opts(options)

if opts.prepared then
Expand Down
10 changes: 8 additions & 2 deletions t/06-cluster.t
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ GET /t
if not cluster then
ngx.say(err)
end

cluster, err = Cluster.new({keyspace = 123})
if not cluster then
ngx.say(err)
end
}
}
--- request
Expand All @@ -59,6 +64,7 @@ GET /t
opts must be a table
shm must be a string
no shared dict invalid_shm
keyspace must be a string
--- no_error_log
[error]

Expand Down Expand Up @@ -125,7 +131,7 @@ true opt: true



=== TEST 5: cluster.new() peers opts
=== TEST 5: cluster.new() peers opts and keyspace
--- http_config eval: $::HttpConfig
--- config
location /t {
Expand All @@ -143,7 +149,7 @@ true opt: true
return
end

ngx.say('keyspace: ', cluster.peers_opts.keyspace)
ngx.say('keyspace: ', cluster.keyspace)
ngx.say('ssl: ', cluster.peers_opts.ssl)
ngx.say('verify: ', cluster.peers_opts.verify)
ngx.say('auth: ', type(cluster.peers_opts.auth))
Expand Down
92 changes: 90 additions & 2 deletions t/08-execute.t
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,95 @@ local



=== TEST 6: opts.prepared: prepares a query
=== TEST 6: keyspace is optional
--- http_config eval: $::HttpConfig
--- config
location /t {
content_by_lua_block {
local Cluster = require 'resty.cassandra.cluster'
local cluster, err = Cluster.new()
if not cluster then
ngx.log(ngx.ERR, err)
return
end

local rows, err = cluster:execute('SELECT * FROM local WHERE key = ?', {
'local'
})
if not rows then
ngx.log(ngx.ERR, err)
return
end

ngx.say(rows[1].key)
}
}
--- request
GET /t
--- response_body

--- error_log eval
qr{\[error\] .*? \[Invalid\] No keyspace has been specified\. USE a keyspace, or explicitly specify keyspace\.tablename}



=== TEST 7: opts.keyspace overrides the cluster's keyspace
--- http_config eval
qq{
$::HttpConfig
init_worker_by_lua_block {
local Cluster = require 'resty.cassandra.cluster'
local cluster, err = Cluster.new {
timeout_read = 10000
}
if not cluster then
ngx.log(ngx.ERR, 'could not create cluster: ', err)
return
end
assert(cluster:execute [[
CREATE KEYSPACE IF NOT EXISTS lua_resty_tests WITH REPLICATION = {
'class': 'SimpleStrategy',
'replication_factor': 1
}
]])
}
}
--- config
location /t {
content_by_lua_block {
local Cluster = require 'resty.cassandra.cluster'
local cluster, err = Cluster.new {
keyspace = 'lua_resty_tests'
}
if not cluster then
ngx.log(ngx.ERR, 'could not create cluster: ', err)
return
end

local rows, err = cluster:execute('SELECT * FROM local WHERE key = ?', {
'local'
}, nil, {
keyspace = 'system'
})
if not rows then
ngx.log(ngx.ERR, 'could not select local: ', err)
return
end

ngx.say(rows[1].key)
}
}
--- request
GET /t
--- response_body
local
--- no_error_log
[error]



=== TEST 8: opts.prepared: prepares a query
--- http_config eval: $::HttpConfig
--- config
location /t {
Expand Down Expand Up @@ -204,7 +292,7 @@ has shm cache: true



=== TEST 7: opts.prepared: returns errors
=== TEST 9: opts.prepared: returns errors
--- http_config eval: $::HttpConfig
--- config
location /t {
Expand Down

0 comments on commit 53c5fc5

Please sign in to comment.