Commit

test/fix truncate mode
steinitzu committed Apr 16, 2024
1 parent ed386a2 · commit 800c3cd
Showing 2 changed files with 82 additions and 11 deletions.
dlt/load/utils.py (8 changes: 1 addition & 7 deletions)
@@ -88,7 +88,6 @@ def init_client(
     # get dlt/internal tables
     dlt_tables = set(schema.dlt_table_names())
 
-    all_tables = set(schema.tables.keys())
     # tables without data (TODO: normalizer removes such jobs, write tests and remove the line below)
     tables_no_data = set(
         table["name"] for table in schema.data_tables() if not has_table_seen_data(table)
@@ -99,17 +98,12 @@ def init_client(
     # get tables to truncate by extending tables with jobs with all their child tables
 
     if refresh == "drop_data":
-        truncate_filter = lambda t: True
+        truncate_filter = lambda t: t["name"] in tables_with_jobs - dlt_tables
 
     truncate_tables = set(
         _extend_tables_with_table_chain(schema, tables_with_jobs, tables_with_jobs, truncate_filter)
     )
 
-    # if refresh in ("drop_dataset", "drop_tables"):
-    #     drop_tables = all_tables - dlt_tables - tables_no_data
-    # else:
-    #     drop_tables = set()
-
     applied_update = _init_dataset_and_update_schema(
         job_client,
         expected_update,
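
For intuition, here is a minimal sketch of the set logic behind the new drop_data truncate filter. The table names are hypothetical; in init_client, tables_with_jobs holds the tables that receive load jobs in this package and dlt_tables holds dlt's internal tables, which must not be truncated.

# Minimal sketch of the drop_data truncate filter (hypothetical table names)
tables_with_jobs = {"some_data_1", "some_data_2", "_dlt_pipeline_state"}
dlt_tables = {"_dlt_loads", "_dlt_version", "_dlt_pipeline_state"}

truncate_filter = lambda t: t["name"] in tables_with_jobs - dlt_tables

assert truncate_filter({"name": "some_data_1"})              # has a job: truncate
assert not truncate_filter({"name": "_dlt_pipeline_state"})  # internal table: keep
assert not truncate_filter({"name": "some_data_3"})          # no job this load: keep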
tests/pipeline/test_refresh_modes.py (85 changes: 81 additions & 4 deletions)
@@ -8,7 +8,7 @@
 from tests.pipeline.utils import assert_load_info
 
 
-def test_refresh_full():
+def test_refresh_drop_dataset():
     first_run = True
 
     @dlt.source
@@ -73,15 +73,14 @@ def some_data_3():
     assert result == [(1,), (2,)]
 
 
-def test_refresh_replace():
+def test_refresh_drop_tables():
     first_run = True
 
     @dlt.source
     def my_source():
         @dlt.resource
         def some_data_1():
             # Set some source and resource state
-            state = dlt.state()
             if first_run:
                 dlt.state()["source_key_1"] = "source_value_1"
                 resource_state("some_data_1")["resource_key_1"] = "resource_value_1"
@@ -121,7 +120,7 @@ def some_data_3():
     pipeline = dlt.pipeline(
         "refresh_full_test",
         destination="duckdb",
-        refresh="replace",
+        refresh="drop_tables",
         dataset_name="refresh_full_test",
     )

@@ -140,3 +139,81 @@ def some_data_3():
     with pipeline.sql_client() as client:
         result = client.execute_sql("SELECT id FROM some_data_1 ORDER BY id")
         assert result == [(1,), (2,)]
+
+
+def test_refresh_drop_data_only():
+    """Refresh drop_data should truncate all selected tables before load"""
+    first_run = True
+
+    @dlt.source
+    def my_source():
+        @dlt.resource
+        def some_data_1():
+            # Set some source and resource state
+            if first_run:
+                dlt.state()["source_key_1"] = "source_value_1"
+                resource_state("some_data_1")["resource_key_1"] = "resource_value_1"
+                resource_state("some_data_1")["resource_key_2"] = "resource_value_2"
+            else:
+                # Source state is kept; resource state of the selected resources is wiped
+                assert "source_key_1" in dlt.state()
+                assert "source_key_2" in dlt.state()
+                assert "source_key_3" in dlt.state()
+                # Resource 3 was not selected in the second run, so its state is not wiped
+                assert dlt.state()["resources"] == {
+                    "some_data_3": {"resource_key_5": "resource_value_5"}
+                }
+            yield {"id": 1, "name": "John"}
+            yield {"id": 2, "name": "Jane"}
+
+        @dlt.resource
+        def some_data_2():
+            if first_run:
+                dlt.state()["source_key_2"] = "source_value_2"
+                resource_state("some_data_2")["resource_key_3"] = "resource_value_3"
+                resource_state("some_data_2")["resource_key_4"] = "resource_value_4"
+            yield {"id": 3, "name": "Joe"}
+            yield {"id": 4, "name": "Jill"}
+
+        @dlt.resource
+        def some_data_3():
+            if first_run:
+                dlt.state()["source_key_3"] = "source_value_3"
+                resource_state("some_data_3")["resource_key_5"] = "resource_value_5"
+            yield {"id": 5, "name": "Jack"}
+            yield {"id": 6, "name": "Jill"}
+
+        return [some_data_1, some_data_2, some_data_3]
+
+    # First run the pipeline with a load to the destination so tables are created
+    pipeline = dlt.pipeline(
+        "refresh_full_test",
+        destination="duckdb",
+        refresh="drop_data",
+        dataset_name="refresh_full_test",
+    )
+
+    info = pipeline.run(my_source(), write_disposition="append")
+    assert_load_info(info)
+
+    # Second run of the pipeline with only selected resources
+    first_run = False
+    info = pipeline.run(
+        my_source().with_resources("some_data_1", "some_data_2"), write_disposition="append"
+    )
+
+    # Tables selected in the second run are truncated and should only have data from the second run
+    with pipeline.sql_client() as client:
+        result = client.execute_sql("SELECT id FROM some_data_2 ORDER BY id")
+        assert result == [(3,), (4,)]
+
+    with pipeline.sql_client() as client:
+        result = client.execute_sql("SELECT id FROM some_data_1 ORDER BY id")
+        assert result == [(1,), (2,)]
+
+    # TODO: test that tables were truncated, not dropped
+
+    # Tables not selected in the second run are not truncated and still have data from the first run
+    with pipeline.sql_client() as client:
+        result = client.execute_sql("SELECT id FROM some_data_3 ORDER BY id")
+        assert result == [(5,), (6,)]

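One possible way to close the TODO in the test above (a sketch, not part of this commit): since the destination is duckdb, the test could query information_schema after the second run to assert that the selected tables still exist, which guards against a drop-and-recreate standing in for a truncate. The schema name matches the dataset_name used in the test.

# Sketch for the TODO: check the tables still exist (truncated, not dropped)
with pipeline.sql_client() as client:
    rows = client.execute_sql(
        "SELECT table_name FROM information_schema.tables "
        "WHERE table_schema = 'refresh_full_test'"
    )
    existing = {row[0] for row in rows}
    # drop_data should leave all three tables in place, only emptying the
    # selected ones before the load
    assert {"some_data_1", "some_data_2", "some_data_3"} <= existing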