Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

per-thread connections #1336

Merged
merged 2 commits into from
Mar 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 110 additions & 152 deletions core/dbt/adapters/base/connections.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import abc
import multiprocessing
import os

import six

import dbt.exceptions
import dbt.flags
from dbt.api import APIObject
from dbt.compat import abstractclassmethod
from dbt.compat import abstractclassmethod, get_ident
from dbt.contracts.connection import Connection
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.utils import translate_aliases
Expand Down Expand Up @@ -71,6 +72,7 @@ class BaseConnectionManager(object):
- open
- begin
- commit
- clear_transaction
- execute

You must also set the 'TYPE' class attribute with a class-unique constant
Expand All @@ -80,83 +82,95 @@ class BaseConnectionManager(object):

def __init__(self, profile):
self.profile = profile
self.in_use = {}
self.available = []
self.thread_connections = {}
self.lock = multiprocessing.RLock()
self._set_initial_connections()

def _set_initial_connections(self):
self.available = []
# set up the array of connections in the 'init' state.
# we add a magic number, 2 because there are overhead connections,
# one for pre- and post-run hooks and other misc operations that occur
# before the run starts, and one for integration tests.
for idx in range(self.profile.threads + 2):
self.available.append(self._empty_connection())

def _empty_connection(self):
    """Build a new, unopened ``Connection`` for this manager.

    :return: A ``Connection`` of type ``self.TYPE`` in the 'init' state,
        with no name, no handle, no open transaction, and the
        credentials taken from ``self.profile``.
    """
    blank_fields = dict(
        type=self.TYPE,
        name=None,
        state='init',
        transaction_open=False,
        handle=None,
        credentials=self.profile.credentials,
    )
    return Connection(**blank_fields)

@staticmethod
def get_thread_identifier():
    """Return the ``(process id, thread id)`` pair for the caller.

    :return: A 2-tuple of ``os.getpid()`` and ``get_ident()``.
    """
    # note that get_ident() may be re-used, but we should never experience
    # that within a single process
    process_id = os.getpid()
    thread_id = get_ident()
    return (process_id, thread_id)

def get_thread_connection(self):
    """Return the connection stored for the calling thread.

    The lookup key comes from ``self.get_thread_identifier()`` and the
    lookup happens while holding ``self.lock``.

    :return: The entry of ``self.thread_connections`` under that key.
    :raises RuntimeError: If no entry exists under that key; the message
        lists the keys that are currently present.
    """
    ident = self.get_thread_identifier()
    with self.lock:
        if ident in self.thread_connections:
            return self.thread_connections[ident]
        known = list(self.thread_connections)
        raise RuntimeError(
            'connection never acquired for thread {}, have {}'
            .format(ident, known)
        )

def get_if_exists(self):
    """Return the calling thread's connection, or None if there is none.

    The key comes from ``self.get_thread_identifier()``; the lookup in
    ``self.thread_connections`` is done while holding ``self.lock``.
    """
    ident = self.get_thread_identifier()
    with self.lock:
        found = self.thread_connections.get(ident)
    return found

def clear_thread_connection(self):
    """Forget the connection stored for the calling thread, if any.

    Removes the entry keyed by ``self.get_thread_identifier()`` from
    ``self.thread_connections`` while holding ``self.lock``. Nothing
    happens when there is no such entry.
    """
    ident = self.get_thread_identifier()
    with self.lock:
        # pop with a default is a no-op when the key is absent
        self.thread_connections.pop(ident, None)

def clear_transaction(self):
    """Clear any existing transactions.

    Fetches the calling thread's connection, rolls it back via
    ``self._rollback`` if it reports an open transaction, and then calls
    ``self.begin()`` followed by ``self.commit()``.
    """
    conn = self.get_thread_connection()
    has_open_transaction = conn is not None and conn.transaction_open
    if has_open_transaction:
        self._rollback(conn)
    self.begin()
    self.commit()

