From d26c5d0dae10ff4a5eaba756047d69068e6fd06c Mon Sep 17 00:00:00 2001
From: Alexander Butler <41213451+z3z1ma@users.noreply.github.com>
Date: Sun, 12 May 2024 22:23:42 +0100
Subject: [PATCH] Fix: use correct check for column prop in column schema (#1347)

* fix: use correct check for column prop in column schema

* uses helper function to detect defaults when getting columns with props, adds exceptions and tests

* fixes incomplete column in relational is_complex_type

* applies NOT NULL when key prop is set when column exists

---------

Co-authored-by: Marcin Rudolf
---
 dlt/common/normalizers/json/relational.py     |  2 +-
 dlt/common/schema/schema.py                   |  4 +-
 dlt/common/schema/utils.py                    | 28 ++++---
 dlt/extract/extractors.py                     |  1 +
 dlt/extract/hints.py                          |  4 +-
 tests/common/schema/test_merges.py            | 79 +++++++++++++++----
 tests/extract/test_sources.py                 | 66 ++++++++++++++++
 tests/load/pipeline/test_merge_disposition.py | 15 +++-
 8 files changed, 162 insertions(+), 37 deletions(-)

diff --git a/dlt/common/normalizers/json/relational.py b/dlt/common/normalizers/json/relational.py
index 572bff14a5..bad275ca4f 100644
--- a/dlt/common/normalizers/json/relational.py
+++ b/dlt/common/normalizers/json/relational.py
@@ -95,7 +95,7 @@ def _is_complex_type(self, table_name: str, field_name: str, _r_lvl: int) -> boo
         table = schema.tables.get(table_name)
         if table:
             column = table["columns"].get(field_name)
-        if column is None:
+        if column is None or "data_type" not in column:
             data_type = schema.get_preferred_type(field_name)
         else:
             data_type = column["data_type"]
diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py
index 740e578ef2..fb7ad226f1 100644
--- a/dlt/common/schema/schema.py
+++ b/dlt/common/schema/schema.py
@@ -446,7 +446,7 @@ def filter_row_with_hint(self, table_name: str, hint_type: TColumnHint, row: Str
             for column_name in table:
                 if column_name in row:
                     hint_value = table[column_name][column_prop]
-                    if not utils.has_default_column_hint_value(column_prop, hint_value):
+                    if not utils.has_default_column_prop_value(column_prop, hint_value):
                         rv_row[column_name] = row[column_name]
         except KeyError:
             for k, v in row.items():
@@ -702,7 +702,7 @@ def _infer_column(
         for hint in COLUMN_HINTS:
             column_prop = utils.hint_to_column_prop(hint)
             hint_value = self._infer_hint(hint, v, k)
-            if not utils.has_default_column_hint_value(column_prop, hint_value):
+            if not utils.has_default_column_prop_value(column_prop, hint_value):
                 column_schema[column_prop] = hint_value
 
         if is_variant:
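
The normalizer change above boils down to one rule: a column that exists in the schema but does not yet carry a "data_type" (an incomplete column) is treated the same as a missing column and falls back to the preferred type. A minimal standalone sketch of that lookup; resolve_data_type is a hypothetical helper for illustration, not part of dlt:

from typing import Any, Dict, Optional

def resolve_data_type(column: Optional[Dict[str, Any]], preferred_type: Optional[str]) -> Optional[str]:
    # incomplete columns (no "data_type") now fall back to the preferred type,
    # where the old `column is None` check would have hit a KeyError
    if column is None or "data_type" not in column:
        return preferred_type
    return column["data_type"]

assert resolve_data_type(None, "complex") == "complex"
assert resolve_data_type({"name": "payload"}, "complex") == "complex"  # incomplete column
assert resolve_data_type({"name": "payload", "data_type": "text"}, "complex") == "text"
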
diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py
index b9358d7de8..10b7607a95 100644
--- a/dlt/common/schema/utils.py
+++ b/dlt/common/schema/utils.py
@@ -115,23 +115,26 @@ def remove_defaults(stored_schema: TStoredSchema) -> TStoredSchema:
     return stored_schema
 
 
-def has_default_column_hint_value(hint: str, value: Any) -> bool:
-    """Checks if `value` is a default for `hint`. Only known column hints (COLUMN_HINTS) are checked"""
+def has_default_column_prop_value(prop: str, value: Any) -> bool:
+    """Checks if `value` is a default for `prop`."""
     # remove all boolean hints that are False, except "nullable" which is removed when it is True
-    if hint in COLUMN_HINTS and value is False:
-        return True
-    if hint == "nullable" and value is True:
-        return True
-    return False
+    # TODO: merge column props and hints
+    if prop in COLUMN_HINTS:
+        return value in (False, None)
+    # TODO: type all the hints including default value so those exceptions may be removed
+    if prop == "nullable":
+        return value in (True, None)
+    if prop == "x-active-record-timestamp":
+        # None is a valid value so it is not a default
+        return False
+    return value in (None, False)
 
 
 def remove_column_defaults(column_schema: TColumnSchema) -> TColumnSchema:
     """Removes default values from `column_schema` in place, returns the input for chaining"""
     # remove hints with default values
     for h in list(column_schema.keys()):
-        if has_default_column_hint_value(h, column_schema[h]):  # type: ignore
-            del column_schema[h]  # type: ignore
-        elif column_schema[h] is None:  # type: ignore
+        if has_default_column_prop_value(h, column_schema[h]):  # type: ignore
             del column_schema[h]  # type: ignore
     return column_schema
 
@@ -481,12 +484,11 @@ def hint_to_column_prop(h: TColumnHint) -> TColumnProp:
 def get_columns_names_with_prop(
     table: TTableSchema, column_prop: Union[TColumnProp, str], include_incomplete: bool = False
 ) -> List[str]:
-    # column_prop: TColumnProp = hint_to_column_prop(hint_type)
-    # default = column_prop != "nullable"  # default is true, only for nullable false
     return [
         c["name"]
         for c in table["columns"].values()
-        if (bool(c.get(column_prop, False)) or c.get(column_prop, False) is None)
+        if column_prop in c
+        and not has_default_column_prop_value(column_prop, c[column_prop])  # type: ignore[literal-required]
        and (include_incomplete or is_complete_column(c))
     ]
 
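
A condensed illustration of the default rules the rewritten helper applies and that get_columns_names_with_prop now relies on. This mirrors the logic in the hunk above but is standalone; the COLUMN_HINTS set here is only a stand-in for dlt's real hint set:

from typing import Any

COLUMN_HINTS = {"partition", "cluster", "unique", "sort", "primary_key", "foreign_key", "merge_key", "root_key"}

def has_default_column_prop_value(prop: str, value: Any) -> bool:
    # boolean hints default to False (or missing/None)
    if prop in COLUMN_HINTS:
        return value in (False, None)
    # nullable defaults to True
    if prop == "nullable":
        return value in (True, None)
    # None is a meaningful value for this prop, so it never counts as a default
    if prop == "x-active-record-timestamp":
        return False
    return value in (None, False)

# a column only "has" a prop when the stored value is non-default
assert has_default_column_prop_value("unique", False) is True
assert has_default_column_prop_value("nullable", None) is True
assert has_default_column_prop_value("x-active-record-timestamp", None) is False
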
diff --git a/dlt/extract/extractors.py b/dlt/extract/extractors.py
index b4afc5b1f8..bf4879ea4a 100644
--- a/dlt/extract/extractors.py
+++ b/dlt/extract/extractors.py
@@ -176,6 +176,7 @@ def _compute_and_update_table(
             computed_table["x-normalizer"] = {"evolve-columns-once": True}  # type: ignore[typeddict-unknown-key]
         existing_table = self.schema._schema_tables.get(table_name, None)
         if existing_table:
+            # TODO: revise this. computed table should overwrite certain hints (ie. primary and merge keys) completely
             diff_table = utils.diff_table(existing_table, computed_table)
         else:
             diff_table = computed_table
diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py
index 75ad02e3fe..3f17fd64f4 100644
--- a/dlt/extract/hints.py
+++ b/dlt/extract/hints.py
@@ -392,7 +392,9 @@ def _merge_key(hint: TColumnProp, keys: TColumnNames, partial: TPartialTableSche
             keys = [keys]
         for key in keys:
             if key in partial["columns"]:
-                merge_column(partial["columns"][key], {hint: True})  # type: ignore
+                # set nullable to False if not set
+                nullable = partial["columns"][key].get("nullable", False)
+                merge_column(partial["columns"][key], {hint: True, "nullable": nullable})  # type: ignore
             else:
                 partial["columns"][key] = new_column(key, nullable=False)
                 partial["columns"][key][hint] = True
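
The hints change encodes a small rule: when a primary or merge key hint lands on a column that already exists in the partial table, the column keeps an explicit nullable value if it has one, and otherwise becomes NOT NULL, matching the behavior of newly created key columns. A rough sketch with a hypothetical apply_key_hint helper (not dlt's API):

from typing import Any, Dict

def apply_key_hint(column: Dict[str, Any], hint: str) -> Dict[str, Any]:
    column[hint] = True
    # keep an explicit nullable=True, otherwise default key columns to NOT NULL
    column.setdefault("nullable", False)
    return column

assert apply_key_hint({"name": "id"}, "primary_key") == {"name": "id", "primary_key": True, "nullable": False}
assert apply_key_hint({"name": "id", "nullable": True}, "merge_key")["nullable"] is True
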
diff --git a/tests/common/schema/test_merges.py b/tests/common/schema/test_merges.py
index fe9e4b1476..1159e1a126 100644
--- a/tests/common/schema/test_merges.py
+++ b/tests/common/schema/test_merges.py
@@ -1,3 +1,4 @@
+from typing import Any
 import pytest
 from copy import copy, deepcopy
 
@@ -15,40 +16,82 @@
     "foreign_key": True,
     "data_type": "text",
     "name": "test",
-    "x-special": True,
+    "x-special": "value",
     "x-special-int": 100,
     "nullable": False,
+    "x-special-bool-true": True,
     "x-special-bool": False,
     "prop": None,
 }
 
-COL_1_HINTS_DEFAULTS: TColumnSchema = {  # type: ignore[typeddict-unknown-key]
+COL_1_HINTS_NO_DEFAULTS: TColumnSchema = {  # type: ignore[typeddict-unknown-key]
     "foreign_key": True,
     "data_type": "text",
     "name": "test",
-    "x-special": True,
+    "x-special": "value",
     "x-special-int": 100,
     "nullable": False,
-    "x-special-bool": False,
+    "x-special-bool-true": True,
 }
 
 COL_2_HINTS: TColumnSchema = {"nullable": True, "name": "test_2", "primary_key": False}
 
 
-def test_check_column_defaults() -> None:
-    assert utils.has_default_column_hint_value("data_type", "text") is False
-    assert utils.has_default_column_hint_value("name", 123) is False
-    assert utils.has_default_column_hint_value("nullable", True) is True
-    assert utils.has_default_column_hint_value("nullable", False) is False
-    assert utils.has_default_column_hint_value("x-special", False) is False
-    assert utils.has_default_column_hint_value("unique", False) is True
-    assert utils.has_default_column_hint_value("unique", True) is False
+@pytest.mark.parametrize(
+    "prop,value,is_default",
+    (
+        ("data_type", "text", False),
+        ("data_type", None, True),
+        ("name", "xyz", False),
+        ("name", None, True),
+        ("nullable", True, True),
+        ("nullable", False, False),
+        ("nullable", None, True),
+        ("x-special", False, True),
+        ("x-special", True, False),
+        ("x-special", None, True),
+        ("unique", False, True),
+        ("unique", True, False),
+        ("dedup_sort", "asc", False),
+        ("dedup_sort", None, True),
+        ("x-active-record-timestamp", None, False),
+        ("x-active-record-timestamp", "2100-01-01", False),
+    ),
+)
+def test_check_column_with_props(prop: str, value: Any, is_default: bool) -> None:
+    # check default
+    assert utils.has_default_column_prop_value(prop, value) is is_default
+    # check if column with prop is found
+    if prop == "name" and is_default:
+        # do not check name not present
+        return
+    column: TColumnSchema = {"name": "column_a"}
+    column[prop] = value  # type: ignore[literal-required]
+    table = utils.new_table("test", columns=[column])
+    expected_columns = [column["name"]] if not is_default else []
+    expected_column = column["name"] if not is_default else None
+    assert (
+        utils.get_columns_names_with_prop(table, prop, include_incomplete=True) == expected_columns
+    )
+    assert (
+        utils.get_first_column_name_with_prop(table, prop, include_incomplete=True)
+        == expected_column
+    )
+    assert utils.has_column_with_prop(table, prop, include_incomplete=True) is not is_default
+    # if data_type is set, column is complete
+    if prop == "data_type" and not is_default:
+        assert (
+            utils.get_columns_names_with_prop(table, prop, include_incomplete=False)
+            == expected_columns
+        )
+    else:
+        assert utils.get_columns_names_with_prop(table, prop, include_incomplete=False) == []
 
 
 def test_column_remove_defaults() -> None:
     clean = utils.remove_column_defaults(copy(COL_1_HINTS))
     # mind that nullable default is False and Nones will be removed
-    assert clean == COL_1_HINTS_DEFAULTS
+    assert clean == COL_1_HINTS_NO_DEFAULTS
 
     # check nullable True
     assert utils.remove_column_defaults(copy(COL_2_HINTS)) == {"name": "test_2"}
@@ -59,9 +102,11 @@ def test_column_add_defaults() -> None:
     assert full["unique"] is False
     # remove defaults from full
     clean = utils.remove_column_defaults(copy(full))
-    assert clean == COL_1_HINTS_DEFAULTS
+    assert clean == COL_1_HINTS_NO_DEFAULTS
     # prop is None and will be removed
     del full["prop"]  # type: ignore[typeddict-item]
+    # same for x-special-bool
+    del full["x-special-bool"]  # type: ignore[typeddict-item]
     assert utils.add_column_defaults(copy(clean)) == full
 
     # test incomplete
@@ -140,9 +185,10 @@ def test_merge_columns() -> None:
         "cluster": False,
         "foreign_key": True,
         "data_type": "text",
-        "x-special": True,
+        "x-special": "value",
         "x-special-int": 100,
         "x-special-bool": False,
+        "x-special-bool-true": True,
         "prop": None,
     }
 
@@ -154,9 +200,10 @@ def test_merge_columns() -> None:
         "cluster": False,
         "foreign_key": True,
         "data_type": "text",
-        "x-special": True,
+        "x-special": "value",
         "x-special-int": 100,
         "x-special-bool": False,
+        "x-special-bool-true": True,
         "prop": None,
         "primary_key": False,
     }
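
The reworked test_column_remove_defaults and test_column_add_defaults cases depend on remove_column_defaults now stripping hint defaults and bare None values through the single helper. A self-contained sketch of that stripping loop; strip_defaults and looks_default are illustrative names, not dlt functions:

from typing import Any, Callable, Dict

def strip_defaults(column: Dict[str, Any], is_default: Callable[[str, Any], bool]) -> Dict[str, Any]:
    # one predicate now covers both hint defaults and bare None values
    for key in list(column.keys()):
        if is_default(key, column[key]):
            del column[key]
    return column

def looks_default(prop: str, value: Any) -> bool:
    if prop == "nullable":
        return value in (True, None)
    return value in (None, False)

col = {"name": "test", "nullable": False, "x-special-bool": False, "prop": None}
# nullable=False is kept (its default is True); x-special-bool=False and prop=None are dropped
assert strip_defaults(col, looks_default) == {"name": "test", "nullable": False}
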
diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py
index 5dd3d6c3ca..308b65bd37 100644
--- a/tests/extract/test_sources.py
+++ b/tests/extract/test_sources.py
@@ -9,6 +9,8 @@
 from dlt.common.exceptions import DictValidationException, PipelineStateNotAvailable
 from dlt.common.pipeline import StateInjectableContext, source_state
 from dlt.common.schema import Schema
+from dlt.common.schema.typing import TColumnProp, TColumnSchema
+from dlt.common.schema import utils
 from dlt.common.typing import TDataItems
 from dlt.extract import DltResource, DltSource, Incremental
@@ -1314,6 +1316,7 @@ def empty_gen():
     assert empty_r.compute_table_schema()["columns"]["tags"] == {
         "data_type": "complex",
         "name": "tags",
+        "nullable": False,  # NOT NULL because `tags` do not define it
         "primary_key": True,
         "merge_key": True,
     }
@@ -1436,6 +1439,69 @@ def empty_gen():
     )
 
 
+@pytest.mark.parametrize("key_prop", ("primary_key", "merge_key"))
+def test_apply_hints_keys(key_prop: TColumnProp) -> None:
+    def empty_gen():
+        yield [1, 2, 3]
+
+    key_columns = ["id_1", "id_2"]
+
+    empty = DltResource.from_data(empty_gen)
+    # apply compound key
+    empty.apply_hints(**{key_prop: key_columns})  # type: ignore
+    table = empty.compute_table_schema()
+    actual_keys = utils.get_columns_names_with_prop(table, key_prop, include_incomplete=True)
+    assert actual_keys == key_columns
+    # nullable is false
+    actual_keys = utils.get_columns_names_with_prop(table, "nullable", include_incomplete=True)
+    assert actual_keys == key_columns
+
+    # apply new key
+    key_columns_2 = ["id_1", "id_3"]
+    empty.apply_hints(**{key_prop: key_columns_2})  # type: ignore
+    table = empty.compute_table_schema()
+    actual_keys = utils.get_columns_names_with_prop(table, key_prop, include_incomplete=True)
+    assert actual_keys == key_columns_2
+    actual_keys = utils.get_columns_names_with_prop(table, "nullable", include_incomplete=True)
+    assert actual_keys == key_columns_2
+
+    # if column is present for a key, it get merged and nullable should be preserved
+    id_2_col: TColumnSchema = {
+        "name": "id_2",
+        "data_type": "bigint",
+    }
+
+    empty.apply_hints(**{key_prop: key_columns}, columns=[id_2_col])  # type: ignore
+    table = empty.compute_table_schema()
+    actual_keys = utils.get_columns_names_with_prop(table, key_prop, include_incomplete=True)
+    assert set(actual_keys) == set(key_columns)
+    # nullable not set in id_2_col so NOT NULL is set
+    actual_keys = utils.get_columns_names_with_prop(table, "nullable", include_incomplete=True)
+    assert set(actual_keys) == set(key_columns)
+
+    id_2_col["nullable"] = True
+    empty.apply_hints(**{key_prop: key_columns}, columns=[id_2_col])  # type: ignore
+    table = empty.compute_table_schema()
+    actual_keys = utils.get_columns_names_with_prop(table, key_prop, include_incomplete=True)
+    assert set(actual_keys) == set(key_columns)
+    # id_2 set to NULL
+    actual_keys = utils.get_columns_names_with_prop(table, "nullable", include_incomplete=True)
+    assert set(actual_keys) == {"id_1"}
+
+    # apply key via schema
+    key_columns_3 = ["id_2", "id_1", "id_3"]
+    id_2_col[key_prop] = True
+
+    empty = DltResource.from_data(empty_gen)
+    empty.apply_hints(**{key_prop: key_columns_2}, columns=[id_2_col])  # type: ignore
+    table = empty.compute_table_schema()
+    # all 3 columns have the compound key. we do not prevent setting keys via schema
+    actual_keys = utils.get_columns_names_with_prop(table, key_prop, include_incomplete=True)
+    assert actual_keys == key_columns_3
+    actual_keys = utils.get_columns_names_with_prop(table, "nullable", include_incomplete=True)
+    assert actual_keys == key_columns_2
+
+
 def test_resource_no_template() -> None:
     empty = DltResource.from_data([1, 2, 3], name="table")
     assert empty.write_disposition == "append"
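
test_apply_hints_keys exercises the behavior end to end. A hedged usage sketch of the same flow, assuming dlt's resource API behaves exactly as the test above asserts:

import dlt

@dlt.resource
def users():
    yield [{"id_1": 1, "id_2": 2}]

users.apply_hints(primary_key=["id_1", "id_2"])
schema = users.compute_table_schema()
# key columns that do not declare "nullable" themselves are marked NOT NULL
assert all(schema["columns"][c]["nullable"] is False for c in ("id_1", "id_2"))
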
diff --git a/tests/load/pipeline/test_merge_disposition.py b/tests/load/pipeline/test_merge_disposition.py
index 14840896df..f4e039ee81 100644
--- a/tests/load/pipeline/test_merge_disposition.py
+++ b/tests/load/pipeline/test_merge_disposition.py
@@ -807,6 +807,7 @@ def r():
     info = p.run(r(), loader_file_format=destination_config.file_format)
 
 
+@pytest.mark.essential
 @pytest.mark.parametrize(
     "destination_config",
     destinations_configs(default_sql_configs=True, supports_merge=True),
@@ -819,7 +820,10 @@ def test_dedup_sort_hint(destination_config: DestinationTestConfiguration) -> No
         name=table_name,
         write_disposition="merge",
         primary_key="id",  # sort hints only have effect when a primary key is provided
-        columns={"sequence": {"dedup_sort": "desc"}},
+        columns={
+            "sequence": {"dedup_sort": "desc", "nullable": False},
+            "val": {"dedup_sort": None},
+        },
     )
     def data_resource(data):
         yield data
@@ -847,7 +851,7 @@ def data_resource(data):
     assert sorted(observed, key=lambda d: d["id"]) == expected
 
     # now test "asc" sorting
-    data_resource.apply_hints(columns={"sequence": {"dedup_sort": "asc"}})
+    data_resource.apply_hints(columns={"sequence": {"dedup_sort": "asc", "nullable": False}})
 
     info = p.run(data_resource(data), loader_file_format=destination_config.file_format)
     assert_load_info(info)
@@ -866,7 +870,7 @@ def data_resource(data):
     table_name = "test_dedup_sort_hint_complex"
     data_resource.apply_hints(
         table_name=table_name,
-        columns={"sequence": {"dedup_sort": "desc"}},
+        columns={"sequence": {"dedup_sort": "desc", "nullable": False}},
     )
 
     # three records with same primary key
@@ -890,7 +894,10 @@ def data_resource(data):
     table_name = "test_dedup_sort_hint_with_hard_delete"
     data_resource.apply_hints(
         table_name=table_name,
-        columns={"sequence": {"dedup_sort": "desc"}, "deleted": {"hard_delete": True}},
+        columns={
+            "sequence": {"dedup_sort": "desc", "nullable": False},
+            "deleted": {"hard_delete": True},
+        },
    )
 
     # three records with same primary key
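
For context, the dedup_sort hint configured in these tests is applied on a resource roughly like this (a usage sketch; the pipeline and table names are placeholders and the run call is left commented out):

import dlt

@dlt.resource(
    name="test_dedup_sort_hint",
    write_disposition="merge",
    primary_key="id",
    columns={"sequence": {"dedup_sort": "desc", "nullable": False}},
)
def data_resource(data):
    yield data

# with "desc", the row with the highest "sequence" per primary key wins the merge
data = [
    {"id": 1, "sequence": 1, "val": "a"},
    {"id": 1, "sequence": 2, "val": "b"},
]
pipeline = dlt.pipeline(pipeline_name="dedup_sort_demo", destination="duckdb")
# pipeline.run(data_resource(data))  # run against a configured destination
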