diff --git a/dlt/destinations/impl/sqlalchemy/db_api_client.py b/dlt/destinations/impl/sqlalchemy/db_api_client.py index 8c3df409aa..c6c8ba53d6 100644 --- a/dlt/destinations/impl/sqlalchemy/db_api_client.py +++ b/dlt/destinations/impl/sqlalchemy/db_api_client.py @@ -95,10 +95,9 @@ class DbApiProps: class SqlalchemyClient(SqlClientBase[Connection]): external_engine: bool = False - dialect: sa.engine.interfaces.Dialect - dialect_name: str dbapi = DbApiProps # type: ignore[assignment] migrations: Optional[MigrationMaker] = None # lazy init as needed + _engine: Optional[sa.engine.Engine] = None def __init__( self, @@ -111,20 +110,36 @@ def __init__( super().__init__(credentials.database, dataset_name, staging_dataset_name, capabilities) self.credentials = credentials + self.engine_args = engine_args or {} + if credentials.engine: - self.engine = credentials.engine + self._engine = credentials.engine self.external_engine = True else: - self.engine = sa.create_engine( - credentials.to_url().render_as_string(hide_password=False), - **(engine_args or {}), - ) + # Default to nullpool because we don't use connection pooling + self.engine_args.setdefault("poolclass", sa.pool.NullPool) self._current_connection: Optional[Connection] = None self._current_transaction: Optional[SqlaTransactionWrapper] = None self.metadata = sa.MetaData() - self.dialect = self.engine.dialect - self.dialect_name = self.dialect.name + + @property + def engine(self) -> sa.engine.Engine: + # Create engine lazily + if self._engine is not None: + return self._engine + self._engine = sa.create_engine( + self.credentials.to_url().render_as_string(hide_password=False), **self.engine_args + ) + return self._engine + + @property + def dialect(self) -> sa.engine.interfaces.Dialect: + return self.engine.dialect + + @property + def dialect_name(self) -> str: + return self.dialect.name def open_connection(self) -> Connection: if self._current_connection is None: @@ -136,6 +151,8 @@ def open_connection(self) -> Connection: def close_connection(self) -> None: if not self.external_engine: try: + if self._current_connection is not None: + self._current_connection.close() self.engine.dispose() finally: self._current_connection = None