@abc.abstractmethod
def exception_handler(self, sql, connection_name='master'):
def exception_handler(self, sql):
"""Create a context manager that handles exceptions caused by database
interactions.

:param str sql: The SQL string that the block inside the context
manager is executing.
:param str connection_name: The name of the connection being used
:return: A context manager that handles exceptions raised by the
underlying database.
"""
raise dbt.exceptions.NotImplementedException(
'`exception_handler` is not implemented for this adapter!')

def get(self, name=None):
"""This is thread-safe as long as two threads don't use the same
"name".
"""
def set_connection_name(self, name=None):
if name is None:
# if a name isn't specified, we'll re-use a single handle
# named 'master'
name = 'master'

with self.lock:
if name in self.in_use:
return self.in_use[name]
conn = self.get_if_exists()
thread_id_key = self.get_thread_identifier()

logger.debug('Acquiring new {} connection "{}".'
.format(self.TYPE, name))
if conn is None:
conn = Connection(
type=self.TYPE,
name=None,
state='init',
transaction_open=False,
handle=None,
credentials=self.profile.credentials
)
self.thread_connections[thread_id_key] = conn

if not self.available:
raise dbt.exceptions.InternalException(
'Tried to request a new connection "{}" but '
'the maximum number of connections are already '
'allocated!'.format(name)
)
if conn.name == name and conn.state == 'open':
return conn

connection = self.available.pop()
# connection is temporarily neither in use nor available, but both
# collections are in a sane state, so we can release the lock.
logger.debug('Acquiring new {} connection "{}".'
.format(self.TYPE, name))

# this potentially calls open(), but does so without holding the lock
connection = self.assign(connection, name)

with self.lock:
if name in self.in_use:
raise dbt.exceptions.InternalException(
'Two threads concurrently tried to get the same name: {}'
.format(name)
)
self.in_use[name] = connection
if conn.state == 'open':
logger.debug(
'Re-using an available connection from the pool (formerly {}).'
.format(conn.name))
else:
logger.debug('Opening a new connection, currently in state {}'
.format(conn.state))
self.open(conn)

return connection
conn.name = name
return conn

@abc.abstractmethod
def cancel_open(self):
Expand All @@ -183,81 +197,39 @@ def open(cls, connection):
'`open` is not implemented for this adapter!'
)

def assign(self, conn, name):
    """Name a connection, opening it first if it is not already open.

    The caller is responsible for putting the assigned connection into
    the in_use collection.

    :param Connection conn: A connection, in any state.
    :param str name: The name of the connection to set; ``None`` is
        replaced by 'master'.
    :return: ``conn`` itself when it was already open, otherwise
        whatever ``self.open(conn)`` returns.
    """
    conn.name = 'master' if name is None else name

    if conn.state != 'open':
        logger.debug('Opening a new connection, currently in state {}'
                     .format(conn.state))
        return self.open(conn)

    logger.debug('Re-using an available connection from the pool.')
    return conn

def _release_connection(self, conn):
if conn.state == 'open':
if conn.transaction_open is True:
self._rollback(conn)
conn.name = None
else:
self.close(conn)

def release(self, name):
def release(self):
with self.lock:
if name not in self.in_use:
conn = self.get_if_exists()
if conn is None:
return

to_release = self.in_use.pop(name)
# to_release is temporarily neither in use nor available, but both
# collections are in a sane state, so we can release the lock.

try:
self._release_connection(to_release)
except:
# if rollback or close failed, replace our busted connection with
# a new one
to_release = self._empty_connection()
if conn.state == 'open':
if conn.transaction_open is True:
self._rollback(conn)
else:
self.close(conn)
except Exception:
# if rollback or close failed, remove our busted connection
self.clear_thread_connection()
raise
finally:
# now that this connection has been rolled back and the name reset,
# or the connection has been closed, put it back on the available
# list
with self.lock:
self.available.append(to_release)

def cleanup_all(self):
with self.lock:
for name, connection in self.in_use.items():
if connection.state != 'closed':
for connection in self.thread_connections.values():
if connection.state not in {'closed', 'init'}:
logger.debug("Connection '{}' was left open."
.format(name))
.format(connection.name))
else:
logger.debug("Connection '{}' was properly closed."
.format(name))

