From 2ff33b2bf04b7c1f505ef420478d31a1a7c7d257 Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Fri, 17 Jan 2025 15:08:04 +0000 Subject: [PATCH 1/5] Add deployment related attributes Signed-off-by: Ankita Katiyar --- kedro/pipeline/pipeline.py | 40 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/kedro/pipeline/pipeline.py b/kedro/pipeline/pipeline.py index 826acd1b13..8f14bd0c14 100644 --- a/kedro/pipeline/pipeline.py +++ b/kedro/pipeline/pipeline.py @@ -369,6 +369,46 @@ def grouped_nodes(self) -> list[list[Node]]: return [list(group) for group in self._toposorted_groups] + @property + def grouped_by_namespace(self): + """Return a dictionary of the pipeline nodes grouped by namespace. + + Returns: + The pipeline nodes grouped by namespace. + """ + nodes_by_namespace = defaultdict(list) + for node in self.nodes: + if node.namespace: + nodes_by_namespace[node.namespace].append(node) + else: + nodes_by_namespace[node.name].append(node) + return nodes_by_namespace + + @property + def node_dependencies_by_namespace(self): + """Return a dictionary of the pipeline nodes dependencies grouped by namespace. + + Returns: + The pipeline nodes dependencies grouped by namespace. + """ + node_dependencies_by_namespace = defaultdict(dict) + for node in self.nodes: + key = node.namespace if node.namespace else node.name + for parent in self.node_dependencies[node]: + if key not in node_dependencies_by_namespace: + node_dependencies_by_namespace[key] = [] + if parent.namespace and parent.namespace != key: + node_dependencies_by_namespace[key].append(parent.namespace) + elif parent.namespace and parent.namespace == key: + continue + else: + node_dependencies_by_namespace[key].append(parent.name) + + node_dependencies_by_namespace = { + key: set(value) for key, value in node_dependencies_by_namespace.items() + } + return node_dependencies_by_namespace + def only_nodes(self, *node_names: str) -> Pipeline: """Create a new ``Pipeline`` which will contain only the specified nodes by name. From ef4d664e4c1946363a3883bb528acc68e0a4dd8c Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Wed, 29 Jan 2025 13:49:06 +0000 Subject: [PATCH 2/5] Update with feedback Signed-off-by: Ankita Katiyar --- kedro/pipeline/pipeline.py | 46 +++++++++++++------------------------- 1 file changed, 15 insertions(+), 31 deletions(-) diff --git a/kedro/pipeline/pipeline.py b/kedro/pipeline/pipeline.py index 8f14bd0c14..42ce40b63e 100644 --- a/kedro/pipeline/pipeline.py +++ b/kedro/pipeline/pipeline.py @@ -370,44 +370,28 @@ def grouped_nodes(self) -> list[list[Node]]: return [list(group) for group in self._toposorted_groups] @property - def grouped_by_namespace(self): - """Return a dictionary of the pipeline nodes grouped by namespace. - - Returns: - The pipeline nodes grouped by namespace. - """ - nodes_by_namespace = defaultdict(list) - for node in self.nodes: - if node.namespace: - nodes_by_namespace[node.namespace].append(node) - else: - nodes_by_namespace[node.name].append(node) - return nodes_by_namespace - - @property - def node_dependencies_by_namespace(self): - """Return a dictionary of the pipeline nodes dependencies grouped by namespace. - - Returns: - The pipeline nodes dependencies grouped by namespace. - """ - node_dependencies_by_namespace = defaultdict(dict) + def grouped_nodes_by_namespace(self) -> dict[str, dict[str, Any]]: + grouped_nodes: dict[str, dict[str, Any]] = defaultdict(dict) for node in self.nodes: - key = node.namespace if node.namespace else node.name + key = node.namespace or node.name + if key not in grouped_nodes: + grouped_nodes[key] = {} + grouped_nodes[key]["name"] = key + grouped_nodes[key]["type"] = "namespace" if node.namespace else "node" + grouped_nodes[key]["nodes"] = [*grouped_nodes[key].get("nodes", []), node] + deps = set() for parent in self.node_dependencies[node]: - if key not in node_dependencies_by_namespace: - node_dependencies_by_namespace[key] = [] if parent.namespace and parent.namespace != key: - node_dependencies_by_namespace[key].append(parent.namespace) + deps.add(parent.namespace) elif parent.namespace and parent.namespace == key: continue else: - node_dependencies_by_namespace[key].append(parent.name) + deps.add(parent.name) + grouped_nodes[key]["dependencies"] = ( + grouped_nodes[key].get("dependencies", set()) | deps + ) - node_dependencies_by_namespace = { - key: set(value) for key, value in node_dependencies_by_namespace.items() - } - return node_dependencies_by_namespace + return grouped_nodes def only_nodes(self, *node_names: str) -> Pipeline: """Create a new ``Pipeline`` which will contain only the specified From a299ef36304a69bf753e81329332bbff6658ac85 Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Fri, 31 Jan 2025 16:48:28 +0000 Subject: [PATCH 3/5] Minor formatting Signed-off-by: Ankita Katiyar --- kedro/pipeline/pipeline.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/kedro/pipeline/pipeline.py b/kedro/pipeline/pipeline.py index 42ce40b63e..ff032fbf69 100644 --- a/kedro/pipeline/pipeline.py +++ b/kedro/pipeline/pipeline.py @@ -371,6 +371,8 @@ def grouped_nodes(self) -> list[list[Node]]: @property def grouped_nodes_by_namespace(self) -> dict[str, dict[str, Any]]: + """Return a dictionary of the pipeline nodes grouped by namespace with + information about the nodes, their type, and dependencies.""" grouped_nodes: dict[str, dict[str, Any]] = defaultdict(dict) for node in self.nodes: key = node.namespace or node.name @@ -379,18 +381,17 @@ def grouped_nodes_by_namespace(self) -> dict[str, dict[str, Any]]: grouped_nodes[key]["name"] = key grouped_nodes[key]["type"] = "namespace" if node.namespace else "node" grouped_nodes[key]["nodes"] = [*grouped_nodes[key].get("nodes", []), node] - deps = set() + dependencies = set() for parent in self.node_dependencies[node]: if parent.namespace and parent.namespace != key: - deps.add(parent.namespace) + dependencies.add(parent.namespace) elif parent.namespace and parent.namespace == key: continue else: - deps.add(parent.name) + dependencies.add(parent.name) grouped_nodes[key]["dependencies"] = ( - grouped_nodes[key].get("dependencies", set()) | deps + grouped_nodes[key].get("dependencies", set()) | dependencies ) - return grouped_nodes def only_nodes(self, *node_names: str) -> Pipeline: From b79f1f55ef6e6b9c3b467d9ed100054a7d260bc9 Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Mon, 3 Feb 2025 15:45:48 +0000 Subject: [PATCH 4/5] Add test Signed-off-by: Ankita Katiyar --- tests/pipeline/test_pipeline.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 746ea4794a..13e079269e 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -376,6 +376,34 @@ def test_node_dependencies(self, complex_pipeline): } assert actual == expected + def test_node_grouping_by_namespace(self): + pipeline = modular_pipeline( + [ + node(identity, "A", "B", name="node1", namespace="name_1"), + node(identity, "B", "C", name="node2", namespace="name_1"), + node(identity, "C", "D", name="node3", namespace="name_2"), + node(identity, "D", "E", name="node4", namespace="name_2"), + node(identity, "E", "G", name="node5"), + ] + ) + grouped = pipeline.grouped_nodes_by_namespace + # Validate keys for namespace groups + for key in ["name_1", "name_2"]: + assert key in grouped + assert grouped[key]["name"] == key + assert grouped[key]["type"] == "namespace" + assert len(grouped[key]["nodes"]) == 2 + + # Validate dependencies for namespace groups + assert grouped["name_1"]["dependencies"] == set() + assert grouped["name_2"]["dependencies"] == {"name_1"} + + # Validate nodes for namespace groups + assert grouped["node5"]["type"] == "node" + assert grouped["node5"]["name"] == "node5" + assert len(grouped["node5"]["nodes"]) == 1 + assert grouped["node5"]["dependencies"] == {"name_2"} + @pytest.fixture def pipeline_with_circle(): From a8b1f45c200a74c52e9cb455b86e203bb38973cd Mon Sep 17 00:00:00 2001 From: Ankita Katiyar Date: Mon, 3 Feb 2025 15:51:51 +0000 Subject: [PATCH 5/5] Update test Signed-off-by: Ankita Katiyar --- tests/pipeline/test_pipeline.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 13e079269e..985874cc3b 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -384,6 +384,7 @@ def test_node_grouping_by_namespace(self): node(identity, "C", "D", name="node3", namespace="name_2"), node(identity, "D", "E", name="node4", namespace="name_2"), node(identity, "E", "G", name="node5"), + node(identity, "G", "H", name="node6"), ] ) grouped = pipeline.grouped_nodes_by_namespace @@ -403,6 +404,8 @@ def test_node_grouping_by_namespace(self): assert grouped["node5"]["name"] == "node5" assert len(grouped["node5"]["nodes"]) == 1 assert grouped["node5"]["dependencies"] == {"name_2"} + # Validate when node depends on node + assert grouped["node6"]["dependencies"] == {"node5"} @pytest.fixture