diff --git a/RELEASE.md b/RELEASE.md index bd95ac471f..e974dd5bcc 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -2,6 +2,7 @@ ## Major features and improvements * Added `KedroDataCatalog.filter()` to filter datasets by name and type. +* Added `Pipeline.grouped_nodes_by_namespace` property which returns a dictionary of nodes grouped by namespace, intended to be used by plugins to facilitate deployment of namespaced nodes together. ## Bug fixes and other changes * Updated `_LazyDataset` representation when printing `KedroDataCatalog`. diff --git a/docs/source/notebooks_and_ipython/notebook-example/add_kedro_to_a_notebook.md b/docs/source/notebooks_and_ipython/notebook-example/add_kedro_to_a_notebook.md index 31b5ba92fe..cf32ffd647 100644 --- a/docs/source/notebooks_and_ipython/notebook-example/add_kedro_to_a_notebook.md +++ b/docs/source/notebooks_and_ipython/notebook-example/add_kedro_to_a_notebook.md @@ -152,7 +152,7 @@ When writing exploratory code, it’s tempting to hard code values to save time, X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=3) ``` -[Good software engineering practice](https://towardsdatascience.com/five-software-engineering-principles-for-collaborative-data-science-ab26667a311) suggests that we extract *‘magic numbers’* into named constants. These could be defined at the top of a file or in a utility file, in a format such as yaml. +[Good software engineering practice](https://medium.com/towards-data-science/five-software-engineering-principles-for-collaborative-data-science-ab26667a311) suggests that we extract *‘magic numbers’* into named constants. These could be defined at the top of a file or in a utility file, in a format such as yaml. 
```yaml
diff --git a/kedro/pipeline/pipeline.py b/kedro/pipeline/pipeline.py
index 826acd1b13..b434bae49e 100644
--- a/kedro/pipeline/pipeline.py
+++ b/kedro/pipeline/pipeline.py
@@ -369,6 +369,49 @@ def grouped_nodes(self) -> list[list[Node]]:
 
         return [list(group) for group in self._toposorted_groups]
 
+    @property
+    def grouped_nodes_by_namespace(self) -> dict[str, dict[str, Any]]:
+        """Return the pipeline nodes grouped by namespace.
+
+        Nodes sharing a namespace form one group keyed by that namespace; a
+        node without a namespace forms its own group keyed by the node name.
+        This property is intended to be used by deployment plugins to group
+        nodes by namespace.
+
+        Returns:
+            A dictionary with one entry per group, structured as::
+
+                {
+                    "<namespace or node name>": {
+                        "name": "<namespace or node name>",
+                        "type": "namespace" or "node",
+                        "nodes": [nodes in the group],
+                        "dependencies": {keys of the groups it depends on},
+                    }
+                }
+
+        """
+        # A plain dict rather than a defaultdict: looking up an unknown key on
+        # the returned mapping raises KeyError instead of inserting a group.
+        grouped_nodes: dict[str, dict[str, Any]] = {}
+        for node in self.nodes:
+            key = node.namespace or node.name
+            if key not in grouped_nodes:
+                grouped_nodes[key] = {
+                    "name": key,
+                    "type": "namespace" if node.namespace else "node",
+                    "nodes": [],
+                    "dependencies": set(),
+                }
+            group = grouped_nodes[key]
+            group["nodes"].append(node)
+            for parent in self.node_dependencies[node]:
+                parent_key = parent.namespace or parent.name
+                # A parent in the same group is not an external dependency.
+                if parent_key != key:
+                    group["dependencies"].add(parent_key)
+        return grouped_nodes
+
     def only_nodes(self, *node_names: str) -> Pipeline:
         """Create a new ``Pipeline`` which will contain only the specified
         nodes by name.
diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py
index 746ea4794a..1315e75938 100644
--- a/tests/pipeline/test_pipeline.py
+++ b/tests/pipeline/test_pipeline.py
@@ -376,6 +376,114 @@ def test_node_dependencies(self, complex_pipeline):
         }
         assert actual == expected
 
+    @pytest.mark.parametrize(
+        "pipeline_name, expected",
+        [
+            (
+                "pipeline_with_namespace_simple",
+                {"namespace_1": "namespace", "namespace_2": "namespace"},
+            ),
+            (
+                "pipeline_with_namespace_partial",
+                {
+                    "namespace_1": "namespace",
+                    "node_3": "node",
+                    "namespace_2": "namespace",
+                    "node_6": "node",
+                },
+            ),
+        ],
+    )
+    def test_node_grouping_by_namespace_name_type(
+        self, request, pipeline_name, expected
+    ):
+        """Check the ``name`` and ``type`` entries of every group returned by
+        ``Pipeline.grouped_nodes_by_namespace``.
+
+        ``expected`` maps each group key to its expected type, so the
+        assertion does not rely on how the fixture nodes happen to be named.
+        """
+        p = request.getfixturevalue(pipeline_name)
+        grouped = p.grouped_nodes_by_namespace
+        assert set(grouped) == set(expected)
+        for key, expected_type in expected.items():
+            assert grouped[key]["name"] == key
+            assert grouped[key]["type"] == expected_type
+
+    @pytest.mark.parametrize(
+        "pipeline_name, expected",
+        [
+            (
+                "pipeline_with_namespace_simple",
+                {
+                    "namespace_1": [
+                        "namespace_1.node_1",
+                        "namespace_1.node_2",
+                        "namespace_1.node_3",
+                    ],
+                    "namespace_2": [
+                        "namespace_2.node_4",
+                        "namespace_2.node_5",
+                        "namespace_2.node_6",
+                    ],
+                },
+            ),
+            (
+                "pipeline_with_namespace_partial",
+                {
+                    "namespace_1": ["namespace_1.node_1", "namespace_1.node_2"],
+                    "node_3": ["node_3"],
+                    "namespace_2": ["namespace_2.node_4", "namespace_2.node_5"],
+                    "node_6": ["node_6"],
+                },
+            ),
+        ],
+    )
+    def test_node_grouping_by_namespace_nodes(self, request, pipeline_name, expected):
+        """Check the ``nodes`` entry of every group returned by
+        ``Pipeline.grouped_nodes_by_namespace``: each group must contain
+        exactly the expected nodes.
+        """
+        p = request.getfixturevalue(pipeline_name)
+        grouped = p.grouped_nodes_by_namespace
+        # Compare the keys first so a missing group cannot pass unnoticed.
+        assert set(grouped) == set(expected)
+        for key, value in grouped.items():
+            names = [node.name for node in value["nodes"]]
+            assert set(names) == set(expected[key])
+
+    @pytest.mark.parametrize(
+        "pipeline_name, expected",
+        [
+            (
+                "pipeline_with_namespace_simple",
+                {"namespace_1": set(), "namespace_2": {"namespace_1"}},
+            ),
+            (
+                "pipeline_with_namespace_partial",
+                {
+                    "namespace_1": set(),
+                    "node_3": {"namespace_1"},
+                    "namespace_2": {"node_3"},
+                    "node_6": {"namespace_2"},
+                },
+            ),
+        ],
+    )
+    def test_node_grouping_by_namespace_dependencies(
+        self, request, pipeline_name, expected
+    ):
+        """Check the ``dependencies`` entry of every group returned by
+        ``Pipeline.grouped_nodes_by_namespace``: the set of nodes/namespaces
+        the group depends on.
+        """
+        p = request.getfixturevalue(pipeline_name)
+        grouped = p.grouped_nodes_by_namespace
+        # Compare the keys first so a missing group cannot pass unnoticed.
+        assert set(grouped) == set(expected)
+        for key, value in grouped.items():
+            assert set(value["dependencies"]) == expected[key]
+
 
 @pytest.fixture
 def pipeline_with_circle():
@@ -758,6 +866,34 @@ def pipeline_with_namespaces():
     )
 
 
+@pytest.fixture
+def pipeline_with_namespace_simple():
+    return modular_pipeline(
+        [
+            node(identity, "A", "B", name="node_1", namespace="namespace_1"),
+            node(identity, "B", "C", name="node_2", namespace="namespace_1"),
+            node(identity, "C", "D", name="node_3", namespace="namespace_1"),
+            node(identity, "D", "E", name="node_4", namespace="namespace_2"),
+            node(identity, "E", "F", name="node_5", namespace="namespace_2"),
+            node(identity, "F", "G", name="node_6", namespace="namespace_2"),
+        ]
+    )
+
+
+@pytest.fixture
+def pipeline_with_namespace_partial():
+    return modular_pipeline(
+        [
+            node(identity, "A", "B", name="node_1", namespace="namespace_1"),
+            node(identity, "B", "C", name="node_2", namespace="namespace_1"),
+            node(identity, "C", "D", name="node_3"),
+            node(identity, "D", "E", name="node_4", namespace="namespace_2"),
+            node(identity, "E", "F", name="node_5", namespace="namespace_2"),
+            node(identity, "F", "G", name="node_6"),
+        ]
+    )
+
+
 class TestPipelineFilter:
     def test_no_filters(self, complex_pipeline):
         filtered_pipeline = complex_pipeline.filter()