From 48503c9e7e22dce969940aac92d759e69673e53b Mon Sep 17 00:00:00 2001 From: Brice Luu Date: Fri, 21 Apr 2023 10:57:19 +0200 Subject: [PATCH 01/10] Add edge attribute for tests descendants injected for build --- core/dbt/compilation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/dbt/compilation.py b/core/dbt/compilation.py index e5503af462e..6ec381f5b3f 100644 --- a/core/dbt/compilation.py +++ b/core/dbt/compilation.py @@ -474,7 +474,7 @@ def add_test_edges(self, linker: Linker, manifest: Manifest) -> None: # is a subset of all upstream nodes of the current node, # add an edge from the upstream test to the current node. if test_depends_on.issubset(upstream_nodes): - linker.graph.add_edge(upstream_test, node_id) + linker.graph.add_edge(upstream_test, node_id, edge_type="parent_test") def compile(self, manifest: Manifest, write=True, add_test_edges=False) -> Graph: self.initialize() From 2c58449928b41191ac8f324267d9b8c226d877c3 Mon Sep 17 00:00:00 2001 From: Brice Luu Date: Fri, 21 Apr 2023 11:00:41 +0200 Subject: [PATCH 02/10] Ignore injected tests children for descendants selection --- core/dbt/graph/graph.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/core/dbt/graph/graph.py b/core/dbt/graph/graph.py index 29f24cae734..cd37480b53b 100644 --- a/core/dbt/graph/graph.py +++ b/core/dbt/graph/graph.py @@ -33,11 +33,18 @@ def ancestors(self, node: UniqueId, max_depth: Optional[int] = None) -> Set[Uniq for _, child in nx.bfs_edges(self.graph, node, reverse=True, depth_limit=max_depth) } - def descendants(self, node: UniqueId, max_depth: Optional[int] = None) -> Set[UniqueId]: + def descendants( + self, node: UniqueId, max_depth: Optional[int] = None, include_tests_children: bool = False + ) -> Set[UniqueId]: """Returns all nodes reachable from `node` in `graph`""" if not self.graph.has_node(node): raise DbtInternalError(f"Node {node} not found in the graph!") - return {child for _, child in nx.bfs_edges(self.graph, node, depth_limit=max_depth)} + return { + child + for parent, child in nx.bfs_edges(self.graph, node, depth_limit=max_depth) + if include_tests_children + or self.graph[parent][child].get("edge_type") != "parent_test" + } def select_childrens_parents(self, selected: Set[UniqueId]) -> Set[UniqueId]: ancestors_for = self.select_children(selected) | selected From bddcdef1188141375aac319c191b5042b87ce374 Mon Sep 17 00:00:00 2001 From: Brice Luu Date: Fri, 21 Apr 2023 17:07:31 +0200 Subject: [PATCH 03/10] Need to filter the graph prior searching through edges... It didn't work when applying the test after the fact :shrug: --- core/dbt/graph/graph.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/core/dbt/graph/graph.py b/core/dbt/graph/graph.py index cd37480b53b..dcd008e3c8e 100644 --- a/core/dbt/graph/graph.py +++ b/core/dbt/graph/graph.py @@ -39,12 +39,17 @@ def descendants( """Returns all nodes reachable from `node` in `graph`""" if not self.graph.has_node(node): raise DbtInternalError(f"Node {node} not found in the graph!") - return { - child - for parent, child in nx.bfs_edges(self.graph, node, depth_limit=max_depth) - if include_tests_children - or self.graph[parent][child].get("edge_type") != "parent_test" - } + filtered_graph = nx.restricted_view( + self.graph, + nodes=[], + edges=( + (a, b) + for a, b in self.graph.edges + if not include_tests_children + and self.graph[a][b].get("edge_type") == "parent_test" + ), + ) + return {child for _, child in nx.bfs_edges(filtered_graph, node, depth_limit=max_depth)} def select_childrens_parents(self, selected: Set[UniqueId]) -> Set[UniqueId]: ancestors_for = self.select_children(selected) | selected From ca2959fe721fdc73151e6ede6f227fa4018b0d52 Mon Sep 17 00:00:00 2001 From: Brice Luu Date: Fri, 21 Apr 2023 17:24:56 +0200 Subject: [PATCH 04/10] Add changelog entry --- .changes/unreleased/Fixes-20230421-172428.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Fixes-20230421-172428.yaml diff --git a/.changes/unreleased/Fixes-20230421-172428.yaml b/.changes/unreleased/Fixes-20230421-172428.yaml new file mode 100644 index 00000000000..82c489918b1 --- /dev/null +++ b/.changes/unreleased/Fixes-20230421-172428.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: dbt build selection of tests' descendants +time: 2023-04-21T17:24:28.335866975+02:00 +custom: + Author: b-luu + Issue: "7289" From 7e8e334fc4619c997834beeb973374f73d27f577 Mon Sep 17 00:00:00 2001 From: Brice Luu Date: Sat, 22 Apr 2023 20:49:10 +0200 Subject: [PATCH 05/10] Add the matching test case --- tests/functional/build/test_build.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/functional/build/test_build.py b/tests/functional/build/test_build.py index eb9529be102..35aaa238497 100644 --- a/tests/functional/build/test_build.py +++ b/tests/functional/build/test_build.py @@ -196,3 +196,18 @@ def test_interdependent_models_fail(self, project): actual = [str(r.status) for r in results] expected = ["error"] * 4 + ["skipped"] * 7 + ["pass"] * 2 + ["success"] * 3 assert sorted(actual) == sorted(expected) + + +class TestDownstreamSelection: + @pytest.fixture(scope="class") + def models(self): + return { + "model_a.sql": models_simple_blocking__model_a_sql, + "model_b.sql": models_simple_blocking__model_b_sql, + "test.yml": models_simple_blocking__test_yml, + } + + def test_downstream_selection(self, project): + """Ensure that selecting test+ does not select model_a's other children""" + results = run_dbt(["build", "--select", "model_a not_null_model_a_id+"], expect_pass=True) + assert len(results) == 2 From baf82676cf6e03b2f05abad4aa21cd15cdabd6c9 Mon Sep 17 00:00:00 2001 From: Brice Luu Date: Mon, 24 Apr 2023 07:45:33 +0200 Subject: [PATCH 06/10] Factorize parent_tests filtering and also use it for ancestors --- core/dbt/graph/graph.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/core/dbt/graph/graph.py b/core/dbt/graph/graph.py index dcd008e3c8e..d5d26f30c3a 100644 --- a/core/dbt/graph/graph.py +++ b/core/dbt/graph/graph.py @@ -28,9 +28,10 @@ def ancestors(self, node: UniqueId, max_depth: Optional[int] = None) -> Set[Uniq """Returns all nodes having a path to `node` in `graph`""" if not self.graph.has_node(node): raise DbtInternalError(f"Node {node} not found in the graph!") + filtered_graph = self.exclude_edge_type("parent_test") return { child - for _, child in nx.bfs_edges(self.graph, node, reverse=True, depth_limit=max_depth) + for _, child in nx.bfs_edges(filtered_graph, node, reverse=True, depth_limit=max_depth) } def descendants( @@ -39,17 +40,19 @@ def descendants( """Returns all nodes reachable from `node` in `graph`""" if not self.graph.has_node(node): raise DbtInternalError(f"Node {node} not found in the graph!") - filtered_graph = nx.restricted_view( + filtered_graph = self.exclude_edge_type("parent_test") + return {child for _, child in nx.bfs_edges(filtered_graph, node, depth_limit=max_depth)} + + def exclude_edge_type(self, edge_type_to_exclude): + return nx.restricted_view( self.graph, nodes=[], edges=( (a, b) for a, b in self.graph.edges - if not include_tests_children - and self.graph[a][b].get("edge_type") == "parent_test" + if self.graph[a][b].get("edge_type") == edge_type_to_exclude ), ) - return {child for _, child in nx.bfs_edges(filtered_graph, node, depth_limit=max_depth)} def select_childrens_parents(self, selected: Set[UniqueId]) -> Set[UniqueId]: ancestors_for = self.select_children(selected) | selected From 956b26890146badfefb6a26e5a9e082202d1a5ee Mon Sep 17 00:00:00 2001 From: Brice Luu Date: Mon, 24 Apr 2023 07:49:14 +0200 Subject: [PATCH 07/10] Add the matching test --- tests/functional/build/fixtures.py | 21 +++++++++++++++++++++ tests/functional/build/test_build.py | 17 +++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/tests/functional/build/fixtures.py b/tests/functional/build/fixtures.py index 7c4d93e6186..e6f8dd4f0b3 100644 --- a/tests/functional/build/fixtures.py +++ b/tests/functional/build/fixtures.py @@ -206,6 +206,27 @@ - not_null """ +models_triple_blocking__test_yml = """ +version: 2 + +models: + - name: model_a + columns: + - name: id + tests: + - not_null + - name: model_b + columns: + - name: id + tests: + - not_null + - name: model_c + columns: + - name: id + tests: + - not_null +""" + models_interdependent__model_a_sql = """ select 1 as id """ diff --git a/tests/functional/build/test_build.py b/tests/functional/build/test_build.py index 35aaa238497..fb909d69f4b 100644 --- a/tests/functional/build/test_build.py +++ b/tests/functional/build/test_build.py @@ -18,6 +18,7 @@ models_simple_blocking__model_a_sql, models_simple_blocking__model_b_sql, models_simple_blocking__test_yml, + models_triple_blocking__test_yml, models_interdependent__test_yml, models_interdependent__model_a_sql, models_interdependent__model_b_sql, @@ -211,3 +212,19 @@ def test_downstream_selection(self, project): """Ensure that selecting test+ does not select model_a's other children""" results = run_dbt(["build", "--select", "model_a not_null_model_a_id+"], expect_pass=True) assert len(results) == 2 + + +class TestLimitedUpstreamSelection: + @pytest.fixture(scope="class") + def models(self): + return { + "model_a.sql": models_interdependent__model_a_sql, + "model_b.sql": models_interdependent__model_b_sql, + "model_c.sql": models_interdependent__model_c_sql, + "test.yml": models_triple_blocking__test_yml, + } + + def test_limited_upstream_selection(self, project): + """Ensure that selecting 1+model_c only selects up to model_b (+ tests of both)""" + results = run_dbt(["build", "--select", "1+model_c"], expect_pass=True) + assert len(results) == 4 From a97072e0e91d90421507fedd2583fe4f96d66ebc Mon Sep 17 00:00:00 2001 From: Brice Luu Date: Mon, 24 Apr 2023 14:02:58 +0200 Subject: [PATCH 08/10] Fix failing false positive tests: These are now fixed: we no longer run models downstream of tests if only the tests were selected, not the parent model... --- tests/functional/defer_state/test_run_results_state.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/functional/defer_state/test_run_results_state.py b/tests/functional/defer_state/test_run_results_state.py index aa1dc549272..217cbe43dff 100644 --- a/tests/functional/defer_state/test_run_results_state.py +++ b/tests/functional/defer_state/test_run_results_state.py @@ -216,9 +216,9 @@ def test_build_run_results_state(self, project): results = run_dbt( ["build", "--select", "result:fail+", "--state", "./state"], expect_pass=False ) - assert len(results) == 2 + assert len(results) == 1 nodes = set([elem.node.name for elem in results]) - assert nodes == {"table_model", "unique_view_model_id"} + assert nodes == {"unique_view_model_id"} results = run_dbt(["ls", "--select", "result:fail+", "--state", "./state"]) assert len(results) == 1 @@ -483,12 +483,11 @@ def test_concurrent_selectors_build_run_results_state(self, project): ], expect_pass=False, ) - assert len(results) == 5 + assert len(results) == 4 nodes = set([elem.node.name for elem in results]) assert nodes == { "error_model", "downstream_of_error_model", "table_model_modified_example", - "table_model", "unique_view_model_id", } From 799f09c74b939a36ecaaba260f60edef0ac10afc Mon Sep 17 00:00:00 2001 From: Brice Luu Date: Tue, 25 Apr 2023 11:53:42 +0200 Subject: [PATCH 09/10] Fix the remaining test so it also matches the dbt ls command selection. --- tests/functional/defer_state/test_run_results_state.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/functional/defer_state/test_run_results_state.py b/tests/functional/defer_state/test_run_results_state.py index 217cbe43dff..69dc77a1dd3 100644 --- a/tests/functional/defer_state/test_run_results_state.py +++ b/tests/functional/defer_state/test_run_results_state.py @@ -240,9 +240,9 @@ def test_build_run_results_state(self, project): results = run_dbt( ["build", "--select", "result:warn+", "--state", "./state"], expect_pass=True ) - assert len(results) == 2 # includes table_model to be run + assert len(results) == 1 nodes = set([elem.node.name for elem in results]) - assert nodes == {"table_model", "unique_view_model_id"} + assert nodes == {"unique_view_model_id"} results = run_dbt(["ls", "--select", "result:warn+", "--state", "./state"]) assert len(results) == 1 From 65af13b57c0a2acc14a4bb5467ed8908ee813c7c Mon Sep 17 00:00:00 2001 From: Brice Luu Date: Tue, 25 Apr 2023 11:55:22 +0200 Subject: [PATCH 10/10] Remove no longer used feature flag to activate tests children filtering or not --- core/dbt/graph/graph.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/dbt/graph/graph.py b/core/dbt/graph/graph.py index d5d26f30c3a..69a2f21258a 100644 --- a/core/dbt/graph/graph.py +++ b/core/dbt/graph/graph.py @@ -34,9 +34,7 @@ def ancestors(self, node: UniqueId, max_depth: Optional[int] = None) -> Set[Uniq for _, child in nx.bfs_edges(filtered_graph, node, reverse=True, depth_limit=max_depth) } - def descendants( - self, node: UniqueId, max_depth: Optional[int] = None, include_tests_children: bool = False - ) -> Set[UniqueId]: + def descendants(self, node: UniqueId, max_depth: Optional[int] = None) -> Set[UniqueId]: """Returns all nodes reachable from `node` in `graph`""" if not self.graph.has_node(node): raise DbtInternalError(f"Node {node} not found in the graph!")