diff --git a/dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py b/dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py
index 7dfbaa9015..3cd6133d47 100644
--- a/dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py
+++ b/dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py
@@ -21,9 +21,9 @@
 from dlt.common.destination.capabilities import DestinationCapabilitiesContext
 from dlt.common.schema import Schema, TTableSchema, TColumnSchema, TSchemaTables
 from dlt.common.schema.typing import TColumnType, TTableFormat, TTableSchemaColumns
+from dlt.common.schema.utils import pipeline_state_table, normalize_table_identifiers
 from dlt.common.storages import FileStorage
 from dlt.common.json import json, PY_DATETIME_DECODERS
-from dlt.common.utils import chunks

 from dlt.destinations.exceptions import DatabaseUndefinedRelation
@@ -360,7 +360,7 @@ def update_stored_schema(
                 self.schema.stored_version_hash,
                 schema_info.inserted_at,
             )
-            return None
+            return {}
         else:
             logger.info(
                 "Schema with hash %s not found in storage, upgrading",
@@ -462,9 +462,9 @@ def get_stored_schema(self) -> Optional[StorageSchemaInfo]:
         return self._get_stored_schema(schema_name=self.schema.name)

     def get_stored_state(self, pipeline_name: str) -> StateInfo:
-        state_table = self.schema.tables.get(self.schema.state_table_name)
-        if state_table is None:
-            return None
+        state_table = self.schema.tables.get(
+            self.schema.state_table_name
+        ) or normalize_table_identifiers(pipeline_state_table(), self.schema.naming)
         state_table_obj = self._to_table_object(state_table)
         loads_table = self.schema.tables[self.schema.loads_table_name]
         loads_table_obj = self._to_table_object(loads_table)
diff --git a/tests/load/pipeline/test_merge_disposition.py b/tests/load/pipeline/test_merge_disposition.py
index b2197dd273..900e2b77bd 100644
--- a/tests/load/pipeline/test_merge_disposition.py
+++ b/tests/load/pipeline/test_merge_disposition.py
@@ -535,7 +535,9 @@ def _get_shuffled_events(shuffle: bool = dlt.secrets.value):


 @pytest.mark.parametrize(
-    "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
+    "destination_config",
+    destinations_configs(default_sql_configs=True, exclude=["sqlalchemy"]),
+    ids=lambda x: x.name,
 )
 @pytest.mark.parametrize("github_resource", [github_repo_events, github_repo_events_table_meta])
 def test_merge_with_dispatch_and_incremental(
diff --git a/tests/load/pipeline/test_pipelines.py b/tests/load/pipeline/test_pipelines.py
index 81c9292570..711b961896 100644
--- a/tests/load/pipeline/test_pipelines.py
+++ b/tests/load/pipeline/test_pipelines.py
@@ -1059,7 +1059,9 @@ def table_3(make_data=False):


 @pytest.mark.parametrize(
-    "destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
+    "destination_config",
+    destinations_configs(default_sql_configs=True, exclude=["sqlalchemy"]),
+    ids=lambda x: x.name,
 )
 def test_query_all_info_tables_fallback(destination_config: DestinationTestConfiguration) -> None:
     pipeline = destination_config.setup_pipeline(
diff --git a/tests/load/utils.py b/tests/load/utils.py
index a20049ae4e..fc130329e9 100644
--- a/tests/load/utils.py
+++ b/tests/load/utils.py
@@ -262,7 +262,8 @@ def destinations_configs(
         destination_configs += [
             DestinationTestConfiguration(destination=destination)
             for destination in SQL_DESTINATIONS
-            if destination not in ("athena", "synapse", "databricks", "dremio", "clickhouse")
+            if destination
+            not in ("athena", "synapse", "databricks", "dremio", "clickhouse", "sqlalchemy")
         ]
         destination_configs += [
             DestinationTestConfiguration(destination="duckdb", file_format="parquet"),
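A note on the key behavioral change above, as a minimal sketch rather than a verbatim excerpt (the standalone `resolve_state_table` helper is hypothetical; in the patch this logic lives inline in `get_stored_state`): instead of returning `None` when the schema has no state table, the client now falls back to the default pipeline state table with its identifiers normalized to the schema's naming convention, so the subsequent `_to_table_object` call always receives a valid table schema. Similarly, `update_stored_schema` now returns an empty dict rather than `None` when the stored schema hash already matches.

    # Minimal sketch of the fallback pattern introduced in get_stored_state.
    # `schema` stands in for self.schema; resolve_state_table is a
    # hypothetical wrapper, not part of the patch.
    from dlt.common.schema.utils import pipeline_state_table, normalize_table_identifiers

    def resolve_state_table(schema):
        # Prefer the state table already registered in the schema; otherwise
        # build the default pipeline state table and normalize its identifiers
        # so downstream table reflection never sees None.
        return schema.tables.get(schema.state_table_name) or normalize_table_identifiers(
            pipeline_state_table(), schema.naming
        )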