Skip to content

Commit

Permalink
fix(db) ensure postgres connector reentrancy
Browse files Browse the repository at this point in the history
Because the connector currently stores its connection in an
attribute, it is shared between all requests in a worker, which
will throw cosocket errors. Instead, `:query()` is now reentrant
by incoming requests (it instantiates and keepalive its own
connections). The connector `:connect()` and `:setkeepalive()`
methods are left untouched for now, but need further consideration
given their limitation with regards to their lack of reentrancy at runtime.

From #3423
  • Loading branch information
bungle authored and thibaultcha committed May 3, 2018
1 parent 88539c0 commit 31d9ed7
Showing 1 changed file with 67 additions and 82 deletions.
149 changes: 67 additions & 82 deletions kong/db/strategies/postgres/connector.lua
Original file line number Diff line number Diff line change
Expand Up @@ -93,136 +93,134 @@ local function iterator(rows)
end


local _mt = {}


_mt.__index = _mt

local setkeepalive

function _mt:connect()
local connection = self.connection

if connection then
return true
end

local config = self.config
local function connect(config)
local phase = get_phase()

if phase == "init" or phase == "init_worker" or ngx.IS_CLI then
config.socket_type = "luasocket"

else
config.socket_type = "nginx"
end

local db = pgmoon.new(config)
local connection = pgmoon.new(config)

db.convert_null = true
db.NULL = null
connection.convert_null = true
connection.NULL = null

local ok, err = db:connect()
local ok, err = connection:connect()
if not ok then
return nil, err
end

self.connection = db

if db.sock:getreusedtimes() == 0 then
ok, err = self:query("SET TIME ZONE 'UTC';");
if connection.sock:getreusedtimes() == 0 then
ok, err = connection:query("SET TIME ZONE 'UTC';");
if not ok then
setkeepalive(connection)
return nil, err
end
end

return true
return connection
end


function _mt:setkeepalive()
local connection = self.connection

self.connection = nil

if not connection then
setkeepalive = function(connection)
if not connection or not connection.sock then
return nil, "no active connection"
end

local ok, err

if connection.sock_type == "luasocket" then
ok, err = connection:disconnect()
if not ok then
if err then
log(WARN, "unable to close postgres connection (", err, ")")

else
log(WARN, "unable to close postgres connection")
end

return nil, err
end

else
ok, err = connection:keepalive()
end
if not ok then
if err then
log(WARN, "unable to set keepalive for postgres connection (", err, ")")

if not ok then
return nil, err
else
log(WARN, "unable to set keepalive for postgres connection")
end

return nil, err
end
end

return true
end


function _mt:query(sql)
local connection = self.connection
local _mt = {}

if connection then
return connection:query(sql)
end

local ok, err = self:connect()
if not ok then
return nil, err
end
_mt.__index = _mt

local res, exc, partial, num_queries = self.connection:query(sql)
ok, err = self:setkeepalive()

if not ok then
log(WARN, err)
function _mt:connect()
if self.connection and self.connection.sock then
return true
end

if not res then
return nil, exc, partial, num_queries
local connection, err = connect(self.config)
if not connection then
return nil, err
end

return res, exc
self.connection = connection

return true
end


function _mt:iterate(sql)
local connection = self.connection
function _mt:setkeepalive()
local ok, err = setkeepalive(self.connection)

if connection then
local res, err, partial, num_queries = connection:query(sql)
if not res then
return nil, err, partial, num_queries
end
self.connection = nil

if res == true then
return iterator { true }
end
if not ok then
return nil, err
end

return true
end

return iterator(res)

function _mt:query(sql)
if self.connection and self.connection.sock then
return self.connection:query(sql)
end

local ok, err = self:connect()
if not ok then
local connection, err = connect(self.config)
if not connection then
return nil, err
end

local res, exc, partial, num_queries = self.connection:query(sql)
local res, exc, partial, num_queries = connection:query(sql)

ok, err = self:setkeepalive()
setkeepalive(connection)

if not ok then
log(WARN, err)
end
return res, exc, partial, num_queries
end


function _mt:iterate(sql)
local res, err, partial, num_queries = self:query(sql)
if not res then
return nil, exc, partial, num_queries
return nil, err, partial, num_queries
end

if res == true then
Expand All @@ -235,27 +233,15 @@ end

function _mt:reset()
local user = self:escape_identifier(self.config.user)

local ok, err = self:connect()
if not ok then
return nil, err
end

ok, err = self:query(concat {
local ok, err = self:query(concat {
"BEGIN;\n",
"DROP SCHEMA IF EXISTS public CASCADE;\n",
"CREATE SCHEMA IF NOT EXISTS public AUTHORIZATION " .. user .. ";\n",
"GRANT ALL ON SCHEMA public TO " .. user .. ";\n",
"COMMIT;\n",
})

local success, exc = self:setkeepalive()
if not success then
log(WARN, exc)
end

if not ok then

return nil, err
end

Expand Down Expand Up @@ -312,13 +298,12 @@ function _mt:truncate()
return true
end

local truncate_statement = {
local truncate_statement = {
"TRUNCATE TABLE ", concat(table_names, ", "), " RESTART IDENTITY CASCADE;"
}

local ok, err = self:query(truncate_statement)
if not ok then

return nil, err
end

Expand Down

0 comments on commit 31d9ed7

Please sign in to comment.