Skip to content

Commit

Permalink
Fix: use correct check for column prop in column schema (#1347)
Browse files Browse the repository at this point in the history
* fix: use correct check for column prop in column schema

* uses helper function to detect defaults when getting columns with props, adds exceptions and tests

* fixes incomplete column in relational is_complex_type

* applies NOT NULL when key prop is set when column exists

---------

Co-authored-by: Marcin Rudolf <[email protected]>
  • Loading branch information
z3z1ma and rudolfix authored May 12, 2024
1 parent e4d3647 commit d26c5d0
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 37 deletions.
2 changes: 1 addition & 1 deletion dlt/common/normalizers/json/relational.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def _is_complex_type(self, table_name: str, field_name: str, _r_lvl: int) -> boo
table = schema.tables.get(table_name)
if table:
column = table["columns"].get(field_name)
if column is None:
if column is None or "data_type" not in column:
data_type = schema.get_preferred_type(field_name)
else:
data_type = column["data_type"]
Expand Down
4 changes: 2 additions & 2 deletions dlt/common/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ def filter_row_with_hint(self, table_name: str, hint_type: TColumnHint, row: Str
for column_name in table:
if column_name in row:
hint_value = table[column_name][column_prop]
if not utils.has_default_column_hint_value(column_prop, hint_value):
if not utils.has_default_column_prop_value(column_prop, hint_value):
rv_row[column_name] = row[column_name]
except KeyError:
for k, v in row.items():
Expand Down Expand Up @@ -702,7 +702,7 @@ def _infer_column(
for hint in COLUMN_HINTS:
column_prop = utils.hint_to_column_prop(hint)
hint_value = self._infer_hint(hint, v, k)
if not utils.has_default_column_hint_value(column_prop, hint_value):
if not utils.has_default_column_prop_value(column_prop, hint_value):
column_schema[column_prop] = hint_value

if is_variant:
Expand Down
28 changes: 15 additions & 13 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,23 +115,26 @@ def remove_defaults(stored_schema: TStoredSchema) -> TStoredSchema:
return stored_schema


def has_default_column_hint_value(hint: str, value: Any) -> bool:
"""Checks if `value` is a default for `hint`. Only known column hints (COLUMN_HINTS) are checked"""
def has_default_column_prop_value(prop: str, value: Any) -> bool:
"""Checks if `value` is a default for `prop`."""
# remove all boolean hints that are False, except "nullable" which is removed when it is True
if hint in COLUMN_HINTS and value is False:
return True
if hint == "nullable" and value is True:
return True
return False
# TODO: merge column props and hints
if prop in COLUMN_HINTS:
return value in (False, None)
# TODO: type all the hints including default value so those exceptions may be removed
if prop == "nullable":
return value in (True, None)
if prop == "x-active-record-timestamp":
# None is a valid value so it is not a default
return False
return value in (None, False)


def remove_column_defaults(column_schema: TColumnSchema) -> TColumnSchema:
"""Removes default values from `column_schema` in place, returns the input for chaining"""
# remove hints with default values
for h in list(column_schema.keys()):
if has_default_column_hint_value(h, column_schema[h]): # type: ignore
del column_schema[h] # type: ignore
elif column_schema[h] is None: # type: ignore
if has_default_column_prop_value(h, column_schema[h]): # type: ignore
del column_schema[h] # type: ignore
return column_schema

Expand Down Expand Up @@ -481,12 +484,11 @@ def hint_to_column_prop(h: TColumnHint) -> TColumnProp:
def get_columns_names_with_prop(
table: TTableSchema, column_prop: Union[TColumnProp, str], include_incomplete: bool = False
) -> List[str]:
# column_prop: TColumnProp = hint_to_column_prop(hint_type)
# default = column_prop != "nullable" # default is true, only for nullable false
return [
c["name"]
for c in table["columns"].values()
if (bool(c.get(column_prop, False)) or c.get(column_prop, False) is None)
if column_prop in c
and not has_default_column_prop_value(column_prop, c[column_prop]) # type: ignore[literal-required]
and (include_incomplete or is_complete_column(c))
]

Expand Down
1 change: 1 addition & 0 deletions dlt/extract/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def _compute_and_update_table(
computed_table["x-normalizer"] = {"evolve-columns-once": True} # type: ignore[typeddict-unknown-key]
existing_table = self.schema._schema_tables.get(table_name, None)
if existing_table:
# TODO: revise this. computed table should overwrite certain hints (ie. primary and merge keys) completely
diff_table = utils.diff_table(existing_table, computed_table)
else:
diff_table = computed_table
Expand Down
4 changes: 3 additions & 1 deletion dlt/extract/hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,9 @@ def _merge_key(hint: TColumnProp, keys: TColumnNames, partial: TPartialTableSche
keys = [keys]
for key in keys:
if key in partial["columns"]:
merge_column(partial["columns"][key], {hint: True}) # type: ignore
# set nullable to False if not set
nullable = partial["columns"][key].get("nullable", False)
merge_column(partial["columns"][key], {hint: True, "nullable": nullable}) # type: ignore
else:
partial["columns"][key] = new_column(key, nullable=False)
partial["columns"][key][hint] = True
Expand Down
79 changes: 63 additions & 16 deletions tests/common/schema/test_merges.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Any
import pytest
from copy import copy, deepcopy

Expand All @@ -15,40 +16,82 @@
"foreign_key": True,
"data_type": "text",
"name": "test",
"x-special": True,
"x-special": "value",
"x-special-int": 100,
"nullable": False,
"x-special-bool-true": True,
"x-special-bool": False,
"prop": None,
}

COL_1_HINTS_DEFAULTS: TColumnSchema = { # type: ignore[typeddict-unknown-key]
COL_1_HINTS_NO_DEFAULTS: TColumnSchema = { # type: ignore[typeddict-unknown-key]
"foreign_key": True,
"data_type": "text",
"name": "test",
"x-special": True,
"x-special": "value",
"x-special-int": 100,
"nullable": False,
"x-special-bool": False,
"x-special-bool-true": True,
}

COL_2_HINTS: TColumnSchema = {"nullable": True, "name": "test_2", "primary_key": False}


def test_check_column_defaults() -> None:
assert utils.has_default_column_hint_value("data_type", "text") is False
assert utils.has_default_column_hint_value("name", 123) is False
assert utils.has_default_column_hint_value("nullable", True) is True
assert utils.has_default_column_hint_value("nullable", False) is False
assert utils.has_default_column_hint_value("x-special", False) is False
assert utils.has_default_column_hint_value("unique", False) is True
assert utils.has_default_column_hint_value("unique", True) is False
@pytest.mark.parametrize(
"prop,value,is_default",
(
("data_type", "text", False),
("data_type", None, True),
("name", "xyz", False),
("name", None, True),
("nullable", True, True),
("nullable", False, False),
("nullable", None, True),
("x-special", False, True),
("x-special", True, False),
("x-special", None, True),
("unique", False, True),
("unique", True, False),
("dedup_sort", "asc", False),
("dedup_sort", None, True),
("x-active-record-timestamp", None, False),
("x-active-record-timestamp", "2100-01-01", False),
),
)
def test_check_column_with_props(prop: str, value: Any, is_default: bool) -> None:
# check default
assert utils.has_default_column_prop_value(prop, value) is is_default
# check if column with prop is found
if prop == "name" and is_default:
# do not check name not present
return
column: TColumnSchema = {"name": "column_a"}
column[prop] = value # type: ignore[literal-required]
table = utils.new_table("test", columns=[column])
expected_columns = [column["name"]] if not is_default else []
expected_column = column["name"] if not is_default else None
assert (
utils.get_columns_names_with_prop(table, prop, include_incomplete=True) == expected_columns
)
assert (
utils.get_first_column_name_with_prop(table, prop, include_incomplete=True)
== expected_column
)
assert utils.has_column_with_prop(table, prop, include_incomplete=True) is not is_default
# if data_type is set, column is complete
if prop == "data_type" and not is_default:
assert (
utils.get_columns_names_with_prop(table, prop, include_incomplete=False)
== expected_columns
)
else:
assert utils.get_columns_names_with_prop(table, prop, include_incomplete=False) == []


def test_column_remove_defaults() -> None:
clean = utils.remove_column_defaults(copy(COL_1_HINTS))
# mind that nullable default is False and Nones will be removed
assert clean == COL_1_HINTS_DEFAULTS
assert clean == COL_1_HINTS_NO_DEFAULTS
# check nullable True
assert utils.remove_column_defaults(copy(COL_2_HINTS)) == {"name": "test_2"}

Expand All @@ -59,9 +102,11 @@ def test_column_add_defaults() -> None:
assert full["unique"] is False
# remove defaults from full
clean = utils.remove_column_defaults(copy(full))
assert clean == COL_1_HINTS_DEFAULTS
assert clean == COL_1_HINTS_NO_DEFAULTS
# prop is None and will be removed
del full["prop"] # type: ignore[typeddict-item]
# same for x-special-bool
del full["x-special-bool"] # type: ignore[typeddict-item]
assert utils.add_column_defaults(copy(clean)) == full

# test incomplete
Expand Down Expand Up @@ -140,9 +185,10 @@ def test_merge_columns() -> None:
"cluster": False,
"foreign_key": True,
"data_type": "text",
"x-special": True,
"x-special": "value",
"x-special-int": 100,
"x-special-bool": False,
"x-special-bool-true": True,
"prop": None,
}

Expand All @@ -154,9 +200,10 @@ def test_merge_columns() -> None:
"cluster": False,
"foreign_key": True,
"data_type": "text",
"x-special": True,
"x-special": "value",
"x-special-int": 100,
"x-special-bool": False,
"x-special-bool-true": True,
"prop": None,
"primary_key": False,
}
Expand Down
66 changes: 66 additions & 0 deletions tests/extract/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from dlt.common.exceptions import DictValidationException, PipelineStateNotAvailable
from dlt.common.pipeline import StateInjectableContext, source_state
from dlt.common.schema import Schema
from dlt.common.schema.typing import TColumnProp, TColumnSchema
from dlt.common.schema import utils
from dlt.common.typing import TDataItems

from dlt.extract import DltResource, DltSource, Incremental
Expand Down Expand Up @@ -1314,6 +1316,7 @@ def empty_gen():
assert empty_r.compute_table_schema()["columns"]["tags"] == {
"data_type": "complex",
"name": "tags",
"nullable": False, # NOT NULL because `tags` do not define it
"primary_key": True,
"merge_key": True,
}
Expand Down Expand Up @@ -1436,6 +1439,69 @@ def empty_gen():
)


@pytest.mark.parametrize("key_prop", ("primary_key", "merge_key"))
def test_apply_hints_keys(key_prop: TColumnProp) -> None:
def empty_gen():
yield [1, 2, 3]

key_columns = ["id_1", "id_2"]

empty = DltResource.from_data(empty_gen)
# apply compound key
empty.apply_hints(**{key_prop: key_columns}) # type: ignore
table = empty.compute_table_schema()
actual_keys = utils.get_columns_names_with_prop(table, key_prop, include_incomplete=True)
assert actual_keys == key_columns
# nullable is false
actual_keys = utils.get_columns_names_with_prop(table, "nullable", include_incomplete=True)
assert actual_keys == key_columns

# apply new key
key_columns_2 = ["id_1", "id_3"]
empty.apply_hints(**{key_prop: key_columns_2}) # type: ignore
table = empty.compute_table_schema()
actual_keys = utils.get_columns_names_with_prop(table, key_prop, include_incomplete=True)
assert actual_keys == key_columns_2
actual_keys = utils.get_columns_names_with_prop(table, "nullable", include_incomplete=True)
assert actual_keys == key_columns_2

# if column is present for a key, it get merged and nullable should be preserved
id_2_col: TColumnSchema = {
"name": "id_2",
"data_type": "bigint",
}

empty.apply_hints(**{key_prop: key_columns}, columns=[id_2_col]) # type: ignore
table = empty.compute_table_schema()
actual_keys = utils.get_columns_names_with_prop(table, key_prop, include_incomplete=True)
assert set(actual_keys) == set(key_columns)
# nullable not set in id_2_col so NOT NULL is set
actual_keys = utils.get_columns_names_with_prop(table, "nullable", include_incomplete=True)
assert set(actual_keys) == set(key_columns)

id_2_col["nullable"] = True
empty.apply_hints(**{key_prop: key_columns}, columns=[id_2_col]) # type: ignore
table = empty.compute_table_schema()
actual_keys = utils.get_columns_names_with_prop(table, key_prop, include_incomplete=True)
assert set(actual_keys) == set(key_columns)
# id_2 set to NULL
actual_keys = utils.get_columns_names_with_prop(table, "nullable", include_incomplete=True)
assert set(actual_keys) == {"id_1"}

# apply key via schema
key_columns_3 = ["id_2", "id_1", "id_3"]
id_2_col[key_prop] = True

empty = DltResource.from_data(empty_gen)
empty.apply_hints(**{key_prop: key_columns_2}, columns=[id_2_col]) # type: ignore
table = empty.compute_table_schema()
# all 3 columns have the compound key. we do not prevent setting keys via schema
actual_keys = utils.get_columns_names_with_prop(table, key_prop, include_incomplete=True)
assert actual_keys == key_columns_3
actual_keys = utils.get_columns_names_with_prop(table, "nullable", include_incomplete=True)
assert actual_keys == key_columns_2


def test_resource_no_template() -> None:
empty = DltResource.from_data([1, 2, 3], name="table")
assert empty.write_disposition == "append"
Expand Down
15 changes: 11 additions & 4 deletions tests/load/pipeline/test_merge_disposition.py
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,7 @@ def r():
info = p.run(r(), loader_file_format=destination_config.file_format)


@pytest.mark.essential
@pytest.mark.parametrize(
"destination_config",
destinations_configs(default_sql_configs=True, supports_merge=True),
Expand All @@ -819,7 +820,10 @@ def test_dedup_sort_hint(destination_config: DestinationTestConfiguration) -> No
name=table_name,
write_disposition="merge",
primary_key="id", # sort hints only have effect when a primary key is provided
columns={"sequence": {"dedup_sort": "desc"}},
columns={
"sequence": {"dedup_sort": "desc", "nullable": False},
"val": {"dedup_sort": None},
},
)
def data_resource(data):
yield data
Expand Down Expand Up @@ -847,7 +851,7 @@ def data_resource(data):
assert sorted(observed, key=lambda d: d["id"]) == expected

# now test "asc" sorting
data_resource.apply_hints(columns={"sequence": {"dedup_sort": "asc"}})
data_resource.apply_hints(columns={"sequence": {"dedup_sort": "asc", "nullable": False}})

info = p.run(data_resource(data), loader_file_format=destination_config.file_format)
assert_load_info(info)
Expand All @@ -866,7 +870,7 @@ def data_resource(data):
table_name = "test_dedup_sort_hint_complex"
data_resource.apply_hints(
table_name=table_name,
columns={"sequence": {"dedup_sort": "desc"}},
columns={"sequence": {"dedup_sort": "desc", "nullable": False}},
)

# three records with same primary key
Expand All @@ -890,7 +894,10 @@ def data_resource(data):
table_name = "test_dedup_sort_hint_with_hard_delete"
data_resource.apply_hints(
table_name=table_name,
columns={"sequence": {"dedup_sort": "desc"}, "deleted": {"hard_delete": True}},
columns={
"sequence": {"dedup_sort": "desc", "nullable": False},
"deleted": {"hard_delete": True},
},
)

# three records with same primary key
Expand Down

0 comments on commit d26c5d0

Please sign in to comment.