Skip to content

Commit

Permalink
Use nullpool, lazy create engine, close current connection
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed Sep 13, 2024
1 parent eec4e22 commit dc4c29c
Showing 1 changed file with 26 additions and 9 deletions.
35 changes: 26 additions & 9 deletions dlt/destinations/impl/sqlalchemy/db_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,9 @@ class DbApiProps:

class SqlalchemyClient(SqlClientBase[Connection]):
external_engine: bool = False
dialect: sa.engine.interfaces.Dialect
dialect_name: str
dbapi = DbApiProps # type: ignore[assignment]
migrations: Optional[MigrationMaker] = None # lazy init as needed
_engine: Optional[sa.engine.Engine] = None

def __init__(
self,
Expand All @@ -111,20 +110,36 @@ def __init__(
super().__init__(credentials.database, dataset_name, staging_dataset_name, capabilities)
self.credentials = credentials

self.engine_args = engine_args or {}

if credentials.engine:
self.engine = credentials.engine
self._engine = credentials.engine
self.external_engine = True
else:
self.engine = sa.create_engine(
credentials.to_url().render_as_string(hide_password=False),
**(engine_args or {}),
)
# Default to nullpool because we don't use connection pooling
self.engine_args.setdefault("poolclass", sa.pool.NullPool)

self._current_connection: Optional[Connection] = None
self._current_transaction: Optional[SqlaTransactionWrapper] = None
self.metadata = sa.MetaData()
self.dialect = self.engine.dialect
self.dialect_name = self.dialect.name

@property
def engine(self) -> sa.engine.Engine:
# Create engine lazily
if self._engine is not None:
return self._engine
self._engine = sa.create_engine(
self.credentials.to_url().render_as_string(hide_password=False), **self.engine_args
)
return self._engine

@property
def dialect(self) -> sa.engine.interfaces.Dialect:
return self.engine.dialect

@property
def dialect_name(self) -> str:
return self.dialect.name

def open_connection(self) -> Connection:
if self._current_connection is None:
Expand All @@ -136,6 +151,8 @@ def open_connection(self) -> Connection:
def close_connection(self) -> None:
if not self.external_engine:
try:
if self._current_connection is not None:
self._current_connection.close()
self.engine.dispose()
finally:
self._current_connection = None
Expand Down

0 comments on commit dc4c29c

Please sign in to comment.