Skip to content

Commit

Permalink
Fix state, schema restore
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed Aug 30, 2024
1 parent 6b62d37 commit aee8ada
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 8 deletions.
10 changes: 5 additions & 5 deletions dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.schema import Schema, TTableSchema, TColumnSchema, TSchemaTables
from dlt.common.schema.typing import TColumnType, TTableFormat, TTableSchemaColumns
from dlt.common.schema.utils import pipeline_state_table, normalize_table_identifiers
from dlt.common.storages import FileStorage
from dlt.common.json import json, PY_DATETIME_DECODERS
from dlt.common.utils import chunks
from dlt.destinations.exceptions import DatabaseUndefinedRelation


Expand Down Expand Up @@ -360,7 +360,7 @@ def update_stored_schema(
self.schema.stored_version_hash,
schema_info.inserted_at,
)
return None
return {}
else:
logger.info(
"Schema with hash %s not found in storage, upgrading",
Expand Down Expand Up @@ -462,9 +462,9 @@ def get_stored_schema(self) -> Optional[StorageSchemaInfo]:
return self._get_stored_schema(schema_name=self.schema.name)

def get_stored_state(self, pipeline_name: str) -> StateInfo:
state_table = self.schema.tables.get(self.schema.state_table_name)
if state_table is None:
return None
state_table = self.schema.tables.get(
self.schema.state_table_name
) or normalize_table_identifiers(pipeline_state_table(), self.schema.naming)
state_table_obj = self._to_table_object(state_table)
loads_table = self.schema.tables[self.schema.loads_table_name]
loads_table_obj = self._to_table_object(loads_table)
Expand Down
4 changes: 3 additions & 1 deletion tests/load/pipeline/test_merge_disposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,9 @@ def _get_shuffled_events(shuffle: bool = dlt.secrets.value):


@pytest.mark.parametrize(
"destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
"destination_config",
destinations_configs(default_sql_configs=True, exclude=["sqlalchemy"]),
ids=lambda x: x.name,
)
@pytest.mark.parametrize("github_resource", [github_repo_events, github_repo_events_table_meta])
def test_merge_with_dispatch_and_incremental(
Expand Down
4 changes: 3 additions & 1 deletion tests/load/pipeline/test_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,9 @@ def table_3(make_data=False):


@pytest.mark.parametrize(
"destination_config", destinations_configs(default_sql_configs=True), ids=lambda x: x.name
"destination_config",
destinations_configs(default_sql_configs=True, exclude=["sqlalchemy"]),
ids=lambda x: x.name,
)
def test_query_all_info_tables_fallback(destination_config: DestinationTestConfiguration) -> None:
pipeline = destination_config.setup_pipeline(
Expand Down
3 changes: 2 additions & 1 deletion tests/load/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ def destinations_configs(
destination_configs += [
DestinationTestConfiguration(destination=destination)
for destination in SQL_DESTINATIONS
if destination not in ("athena", "synapse", "databricks", "dremio", "clickhouse")
if destination
not in ("athena", "synapse", "databricks", "dremio", "clickhouse", "sqlalchemy")
]
destination_configs += [
DestinationTestConfiguration(destination="duckdb", file_format="parquet"),
Expand Down

0 comments on commit aee8ada

Please sign in to comment.