diff --git a/lib/resty/cassandra/cluster.lua b/lib/resty/cassandra/cluster.lua index c8c6e75..1558995 100644 --- a/lib/resty/cassandra/cluster.lua +++ b/lib/resty/cassandra/cluster.lua @@ -289,6 +289,7 @@ function _Cluster.new(opts) end local peers_opts = {} + local lock_opts = {} local dict_name = opts.shm or 'cassandra' if type(dict_name) ~= 'string' then return nil, 'shm must be a string' @@ -340,6 +341,11 @@ function _Cluster.new(opts) if type(v) ~= 'boolean' then return nil, 'retry_on_timeout must be a boolean' end + elseif k == 'lock_timeout' then + if type(v) ~= 'number' then + return nil, 'lock_timeout must be a number' + end + lock_opts.timeout = v elseif k == 'silent' then if type(v) ~= 'boolean' then return nil, 'silent must be a boolean' @@ -359,6 +365,7 @@ function _Cluster.new(opts) timeout_connect = opts.timeout_connect or 1000, retry_on_timeout = opts.retry_on_timeout == nil and true or opts.retry_on_timeout, max_schema_consensus_wait = opts.max_schema_consensus_wait or 10000, + lock_opts = lock_opts, logging = not opts.silent, lb_policy = opts.lb_policy @@ -446,7 +453,7 @@ function _Cluster:refresh() old_peers = {} -- empty shm end - local lock = resty_lock:new(self.dict_name) + local lock = resty_lock:new(self.dict_name, self.lock_opts) local elapsed, err = lock:lock('refresh') if not elapsed then return nil, 'failed to acquire refresh lock: '..err end @@ -588,7 +595,7 @@ local function get_or_prepare(self, coordinator, query) elseif not query_id then -- shm cache miss -- query not prepared yet, must prepare in mutex - local lock = resty_lock:new(self.dict_name) + local lock = resty_lock:new(self.dict_name, self.lock_opts) local elapsed, err = lock:lock('prepare:' .. query) if not elapsed then return nil, 'failed to acquire lock: '..err end