From 31d9ed7a0d5ffbf1e1e089de32048d04f735677b Mon Sep 17 00:00:00 2001 From: Aapo Talvensaari Date: Thu, 3 May 2018 23:16:53 +0300 Subject: [PATCH] fix(db) ensure postgres connector reentrancy Because the connector currently stores its connection in an attribute, it is shared between all requests in a worker, which will throw cosocket errors. Instead, `:query()` is now reentrant by incoming requests (it instantiates and keepalive its own connections). The connector `:connect()` and `:setkeepalive()` methods are left untouched for now, but need further consideration given their limitation with regards to their lack of reentrancy at runtime. From #3423 --- kong/db/strategies/postgres/connector.lua | 149 ++++++++++------------ 1 file changed, 67 insertions(+), 82 deletions(-) diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index f3355526638a..9340606ef785 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -93,22 +93,11 @@ local function iterator(rows) end -local _mt = {} - - -_mt.__index = _mt - +local setkeepalive -function _mt:connect() - local connection = self.connection - if connection then - return true - end - - local config = self.config +local function connect(config) local phase = get_phase() - if phase == "init" or phase == "init_worker" or ngx.IS_CLI then config.socket_type = "luasocket" @@ -116,113 +105,122 @@ function _mt:connect() config.socket_type = "nginx" end - local db = pgmoon.new(config) + local connection = pgmoon.new(config) - db.convert_null = true - db.NULL = null + connection.convert_null = true + connection.NULL = null - local ok, err = db:connect() + local ok, err = connection:connect() if not ok then return nil, err end - self.connection = db - - if db.sock:getreusedtimes() == 0 then - ok, err = self:query("SET TIME ZONE 'UTC';"); + if connection.sock:getreusedtimes() == 0 then + ok, err = connection:query("SET TIME ZONE 'UTC';"); if not ok then + setkeepalive(connection) return nil, err end end - return true + return connection end -function _mt:setkeepalive() - local connection = self.connection - - self.connection = nil - - if not connection then +setkeepalive = function(connection) + if not connection or not connection.sock then return nil, "no active connection" end local ok, err - if connection.sock_type == "luasocket" then ok, err = connection:disconnect() + if not ok then + if err then + log(WARN, "unable to close postgres connection (", err, ")") + + else + log(WARN, "unable to close postgres connection") + end + + return nil, err + end else ok, err = connection:keepalive() - end + if not ok then + if err then + log(WARN, "unable to set keepalive for postgres connection (", err, ")") - if not ok then - return nil, err + else + log(WARN, "unable to set keepalive for postgres connection") + end + + return nil, err + end end return true end -function _mt:query(sql) - local connection = self.connection +local _mt = {} - if connection then - return connection:query(sql) - end - local ok, err = self:connect() - if not ok then - return nil, err - end +_mt.__index = _mt - local res, exc, partial, num_queries = self.connection:query(sql) - ok, err = self:setkeepalive() - if not ok then - log(WARN, err) +function _mt:connect() + if self.connection and self.connection.sock then + return true end - if not res then - return nil, exc, partial, num_queries + local connection, err = connect(self.config) + if not connection then + return nil, err end - return res, exc + self.connection = connection + + return true end -function _mt:iterate(sql) - local connection = self.connection +function _mt:setkeepalive() + local ok, err = setkeepalive(self.connection) - if connection then - local res, err, partial, num_queries = connection:query(sql) - if not res then - return nil, err, partial, num_queries - end + self.connection = nil - if res == true then - return iterator { true } - end + if not ok then + return nil, err + end + + return true +end - return iterator(res) + +function _mt:query(sql) + if self.connection and self.connection.sock then + return self.connection:query(sql) end - local ok, err = self:connect() - if not ok then + local connection, err = connect(self.config) + if not connection then return nil, err end - local res, exc, partial, num_queries = self.connection:query(sql) + local res, exc, partial, num_queries = connection:query(sql) - ok, err = self:setkeepalive() + setkeepalive(connection) - if not ok then - log(WARN, err) - end + return res, exc, partial, num_queries +end + +function _mt:iterate(sql) + local res, err, partial, num_queries = self:query(sql) if not res then - return nil, exc, partial, num_queries + return nil, err, partial, num_queries end if res == true then @@ -235,13 +233,7 @@ end function _mt:reset() local user = self:escape_identifier(self.config.user) - - local ok, err = self:connect() - if not ok then - return nil, err - end - - ok, err = self:query(concat { + local ok, err = self:query(concat { "BEGIN;\n", "DROP SCHEMA IF EXISTS public CASCADE;\n", "CREATE SCHEMA IF NOT EXISTS public AUTHORIZATION " .. user .. ";\n", @@ -249,13 +241,7 @@ function _mt:reset() "COMMIT;\n", }) - local success, exc = self:setkeepalive() - if not success then - log(WARN, exc) - end - if not ok then - return nil, err end @@ -312,13 +298,12 @@ function _mt:truncate() return true end - local truncate_statement = { + local truncate_statement = { "TRUNCATE TABLE ", concat(table_names, ", "), " RESTART IDENTITY CASCADE;" } local ok, err = self:query(truncate_statement) if not ok then - return nil, err end