conns_in_use = list(self.in_use.values())
for conn in conns_in_use + self.available:
self.close(conn)
.format(connection.name))
self.close(connection)

# garbage collect these connections
self.in_use.clear()
self._set_initial_connections()
self.thread_connections.clear()

@abc.abstractmethod
def begin(self, name):
def begin(self):
"""Begin a transaction. (passable)

:param str name: The name of the connection to use.
Expand All @@ -266,34 +238,32 @@ def begin(self, name):
'`begin` is not implemented for this adapter!'
)

def get_if_exists(self, name):
    """Return the in-use connection called ``name``, or None.

    :param str name: The connection name; ``None`` is treated as
        'master'.
    :return: ``self.get(name)`` when ``self.in_use`` holds a non-None
        entry under that name, otherwise ``None``.
    """
    key = 'master' if name is None else name
    if self.in_use.get(key) is not None:
        return self.get(key)
    return None

@abc.abstractmethod
def commit(self, connection):
"""Commit a transaction. (passable)

:param str name: The name of the connection to use.
"""
def commit(self):
"""Commit a transaction. (passable)"""
raise dbt.exceptions.NotImplementedException(
'`commit` is not implemented for this adapter!'
)

def _rollback_handle(self, connection):
@classmethod
def _rollback_handle(cls, connection):
"""Perform the actual rollback operation."""
connection.handle.rollback()

def _rollback(self, connection):
"""Roll back the given connection.
@classmethod
def _close_handle(cls, connection):
    """Perform the actual close operation.

    Calls ``connection.handle.close()`` when the handle has a ``close``
    attribute; otherwise only logs that no close is available.

    :param Connection connection: The connection whose handle to close.
    """
    # On windows, sometimes connection handles don't have a close() attr.
    if not hasattr(connection.handle, 'close'):
        logger.debug('On {}: No close available on handle'
                     .format(connection.name))
        return

    logger.debug('On {}: Close'.format(connection.name))
    connection.handle.close()

The connection does not have to be in in_use or available, so this
operation does not require the lock.
@classmethod
def _rollback(cls, connection):
"""Roll back the given connection.
"""
if dbt.flags.STRICT_MODE:
assert isinstance(connection, Connection)
Expand All @@ -304,7 +274,7 @@ def _rollback(self, connection):
'it does not have one open!'.format(connection.name))

logger.debug('On {}: ROLLBACK'.format(connection.name))
self._rollback_handle(connection)
cls._rollback_handle(connection)

connection.transaction_open = False

Expand All @@ -320,40 +290,28 @@ def close(cls, connection):
return connection

if connection.transaction_open and connection.handle:
connection.handle.rollback()
cls._rollback_handle(connection)
connection.transaction_open = False

# On windows, sometimes connection handles don't have a close() attr.
if hasattr(connection.handle, 'close'):
connection.handle.close()
else:
logger.debug('On {}: No close available on handle'
.format(connection.name))

cls._close_handle(connection)
connection.state = 'closed'

return connection

def commit_if_has_connection(self, name):
def commit_if_has_connection(self):
"""If the named connection exists, commit the current transaction.

:param str name: The name of the connection to use.
"""
connection = self.in_use.get(name)
connection = self.get_if_exists()
if connection:
self.commit(connection)

def clear_transaction(self, conn_name='master'):
conn = self.begin(conn_name)
self.commit(conn)
return conn_name
self.commit()

@abc.abstractmethod
def execute(self, sql, name=None, auto_begin=False, fetch=False):
def execute(self, sql, auto_begin=False, fetch=False):
"""Execute the given SQL.

:param str sql: The sql to execute.
:param Optional[str] name: The name to use for the connection.
:param bool auto_begin: If set, and dbt is not currently inside a
transaction, automatically begin one.
:param bool fetch: If set, fetch results.
Expand Down
Loading